# singer-tap-development
o
I'm a bit surprised that the default Stripe tap doesn't integrate with the BigQuery loader, seeing as how the Stripe API and BigQuery are two of the most prominent use cases in the data world. This schema issue has been open for nearly 2 years. 😕 I'm pretty new to Meltano, so still getting acquainted with things, but it feels like in that time span, someone would've fleshed out that schema to match the Stripe API. Am I missing something that complicates this more than I realize?
d
The now default variant, z3z1ma, should handle this case.
t
That’s the default variant for bigquery btw (not stripe) 😄
o
Ahh I see, thank you! I just realized that we're not using the default BigQuery variant (we're using adswerve), so that makes sense. Was adswerve the default in the past?
p
Yeah, it was officially made the default just recently (https://github.com/meltano/hub/pull/1224), although a solid group of users had been using it for a while before it became the default. Previously, adswerve was the default for a long time.
o
That's excellent context, thank you all! This helps put the picture together.
p
Have you tried https://hub.meltano.com/extractors/tap-stripe--prratek/? It looks like we didn't have it labeled, but it is built with the SDK. From a quick skim it doesn't have the exact same stream coverage, but it looks super easy to extend based on how it's built. cc @prratek_ramchandani waves do you have any insights you could share on that tap?
o
Yup, that's the variant we're using now! (haha I work on Prratek's team at Vox. 😆) I'm making some tweaks to our variant to support pagination but ran into some issues and was just curious if upgrading to the default Stripe tap and the (new) default BigQuery loader would be worth our time in the long run. But, for now, I'll continue trying to get things working with our variant. Thanks!
p
LOL
hi omar
o
full circle
p
haha wow well please report back how it goes because making an SDK variant default (especially if the singer-io variant has bugs) would be ideal!
p
the biggest issue we've had with that stripe tap is getting Meltano to update state, because data from the API is reverse sorted. Thread from a while ago - without that, it was impossible to do a full refresh, because we needed like 3-4 years of historical data and it would eventually fail with a random network error and not save state
oh wait that might be the wrong thread
p
Ah ok. I just read through that thread and I wonder if you need to set the `is_sorted` property to true. My understanding is that the SDK considers a stream unsafe to resume after an interruption if it isn't sorted, but if it is sorted then you can resume. Right now it looks like your tap is telling the SDK that the streams are not sorted, so it won't let you resume. The Stripe API sounds like it's reverse sorted, but by using your chunking logic you can manually paginate to return chunks in sorted order at the chunk level (even though records within a chunk aren't necessarily sorted). Is that right? So technically your replication key is the latest chunk time, and as long as you're emitting state only after all records in that chunk are processed, I think you're safe to consider this `is_sorted=true`. I think the logic in the tap already does all of this properly (from a quick skim, definitely test it though), so it's just a matter of letting the SDK know that you're handling the chunking and emitting replication keys to simulate sorting, so it's safe to resume. Does that make sense?
I also disabled the `check_sorted` setting, because it kept a max timestamp and would throw an error if anything older showed up, since it detected an out-of-order timestamp. I understood this as a safeguard that can be disabled if you're doing something custom and know that it's safe to do so.
cc @edgar_ramirez_mondragon to validate that what I've said is correct
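(Editor's note: the chunk-level checkpointing described above can be sketched in plain Python. This is an illustrative simulation, not actual SDK or tap code, and all names are made up: records arrive reverse-sorted within each chunk, but chunks are requested in ascending order, so the bookmark only advances once a whole chunk is done.)

```python
def sync_chunks(chunks):
    """Yield a bookmark after each fully processed chunk.

    Within a chunk, records are Stripe-style reverse sorted (newest
    first), so checkpointing mid-chunk would skip older records on
    resume. Checkpointing only at chunk boundaries is safe because
    every record at or below the chunk's max timestamp has been
    emitted by then.
    """
    for chunk in chunks:
        for record in sorted(chunk, reverse=True):
            pass  # process each record (emit a RECORD message, etc.)
        yield max(chunk)  # safe point: whole chunk has been emitted


# Three ascending chunks, each internally reverse sorted when fetched
chunks = [[10, 20, 30], [40, 50, 60], [70, 80, 90]]
bookmarks = list(sync_chunks(chunks))
print(bookmarks)  # bookmark advances once per completed chunk
```

The bookmarks come out as `[30, 60, 90]`: each checkpoint reflects the newest record of a chunk whose records have all been processed.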
e
I think using a signpost would be the best path here, so setting `is_sorted = False` on the stream should be enough. We don't have any guides or examples on that, so if that works, it'd be great to evaluate the dev experience of signposts and improve the docs.
p
Ah ok. Does tap-stripe need to implement generating signposts, or does the SDK do that automatically and tap-stripe just needs to access and use them when they're available?
e
In theory the tap should only need to set `is_sorted = False` and retrieve the signpost with `Stream.get_replication_key_signpost()` to set the relevant params or headers
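(Editor's note: conceptually, a signpost is an upper bound on the replication-key value that may be written to state; for timestamp keys the SDK uses the time the sync started. A minimal stdlib sketch of that clamping idea, independent of the SDK, with all names illustrative:)

```python
def clamp_bookmark(candidate: int, signpost: int) -> int:
    """Never let the stored bookmark advance past the signpost.

    Records created after the sync began may not have been fully
    covered, so the bookmark is capped at the signpost (e.g. the
    epoch-seconds timestamp taken at sync start).
    """
    return min(candidate, signpost)


signpost = 1_700_000_000  # hypothetical sync-start timestamp

# A record older than the signpost advances the bookmark normally
print(clamp_bookmark(1_699_999_000, signpost))  # 1699999000

# A record newer than the signpost is clamped to the signpost
print(clamp_bookmark(1_700_000_500, signpost))  # 1700000000
```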
p
okay yep, this makes sense! we'll come back to it next week, but it sounds like it should be relatively easy to tell the SDK to update state after each chunk, given that the chunks are sorted
p
@edgar_ramirez_mondragon I'm a bit confused now 😆. The goal is to checkpoint the unsorted stream at points where we know that all records from that batch have been emitted. The `is_sorted` property defaults to False and isn't overridden by the tap, so that is already set properly. Can you elaborate on how the `get_replication_key_signpost` approach would work? How is the state replication key different from the signpost? The issue Prratek is having is that after an interrupted sync, the SDK throws away the replication key because the stream isn't resumable. Right now the tap manually tells the stream when to try to emit state in https://github.com/prratek/tap-stripe/blob/af95705f7cca624ea179ca49a2d46a9c05530ed2/tap_stripe/streams.py#L147, is that no longer needed?
o
Apologies for dropping this, I was out for a few days. Since we're on such an old version of the SDK (0.3.x), I'm thinking of upgrading to 0.21.x to make use of the new pagination classes and see if that would facilitate getting state set up. That said, `is_sorted` defaults to False (like Pat said). I'll take a look at signposts (like Edgar suggested), but I don't fully understand how they're different from the replication key, so I'll spend some time on that.
Quick update (partially for my own tracking): The `get_replication_key_signpost` function seems to be used in combination with the replication key to set an end goalpost. I set it manually. It looks like our issue is that the `finalize_state_progress_markers` function doesn't actually emit an updated state value. When that function gets called, I check by running `meltano state get <my_elt_pipeline_job>` from the command line, and I don't see any state variables set. However, when I inspect our state in Python using the `stream_state` and `get_context_state` functions, I'm seeing a consistent format:

```
stream_state: {'replication_key_signpost': 1682454007, 'starting_replication_value': '2022-04-01T00:00:00', 'replication_key': 'created', 'replication_key_value': 1648771370}

get_context_state: {'replication_key_signpost': 1682454007, 'starting_replication_value': '2022-04-01T00:00:00', 'replication_key': 'created', 'replication_key_value': 1648771370}
```

I tried passing that format into our `finalize_state_progress_markers` function, but it seems like the state isn't being persisted. (And all this is after setting `is_sorted` to True and `check_sorted` to False.) Is there another way to manually set state with the SDK?
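(Editor's note, for context: Singer taps persist progress by emitting a STATE message as a JSON line on stdout, and Meltano stores the state that the target re-emits after flushing records. A minimal sketch of that message format, where the stream name and bookmark contents are illustrative:)

```python
import json

# Sketch of a Singer STATE message as a tap might emit it; the exact
# bookmark layout varies by tap and SDK version.
state = {
    "type": "STATE",
    "value": {
        "bookmarks": {
            "charges": {  # hypothetical stream name
                "replication_key": "created",
                "replication_key_value": 1648771370,
            }
        }
    },
}

# Taps write one JSON object per line to stdout; the target/Meltano
# parses and persists the "value" payload.
line = json.dumps(state)
print(line)
```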
I attempted to set `is_sorted` to `True`, but got an error about unsorted data:

```
singer_sdk.exceptions.InvalidStreamSortException: Unsorted data detected in stream. Latest value '1682622216' is smaller than previous max '1682622249'.
```

(and that's after also setting `check_sorted` to `False`). It makes sense to me that it'd throw that error, though, because `is_sorted` is being set at the stream level, not per chunk. And even within a chunk, it would see unsorted data (because it's reverse sorted).
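(Editor's note: the failure above can be reproduced with a toy version of a stream-level sort check. This is not the actual SDK code, just an illustration of why reverse-sorted records trip the check on the second record:)

```python
class InvalidSortError(Exception):
    """Toy stand-in for singer_sdk.exceptions.InvalidStreamSortException."""


def check_sorted_stream(values):
    """Raise if any value is smaller than the max seen so far."""
    max_seen = None
    for value in values:
        if max_seen is not None and value < max_seen:
            raise InvalidSortError(
                f"Unsorted data detected: latest value {value!r} "
                f"is smaller than previous max {max_seen!r}."
            )
        max_seen = value if max_seen is None else max(max_seen, value)


# Reverse-sorted, Stripe-style: the second value is older than the
# first, so a stream-level check fails immediately.
try:
    check_sorted_stream([1682622249, 1682622216])
    failed = False
except InvalidSortError:
    failed = True

print(failed)  # True

# Ascending data passes without raising
check_sorted_stream([1, 2, 3])
```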