omar_abed
04/20/2023, 1:45 PM
dan_ladd
04/20/2023, 1:50 PM
taylor
04/20/2023, 1:53 PM
omar_abed
04/20/2023, 2:07 PM
pat_nadolny
04/20/2023, 2:11 PM
omar_abed
04/20/2023, 2:20 PM
pat_nadolny
04/20/2023, 3:10 PM
omar_abed
04/20/2023, 4:08 PM
prratek_ramchandani
04/20/2023, 4:09 PM
prratek_ramchandani
04/20/2023, 4:09 PM
omar_abed
04/20/2023, 4:10 PM
pat_nadolny
04/20/2023, 4:11 PM
prratek_ramchandani
04/20/2023, 4:12 PM
prratek_ramchandani
04/20/2023, 4:12 PM
prratek_ramchandani
04/20/2023, 4:12 PM
pat_nadolny
04/20/2023, 5:43 PM
is_sorted property to true. My understanding is that the SDK knows streams aren't safe to resume after an interruption if they aren't sorted, but if they are sorted then you can resume. Right now it looks like your tap is telling the SDK that the streams are not sorted, so it won't let you resume. The Stripe API sounds like it's reverse sorted, but by using your chunking logic you can manually paginate to return chunks in sorted order at the chunk level (though within a chunk they aren't necessarily sorted). Is that right? So technically your replication key is the latest chunk time, and as long as you're emitting state only after all records in that chunk are processed, I think you're safe to consider this is_sorted=true. I think your logic in the tap already does all of this properly (from a quick skim, definitely test it though), so it's just a matter of letting the SDK know that you're handling the chunking and emitting of replication keys to simulate sorting, so it's safe to resume. Does that make sense?
pat_nadolny
04/20/2023, 5:43 PM
pat_nadolny
04/20/2023, 5:46 PM
check_sorted setting because it was keeping a max timestamp, and if anything older showed up it would throw an error since it detected an out-of-order timestamp. I understood this as a safeguard that can be disabled if you're doing something custom and know that it's safe to do so.
pat_nadolny
04/20/2023, 5:46 PM
edgar_ramirez_mondragon
04/20/2023, 8:13 PM
is_sorted = False on the stream should be enough. We don’t have any guides or examples on that, so if that works, it’d be great to evaluate the dev experience of signposts and improve the docs.
pat_nadolny
04/20/2023, 8:43 PM
edgar_ramirez_mondragon
04/20/2023, 8:57 PM
is_sorted = False and retrieve the signpost with Stream.get_replication_key_signpost() to set the relevant params or headers
prratek_ramchandani
04/20/2023, 9:02 PM
pat_nadolny
04/20/2023, 9:16 PM
is_sorted property is defaulted to False and not overridden by the tap, so that is already set properly. Can you elaborate on how the get_replication_key_signpost approach would work? How is the state replication key different from the signpost? The issue Prratek is having is that after an interrupted sync the SDK is throwing away the replication key because it's not resumable. Right now the tap is manually telling the stream when to try to emit state in https://github.com/prratek/tap-stripe/blob/af95705f7cca624ea179ca49a2d46a9c05530ed2/tap_stripe/streams.py#L147, is that no longer needed?
omar_abed
04/25/2023, 1:51 PM
is_sorted is defaulted to False (like Pat said). I'll take a look at the signposts (like Edgar suggested), but I don't fully understand how a signpost is different from the replication key, so I'll spend some time on that.
omar_abed
04/25/2023, 8:38 PM
get_replication_key_signpost function seems like it's used in combination with the replication key to set an end goal post. I set it manually. It looks like our issue is that the finalize_state_progress_markers function doesn't actually emit an updated state value. When that function gets called, I check by running meltano state get <my_elt_pipeline_job> from the command line and I don't see any state variables set. However, when I check our state in Python using the stream_state and get_context_state functions, I'm seeing a consistent format:
stream_state: {'replication_key_signpost': 1682454007, 'starting_replication_value': '2022-04-01T00:00:00', 'replication_key': 'created', 'replication_key_value': 1648771370}
get_context_state: {'replication_key_signpost': 1682454007, 'starting_replication_value': '2022-04-01T00:00:00', 'replication_key': 'created', 'replication_key_value': 1648771370}
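For reference, here is a rough sketch of what finalizing a state dict of this shape might involve. It is a simplified stand-in written for illustration, not the SDK's own code: the function name finalize_sketch, the progress_markers key, and the clamp-to-signpost rule are all assumptions. What it is meant to show is that a helper like this only rewrites a dict in memory. If the real helper behaves similarly, nothing would show up in meltano state get until a STATE message is actually written afterwards.

```python
import copy


def finalize_sketch(state: dict) -> dict:
    """Return a finalized copy of a stream state dict (illustrative only)."""
    out = copy.deepcopy(state)

    # Bookkeeping values are dropped once a sync has finished.
    signpost = out.pop("replication_key_signpost", None)
    out.pop("starting_replication_value", None)

    # Assumption: unsorted streams park their bookmark under "progress_markers"
    # and it is only promoted to the top level at the end of the sync.
    markers = out.pop("progress_markers", None)
    if markers and "replication_key" in markers:
        out["replication_key"] = markers["replication_key"]
        out["replication_key_value"] = markers["replication_key_value"]

    # Assumption: the bookmark is never allowed to pass the signpost.
    value = out.get("replication_key_value")
    if signpost is not None and value is not None and value > signpost:
        out["replication_key_value"] = signpost
    return out


# Same shape as the stream_state output above.
state = {
    "replication_key_signpost": 1682454007,
    "starting_replication_value": "2022-04-01T00:00:00",
    "replication_key": "created",
    "replication_key_value": 1648771370,
}
finalized = finalize_sketch(state)
print(finalized)
```

In this sketch the input dict is left untouched and only the returned copy changes, which mirrors the symptom described here: the in-memory state looks right while the persisted state stays empty.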
I tried passing that format into our finalize_state_progress_markers function, but it seems like the state isn't being persisted. (And all this is after setting is_sorted to True and check_sorted to False.)
Is there another way to manually set state with the SDK?
omar_abed
04/27/2023, 7:17 PM
is_sorted to True, but got an error about unsorted data:
singer_sdk.exceptions.InvalidStreamSortException: Unsorted data detected in stream. Latest value '1682622216' is smaller than previous max '1682622249'.
(and that's after also setting check_sorted to False).
Makes sense to me that it'd throw that error though, because is_sorted is being set at the stream level, not per-chunk. And even within a chunk, it would see unsorted data (because it's reverse sorted).
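A minimal sketch of the failure described above, and of one possible way around it that nobody in the thread has confirmed: buffer each chunk and emit it oldest-first, so that the whole stream is ascending. Everything here is hypothetical and independent of the SDK: UnsortedError, check_ascending, and emit_sorted_chunks are made-up names, and the second chunk's timestamps are invented for illustration. Only the first two values come from the error message.

```python
from typing import Iterable, Iterator, List, Optional


class UnsortedError(Exception):
    """Hypothetical stand-in for the SDK's sort exception."""


def check_ascending(values: Iterable[int]) -> Optional[int]:
    """Track the running max and raise if a smaller value follows it."""
    previous_max: Optional[int] = None
    for value in values:
        if previous_max is not None and value < previous_max:
            raise UnsortedError(
                f"Latest value '{value}' is smaller than previous max '{previous_max}'."
            )
        previous_max = value
    return previous_max


def emit_sorted_chunks(chunks: Iterable[List[int]]) -> Iterator[int]:
    """Yield each chunk oldest-first; the chunks themselves must already be
    ordered by ascending, non-overlapping time window."""
    for chunk in chunks:
        yield from sorted(chunk)


# Two ascending time windows, each returned newest-first by the API.
chunks = [[1682622249, 1682622216], [1682622300, 1682622260]]

# Emitting records as they arrive trips the guard on the second record.
flat = [value for chunk in chunks for value in chunk]
try:
    check_ascending(flat)
    raw_ok = True
except UnsortedError as err:
    raw_ok = False
    print(err)

# Sorting inside each chunk makes the whole stream ascending.
latest = check_ascending(emit_sorted_chunks(chunks))
print(raw_ok, latest)
```

The trade-off is that each chunk has to fit in memory before anything is emitted, so the chunk window would need to be small enough for the busiest period.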