# singer-tap-development
d
Hi, I’m building a tap for a custom API and it has tricky paginated responses:
```json
{
  "objects": [
    {
      "id": 1,
      "child_id": 1,
      "type_id": 1
    },
    {
      "id": 2,
      "child_id": 2,
      "type_id": 2
    },
    {
      "id": 3,
      "child_id": 3,
      "type_id": 2
    }
  ],
  "types": [
    {
      "id": 1,
      "name": "1"
    },
    {
      "id": 2,
      "name": "2"
    }
  ],
  "hasMore": true
}
```

```json
{
  "objects": [
    {
      "id": 4,
      "child_id": 4,
      "type_id": 1
    },
    {
      "id": 5,
      "child_id": 5,
      "type_id": 1
    }
  ],
  "types": [
    {
      "id": 1,
      "name": "1"
    }
  ],
  "hasMore": false
}
```
So, I made it work with a `RESTStream` (`records_jsonpath = "$.objects[*]"`) which passes its `child_id` to a child `RESTStream`. It gives me two fancy outputs:
```
objects:
{"id": 1, "child_id": 1, "type_id": 1}
{"id": 2, "child_id": 2, "type_id": 2}
{"id": 3, "child_id": 3, "type_id": 2}
{"id": 4, "child_id": 4, "type_id": 1}
{"id": 5, "child_id": 5, "type_id": 1}
```

```
children:
{"id": 1, "parent_id": 1, "type_id": 1}
{"id": 2, "parent_id": 2, "type_id": 2}
{"id": 3, "parent_id": 3, "type_id": 2}
{"id": 4, "parent_id": 4, "type_id": 1}
{"id": 5, "parent_id": 5, "type_id": 1}
```
Everything works fine at this step with a pagination token linked to the `hasMore` value. The tricky part here is the `types` list. Each response page with `objects` and `types` could have the same `types` on every page, but I want to have `types` as a separate stream containing only the unique elements that appeared across all of the pages:
```
types:
{"id": 1, "name": "1"}
{"id": 2, "name": "2"}
```
Is there a proper way to solve it?
a
Hi, @Denis I.! I love an interesting challenge... 😅 Can we safely assume that `types` does not have a unique API endpoint, which would avoid the chance of duplicates? And is it safe to assume that `types` will always sync with `FULL_TABLE` replication, given that there isn't a timestamp there?
I think what I'd probably do here, if 'yes' to the above, would be to duplicate your parent stream as a TypeStream and replace the jsonpath with something like `records_jsonpath = "$.types[*]"`. You can reuse the same pagination logic that you have already.
Then, use `post_process()` to cache items already seen, returning `None` from `post_process()` if the record has already been sent, which tells the SDK to filter out the record.
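A minimal sketch of that suggestion, reusing the parent class so the pagination logic carries over (whether subclassing or copying fits better depends on the tap's layout):

```python
class TypesStream(ObjectsStream):
    """Same endpoint and pagination as ObjectsStream, different jsonpath."""

    name = "types"
    primary_keys = ["id"]
    records_jsonpath = "$.types[*]"   # extract from the types array instead

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._seen_ids = set()        # ids already emitted during this sync

    def post_process(self, row, context=None):
        # Returning None tells the SDK to filter out the record, so
        # duplicates reappearing on later pages are dropped here.
        if row["id"] in self._seen_ids:
            return None
        self._seen_ids.add(row["id"])
        return row
```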
Let me know if this helps! 🙂
d
Thanks for the reply! Yep, your assumptions are correct. The only source for `types` is the API endpoint we have for now: `objects` are incremental, and `types` are just references for the objects that contain the relevant types on a page. It means that during some API syncs we could receive fewer `types` than we'd previously seen. I've duplicated the parent stream as a TypeStream and it works fine. My concern was about the actual duplication of requests to the API endpoint 🙂 Solved it with `requests-cache`; it seems a bit overkill, but it works for now. I'd try to use `post_process()` next
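For anyone following along, a sketch of one way to wire that up: `requests-cache`'s `CachedSession` is designed as a drop-in replacement for `requests.Session`, and singer-sdk's `RESTStream` exposes a `requests_session` property that can be overridden, so both streams can share one cached session (the backend and expiry below are assumptions):

```python
import requests_cache
from singer_sdk.streams import RESTStream

# One shared session: ObjectsStream and TypesStream request the same URLs,
# so whichever stream runs second is served from the cache, not the API.
_cached_session = requests_cache.CachedSession(backend="memory", expire_after=300)


class CachedAPIStream(RESTStream):
    """Hypothetical shared base class for both streams."""

    @property
    def requests_session(self):
        return _cached_session
```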
So, things got tricky again when I realised I need the same `replication_key` for both `objects` and `types` 🫠 I can't save the last `updated_at` from `objects` to `types`, since the API responses return objects in ascending order and I filter out all `types` items that have already been sent. Is there a proper way to set the `replication_key_value` that goes into the state?
a
If the streams here will only be synced via FULL_TABLE replication, you can ignore `replication_key` and just leave it undefined.
I don't see any valid keys for incremental replication in your examples above, but lmk if there's more context on how incremental replication might work.
d
Sorry, my bad! The example was too simplified for the updated context. I've added `created_at` (datetime) to the API response example:
```json
{
  "objects": [
    {
      "id": 1,
      "child_id": 1,
      "type_id": 1,
      "created_at": "2023-01-01T00:00:00"
    },
    {
      "id": 2,
      "child_id": 2,
      "type_id": 2,
      "created_at": "2023-01-01T00:01:00"
    },
    {
      "id": 3,
      "child_id": 3,
      "type_id": null,
      "created_at": "2023-01-01T00:02:00"
    }
  ],
  "types": [
    {
      "id": 1,
      "name": "1"
    },
    {
      "id": 2,
      "name": "2"
    }
  ],
  "hasMore": true
}
```
```python
replication_method = "INCREMENTAL"
replication_key = "created_at"
```
The request has a `date_from` parameter, which could be used for incremental requests. Also, `type_id` could be null; some `objects` could have no related `types` item in the response.
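With those additions, the incremental wiring might look roughly like this (a sketch: `get_starting_timestamp()` is the SDK hook that reads the `created_at` bookmark from state, or falls back to the configured start date; the `date_from` mapping is an assumption about the API):

```python
class ObjectsStream(RESTStream):
    name = "objects"
    primary_keys = ["id"]
    replication_method = "INCREMENTAL"
    replication_key = "created_at"
    records_jsonpath = "$.objects[*]"

    def get_url_params(self, context, next_page_token):
        params = {}
        start = self.get_starting_timestamp(context)
        if start:
            # Resume from the last bookmarked created_at value.
            params["date_from"] = start.isoformat()
        if next_page_token:
            params["page"] = next_page_token
        return params
```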
Meanwhile, I've tried to implement another approach. `ObjectsStream` yields items from both arrays (`objects`, `types`) and adds an additional `_type` key to distinguish the rows. Downstream methods and `TypesStream` then use that key to drop or process the relevant items. I also had to overload the `_sync_records()` method and patch the block which yields records, to filter out all non-`objects` rows. This approach works fine, but seems a bit hacky. What do you think?
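For comparison, a simplified sketch of that discriminator idea, tagging rows in `parse_response()` and filtering in `post_process()` rather than patching `_sync_records()` (which may or may not cover whatever the override was needed for):

```python
class ObjectsStream(RESTStream):
    name = "objects"

    def parse_response(self, response):
        # Yield rows from both arrays, tagged so downstream code can
        # tell them apart.
        data = response.json()
        for row in data.get("objects", []):
            yield {**row, "_type": "object"}
        for row in data.get("types", []):
            yield {**row, "_type": "type"}

    def post_process(self, row, context=None):
        if row.pop("_type", None) != "object":
            return None   # drop rows that belong to the types stream
        return row
```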
Another approach that came to mind recently is to merge the `types` data into each relevant `objects` item in the `parse_response()` method, and then process `types` in a child `TypesStream`.
So, I've implemented the last approach I mentioned above, and it seems to be the most viable and stable for my API:
1. The tap makes incremental requests with replication key `objects.created_at`.
2. All received `types` are saved in an `ObjectsStream` property as a cached dict.
3. `ObjectsStream.get_response()` yields rows from the received `objects` and processes them as usual.
4. `ObjectsStream.get_child_context()` returns each `types` item along with the first `objects` item related to it.
5. `TypesStream.get_records()` processes each row received from the parent with a `types` item, and drops all new copies of it that come from every new page of the response.

I also use another child `SubObjectsStream` to query additional data per each `objects` item received from the `ObjectsStream.get_child_context()` method. This approach makes only one request per page of the `objects`/`types` API endpoint, keeps track of state for the replication key `objects.created_at`, and returns only unique `types` items per sync. I wonder if there is a more straightforward solution, or features on the roadmap, that could improve this approach's architecture (:
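To make the final shape concrete, a condensed sketch of the steps above, assuming the singer-sdk parent/child API (method bodies, names, and endpoints are approximations of the description, not the actual tap; schemas omitted):

```python
from singer_sdk.streams import RESTStream, Stream


class ObjectsStream(RESTStream):
    name = "objects"
    path = "/objects"                     # hypothetical endpoint
    url_base = "https://api.example.com"  # placeholder
    primary_keys = ["id"]
    replication_key = "created_at"        # step 1: incremental requests

    def parse_response(self, response):
        data = response.json()
        # Step 2: cache the page's types so child contexts can attach them.
        self._types_by_id = {t["id"]: t for t in data.get("types", [])}
        # Step 3: yield object rows and process them as usual.
        yield from data.get("objects", [])

    def get_child_context(self, record, context):
        # Step 4: pass the related types item along with each object.
        return {
            "child_id": record["child_id"],
            "type": self._types_by_id.get(record["type_id"]),
        }


class TypesStream(Stream):
    """Step 5: emit each type once, with no extra HTTP requests."""

    name = "types"
    parent_stream_type = ObjectsStream
    primary_keys = ["id"]
    state_partitioning_keys = []          # one state blob, not one per parent row

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._seen = set()

    def get_records(self, context):
        row = (context or {}).get("type")
        if row and row["id"] not in self._seen:
            self._seen.add(row["id"])
            yield row


class SubObjectsStream(RESTStream):
    """Extra data per object, one request per child_id."""

    name = "sub_objects"
    parent_stream_type = ObjectsStream
    path = "/objects/{child_id}/details"  # hypothetical endpoint
    url_base = "https://api.example.com"
    primary_keys = ["id"]
```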