# singer-tap-development
m
I have what is probably a dumb question, but I don’t seem to be able to find the answer, so I’ll ask it anyway. How does a tap built on the SDK update the bookmark in the state, and how does the tap get that bookmark back? I’m working with mongodb so want a non timestamp replication key
a
How does a tap built on the SDK update the bookmark in the state?
When you define a replication key, the SDK automatically tracks that property's value in the emitted records and emits STATE messages at specific points in the pipeline that include the last-seen or highest-seen value. The developer configures `Stream.is_sorted`, `Stream.replication_key`, and `Stream.check_sorted`, and the SDK handles it from there (for most use cases, at least).
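As a rough illustration of the "last-seen or highest-seen" idea, here is a minimal stdlib-only sketch. It is a simplified model, not the SDK's implementation: `track_bookmark` is a hypothetical helper, and its `is_sorted` and `check_sorted` parameters only mirror the meaning of the stream attributes named above.

```python
from typing import Any, Iterable, Optional


def track_bookmark(
    records: Iterable[dict],
    replication_key: str,
    is_sorted: bool,
    check_sorted: bool = True,
) -> Optional[Any]:
    """Return the bookmark value a stream would hold after seeing all records."""
    bookmark = None
    for record in records:
        value = record[replication_key]
        if bookmark is None:
            bookmark = value
        elif is_sorted:
            # Sorted stream: the last-seen value becomes the bookmark.
            if check_sorted and value < bookmark:
                raise ValueError(f"unsorted value {value!r} after {bookmark!r}")
            bookmark = value
        else:
            # Unsorted stream: keep the highest value seen so far.
            bookmark = max(bookmark, value)
    return bookmark


# The two _id values are taken from the log output later in this thread.
docs = [
    {"_id": "642b40e397e74b5400375e73", "name": "TestDocumentName"},
    {"_id": "642b417197e74b5400375e74", "name": "TestDocumentName2"},
]
final_bookmark = track_bookmark(docs, "_id", is_sorted=True)
```

With `is_sorted=False` the same input in reverse order would still end on the higher `_id`, while `is_sorted=True` with `check_sorted=True` would raise on the out-of-order record.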
...how does the tap get that bookmark back?
The orchestrator gets the STATE message as emitted from the target. On the next run, it sends it back to the tap. The tap developer accesses the start value by calling either `Stream.get_starting_replication_key_value()` or `Stream.get_starting_timestamp()`. Does this help? 🙂
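To make the round trip concrete, the lookup can be modelled as reading one value out of the input state. This is only a sketch: `starting_replication_key_value` is a hypothetical stand-in, and the state layout is assumed from the `bookmarks` structure that `meltano state get` prints later in this thread.

```python
from typing import Any, Optional


def starting_replication_key_value(state: dict, stream_name: str) -> Optional[Any]:
    """Look up the saved bookmark value for one stream, or None if absent."""
    bookmark = state.get("bookmarks", {}).get(stream_name, {})
    return bookmark.get("replication_key_value")


# First run: no state has been passed in, so there is nothing to resume from.
first_run_value = starting_replication_key_value({}, "test_database_TestDocument")

# Later run: the orchestrator passes the previously emitted state back in.
saved_state = {
    "bookmarks": {
        "test_database_TestDocument": {
            "replication_key": "_id",
            "replication_key_value": "642b417197e74b5400375e74",
        }
    }
}
later_run_value = starting_replication_key_value(saved_state, "test_database_TestDocument")
```

An empty bookmark object for the stream (`{}`) behaves like the first run here: the lookup returns `None`.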
m
Thanks - that’s how I thought it was supposed to work. I am setting replication key, and calling get_starting_replication_key_value, but the returned value is always None even though I am getting records returned. I must have something misconfigured 😕
a
If using Meltano, you'll need to specify that the tap supports the "state" capability.
Otherwise, Meltano will not try to feed the state back to the tap.
m
Yeah, I’m using the SDK tap template and I have that capability enabled
I must have done something dumb
a
I am setting replication key, and calling get_starting_replication_key_value, but the returned value is always None even though I am getting records returned.
To follow up, are you testing by executing with Meltano or by invoking directly? Either way, the next thing to confirm would be that your state is getting passed back to the tap. Also, this will be null for the entire "first" sync operation if you aren't passing it in explicitly.
I put "first" in quotes because until the tap gets an input state via the `--state` CLI arg, it will always think it is in its first sync.
m
I am testing with `meltano invoke` and with `meltano run`
a
`meltano run` will auto-pass state to the next run, but `meltano invoke` will not.
m
For the latter I am testing with and without `--full-refresh`
I’m looking at the state too, with `meltano state get`
At this point it seems like I must have misspelled a key or something 😫
I’ve checked my spelling everywhere and I’m still stumped by this. This is my tap: https://github.com/menzenski/tap-mongodb When I run it against a test db that has two documents, it outputs both documents, and the `_id` key looks like I’d expect it to. That `_id` key is set as the replication key.
$ TAP_MONGODB_MONGODB_CONNECTION_STRING=mongodb://admin:password@localhost:27017 meltano run --full-refresh tap-mongodb target-jsonl | tee -a logs.txt
2023-04-04T02:30:33.757318Z [info     ] Environment 'test' is active
2023-04-04T02:30:36.314589Z [info     ] Performing full refresh, ignoring state left behind by any previous runs.
2023-04-04T02:30:38.066360Z [info     ] 2023-04-03 21:30:38,066 | INFO     | tap-mongodb          | Beginning full_table sync of 'TestDocument'... cmd_type=elb consumer=False name=tap-mongodb producer=True stdio=stderr string_id=tap-mongodb
2023-04-04T02:30:38.066947Z [info     ] 2023-04-03 21:30:38,066 | INFO     | tap-mongodb          | Tap has custom mapper. Using 1 provided map(s). cmd_type=elb consumer=False name=tap-mongodb producer=True stdio=stderr string_id=tap-mongodb
2023-04-04T02:30:38.067338Z [info     ] 2023-04-03 21:30:38,066 | INFO     | tap-mongodb          | bookmark: None cmd_type=elb consumer=False name=tap-mongodb producer=True stdio=stderr string_id=tap-mongodb
2023-04-04T02:30:38.068258Z [info     ] 2023-04-03 21:30:38,067 | INFO     | tap-mongodb          | record: {'_id': ObjectId('642b40e397e74b5400375e73'), 'name': 'TestDocumentName'} cmd_type=elb consumer=False name=tap-mongodb producer=True stdio=stderr string_id=tap-mongodb
2023-04-04T02:30:38.068419Z [info     ] 2023-04-03 21:30:38,068 | INFO     | tap-mongodb          | object_id: 642b40e397e74b5400375e73 cmd_type=elb consumer=False name=tap-mongodb producer=True stdio=stderr string_id=tap-mongodb
2023-04-04T02:30:38.068548Z [info     ] 2023-04-03 21:30:38,068 | INFO     | tap-mongodb          | str(object_id): 642b40e397e74b5400375e73 cmd_type=elb consumer=False name=tap-mongodb producer=True stdio=stderr string_id=tap-mongodb
2023-04-04T02:30:38.068828Z [info     ] 2023-04-03 21:30:38,068 | INFO     | tap-mongodb          | record: {'_id': ObjectId('642b417197e74b5400375e74'), 'name': 'TestDocumentName2'} cmd_type=elb consumer=False name=tap-mongodb producer=True stdio=stderr string_id=tap-mongodb
2023-04-04T02:30:38.068966Z [info     ] 2023-04-03 21:30:38,068 | INFO     | tap-mongodb          | object_id: 642b417197e74b5400375e74 cmd_type=elb consumer=False name=tap-mongodb producer=True stdio=stderr string_id=tap-mongodb
2023-04-04T02:30:38.069089Z [info     ] 2023-04-03 21:30:38,068 | INFO     | tap-mongodb          | str(object_id): 642b417197e74b5400375e74 cmd_type=elb consumer=False name=tap-mongodb producer=True stdio=stderr string_id=tap-mongodb
2023-04-04T02:30:38.069206Z [info     ] 2023-04-03 21:30:38,068 | INFO     | singer_sdk.metrics   | INFO METRIC: {"metric_type": "timer", "metric": "sync_duration", "value": 0.0021359920501708984, "tags": {"stream": "TestDocument", "context": {}, "status": "succeeded"}} cmd_type=elb consumer=False name=tap-mongodb producer=True stdio=stderr string_id=tap-mongodb
2023-04-04T02:30:38.069396Z [info     ] 2023-04-03 21:30:38,068 | INFO     | singer_sdk.metrics   | INFO METRIC: {"metric_type": "counter", "metric": "record_count", "value": 2, "tags": {"stream": "TestDocument", "context": {}}} cmd_type=elb consumer=False name=tap-mongodb producer=True stdio=stderr string_id=tap-mongodb
2023-04-04T02:30:38.671086Z [info     ] Incremental state has been updated at 2023-04-04 02:30:38.670981.
2023-04-04T02:30:38.680620Z [info     ] Block run completed.           block_type=ExtractLoadBlocks err=None set_number=0 success=True
But nothing shows in the state afterwards:
```
meltano state get test:tap-mongodb-to-target-jsonl
2023-04-04T02:32:40.624493Z [info     ] The default environment 'test' will be ignored for `meltano state`. To configure a specific environment, p…
```
hmmm… I did the same test with https://github.com/z3z1ma/tap-mongodb and get the same behavior - the two records are output but there’s no value added to the state
meltano state get dev:tap-mongodb-to-target-jsonl
2023-04-04T02:48:45.760254Z [info     ] The default environment 'dev' will be ignored for `meltano state`. To configure a specific environment, please use the option `--environment=<environment name>`.
2023-04-04T02:48:46.193665Z [warning  ] Running state operation for environment 'dev' outside of an environment
2023-04-04T02:48:46.194728Z [info     ] Environment 'dev' is active
{"singer_state": {"bookmarks": {"example_test_database_TestDocument": {}}}}
a
Did you try adding this to your metadata to mutate the output catalog?
metadata:
    example_test_database_TestDocument:
      replication-key: <yourField>
      replication-method: INCREMENTAL
It's possibly worth clearing any cached catalog; I am not 100% sure if the `metadata` key is overlaid on an existing catalog or only factored in during generation.
m
I deleted `.meltano/run/` (to clear the cache) and get the same behavior.
but that metadata object has an effect!!
$ meltano state get test:tap-mongodb-to-target-jsonl
2023-04-04T13:58:09.468456Z [info     ] The default environment 'test' will be ignored for `meltano state`. To configure a specific environment, please use the option `--environment=<environment name>`.
2023-04-04T13:58:09.873825Z [warning  ] Running state operation for environment 'test' outside of an environment
2023-04-04T13:58:09.874717Z [info     ] Environment 'test' is active
{"singer_state": {"bookmarks": {"test_database_TestDocument": {"replication_key": "_id", "replication_key_value": "642b417197e74b5400375e74"}}}}
Thanks @alexander_butler! now I have something to dig into more
🦜 now it’s doing what I hoped - I run the tap, I get all the documents in the database. I add a new document in the database, and run the tap again, and it only outputs the new document.
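The resume behaviour described here can be sketched without a database. This is an illustration only: `documents_after` is a hypothetical helper, the third `_id` is invented for the example, and it assumes every `_id` is the 24-character lowercase hex string of an ObjectId, so plain string comparison gives the same ordering as the underlying bytes. A real tap would more likely push this filter into the MongoDB query.

```python
from typing import List, Optional


def documents_after(documents: List[dict], bookmark: Optional[str]) -> List[dict]:
    """Return documents whose string _id sorts after the saved bookmark."""
    ordered = sorted(documents, key=lambda doc: doc["_id"])
    if bookmark is None:
        # No saved state yet: this is effectively the first sync.
        return ordered
    return [doc for doc in ordered if doc["_id"] > bookmark]


collection = [
    {"_id": "642b40e397e74b5400375e73", "name": "TestDocumentName"},
    {"_id": "642b417197e74b5400375e74", "name": "TestDocumentName2"},
]
first_sync = documents_after(collection, None)
bookmark = first_sync[-1]["_id"]

# A document added after the first sync (hypothetical _id).
collection.append({"_id": "642b41ff97e74b5400375e75", "name": "TestDocumentName3"})
second_sync = documents_after(collection, bookmark)
```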
a
@Matt Menzenski - I noticed in your prior log snippet that the _id value seemed to be some kind of an object type - instead of string. I wonder if that is related to your issue.
m
I’m setting it to a string when emitting records:
yield {"_id": str(object_id), "document": record}
but yes, it is “properly” a BSON ObjectId
a
Okay, cool. The other thing I saw when I looked at your source code is that you are overriding the entire `Tap.catalog_dict` (https://github.com/menzenski/tap-mongodb/blob/main/tap_mongodb/tap.py#L93) instead of the stream's `Stream.schema`. There's nothing inherently wrong with that approach, but it may have other effects that I'm not fully aware of. Also - I think this call might need to declare a replication key argument: https://github.com/menzenski/tap-mongodb/blob/main/tap_mongodb/tap.py#L160-L162
(Pasted wrong url at first. Second URL is updated.)
(`get_standard_metadata()`)
m
👀 i started with the catalog_dict implementation from https://github.com/z3z1ma/tap-mongodb because my end goal at this point (I think) is to contribute back to that tap to implement log-based replication per https://github.com/z3z1ma/tap-mongodb/issues/2#issuecomment-1482380787
thanks for these links, I will look more into this
Also - I think this call might need to declare a replication key argument
based on what I’ve seen, this would explain a lot
a
The catalog dict override drives catalog generation. Also, the reason we don’t declare a replication key is that it might be completely different depending on the collection. So we set it via “metadata”, i.e. directly in the catalog, on a per-stream basis, with the help of the fact that streams can be matched by globs too.
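The glob idea can be pictured with a small stdlib sketch. It only illustrates pattern-based matching of stream names; `resolve_metadata` and the rule layout are hypothetical, and Meltano's actual matching rules may differ.

```python
from fnmatch import fnmatchcase
from typing import Dict


def resolve_metadata(stream_name: str, rules: Dict[str, dict]) -> dict:
    """Merge the metadata of every rule whose glob pattern matches the stream."""
    resolved: dict = {}
    for pattern, values in rules.items():
        if fnmatchcase(stream_name, pattern):
            resolved.update(values)
    return resolved


# One rule covering every collection in the test database.
rules = {
    "test_database_*": {
        "replication-method": "INCREMENTAL",
        "replication-key": "_id",
    },
}
matched = resolve_metadata("test_database_TestDocument", rules)
unmatched = resolve_metadata("other_database_Thing", rules)
```

The per-collection flexibility mentioned above comes from adding narrower patterns with different keys alongside the broad one.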
m
to be clear, I’m doing a lot of stumbling around in the dark right now, but it’s fun. I’m enjoying getting some practice with tap internals.
a
Yeah it’s great. I love doing that too 😄
m
I’m now running into this issue again but in a different way 😫 https://github.com/menzenski/tap-mongodb/blob/main/tap_mongodb/streams.py#L52-L117 is my implementation of `get_records`. The incremental replication implementation works great. The records are emitted with their `_id` field set to the string value of the `_id` ObjectId field on the source document. This is persisted in the state and the tap is able to resume processing from the saved state key on the next run. I’m happy with this.
The log-based replication implementation is causing me grief. If I run the tap with the `keep_open = False` line commented out, it will keep the change stream open and poll it for new records. If there are new records (if I push database updates while the meltano tap is running), they are emitted as expected by the tap. If there is no new record in the change stream, it will emit a “dummy” document where the `_id` field is set to the string value of the change stream’s resume token. This is intended to allow the state to be updated with that token so that it can resume from that point. However, while the tap is emitting records correctly (I see the expected records in the output JSONL file), it’s never updating the state:
$ meltano state get test:tap-mongodb-to-target-jsonl
2023-04-05T15:51:32.243210Z [info     ] The default environment 'test' will be ignored for `meltano state`. To configure a specific environment, please use the option `--environment=<environment name>`.
2023-04-05T15:51:32.675307Z [warning  ] Running state operation for environment 'test' outside of an environment
2023-04-05T15:51:32.676460Z [info     ] Environment 'test' is active
{"singer_state": {"bookmarks": {"test_database_TestDocument": {}}}}
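The "dummy document" approach described above might look roughly like the sketch below. Everything here is a stand-in: `FakeChangeStream` imitates a change stream cursor just enough to run without MongoDB, the token strings are invented, and `poll_once` is a hypothetical helper rather than the code in the linked repository.

```python
from typing import List, Optional


class FakeChangeStream:
    """Hypothetical stand-in for a MongoDB change stream cursor."""

    def __init__(self, events: List[dict], resume_token: str) -> None:
        self._events = list(events)
        self.resume_token = resume_token

    def try_next(self) -> Optional[dict]:
        """Return the next change event, or None if nothing is waiting."""
        if not self._events:
            return None
        event = self._events.pop(0)
        # In this model, the event's _id doubles as the new resume token.
        self.resume_token = event["_id"]
        return event


def poll_once(change_stream: FakeChangeStream) -> dict:
    """Emit a real change if one exists, otherwise a token-only placeholder."""
    event = change_stream.try_next()
    if event is not None:
        return {"_id": str(event["_id"]), "document": event["fullDocument"]}
    # No new change: carry only the resume token so the bookmark can advance.
    return {"_id": str(change_stream.resume_token), "document": None}


stream = FakeChangeStream(
    events=[{"_id": "token-1", "fullDocument": {"name": "TestDocumentName3"}}],
    resume_token="token-0",
)
first = poll_once(stream)   # a real change event
second = poll_once(stream)  # nothing new, so a placeholder is emitted
```

Either record carries a token in `_id`, so whichever one arrives last could serve as the resume point, provided the state is actually updated from it.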
I think I found the issue: https://github.com/meltano/sdk/blob/main/singer_sdk/streams/core.py#L767 `Stream#_increment_stream_state` has no handling for log-based replication, so I will need to override this method
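A toy model of that override, assuming the behaviour is as described in this message. `BaseStream` is not the real `singer_sdk` class and its method signature is simplified; only the shape of the fix is shown, with invented token values.

```python
class BaseStream:
    """Toy stand-in for an SDK stream; not the real singer_sdk class."""

    replication_method = "INCREMENTAL"
    replication_key = "_id"

    def __init__(self) -> None:
        self.bookmark: dict = {}

    def _write_bookmark(self, latest_record: dict) -> None:
        self.bookmark = {
            "replication_key": self.replication_key,
            "replication_key_value": latest_record[self.replication_key],
        }

    def _increment_stream_state(self, latest_record: dict) -> None:
        # Models the reported behaviour: only incremental streams advance.
        if self.replication_method == "INCREMENTAL":
            self._write_bookmark(latest_record)


class UnpatchedLogBasedStream(BaseStream):
    """Log-based stream relying on the base behaviour."""

    replication_method = "LOG_BASED"


class LogBasedStream(BaseStream):
    """Log-based stream with the override applied."""

    replication_method = "LOG_BASED"

    def _increment_stream_state(self, latest_record: dict) -> None:
        if self.replication_method == "LOG_BASED":
            # Treat the resume token stored in _id as the bookmark value.
            self._write_bookmark(latest_record)
        else:
            super()._increment_stream_state(latest_record)


unpatched = UnpatchedLogBasedStream()
unpatched._increment_stream_state({"_id": "resume-token-1"})

patched = LogBasedStream()
patched._increment_stream_state({"_id": "resume-token-1"})
```

In this model the unpatched stream ends with an empty bookmark, matching the empty `{}` shown by `meltano state get` above, while the patched stream stores the token.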
omg that did it
I’m so happy 🥲
I do think I’ve found a bug in DocumentDB though 🤦