MongoDB C++ Driver legacy-1.0.0
Loading...
Searching...
No Matches
message.h
1// message.h
2
3/* Copyright 2009 10gen Inc.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18#pragma once
19
20#include <vector>
21
22#include "mongo/platform/atomic_word.h"
23#include "mongo/platform/cstdint.h"
24#include "mongo/base/data_view.h"
25#include "mongo/base/encoded_value_storage.h"
26#include "mongo/util/mongoutils/str.h"
27#include "mongo/util/net/hostandport.h"
28#include "mongo/util/net/operation.h"
29#include "mongo/util/net/sock.h"
30
31namespace mongo {
32
/** Maximum accepted message size on the wire protocol. */
const size_t MaxMessageSizeBytes = 48 * 1000 * 1000;

// Identifier carried in the requestID / responseTo fields of a message header.
typedef uint32_t MSGID;

// Types that are only referred to by name in this header.
class Message;
class MessagingPort;
class PiggyBackData;

// Declared here, defined elsewhere.
// NOTE(review): presumably true when the server answers this opcode - confirm against the definition.
bool doesOpGetAResponse( int op );
45
46 inline const char * opToString( int op ) {
47 switch ( op ) {
48 case 0: return "none";
49 case opReply: return "reply";
50 case dbMsg: return "msg";
51 case dbUpdate: return "update";
52 case dbInsert: return "insert";
53 case dbQuery: return "query";
54 case dbGetMore: return "getmore";
55 case dbDelete: return "remove";
56 case dbKillCursors: return "killcursors";
57 default:
58 massert( 16141, str::stream() << "cannot translate opcode " << op, !op );
59 return "";
60 }
61 }
62
63 inline bool opIsWrite( int op ) {
64 switch ( op ) {
65
66 case 0:
67 case opReply:
68 case dbMsg:
69 case dbQuery:
70 case dbGetMore:
71 case dbKillCursors:
72 return false;
73
74 case dbUpdate:
75 case dbInsert:
76 case dbDelete:
77 return true;
78
79 default:
80 verify(0);
81 return "";
82 }
83
84 }
85
86 namespace MSGHEADER {
#pragma pack(1)
/**
 * Byte layout of a message header: four consecutive 32-bit fields, packed
 * with no padding.
 * See http://dochub.mongodb.org/core/mongowireprotocol
 */
struct Layout {
    // Total message size, including this header.
    int32_t messageLength;
    // Identifier for this message.
    int32_t requestID;
    // requestID from the original request (used in responses from db).
    int32_t responseTo;
    // Operation code of the message.
    int32_t opCode;
};
#pragma pack()
98
99 class ConstView {
100 public:
101 typedef ConstDataView view_type;
102
103 ConstView(const char* data) : _data(data) { }
104
105 const char* view2ptr() const {
106 return data().view();
107 }
108
109 int32_t getMessageLength() const {
110 return data().readLE<int32_t>(offsetof(Layout, messageLength));
111 }
112
113 int32_t getRequestID() const {
114 return data().readLE<int32_t>(offsetof(Layout, requestID));
115 }
116
117 int32_t getResponseTo() const {
118 return data().readLE<int32_t>(offsetof(Layout, responseTo));
119 }
120
121 int32_t getOpCode() const {
122 return data().readLE<int32_t>(offsetof(Layout, opCode));
123 }
124
125 protected:
126 const view_type& data() const {
127 return _data;
128 }
129
130 private:
131 view_type _data;
132 };
133
134 class View : public ConstView {
135 public:
136 typedef DataView view_type;
137
138 View(char* data) : ConstView(data) {}
139
140 using ConstView::view2ptr;
141 char* view2ptr() {
142 return data().view();
143 }
144
145 void setMessageLength(int32_t value) {
146 data().writeLE(value, offsetof(Layout, messageLength));
147 }
148
149 void setRequestID(int32_t value) {
150 data().writeLE(value, offsetof(Layout, requestID));
151 }
152
153 void setResponseTo(int32_t value) {
154 data().writeLE(value, offsetof(Layout, responseTo));
155 }
156
157 void setOpCode(int32_t value) {
158 data().writeLE(value, offsetof(Layout, opCode));
159 }
160
161 private:
162 view_type data() const {
163 return const_cast<char *>(ConstView::view2ptr());
164 }
165 };
166
167 class Value : public EncodedValueStorage<Layout, ConstView, View> {
168 public:
169 Value() {
170 BOOST_STATIC_ASSERT(sizeof(Value) == sizeof(Layout));
171 }
172
173 Value(ZeroInitTag_t zit) : EncodedValueStorage<Layout, ConstView, View>(zit) {}
174 };
175
176 } // namespace MSGHEADER
177
178 namespace MsgData {
179#pragma pack(1)
180 struct Layout {
181 MSGHEADER::Layout header;
182 char data[4];
183 };
184#pragma pack()
185
186 class ConstView {
187 public:
188 ConstView(const char* storage) : _storage(storage) { }
189
190 const char* view2ptr() const {
191 return storage().view();
192 }
193
194 int32_t getLen() const {
195 return header().getMessageLength();
196 }
197
198 MSGID getId() const {
199 return header().getRequestID();
200 }
201
202 MSGID getResponseTo() const {
203 return header().getResponseTo();
204 }
205
206 int32_t getOperation() const {
207 return header().getOpCode();
208 }
209
210 const char* data() const {
211 return storage().view(offsetof(Layout, data));
212 }
213
214 bool valid() const {
215 if ( getLen() <= 0 || getLen() > ( 4 * BSONObjMaxInternalSize ) )
216 return false;
217 if ( getOperation() < 0 || getOperation() > 30000 )
218 return false;
219 return true;
220 }
221
222 int64_t getCursor() const {
223 verify( getResponseTo() > 0 );
224 verify( getOperation() == opReply );
225 return ConstDataView(data() + sizeof(int32_t)).readLE<int64_t>();
226 }
227
228 int dataLen() const; // len without header
229
230 protected:
231 const ConstDataView& storage() const {
232 return _storage;
233 }
234
235 MSGHEADER::ConstView header() const {
236 return storage().view(offsetof(Layout, header));
237 }
238
239 private:
240 ConstDataView _storage;
241 };
242
243 class View : public ConstView {
244 public:
245 View(char* storage) : ConstView(storage) {}
246
247 using ConstView::view2ptr;
248 char* view2ptr() {
249 return storage().view();
250 }
251
252 void setLen(int value) {
253 return header().setMessageLength(value);
254 }
255
256 void setId(MSGID value) {
257 return header().setRequestID(value);
258 }
259
260 void setResponseTo(MSGID value) {
261 return header().setResponseTo(value);
262 }
263
264 void setOperation(int value) {
265 return header().setOpCode(value);
266 }
267
268 using ConstView::data;
269 char* data() {
270 return storage().view(offsetof(Layout, data));
271 }
272
273 private:
274 DataView storage() const {
275 return const_cast<char *>(ConstView::view2ptr());
276 }
277
278 MSGHEADER::View header() const {
279 return storage().view(offsetof(Layout, header));
280 }
281 };
282
283 class Value : public EncodedValueStorage<Layout, ConstView, View> {
284 public:
285 Value() {
286 BOOST_STATIC_ASSERT(sizeof(Value) == sizeof(Layout));
287 }
288
289 Value(ZeroInitTag_t zit) : EncodedValueStorage<Layout, ConstView, View>(zit) {}
290 };
291
292 const int MsgDataHeaderSize = sizeof(Value) - 4;
293 inline int ConstView::dataLen() const {
294 return getLen() - MsgDataHeaderSize;
295 }
296 } // namespace MsgData
297
298 class Message {
299 public:
300 // we assume here that a vector with initial size 0 does no allocation (0 is the default, but wanted to make it explicit).
301 Message() : _buf( 0 ), _data( 0 ), _freeIt( false ) {}
302 Message( void * data , bool freeIt ) :
303 _buf( 0 ), _data( 0 ), _freeIt( false ) {
304 _setData( reinterpret_cast< char* >( data ), freeIt );
305 };
306 Message(Message& r) : _buf( 0 ), _data( 0 ), _freeIt( false ) {
307 *this = r;
308 }
309 ~Message() {
310 reset();
311 }
312
313 SockAddr _from;
314
315 MsgData::View header() const {
316 verify( !empty() );
317 return _buf ? _buf : _data[ 0 ].first;
318 }
319
320 int operation() const { return header().getOperation(); }
321
322 MsgData::View singleData() const {
323 massert( 13273, "single data buffer expected", _buf );
324 return header();
325 }
326
327 bool empty() const { return !_buf && _data.empty(); }
328
329 int size() const {
330 int res = 0;
331 if ( _buf ) {
332 res = MsgData::ConstView(_buf).getLen();
333 }
334 else {
335 for (MsgVec::const_iterator it = _data.begin(); it != _data.end(); ++it) {
336 res += it->second;
337 }
338 }
339 return res;
340 }
341
342 int dataSize() const { return size() - sizeof(MSGHEADER::Value); }
343
344 // concat multiple buffers - noop if <2 buffers already, otherwise can be expensive copy
345 // can get rid of this if we make response handling smarter
346 void concat() {
347 if ( _buf || empty() ) {
348 return;
349 }
350
351 verify( _freeIt );
352 int totalSize = 0;
353 for (std::vector< std::pair< char *, int > >::const_iterator i = _data.begin();
354 i != _data.end(); ++i) {
355 totalSize += i->second;
356 }
357 char *buf = (char*)malloc( totalSize );
358 char *p = buf;
359 for (std::vector< std::pair< char *, int > >::const_iterator i = _data.begin();
360 i != _data.end(); ++i) {
361 memcpy( p, i->first, i->second );
362 p += i->second;
363 }
364 reset();
365 _setData( buf, true );
366 }
367
368 // vector swap() so this is fast
369 Message& operator=(Message& r) {
370 verify( empty() );
371 verify( r._freeIt );
372 _buf = r._buf;
373 r._buf = 0;
374 if ( r._data.size() > 0 ) {
375 _data.swap( r._data );
376 }
377 r._freeIt = false;
378 _freeIt = true;
379 return *this;
380 }
381
382 void reset() {
383 if ( _freeIt ) {
384 if ( _buf ) {
385 free( _buf );
386 }
387 for (std::vector< std::pair< char *, int > >::const_iterator i = _data.begin();
388 i != _data.end(); ++i) {
389 free(i->first);
390 }
391 }
392 _buf = 0;
393 _data.clear();
394 _freeIt = false;
395 }
396
397 // use to add a buffer
398 // assumes message will free everything
399 void appendData(char *d, int size) {
400 if ( size <= 0 ) {
401 return;
402 }
403 if ( empty() ) {
404 MsgData::View md = d;
405 md.setLen(size); // can be updated later if more buffers added
406 _setData( md.view2ptr(), true );
407 return;
408 }
409 verify( _freeIt );
410 if ( _buf ) {
411 _data.push_back(std::make_pair(_buf, MsgData::ConstView(_buf).getLen()));
412 _buf = 0;
413 }
414 _data.push_back(std::make_pair(d, size));
415 header().setLen(header().getLen() + size);
416 }
417
418 // use to set first buffer if empty
419 void setData(char* d, bool freeIt) {
420 verify( empty() );
421 _setData( d, freeIt );
422 }
423 void setData(int operation, const char *msgtxt) {
424 setData(operation, msgtxt, strlen(msgtxt)+1);
425 }
426 void setData(int operation, const char *msgdata, size_t len) {
427 verify( empty() );
428 size_t dataLen = len + sizeof(MsgData::Value) - 4;
429 MsgData::View d = reinterpret_cast<char *>(malloc(dataLen));
430 memcpy(d.data(), msgdata, len);
431 d.setLen(dataLen);
432 d.setOperation(operation);
433 _setData( d.view2ptr(), true );
434 }
435
436 bool doIFreeIt() {
437 return _freeIt;
438 }
439
440 void send( MessagingPort &p, const char *context );
441
442 std::string toString() const;
443
444 private:
445 void _setData( char* d, bool freeIt ) {
446 _freeIt = freeIt;
447 _buf = d;
448 }
449 // if just one buffer, keep it in _buf, otherwise keep a sequence of buffers in _data
450 char* _buf;
451 // byte buffer(s) - the first must contain at least a full MsgData unless using _buf for storage instead
452 typedef std::vector< std::pair< char*, int > > MsgVec;
453 MsgVec _data;
454 bool _freeIt;
455 };
456
457
458 MSGID nextMessageId();
459
460
461} // namespace mongo
Definition message.h:99
Definition message.h:167
Definition message.h:134
Definition message.h:298
Definition message_port.h:68
Definition message.h:186
Definition message.h:283
Definition message.h:243
the idea here is to make one liners easy.
Definition str.h:44
the main MongoDB namespace
Definition bulk_operation_builder.h:24
const size_t MaxMessageSizeBytes
Maximum accepted message size on the wire protocol.
Definition message.h:36
Definition message.h:90
Definition message.h:180
wrapped around os representation of network address
Definition sock.h:94