Reuben (Matatika)
04/10/2025, 3:19 AM
request_records entirely). Thought about setting up a date range parent stream to pass dates through as context, which I think would solve the state update part of my problem, but it felt counter-intuitive to how parent-child streams should work and to incremental replication in general (I would end up with a bunch of interim dates stored in state as context, as well as the final date as replication_key_value).

visch
04/10/2025, 1:17 PM
query_start_date (or something like that) to your schema for that stream
4. Set replication key equal to query_start_date
5. Do the requests in order from oldest to newest

If the stream gets killed in the middle of this, you'd set start_date equal to the replication key start_date that you had and remake the chunks from that day forward, so if it failed in the middle of 2024-06-10 it would repeat that chunk.
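The chunking-and-resume scheme above can be sketched roughly like this (plain Python; the names are illustrative, not from any real tap):

```python
from datetime import date, timedelta

def make_chunks(start: date, today: date, days: int = 28):
    """Yield (chunk_start, chunk_end) date pairs from oldest to newest."""
    while start < today:
        end = min(start + timedelta(days=days), today)
        yield start, end
        start = end

# Resuming: restart from the bookmarked replication-key date, so a chunk
# that was interrupted mid-way (e.g. 2024-06-10) is re-requested in full.
bookmark = date(2024, 6, 10)
chunks = list(make_chunks(bookmark, today=date(2024, 7, 20)))
# chunks == [(2024-06-10, 2024-07-08), (2024-07-08, 2024-07-20)]
# Each record emitted for a chunk would carry query_start_date=chunk_start,
# which serves as the (always sorted) replication key.
```

Because the chunk start date only ever increases, the replication key is sorted even when records inside a chunk are not.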
My guess is you're trying to parallelize this and do multiple chunks at a time? Is that right?

Siddu Hussain
04/10/2025, 2:15 PM
<S3://back_fill/delete/>
which I change back to the actual pipeline path and state ID when I have to resume daily syncs

Siddu Hussain
04/10/2025, 2:18 PM

Reuben (Matatika)
04/10/2025, 3:02 PM
> If the stream gets killed in the middle of this you'd set start_date equal to the replication key start_date that you had and remake the chunks from that day forward so if it failed in the middle of 2024-06-10 it would repeat that chunk

The SDK seems to finalise state after processing each partition and the total stream. In this case, I want it to finalise after each chunk (i.e. paginator.advance).
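What finalising per chunk would buy can be shown with a toy sketch (plain Python, not SDK code): if the bookmark is persisted after every chunk, a crash mid-sync only repeats the in-flight chunk.

```python
state = {}  # stand-in for the state backend (e.g. Meltano's systemdb)

def sync(chunks, fail_at=None):
    """Process chunks oldest-to-newest, finalising state after each one."""
    for i, chunk_start in enumerate(chunks):
        if i == fail_at:
            raise RuntimeError("stream killed mid-chunk")
        # ... request and emit all records for this chunk ...
        state["replication_key_value"] = chunk_start  # finalised per chunk

try:
    sync(["2025-01-01", "2025-02-01", "2025-03-01"], fail_at=2)
except RuntimeError:
    pass

resume_from = state["replication_key_value"]  # "2025-02-01", not "2025-01-01"
```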
visch
04/10/2025, 3:03 PM
With is_sorted=True it definitely syncs along the way

visch
04/10/2025, 3:04 PM
Finalizing isn't the only time state is sent

Reuben (Matatika)
04/10/2025, 3:19 PM
> is_sorted=True

I don't know if the data I have within a given chunk is sorted though, so is that applicable here? The SDK throws an error if the replication key value of a record is less than the previous one when that is set, IIRC.

> Finalizing isn't the only time state is sent

Sure - maybe my understanding (and use of terminology) is wrong here, but isn't the act of "finalising" state what promotes a replication_key_value from a progress marker to a bookmark? I believe get_starting_replication_key_value only respects the bookmark...

Siddu Hussain
04/10/2025, 3:24 PM

visch
04/10/2025, 3:38 PM
> I don't know if that data I have within a given chunk is sorted though, so is that applicable here? The SDK throws an error if the replication key value of a record is less than the previous if that is set IIRC.

You know that your chunks are sorted. That's my suggestion: use your chunk start_date (or one of the dates you're using to chunk with) as your replication key - that's 100% sorted.

> but isn't the act of "finalising" state what promotes a

I'm not 100% certain of all the internals of when progress markers get bumped up and around, but I'm certain that is_sorted=True allows you to resume where you left off.

visch
04/10/2025, 3:39 PM

Reuben (Matatika)
04/10/2025, 4:05 PM
With is_sorted = True, I hit singer_sdk.exceptions.InvalidStreamSortException (which implies the chunks are unordered).
Without it, the stream runs but nothing gets written to the Meltano state backend (default systemdb) until all chunks have been processed (i.e. stream sync is complete).

visch
04/10/2025, 4:13 PM

Reuben (Matatika)
04/10/2025, 4:24 PM
The records within a given chunk (2025-01-01 to 2025-02-01) are not ordered, which is why I hit the above exception. The chunks themselves are processed in sequential date order:
1. 2025-01-01 to 2025-02-01
2. 2025-02-01 to 2025-03-01
3. 2025-03-01 to 2025-04-01
4. 2025-04-01 to current date
so no issue there (end date is exclusive in this case).
from __future__ import annotations

from datetime import datetime, timedelta, timezone
from urllib.parse import parse_qsl, urlparse

from singer_sdk.pagination import BaseAPIPaginator
from typing_extensions import override


class ExportPaginator(BaseAPIPaginator):
    """Export paginator."""

    @override
    def __init__(self, start_date: datetime | None) -> None:
        super().__init__(start_date and self._get_date_range(start_date))

    @override
    def get_next(self, response):
        params = dict(parse_qsl(urlparse(response.request.url).query))

        if "endDateTime" not in params:
            return None  # end pagination

        prev_end_date = datetime.fromisoformat(params["endDateTime"]).astimezone(
            tz=timezone.utc
        )
        return self._get_date_range(prev_end_date)

    @staticmethod
    def _get_date_range(start_date: datetime) -> tuple[datetime, datetime | None]:
        date_offset = timedelta(days=28)  # TODO: make this configurable
        end_date = start_date + date_offset

        if end_date >= datetime.now(tz=timezone.utc):
            end_date = None

        # `startDateTime` is inclusive, `endDateTime` is exclusive
        return start_date, end_date
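For illustration, the 28-day window arithmetic in _get_date_range can be exercised standalone (the singer_sdk plumbing is stubbed out here):

```python
from datetime import datetime, timedelta, timezone

def get_date_range(start_date, offset_days=28):
    """Standalone mirror of `_get_date_range`: windows chain end-to-start,
    and the window that reaches the present becomes open-ended."""
    end_date = start_date + timedelta(days=offset_days)
    if end_date >= datetime.now(tz=timezone.utc):
        end_date = None  # no `endDateTime` param, so `get_next` stops paginating
    return start_date, end_date

start = datetime(2025, 1, 1, tzinfo=timezone.utc)
first = get_date_range(start)      # (2025-01-01, 2025-01-29)
second = get_date_range(first[1])  # next window starts where the last ended
```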
visch
04/10/2025, 4:38 PM
You'd want 2025-01-01 (your start_date on one of those chunks) to be a value of a column in your record

Reuben (Matatika)
04/10/2025, 5:11 PM
createdAt property: https://github.com/Matatika/tap-iterable-msdk/blob/f2c0480e98ea7b9a19bb49c84df6792b9fde7691/tap_iterable/streams.py#L226

Reuben (Matatika)
04/10/2025, 7:32 PM
createdAt from the API)
4. Set is_sorted = True
Do I have that right? It sounds like it should work, although I don't like that I have to add a new property to capture the start date from the request to achieve this... Feels like there could be an easier way.

visch
04/10/2025, 7:50 PM
client.py you are talking about in this slack message.
re https://meltano.slack.com/archives/C068YBV6KEK/p1744313570123959?thread_ts=1744255144.140489&cid=C068YBV6KEK
You've got the idea!
Generally the replication_key concept ties itself to a column 🤷 Doesn't mean you couldn't make something else work, but that was the gist, so I roll with it.

visch
04/10/2025, 7:51 PM
_sdc_start_date or some such field so it's obvious

Reuben (Matatika)
04/10/2025, 7:51 PM

Reuben (Matatika)
04/10/2025, 7:55 PM
createdAt), which is not compatible with what I want to do here.

Reuben (Matatika)
04/10/2025, 7:56 PM

Edgar Ramírez (Arch.dev)
04/10/2025, 11:28 PM
Stream.increment_state_from_record(record, context), which would be a new SDK method, could be made a no-op. Then, another new method (e.g. Stream._increment_state(value, context)) could be called to update the state at any point.

Reuben (Matatika)
04/10/2025, 11:44 PM
request_records entirely)?

Edgar Ramírez (Arch.dev)
04/10/2025, 11:52 PM
after_page method that you could override?

Reuben (Matatika)
04/11/2025, 12:07 AM
self._finalize_state after each page (date chunk) has been processed, to "persist" the state back to Meltano. Currently, I don't see any state saved until the stream finishes syncing or the tap process ends, which I'm assuming is a result of the self._finalize_state calls in the base Stream class.

Reuben (Matatika)
04/11/2025, 12:15 AM
It's not clear to me how/when exactly Meltano handles state updates from STATE messages emitted by a tap.

Chad
04/16/2025, 5:48 AM

Edgar Ramírez (Arch.dev)
04/16/2025, 7:41 PM

Edgar Ramírez (Arch.dev)
04/16/2025, 7:51 PM
> It's not clear to me how/when exactly Meltano handles state updates from STATE messages emitted by a tap.

Meltano processes every state payload that's emitted by the target. Targets have different criteria for when they emit state, but it's usually after committing data to the destination system (e.g. after a successful INSERT INTO). I really want to add a nice and clear (graphical?) explanation of this somewhere in the docs.
For the concrete problem of giving the developer more control over when the tap finalizes and emits state, I think we can tackle it piecemeal.
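The flow described above, as a toy model (assumed behaviour for illustration, not Meltano's actual code): the target holds the most recent STATE message and only surfaces it once the records before it have been committed.

```python
committed, emitted_state = [], []
pending_state = None

def handle(msg):
    """Toy target loop: STATE is buffered until a commit has happened."""
    global pending_state
    if msg["type"] == "STATE":
        pending_state = msg["value"]  # hold it: preceding data not committed yet
    elif msg["type"] == "RECORD":
        committed.append(msg["record"])  # pretend the INSERT INTO succeeded
        if pending_state is not None:
            emitted_state.append(pending_state)  # Meltano persists this payload
            pending_state = None

for m in [
    {"type": "RECORD", "record": 1},
    {"type": "STATE", "value": {"bookmark": "2025-01-29"}},
    {"type": "RECORD", "record": 2},
    {"type": "STATE", "value": {"bookmark": "2025-02-26"}},
]:
    handle(m)
# Only the first bookmark has been emitted; the second is still pending a commit.
```

This is why a tap that only emits STATE at the end of a stream cannot resume per chunk: there is nothing for the target to forward until the whole sync completes.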