MongoDB C++ Driver mongocxx-4.0.0
Loading...
Searching...
No Matches
Change Streams

How to obtain and use change streams.

Obtain a Change Stream

From a Client

void example(mongocxx::client client) {
// Basic usage.
{
mongocxx::change_stream stream = client.watch();
EXPECT(stream.get_resume_token());
}
// With options.
{
opts.batch_size(1);
// ... other change stream options.
mongocxx::change_stream stream = client.watch(opts);
EXPECT(stream.get_resume_token());
}
// With a pipeline.
{
pipeline.match(bsoncxx::from_json(R"({"operationType": "insert"})"));
// ... other pipeline options.
mongocxx::change_stream stream = client.watch(pipeline);
EXPECT(stream.get_resume_token());
}
}

From a Database

void example(mongocxx::database db) {
// Basic usage.
{
mongocxx::change_stream stream = db.watch();
EXPECT(stream.get_resume_token());
}
// With options.
{
opts.batch_size(1);
// ... other change stream options.
mongocxx::change_stream stream = db.watch(opts);
EXPECT(stream.get_resume_token());
}
// With a pipeline.
{
pipeline.match(bsoncxx::from_json(R"({"operationType": "insert"})"));
// ... other pipeline options.
mongocxx::change_stream stream = db.watch(pipeline);
EXPECT(stream.get_resume_token());
}
}

From a Collection

void example(mongocxx::collection coll) {
// Basic usage.
{
mongocxx::change_stream stream = coll.watch();
EXPECT(stream.get_resume_token());
}
// With options.
{
opts.batch_size(1);
// ... other change stream options.
mongocxx::change_stream stream = coll.watch(opts);
EXPECT(stream.get_resume_token());
}
// With a pipeline.
{
pipeline.match(bsoncxx::from_json(R"({"operationType": "insert"})"));
// ... other pipeline options.
mongocxx::change_stream stream = coll.watch(pipeline);
EXPECT(stream.get_resume_token());
}
}

Use a Change Stream

Basic Usage

void example(mongocxx::database db) {
mongocxx::collection coll = db.create_collection("coll");
mongocxx::change_stream stream = coll.watch();
auto result_opt = coll.insert_one(bsoncxx::from_json(R"({"x": 1})"));
EXPECT(result_opt);
auto id = result_opt->inserted_id();
int count = 0;
auto now = [] { return std::chrono::steady_clock::now(); };
auto start = now();
// periodicNoopIntervalSecs: 10 (default)
while (count < 1 && now() - start < std::chrono::seconds(10)) {
for (bsoncxx::document::view change : stream) {
++count;
EXPECT(change["operationType"]);
EXPECT(change["operationType"].get_string().value == "insert");
EXPECT(change["ns"]);
EXPECT(change["ns"]["db"].get_string().value == db.name());
EXPECT(change["ns"]["coll"].get_string().value == coll.name());
EXPECT(change["fullDocument"]);
EXPECT(change["fullDocument"]["x"]);
EXPECT(change["documentKey"]);
EXPECT(change["documentKey"]["_id"].get_oid().value == id);
}
}
EXPECT(count == 1);
}

With Pipeline

void example(mongocxx::database db) {
mongocxx::collection coll = db.create_collection("coll");
pipeline.match(
bsoncxx::from_json(R"({"operationType": "insert", "fullDocument.watched": true})"));
// ... other pipeline options.
mongocxx::change_stream stream = coll.watch(pipeline);
// Observed.
EXPECT(coll.insert_one(bsoncxx::from_json(R"({"x": 1, "watched": true})")));
// Not observed (fullDocument mismatch).
EXPECT(coll.insert_one(bsoncxx::from_json(R"({"x": 2, "watched": false})")));
// Not observed (operationType mismatch).
EXPECT(coll.update_one(bsoncxx::from_json(R"({"x": 2})"),
bsoncxx::from_json(R"({"$set": {"watched": true}})")));
// Observed.
EXPECT(coll.insert_one(bsoncxx::from_json(R"({"x": 3, "watched": true})")));
int count = 0;
auto now = [] { return std::chrono::steady_clock::now(); };
auto start = now();
// periodicNoopIntervalSecs: 10 (default)
while (count < 2 && now() - start < std::chrono::seconds(10)) {
for (bsoncxx::document::view change : stream) {
++count;
}
}
EXPECT(count == 2);
}