jonas_kalderstam
10/03/2022, 7:55 AM
BATCH messages. I'm implementing a BatchSink and I'm getting some surprising differences in behavior depending on whether the Sink is receiving regular RECORD messages or a BATCH message.
If the Sink is receiving a BATCH message:
* Sink.start_batch() is never called (this is very surprising)
* Sink.process_record() is never called (whether it SHOULD be called is up for interpretation)
* In Sink.process_batch(), context doesn't have a batch_id in it. It only has a list called records.
jonas_kalderstam
10/03/2022, 7:56 AM
jonas_kalderstam
10/03/2022, 7:56 AM
target = TargetBigQuery(config=MINIMAL_CONFIG)
# This is a StringIO object
tap_lines = test_utils.get_test_tap_lines('batch_one.jsonl')
target.listen(file_input=tap_lines)
jonas_kalderstam
10/03/2022, 7:56 AM
jonas_kalderstam
10/03/2022, 7:56 AM
{"type": "STATE", "value": {"currently_syncing": "batch_test"}}
{"type": "SCHEMA", "stream": "batch_test", "schema": {"properties": {"c_pk": {"inclusion": "automatic", "minimum": -2147483648, "maximum": 2147483647, "type": ["integer"]}, "c_varchar": {"inclusion": "available", "maxLength": 16, "type": ["null", "string"]}, "c_int": {"inclusion": "available", "minimum": -2147483648, "maximum": 2147483647, "type": ["null", "integer"]}}, "type": "object"}, "key_properties": ["c_pk"]}
{"type": "BATCH", "stream": "batch_test", "encoding": {"format": "jsonl", "compression": "gzip"}, "manifest": ["file://PATHGOESHERE/batch_records.jsonl.gz"]}
{"type": "STATE", "value": {"currently_syncing": "batch_test"}}
{"type": "STATE", "value": {"currently_syncing": "batch_test", "bookmarks": {"batch_test": {"initial_full_table_complete": true}}}}
jonas_kalderstam
10/03/2022, 7:56 AM
jonas_kalderstam
10/03/2022, 7:56 AM
{"type": "RECORD", "stream": "test_stream", "record": {"c_pk": 1, "c_varchar": "1", "c_int": 1}, "version": 1, "time_extracted": "2019-01-31T15:51:47.465408Z"}
{"type": "RECORD", "stream": "test_stream", "record": {"c_pk": 2, "c_varchar": "2", "c_int": 2}, "version": 1, "time_extracted": "2019-01-31T15:51:47.465408Z"}
{"type": "RECORD", "stream": "test_stream", "record": {"c_pk": 3, "c_varchar": "3", "c_int": 3}, "version": 1, "time_extracted": "2019-01-31T15:51:47.465408Z"}
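For reference, the reply later in this thread says the batch file should hold raw records rather than RECORD messages like the three above. A minimal sketch of producing such a file and a matching BATCH message, using only the Python standard library; the helper names write_batch_file and build_batch_message and the temporary directory are assumptions for illustration, not part of the SDK:

```python
import gzip
import json
import tempfile
from pathlib import Path


def write_batch_file(records, directory):
    """Write raw record dicts (not RECORD messages) as gzip-compressed JSONL."""
    path = Path(directory) / "batch_records.jsonl.gz"
    with gzip.open(path, "wt", encoding="utf-8") as f:
        for record in records:
            f.write(json.dumps(record) + "\n")
    return path


def build_batch_message(stream, path):
    """Build a BATCH message shaped like the one posted in this thread."""
    return {
        "type": "BATCH",
        "stream": stream,
        "encoding": {"format": "jsonl", "compression": "gzip"},
        # as_uri() needs an absolute path, hence resolve().
        "manifest": [path.resolve().as_uri()],
    }


# Only the contents of each message's "record" key, one JSON object per line.
records = [
    {"c_pk": 1, "c_varchar": "1", "c_int": 1},
    {"c_pk": 2, "c_varchar": "2", "c_int": 2},
    {"c_pk": 3, "c_varchar": "3", "c_int": 3},
]

tmp_dir = tempfile.mkdtemp()  # hypothetical location for the batch file
batch_path = write_batch_file(records, tmp_dir)
batch_message = build_batch_message("batch_test", batch_path)
print(json.dumps(batch_message))
```

The printed line can stand in for the BATCH line in the test input, with the manifest pointing at the generated file.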
jonas_kalderstam
10/03/2022, 7:58 AM
jonas_kalderstam
10/03/2022, 8:06 AM
jonas_kalderstam
10/03/2022, 8:09 AM
Still, it is surprising that no batch_id is set
edgar_ramirez_mondragon
10/03/2022, 4:03 PM
> I’m guessing the batch file only includes a list of records. The docs aren’t clear on this
That’s correct, the batch file should include only raw records, not record messages. The docs could certainly be improved since that’s not clear 🙂
> Still, it is surprising that no batch_id is set
Yeah, so that’s definitely an oversight in _process_batch_message.
Both logged: sdk#1030, sdk#1031
jonas_kalderstam
10/03/2022, 4:49 PM
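For reference, while batch_id is missing from the context for BATCH messages, one possible stopgap is to fill it in at the top of process_batch(). A minimal sketch, assuming the context is a plain dict with a records list as described above; ensure_batch_id is a hypothetical helper and process_batch here is a stand-in function, not the SDK's method:

```python
import uuid


def ensure_batch_id(context):
    """Hypothetical helper: give the context a batch_id if it lacks one."""
    # setdefault leaves an existing batch_id untouched.
    context.setdefault("batch_id", str(uuid.uuid4()))
    return context


def process_batch(context):
    """Stand-in for a sink's process_batch(); not the SDK's implementation."""
    context = ensure_batch_id(context)
    return f"batch {context['batch_id']}: {len(context['records'])} records"


# Context shaped like the one described for BATCH messages: only "records".
batch_context = {"records": [{"c_pk": 1}, {"c_pk": 2}, {"c_pk": 3}]}
summary = process_batch(batch_context)
print(summary)
```

With this in place, the same process_batch() logic can rely on batch_id whether the records arrived as RECORD messages or through a BATCH message.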