MongoDB C++ Driver  legacy-1.1.2
message.h
1 // message.h
2 
3 /* Copyright 2009 10gen Inc.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 
18 #pragma once
19 
20 #include <vector>
21 
22 #include "mongo/platform/atomic_word.h"
23 #include "mongo/platform/cstdint.h"
24 #include "mongo/base/data_view.h"
25 #include "mongo/base/encoded_value_storage.h"
26 #include "mongo/util/mongoutils/str.h"
27 #include "mongo/util/net/hostandport.h"
28 #include "mongo/util/net/operation.h"
29 #include "mongo/util/net/sock.h"
30 
31 namespace mongo {
32 
// Maximum accepted message size on the wire protocol, in bytes
// (48 * 1000 * 1000, i.e. decimal megabytes rather than MiB).
36 const size_t MaxMessageSizeBytes = 48 * 1000 * 1000;
37 
// Forward declarations. Message is defined further down in this header;
// MessagingPort and PiggyBackData are not defined in this listing.
38 class Message;
39 class MessagingPort;
40 class PiggyBackData;
41 
// Integer type used for message identifiers (what MsgData::ConstView::getId() and
// getResponseTo() return).
42 typedef uint32_t MSGID;
43 
// Declared here, defined elsewhere.
// NOTE(review): presumably reports whether the given opcode expects a reply from the
// peer -- confirm against the implementation.
44 bool doesOpGetAResponse(int op);
45 
46 inline const char* opToString(int op) {
47  switch (op) {
48  case 0:
49  return "none";
50  case opReply:
51  return "reply";
52  case dbMsg:
53  return "msg";
54  case dbUpdate:
55  return "update";
56  case dbInsert:
57  return "insert";
58  case dbQuery:
59  return "query";
60  case dbGetMore:
61  return "getmore";
62  case dbDelete:
63  return "remove";
64  case dbKillCursors:
65  return "killcursors";
66  default:
67  massert(16141, str::stream() << "cannot translate opcode " << op, !op);
68  return "";
69  }
70 }
71 
72 inline bool opIsWrite(int op) {
73  switch (op) {
74  case 0:
75  case opReply:
76  case dbMsg:
77  case dbQuery:
78  case dbGetMore:
79  case dbKillCursors:
80  return false;
81 
82  case dbUpdate:
83  case dbInsert:
84  case dbDelete:
85  return true;
86 
87  default:
88  verify(0);
89  return "";
90  }
91 }
92 
93 namespace MSGHEADER {
94 #pragma pack(1)
95 /* Fixed header that starts every message: four int32_t fields laid out back to back
 * under #pragma pack(1), 16 bytes in total. The ConstView and View classes in this
 * namespace address each field by offsetof(Layout, <field>) and go through
 * readLE / writeLE rather than touching this struct directly.
 *
 * see http://dochub.mongodb.org/core/mongowireprotocol
96 */
97 struct Layout {
98  int32_t messageLength; // total message size, including this header
99  int32_t requestID; // identifier for this message
100  int32_t responseTo; // requestID from the original request
101  // (used in responses from db)
102  int32_t opCode; // operation code; compared against constants such as opReply elsewhere in this file
103 };
// Restore the packing that was in effect before the #pragma pack(1) above.
104 #pragma pack()
105 
106 class ConstView {
107 public:
108  typedef ConstDataView view_type;
109 
110  ConstView(const char* data) : _data(data) {}
111 
112  const char* view2ptr() const {
113  return data().view();
114  }
115 
116  int32_t getMessageLength() const {
117  return data().readLE<int32_t>(offsetof(Layout, messageLength));
118  }
119 
120  int32_t getRequestID() const {
121  return data().readLE<int32_t>(offsetof(Layout, requestID));
122  }
123 
124  int32_t getResponseTo() const {
125  return data().readLE<int32_t>(offsetof(Layout, responseTo));
126  }
127 
128  int32_t getOpCode() const {
129  return data().readLE<int32_t>(offsetof(Layout, opCode));
130  }
131 
132 protected:
133  const view_type& data() const {
134  return _data;
135  }
136 
137 private:
138  view_type _data;
139 };
140 
141 class View : public ConstView {
142 public:
143  typedef DataView view_type;
144 
145  View(char* data) : ConstView(data) {}
146 
147  using ConstView::view2ptr;
148  char* view2ptr() {
149  return data().view();
150  }
151 
152  void setMessageLength(int32_t value) {
153  data().writeLE(value, offsetof(Layout, messageLength));
154  }
155 
156  void setRequestID(int32_t value) {
157  data().writeLE(value, offsetof(Layout, requestID));
158  }
159 
160  void setResponseTo(int32_t value) {
161  data().writeLE(value, offsetof(Layout, responseTo));
162  }
163 
164  void setOpCode(int32_t value) {
165  data().writeLE(value, offsetof(Layout, opCode));
166  }
167 
168 private:
169  view_type data() const {
170  return const_cast<char*>(ConstView::view2ptr());
171  }
172 };
173 
// Value type for one encoded message header, built on EncodedValueStorage with Layout as
// the byte layout and ConstView / View as its read-only and mutable accessor types.
// NOTE(review): how EncodedValueStorage stores the bytes and hands out those views is
// defined in mongo/base/encoded_value_storage.h, not in this file -- confirm there.
174 class Value : public EncodedValueStorage<Layout, ConstView, View> {
175 public:
176  Value() {
// Compile-time check that this wrapper is exactly as large as the packed Layout.
177  BOOST_STATIC_ASSERT(sizeof(Value) == sizeof(Layout));
178  }
179 
// NOTE(review): the embedded line numbering jumps from 179 to 181 here -- confirm that no
// member was dropped from this listing.
181 };
182 
183 } // namespace MSGHEADER
184 
185 namespace MsgData {
186 #pragma pack(1)
// Byte layout of a whole message: the fixed header immediately followed by the payload.
// Packed with #pragma pack(1) so that `data` starts right after the header.
187 struct Layout {
188  MSGHEADER::Layout header; // fixed message header (see MSGHEADER::Layout)
// Start of the payload. Only its offset is used by the views in this namespace; the 4-byte
// extent is subtracted again when MsgDataHeaderSize is computed as sizeof(Value) - 4.
189  char data[4];
190 };
// Restore the packing that was in effect before the #pragma pack(1) above.
191 #pragma pack()
192 
193 class ConstView {
194 public:
195  ConstView(const char* storage) : _storage(storage) {}
196 
197  const char* view2ptr() const {
198  return storage().view();
199  }
200 
201  int32_t getLen() const {
202  return header().getMessageLength();
203  }
204 
205  MSGID getId() const {
206  return header().getRequestID();
207  }
208 
209  MSGID getResponseTo() const {
210  return header().getResponseTo();
211  }
212 
213  int32_t getOperation() const {
214  return header().getOpCode();
215  }
216 
217  const char* data() const {
218  return storage().view(offsetof(Layout, data));
219  }
220 
221  bool valid() const {
222  if (getLen() <= 0 || getLen() > (4 * BSONObjMaxInternalSize))
223  return false;
224  if (getOperation() < 0 || getOperation() > 30000)
225  return false;
226  return true;
227  }
228 
229  int64_t getCursor() const {
230  verify(getResponseTo() > 0);
231  verify(getOperation() == opReply);
232  return ConstDataView(data() + sizeof(int32_t)).readLE<int64_t>();
233  }
234 
235  int dataLen() const; // len without header
236 
237 protected:
238  const ConstDataView& storage() const {
239  return _storage;
240  }
241 
242  MSGHEADER::ConstView header() const {
243  return storage().view(offsetof(Layout, header));
244  }
245 
246 private:
247  ConstDataView _storage;
248 };
249 
250 class View : public ConstView {
251 public:
252  View(char* storage) : ConstView(storage) {}
253 
254  using ConstView::view2ptr;
255  char* view2ptr() {
256  return storage().view();
257  }
258 
259  void setLen(int value) {
260  return header().setMessageLength(value);
261  }
262 
263  void setId(MSGID value) {
264  return header().setRequestID(value);
265  }
266 
267  void setResponseTo(MSGID value) {
268  return header().setResponseTo(value);
269  }
270 
271  void setOperation(int value) {
272  return header().setOpCode(value);
273  }
274 
275  using ConstView::data;
276  char* data() {
277  return storage().view(offsetof(Layout, data));
278  }
279 
280 private:
281  DataView storage() const {
282  return const_cast<char*>(ConstView::view2ptr());
283  }
284 
285  MSGHEADER::View header() const {
286  return storage().view(offsetof(Layout, header));
287  }
288 };
289 
// Value type for one encoded message (header plus the 4 placeholder payload bytes of
// Layout), built on EncodedValueStorage with ConstView / View as its accessor types.
// NOTE(review): how EncodedValueStorage stores the bytes and hands out those views is
// defined in mongo/base/encoded_value_storage.h, not in this file -- confirm there.
290 class Value : public EncodedValueStorage<Layout, ConstView, View> {
291 public:
292  Value() {
// Compile-time check that this wrapper is exactly as large as the packed Layout.
293  BOOST_STATIC_ASSERT(sizeof(Value) == sizeof(Layout));
294  }
295 
// NOTE(review): the embedded line numbering jumps from 295 to 297 here -- confirm that no
// member was dropped from this listing.
297 };
298 
299 const int MsgDataHeaderSize = sizeof(Value) - 4;
300 inline int ConstView::dataLen() const {
301  return getLen() - MsgDataHeaderSize;
302 }
303 } // namespace MsgData
304 
305 class Message {
306 public:
307  // we assume here that a vector with initial size 0 does no allocation (0 is the default, but
308  // wanted to make it explicit).
309  Message() : _buf(0), _data(0), _freeIt(false) {}
310  Message(void* data, bool freeIt) : _buf(0), _data(0), _freeIt(false) {
311  _setData(reinterpret_cast<char*>(data), freeIt);
312  };
313  Message(Message& r) : _buf(0), _data(0), _freeIt(false) {
314  *this = r;
315  }
316  ~Message() {
317  reset();
318  }
319 
320  SockAddr _from;
321 
322  MsgData::View header() const {
323  verify(!empty());
324  return _buf ? _buf : _data[0].first;
325  }
326 
327  int operation() const {
328  return header().getOperation();
329  }
330 
331  MsgData::View singleData() const {
332  massert(13273, "single data buffer expected", _buf);
333  return header();
334  }
335 
336  bool empty() const {
337  return !_buf && _data.empty();
338  }
339 
340  int size() const {
341  int res = 0;
342  if (_buf) {
343  res = MsgData::ConstView(_buf).getLen();
344  } else {
345  for (MsgVec::const_iterator it = _data.begin(); it != _data.end(); ++it) {
346  res += it->second;
347  }
348  }
349  return res;
350  }
351 
352  int dataSize() const {
353  return size() - sizeof(MSGHEADER::Value);
354  }
355 
356  // concat multiple buffers - noop if <2 buffers already, otherwise can be expensive copy
357  // can get rid of this if we make response handling smarter
358  void concat() {
359  if (_buf || empty()) {
360  return;
361  }
362 
363  verify(_freeIt);
364  int totalSize = 0;
365  for (std::vector<std::pair<char*, int> >::const_iterator i = _data.begin();
366  i != _data.end();
367  ++i) {
368  totalSize += i->second;
369  }
370  char* buf = (char*)malloc(totalSize);
371  char* p = buf;
372  for (std::vector<std::pair<char*, int> >::const_iterator i = _data.begin();
373  i != _data.end();
374  ++i) {
375  memcpy(p, i->first, i->second);
376  p += i->second;
377  }
378  reset();
379  _setData(buf, true);
380  }
381 
382  // vector swap() so this is fast
383  Message& operator=(Message& r) {
384  verify(empty());
385  verify(r._freeIt);
386  _buf = r._buf;
387  r._buf = 0;
388  if (r._data.size() > 0) {
389  _data.swap(r._data);
390  }
391  r._freeIt = false;
392  _freeIt = true;
393  return *this;
394  }
395 
396  void reset() {
397  if (_freeIt) {
398  if (_buf) {
399  free(_buf);
400  }
401  for (std::vector<std::pair<char*, int> >::const_iterator i = _data.begin();
402  i != _data.end();
403  ++i) {
404  free(i->first);
405  }
406  }
407  _buf = 0;
408  _data.clear();
409  _freeIt = false;
410  }
411 
412  // use to add a buffer
413  // assumes message will free everything
414  void appendData(char* d, int size) {
415  if (size <= 0) {
416  return;
417  }
418  if (empty()) {
419  MsgData::View md = d;
420  md.setLen(size); // can be updated later if more buffers added
421  _setData(md.view2ptr(), true);
422  return;
423  }
424  verify(_freeIt);
425  if (_buf) {
426  _data.push_back(std::make_pair(_buf, MsgData::ConstView(_buf).getLen()));
427  _buf = 0;
428  }
429  _data.push_back(std::make_pair(d, size));
430  header().setLen(header().getLen() + size);
431  }
432 
433  // use to set first buffer if empty
434  void setData(char* d, bool freeIt) {
435  verify(empty());
436  _setData(d, freeIt);
437  }
438  void setData(int operation, const char* msgtxt) {
439  setData(operation, msgtxt, strlen(msgtxt) + 1);
440  }
441  void setData(int operation, const char* msgdata, size_t len) {
442  verify(empty());
443  size_t dataLen = len + sizeof(MsgData::Value) - 4;
444  MsgData::View d = reinterpret_cast<char*>(malloc(dataLen));
445  memcpy(d.data(), msgdata, len);
446  d.setLen(dataLen);
447  d.setOperation(operation);
448  _setData(d.view2ptr(), true);
449  }
450 
451  bool doIFreeIt() {
452  return _freeIt;
453  }
454 
455  void send(MessagingPort& p, const char* context);
456 
457  std::string toString() const;
458 
459 private:
460  void _setData(char* d, bool freeIt) {
461  _freeIt = freeIt;
462  _buf = d;
463  }
464  // if just one buffer, keep it in _buf, otherwise keep a sequence of buffers in _data
465  char* _buf;
466  // byte buffer(s) - the first must contain at least a full MsgData unless using _buf for storage
467  // instead
468  typedef std::vector<std::pair<char*, int> > MsgVec;
469  MsgVec _data;
470  bool _freeIt;
471 };
472 
473 
// Declared here, defined elsewhere.
// NOTE(review): presumably hands out the next request id to stamp on an outgoing
// message -- confirm against the implementation.
474 MSGID nextMessageId();
475 
476 
477 } // namespace mongo
Definition: message.h:305
Utility functions for parsing numbers from strings.
Definition: compare_numbers.h:20
the idea here is to make one liners easy.
Definition: str.h:44
Definition: message.h:193
Definition: data_view.h:30
Definition: message.h:97
Definition: message.h:290
Definition: message.h:174
wrapped around os representation of network address
Definition: sock.h:96
Definition: encoded_value_storage.h:24
Definition: message.h:250
const size_t MaxMessageSizeBytes
Maximum accepted message size on the wire protocol.
Definition: message.h:36
Definition: message.h:187
Definition: message.h:106
Definition: message.h:141
Definition: encoded_value_storage.h:31
Definition: data_view.h:71
Definition: message_port.h:74