Matt Menzenski
04/03/2023, 10:16 PM
aaronsteers
04/03/2023, 10:24 PM
> How does a tap built on the SDK update the bookmark in the state?
By defining your replication key, the SDK will automatically track that property's value in the emitted records, emitting STATE messages at specific points in the pipeline that include the last-seen or highest-seen value. The developer configures `Stream.is_sorted`, `Stream.replication_key`, and `Stream.check_sorted`, and the SDK handles it from there (for most use cases, at least).
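The bookkeeping described above can be pictured with a minimal, pure-Python model. This is not the SDK's implementation: `sync_with_bookmark` is a hypothetical helper, a `ValueError` stands in for whatever the SDK raises on unsorted input, and for simplicity the model emits a single STATE message at the end rather than at the SDK's own checkpoints.

```python
from __future__ import annotations

import json
from typing import Any, Iterable, Iterator


def sync_with_bookmark(
    stream_name: str,
    records: Iterable[dict[str, Any]],
    replication_key: str,
    is_sorted: bool = True,
    check_sorted: bool = True,
) -> Iterator[str]:
    """Yield Singer RECORD messages, then one STATE message carrying the bookmark.

    Simplified model of SDK behavior; not the SDK's actual code.
    """
    max_value: Any = None
    for record in records:
        value = record[replication_key]
        # A stream that claims to be sorted must never go backwards.
        if is_sorted and check_sorted and max_value is not None and value < max_value:
            raise ValueError(f"unsorted replication key {value!r} after {max_value!r}")
        # Keep the highest value seen so far as the bookmark.
        if max_value is None or value > max_value:
            max_value = value
        yield json.dumps({"type": "RECORD", "stream": stream_name, "record": record})
    state = {
        "bookmarks": {
            stream_name: {
                "replication_key": replication_key,
                "replication_key_value": max_value,
            }
        }
    }
    yield json.dumps({"type": "STATE", "value": state})
```

In this model, passing `is_sorted=False` only skips the ordering check; the maximum is still tracked.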
> ...how does the tap get that bookmark back?
The orchestrator gets the STATE message as emitted from the target. On the next run, it will send it back to the tap. The tap developer accesses the start value by calling either `Stream.get_starting_replication_key_value()` or `Stream.get_starting_timestamp()`.
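On the tap side, the lookup boils down to reading the stream's bookmark out of the incoming state and falling back to `None` when there is none. The sketch below assumes the bookmark shape that `meltano state get` prints later in this thread (without Meltano's `singer_state` wrapper); `starting_replication_key_value` is a hypothetical stand-in for `Stream.get_starting_replication_key_value()`, which in the SDK also takes a partition context.

```python
from __future__ import annotations

from typing import Any


def starting_replication_key_value(state: dict[str, Any], stream_name: str) -> Any:
    """Return the saved bookmark value for a stream, or None on a first sync.

    Hypothetical helper; the real SDK method also takes a partition context.
    """
    bookmark = state.get("bookmarks", {}).get(stream_name, {})
    return bookmark.get("replication_key_value")


saved_state = {
    "bookmarks": {
        "test_database_TestDocument": {
            "replication_key": "_id",
            "replication_key_value": "642b417197e74b5400375e74",
        }
    }
}
print(starting_replication_key_value(saved_state, "test_database_TestDocument"))
print(starting_replication_key_value({}, "test_database_TestDocument"))  # None: no state passed in
```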
Does this help? 🙂
aaronsteers
04/03/2023, 10:25 PM
Matt Menzenski
04/03/2023, 10:40 PM
aaronsteers
04/03/2023, 10:44 PM
aaronsteers
04/03/2023, 10:44 PM
Matt Menzenski
04/03/2023, 11:17 PM
Matt Menzenski
04/03/2023, 11:18 PM
aaronsteers
04/03/2023, 11:19 PM
> I am setting replication key, and calling get_starting_replication_key_value, but the returned value is always None even though I am getting records returned.
To follow up, are you testing by executing with Meltano or by invoking directly? Either way, the next thing to confirm is that your state is getting passed back to the tap. Also, this will be null for the entire "first" sync operation if you aren't passing it in explicitly.
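Singer taps conventionally read prior state from a JSON file passed with `--state`. The argparse sketch below is hypothetical (it is not the SDK's CLI code), but it shows why a direct invocation without that flag always behaves like a first sync: with no state file there is simply no bookmark to read.

```python
from __future__ import annotations

import argparse
import json
from typing import Any


def load_state(argv: list[str]) -> dict[str, Any]:
    """Parse Singer-style CLI args and return the incoming state (empty if none).

    Hypothetical sketch of the --state convention, not the SDK's CLI.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--config")
    parser.add_argument("--state")  # path to a JSON state file from a prior run
    args = parser.parse_args(argv)
    if args.state is None:
        return {}  # no --state: every run looks like the first sync
    with open(args.state, encoding="utf-8") as f:
        return json.load(f)
```

Invoked by hand, that corresponds to something like `tap-mongodb --config config.json --state state.json`.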
aaronsteers
04/03/2023, 11:20 PM
…`--state` CLI arg, it will always think it is in its first sync.
Matt Menzenski
04/03/2023, 11:25 PM
…`meltano invoke` and with `meltano run`
aaronsteers
04/03/2023, 11:26 PM
Matt Menzenski
04/03/2023, 11:26 PM
Matt Menzenski
04/03/2023, 11:26 PM
…`meltano state get`
Matt Menzenski
04/03/2023, 11:26 PM
Matt Menzenski
04/04/2023, 2:34 AM
…`_id` key looks like I’d expect it to. That `_id` key is set as the replication key.
```
$ TAP_MONGODB_MONGODB_CONNECTION_STRING=mongodb://admin:password@localhost:27017 meltano run --full-refresh tap-mongodb target-jsonl | tee -a logs.txt
2023-04-04T02:30:33.757318Z [info ] Environment 'test' is active
2023-04-04T02:30:36.314589Z [info ] Performing full refresh, ignoring state left behind by any previous runs.
2023-04-04T02:30:38.066360Z [info ] 2023-04-03 21:30:38,066 | INFO | tap-mongodb | Beginning full_table sync of 'TestDocument'... cmd_type=elb consumer=False name=tap-mongodb producer=True stdio=stderr string_id=tap-mongodb
2023-04-04T02:30:38.066947Z [info ] 2023-04-03 21:30:38,066 | INFO | tap-mongodb | Tap has custom mapper. Using 1 provided map(s). cmd_type=elb consumer=False name=tap-mongodb producer=True stdio=stderr string_id=tap-mongodb
2023-04-04T02:30:38.067338Z [info ] 2023-04-03 21:30:38,066 | INFO | tap-mongodb | bookmark: None cmd_type=elb consumer=False name=tap-mongodb producer=True stdio=stderr string_id=tap-mongodb
2023-04-04T02:30:38.068258Z [info ] 2023-04-03 21:30:38,067 | INFO | tap-mongodb | record: {'_id': ObjectId('642b40e397e74b5400375e73'), 'name': 'TestDocumentName'} cmd_type=elb consumer=False name=tap-mongodb producer=True stdio=stderr string_id=tap-mongodb
2023-04-04T02:30:38.068419Z [info ] 2023-04-03 21:30:38,068 | INFO | tap-mongodb | object_id: 642b40e397e74b5400375e73 cmd_type=elb consumer=False name=tap-mongodb producer=True stdio=stderr string_id=tap-mongodb
2023-04-04T02:30:38.068548Z [info ] 2023-04-03 21:30:38,068 | INFO | tap-mongodb | str(object_id): 642b40e397e74b5400375e73 cmd_type=elb consumer=False name=tap-mongodb producer=True stdio=stderr string_id=tap-mongodb
2023-04-04T02:30:38.068828Z [info ] 2023-04-03 21:30:38,068 | INFO | tap-mongodb | record: {'_id': ObjectId('642b417197e74b5400375e74'), 'name': 'TestDocumentName2'} cmd_type=elb consumer=False name=tap-mongodb producer=True stdio=stderr string_id=tap-mongodb
2023-04-04T02:30:38.068966Z [info ] 2023-04-03 21:30:38,068 | INFO | tap-mongodb | object_id: 642b417197e74b5400375e74 cmd_type=elb consumer=False name=tap-mongodb producer=True stdio=stderr string_id=tap-mongodb
2023-04-04T02:30:38.069089Z [info ] 2023-04-03 21:30:38,068 | INFO | tap-mongodb | str(object_id): 642b417197e74b5400375e74 cmd_type=elb consumer=False name=tap-mongodb producer=True stdio=stderr string_id=tap-mongodb
2023-04-04T02:30:38.069206Z [info ] 2023-04-03 21:30:38,068 | INFO | singer_sdk.metrics | INFO METRIC: {"metric_type": "timer", "metric": "sync_duration", "value": 0.0021359920501708984, "tags": {"stream": "TestDocument", "context": {}, "status": "succeeded"}} cmd_type=elb consumer=False name=tap-mongodb producer=True stdio=stderr string_id=tap-mongodb
2023-04-04T02:30:38.069396Z [info ] 2023-04-03 21:30:38,068 | INFO | singer_sdk.metrics | INFO METRIC: {"metric_type": "counter", "metric": "record_count", "value": 2, "tags": {"stream": "TestDocument", "context": {}}} cmd_type=elb consumer=False name=tap-mongodb producer=True stdio=stderr string_id=tap-mongodb
2023-04-04T02:30:38.671086Z [info ] Incremental state has been updated at 2023-04-04 02:30:38.670981.
2023-04-04T02:30:38.680620Z [info ] Block run completed. block_type=ExtractLoadBlocks err=None set_number=0 success=True
```
But nothing shows in the state afterwards:
```
meltano state get test:tap-mongodb-to-target-jsonl
2023-04-04T02:32:40.624493Z [info ] The default environment 'test' will be ignored for `meltano state`. To configure a specific environment, p…
```
Matt Menzenski
04/04/2023, 2:53 AM
```
meltano state get dev:tap-mongodb-to-target-jsonl
2023-04-04T02:48:45.760254Z [info ] The default environment 'dev' will be ignored for `meltano state`. To configure a specific environment, please use the option `--environment=<environment name>`.
2023-04-04T02:48:46.193665Z [warning ] Running state operation for environment 'dev' outside of an environment
2023-04-04T02:48:46.194728Z [info ] Environment 'dev' is active
{"singer_state": {"bookmarks": {"example_test_database_TestDocument": {}}}}
```
alexander_butler
04/04/2023, 4:48 AM
```
metadata:
  example_test_database_TestDocument:
    replication-key: <yourField>
    replication-method: INCREMENTAL
```
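For context, in a Singer catalog the `replication-key` and `replication-method` settings above belong in the stream-level metadata entry (the one whose `breadcrumb` is empty). The sketch below models overlaying such settings onto a catalog dict; `apply_metadata_overrides` is a hypothetical helper, and whether Meltano applies the overlay to a cached catalog or only at discovery time is exactly the open question here.

```python
from __future__ import annotations

import copy
from typing import Any


def apply_metadata_overrides(
    catalog: dict[str, Any], overrides: dict[str, dict[str, Any]]
) -> dict[str, Any]:
    """Return a copy of a Singer catalog with stream-level metadata overridden.

    Hypothetical helper; keys of `overrides` are tap_stream_id values.
    """
    result = copy.deepcopy(catalog)
    for stream in result["streams"]:
        extra = overrides.get(stream["tap_stream_id"])
        if not extra:
            continue
        for entry in stream.setdefault("metadata", []):
            if entry.get("breadcrumb") == []:  # stream-level entry
                entry.setdefault("metadata", {}).update(extra)
                break
        else:
            # No stream-level entry yet: add one carrying the overrides.
            stream["metadata"].append({"breadcrumb": [], "metadata": dict(extra)})
    return result
```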
Possibly worth clearing any cached catalog; I am not 100% sure whether the `metadata` key is overlaid on an existing catalog or only factored in during generation.
Matt Menzenski
04/04/2023, 1:58 PM
…`.meltano/run/` (to clear the cache) and get the same behavior.
Matt Menzenski
04/04/2023, 1:59 PM
```
$ meltano state get test:tap-mongodb-to-target-jsonl
2023-04-04T13:58:09.468456Z [info ] The default environment 'test' will be ignored for `meltano state`. To configure a specific environment, please use the option `--environment=<environment name>`.
2023-04-04T13:58:09.873825Z [warning ] Running state operation for environment 'test' outside of an environment
2023-04-04T13:58:09.874717Z [info ] Environment 'test' is active
{"singer_state": {"bookmarks": {"test_database_TestDocument": {"replication_key": "_id", "replication_key_value": "642b417197e74b5400375e74"}}}}
```
Thanks @alexander_butler! Now I have something to dig into more.
Matt Menzenski
04/04/2023, 2:27 PM
aaronsteers
04/04/2023, 3:49 PM
Matt Menzenski
04/04/2023, 3:50 PM
`yield {"_id": str(object_id), "document": record}`
but yes, it is “properly” a BSON ObjectId
aaronsteers
04/04/2023, 3:53 PM
…`Stream.schema`. There's nothing inherently wrong with that approach, but it may have other effects that I'm not fully aware of.
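One reason the stringified `_id` can still serve as a replication key: a BSON ObjectId is 12 bytes whose first 4 bytes are a big-endian Unix timestamp, and its `str()` is a fixed-width, 24-character lowercase hex string, so plain string comparison orders IDs the same way as comparing their raw bytes. The stdlib-only check below uses the two IDs from the log earlier in the thread and does not need `bson` installed; `object_id_timestamp` is a hypothetical helper. Note the embedded timestamp has one-second resolution, so IDs created in the same second by different processes are not guaranteed to sort in insertion order.

```python
from datetime import datetime, timezone


def object_id_timestamp(hex_id: str) -> datetime:
    """Decode the creation time stored in the first 4 bytes of an ObjectId hex string."""
    raw = bytes.fromhex(hex_id)
    if len(raw) != 12:
        raise ValueError("an ObjectId is exactly 12 bytes (24 hex characters)")
    return datetime.fromtimestamp(int.from_bytes(raw[:4], "big"), tz=timezone.utc)


ids = ["642b417197e74b5400375e74", "642b40e397e74b5400375e73"]
# Fixed-width lowercase hex strings sort exactly like the underlying bytes.
assert sorted(ids) == sorted(ids, key=bytes.fromhex)
for hex_id in sorted(ids):
    print(hex_id, object_id_timestamp(hex_id).isoformat())
```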
Also - I think this call might need to declare a replication key argument: https://github.com/menzenski/tap-mongodb/blob/main/tap_mongodb/tap.py#L160-L162
aaronsteers
04/04/2023, 3:54 PM
aaronsteers
04/04/2023, 3:54 PM
Matt Menzenski
04/04/2023, 3:55 PM
Matt Menzenski
04/04/2023, 3:55 PM
> Also - I think this call might need to declare a replication key argument
Based on what I’ve seen, this would explain a lot.
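A simplified model of why that argument matters: by default, an SDK stream with no replication key falls back to full-table replication, which is consistent with the `Beginning full_table sync` line in the log above and the empty `{}` bookmark. A dynamically built stream therefore needs its replication key at construction time. `MongoStreamModel` and `build_streams` are hypothetical stand-ins, not the tap's code at the linked lines.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Optional


@dataclass
class MongoStreamModel:
    """Hypothetical stand-in for a dynamically created SDK stream."""

    name: str
    replication_key: Optional[str] = None

    @property
    def replication_method(self) -> str:
        # Modeled on the SDK default: no replication key means FULL_TABLE.
        return "INCREMENTAL" if self.replication_key else "FULL_TABLE"


def build_streams(collections: list[str], replication_key: Optional[str]) -> list[MongoStreamModel]:
    """Create one stream per collection, passing the replication key through."""
    return [MongoStreamModel(name=c, replication_key=replication_key) for c in collections]
```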
alexander_butler
04/04/2023, 3:56 PM
alexander_butler
04/04/2023, 3:57 PM
Matt Menzenski
04/04/2023, 4:02 PM
alexander_butler
04/04/2023, 4:03 PM
Matt Menzenski
04/05/2023, 4:12 PM
…`_id` field set to the string value of the `_id` ObjectId field on the source document. This is persisted in the state, and the tap is able to resume processing from the saved state key on the next run. I’m happy with this.
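Resuming from the saved key amounts to asking the source only for documents whose `_id` is greater than the bookmark. The in-memory sketch below stands in for that query; against MongoDB itself the filter would be along the lines of `{"_id": {"$gt": ObjectId(bookmark)}}` sorted on `_id`. `records_after_bookmark` is a hypothetical name, and the output shape mirrors the `yield {"_id": ..., "document": ...}` line quoted earlier.

```python
from __future__ import annotations

from typing import Any, Iterable, Iterator, Optional


def records_after_bookmark(
    documents: Iterable[dict[str, Any]], bookmark: Optional[str]
) -> Iterator[dict[str, Any]]:
    """Yield documents in _id order, skipping everything up to and including the bookmark.

    In-memory stand-in for a {"_id": {"$gt": ...}} query sorted on _id.
    """
    for doc in sorted(documents, key=lambda d: d["_id"]):
        if bookmark is None or doc["_id"] > bookmark:
            yield {"_id": doc["_id"], "document": doc}
```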
The log-based replication implementation is causing me grief. If I run the tap with the `keep_open = False` line commented out, it keeps the change stream open and polls it for new records. If there are new records (if I push database updates while the Meltano tap is running), the tap emits them as expected. If there is no new record in the change stream, it emits a “dummy” document whose `_id` field is set to the string value of the change stream’s resume token. This is intended to let the state be updated with that token so that the tap can resume from that point. However, while the tap is emitting records correctly (I see the expected records in the output JSONL file), it’s never updating the state:
```
$ meltano state get test:tap-mongodb-to-target-jsonl
2023-04-05T15:51:32.243210Z [info ] The default environment 'test' will be ignored for `meltano state`. To configure a specific environment, please use the option `--environment=<environment name>`.
2023-04-05T15:51:32.675307Z [warning ] Running state operation for environment 'test' outside of an environment
2023-04-05T15:51:32.676460Z [info ] Environment 'test' is active
{"singer_state": {"bookmarks": {"test_database_TestDocument": {}}}}
```
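The dummy-record scheme described above can be modeled without a database. In the sketch below, `poll` is a hypothetical callable standing in for one non-blocking read of the change stream (returning a change document or `None`), and `resume_token` stands in for reading the stream's current resume token; in PyMongo those roles would roughly be played by `ChangeStream.try_next()` and the `ChangeStream.resume_token` property. `max_polls` only bounds the loop for the sketch. As the empty bookmark above shows, emitting such records does not by itself advance the state.

```python
from __future__ import annotations

from typing import Any, Callable, Iterator, Optional


def change_stream_records(
    poll: Callable[[], Optional[dict[str, Any]]],
    resume_token: Callable[[], str],
    max_polls: int,
) -> Iterator[dict[str, Any]]:
    """Yield change events; on an empty poll, yield a placeholder carrying the resume token.

    Hypothetical model of the keep-open polling loop, not the tap's code.
    """
    for _ in range(max_polls):
        change = poll()
        if change is not None:
            yield {"_id": change["_id"], "document": change}
        else:
            # Nothing new: emit a "dummy" record so the bookmark can move to the token.
            yield {"_id": resume_token(), "document": None}
```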
Matt Menzenski
04/05/2023, 6:16 PM
`Stream#_increment_stream_state` has no handling for log-based replication, so I will need to override this method.
Matt Menzenski
04/05/2023, 6:19 PM
Matt Menzenski
04/05/2023, 6:29 PM
Matt Menzenski
04/05/2023, 9:16 PM
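A rough sketch of what such an override needs to do: for log-based replication, write the latest resume token into the stream's bookmark instead of leaving it untouched. This is a standalone function on a plain state dict, not a drop-in replacement for `_increment_stream_state`, which is a private SDK method whose signature is not reproduced here and may change between versions. `increment_state` is a hypothetical name, and treating resume tokens as opaque (keeping the most recent one rather than the maximum) is an assumption of the sketch.

```python
from __future__ import annotations

from typing import Any


def increment_state(
    state: dict[str, Any],
    stream_name: str,
    record: dict[str, Any],
    replication_method: str,
    replication_key: str = "_id",
) -> None:
    """Update a Singer-style bookmark in place for INCREMENTAL or LOG_BASED streams."""
    bookmark = state.setdefault("bookmarks", {}).setdefault(stream_name, {})
    if replication_method not in ("INCREMENTAL", "LOG_BASED"):
        return  # FULL_TABLE: nothing to track, so the bookmark stays {}
    value = record[replication_key]
    current = bookmark.get("replication_key_value")
    # INCREMENTAL keeps the maximum; LOG_BASED keeps the latest (opaque) token.
    if replication_method == "LOG_BASED" or current is None or value > current:
        bookmark["replication_key"] = replication_key
        bookmark["replication_key_value"] = value
```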