# singer-tap-development
h
Hey folks, would appreciate some guidance on how to progress with my Singer tap implementation. If you look at my previous message above this one, you'll see I'm building a REST-based tap. I'm not sure how to deal with state in my situation. The data I am pulling is aggregated report data, keyed on a Date and some other columns. I'd like to run my pipeline once per day and retrieve data for the past 7 days so I can backfill values and account for late-arriving data. As I understand it, the SDK doesn't offer a way of dealing with this use case directly. Since there is no single replication key, using the default SDK interface doesn't work for me. Instead, I need to manually work with the state object to store the last run date and derive my start/end dates from it. IMO this is the most robust approach, and it ensures that at the target I overwrite records based on a combination of columns I designate as my primary key. Alternatively, I can avoid state altogether and just set the start and end dates for the API based on config values, which I pass via env vars. If the first approach, using state, is best, then can you advise on where to start with this in terms of implementation? Just a pointer in the right direction would be very helpful.
v
"To account for late arriving data": what rule do you want to use for this? It sounds like you want to say data within 7 days of today could come in late. In the tap I'd just never allow the date to go beyond `today() - 7`, i.e. if the tap ran today, the state would get set to 2022-03-15. How do you do this? Well, state is set based on replication keys, so the record object needs to have its replication key set to that date. A managed replication key like `_sdc_replication_date` would work. https://github.com/AutoIDM/tap-indeed/blob/main/tap_indeedsponsoredjobs/streams.py#L128 is different from your ask but may provide some context. There are probably other options, but that's the first thing I thought of!
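This is a minimal sketch of visch's suggestion. It assumes a managed `_sdc_replication_date` column and a 7-day window. Each record gets a replication value equal to its own date, capped at today minus the window. The bookmark comes from the highest replication key value seen, so it can never move past that cutoff. The function name `stamp_replication_date` and the `Date` field format (ISO string) are hypothetical. In an SDK tap, logic like this would typically live in the stream's `post_process` method.

```python
from datetime import date, timedelta

WINDOW_DAYS = 7  # assumption: the 7-day refresh window discussed in this thread


def stamp_replication_date(record: dict, today: date, window_days: int = WINDOW_DAYS) -> dict:
    """Return a copy of ``record`` with a capped ``_sdc_replication_date``.

    Assumes ``record["Date"]`` is an ISO date string (hypothetical field format).
    The stamped value is the record's own date, but never later than
    ``today - window_days``, so the resulting bookmark stays inside the window.
    """
    cutoff = today - timedelta(days=window_days)
    record_date = date.fromisoformat(record["Date"])
    stamped = dict(record)  # avoid mutating the caller's record
    stamped["_sdc_replication_date"] = min(record_date, cutoff).isoformat()
    return stamped


if __name__ == "__main__":
    today = date(2022, 3, 22)
    for raw in ({"Date": "2022-03-10"}, {"Date": "2022-03-21"}):
        print(stamp_replication_date(raw, today))
```

Suppose the tap runs on 2022-03-22. The highest stamped value is then `2022-03-15`, the bookmark visch describes. The next run can start from there, so the last 7 days are always re-requested.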
If your company controls this source, I'd push back a bit, as this whole thing can be sidestepped with an appropriate `updated_at` field.
h
Thanks @visch. I don't control the source, unfortunately. It's AppsFlyer's aggregated data API, for which there isn't a Singer tap currently. On the 7-day window: there's no solid rule I have. We've just determined that 7 days is a good number of days to refresh data for in our use case. So each time the tap runs, it should retrieve data for the last seven whole days relative to today, or some relative date that I can pass as config. Perhaps it would help if I described the desired outcome in the target. I want to be able to fetch data for some `Date-MediaSource-Campaign` combination (this is what is set as the primary key in the stream) such that we overwrite that data (where necessary) for seven days. So if a record has a Date value of 21/3/22, we would "refresh" and attempt to overwrite that value in our target until 29/3/22 (after which we wouldn't care about 21/3/22). From what you've said and the example, I'm a bit clearer but still not sure how to proceed. I understand now that a replication key is required, but it isn't clicking what that should be. Are you suggesting it should be `_sdc_replication_date`, which is just the date the replication attempt was made?
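Here is a toy in-memory model of the desired target behavior. The target upserts on the `Date-MediaSource-Campaign` primary key. A re-fetched row for an existing key replaces the old one, and new keys are added. The field names `Date`, `MediaSource`, `Campaign` and the metric `installs` are hypothetical. A real target would do this with a MERGE (or equivalent) in the destination.

```python
from typing import Dict, Iterable, Tuple

PRIMARY_KEY = ("Date", "MediaSource", "Campaign")  # composite key set on the stream

Key = Tuple[str, ...]


def upsert(table: Dict[Key, dict], records: Iterable[dict]) -> Dict[Key, dict]:
    """Merge ``records`` into ``table``, overwriting rows that share a primary key."""
    for record in records:
        key = tuple(record[col] for col in PRIMARY_KEY)
        table[key] = record  # the most recently fetched (refreshed) row wins
    return table


if __name__ == "__main__":
    table: Dict[Key, dict] = {}
    # First run: initial value for 21/3/22.
    upsert(table, [{"Date": "2022-03-21", "MediaSource": "a", "Campaign": "x", "installs": 10}])
    # A later run inside the 7-day window re-fetches the same day with late-arriving data.
    upsert(table, [{"Date": "2022-03-21", "MediaSource": "a", "Campaign": "x", "installs": 12}])
    print(len(table), table[("2022-03-21", "a", "x")]["installs"])  # 1 12
```

The tap only has to re-send the whole window on each run. The primary key makes the overwrite idempotent on the target side.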
I think part of the issue I have is understanding the flow of data from state into the stream and vice versa. I'm not sure where the state originally comes from and when it's being written to.
v
I'd recommend writing a tap with a replication key and playing with it. This is definitely doable. The next thing you should do is write out exactly what requests you'd like to see happen against your source API and when you'd like them to happen, i.e. "state is null, start_date is x, the following API calls should happen", "state has replication_date set to today, the following API calls should happen". Get detailed about it, and you can definitely make it happen. At this point I'd write those out and just code it.
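Following the advice to write the scenarios out first, this sketch turns them into a pure function. It maps (state bookmark, `start_date`, lookback) to the inclusive date range a run would request. The name `plan_date_range`, the `lookback_days` parameter, clamping to `start_date`, and ending at yesterday (the last whole day) are all assumptions. None of this is SDK or AppsFlyer behavior.

```python
from datetime import date, timedelta
from typing import Optional, Tuple


def plan_date_range(
    bookmark: Optional[date],
    start_date: date,
    today: date,
    lookback_days: int = 7,
) -> Tuple[date, date]:
    """Return the inclusive (from, to) dates a run should request (hypothetical helper).

    - No state: begin at the configured ``start_date``.
    - With state: begin ``lookback_days`` before the bookmark so late data is refetched.
    - Never begin before ``start_date``; always stop at yesterday (last whole day).
    """
    end = today - timedelta(days=1)
    if bookmark is None:
        begin = start_date
    else:
        begin = max(start_date, bookmark - timedelta(days=lookback_days))
    return begin, end


if __name__ == "__main__":
    today = date(2022, 3, 22)
    # "state is null, start_date is x"
    print(plan_date_range(None, date(2022, 1, 1), today))
    # "state has replication_date set to today"
    print(plan_date_range(today, date(2022, 1, 1), today))
```

With state set to today (2022-03-22), the planned range is 2022-03-15 through 2022-03-21: seven whole days. Each scenario in the list becomes one call, which also makes the logic easy to unit test.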
a
It comes from `--state path/to/state.json`, which is how any Singer tap gets its state. Meltano does it for you, so you don't see it. The `state.json` itself is derived from the target's stdout stream. A target receives and buffers `STATE` messages from the tap; whenever the target decides to "commit" data to the data store, it typically "propagates" the buffered state to stdout. Meltano scrapes stdout for the last propagated state message. This is target-specific: some targets propagate state immediately, some buffer it. What it sounds like you want is very simply a configurable lookback window. That's pretty common. I would add it to your tap's `config` options so the user can adjust it for their specific use case. The actual implementation will just use the built-in SDK method for getting the starting replication key value and subtract a `timedelta` based on your configured lookback. And I think that's about all you need, @hawkar_mahmod.
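This is a simplified sketch of the state round trip just described. It assumes the target writes one JSON state object per line to stdout. The orchestrator keeps only the last one, and that saved object is what the tap receives via `--state` on the next run. The function name `last_state` and the stream name `aggregated_report` are hypothetical. The bookmark layout mirrors the SDK's `replication_key` / `replication_key_value` convention. Meltano does the equivalent internally, so you never call anything like this yourself.

```python
import json
from typing import Iterable, Optional


def last_state(target_stdout: Iterable[str]) -> Optional[dict]:
    """Return the last JSON object a target wrote to stdout, or None if there was none."""
    state = None
    for line in target_stdout:
        line = line.strip()
        if not line:
            continue
        try:
            parsed = json.loads(line)
        except json.JSONDecodeError:
            continue  # ignore non-JSON noise such as log lines
        if isinstance(parsed, dict):
            state = parsed  # later commits supersede earlier ones
    return state


if __name__ == "__main__":
    stdout = [
        '{"bookmarks": {"aggregated_report": {"replication_key": "_sdc_replication_date", '
        '"replication_key_value": "2022-03-14"}}}',
        '{"bookmarks": {"aggregated_report": {"replication_key": "_sdc_replication_date", '
        '"replication_key_value": "2022-03-15"}}}',
    ]
    # This object is what would be saved and passed back as --state next run.
    print(json.dumps(last_state(stdout)))
```

If a target buffers state, fewer lines appear, but only the final committed one matters for the next run.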
h
@alexander_butler thank you for that, it's very clarifying. I've included the lookback window as a config option so it can be adjusted as necessary. At the moment, when I set the config value for `start_date` in the Meltano config (inside the tap project), it doesn't override the value that comes from state. Since Meltano passes the `--state` argument to the tap, it seems as though this is by design, and the only way to override the state value is to do so in the implementation of the stream, either by overriding `get_starting_timestamp` or by resolving to the desired value elsewhere. The docstring of `get_starting_timestamp` states:
Developers should use this method to seed incremental processing for date
and datetime replication keys. For non-datetime replication keys, use
:meth:`~singer_sdk.Stream.get_starting_replication_key_value()`
Which isn't possible given that Meltano always passes --state in. Have I got that right?
a
You should just call `get_starting_timestamp` like you normally would and afterwards apply the timedelta subtraction. Do that anywhere your tap uses the state. There shouldn't be any need to do any overrides.
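Alex's suggestion boils down to a one-line adjustment after the SDK call. In this sketch, a plain `starting` argument stands in for the SDK call, so it runs without `singer_sdk` installed. In a real stream you would pass in the result of `self.get_starting_timestamp(context)` and read the window from config. The `apply_lookback` helper and the `lookback_days` setting name are hypothetical.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def apply_lookback(starting: Optional[datetime], lookback_days: int) -> Optional[datetime]:
    """Shift the starting timestamp back by the configured lookback window.

    ``starting`` stands in for ``self.get_starting_timestamp(context)``; it may be
    None when there is neither state nor a configured start date.
    """
    if starting is None:
        return None
    return starting - timedelta(days=lookback_days)


# Inside a hypothetical stream method, usage would look like:
#     start = apply_lookback(self.get_starting_timestamp(context),
#                            self.config.get("lookback_days", 7))

if __name__ == "__main__":
    print(apply_lookback(datetime(2022, 3, 24, tzinfo=timezone.utc), 7))
```

State and `start_date` handling stay untouched. The lookback is applied after whichever of the two the SDK picked, which is why no override is needed.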
Oh, I might've misread. Is this a different issue you're referring to?
Yeah, start date is only used in lieu of state; otherwise, state would be used, I think.
You cannot forcibly change the start date via config if state exists and is passed. You either need to manually update the state file or clear the state (delete it), since presumably your `start_date` is newer.
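This is a simplified model of the precedence described here: the stream's bookmark wins, and `start_date` is only a fallback when no state exists. It illustrates the behavior as discussed in this thread and is not the SDK's source. Newer SDK releases may treat `start_date` differently, so check your version. The `resolve_start` name, the stream name, and the ISO date format are assumptions.

```python
from datetime import date
from typing import Optional


def resolve_start(state: dict, stream: str, start_date: Optional[str]) -> Optional[date]:
    """Bookmark wins if present; ``start_date`` from config is only a fallback."""
    bookmark = state.get("bookmarks", {}).get(stream, {}).get("replication_key_value")
    value = bookmark if bookmark is not None else start_date
    return date.fromisoformat(value) if value else None


if __name__ == "__main__":
    state = {"bookmarks": {"aggregated_report": {"replication_key_value": "2022-03-15"}}}
    # With state passed in, the configured start_date is ignored.
    print(resolve_start(state, "aggregated_report", "2021-01-01"))
    # After clearing (deleting) the state, start_date takes effect again.
    print(resolve_start({}, "aggregated_report", "2021-01-01"))
```

That is why changing `start_date` alone has no effect once a bookmark exists. Clearing the state is what makes the config value reachable again.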
v
I think you can use the window idea Alex has. Just take the value you get from `get_starting_timestamp` and, if the date is today (2023-03-24), subtract your window (a separate config, maybe call it `window`) from that (7 days as a default?) and then query your system.
h
@alexander_butler the reason I want to ignore state in certain situations is so that I can perform a historical import, or manually backfill dates in the future if issues are encountered for a particular date range. I know that by setting a replication key we automatically go into incremental import mode, according to the SDK docs. But it's not clear what this means or how to opt out of it when we don't want to depend on state.
a
That's not something the tap handles; it's totally outside of it. You would just clear the state if you are using Meltano, or not pass state if you are running the tap directly. Also, I think Meltano has a `--full-refresh` flag, which might be even more on the nose. So basically, leave your tap alone if it is good lol.
v
`catalog.json` is where you control how a stream syncs. https://hub.meltano.com/singer/spec#catalog-files Taps take in config, catalog, and state.
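In the Singer catalog format, a stream's top-level metadata entry (breadcrumb `[]`) carries user-settable keys such as `selected`, `replication-method`, and `replication-key`. This sketch sets one stream to `FULL_TABLE` in a catalog dict and leaves everything else alone. The `set_replication_method` helper and the stream id are hypothetical. Whether a particular tap honors a catalog-specified replication method depends on the tap and its SDK version.

```python
import json


def set_replication_method(catalog: dict, stream_id: str, method: str) -> dict:
    """Set ``replication-method`` on a stream's top-level (breadcrumb []) metadata."""
    for stream in catalog["streams"]:
        if stream["tap_stream_id"] != stream_id:
            continue
        for entry in stream.setdefault("metadata", []):
            if entry.get("breadcrumb") == []:
                entry.setdefault("metadata", {})["replication-method"] = method
                break
        else:
            # No top-level metadata entry yet: add one.
            stream["metadata"].append({"breadcrumb": [], "metadata": {"replication-method": method}})
    return catalog


if __name__ == "__main__":
    catalog = {
        "streams": [
            {
                "tap_stream_id": "aggregated_report",
                "metadata": [{"breadcrumb": [], "metadata": {"selected": True}}],
            }
        ]
    }
    print(json.dumps(set_replication_method(catalog, "aggregated_report", "FULL_TABLE"), indent=2))
```

For a one-off historical import, clearing state (or a full-refresh run) is usually simpler. Editing the catalog makes the choice persistent for that stream.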