Denis I.
02/22/2023, 11:06 PM
{
"objects": [
{
"id": 1,
"child_id": 1,
"type_id": 1
},
{
"id": 2,
"child_id": 2,
"type_id": 2
},
{
"id": 3,
"child_id": 3,
"type_id": 2
}
],
"types": [
{
"id": 1,
"name": "1"
},
{
"id": 2,
"name": "2"
}
],
"hasMore": true
}
{
"objects": [
{
"id": 4,
"child_id": 4,
"type_id": 1
},
{
"id": 5,
"child_id": 5,
"type_id": 1
}
],
"types": [
{
"id": 1,
"name": "1"
}
],
"hasMore": false
}
So, I made it work with a RESTStream (records_jsonpath = "$.objects[*]") which passes its child_id to a child RESTStream. It gives me two fancy outputs:
objects:
{"id": 1, "child_id": 1, "type_id": 1}
{"id": 2, "child_id": 2, "type_id": 2}
{"id": 3, "child_id": 3, "type_id": 2}
{"id": 4, "child_id": 4, "type_id": 1}
{"id": 5, "child_id": 5, "type_id": 1}
children:
{"id": 1, "parent_id": 1, "type_id": 1}
{"id": 2, "parent_id": 2, "type_id": 2}
{"id": 3, "parent_id": 3, "type_id": 2}
{"id": 4, "parent_id": 4, "type_id": 1}
{"id": 5, "parent_id": 5, "type_id": 1}
Everything works fine at this step with a pagination token linked to the hasMore value.
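A minimal standalone sketch of pagination driven by hasMore, outside the SDK. The integer page token and the fake_fetch helper are assumptions for illustration; the real API's token scheme is not shown in this thread.

```python
from typing import Callable, Iterator, Optional

# The two example pages from above, trimmed to the fields used here.
PAGES = [
    {"objects": [{"id": 1}, {"id": 2}, {"id": 3}], "hasMore": True},
    {"objects": [{"id": 4}, {"id": 5}], "hasMore": False},
]


def fake_fetch(token: Optional[int]) -> dict:
    """Stand-in for the HTTP call; the integer token is a hypothetical page index."""
    return PAGES[token or 0]


def iter_pages(fetch: Callable[[Optional[int]], dict]) -> Iterator[dict]:
    """Yield pages until one reports hasMore == false."""
    token: Optional[int] = None  # None requests the first page
    while True:
        page = fetch(token)
        yield page
        if not page.get("hasMore"):
            return
        token = (token or 0) + 1


object_ids = [obj["id"] for page in iter_pages(fake_fetch) for obj in page["objects"]]
print(object_ids)
```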
The tricky part here is the types list. Each response page contains both objects and types, and the same types can appear on every page, but I want types as a separate stream containing only the unique elements that appeared across all of the pages:
types:
{"id": 1, "name": "1"}
{"id": 2, "name": "2"}
Is there a proper way to solve it?
aaronsteers
02/22/2023, 11:32 PM
types does not have a unique API endpoint - which would avoid the chance of duplicates? And is it safe to assume that types will always sync with FULL_TABLE replication, given that there isn't a timestamp there?
aaronsteers
02/22/2023, 11:37 PM
records_jsonpath = "$.types[*]"
You can reuse the same pagination logic that you have already.
aaronsteers
02/22/2023, 11:38 PM
post_process() to cache items already seen, returning None from post_process() if the record has already been sent - which tells the SDK to filter out the record.
aaronsteers
02/22/2023, 11:38 PM
aaronsteers
02/22/2023, 11:39 PM
Denis I.
02/23/2023, 3:35 PM
types is the API endpoint we have for now: objects are incremental and types are just references for objects, containing only the types relevant to a page. It means that during some API syncs we could receive fewer types than we’d previously seen.
I’ve duplicated the parent stream as TypeStream and it works fine. My concern was about the actual duplication of requests to the API endpoint 🙂 Solved it with requests-cache, which seems a bit overkill, but it works for now.
I’ll try to use post_process() next.
Denis I.
02/27/2023, 9:29 PM
replication_key for both objects and types 🫠
I can’t save the last updated_at from objects to types, since the API responses return objects in ascending order and I filter out all new types items that have already been sent.
Is there a proper way to set the replication_key_value that goes to the state?
aaronsteers
02/27/2023, 11:49 PM
replication_key and just leave undefined.
aaronsteers
02/27/2023, 11:50 PM
Denis I.
02/28/2023, 10:52 AM
created_at (datetime) to the API response example:
{
"objects": [
{
"id": 1,
"child_id": 1,
"type_id": 1,
"created_at": "2023-01-01T00:00:00"
},
{
"id": 2,
"child_id": 2,
"type_id": 2,
"created_at": "2023-01-01T00:01:00"
},
{
"id": 3,
"child_id": 3,
"type_id": null,
"created_at": "2023-01-01T00:02:00"
}
],
"types": [
{
"id": 1,
"name": "1"
},
{
"id": 2,
"name": "2"
}
],
"hasMore": true
}
replication_method = "INCREMENTAL"
replication_key = "created_at"
The request has a date_from parameter, which could be used for incremental requests.
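A rough sketch of how date_from could be fed from a created_at bookmark, as plain Python outside the SDK. DEFAULT_START, build_params and advance_bookmark are hypothetical names, and whether the API treats date_from as inclusive is not stated here.

```python
from typing import List, Optional

DEFAULT_START = "2023-01-01T00:00:00"  # hypothetical configured start date


def build_params(bookmark: Optional[str]) -> dict:
    """Use the saved bookmark as date_from, falling back to the start date."""
    return {"date_from": bookmark or DEFAULT_START}


def advance_bookmark(bookmark: Optional[str], records: List[dict]) -> Optional[str]:
    """Return the latest created_at seen so far.

    ISO 8601 strings in one fixed format sort chronologically as plain text.
    """
    values = [record["created_at"] for record in records]
    if bookmark is not None:
        values.append(bookmark)
    return max(values) if values else None


# The objects from the example response above.
records = [
    {"id": 1, "created_at": "2023-01-01T00:00:00"},
    {"id": 2, "created_at": "2023-01-01T00:01:00"},
    {"id": 3, "created_at": "2023-01-01T00:02:00"},
]

first_params = build_params(None)
bookmark = advance_bookmark(None, records)
next_params = build_params(bookmark)
print(first_params, next_params)
```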
Also, the type_id could be null: some objects could have no related types in the response.
Denis I.
02/28/2023, 11:04 AM
ObjectsStream yields items from both arrays (objects, types) and adds an additional _type key to distinguish rows. Downstream methods and TypesStream then use the key to drop or process the relevant items.
I also had to override the _sync_records() method and patch the block which yields records to filter out all non-objects rows.
This approach works fine, but seems a bit hacky. What do you think?
Denis I.
02/28/2023, 12:44 PM
types data to each relevant objects item in the parse_response() method and then process types in a child TypesStream.
Denis I.
02/28/2023, 10:32 PM
objects.created_at
2. All received types are saved in an ObjectsStream property as a cached dict
3. ObjectsStream.get_response() yields rows from the received objects and processes them as usual
4. ObjectsStream.get_child_context() returns each types item along with the first objects item related to it
5. TypesStream.get_records() processes each row received from the parent that carries a types item and drops all new copies of it that come from every new page of the response
I also use another child SubObjectsStream to query additional data for each objects item received from the ObjectsStream.get_child_context() method.
This approach makes only one request per page of the objects/types API endpoint, keeps track of state for the replication key objects.created_at and returns only unique types items per sync.
I wonder if there is a more straightforward solution, or features on the roadmap that could improve this approach’s architecture (:
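The steps above can be read roughly as the following standalone sketch. It is an interpretation in plain Python, not SDK code: ObjectsSide, parse_page and child_context are hypothetical stand-ins for the stream methods named in the list, and handing a type only to the first related object is one reading of step 4.

```python
from typing import Iterator, List


class ObjectsSide:
    """Hypothetical stand-in for the parent stream's caching logic."""

    def __init__(self) -> None:
        self.types_cache = {}      # type id -> types item (step 2)
        self._handed_out = set()   # type ids already passed to the child

    def parse_page(self, page: dict) -> Iterator[dict]:
        # Cache every type on the page, then yield objects as usual (step 3).
        for type_item in page["types"]:
            self.types_cache[type_item["id"]] = type_item
        yield from page["objects"]

    def child_context(self, obj: dict) -> dict:
        # Attach the type only to the first object that refers to it (step 4).
        context = {"object_id": obj["id"], "type": None}
        type_id = obj.get("type_id")  # may be null per the thread
        if type_id in self.types_cache and type_id not in self._handed_out:
            self._handed_out.add(type_id)
            context["type"] = self.types_cache[type_id]
        return context


# The two example pages from the start of the thread.
pages = [
    {"objects": [{"id": 1, "type_id": 1}, {"id": 2, "type_id": 2}, {"id": 3, "type_id": 2}],
     "types": [{"id": 1, "name": "1"}, {"id": 2, "name": "2"}]},
    {"objects": [{"id": 4, "type_id": 1}, {"id": 5, "type_id": 1}],
     "types": [{"id": 1, "name": "1"}]},
]

parent = ObjectsSide()
emitted_objects: List[int] = []
emitted_types: List[dict] = []
for page in pages:
    for obj in parent.parse_page(page):
        emitted_objects.append(obj["id"])
        context = parent.child_context(obj)
        # The child side emits a type only when the context carries one (step 5).
        if context["type"] is not None:
            emitted_types.append(context["type"])

print(emitted_objects, emitted_types)
```

Each page is parsed once, so the API is hit once per page, and the dedup state sits on the parent rather than in a second pass over the same endpoint.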