Denis I.
02/22/2023, 11:06 PM
{
"objects": [
{
"id": 1,
"child_id": 1,
"type_id": 1
},
{
"id": 2,
"child_id": 2,
"type_id": 2
},
{
"id": 3,
"child_id": 3,
"type_id": 2
}
],
"types": [
{
"id": 1,
"name": "1"
},
{
"id": 2,
"name": "2"
}
],
"hasMore": true
}
{
"objects": [
{
"id": 4,
"child_id": 4,
"type_id": 1
},
{
"id": 5,
"child_id": 5,
"type_id": 1
}
],
"types": [
{
"id": 1,
"name": "1"
}
],
"hasMore": false
}
So, I made it work with a RESTStream (records_jsonpath = "$.objects[*]") which passes its child_id to a child RESTStream. It gives me two fancy outputs:
objects:
{"id": 1, "child_id": 1, "type_id": 1}
{"id": 2, "child_id": 2, "type_id": 2}
{"id": 3, "child_id": 3, "type_id": 2}
{"id": 4, "child_id": 4, "type_id": 1}
{"id": 5, "child_id": 5, "type_id": 1}
children:
{"id": 1, "parent_id": 1, "type_id": 1}
{"id": 2, "parent_id": 2, "type_id": 2}
{"id": 3, "parent_id": 3, "type_id": 2}
{"id": 4, "parent_id": 4, "type_id": 1}
{"id": 5, "parent_id": 5, "type_id": 1}
Everything works fine at this step with a pagination token linked to the hasMore value.
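A minimal sketch of the flow described so far, written without the SDK so it runs on its own. PAGES, fetch_page, iter_objects and child_context are hypothetical names made up for illustration; a real tap would let the RESTStream do the paging and the request.

```python
from typing import Iterator

# Two canned pages shaped like the example responses above.
PAGES = [
    {
        "objects": [
            {"id": 1, "child_id": 1, "type_id": 1},
            {"id": 2, "child_id": 2, "type_id": 2},
            {"id": 3, "child_id": 3, "type_id": 2},
        ],
        "types": [{"id": 1, "name": "1"}, {"id": 2, "name": "2"}],
        "hasMore": True,
    },
    {
        "objects": [
            {"id": 4, "child_id": 4, "type_id": 1},
            {"id": 5, "child_id": 5, "type_id": 1},
        ],
        "types": [{"id": 1, "name": "1"}],
        "hasMore": False,
    },
]


def fetch_page(page_number: int) -> dict:
    """Stand-in for the HTTP request (assumption: pages are addressed by index)."""
    return PAGES[page_number]


def iter_objects() -> Iterator[dict]:
    """Yield every item of `objects`, requesting pages while hasMore is true."""
    page_number = 0
    while True:
        page = fetch_page(page_number)
        yield from page["objects"]  # same selection as "$.objects[*]"
        if not page["hasMore"]:
            break
        page_number += 1


def child_context(record: dict) -> dict:
    """Context handed to the child stream for one parent record."""
    return {"child_id": record["child_id"]}


objects = list(iter_objects())
contexts = [child_context(record) for record in objects]
```

Here objects holds the five records from both pages and contexts holds one child_id per parent record.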
The tricky part here is the types list. The same types can appear on every response page alongside objects, but I want types as a separate stream containing only the unique elements across all pages:
types:
{"id": 1, "name": "1"}
{"id": 2, "name": "2"}
Is there a proper way to solve it?
aaronsteers
02/22/2023, 11:32 PM
types does not have a unique API endpoint - which would avoid the chance of duplicates? And is it safe to assume that types will always sync with FULL_TABLE replication, given that there isn't a timestamp there?
aaronsteers
02/22/2023, 11:37 PM
records_jsonpath = "$.types[*]"
You can reuse the same pagination logic that you have already.
aaronsteers
02/22/2023, 11:38 PM
post_process() to cache items already seen, returning None from post_process() if the record has already been sent - which tells the SDK to filter out the record.
Denis I.
02/23/2023, 3:35 PM
types is the API endpoint we have for now: objects are incremental, and types are just references for objects, containing the types relevant to a page. It means that during some API syncs we could receive fewer types than we’d previously seen.
I’ve duplicated the parent stream as TypeStream and it works fine. My concern was about the actual duplication of requests to the API endpoint 🙂 Solved it with requests-cache, which seems a bit overkill, but it works for now.
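The deduplication suggested earlier (returning None from post_process() for a record that has already been sent) might look roughly like this. It is a standalone sketch, not SDK code: TypesDeduplicator and pages_of_types are hypothetical, and in a real tap the method would live on the stream class, with the seen ids kept for the duration of one sync.

```python
from typing import Optional


class TypesDeduplicator:
    """Remembers which type ids have already been emitted during one sync."""

    def __init__(self) -> None:
        self._seen_ids: set = set()

    def post_process(self, row: dict, context: Optional[dict] = None) -> Optional[dict]:
        # Returning None signals that the record should be dropped.
        if row["id"] in self._seen_ids:
            return None
        self._seen_ids.add(row["id"])
        return row


# The `types` arrays of the two example pages.
pages_of_types = [
    [{"id": 1, "name": "1"}, {"id": 2, "name": "2"}],
    [{"id": 1, "name": "1"}],
]

dedup = TypesDeduplicator()
unique_types = []
for page in pages_of_types:
    for row in page:
        kept = dedup.post_process(row)
        if kept is not None:
            unique_types.append(kept)
```

This only removes duplicates within a single run; it does nothing about the duplicated requests, which is the part requests-cache was covering.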
I’d try to use post_process() next
Denis I.
02/27/2023, 9:29 PM
replication_key for both objects and types 🫠
I can’t save the last updated_at from objects to types, since the API responses return objects in ascending order and I filter out all new types items that have already been sent.
Is there a proper way to set the replication_key_value that goes to the state?
aaronsteers
02/27/2023, 11:49 PM
replication_key and just leave undefined.
Denis I.
02/28/2023, 10:52 AM
created_at (datetime) to the API response example:
{
"objects": [
{
"id": 1,
"child_id": 1,
"type_id": 1,
"created_at": "2023-01-01T00:00:00"
},
{
"id": 2,
"child_id": 2,
"type_id": 2,
"created_at": "2023-01-01T00:01:00"
},
{
"id": 3,
"child_id": 3,
"type_id": null,
"created_at": "2023-01-01T00:02:00"
}
],
"types": [
{
"id": 1,
"name": "1"
},
{
"id": 2,
"name": "2"
}
],
"hasMore": true
}
replication_method = "INCREMENTAL"
replication_key = "created_at"
The request has a date_from parameter that could be used for incremental requests.
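As a rough illustration of how created_at and date_from could fit together, the sketch below builds the query parameters from a saved bookmark and advances the bookmark from a page of records. build_params and advance_bookmark are hypothetical helpers, and it is an assumption that the API accepts date_from in the same ISO format it returns for created_at.

```python
from datetime import datetime
from typing import Iterable, Optional


def build_params(bookmark: Optional[str]) -> dict:
    """Query parameters for one request; date_from is sent only when a bookmark exists."""
    params: dict = {}
    if bookmark is not None:
        params["date_from"] = bookmark  # assumption: same format as created_at
    return params


def advance_bookmark(bookmark: Optional[str], records: Iterable[dict]) -> Optional[str]:
    """Return the latest created_at seen so far, compared as datetimes."""
    latest = datetime.fromisoformat(bookmark) if bookmark else None
    for record in records:
        created = datetime.fromisoformat(record["created_at"])
        if latest is None or created > latest:
            latest = created
    return latest.isoformat() if latest else None


# The `objects` array from the example response above.
records = [
    {"id": 1, "child_id": 1, "type_id": 1, "created_at": "2023-01-01T00:00:00"},
    {"id": 2, "child_id": 2, "type_id": 2, "created_at": "2023-01-01T00:01:00"},
    {"id": 3, "child_id": 3, "type_id": None, "created_at": "2023-01-01T00:02:00"},
]

new_bookmark = advance_bookmark(None, records)
next_params = build_params(new_bookmark)
```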
Also, type_id could be null: some objects may have no related types in the response.
Denis I.
02/28/2023, 11:04 AM
ObjectsStream yields items from both arrays (objects, types) and adds an additional _type key to distinguish rows. Downstream methods and TypesStream then use the key to drop or process the relevant items.
I also had to override the _sync_records() method and patch the block that yields records to filter out all non-objects rows.
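The tagging idea can be sketched on its own like this. tag_rows and only are hypothetical helper names; the actual patch to _sync_records() is not shown in the thread and is not reproduced here.

```python
from typing import Iterable, Iterator


def tag_rows(page: dict) -> Iterator[dict]:
    """Yield items from both arrays, marking each with the array it came from."""
    for name in ("objects", "types"):
        for item in page.get(name, []):
            yield {**item, "_type": name}


def only(rows: Iterable[dict], wanted: str) -> Iterator[dict]:
    """Keep rows of one kind and strip the helper key before emitting them."""
    for row in rows:
        if row["_type"] == wanted:
            yield {key: value for key, value in row.items() if key != "_type"}


page = {
    "objects": [
        {"id": 1, "child_id": 1, "type_id": 1},
        {"id": 3, "child_id": 3, "type_id": None},
    ],
    "types": [{"id": 1, "name": "1"}],
    "hasMore": False,
}

tagged = list(tag_rows(page))
object_rows = list(only(tagged, "objects"))
type_rows = list(only(tagged, "types"))
```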
This approach works fine, but seems a bit hacky. What do you think?
Denis I.
02/28/2023, 12:44 PM
types data to each relevant objects item in parse_response() method and then process types in child TypesStream.
Denis I.
02/28/2023, 10:32 PM
objects.created_at
2. All received types are saved in an ObjectsStream property as a cached dict
3. ObjectsStream.get_response() yields rows from the received objects and processes them as usual
4. ObjectsStream.get_child_context() returns a types item with the first objects item related to it
5. TypesStream.get_records() processes each row received from the parent with a types item and drops all further copies of it that come with every new page of the response
I also use another child, SubObjectsStream, to query additional data for each objects item received from the ObjectsStream.get_child_context() method.
This approach makes only one request per page of the objects/types API endpoint, keeps track of state for the replication key objects.created_at, and returns only unique types items per sync.
I wonder if there is a more straightforward solution, or features on the roadmap that could improve this approach’s architecture (:
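For readers following along, steps 2 to 5 can be approximated by the standalone sketch below. ObjectsSketch, TypesSketch, parse_page and the type_item context key are hypothetical stand-ins rather than the SDK classes or the actual tap code, and the replication-key handling is left out.

```python
from typing import Iterator, Optional


class ObjectsSketch:
    """Parent: caches the page's types and hands each one out once per page."""

    def __init__(self) -> None:
        self.types_by_id: dict = {}
        self._sent_on_page: set = set()

    def parse_page(self, page: dict) -> Iterator[dict]:
        # Step 2: cache the types received with this page, keyed by id.
        self.types_by_id = {item["id"]: item for item in page["types"]}
        self._sent_on_page = set()
        # Step 3: objects are yielded as usual.
        yield from page["objects"]

    def get_child_context(self, record: dict) -> dict:
        # Step 4: attach the types item to the first object on the page that refers to it.
        type_id = record.get("type_id")
        type_item: Optional[dict] = None
        if type_id in self.types_by_id and type_id not in self._sent_on_page:
            self._sent_on_page.add(type_id)
            type_item = self.types_by_id[type_id]
        return {"child_id": record["child_id"], "type_item": type_item}


class TypesSketch:
    """Child: emits a types item only the first time its id is seen in a sync."""

    def __init__(self) -> None:
        self._seen_ids: set = set()

    def get_records(self, context: dict) -> Iterator[dict]:
        # Step 5: drop copies that arrive again with later pages.
        item = context.get("type_item")
        if item is None or item["id"] in self._seen_ids:
            return
        self._seen_ids.add(item["id"])
        yield item


pages = [
    {
        "objects": [
            {"id": 1, "child_id": 1, "type_id": 1},
            {"id": 2, "child_id": 2, "type_id": 2},
            {"id": 3, "child_id": 3, "type_id": None},
        ],
        "types": [{"id": 1, "name": "1"}, {"id": 2, "name": "2"}],
    },
    {
        "objects": [
            {"id": 4, "child_id": 4, "type_id": 1},
            {"id": 5, "child_id": 5, "type_id": 1},
        ],
        "types": [{"id": 1, "name": "1"}],
    },
]

parent, child = ObjectsSketch(), TypesSketch()
emitted_objects, emitted_types = [], []
for page in pages:
    for record in parent.parse_page(page):
        emitted_objects.append(record)
        emitted_types.extend(child.get_records(parent.get_child_context(record)))
```

Each page is read once, every object is emitted, and each types item comes out a single time even though type 1 appears on both pages.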