# singer-tap-development
q
Just want to check — the SDK docs claim that the SDK will fully handle the override of a primary key via a stream-map override, including transforming the key columns, etc. The tap developer only needs to add two items to the tap config: `stream_maps` and `stream_map_config`. Based on my read of that part of the docs, I'd expect that, with a tap that has those config options, I should be able to:
• make a new column `col_a` via stream maps,
• override the incremental key of that stream to be `col_a`, by setting `"__key_properties__": ["col_a"]`, and then
• load that table incrementally (on `col_a`, since it's overridden to be the incremental key).
Is that correct? Or am I misunderstanding the SDK's features?
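For concreteness, the config I have in mind would look roughly like this (a sketch; `my_stream` and the expression for `col_a` are illustrative, not from any real tap):

```json
{
  "stream_maps": {
    "my_stream": {
      "col_a": "col_b + '-' + col_c",
      "__key_properties__": ["col_a"]
    }
  },
  "stream_map_config": {}
}
```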
a
A stream is incremental based on a `replication_key`, not `key_properties`. The right place to override a replication key for a tap would be the catalog, as that is what usually provides the arguments used to instantiate a Stream.
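A catalog-level override along those lines might look roughly like this (a sketch of standard Singer catalog metadata; the stream and column names are illustrative):

```json
{
  "streams": [
    {
      "tap_stream_id": "my_stream",
      "metadata": [
        {
          "breadcrumb": [],
          "metadata": {
            "replication-method": "INCREMENTAL",
            "replication-key": "updated_at"
          }
        }
      ]
    }
  ]
}
```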
q
Ah thank you! That’s a really helpful clarification. I guess then… does the SDK not enable incremental loading on a column created in a stream-map?
a
I couldn't remember whether the incremental `replication_key` was overridable in stream maps. I checked the docs and I don't see any mention of the capability: https://sdk.meltano.com/en/latest/stream_maps.html#unset-or-modify-the-stream-s-primary-key-behavior. As I think more on this, there may be an order-of-operations challenge here: stream maps are applied after records are generated, so they are not high enough in the stack to override how the tap actually deals with incremental bookmarks.
q
Dang. I guess I didn't understand that "primary key" was being used in the specific Meltano/Singer sense, and that it was not applicable to incremental loading.
I spoke with @taylor about this a while back… this is a huge blocker for us with Meltano, which I thought the SDK had solved. But I guess we still can’t load on any special logic whatsoever, even something as simple as a concat of two columns.
a
We do have a few possible solutions here. I'm in meetings today but will try to loop back this afternoon.
a
Just a note that the concept of something being incremental is built into the tap, not the target. This means it might be easier to just fork or PR the tap that is your blocker. If you think of the tap as its own island, it gets a JSON object of previous state + catalog, and figures out what needs to be pulled.
> But I guess we still can't load on any special logic whatsoever, even something as simple as a concat of two columns.

It helps if you are specific about the tap you have in mind. I can't imagine a concat being used for replication instead of a timestamp-like field, but the SDK handles non-timestamp incremental replication too, e.g. when a job ID, page number, offset, or string ID drives how to get new data from the source. But let's take a step back... it almost feels like what you are talking about is deduplication, i.e. "I pull data incrementally, but I want to make sure a hash of two fields is unique in the target." That is a significantly different problem, one that most of us solve with dbt, though some targets can use `key_properties` for merges.
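If deduplication really is the underlying need, the dbt-style pattern would look roughly like this (a sketch; the schema/table names are made up, and `_sdc_extracted_at` assumes a target that writes Singer `_sdc_*` metadata columns):

```sql
-- Keep only the latest row per (col_a, col_b) pair, downstream of the load
select *
from (
  select
    *,
    row_number() over (
      partition by col_a, col_b
      order by _sdc_extracted_at desc
    ) as rn
  from raw.my_table
) deduped
where rn = 1
```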
q
Awesome yeah I’m thinking there’s probably a hacky-ish way to do this in a fork of the tap. Actually just opened an issue describing what I’m trying to do here https://github.com/meltano/meltano/issues/7311. For my use case, it really is about loading incrementally on some slightly unusual statements rather than a specific existing column.
a
Makes sense @quinn_batten. Do you have any control of the DB? Maybe a view with a computed column? Otherwise, yes, I totally see how this would need to be supported in the tap, and it's sort of specific; I think the view would be less work. The replication key in a Singer catalog is a string value (i.e. not a list): https://github.com/meltano/sdk/blob/main/singer_sdk/streams/sql.py#L186. That code relies on a string being expected and grabs the `Column` object by key from the SQLAlchemy table, so it would need to be hacked to instead use `sqlalchemy.func.coalesce` or something with multiple columns. I am not sure if passing a list into the catalog replication key would fly, though.
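To make the shape of that hack concrete, here is a minimal plain-Python sketch of the idea: instead of comparing a single bookmark column against the stored state, the forked tap's query builder would emit a COALESCE over several columns (analogous to `sqlalchemy.func.coalesce(*columns)`). This is an illustration of the concept, not the SDK's actual SQL-stream implementation, and the column names are made up.

```python
def incremental_where_clause(replication_key: str, bookmark):
    """Stock behavior (sketch): filter on a single replication-key column."""
    return f"WHERE {replication_key} > :bookmark", {"bookmark": bookmark}


def coalesce_where_clause(columns: list[str], bookmark):
    """Forked behavior (sketch): treat a COALESCE of several columns as the
    effective replication key, falling back left to right."""
    expr = f"COALESCE({', '.join(columns)})"
    return f"WHERE {expr} > :bookmark", {"bookmark": bookmark}


clause, params = coalesce_where_clause(["updated_at", "created_at"], "2023-01-01")
print(clause)  # WHERE COALESCE(updated_at, created_at) > :bookmark
```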
q
Nope, no control whatsoever of source DB 😭. Ohhh that’s super helpful too, I’ll start there. Thanks for all this, this is great!
a
Okay, hi - I'm back! All of @alexander_butler's points are right on. I can speak pretty confidently, though, that plural incremental keys (aka `replication_key`) are not supported in the SDK, and probably not in other implementations either. 🙁 @quinn_batten - While I was thinking of really clever ideas, I had to pause to make sure I first ask the boring one: since you mentioned in the issue that one of your use cases is Postgres, do you have the ability to use Postgres log-based replication instead of column-based replication? In that case, you wouldn't need to deal with the functional gap where neither updated-at nor created-at gives you the desired increment.
q
I’ll look into it further; we considered that when this issue first came up but IIRC we weren’t able to do logical replication for some reason. Probably worth a longer convo w eng though
a
Gotcha. Yeah, log-based is always the recommended approach if you're able to use it (for database taps, that is). Any `updated_at`-type column you pick for a replication key will sometimes have a case where an admin or someone changed/fixed rows on the backend without incrementing the timestamp column. And log-based has the advantage of working even when there is no `updated_at`-like column on the table.
q
Cool, thank you both. I’ll look into whether log-based will work for us, and if not, I’ll hack something together in my fork of tap-postgres. Thanks AJ!!
a
Cool. Happy to help! If that doesn't work out, there are a few alternatives I can share - but as of now those all would require additional dev work in the SDK or in the specific tap.