jonas_kalderstam
10/03/2022, 7:55 AM
BATCH messages. I'm implementing a BatchSink and I'm seeing some surprising differences in behavior depending on whether the Sink receives regular RECORD messages or a BATCH message.
If the Sink receives a BATCH message:
* `Sink.start_batch()` is never called (this is very surprising)
* `Sink.process_record()` is never called (whether it SHOULD be called is up for interpretation)
* In `Sink.process_batch()`, `context` doesn't have a `batch_id` in it. It only has a list called `records`.

jonas_kalderstam
10/03/2022, 7:56 AM

jonas_kalderstam
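10/03/2022, 7:56 AM
```
# TargetBigQuery, MINIMAL_CONFIG and test_utils are defined in the target's package and test helpers
target = TargetBigQuery(config=MINIMAL_CONFIG)
# This is a StringIO object holding the Singer messages from batch_one.jsonl
tap_lines = test_utils.get_test_tap_lines('batch_one.jsonl')
# Feed the messages to the target instead of reading from stdin
target.listen(file_input=tap_lines)
```

jonas_kalderstam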
10/03/2022, 7:56 AM

jonas_kalderstam
10/03/2022, 7:56 AM
```
{"type": "STATE", "value": {"currently_syncing": "batch_test"}}
{"type": "SCHEMA", "stream": "batch_test", "schema": {"properties": {"c_pk": {"inclusion": "automatic", "minimum": -2147483648, "maximum": 2147483647, "type": ["integer"]}, "c_varchar": {"inclusion": "available", "maxLength": 16, "type": ["null", "string"]}, "c_int": {"inclusion": "available", "minimum": -2147483648, "maximum": 2147483647, "type": ["null", "integer"]}}, "type": "object"}, "key_properties": ["c_pk"]}
{"type": "BATCH", "stream": "batch_test", "encoding": { "format": "jsonl", "compression": "gzip"}, "manifest": ["file://PATHGOESHERE/batch_records.jsonl.gz"]}
{"type": "STATE", "value": {"currently_syncing": "batch_test"}}
{"type": "STATE", "value": {"currently_syncing": "batch_test", "bookmarks": {"batch_test": {"initial_full_table_complete": true}}}}
```

jonas_kalderstam
10/03/2022, 7:56 AM

jonas_kalderstam
10/03/2022, 7:56 AM
```
{"type": "RECORD", "stream": "test_stream", "record": {"c_pk": 1, "c_varchar": "1", "c_int": 1}, "version": 1, "time_extracted": "2019-01-31T15:51:47.465408Z"}
{"type": "RECORD", "stream": "test_stream", "record": {"c_pk": 2, "c_varchar": "2", "c_int": 2}, "version": 1, "time_extracted": "2019-01-31T15:51:47.465408Z"}
{"type": "RECORD", "stream": "test_stream", "record": {"c_pk": 3, "c_varchar": "3", "c_int": 3}, "version": 1, "time_extracted": "2019-01-31T15:51:47.465408Z"}
```

jonas_kalderstam
10/03/2022, 7:58 AM

jonas_kalderstam
10/03/2022, 8:06 AM

jonas_kalderstam
10/03/2022, 8:09 AM
Still, it is surprising that no `batch_id` is set.

edgar_ramirez_mondragon
10/03/2022, 4:03 PM
> I’m guessing the batch file only includes a list of records. The docs aren’t clear on this.

That’s correct, the batch file should include only raw records, not RECORD messages. The docs could certainly be improved since that’s not clear 🙂
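So for the example in this thread, each line of `batch_records.jsonl.gz` should look like `{"c_pk": 1, "c_varchar": "1", "c_int": 1}` rather than a full RECORD message. A minimal stdlib-only sketch of producing such a file, assuming you start from Singer message lines like the RECORD lines posted above: keep only each message's `record` payload and write one JSON object per line into a gzip file. `write_batch_file` is a hypothetical helper, not part of the Singer SDK.

```python
import gzip
import json
import tempfile
from pathlib import Path
from typing import Iterable


def write_batch_file(message_lines: Iterable[str], path: Path) -> int:
    """Write the raw records from Singer RECORD messages to a gzipped JSONL file.

    Hypothetical helper: a batch file holds raw records, so only the
    "record" payload of each RECORD message is written. Returns the
    number of records written.
    """
    count = 0
    with gzip.open(path, "wt", encoding="utf-8") as fh:
        for line in message_lines:
            message = json.loads(line)
            if message.get("type") != "RECORD":
                continue  # SCHEMA/STATE messages do not belong in a batch file
            fh.write(json.dumps(message["record"]) + "\n")
            count += 1
    return count


if __name__ == "__main__":
    lines = [
        '{"type": "RECORD", "stream": "batch_test", "record": {"c_pk": 1, "c_varchar": "1", "c_int": 1}}',
        '{"type": "RECORD", "stream": "batch_test", "record": {"c_pk": 2, "c_varchar": "2", "c_int": 2}}',
    ]
    out = Path(tempfile.mkdtemp()) / "batch_records.jsonl.gz"
    print(write_batch_file(lines, out))
    with gzip.open(out, "rt", encoding="utf-8") as fh:
        print(fh.read())
```

The BATCH message's `manifest` would then point at this file, for example via `out.as_uri()`.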
> Still, it is surprising that no `batch_id` is set

Yeah, so that’s definitely an oversight in `_process_batch_message`.
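To make the expectation in this thread concrete: a BATCH message points at files through its `manifest`, and the context handed to `process_batch()` was expected to carry a `batch_id` as well as `records`. Below is a minimal stdlib-only sketch of that flow, assuming local `file://` URIs and `jsonl` encoding with optional gzip. `iter_batch_records` and `build_batch_context` are hypothetical names; this is not the SDK's `_process_batch_message`, and generating the id with `uuid.uuid4()` is an illustrative choice.

```python
import gzip
import json
import tempfile
import uuid
from pathlib import Path
from typing import Any, Dict, Iterator, List
from urllib.parse import urlparse
from urllib.request import url2pathname


def iter_batch_records(batch_message: Dict[str, Any]) -> Iterator[Dict[str, Any]]:
    """Yield raw records from every file listed in a BATCH message's manifest.

    Sketch only: handles local file:// URIs with jsonl encoding, gzip or uncompressed.
    """
    encoding = batch_message["encoding"]
    if encoding["format"] != "jsonl":
        raise ValueError(f"unsupported format: {encoding['format']}")
    opener = gzip.open if encoding.get("compression") == "gzip" else open
    for uri in batch_message["manifest"]:
        parsed = urlparse(uri)
        if parsed.scheme != "file":
            raise ValueError(f"unsupported URI scheme: {uri}")
        # Convert the URI path to a local filesystem path (local files only)
        with opener(url2pathname(parsed.path), "rt", encoding="utf-8") as fh:
            for line in fh:
                if line.strip():
                    yield json.loads(line)


def build_batch_context(batch_message: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical: build a context holding both a batch_id and the records."""
    records: List[Dict[str, Any]] = list(iter_batch_records(batch_message))
    return {"batch_id": str(uuid.uuid4()), "records": records}


if __name__ == "__main__":
    path = Path(tempfile.mkdtemp()) / "batch_records.jsonl.gz"
    with gzip.open(path, "wt", encoding="utf-8") as fh:
        fh.write('{"c_pk": 1, "c_varchar": "1", "c_int": 1}\n')
    message = {
        "type": "BATCH",
        "stream": "batch_test",
        "encoding": {"format": "jsonl", "compression": "gzip"},
        "manifest": [path.as_uri()],
    }
    print(build_batch_context(message))
```

Whether `start_batch()` and `process_record()` should also run for each such batch is exactly the open question above; this sketch only shows the context shape.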
Both logged: sdk#1030, sdk#1031

jonas_kalderstam
10/03/2022, 4:49 PM