Matt Menzenski
04/07/2023, 1:07 PM
_sdc metadata fields in a tap? I haven’t found much in the way of examples of this (tap-singer-jsonl is one example). The documentation makes it seem like it’s something that’s handled by targets, but I would like to set it in my tap. (Specifically, my tap’s records can include the MongoDB cluster timestamp, which I’d like to map to the _sdc_extracted_at field.)
Is it as simple as adding a boolean add_record_metadata setting to the tap and setting that field if this config property is true?
aaronsteers
04/07/2023, 4:45 PM
aaronsteers
04/07/2023, 4:46 PM
Matt Menzenski
04/07/2023, 4:47 PM
operation_type = record["operationType"]
if operation_type not in operation_types_allowlist:
    continue
cluster_time: datetime = record["clusterTime"].as_datetime()
parsed_record = {
    "_id": record["_id"]["_data"],
    "document": record["fullDocument"],
    "operationType": operation_type,
    "clusterTime": cluster_time.isoformat(),
    "ns": record["ns"],
}
if should_add_metadata:
    parsed_record["_sdc_extracted_at"] = cluster_time.isoformat()
    parsed_record["_sdc_batched_at"] = datetime.utcnow().isoformat()
    if operation_type == "delete":
        parsed_record["_sdc_deleted_at"] = cluster_time.isoformat()
yield parsed_record
Matt Menzenski
04/07/2023, 4:47 PM
Matt Menzenski
04/07/2023, 4:47 PM
aaronsteers
04/07/2023, 4:54 PM
_sdc_extracted_at should be happening automatically. There are two parts to this process. First, the RecordMessage type contains a property above the level of the data called "time_extracted".
https://github.com/meltano/sdk/blob/304d1232f286a316567786846c0fa673e27d4323/singer_sdk/_singerlib/messages.py#L96-L106
Second, targets that want to add metadata columns will parse that from record metadata and put it into a column in the target. (Meaning, when coming from the tap, "extracted at" is technically part of the record message but not part of the record data.)
https://github.com/meltano/sdk/blob/304d1232f286a316567786846c0fa673e27d4323/singer_sdk/sinks/core.py#L235
aaronsteers
04/07/2023, 4:55 PM
aaronsteers
04/07/2023, 4:57 PM
record["_sdc_extracted_at"] = message.get("time_extracted")
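To make the target-side step concrete, here is a minimal sketch using plain dicts rather than the SDK's sink classes. The function name add_extracted_at and the stream name are hypothetical; the only parts taken from the thread are the "time_extracted" key on the message and the _sdc_extracted_at column in the row.

```python
import json


def add_extracted_at(message: dict) -> dict:
    """Return the record data with _sdc_extracted_at copied from the message envelope."""
    record = dict(message["record"])  # copy so the incoming message is not mutated
    # time_extracted sits next to "record" in the message, not inside it
    record["_sdc_extracted_at"] = message.get("time_extracted")
    return record


# A Singer RECORD message as a target would read it (one JSON object per line).
line = json.dumps(
    {
        "type": "RECORD",
        "stream": "change_events",  # hypothetical stream name
        "record": {"_id": "abc123", "operationType": "insert"},
        "time_extracted": "2023-04-07T16:47:00+00:00",
    }
)

row = add_extracted_at(json.loads(line))
print(row)
```

If the tap omits time_extracted, message.get() returns None and the column is simply empty in this sketch.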
Matt Menzenski
04/07/2023, 4:59 PM
aaronsteers
04/07/2023, 5:04 PM
aaronsteers
04/07/2023, 5:04 PM
aaronsteers
04/07/2023, 5:10 PM
_sdc_extracted_at as you are doing, since you won't otherwise have access to that data when you are later creating the message to send to the target.
But then you can pop it from the record and put it into the "time_extracted" field in the Message itself here.
record_message = singer.RecordMessage(
    stream=stream_map.stream_alias,
    record=mapped_record,
    version=None,
    time_extracted=mapped_record.pop("_sdc_extracted_at", utc_now()),
)
This overrides the Message default (currently "utc_now()") and will then be used by the target when creating _sdc_extracted_at downstream.
aaronsteers
04/07/2023, 5:11 PM
aaronsteers
04/07/2023, 5:12 PM
pop() or get() it at the top of that private method, since the other methods above it will remove/clean unknown or deselected fields.
aaronsteers
04/07/2023, 5:15 PM
aaronsteers
04/07/2023, 5:15 PM
aaronsteers
04/07/2023, 5:16 PM
Matt Menzenski
04/07/2023, 6:54 PM
Matt Menzenski
04/07/2023, 6:55 PM
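The tap-side idea from this thread (pop _sdc_extracted_at before unknown fields are cleaned out, then pass it as time_extracted) can be sketched roughly as follows. This is not the SDK's code: RecordMessage here is a small stand-in dataclass, build_message and schema_fields are hypothetical names, and the sketch assumes the record carries a datetime rather than an ISO string.

```python
from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass
class RecordMessage:
    """Stand-in for the SDK's RecordMessage; only the fields used in this thread."""

    stream: str
    record: dict
    time_extracted: datetime


def build_message(stream: str, record: dict, schema_fields: set) -> RecordMessage:
    # Pop first: the cleaning step below would otherwise drop the value,
    # because _sdc_extracted_at is not part of the stream schema.
    # Note that pop() mutates the record passed in, as in the thread's snippet.
    extracted_at = record.pop("_sdc_extracted_at", None) or datetime.now(timezone.utc)
    # Simplified stand-in for removing unknown or deselected fields.
    cleaned = {k: v for k, v in record.items() if k in schema_fields}
    return RecordMessage(stream=stream, record=cleaned, time_extracted=extracted_at)


cluster_time = datetime(2023, 4, 7, 16, 47, tzinfo=timezone.utc)
msg = build_message(
    "change_events",  # hypothetical stream name
    {"_id": "abc123", "operationType": "insert", "_sdc_extracted_at": cluster_time},
    schema_fields={"_id", "operationType"},
)
print(msg.time_extracted.isoformat())
```

Records without _sdc_extracted_at fall back to the current UTC time, which mirrors the default described earlier in the thread.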