# singer-tap-development
m
What are best practices for setting `_sdc` metadata fields in a tap? I haven’t found many examples of this (tap-singer-jsonl is one). The documentation makes it seem like it’s something handled by targets, but I would like to set it in my tap. (Specifically, my tap’s records can include the MongoDB cluster timestamp, which I’d like to map to the `_sdc_extracted_at` field.) Is it as simple as adding a boolean `add_record_metadata` setting to the tap and setting that field when this config property is true?
a
The answer varies a bit depending on which field(s) you're looking for. Let me check on the extracted-at timestamp piece you mention...
m
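```python
from datetime import datetime, timezone

# Body of the tap's record generator. `change_stream`, `operation_types_allowlist`
# and `should_add_metadata` come from surrounding tap code (not shown).
for record in change_stream:
    operation_type = record["operationType"]
    if operation_type not in operation_types_allowlist:
        continue
    # clusterTime is a BSON Timestamp; as_datetime() returns an aware UTC datetime.
    cluster_time: datetime = record["clusterTime"].as_datetime()
    parsed_record = {
        "_id": record["_id"]["_data"],
        # Delete events carry no fullDocument, so avoid a KeyError here.
        "document": record.get("fullDocument"),
        "operationType": operation_type,
        "clusterTime": cluster_time.isoformat(),
        "ns": record["ns"],
    }
    if should_add_metadata:
        parsed_record["_sdc_extracted_at"] = cluster_time.isoformat()
        # datetime.utcnow() is naive (no offset); use an aware UTC timestamp instead.
        parsed_record["_sdc_batched_at"] = datetime.now(timezone.utc).isoformat()
        if operation_type == "delete":
            parsed_record["_sdc_deleted_at"] = cluster_time.isoformat()
    yield parsed_record
```
That is what I have now.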
a
I see. Yes, so I believe `_sdc_extracted_at` should be handled automatically. There are two parts to this process. First, the `RecordMessage` type contains a property called "time_extracted" that sits above the level of the record data: https://github.com/meltano/sdk/blob/304d1232f286a316567786846c0fa673e27d4323/singer_sdk/_singerlib/messages.py#L96-L106 Second, targets that want to add metadata columns parse that value from the message and write it into a column in the target. (In other words, coming from the tap, "extracted at" is technically part of the record message but not part of the record data.) https://github.com/meltano/sdk/blob/304d1232f286a316567786846c0fa673e27d4323/singer_sdk/sinks/core.py#L235
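To make the "above the level of the data" point concrete, here is a minimal sketch of a Singer RECORD message serialized as one JSON line, using only the standard library and following the Singer spec's message layout. The stream name and field values are made up, and the SDK's exact timestamp formatting may differ from `isoformat()`.

```python
import json
from datetime import datetime, timezone


def build_record_message(stream: str, record: dict, time_extracted: datetime) -> str:
    """Serialize a Singer RECORD message as a single JSON line.

    `time_extracted` is a sibling of `record`, not a key inside it.
    """
    message = {
        "type": "RECORD",
        "stream": stream,
        "record": record,
        "time_extracted": time_extracted.isoformat(),
    }
    return json.dumps(message)


line = build_record_message(
    "change_stream",  # hypothetical stream name
    {"_id": "abc", "operationType": "insert"},
    datetime(2023, 5, 1, 12, 0, tzinfo=timezone.utc),
)
print(line)
```

The record data itself contains no `_sdc_*` field; the extraction time travels only at the message level.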
Can you tell me what "cluster time" means in this context? Is it meaningfully different from the current time on the runner?
If you want to override the default time that would be put on a record message, you probably need to change it in the record message itself. This line (the second link in my message above) would presumably overwrite your value in the record, since the target applies the message's "time_extracted" over whatever you've set as "_sdc_extracted_at":
```python
record["_sdc_extracted_at"] = message.get("time_extracted")
```
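The effect of that assignment on a tap-supplied value can be sketched in isolation. This is a simplified stand-in for the target-side logic, not the SDK sink code; the function name `add_sdc_extracted_at` and the sample message are hypothetical.

```python
def add_sdc_extracted_at(message: dict) -> dict:
    """Return the record data with `_sdc_extracted_at` taken from the message.

    Whatever the tap put in record["_sdc_extracted_at"] is replaced by the
    message-level `time_extracted` (or None if the message has none).
    """
    record = dict(message["record"])  # copy so the input message is not mutated
    record["_sdc_extracted_at"] = message.get("time_extracted")
    return record


message = {
    "type": "RECORD",
    "stream": "change_stream",
    "record": {"_id": "abc", "_sdc_extracted_at": "2023-05-01T09:00:00+00:00"},
    "time_extracted": "2023-05-01T12:00:00+00:00",
}
print(add_sdc_extracted_at(message)["_sdc_extracted_at"])  # → 2023-05-01T12:00:00+00:00
```

So a value set only inside the record data loses to the message-level timestamp; the override has to land in `time_extracted` to survive.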
m
In this context, I’m reading from the MongoDB change stream: the cluster time on a change stream event is the timestamp that corresponds to the event. If the tap is processing an event “live”, it should equal the current time, but if we’ve resumed a change stream and are processing past events, the cluster time on those events will not equal the current time.
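A standard-library sketch of the live vs. resumed distinction. A BSON Timestamp's time part is seconds since the Unix epoch (pymongo's `Timestamp.as_datetime()`, which the tap code above already calls, performs this conversion); the increment part only orders events within one second, so it is ignored here. The `is_replayed` helper and its 5-second tolerance are hypothetical, for illustration only.

```python
from datetime import datetime, timedelta, timezone


def cluster_time_to_datetime(seconds: int) -> datetime:
    """Convert the seconds part of a BSON Timestamp to an aware UTC datetime."""
    return datetime.fromtimestamp(seconds, tz=timezone.utc)


def is_replayed(
    event_time: datetime, now: datetime, tolerance: timedelta = timedelta(seconds=5)
) -> bool:
    """Treat an event as replayed (e.g. after a resume) if it is older than `tolerance`."""
    return now - event_time > tolerance


now = datetime(2023, 5, 1, 12, 0, 0, tzinfo=timezone.utc)
live = cluster_time_to_datetime(int(now.timestamp()) - 1)     # event from one second ago
old = cluster_time_to_datetime(int(now.timestamp()) - 3600)   # event from an hour ago
print(is_replayed(live, now), is_replayed(old, now))  # → False True
```

For the replayed event, "time the tap ran" and "time the change happened" differ by an hour, which is exactly why the cluster time is the more meaningful extracted-at value here.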
a
I think I understand now. It's not the time at which we parse the record or at which the tap "extracts" it; it's the time the document was modified and the change event was therefore emitted, or "extracted", from the upstream collection into the change stream. 🤔 👍 Yeah, I can see that making sense, and I can see why you'd want that "timestamp when it entered the change stream" to be ingested. 👍
Let me check if there's a good way to override what is used in the Record message...
What I'd probably do is go ahead and add `_sdc_extracted_at` as you are doing, since you won't otherwise have access to that data later, when the message to send to the target is created. Then, at that point, you can pop it from the record and put it into the "time_extracted" field of the message itself:
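```python
record_message = singer.RecordMessage(
    stream=stream_map.stream_alias,
    record=mapped_record,
    version=None,
    # Use the tap-supplied override if present, otherwise the current time.
    # Note: `time_extracted` may need to be a timezone-aware datetime rather
    # than an ISO-8601 string, so a string override may need parsing first.
    time_extracted=mapped_record.pop("_sdc_extracted_at", utc_now()),
)
```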
This overrides the message default (currently `utc_now()`), and the value will then be used by the target when creating `_sdc_extracted_at` downstream.
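Because the tap code above stores `cluster_time.isoformat()` (a string), the popped value may need converting before it goes into `time_extracted`. A minimal standard-library sketch of "pop the override, else use now"; the helper name `pop_time_extracted` is hypothetical, and treating naive values as UTC is an assumption.

```python
from datetime import datetime, timezone
from typing import Any


def pop_time_extracted(record: dict[str, Any]) -> datetime:
    """Remove `_sdc_extracted_at` from the record and return it as an aware datetime.

    Falls back to the current UTC time when the tap supplied no override.
    """
    value = record.pop("_sdc_extracted_at", None)
    if value is None:
        return datetime.now(timezone.utc)
    if isinstance(value, str):
        value = datetime.fromisoformat(value)  # handles "+00:00" offsets
    if value.tzinfo is None:
        value = value.replace(tzinfo=timezone.utc)  # assumption: naive means UTC
    return value


record = {"_id": "abc", "_sdc_extracted_at": "2023-05-01T09:00:00+00:00"}
ts = pop_time_extracted(record)
print(ts.isoformat(), "_sdc_extracted_at" in record)  # → 2023-05-01T09:00:00+00:00 False
```

Popping (rather than reading) also keeps the field out of the record data, so the target's own `_sdc_extracted_at` column is the single source of that value.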
It's not pretty, because you have to override that private method, but it should do the trick. On the SDK side, we could probably add a method there that is easier for users to override...
Actually, you'll probably need to `pop()` or `get()` it at the top of that private method, since the other methods called above it will remove or clean unknown and deselected fields.
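Why the read has to come first can be shown with a toy cleanup step. `drop_unknown_fields` and `SCHEMA_PROPERTIES` below are hypothetical stand-ins for the SDK's schema-based cleanup, assuming `_sdc_extracted_at` is not declared in the stream schema (if it were declared, it would not be dropped).

```python
from typing import Any, Optional

# Hypothetical stream schema: it does not declare `_sdc_extracted_at`.
SCHEMA_PROPERTIES = {"_id", "document", "operationType", "clusterTime", "ns"}


def drop_unknown_fields(record: dict[str, Any]) -> dict[str, Any]:
    """Hypothetical stand-in for cleanup that drops fields not in the schema."""
    return {key: value for key, value in record.items() if key in SCHEMA_PROPERTIES}


def capture_then_clean(record: dict[str, Any]) -> tuple[Optional[str], dict[str, Any]]:
    """Read the override first, then clean; the cleaned copy no longer has it."""
    override = record.get("_sdc_extracted_at")
    return override, drop_unknown_fields(record)


raw = {
    "_id": "abc",
    "operationType": "insert",
    "_sdc_extracted_at": "2023-05-01T09:00:00+00:00",
}
print(drop_unknown_fields(raw).get("_sdc_extracted_at"))  # → None: override lost
override, cleaned = capture_then_clean(raw)
print(override)  # → 2023-05-01T09:00:00+00:00
```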
snippet.py
Would be something like this... 👆
We'd want to get others to look that over, but I don't see any problem with that being the default implementation in the SDK. The case we're dealing with is one where the tap developer is clearly sending an override for "extracted at", and the above would apply if available - and otherwise use the default (current time).
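The SDK-side idea of a method that is easier to override than the private one might look roughly like this. Everything here is hypothetical and is not the singer-sdk `Stream` API: `SketchStream`, `get_time_extracted`, and `record_messages` are illustrative names. The subclass reads the `clusterTime` field the tap already emits, so no `_sdc_` field needs to be smuggled through the record.

```python
from datetime import datetime, timezone
from typing import Any, Iterable, Iterator


class SketchStream:
    """Hypothetical base class; NOT the singer-sdk Stream API."""

    name = "change_stream"

    def get_time_extracted(self, record: dict[str, Any]) -> datetime:
        """Overridable hook. Default: the moment the record is emitted."""
        return datetime.now(timezone.utc)

    def record_messages(self, records: Iterable[dict[str, Any]]) -> Iterator[dict[str, Any]]:
        for record in records:
            # Call the hook before any cleanup so it can see every field.
            time_extracted = self.get_time_extracted(record)
            yield {
                "type": "RECORD",
                "stream": self.name,
                "record": record,
                "time_extracted": time_extracted.isoformat(),
            }


class MongoChangeStream(SketchStream):
    """Uses the event's cluster time when present, else the default."""

    def get_time_extracted(self, record: dict[str, Any]) -> datetime:
        cluster_time = record.get("clusterTime")
        if cluster_time is None:
            return super().get_time_extracted(record)
        return datetime.fromisoformat(cluster_time)


msgs = list(
    MongoChangeStream().record_messages(
        [{"_id": "abc", "clusterTime": "2023-05-01T09:00:00+00:00"}]
    )
)
print(msgs[0]["time_extracted"])  # → 2023-05-01T09:00:00+00:00
```

A hook like this keeps the default behavior (current time) for every existing tap while letting a tap developer opt in to a source-provided timestamp without touching private methods.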
m
thank you! I will test this out this weekend
hoping to start running this new tap next week on both MongoDB and DocumentDB sources 🚀