# singer-tap-development
**matt_arderne:**
Possibly a simple thing I've overlooked regarding `records_jsonpath = "$[*]"`. I've set it to `records_jsonpath = "$.user_statuses[*]"`, which is where the records I'm after are available in an array, but it seems that prevents me from accessing the API metadata necessary for incremental updates etc.
The results look like:

```json
{
  "user_statuses": [
    {
      "id": 6887,
      "user_id": 116625277472,
      "status": "away",
      "duration_in_seconds": 2345,
      "updater_id": 116625277472,
      "started_at": "2019-12-17T17:24:48.273Z",
      "ended_at": "2019-12-17T18:03:53.168Z"
    },
    ...
    {
      "id": 1447322,
      "user_id": 378630388592,
      "status": "out_of_office",
      "duration_in_seconds": 74631,
      "updater_id": 378630388592,
      "started_at": "2021-06-19T18:26:32.922Z",
      "ended_at": "2021-06-20T15:10:23.976Z"
    }
  ],
  "end_time": 1624201823.976,
  "count": 330,
  "end_of_stream": true,
  "cursor": "MTYyNDIwMTgyMy45NzZ8fDE0NDczMjN8",
  "next_page": "https://api.myplaylist.io/zendesk/incremental/user_statuses?cursor=MTYyNDIwMTgyMy45NzZ8fDE0NDczMjN8"
}
```
I'm using the `end_of_stream`, `end_time` and `cursor` fields from the above. In `client.py` I've got:
```python
next_page_token_jsonpath = "$.cursor"
end_of_stream = "$.end_of_stream"
```
**aaronsteers:**
Hi, @matt_arderne. If I'm reading this correctly, it looks like your `records_jsonpath` is correct, but you'll need to override `get_next_page_token()` instead of `next_page_token_jsonpath`. The method implementation of `get_next_page_token()` will allow you to perform the more complex evaluation needed - for instance, returning `None` instead of the token if `end_of_stream` is `True`. Does that help?
Similarly, if you need to augment the records themselves with some of the enveloping fields, you can do so by overriding `parse_response()` and injecting the needed extra fields before emitting your records.
**matt_arderne:**
Hey @aaronsteers, thanks - yes that is correct, and I'm happy with `next_page_token_jsonpath` and `end_of_stream`, both of which are working fine. What I'm struggling with is `replication_key = "end_time"` or `replication_key = "$.end_time"`. If I put either of those in `client.py` or `streams.py` then I get an error:

```
Could not detect type from empty type_dict. Did you forget to define a property in the stream schema?
```
I've set my schema for the stream as below, and I'm wondering if the `records_jsonpath` means that the metadata isn't accessible:

```python
records_jsonpath = "$.user_statuses[*]"
primary_keys = ["id"]

schema = th.PropertiesList(
    th.Property("id", th.IntegerType, description=""),
    th.Property("user_id", th.IntegerType, description=""),
    th.Property("status", th.StringType, description=""),
    th.Property("duration_in_seconds", th.IntegerType, description=""),
    th.Property("updater_id", th.IntegerType, description=""),
    th.Property("started_at", th.DateTimeType, description=""),
    th.Property("ended_at", th.DateTimeType, description=""),
).to_dict()
```
> Similarly, if you need to augment the records themselves with some of the enveloping fields, you can do so by overriding `parse_response()` and injecting the needed extra fields before emitting your records.
Hi @aaronsteers, thanks for this! 🙏 I'm headed in the right direction, I think, with `parse_response()`. I think the issue is that the last page in the response set doesn't include an `end_time`, which is what has confused it.

> injecting the needed extra fields before emitting your records

Do you have an example of how to do this? I haven't seen this in any of the examples etc. Side note: the reason I'm headed down this path is that I was having issues with target-stitch. Initially I was putting the whole `user_statuses` response in its own `th.ArrayType()`, but that was making it hard to access the primary_key field `id`, so I scrapped the `th.ArrayType` and just went for the previous comment's stream schema. Perhaps I should consider `parse_response` more carefully.
The best way I can think of is something like the below; I'll hack around with that:

```python
def parse_response(self, response: requests.Response) -> Iterable[dict]:
    resp_json = response.json()
    # Pull the envelope field once; the last page may not include it.
    end_time = resp_json.get("end_time")
    for row in resp_json["user_statuses"]:
        row["end_time"] = end_time
        yield row
```
**edgar_ramirez_mondragon:**
@matt_arderne That's how I'd do it. You'll also have to include it in the schema.
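"Include it in the schema" means declaring the injected `end_time` field alongside the existing properties. A dependency-free sketch in plain JSON Schema terms (an assumption on my part: with the SDK's typing helpers this would be roughly `th.Property("end_time", th.NumberType)`, since the sample's `end_time` is a Unix-epoch number):

```python
# Sketch of the properties the stream would advertise once the
# injected envelope field is declared. The other properties come
# from the PropertiesList shown earlier in the thread.
injected_property = {"end_time": {"type": ["number", "null"]}}

stream_properties = {
    "id": {"type": ["integer", "null"]},
    # ... remaining properties from the schema above ...
    **injected_property,
}
```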
**matt_arderne:**
@edgar_ramirez_mondragon ah ok, that was my next question 😄 thank you... It worked! Must say, I enjoy this SDK, very very much 🚀
**aaronsteers:**
@edgar_ramirez_mondragon's comment was exactly on point. And yes, your code sample looks great. It looks like you're on your way, and I'm glad to hear the SDK is creating a good development experience for you. One last thing to note from above: I don't think `end_of_stream` is a feature we support in the SDK (at least not yet!). The pagination may be working anyway without it, if the value at `next_page_token_jsonpath` is null when the stream has ended. But I just wanted to call that out. If you do end up requiring both the `end_of_stream` signal and the next page token, then my comment above about `get_next_page_token()` would be a path forward to handle the more complex pagination requirement.
**matt_arderne:**
Ah haha, thanks for clarifying that @aaronsteers. Indeed, the last page looks like the below; the `next_page_token_jsonpath` match, i.e. `cursor`, is null. I was puzzled about why it was working, as I hadn't seen quite where the `end_of_stream` flag was being picked up, and I hadn't done much to indicate that it was necessary... so that now makes sense 😄

```json
{
  "user_statuses": [],
  "count": 0,
  "end_of_stream": true,
  "cursor": null,
  "next_page": null
}
```