# singer-tap-development
r
Is there a recommended approach to handling API endpoints that support start and end dates in chunks (i.e. a sliding window) and emitting/finalising state for each? The API we are dealing with supports exporting data in this way - and there is quite a lot of it - so dividing it up into multiple requests and keeping track of the start date in state would be ideal. I implemented it mostly in a custom paginator class, but was not able to find how/where to apply state operations after each "page" (without overriding `request_records` entirely). I thought about setting up a date range parent stream to pass dates through as context, which I think would solve the state update part of my problem, but it felt counter-intuitive to how parent-child streams should work and to incremental replication in general (I would end up with a bunch of interim dates stored in state as context, as well as the final date as `replication_key_value`).
👀 1
v
Normally I'd do this, but I'm guessing you're asking for something different and I'm just not catching why, so let me know:
1. Set start_date to something like today()-1y (for the initial run)
2. Create chunks in the "window" you mentioned, of something like 1-month chunks from today()-1y up to today(), which would give you 12 chunks: `chunk[0] = "2024-04-10"`, `chunk[1] = "2024-05-10"`, etc. (see the sketch below)
3. Add `query_start_date` (or something like that) to your schema for that stream
4. Set the replication key equal to `query_start_date`
5. Do the requests in order from oldest to newest

If the stream gets killed in the middle of this, you'd set start_date equal to the replication key start_date that you had and remake the chunks from that day forward, so if it failed in the middle of 2024-06-10 it repeats that chunk. My guess is you're trying to parallelize this and do multiple chunks at a time? Is that right?
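A minimal sketch of the chunking in step 2 above (a hypothetical helper, not code from the tap being discussed):

```python
from datetime import date, timedelta


def date_chunks(start_date: date, chunk_days: int = 30) -> list[tuple[date, date]]:
    """Split [start_date, today) into consecutive (chunk_start, chunk_end) windows."""
    chunks = []
    chunk_start = start_date
    today = date.today()
    while chunk_start < today:
        chunk_end = min(chunk_start + timedelta(days=chunk_days), today)
        chunks.append((chunk_start, chunk_end))
        chunk_start = chunk_end
    return chunks


# start_date = today() - 1y gives roughly 12 month-sized chunks,
# which are then requested in order from oldest to newest.
chunks = date_chunks(date.today() - timedelta(days=365))
```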
s
Untitled.py
I did this recently and it was a bit of a non-conventional approach. Here is what I have done:
• Build my tap as a daily-run one: it doesn't worry about whether I am calling for a year of data or hourly; it runs within the page size limit and completes the process page after page. This is used for daily runs.
• In case I need to backfill, I have a Python script that creates sub-processes and executes `meltano el`.
• Here is a sample - not pretty work, but it does the job.
• Please note you have to store your state in a different location, or have a new state created for these runs. In my case I store them in S3 and gave them a path like `<S3://back_fill/delete/>`, which I change back to the actual pipeline path and state ID when I have to resume daily syncs.
It's not fully automated to rerun, but in my case I don't have access to any orchestration tools. This might be simpler if we could auto-start failed runs; for 1 year of data divided into 3-hour chunks I see 100 failures, so it can be painful to resume all of them.
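The attached Untitled.py isn't included in this export; a rough, hypothetical sketch of the approach described (plugin names, the setting env var, and the `--state-id` usage are illustrative assumptions, not taken from the actual script):

```python
import os
import subprocess
from datetime import date, timedelta

# Hypothetical backfill driver: one `meltano el` invocation per window, each with
# its own state ID so the regular daily pipeline's state (kept elsewhere, e.g. a
# separate S3 prefix) is left untouched.
start = date(2024, 1, 1)
while start < date.today():
    end = start + timedelta(days=1)
    result = subprocess.run(
        [
            "meltano",
            "el",
            "tap-example",
            "target-example",
            "--state-id",
            f"backfill-{start.isoformat()}",
        ],
        # Meltano-style env override for the tap's start_date setting.
        env={**os.environ, "TAP_EXAMPLE_START_DATE": start.isoformat()},
        check=False,
    )
    if result.returncode != 0:
        print(f"Window starting {start} failed; rerun it manually or re-drive it here.")
    start = end
```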
r
@visch No, not trying to parallelize. More so this part:
> If the stream gets killed in the middle of this, you'd set start_date equal to the replication key start_date that you had and remake the chunks from that day forward, so if it failed in the middle of 2024-06-10 it repeats that chunk

The SDK seems to finalise state after processing each partition and the stream as a whole - in this case, I want it to finalise after each chunk (i.e. `paginator.advance`).
v
If you set `is_sorted=True` it definitely syncs state along the way
Finalizing isn't the only time state is sent
r
> `is_sorted=True`

I don't know if the data I have within a given chunk is sorted though, so is that applicable here? The SDK throws an error if the replication key value of a record is less than the previous one when that is set, IIRC.
> Finalizing isn't the only time state is sent

Sure - maybe my understanding (and use of terminology) is wrong here, but isn't the act of "finalising" state what promotes a `replication_key_value` from a progress marker to a bookmark? I believe `get_starting_replication_key_value` only respects the bookmark...
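For reference on the progress-marker vs. bookmark terminology: the tap-side state the SDK writes for an unsorted stream keeps the latest replication key value under `progress_markers` while the sync is in flight, roughly like this (stream name and values are illustrative):

```json
{
  "bookmarks": {
    "exports": {
      "progress_markers": {
        "Note": "Progress is not resumable if interrupted.",
        "replication_key": "createdAt",
        "replication_key_value": "2025-02-14T09:30:00Z"
      }
    }
  }
}
```

Only once the stream's state is finalised does it become a plain bookmark that a subsequent run starts from:

```json
{
  "bookmarks": {
    "exports": {
      "replication_key": "createdAt",
      "replication_key_value": "2025-02-14T09:30:00Z"
    }
  }
}
```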
s
@visch - when you say create chunks, is it possible to create the chunks implicitly in Meltano and run the pipeline, or is it something we control externally by calling the pipeline like `meltano el start-date="xx"` as I did? I am curious if we have an internal way to do it in Meltano.
v
We're getting close @Reuben (Matatika)
> I don't know if the data I have within a given chunk is sorted though, so is that applicable here? The SDK throws an error if the replication key value of a record is less than the previous one when that is set, IIRC.

You know that your chunks are sorted. My suggestion is to use your chunk start_date (or one of the dates you're using to chunk with) as your replication key - that's 100% sorted.
> but isn't the act of "finalising" state what promotes a

I'm not 100% certain of all the internals of when progress markers get bumped up and around, but I'm certain that `is_sorted=True` allows you to resume where you left off.
@Siddu Hussain what you're talking about doesn't sound like it's the same as this thread, so I'm going to focus on Reuben's stuff - maybe another thread for your question.
r
With `is_sorted = True`, I hit `singer_sdk.exceptions.InvalidStreamSortException` (which implies the chunks are unordered). Without it, the stream runs but nothing gets written to the Meltano state backend (default `systemdb`) until all chunks have been processed (i.e. the stream sync is complete).
v
@Reuben (Matatika) you can order your chunks right? Start with the oldest and move forwards one chunk at a time
r
To clarify: it appears the records returned within a chunk of a given start/end date (i.e. `2025-01-01` to `2025-02-01`) are not ordered, which is why I hit the above exception. The chunks themselves are processed in sequential date order:
1. `2025-01-01` to `2025-02-01`
2. `2025-02-01` to `2025-03-01`
3. `2025-03-01` to `2025-04-01`
4. `2025-04-01` to the current date
so there is no issue there (the end date is exclusive in this case).
```python
from __future__ import annotations

from datetime import datetime, timedelta, timezone
from urllib.parse import parse_qsl, urlparse

from singer_sdk.pagination import BaseAPIPaginator
from typing_extensions import override  # or `from typing import override` on Python 3.12+


class ExportPaginator(BaseAPIPaginator):
    """Export paginator."""

    @override
    def __init__(self, start_date: datetime | None) -> None:
        super().__init__(start_date and self._get_date_range(start_date))

    @override
    def get_next(self, response):
        params = dict(parse_qsl(urlparse(response.request.url).query))

        if "endDateTime" not in params:
            return None  # end pagination

        prev_end_date = datetime.fromisoformat(params["endDateTime"]).astimezone(
            tz=timezone.utc
        )
        return self._get_date_range(prev_end_date)

    @staticmethod
    def _get_date_range(start_date: datetime) -> tuple[datetime, datetime | None]:
        date_offset = timedelta(days=28)  # TODO: make this configurable
        end_date = start_date + date_offset

        if end_date >= datetime.now(tz=timezone.utc):
            end_date = None

        # `startDateTime` is inclusive, `endDateTime` is exclusive
        return start_date, end_date
```
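For context, a sketch of how a paginator like this is typically wired into the stream - `get_new_paginator`, `get_url_params`, and `get_starting_timestamp` are standard SDK hooks, while the stream name, path, and query-parameter handling here are illustrative:

```python
from singer_sdk.streams import RESTStream


class ExportsStream(RESTStream):
    """Illustrative stream that pages through fixed-size date windows."""

    name = "exports"        # illustrative
    path = "/export/data"   # illustrative

    def get_new_paginator(self) -> ExportPaginator:
        # Start from the stored bookmark, falling back to the configured start_date.
        return ExportPaginator(self.get_starting_timestamp(None))

    def get_url_params(self, context, next_page_token):
        # next_page_token is the (start, end) tuple produced by ExportPaginator.
        params = {}
        if next_page_token:
            start_date, end_date = next_page_token
            params["startDateTime"] = start_date.isoformat()
            if end_date:  # None means "up to now"; endDateTime is exclusive
                params["endDateTime"] = end_date.isoformat()
        return params
```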
v
We're still not talking about the same thing. You need `2025-01-01` (your start_date on one of those chunks) to be a value of a column in your record.
r
Sorry, I assumed that was a given here. It is configured as the `createdAt` property: https://github.com/Matatika/tap-iterable-msdk/blob/f2c0480e98ea7b9a19bb49c84df6792b9fde7691/tap_iterable/streams.py#L226
I think I finally got what you are saying:
1. Define a new property on the stream schema for the start date
2. Set the stream replication key to the new start date property
3. Populate each record with the replication key value from the request start date param (previously `createdAt` from the API)
4. Set `is_sorted = True`
Do I have that right? It sounds like it should work, although I don't like that I have to add a new property to capture the start date from the request to achieve this... Feels like there could be an easier way.
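A minimal sketch of steps 1-4, continuing the illustrative `ExportsStream` from the earlier sketch (the `_sdc_start_date` property name and the attribute used to remember the current window are assumptions, not part of the SDK):

```python
from singer_sdk import typing as th
from singer_sdk.streams import RESTStream


class ExportsStream(RESTStream):
    name = "exports"
    replication_key = "_sdc_start_date"  # step 2: synthetic column, not from the API
    is_sorted = True                     # step 4: windows are requested oldest to newest

    schema = th.PropertiesList(
        th.Property("id", th.StringType),
        th.Property("createdAt", th.DateTimeType),
        th.Property("_sdc_start_date", th.DateTimeType),  # step 1: new schema property
    ).to_dict()

    def get_url_params(self, context, next_page_token):
        params = {}
        if next_page_token:
            start_date, end_date = next_page_token
            # Remember the window start so records from this page can be stamped with it.
            self._current_window_start = start_date
            params["startDateTime"] = start_date.isoformat()
            if end_date:
                params["endDateTime"] = end_date.isoformat()
        return params

    def post_process(self, row, context=None):
        # Step 3: every record carries the start date of the window it was fetched in.
        if getattr(self, "_current_window_start", None):
            row["_sdc_start_date"] = self._current_window_start.isoformat()
        return row
```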
v
Looked at your code quickly and I don't see the custom paginator in `client.py` that you are talking about in this Slack message, re https://meltano.slack.com/archives/C068YBV6KEK/p1744313570123959?thread_ts=1744255144.140489&cid=C068YBV6KEK
You've got the idea! Generally the replication_key concept ties itself to a column 🤷 - that doesn't mean you couldn't make something else work, but that was the gist, so I roll with it.
It ends up being nice for debugging as well, and I just throw it into an `_sdc_start_date` or similar field so it's obvious.
👍 1
r
Yeah, just testing the paginator change locally at the moment.
It just feels odd that there is already a property on the stream that is used for replication (`createdAt`), which is not compatible with what I want to do here.
(as in, it is already tied to a column, as you put it)
e
@Reuben (Matatika) can you create an issue? (or ping me in one if it already exists) I think we could expose a way of incrementing the state "manually" at arbitrary points during the stream sync, instead of only for each record. With this, I'm imagining for your use case we'd make a `Stream.increment_state_from_record(record, context)`, which would be a new SDK method, a no-op. Then another new method (e.g. `Stream._increment_state(value, context)`) could be called to update the state at any point.
r
@Edgar Ramírez (Arch.dev) I can do that. Regarding your proposal, I don't know if I'd consider "on page end" an arbitrary point here - would it not be generally beneficial to increment state there? Even so, how as a developer would I be able to manually call the proposed method(s) if the SDK does not also expose some stream lifecycle hook at that point (without overriding `request_records` entirely)?
e
> does not also expose some stream lifecycle hook at that point

I'm not sure I understand what that means. You mean something like an `after_page` method that you could override?
r
Yes - at least that is the theory: I want to call `self._finalize_state` after each page (date chunk) has been processed, to "persist" the state back to Meltano. Currently, I don't see any state saved until the stream finishes syncing or the tap process ends, which I'm assuming is a result of the `self._finalize_state` calls in the base `Stream` class.
I am operating off my own assumptions here though, so my understanding could be way off. It's not clear to me how/when exactly Meltano handles state updates from `STATE` messages emitted by a tap.
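A purely hypothetical sketch of the hook being discussed - no `after_page` method exists in the SDK as of this thread, and the state methods named are taken from the messages above rather than verified against a specific SDK version:

```python
from singer_sdk.streams import RESTStream


class ExportsStream(RESTStream):
    def after_page(self, context):
        # Hypothetical hook, imagined to run once per page / date chunk:
        # promote progress markers to a bookmark and emit a STATE message so an
        # interrupted backfill can resume from the last completed window.
        # Method names and signatures follow the discussion above and may
        # differ between SDK versions.
        self._finalize_state()
        self._write_state_message()
```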
c
I've hit these same issues recently while trying to page through the Zoom Phone API, which only allows date ranges of up to a single month and then returns the data in pages for that window as an unsorted stream. I too ended up with an ugly paginator class and URL param calculation: https://github.com/chadcampling-opteon/tap-zoomphone/blob/main/tap_zoomphone/client.py
One issue I bumped into in testing and couldn't work around: if a request returned no results but I wasn't at the end of the paging (just a sub-page), it would kill off the stream. In practice I'm not seeing this with production data, but it is still something that could come up. Would love to follow any enhancements to the pagination or the SDK that come out of this discussion.
e
For pagination breaking when an empty page is received: https://github.com/meltano/sdk/issues/2980
> It's not clear to me how/when exactly Meltano handles state updates from `STATE` messages emitted by a tap.

Meltano processes every state payload that's emitted by the target. Targets have different criteria for when they emit state, but it's usually after committing data to the destination system (e.g. after a successful `INSERT INTO`). I really want to add a nice and clear (graphical?) explanation of this somewhere in the docs. For the concrete problem of giving the developer more control over when the tap finalizes and emits state, I think we can tackle it piecemeal.