# singer-tap-development
**michael_cooper**
Question regarding a stream's `replication_key`, and this is partially a question about the Singer spec in general as well. Is a stream's `replication_key` a key within an individual record that is used as a means of bookmarking, or is it a key you use to bookmark where a stream left off? For example, we have a hypothetical endpoint `GET /api/orders?created_after='2020-01-13'` with a response of
```json
{
  "orders": [
    {
      "order_id": 1,
      "customer_id": 2
    },
    {
      "order_id": 2,
      "customer_id": 55
    }
  ]
}
```
Now we want to track where we left off since the last sync by tracking the last time we successfully queried, rather than having to do a full sync of all orders every time. Since there is no such key within an individual record, does that mean there is no technical `replication_key` for this stream? Or is `replication_key` arbitrary, just a way to bookmark where this individual stream left off? As for the SDK: does it utilize a stream's `replication_key` in any functional way, or does it only use it for metadata?
**aaronsteers**
Hi, @michael_cooper. Great question! The `replication_key` can optionally be set to the name of any property in the stream's records which can be used as a bookmark to resume and get just the incrementally new/updated records. And yes, the SDK will try to use that smartly, for instance, by enabling `INCREMENTAL` replication automatically and tracking state bookmarks internally when it detects that property is set. Depending on your implementation, you would also likely use that prior bookmarked value when you request records from the source, so only new or updated records have to be read.
Does that make sense?
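The incremental behavior described above can be sketched in plain Python (stdlib only; the real logic lives inside the SDK, and the `updated_at` field name here is purely hypothetical, since it assumes the records carry such a property):

```python
def sync_incrementally(records, replication_key, bookmark=None):
    """Emit only records whose replication-key value is newer than the
    stored bookmark, advancing the bookmark as records are emitted.
    Assumes records arrive sorted ascending by the replication key."""
    emitted = []
    for record in records:
        value = record[replication_key]
        if bookmark is None or value > bookmark:
            emitted.append(record)
            bookmark = value  # the SDK persists this via a STATE message
    return emitted, bookmark

# Resuming with a prior bookmark skips already-synced records:
orders = [
    {"order_id": 1, "updated_at": "2020-01-10"},
    {"order_id": 2, "updated_at": "2020-01-14"},
]
new_orders, state = sync_incrementally(orders, "updated_at", bookmark="2020-01-13")
```

Here only the second order is newer than the `2020-01-13` bookmark, so it alone is emitted and the bookmark advances to `2020-01-14`.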
**michael_cooper**
I believe so. So does that mean it checks individual records, i.e. records yielded to be written, for your defined `replication_key`?
**aaronsteers**
Yep! That’s right. Assuming the record data has that key present in a column (aka property), the specified field will get tracked and bookmarked for you automatically. State file and bookmarks are described in the spec here but technically you don’t really need to worry about the spec since it is fully implemented by the SDK.
**michael_cooper**
This brings up the issue of where to put bookmarks that don't reside within a record's properties. As per my example above, there is clearly a way to query the API so it only returns records after a specific date via the API's query params, but there is no way to track that within the API's response, i.e. there's no `created_at` field within an individual record. If I don't have a bookmark somewhere to track `created_after`, then I will effectively be doing a full table sync every time, which is not ideal.
**aaronsteers**
Yeah, that makes sense. Our recommendation would be to add it into the stream so it becomes part of the record. This solves the inclusion problem, and it is also a best practice in terms of being able to audit the data in the future. You can add any fields you want, either during `get_records()` or in `post_process()`. Would these work for your implementation? If not, can you tell me a little more about the API? Is it REST?
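A minimal standalone sketch of the `post_process()` approach, stamping each record with a synthetic timestamp so a replication key exists even when the API response carries none (the field name `extracted_at` is an arbitrary choice for illustration; the real SDK hook is a method on the stream class):

```python
from datetime import datetime, timezone

def post_process(row, context=None):
    """Mimic the SDK's post_process hook: add an extraction timestamp
    to each record. `extracted_at` is a hypothetical field name."""
    row["extracted_at"] = datetime.now(timezone.utc).isoformat()
    return row

record = post_process({"order_id": 1, "customer_id": 2})
```

The stream could then declare `replication_key = "extracted_at"` and bookmark on that value.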
**michael_cooper**
It's mostly a semantic thing. We like to design our streams to be able to track where they left off even if there is an error mid-stream. For example, if we have to loop through pages within a stream, and the stream breaks for some reason on page 10 of 50, then we want to know where it stopped and not have to re-ingest pages 1 through 9 when we fix the stream.
It also means that any streams that don't have a `replication_key` are technically `FULL_TABLE` syncs, despite the fact that the stream does not sync all records on every run, even when the API supports querying only certain records.
**aaronsteers**
Yeah, definitely you would want to enable incremental replication. A specific example API might be helpful for me to understand if there’s a gap in how the SDK handles this. (Also, the way in which the stream is sorted is also relevant here, since unsorted streams won’t be resumable in the same way.)
Most APIs I have run into so far have at least a timestamp or integer key which exists in the record or could be added to it.
**michael_cooper**
This or this might work.
**aaronsteers**
In the first example, I do see three timestamps on each record: `created`, `responsed`, and `lastemailed`. From the docs, it looks like you have `responded` as a timestamp (epoch format) and you would just use that as the `since_time` input and also for your replication key. (Let me know if I'm reading the docs wrong.)
For that example, if the sort is left as default (`sort_by=responded`, `sort_direction=asc`), then your records will come out in order. They'll be resumable by default just by tracking the latest value, and you can resume without needing to recall the pagination.
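A hypothetical helper illustrating that pattern: the last bookmarked `responded` value feeds the `since_time` input while the default sort is kept, so the stream resumes from a value rather than a page number (parameter names are taken from the API discussed above; adjust for the real endpoint):

```python
def build_request_params(bookmark=None, page=1):
    """Build query params for a sorted, value-resumable stream.
    With the default ascending sort on `responded`, resuming only
    requires the last bookmarked epoch timestamp, not a page count."""
    params = {"sort_by": "responded", "sort_direction": "asc", "page": page}
    if bookmark is not None:
        params["since_time"] = bookmark  # epoch value of last record seen
    return params
```

On the first sync `since_time` is simply omitted, which yields a full-history read.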
I just realized I should have stated this upfront (although I haven't tested this without a `replication_key` set): you can also take advantage of `get_context_state(context)`, which prior to 0.2.0 was called `get_stream_or_partition_state(partition)`. This gives you a readable and writable state dictionary which you can use to store anything you want. 🙂
https://gitlab.com/meltano/singer-sdk/-/blob/main/singer_sdk/streams/core.py#L386
I've not been broadly promoting that method, since one of our goals with the SDK is for developers to not need to directly manage state and other Singer internals, but it is there if you want or need it.
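A rough sketch of that idea (not the SDK API itself): treat the writable state dict as a place for a custom resumption marker, such as the last fully synced page. The key name `last_page_synced` is arbitrary.

```python
def record_progress(state, page):
    """Store a custom resumption marker in a writable state dict,
    as the dict returned by get_context_state() would allow."""
    state["last_page_synced"] = page
    return state

state = {}
for page in range(1, 11):
    # ... fetch and emit all records for `page` here ...
    record_progress(state, page)

# If the sync later dies on page 10 of 50, the next run can start at
# state["last_page_synced"] + 1 instead of re-ingesting pages 1-9.
```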
**michael_cooper**
Sorry, I had a long weekend, so now I'm back. Here is actually a perfect example of an API we're bringing in. The way to query is via the actual URI, and there's only one key within the response.
**pnadolny**
I also have something similar to the original use case. I have a `since` parameter that takes a date, but the payload I receive back doesn't contain any dates. I think @aaronsteers's recommendation might work for me: if I add an `extracted_ts` of the current time to each record using the `post_process` method, then I can use that as a replication key. My concern is that a failed stream could bookmark my current time before the entire stream is done, causing missing data the next time it runs. Is this a valid concern? Or is there a mechanism to prevent this?
@michael_cooper if what I'm doing works, then you can do the same with your NPS endpoint. Just append the current date and use it as a replication key. Then in `get_url_params` you can do a day diff between the last bookmark and the current time, and that would be your `days` parameter.
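The day-diff suggestion could look something like this (a sketch only; the function and parameter names beyond `days` are assumptions, and `now` is injectable just to make the logic testable):

```python
import math
from datetime import datetime, timezone

def days_param(last_bookmark_iso, now=None):
    """Turn the last bookmarked timestamp into a `days` lookback value.
    Rounds up so a partial day since the bookmark is never skipped."""
    last = datetime.fromisoformat(last_bookmark_iso)
    now = now or datetime.now(timezone.utc)
    return max(1, math.ceil((now - last).total_seconds() / 86400))
```

Rounding up trades a small amount of re-read overlap for safety against gaps, which matters given the concern above about a failed stream bookmarking too eagerly.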
**michael_cooper**
@pnadolny how would `get_url_params` work in this case? This API doesn't use query parameters but instead encodes the "params" as a URL path.
**pnadolny**
Ah, I missed that; I thought it was a param. Regardless, it should work the same way if you can track some injected timestamp. I see a `get_url` method that gets called before each request is made, so maybe you could do it there.