How to obtain and use change streams.
Obtain a Change Stream
From a Client
{
EXPECT(stream.get_resume_token());
}
{
opts.batch_size(1);
EXPECT(stream.get_resume_token());
}
{
EXPECT(stream.get_resume_token());
}
}
From a Database
{
EXPECT(stream.get_resume_token());
}
{
opts.batch_size(1);
EXPECT(stream.get_resume_token());
}
{
EXPECT(stream.get_resume_token());
}
}
From a Collection
{
EXPECT(stream.get_resume_token());
}
{
opts.batch_size(1);
EXPECT(stream.get_resume_token());
}
{
EXPECT(stream.get_resume_token());
}
}
Use a Change Stream
Basic Usage
EXPECT(result_opt);
auto id = result_opt->inserted_id();
int count = 0;
auto now = [] { return std::chrono::steady_clock::now(); };
auto start = now();
while (count < 1 && now() - start < std::chrono::seconds(10)) {
++count;
EXPECT(change["operationType"]);
EXPECT(change["operationType"].get_string().value == "insert");
EXPECT(change["ns"]);
EXPECT(change["ns"]["db"].get_string().value == db.name());
EXPECT(change["ns"]["coll"].get_string().value == coll.name());
EXPECT(change["fullDocument"]);
EXPECT(change["fullDocument"]["x"]);
EXPECT(change["documentKey"]);
EXPECT(change["documentKey"]["_id"].get_oid().value == id);
}
}
EXPECT(count == 1);
}
With Pipeline
pipeline.match(
int count = 0;
auto now = [] { return std::chrono::steady_clock::now(); };
auto start = now();
while (count < 2 && now() - start < std::chrono::seconds(10)) {
++count;
}
}
EXPECT(count == 2);
}