Matt Menzenski
04/07/2023, 1:07 PM
_sdc metadata fields in a tap? I haven’t found much in the way of examples of this (tap-singer-jsonl is one example). The documentation makes it seem like something that’s handled by targets, but I would like to set it in my tap. (Specifically, my tap’s records can include the MongoDB cluster timestamp, which I’d like to map to the _sdc_extracted_at field.)
Is it as simple as adding a boolean add_record_metadata setting to the tap and setting that field if this config property is true?
aaronsteers
04/07/2023, 4:45 PM
aaronsteers
04/07/2023, 4:46 PM
Matt Menzenski
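04/07/2023, 4:47 PM
```python
from datetime import datetime, timezone


def parse_change_events(change_stream, operation_types_allowlist, should_add_metadata):
    # Hypothetical wrapper: the function name, parameters, and loop are illustrative;
    # the original message showed only the loop body.
    for record in change_stream:
        operation_type = record["operationType"]
        if operation_type not in operation_types_allowlist:
            continue
        # clusterTime is a BSON Timestamp; as_datetime() returns a UTC-aware datetime
        cluster_time: datetime = record["clusterTime"].as_datetime()
        parsed_record = {
            "_id": record["_id"]["_data"],
            # Delete events carry no fullDocument, so .get() avoids a KeyError
            "document": record.get("fullDocument"),
            "operationType": operation_type,
            "clusterTime": cluster_time.isoformat(),
            "ns": record["ns"],
        }
        if should_add_metadata:
            parsed_record["_sdc_extracted_at"] = cluster_time.isoformat()
            # Timezone-aware "now" (datetime.utcnow() returns a naive datetime)
            parsed_record["_sdc_batched_at"] = datetime.now(timezone.utc).isoformat()
            if operation_type == "delete":
                parsed_record["_sdc_deleted_at"] = cluster_time.isoformat()
        yield parsed_record
```
Matt Menzenski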
04/07/2023, 4:47 PM
Matt Menzenski
04/07/2023, 4:47 PM
aaronsteers
04/07/2023, 4:54 PM
_sdc_extracted_at should be happening automatically. There are two parts to this process. First, the RecordMessage type has a top-level property, outside the record data, called "time_extracted".
https://github.com/meltano/sdk/blob/304d1232f286a316567786846c0fa673e27d4323/singer_sdk/_singerlib/messages.py#L96-L106
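To make the "above the level of the data" point concrete, here is a minimal sketch of a Singer RECORD message serialized as one JSON line. It uses only the standard library rather than the SDK's RecordMessage class, and the function name, stream name, and field values are hypothetical.

```python
import json
from datetime import datetime, timezone


def build_record_message(stream: str, record: dict, time_extracted: datetime) -> str:
    """Serialize a Singer RECORD message as a single JSON line.

    time_extracted sits next to "record", not inside it: it is message
    metadata, not part of the record data.
    """
    message = {
        "type": "RECORD",
        "stream": stream,
        "record": record,
        "time_extracted": time_extracted.isoformat(),
    }
    return json.dumps(message)


# Hypothetical example values
line = build_record_message(
    "orders",
    {"_id": "abc", "operationType": "insert"},
    datetime(2023, 4, 7, 16, 45, tzinfo=timezone.utc),
)
print(line)
```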
Second, targets that want to add metadata columns parse that value from the record message metadata and put it into a column in the target. (In other words, when coming from the tap, "extracted at" is technically part of the record message but not part of the record data.)
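On the target side, that second step amounts to reading time_extracted off the message envelope and copying it into a column. The sketch below is stdlib-only and is not the SDK's sink code; the function name is hypothetical, though the key assignment mirrors the one in the linked sink.

```python
import json


def record_with_sdc_metadata(line: str) -> dict:
    """Parse one Singer message line and return its record with _sdc_extracted_at added."""
    message = json.loads(line)
    if message.get("type") != "RECORD":
        raise ValueError("expected a RECORD message")
    # Copy so the parsed message's record is not mutated in place
    record = dict(message["record"])
    # The value comes from the message envelope, not from the record data;
    # it is None if the tap did not send time_extracted
    record["_sdc_extracted_at"] = message.get("time_extracted")
    return record


line = '{"type": "RECORD", "stream": "orders", "record": {"_id": "abc"}, "time_extracted": "2023-04-07T16:45:00+00:00"}'
print(record_with_sdc_metadata(line))
```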
https://github.com/meltano/sdk/blob/304d1232f286a316567786846c0fa673e27d4323/singer_sdk/sinks/core.py#L235
aaronsteers
04/07/2023, 4:55 PM
aaronsteers
04/07/2023, 4:57 PM
```python
record["_sdc_extracted_at"] = message.get("time_extracted")
```
Matt Menzenski
04/07/2023, 4:59 PM
aaronsteers
04/07/2023, 5:04 PM
aaronsteers
04/07/2023, 5:04 PM
aaronsteers
04/07/2023, 5:10 PM
_sdc_extracted_at as you are doing, since you won't otherwise have access to that data when you are later creating the message to send to the target.
But then you can pop it from the record and put it into the "time_extracted" field of the Message itself, here:
```python
record_message = singer.RecordMessage(
    stream=stream_map.stream_alias,
    record=mapped_record,
    version=None,
    # Use the tap-supplied extraction time if present, else fall back to now
    time_extracted=mapped_record.pop("_sdc_extracted_at", utc_now()),
)
```
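One wrinkle, assuming the tap stores _sdc_extracted_at as an ISO 8601 string (as in the change-stream snippet earlier in the thread): the SDK's RecordMessage appears to expect time_extracted to be a datetime, so the pop step may need to parse the string back first. This is a stdlib-only illustration, not SDK code; the helper name is hypothetical, and treating naive timestamps as UTC is an assumption.

```python
from datetime import datetime, timezone


def split_extracted_at(record: dict) -> tuple[dict, datetime]:
    """Pop _sdc_extracted_at out of a record and return (record, time_extracted).

    Falls back to the current UTC time when the field is absent, like the
    utc_now() default in the snippet above.
    """
    record = dict(record)  # leave the caller's dict untouched
    raw = record.pop("_sdc_extracted_at", None)
    if raw is None:
        return record, datetime.now(timezone.utc)
    extracted = datetime.fromisoformat(raw)
    if extracted.tzinfo is None:
        # Assumption: naive timestamps are UTC
        extracted = extracted.replace(tzinfo=timezone.utc)
    return record, extracted


record, time_extracted = split_extracted_at(
    {"_id": "abc", "_sdc_extracted_at": "2023-04-07T16:45:00+00:00"}
)
```

Popping with a None default, rather than a datetime default, keeps the "missing" and "present as a string" cases distinct before conversion.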
Setting time_extracted this way overrides the Message default (currently utc_now()), and the target will then use it when creating _sdc_extracted_at downstream.
aaronsteers
04/07/2023, 5:11 PM
aaronsteers
04/07/2023, 5:12 PM
pop() or get() it at the top of that private method, since the other methods above it will remove or clean unknown or deselected fields.
aaronsteers
04/07/2023, 5:15 PM
aaronsteers
04/07/2023, 5:15 PM
aaronsteers
04/07/2023, 5:16 PM
Matt Menzenski
04/07/2023, 6:54 PM
Matt Menzenski
04/07/2023, 6:55 PM