ryan_bell
12/04/2024, 9:42 PM
I have two streams in my tap, one called flows and one called flowactions. Each flow can have many flow actions associated with it. When I run the tap with these streams selected, although I see it properly requesting the data (the logs show that every flow is being requested and every flow action is also being requested), only the flow actions associated with the last of the ingested flows are actually recorded in the target. So, even though I see that all 60+ flows are properly loaded into their target table, I only see 4 flow actions loaded into the flowactions table, when I can see from the request output that there should be hundreds of actions. Furthermore, I can tell that the 4 flow actions are only associated with the final flow because all 4 have the same flow_id from their parent flow. I’ve tried reading through the Meltano Singer SDK documentation and looking at other examples, but I just can’t figure out what I’m doing incorrectly.
Here’s the code for those two data streams:
class FlowsStream(KlaviyoStream):
    """Define custom stream."""

    name = "flows"
    path = "/flows"
    primary_keys = ["id"]
    replication_key = "updated"
    schema_filepath = SCHEMAS_DIR / "flows.json"

    def get_child_context(self, record: dict, context: dict | None) -> dict:
        context = context or {}
        context["flow_id"] = record["id"]
        return super().get_child_context(record, context)  # type: ignore[no-any-return]

    def post_process(
        self,
        row: dict,
        context: dict | None = None,  # noqa: ARG002
    ) -> dict | None:
        row["updated"] = row["attributes"]["updated"]
        return row

    @property
    def is_sorted(self) -> bool:
        return True


class FlowActionsStream(KlaviyoStream):
    """Define custom stream."""

    name = "flowactions"
    path = "/flows/{flow_id}/flow-actions"
    primary_keys = ["id"]
    replication_key = "updated"
    parent_stream_type = FlowsStream
    schema_filepath = SCHEMAS_DIR / "flowactions.json"
    max_page_size = 50

    def get_child_context(self, record: dict, context: dict | None) -> dict:
        context = context or {}
        context["flow_action_id"] = record["id"]
        return super().get_child_context(record, context)  # type: ignore[no-any-return]

    def post_process(self, row: dict, context: dict) -> dict | None:
        row["flow_id"] = context["flow_id"]
        row["updated"] = row["attributes"]["updated"]
        time.sleep(1)
        return row

    @property
    def is_sorted(self) -> bool:
        return True
Reuben (Matatika)
12/04/2024, 11:45 PM
Worth first checking that with a different target (e.g. target-jsonl) you do see all expected flow actions. You could also check by inspecting the tap output manually and looking for flow action RECORD messages:
meltano invoke tap-klaviyo > tap.out
grep RECORD tap.out | grep flowactions | wc -l
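As a cross-check, the same count can be done in Python, assuming tap.out contains one Singer message per line (each a JSON object with a "type" field and, for RECORD messages, the "stream" it belongs to) — a rough sketch:

import json
from collections import Counter

# Tally RECORD messages per stream from the captured tap output.
counts = Counter()
with open("tap.out") as f:
    for line in f:
        line = line.strip()
        if not line:
            continue
        try:
            msg = json.loads(line)
        except json.JSONDecodeError:
            continue  # skip any non-JSON lines that ended up in the capture
        if msg.get("type") == "RECORD":
            counts[msg["stream"]] += 1

print(counts)  # expect hundreds of "flowactions" records if the tap is emitting them all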
Is it possible for more than one flow to reference the same action? If so, you may need to update the flow actions stream primary keys to include the flow ID in addition to the action ID (which also means updating the stream schema with flow_id as a top-level property):
primary_keys = ["flow_id", "id"]
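For illustration, a rough sketch of the changed parts of the child stream, assuming it sits in the same module as the code above and that flowactions.json is updated to declare flow_id as a top-level string property (only the parts that change are shown; everything else stays as-is):

class FlowActionsStream(KlaviyoStream):
    """Flow actions, keyed by both the parent flow and the action."""

    name = "flowactions"
    path = "/flows/{flow_id}/flow-actions"
    # Composite key: if the same action can appear under more than one flow,
    # the flow ID is needed to keep those rows distinct in the target.
    primary_keys = ["flow_id", "id"]
    replication_key = "updated"
    parent_stream_type = FlowsStream
    schema_filepath = SCHEMAS_DIR / "flowactions.json"

    def post_process(self, row: dict, context: dict) -> dict | None:
        # Copy the parent flow ID from the partition context onto each record
        # so it can be written as a top-level column in the target.
        row["flow_id"] = context["flow_id"]
        row["updated"] = row["attributes"]["updated"]
        return row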
ryan_bell
12/05/2024, 4:10 PM
I updated the primary keys to include both flow_id and id. When I run this with target-jsonl, it works and I see the full output with every flowaction record. However, when using target-postgres, I’m running into the same issue of only seeing the 4 records related to the final recorded flow.
Also, here’s what my schema looks like for `flowactions`:
{
  "properties": {
    "id": {
      "type": [
        "string"
      ]
    },
    "flow_id": {
      "type": [
        "string"
      ]
    },
    "updated": {
      "format": "date-time",
      "type": [
        "string",
        "null"
      ]
    },
    "attributes": {
      "properties": {
        "action_type": {
          "type": [
            "string",
            "null"
          ]
        },
        "created": {
          "format": "date-time",
          "type": [
            "string",
            "null"
          ]
        },
        "updated": {
          "format": "date-time",
          "type": [
            "string",
            "null"
          ]
        }
      },
      "type": [
        "object",
        "null"
      ]
    }
  }
}
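One way to sanity-check the working target-jsonl run mentioned above is to group the emitted flow action records by flow_id — a rough sketch, assuming target-jsonl wrote the stream to output/flowactions.jsonl (the path is an assumption; it depends on the loader's configured output directory):

import json
from collections import Counter
from pathlib import Path

# Count flow action records per parent flow in the target-jsonl output.
# NOTE: "output/flowactions.jsonl" is an assumed path — adjust to your config.
records = [
    json.loads(line)
    for line in Path("output/flowactions.jsonl").read_text().splitlines()
    if line.strip()
]
counts = Counter(r["flow_id"] for r in records)

print(f"{len(records)} flow action records across {len(counts)} distinct flows")
for flow_id, n in counts.most_common():
    print(f"{flow_id}: {n}")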
ryan_bell
12/05/2024, 4:13 PM
Reuben (Matatika)
12/05/2024, 4:25 PM
Did you drop the flowactions table after changing the primary keys?
ryan_bell
12/05/2024, 4:29 PM
ryan_bell
12/05/2024, 4:41 PM
Reuben (Matatika)
12/05/2024, 4:44 PM
Reuben (Matatika)
12/05/2024, 4:49 PM
meltano run --full-refresh tap-klaviyo target-postgres works after dropping the table