# best-practices
j
So regarding `BATCH` messages. I'm implementing a `BatchSink`, and I'm getting some surprising differences in behavior depending on whether the Sink is receiving regular `RECORD` messages or a `BATCH` message. If the Sink is receiving a `BATCH` message:
* `Sink.start_batch()` is never called (this is very surprising)
* `Sink.process_record()` is never called (whether it SHOULD be called is up for interpretation)
* In `Sink.process_batch()`, `context` doesn't have a `batch_id` in it. It only has a list called `records`.
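To make the reported difference concrete, here is a small, self-contained Python model of the two code paths as they were observed in this thread. It is not the Singer SDK's code: `RecordingSink`, `drain_record_messages`, and `handle_batch_message` are hypothetical names, and the UUID `batch_id` on the RECORD path is an assumption about what that path puts in `context`.

```python
import uuid
from typing import Any, Dict, Iterable, List


class RecordingSink:
    """Hypothetical stand-in for a BatchSink that logs which hooks run."""

    def __init__(self) -> None:
        self.calls: List[str] = []
        self.last_context: Dict[str, Any] = {}

    def start_batch(self, context: Dict[str, Any]) -> None:
        self.calls.append("start_batch")

    def process_record(self, record: Dict[str, Any], context: Dict[str, Any]) -> None:
        self.calls.append("process_record")
        # Accumulate records in the shared batch context.
        context.setdefault("records", []).append(record)

    def process_batch(self, context: Dict[str, Any]) -> None:
        self.calls.append("process_batch")
        self.last_context = dict(context)


def drain_record_messages(sink: RecordingSink, records: Iterable[Dict[str, Any]]) -> None:
    # Modeled RECORD path: open a batch with an ID, feed records one by one, then flush.
    context: Dict[str, Any] = {"batch_id": str(uuid.uuid4())}
    sink.start_batch(context)
    for record in records:
        sink.process_record(record, context)
    sink.process_batch(context)


def handle_batch_message(sink: RecordingSink, records: Iterable[Dict[str, Any]]) -> None:
    # Modeled BATCH path: records from the batch file go straight to process_batch.
    sink.process_batch({"records": list(records)})
```

Running both paths against the same sink class shows the asymmetry: only the RECORD path calls `start_batch()` and `process_record()`, and only its `context` carries a `batch_id`.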
Maybe I'm doing something wrong. I'm doing this:
```python
target = TargetBigQuery(config=MINIMAL_CONFIG)

# This is a StringIO object
tap_lines = test_utils.get_test_tap_lines('batch_one.jsonl')

target.listen(file_input=tap_lines)
```
This is my batch stream:
```json
{"type": "STATE", "value": {"currently_syncing": "batch_test"}}
{"type": "SCHEMA", "stream": "batch_test", "schema": {"properties": {"c_pk": {"inclusion": "automatic", "minimum": -2147483648, "maximum": 2147483647, "type": ["integer"]}, "c_varchar": {"inclusion": "available", "maxLength": 16, "type": ["null", "string"]}, "c_int": {"inclusion": "available", "minimum": -2147483648, "maximum": 2147483647, "type": ["null", "integer"]}}, "type": "object"}, "key_properties": ["c_pk"]}
{"type": "BATCH", "stream": "batch_test", "encoding": { "format": "jsonl", "compression": "gzip"}, "manifest": ["file://PATHGOESHERE/batch_records.jsonl.gz"]}
{"type": "STATE", "value": {"currently_syncing": "batch_test"}}
{"type": "STATE", "value": {"currently_syncing": "batch_test", "bookmarks": {"batch_test": {"initial_full_table_complete": true}}}}
```
With the compressed batch file looking like this:
```json
{"type": "RECORD", "stream": "test_stream", "record": {"c_pk": 1, "c_varchar": "1", "c_int": 1}, "version": 1, "time_extracted": "2019-01-31T15:51:47.465408Z"}
{"type": "RECORD", "stream": "test_stream", "record": {"c_pk": 2, "c_varchar": "2", "c_int": 2}, "version": 1, "time_extracted": "2019-01-31T15:51:47.465408Z"}
{"type": "RECORD", "stream": "test_stream", "record": {"c_pk": 3, "c_varchar": "3", "c_int": 3}, "version": 1, "time_extracted": "2019-01-31T15:51:47.465408Z"}
```
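A minimal sketch of how a target might read the files listed in a `BATCH` message's `manifest`, assuming local `file://` URIs and the `jsonl` + `gzip` encoding used above. `iter_batch_records` is a hypothetical helper, not an SDK function. Because it yields each JSON line as-is, a file laid out like the one above would produce whole RECORD messages rather than the inner records.

```python
import gzip
import json
from pathlib import Path
from typing import Any, Dict, Iterator
from urllib.parse import urlparse
from urllib.request import url2pathname


def iter_batch_records(message: Dict[str, Any]) -> Iterator[Dict[str, Any]]:
    """Yield one parsed JSON object per line from every file in the manifest.

    Assumption: only local file:// URIs and jsonl (optionally gzip) are handled.
    """
    encoding = message["encoding"]
    if encoding.get("format") != "jsonl":
        raise ValueError(f"unsupported format: {encoding.get('format')}")
    opener = gzip.open if encoding.get("compression") == "gzip" else open
    for uri in message["manifest"]:
        parsed = urlparse(uri)
        if parsed.scheme != "file":
            raise ValueError(f"only file:// URIs are handled here: {uri}")
        path = Path(url2pathname(parsed.path))
        with opener(path, "rt", encoding="utf-8") as fh:
            for line in fh:
                if line.strip():  # skip blank lines
                    yield json.loads(line)
```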
I'm guessing the batch file only includes a list of records. The docs aren't clear on this.
Should the batch file contain only the inner records rather than RECORD messages?
OK, raw records in the file work. Still, it is surprising that no `batch_id` is set.
e
> I’m guessing the batch file only includes a list of records. The docs aren’t clear on this
That’s correct, the batch file should include only raw records, not record messages. The docs could certainly be improved since that’s not clear 🙂
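For a test fixture like the one above, the RECORD messages can be unwrapped into the raw-records layout. A minimal sketch, assuming the input is a plain JSONL file of Singer messages; `write_raw_batch_file` is a hypothetical helper, and it does not filter by stream name (note the fixture's `test_stream` differs from the `batch_test` stream in the BATCH message).

```python
import gzip
import json
from pathlib import Path
from typing import Union

PathLike = Union[str, Path]


def write_raw_batch_file(messages_path: PathLike, out_path: PathLike) -> int:
    """Write the inner "record" of each RECORD message to a gzipped JSONL file.

    Non-RECORD lines are skipped. Returns the number of records written.
    """
    count = 0
    with open(messages_path, "rt", encoding="utf-8") as src, \
            gzip.open(out_path, "wt", encoding="utf-8") as dst:
        for line in src:
            if not line.strip():
                continue
            message = json.loads(line)
            if message.get("type") == "RECORD":
                # Keep only the raw record; drop type/stream/version/time_extracted.
                dst.write(json.dumps(message["record"]) + "\n")
                count += 1
    return count
```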
> Still, it is surprising that no `batch_id` is set

Yeah, so that’s definitely an oversight in `_process_batch_message`. Both logged: sdk#1030, sdk#1031
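Until the oversight in `_process_batch_message` is fixed, a sink could defensively create its own ID in `process_batch()`. A minimal sketch, assuming a random UUID4 string is an acceptable batch ID for the target; `ensure_batch_id` and `ExampleSink` are hypothetical names, not SDK APIs.

```python
import uuid
from typing import Any, Dict, List, Tuple


def ensure_batch_id(context: Dict[str, Any]) -> str:
    """Return context["batch_id"], first storing a new UUID4 string if the key is absent."""
    # setdefault only writes when the key is missing, so an upstream batch_id is preserved.
    return context.setdefault("batch_id", str(uuid.uuid4()))


class ExampleSink:
    """Hypothetical sink; a real target would subclass the SDK's BatchSink instead."""

    def __init__(self) -> None:
        # (batch_id, row count) pairs, standing in for real load jobs.
        self.loaded: List[Tuple[str, int]] = []

    def process_batch(self, context: Dict[str, Any]) -> None:
        batch_id = ensure_batch_id(context)
        records = context.get("records", [])
        self.loaded.append((batch_id, len(records)))
```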
j
awesome