# singer-tap-development
If I want to use a replication_key_value that is sometimes one type and sometimes another, is that a sign that I should partition my Stream and have one stream for each type?

More context: I am working through an implementation of a MongoDB tap (with the intent that it will support AWS DocumentDB as well) that uses change streams. Change streams in MongoDB only go as far back in time as the operations log has data for, so as a rule, when the tap is run for the first time on an existing database, it won't be able to pull all the data from the change stream (only the last few days of activity, possibly). The behavior I am trying to implement is this (I'm using `_id` as the replication key everywhere):

• When the tap is first run and there is no prior replication_key_value, scan the entire collection. `_id` is the stringified ObjectId, which sorts in creation order (its leading four bytes encode a timestamp), so this emits all documents in the collection in order of creation:
```python
from bson import ObjectId
from pymongo import ASCENDING

# Full-history sync: emit every document in creation order.
for record in self._collection.find({}).sort([("_id", ASCENDING)]):
    object_id: ObjectId = record["_id"]
    yield {"_id": str(object_id), "document": record}
```
• If the tap is run with a replication_key_value that parses as an ObjectId, resume processing the collection from that point (one way to tell the two bookmark shapes apart is sketched after this snippet):
```python
# Incremental sync: the stored bookmark is a string, so convert it back to
# an ObjectId before comparing against the collection's ObjectId _id values.
for record in self._collection.find({"_id": {"$gt": ObjectId(bookmark)}}).sort([("_id", ASCENDING)]):
    object_id: ObjectId = record["_id"]
    yield {"_id": str(object_id), "document": record}
```
• Once we reach the end of the collection, switch from iterating over documents to tailing the MongoDB change stream (capturing the resume token, as sketched after this snippet):
```python
# Live tail: follow the change stream from "now".
with self._collection.watch(full_document="updateLookup") as change_stream:
    for record in change_stream:
        yield record  # TODO this data model will change, they'll all need to be in sync
```
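On that TODO: PyMongo exposes the stream's current position as `change_stream.resume_token`, and on MongoDB 4.2+ the token is a dict whose `_data` field is a hex string. A hedged sketch of emitting it as the replication key value (the record shape is an assumption, not settled):

```python
# Tail the change stream, bookmarking each event by its resume token.
with self._collection.watch(full_document="updateLookup") as change_stream:
    for change in change_stream:
        token = change_stream.resume_token  # e.g. {"_data": "<hex string>"}
        yield {
            "_id": token["_data"],                   # string bookmark, like str(ObjectId)
            "document": change.get("fullDocument"),  # None for delete events
        }
```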
• If the tap is run with a replication_key_value that is not parseable as an ObjectId but is parseable as a BSON resume token, resume reading the change stream from that point:
```python
# Resume the change stream from the bookmarked token; resume_after expects
# the token document, so bookmark must already be deserialized (see below).
with self._collection.watch(full_document="updateLookup", resume_after=bookmark) as change_stream:
    for record in change_stream:
        yield record  # TODO this data model will change, they'll all need to be in sync
```
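Since `resume_after=` expects the token document rather than its string form, the bookmark has to be (de)serialized somewhere. One option, an assumption rather than the only way, is BSON extended JSON via `bson.json_util`; on 4.2+ you could equally store just the `_data` hex string and rebuild `{"_data": bookmark}`:

```python
from bson.json_util import dumps, loads


def token_to_bookmark(token: dict) -> str:
    """Serialize change_stream.resume_token into a string for state."""
    return dumps(token)


def bookmark_to_token(bookmark: str) -> dict:
    """Recover the dict that watch(resume_after=...) expects."""
    return loads(bookmark)
```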
This seems like it should work (will it?), but it also feels like it might be a code smell / antipattern to be switching from one kind of replication key value to another. So I'd like to solicit input on how best to handle this scenario. Do I:
1. use this strategy (full collection scan first, then switch to the change stream, all in one single Stream)?
2. use stream partitioning to manage these separately (one partition for the full collection scan with the ObjectId replication key, another partition for the change stream with the resume-token replication key)? Is this even a use case for stream partitioning? (A rough sketch follows below.)
3. split these out entirely and let callers configure the tap separately for loading a collection vs. streaming changes?
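For what option 2 might look like: the Meltano Singer SDK keeps a separate replication_key_value per partition context, so the two bookmark types would never share a state slot. A rough sketch under that assumption (the "mode" key and both helper generators are hypothetical names, not SDK API):

```python
from typing import Iterable, Optional

from singer_sdk import Stream


class CollectionStream(Stream):
    """Sketch of option 2: one stream, two partitions, two bookmark types."""

    replication_key = "_id"

    @property
    def partitions(self) -> list:
        # The SDK tracks state per context dict, so the ObjectId bookmark and
        # the resume-token bookmark each get their own replication_key_value.
        return [{"mode": "collection"}, {"mode": "change_stream"}]

    def get_records(self, context: Optional[dict]) -> Iterable[dict]:
        if context and context["mode"] == "collection":
            yield from self._scan_collection(context)      # hypothetical helper
        else:
            yield from self._watch_change_stream(context)  # hypothetical helper
```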