# singer-tap-development
m
If I want to use a replication_key_value that is sometimes one type and sometimes another, is that a sign that I should partition my Stream and have one stream for each type?

More context: I am working through an implementation of a MongoDB tap (with the intent that it will support AWS DocumentDB as well) that uses change streams. Change streams in MongoDB only go as far back in time as the operations log has data for, so as a rule, when the tap is run for the first time on an existing database, it won't be able to pull all the data from the change stream (only the last few days of activity, possibly). The behavior that I am trying to implement is this (I'm using `_id` as the replication key everywhere):

• When the tap is first run and there is no prior replication_key_value, scan the entire collection. `_id` is the stringified ObjectId, which is sortable (it embeds a creation timestamp), so this emits all documents in the collection in order of creation:
```python
from bson.objectid import ObjectId
from pymongo import ASCENDING

for record in self._collection.find({}).sort([("_id", ASCENDING)]):
    object_id: ObjectId = record["_id"]
    yield {"_id": str(object_id), "document": record}
```
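Aside: sorting on `_id` tracks creation time because the first four bytes of an ObjectId are a Unix timestamp, and the hex string preserves byte order. A quick illustration with the standard `bson` package:

```python
from datetime import datetime, timezone

from bson.objectid import ObjectId

oid = ObjectId()
print(oid.generation_time)  # creation timestamp embedded in the id's first 4 bytes

# ObjectId.from_datetime builds a boundary id for time-range queries,
# e.g. find({"_id": {"$gte": boundary}})
boundary = ObjectId.from_datetime(datetime(2023, 1, 1, tzinfo=timezone.utc))
```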
• If the tap is run with a replication_key_value that is an ObjectId, start processing the collection from that point:
```python
# `bookmark` is the stored replication_key_value, parsed back into an ObjectId
for record in self._collection.find({"_id": {"$gt": bookmark}}).sort([("_id", ASCENDING)]):
    object_id: ObjectId = record["_id"]
    yield {"_id": str(object_id), "document": record}
```
• Once we reach the end of the collection, switch from iterating over documents to querying the MongoDB change stream:
```python
with self._collection.watch(full_document="updateLookup") as change_stream:
    for record in change_stream:
        yield record  # TODO this data model will change, they'll all need to be in sync
```
• If the tap is run with a replication_key_value that is not parseable as an ObjectId, but is parseable as a BSON resume token, start reading from the change stream at that point:
```python
# `bookmark` here is the stored replication_key_value, parsed as a BSON resume token
with self._collection.watch(full_document="updateLookup", resume_after=bookmark) as change_stream:
    for record in change_stream:
        yield record  # TODO this data model will change, they'll all need to be in sync
```
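Putting those four cases together in a single stream, a minimal sketch might look like the following (class and helper names are hypothetical; assumes pymongo and bson, with `bson.json_util` round-tripping the resume token through the string-typed bookmark):

```python
from typing import Iterable, Optional

from bson.errors import InvalidId
from bson.json_util import dumps, loads
from bson.objectid import ObjectId
from pymongo import ASCENDING
from pymongo.collection import Collection


class CollectionStream:
    """Hypothetical sketch: dispatch on the shape of the stored bookmark."""

    def __init__(self, collection: Collection) -> None:
        self._collection = collection

    def get_records(self, bookmark: Optional[str]) -> Iterable[dict]:
        if bookmark is None:
            query = {}  # first run: full collection scan
        else:
            try:
                # Bookmark parses as an ObjectId: resume the collection scan.
                query = {"_id": {"$gt": ObjectId(bookmark)}}
            except InvalidId:
                # Otherwise treat it as a serialized resume token and skip
                # the scan entirely.
                yield from self._stream_changes(resume_after=loads(bookmark))
                return
        for record in self._collection.find(query).sort([("_id", ASCENDING)]):
            yield {"_id": str(record["_id"]), "document": record}
        # Scan exhausted: switch to the change stream from "now".
        yield from self._stream_changes(resume_after=None)

    def _stream_changes(self, resume_after) -> Iterable[dict]:
        with self._collection.watch(
            full_document="updateLookup", resume_after=resume_after
        ) as change_stream:
            for change in change_stream:
                # change_stream.resume_token (serialized with json_util.dumps)
                # becomes the next replication_key_value.
                yield {
                    "_id": dumps(change_stream.resume_token),
                    "document": change.get("fullDocument"),
                }
```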
This seems like it should work (will it?), but it also feels like it might be a code smell / antipattern to switch from one kind of replication key value to another. So I'd like to solicit input on how best to handle this scenario. Do I:
1. use this strategy (full collection scan first, then switch to the change stream, all in one single Stream)?
2. use stream partitioning to manage these separately (one partition for the full collection scan with an ObjectId replication key, another for the change stream with a resume token replication key)? (Is this even a use case for stream partitioning?)
3. split these out entirely and let callers configure the tap separately for loading a collection vs. streaming changes?
Now that I think about it more, option 3 ("split these out entirely and let callers configure the tap separately for loading a collection vs. streaming changes") does seem to correspond nicely with configuring the tap for incremental replication vs. log-based replication.
e
Yeah, we don't have a reference implementation of log-based replication in SQL taps built with the SDK, but I think that's the approach we'd take: separate stream classes for field-based and oplog-based replication. Then, depending on the contents of the state, initialize one or the other for each stream in the catalog. Wdyt @ken_payne @aaronsteers?
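For illustration only, the selection step could be shaped roughly like this (every name here is hypothetical, not an existing SDK API):

```python
# Hypothetical sketch: pick a stream class per catalog entry based on the
# kind of bookmark currently held in state.
def initialize_stream(tap, catalog_entry):
    bookmark = get_bookmark(tap.state, catalog_entry.stream_id)  # hypothetical helper
    if bookmark and looks_like_resume_token(bookmark):  # hypothetical predicate
        return OplogBasedStream(tap, catalog_entry)  # hypothetical class
    return FieldBasedStream(tap, catalog_entry)  # hypothetical class
```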
a
There's an issue or discussion in the SDK repo that evaluates a few different approaches to log-based replication. One key finding of that discussion was that our bookmarks should be bi-modal and contain (when possible) both log-based and field-based bookmark values.
This would lean towards a design where `replication_key` and `log_replication_key` are both defined for a stream, and our bookmarking system would then attempt to track and record both when available.
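For illustration, a bi-modal bookmark for one stream might be shaped something like this (a hypothetical payload; the actual format is what the linked issue works out):

```python
# Hypothetical state shape: both bookmark kinds tracked side by side.
state = {
    "bookmarks": {
        "my_collection": {
            "replication_key": "_id",
            "replication_key_value": "64b7f0c2e13e4a5b6c7d8e9f",  # ObjectId hex
            "log_replication_key": "resume_token",
            "log_replication_key_value": {"_data": "8263..."},  # change stream resume token
        }
    }
}
```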
I'll try to find that issue. Also, this would make a great office hours discussion if folks are available this week or next.
Here we go. I've updated this issue to link to both related discussions: Support `LOG_BASED` replication method · Issue #304 · meltano/sdk
m
Has any discussion occurred around log-based replication in non-SQL taps? Any prior art?
a
DynamoDB implements log-based replication via the "DynamoDB Streams" feature. Not technically a SQL tap, despite the name. If I remember correctly, it's the Singer/Stitch variant.