Matt Menzenski
04/04/2023, 9:12 PM
_id as the replication key everywhere):
• When the tap is first run and there is no prior replication_key_value, scan the entire collection. _id is the stringified ObjectId, which is sortable because it begins with a creation timestamp, so this emits all documents in the collection in (approximately) order of creation:
    for record in self._collection.find({}).sort([("_id", ASCENDING)]):
        object_id: ObjectId = record["_id"]
        yield {"_id": str(object_id), "document": record}
• If the tap is run with a replication_key_value that is an ObjectId, start processing the collection from that point:
    # The bookmark comes back from state as a string, so convert it before comparing
    for record in self._collection.find({"_id": {"$gt": ObjectId(bookmark)}}).sort([("_id", ASCENDING)]):
        object_id: ObjectId = record["_id"]
        yield {"_id": str(object_id), "document": record}
• Once we reach the end of the collection, switch from just iterating over documents to querying the MongoDB change stream:
    with self._collection.watch(full_document="updateLookup") as change_stream:
        for record in change_stream:
            yield record  # TODO this data model will change, they'll all need to be in sync
• If the tap is run with a replication_key_value that is not parseable as an ObjectId, but is parseable as a BSON resume token, start reading from the change stream at that point:
    with self._collection.watch(full_document="updateLookup", resume_after=bookmark) as change_stream:
        for record in change_stream:
            yield record  # TODO this data model will change, they'll all need to be in sync
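To make the branching above concrete, here is a rough sketch of how the stored bookmark could be classified before picking a read mode. It is only an illustration: classify_bookmark, object_id_created_at, and the mode names are hypothetical, and it assumes the resume token is saved in state as a JSON string of the token document (typically shaped like {"_data": "..."}). It uses only the standard library; in the tap itself, bson's ObjectId.is_valid could replace the regex check.

```python
import json
import re
from datetime import datetime, timezone
from typing import Optional

# An ObjectId renders as exactly 24 hexadecimal characters.
_OBJECT_ID_RE = re.compile(r"[0-9a-fA-F]{24}")


def classify_bookmark(value: Optional[str]) -> str:
    """Return the read mode implied by a stored replication_key_value.

    Mode names are hypothetical labels for the four cases in the list above.
    """
    if value is None:
        return "full_scan"  # first run: scan the whole collection
    if _OBJECT_ID_RE.fullmatch(value):
        return "resume_scan"  # continue the scan after this _id
    try:
        token = json.loads(value)
    except ValueError:
        token = None
    # Assumption: resume tokens are stored as JSON like {"_data": "..."}.
    if isinstance(token, dict) and "_data" in token:
        return "change_stream"
    raise ValueError(f"unrecognized bookmark: {value!r}")


def object_id_created_at(hex_id: str) -> datetime:
    """Decode the creation time held in the first 4 bytes of an ObjectId."""
    return datetime.fromtimestamp(int(hex_id[:8], 16), tz=timezone.utc)
```

Raising on an unrecognized value, rather than silently falling back to a full scan, keeps a corrupted bookmark from triggering an unexpected reload of the whole collection.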
This seems like it should work (will it?) but it also feels like it might be a code smell / antipattern to be switching from one kind of replication key value to another.
So I’d like to solicit input on how best to handle this scenario.
Do I:
1. use this strategy (full collection scan first, then switch to change stream, one single Stream)
2. use stream partitioning to manage these separately (one partition for the full collection scan with ObjectId replication key, another partition for the change streams with resume token replication key) (is this even a use case for stream partitioning?)
3. split these out entirely and let callers configure the tap separately for loading a collection vs streaming changes