New tap for Prefect flow and task runs: <https://g...
# singer-tap-development
h
New tap for Prefect flow and task runs: https://github.com/radbrt/tap-prefect The API isn’t well suited to incremental state, which means a full refresh is the best option for the time being. I will talk with the Prefect people about that, but in the meantime, are there any examples of using the current timestamp as the state bookmark?
p
@Henning Holgersen If I understand your question, I think you can just call `get_starting_timestamp` to get the state bookmark, `start_date`, or None (in that order); if you get None, you can default to the current date. Also, I think you could exclude the `state` capability from the plugin definition so Meltano knows not to attempt to store or retrieve state for that tap.
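A minimal sketch of that fallback, in plain Python so it runs without the SDK installed. `resolve_start` is a hypothetical helper, not part of the SDK; its `bookmark` argument stands in for whatever `get_starting_timestamp` returns inside a stream.

```python
from datetime import datetime, timezone
from typing import Optional


def resolve_start(bookmark: Optional[datetime], now: Optional[datetime] = None) -> datetime:
    """Return the bookmark/start_date if there is one, else the current UTC time.

    `bookmark` is assumed to be the value returned by
    `self.get_starting_timestamp(context)` in an SDK stream (possibly None).
    `now` can be injected for testing; it defaults to the current UTC time.
    """
    if bookmark is not None:
        return bookmark
    return now if now is not None else datetime.now(timezone.utc)
```

Inside a stream this would presumably be called as `resolve_start(self.get_starting_timestamp(context))` when building the request.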
h
Thanks! I stumbled across that function but I think I misunderstood what it was. How does it work with setting `replication_key` in a stream then, if I just use the current timestamp instead of a high-watermark attribute from the source? There can be quite a bit of data, so not making any attempt at state might be impractical. But the Prefect people were interested in adding the needed functionality to the API, so a better state system might be coming.
(and basically what I am aiming for now is something like reading all records timestamped starting 24 hours before the last time meltano run ended).
p
What's the challenge with the API? I'm not positive this answers your question, but I'll try: my understanding is that replication_key is the attribute that's tracked by the tap, emitted as the bookmark value, and ultimately stored by Meltano between runs. On subsequent runs the SDK makes that bookmark available to you, but in most cases (if not all) it doesn't actually do anything with it for you. You need to use that value when building your API request URL to filter your dataset, or whatever your source needs to limit data. If you don't implement logic to use your bookmark (it sounds like that's where you are now), the tap will basically run a full refresh each time. I think the main challenge is that if you use the current timestamp instead of the current bookmark when requesting data, you'd be at risk of skipping data whenever your sync didn't run in the expected time frame.
Semi relevant are these issues where we were talking about relative start/end dates https://github.com/MeltanoLabs/tap-cloudwatch/issues/27 and https://github.com/meltano/sdk/issues/922
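To make the skipping risk concrete, here is a small made-up simulation (no SDK or API involved; the records and `fetch_since` are purely illustrative). One run is missed, so the next sync happens 48 hours after the last one instead of 24.

```python
from datetime import datetime, timedelta, timezone


def fetch_since(records, since):
    """Stand-in for an API call that filters on start_time > since."""
    return [r for r in records if r["start_time"] > since]


t0 = datetime(2023, 1, 1, tzinfo=timezone.utc)
# One record every 12 hours: ids 1..6 at t0+12h, +24h, ..., +72h.
records = [{"id": i, "start_time": t0 + timedelta(hours=12 * i)} for i in range(1, 7)]

# The last successful sync saw everything up to t0+24h; that is the stored bookmark.
bookmark = t0 + timedelta(hours=24)
# The t0+48h run never happened, so the next sync runs at t0+72h.
now = t0 + timedelta(hours=72)

by_bookmark = fetch_since(records, bookmark)
by_clock = fetch_since(records, now - timedelta(hours=24))

# Records the clock-based window never sees.
seen = {r["id"] for r in by_clock}
skipped = [r["id"] for r in by_bookmark if r["id"] not in seen]
print(skipped)  # [3, 4]
```

Filtering from the stored bookmark picks up ids 3 to 6, while "now minus 24 hours" only picks up 5 and 6.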
h
Yup, that is definitely what it seemed to be doing when I tried it out. Really simple, almost automagical. The only thing I needed was to override the method that prepares the POST payload, so that it would actually include the state in the appropriate place in the request. I think I realized a solution while formulating the problem, rubber-duck style. The issue is that records from the API are first created (started), then updated. I am able to filter and sort by the start_time attribute, but when a flow ends it is updated and gets an end time, run results etc., and I have no way of getting records that have been updated after some timestamp. Perhaps the easiest solution for now is to use start_time as replication_key, but subtract 24 hours from it when using it in the request. That way, I will catch updated records for all flows that take less than 24 hours. I had been thinking of using only the system timestamp from when the flow ran as the bookmark, but that would require me to override how the bookmark is written.
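A rough sketch of that lookback idea, kept SDK-free so it runs on its own. `build_payload` and `LOOKBACK` are hypothetical names, and the payload keys (`flow_runs`, `start_time`, `after_`, `sort`) are an assumption about the shape of the Prefect filter body, not something confirmed in this thread.

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Optional

# Assumed window: flows finishing within 24 hours of starting get re-read.
LOOKBACK = timedelta(hours=24)


def build_payload(bookmark: Optional[datetime], lookback: timedelta = LOOKBACK) -> Dict[str, Any]:
    """Build a POST body that filters on start_time, shifted back by `lookback`.

    `bookmark` stands in for the start_time-based value the SDK hands back
    (for example from `get_starting_timestamp`). With no bookmark, no filter
    is added, which amounts to a full refresh.
    """
    payload: Dict[str, Any] = {"sort": "START_TIME_ASC"}  # assumed sort key
    if bookmark is not None:
        since = bookmark - lookback
        payload["flow_runs"] = {"start_time": {"after_": since.isoformat()}}
    return payload


example = build_payload(datetime(2023, 1, 2, 12, 0, tzinfo=timezone.utc))
print(example["flow_runs"]["start_time"]["after_"])  # 2023-01-01T12:00:00+00:00
```

In the tap this would presumably be called from the overridden payload method. Because the window overlaps the previous run, the same records are emitted more than once, so the target likely needs to upsert on the primary key.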