matt_arderne
10/25/2021, 10:28 AM
records_jsonpath = "$[*]"
I've set it to records_jsonpath = "$.user_statuses[*]", which is where the records I'm after are available in an array, but it seems that prevents me from accessing the API metadata necessary for incremental updates etc.
matt_arderne
10/25/2021, 10:32 AM
{
  "user_statuses": [
    {
      "id": 6887,
      "user_id": 116625277472,
      "status": "away",
      "duration_in_seconds": 2345,
      "updater_id": 116625277472,
      "started_at": "2019-12-17T17:24:48.273Z",
      "ended_at": "2019-12-17T18:03:53.168Z"
    },
    ...
    {
      "id": 1447322,
      "user_id": 378630388592,
      "status": "out_of_office",
      "duration_in_seconds": 74631,
      "updater_id": 378630388592,
      "started_at": "2021-06-19T18:26:32.922Z",
      "ended_at": "2021-06-20T15:10:23.976Z"
    }
  ],
  "end_time": 1624201823.976,
  "count": 330,
  "end_of_stream": true,
  "cursor": "MTYyNDIwMTgyMy45NzZ8fDE0NDczMjN8",
  "next_page": "https://api.myplaylist.io/zendesk/incremental/user_statuses?cursor=MTYyNDIwMTgyMy45NzZ8fDE0NDczMjN8"
}
I'm using the end_of_stream, end_time and cursor fields from the above.
In client.py I've got
next_page_token_jsonpath = "$.cursor"
end_of_stream = "$.end_of_stream"
aaronsteers
10/25/2021, 10:42 AM
records_jsonpath is correct but you'll need to override get_next_page_token() instead of next_page_token_jsonpath. The method implementation of get_next_page_token() will allow you to perform the more complex evaluation needed - for instance, returning None instead of the token if end_of_stream is True.
Does that help?
aaronsteers
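10/25/2021, 10:45 AM
Similarly, if you need to append the records themselves with some of the enveloping fields, you can do so by overriding parse_response() and injecting the needed extra fields before emitting your records.
matt_arderne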
10/25/2021, 10:48 AM
I've got next_page_token_jsonpath and end_of_stream, both of which are working fine.
What I'm struggling with is the replication_key = "end_time" or replication_key = "$.end_time"
If I put either of those in client.py or streams.py then I get an error:
Could not detect type from empty type_dict. Did you forget to define a property in the stream schema?
matt_arderne
10/25/2021, 10:52 AM
records_jsonpath means that the metadata isn't accessible:
# th is singer_sdk.typing (from singer_sdk import typing as th)
records_jsonpath = "$.user_statuses[*]"
primary_keys = ["id"]
schema = th.PropertiesList(
    th.Property("id", th.IntegerType, description=""),
    th.Property("user_id", th.IntegerType, description=""),
    th.Property("status", th.StringType, description=""),
    th.Property("duration_in_seconds", th.IntegerType, description=""),
    th.Property("updater_id", th.IntegerType, description=""),
    th.Property("started_at", th.DateTimeType, description=""),
    th.Property("ended_at", th.DateTimeType, description=""),
).to_dict()
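The error message quoted earlier suggests that the replication key has to be a plain property name that is declared in the stream schema, so a JSONPath such as "$.end_time" would not match, and "end_time" would only work once it is added as a property. A minimal sketch of that idea, written against a plain JSON Schema dict rather than the th helpers so that it runs on its own. The helper name replication_key_is_declared is hypothetical, and typing end_time as a nullable number is an assumption based on the sample response above.

```python
from typing import Any, Dict

# Plain JSON Schema equivalent of the stream schema above, with the
# envelope field end_time added as a property (assumed nullable number).
SCHEMA: Dict[str, Any] = {
    "type": "object",
    "properties": {
        "id": {"type": "integer"},
        "user_id": {"type": "integer"},
        "status": {"type": "string"},
        "duration_in_seconds": {"type": "integer"},
        "updater_id": {"type": "integer"},
        "started_at": {"type": "string", "format": "date-time"},
        "ended_at": {"type": "string", "format": "date-time"},
        "end_time": {"type": ["number", "null"]},
    },
}


def replication_key_is_declared(schema: Dict[str, Any], replication_key: str) -> bool:
    """Hypothetical helper: True if the key is a declared schema property."""
    return replication_key in schema.get("properties", {})
```

With this schema, "end_time" is found while "$.end_time" is not. Declaring the property only helps if each emitted record actually carries an end_time value.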
matt_arderne
10/25/2021, 11:50 AM
"Similarly, if you need to append the records themselves with some of the enveloping fields, you can do so by overriding parse_response() and injecting the needed extra fields before emitting your records."
Hi @aaronsteers thanks for this! 🙏 I'm headed on the right track I think with parse_response()
I think the issue is that the last page in the response set doesn't include an end_time, which I think is what has confused it.
"injecting the needed extra fields before emitting your records"
Do you have an example of how to do this? I haven't seen this in any of the examples etc.
Side note: the reason I'm headed down this path is that I was having issues with target-stitch. Initially I was putting the whole user_statuses response in its own th.ArrayType(), but that was making it hard to access the primary_key field id, so I scrapped the th.ArrayType and just went for the previous comment's stream schema. Perhaps I should rather consider parse_response more carefully.
matt_arderne
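10/25/2021, 11:53 AM
import requests
from typing import Iterable

def parse_response(self, response: requests.Response) -> Iterable[dict]:
    resp_json = response.json()
    # end_time lives on the response envelope; the last page may omit it
    end_time = resp_json.get("end_time")
    for row in resp_json["user_statuses"]:
        row["end_time"] = end_time
        yield row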
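The same injection can be tried outside the tap as a pure function over an already-parsed payload, which makes it easy to check against both response shapes shown in this thread (a full page, and a last page with no end_time). This is only a sketch: the function name rows_with_envelope and the fallback_end_time parameter are hypothetical and not part of the SDK. Inside a stream class, the body would sit in parse_response() and start from response.json().

```python
from typing import Any, Dict, Iterable, Optional


def rows_with_envelope(
    payload: Dict[str, Any],
    fallback_end_time: Optional[float] = None,
) -> Iterable[Dict[str, Any]]:
    """Yield each user status with the envelope's end_time copied onto it."""
    # The last page in this thread has no end_time key, so fall back
    # to a caller-supplied value (None by default).
    end_time = payload.get("end_time", fallback_end_time)
    for row in payload.get("user_statuses", []):
        # Build a new dict rather than mutating the parsed response.
        yield {**row, "end_time": end_time}


full_page = {
    "user_statuses": [{"id": 6887, "status": "away"}],
    "end_time": 1624201823.976,
}
last_page = {"user_statuses": [], "count": 0, "end_of_stream": True}

rows = list(rows_with_envelope(full_page))
empty = list(rows_with_envelope(last_page))
```

Since the last page carries no records, the missing end_time there never reaches an emitted row in this sketch.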
edgar_ramirez_mondragon
10/25/2021, 3:32 PM
matt_arderne
10/25/2021, 4:07 PM
aaronsteers
10/25/2021, 4:15 PM
I don't think end_of_stream is a feature we support in the SDK (at least not yet!). The pagination may be working anyway without that, if the value at next_page_token_jsonpath is null when the stream has ended. But just wanted to call that out. If you do end up requiring both the end_of_stream signal as well as the next page token, then my comment above around get_next_page_token() would be a path forward to handle the more complex pagination requirement.
matt_arderne
10/25/2021, 4:22 PM
next_page_token_jsonpath, i.e. the cursor below, is null. I was puzzled by why it was working, as I hadn't seen quite where the end_of_stream flag was being picked up, and I hadn't done much to indicate that it was necessary... so that now makes sense 😄
{
  "user_statuses": [],
  "count": 0,
  "end_of_stream": true,
  "cursor": null,
  "next_page": null
}
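If both signals ever need to be honoured, the decision described earlier in the thread (return None instead of the token when end_of_stream is true) can be sketched as a pure function over the parsed payload. The name next_cursor is hypothetical. In a tap, this logic would presumably go inside a get_next_page_token() override and operate on response.json(); the exact method signature depends on the SDK version, so it is left out here.

```python
from typing import Any, Dict, Optional


def next_cursor(payload: Dict[str, Any]) -> Optional[str]:
    """Return the cursor for the next request, or None to stop paginating."""
    # Stop as soon as the API says the stream has ended, even if it
    # still hands back a cursor (as in the first sample response).
    if payload.get("end_of_stream"):
        return None
    # A null or missing cursor also means there is nothing more to fetch.
    return payload.get("cursor") or None


first_sample = {"end_of_stream": True, "cursor": "MTYyNDIwMTgyMy45NzZ8fDE0NDczMjN8"}
last_sample = {"user_statuses": [], "end_of_stream": True, "cursor": None}
mid_stream = {"end_of_stream": False, "cursor": "abc"}
```

Compared with relying on a null cursor alone, checking end_of_stream first would skip the final request that only returns the empty page above.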