MongoDB C++ Driver  legacy-1.0.5
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
message.h
1 // message.h
2 
3 /* Copyright 2009 10gen Inc.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 
18 #pragma once
19 
20 #include <vector>
21 
22 #include "mongo/platform/atomic_word.h"
23 #include "mongo/platform/cstdint.h"
24 #include "mongo/base/data_view.h"
25 #include "mongo/base/encoded_value_storage.h"
26 #include "mongo/util/mongoutils/str.h"
27 #include "mongo/util/net/hostandport.h"
28 #include "mongo/util/net/operation.h"
29 #include "mongo/util/net/sock.h"
30 
31 namespace mongo {
32 
36  const size_t MaxMessageSizeBytes = 48 * 1000 * 1000;
37 
38  class Message;
39  class MessagingPort;
40  class PiggyBackData;
41 
42  typedef uint32_t MSGID;
43 
44  bool doesOpGetAResponse( int op );
45 
46  inline const char * opToString( int op ) {
47  switch ( op ) {
48  case 0: return "none";
49  case opReply: return "reply";
50  case dbMsg: return "msg";
51  case dbUpdate: return "update";
52  case dbInsert: return "insert";
53  case dbQuery: return "query";
54  case dbGetMore: return "getmore";
55  case dbDelete: return "remove";
56  case dbKillCursors: return "killcursors";
57  default:
58  massert( 16141, str::stream() << "cannot translate opcode " << op, !op );
59  return "";
60  }
61  }
62 
63  inline bool opIsWrite( int op ) {
64  switch ( op ) {
65 
66  case 0:
67  case opReply:
68  case dbMsg:
69  case dbQuery:
70  case dbGetMore:
71  case dbKillCursors:
72  return false;
73 
74  case dbUpdate:
75  case dbInsert:
76  case dbDelete:
77  return true;
78 
79  default:
80  verify(0);
81  return "";
82  }
83 
84  }
85 
86  namespace MSGHEADER {
87 #pragma pack(1)
88  /* see http://dochub.mongodb.org/core/mongowireprotocol
89  */
90  struct Layout {
91  int32_t messageLength; // total message size, including this
92  int32_t requestID; // identifier for this message
93  int32_t responseTo; // requestID from the original request
94  // (used in responses from db)
95  int32_t opCode;
96  };
97 #pragma pack()
98 
99  class ConstView {
100  public:
101  typedef ConstDataView view_type;
102 
103  ConstView(const char* data) : _data(data) { }
104 
105  const char* view2ptr() const {
106  return data().view();
107  }
108 
109  int32_t getMessageLength() const {
110  return data().readLE<int32_t>(offsetof(Layout, messageLength));
111  }
112 
113  int32_t getRequestID() const {
114  return data().readLE<int32_t>(offsetof(Layout, requestID));
115  }
116 
117  int32_t getResponseTo() const {
118  return data().readLE<int32_t>(offsetof(Layout, responseTo));
119  }
120 
121  int32_t getOpCode() const {
122  return data().readLE<int32_t>(offsetof(Layout, opCode));
123  }
124 
125  protected:
126  const view_type& data() const {
127  return _data;
128  }
129 
130  private:
131  view_type _data;
132  };
133 
134  class View : public ConstView {
135  public:
136  typedef DataView view_type;
137 
138  View(char* data) : ConstView(data) {}
139 
140  using ConstView::view2ptr;
141  char* view2ptr() {
142  return data().view();
143  }
144 
145  void setMessageLength(int32_t value) {
146  data().writeLE(value, offsetof(Layout, messageLength));
147  }
148 
149  void setRequestID(int32_t value) {
150  data().writeLE(value, offsetof(Layout, requestID));
151  }
152 
153  void setResponseTo(int32_t value) {
154  data().writeLE(value, offsetof(Layout, responseTo));
155  }
156 
157  void setOpCode(int32_t value) {
158  data().writeLE(value, offsetof(Layout, opCode));
159  }
160 
161  private:
162  view_type data() const {
163  return const_cast<char *>(ConstView::view2ptr());
164  }
165  };
166 
167  class Value : public EncodedValueStorage<Layout, ConstView, View> {
168  public:
169  Value() {
170  BOOST_STATIC_ASSERT(sizeof(Value) == sizeof(Layout));
171  }
172 
173  Value(ZeroInitTag_t zit) : EncodedValueStorage<Layout, ConstView, View>(zit) {}
174  };
175 
176  } // namespace MSGHEADER
177 
178  namespace MsgData {
179 #pragma pack(1)
180  struct Layout {
181  MSGHEADER::Layout header;
182  char data[4];
183  };
184 #pragma pack()
185 
186  class ConstView {
187  public:
188  ConstView(const char* storage) : _storage(storage) { }
189 
190  const char* view2ptr() const {
191  return storage().view();
192  }
193 
194  int32_t getLen() const {
195  return header().getMessageLength();
196  }
197 
198  MSGID getId() const {
199  return header().getRequestID();
200  }
201 
202  MSGID getResponseTo() const {
203  return header().getResponseTo();
204  }
205 
206  int32_t getOperation() const {
207  return header().getOpCode();
208  }
209 
210  const char* data() const {
211  return storage().view(offsetof(Layout, data));
212  }
213 
214  bool valid() const {
215  if ( getLen() <= 0 || getLen() > ( 4 * BSONObjMaxInternalSize ) )
216  return false;
217  if ( getOperation() < 0 || getOperation() > 30000 )
218  return false;
219  return true;
220  }
221 
222  int64_t getCursor() const {
223  verify( getResponseTo() > 0 );
224  verify( getOperation() == opReply );
225  return ConstDataView(data() + sizeof(int32_t)).readLE<int64_t>();
226  }
227 
228  int dataLen() const; // len without header
229 
230  protected:
231  const ConstDataView& storage() const {
232  return _storage;
233  }
234 
235  MSGHEADER::ConstView header() const {
236  return storage().view(offsetof(Layout, header));
237  }
238 
239  private:
240  ConstDataView _storage;
241  };
242 
243  class View : public ConstView {
244  public:
245  View(char* storage) : ConstView(storage) {}
246 
247  using ConstView::view2ptr;
248  char* view2ptr() {
249  return storage().view();
250  }
251 
252  void setLen(int value) {
253  return header().setMessageLength(value);
254  }
255 
256  void setId(MSGID value) {
257  return header().setRequestID(value);
258  }
259 
260  void setResponseTo(MSGID value) {
261  return header().setResponseTo(value);
262  }
263 
264  void setOperation(int value) {
265  return header().setOpCode(value);
266  }
267 
268  using ConstView::data;
269  char* data() {
270  return storage().view(offsetof(Layout, data));
271  }
272 
273  private:
274  DataView storage() const {
275  return const_cast<char *>(ConstView::view2ptr());
276  }
277 
278  MSGHEADER::View header() const {
279  return storage().view(offsetof(Layout, header));
280  }
281  };
282 
283  class Value : public EncodedValueStorage<Layout, ConstView, View> {
284  public:
285  Value() {
286  BOOST_STATIC_ASSERT(sizeof(Value) == sizeof(Layout));
287  }
288 
289  Value(ZeroInitTag_t zit) : EncodedValueStorage<Layout, ConstView, View>(zit) {}
290  };
291 
292  const int MsgDataHeaderSize = sizeof(Value) - 4;
293  inline int ConstView::dataLen() const {
294  return getLen() - MsgDataHeaderSize;
295  }
296  } // namespace MsgData
297 
298  class Message {
299  public:
300  // we assume here that a vector with initial size 0 does no allocation (0 is the default, but wanted to make it explicit).
301  Message() : _buf( 0 ), _data( 0 ), _freeIt( false ) {}
302  Message( void * data , bool freeIt ) :
303  _buf( 0 ), _data( 0 ), _freeIt( false ) {
304  _setData( reinterpret_cast< char* >( data ), freeIt );
305  };
306  Message(Message& r) : _buf( 0 ), _data( 0 ), _freeIt( false ) {
307  *this = r;
308  }
309  ~Message() {
310  reset();
311  }
312 
313  SockAddr _from;
314 
315  MsgData::View header() const {
316  verify( !empty() );
317  return _buf ? _buf : _data[ 0 ].first;
318  }
319 
320  int operation() const { return header().getOperation(); }
321 
322  MsgData::View singleData() const {
323  massert( 13273, "single data buffer expected", _buf );
324  return header();
325  }
326 
327  bool empty() const { return !_buf && _data.empty(); }
328 
329  int size() const {
330  int res = 0;
331  if ( _buf ) {
332  res = MsgData::ConstView(_buf).getLen();
333  }
334  else {
335  for (MsgVec::const_iterator it = _data.begin(); it != _data.end(); ++it) {
336  res += it->second;
337  }
338  }
339  return res;
340  }
341 
342  int dataSize() const { return size() - sizeof(MSGHEADER::Value); }
343 
344  // concat multiple buffers - noop if <2 buffers already, otherwise can be expensive copy
345  // can get rid of this if we make response handling smarter
346  void concat() {
347  if ( _buf || empty() ) {
348  return;
349  }
350 
351  verify( _freeIt );
352  int totalSize = 0;
353  for (std::vector< std::pair< char *, int > >::const_iterator i = _data.begin();
354  i != _data.end(); ++i) {
355  totalSize += i->second;
356  }
357  char *buf = (char*)malloc( totalSize );
358  char *p = buf;
359  for (std::vector< std::pair< char *, int > >::const_iterator i = _data.begin();
360  i != _data.end(); ++i) {
361  memcpy( p, i->first, i->second );
362  p += i->second;
363  }
364  reset();
365  _setData( buf, true );
366  }
367 
368  // vector swap() so this is fast
369  Message& operator=(Message& r) {
370  verify( empty() );
371  verify( r._freeIt );
372  _buf = r._buf;
373  r._buf = 0;
374  if ( r._data.size() > 0 ) {
375  _data.swap( r._data );
376  }
377  r._freeIt = false;
378  _freeIt = true;
379  return *this;
380  }
381 
382  void reset() {
383  if ( _freeIt ) {
384  if ( _buf ) {
385  free( _buf );
386  }
387  for (std::vector< std::pair< char *, int > >::const_iterator i = _data.begin();
388  i != _data.end(); ++i) {
389  free(i->first);
390  }
391  }
392  _buf = 0;
393  _data.clear();
394  _freeIt = false;
395  }
396 
397  // use to add a buffer
398  // assumes message will free everything
399  void appendData(char *d, int size) {
400  if ( size <= 0 ) {
401  return;
402  }
403  if ( empty() ) {
404  MsgData::View md = d;
405  md.setLen(size); // can be updated later if more buffers added
406  _setData( md.view2ptr(), true );
407  return;
408  }
409  verify( _freeIt );
410  if ( _buf ) {
411  _data.push_back(std::make_pair(_buf, MsgData::ConstView(_buf).getLen()));
412  _buf = 0;
413  }
414  _data.push_back(std::make_pair(d, size));
415  header().setLen(header().getLen() + size);
416  }
417 
418  // use to set first buffer if empty
419  void setData(char* d, bool freeIt) {
420  verify( empty() );
421  _setData( d, freeIt );
422  }
423  void setData(int operation, const char *msgtxt) {
424  setData(operation, msgtxt, strlen(msgtxt)+1);
425  }
426  void setData(int operation, const char *msgdata, size_t len) {
427  verify( empty() );
428  size_t dataLen = len + sizeof(MsgData::Value) - 4;
429  MsgData::View d = reinterpret_cast<char *>(malloc(dataLen));
430  memcpy(d.data(), msgdata, len);
431  d.setLen(dataLen);
432  d.setOperation(operation);
433  _setData( d.view2ptr(), true );
434  }
435 
436  bool doIFreeIt() {
437  return _freeIt;
438  }
439 
440  void send( MessagingPort &p, const char *context );
441 
442  std::string toString() const;
443 
444  private:
445  void _setData( char* d, bool freeIt ) {
446  _freeIt = freeIt;
447  _buf = d;
448  }
449  // if just one buffer, keep it in _buf, otherwise keep a sequence of buffers in _data
450  char* _buf;
451  // byte buffer(s) - the first must contain at least a full MsgData unless using _buf for storage instead
452  typedef std::vector< std::pair< char*, int > > MsgVec;
453  MsgVec _data;
454  bool _freeIt;
455  };
456 
457 
458  MSGID nextMessageId();
459 
460 
461 } // namespace mongo
Definition: message.h:298
the main MongoDB namespace
Definition: bulk_operation_builder.h:24
the idea here is to make one liners easy.
Definition: str.h:44
Definition: message.h:186
Definition: message.h:90
Definition: message.h:283
Definition: message.h:167
wrapped around os representation of network address
Definition: sock.h:94
Definition: message.h:243
const size_t MaxMessageSizeBytes
Maximum accepted message size on the wire protocol.
Definition: message.h:36
Definition: message.h:180
Definition: message.h:99
Definition: message.h:134
Definition: message_port.h:68