https://linen.dev logo
#singer-tap-development
Title
# singer-tap-development
m

Matt Menzenski

04/05/2023, 4:12 PM
I’m now running into this issue again but in a different way 😫 https://github.com/menzenski/tap-mongodb/blob/main/tap_mongodb/streams.py#L52-L117 is my implementation of get_records. The incremental replication implementation works great. The records are emitted with their
_id
field set to string value of the
_id
ObjectId field on the source document. This is persisted in the state and the tap is able to resume processing from the saved state key on the next run. I’m happy with this. The log-based replication implementation is causing me grief. If I run the tap with the
keep_open = False
line commented out, it will keep the change stream open and poll the change stream for new records. If there are new records (if I push database updates while the meltano tap is running), they are emitted as expected by the tap. If there is no new record in the change stream, it will emit a “dummy” document where the
_id
field is set to the string value of the change stream’s resume token. This is intended to allow the state to be updated with that token so that it can resume from that point. However, while the tap is emitting records correctly (I see the expected records in the output JSONL file) it’s never updating the state:
Copy code
$ meltano state get test:tap-mongodb-to-target-jsonl
2023-04-05T15:51:32.243210Z [info     ] The default environment 'test' will be ignored for `meltano state`. To configure a specific environment, please use the option `--environment=<environment name>`.
2023-04-05T15:51:32.675307Z [warning  ] Running state operation for environment 'test' outside of an environment
2023-04-05T15:51:32.676460Z [info     ] Environment 'test' is active
{"singer_state": {"bookmarks": {"test_database_TestDocument": {}}}}