# troubleshooting
r
Hey y’all, I could use some help debugging a parent-child stream I have set up for tap-klaviyo. I’ll include what the code looks like currently at the bottom of this message, but first I’ll explain the issue I’m seeing. Basically, I have a stream called `flows` and one called `flowactions`. Each flow can have many flow actions associated with it. When I run the tap with these streams selected, the logs show that every flow and every flow action is being requested, but only the flow actions associated with the last of the ingested flows are actually recorded in the target. So, even though all 60+ flows are properly loaded into their target table, I only see 4 flow actions loaded into the `flowactions` table, when I can see from the request output that there should be hundreds of actions. Furthermore, I can tell that the 4 flow actions are only associated with the final flow because all 4 have the same `flow_id` from their parent flow. I’ve tried reading through the Meltano Singer SDK documentation and looking at other examples, but I just can’t figure out what I’m doing incorrectly. Here’s the code for those two data streams:
class FlowsStream(KlaviyoStream):
    """Define custom stream."""

    name = "flows"
    path = "/flows"
    primary_keys = ["id"]
    replication_key = "updated"
    schema_filepath = SCHEMAS_DIR / "flows.json"

    def get_child_context(self, record: dict, context: dict | None) -> dict:
        context = context or {}
        context["flow_id"] = record["id"]

        return super().get_child_context(record, context)  # type: ignore[no-any-return]

    def post_process(
        self,
        row: dict,
        context: dict | None = None,  # noqa: ARG002
    ) -> dict | None:
        row["updated"] = row["attributes"]["updated"]
        return row

    @property
    def is_sorted(self) -> bool:
        return True


class FlowActionsStream(KlaviyoStream):
    """Define custom stream."""

    name = "flowactions"
    path = "/flows/{flow_id}/flow-actions"
    primary_keys = ["id"]
    replication_key = "updated"
    parent_stream_type = FlowsStream
    schema_filepath = SCHEMAS_DIR / "flowactions.json"
    max_page_size = 50

    def get_child_context(self, record: dict, context: dict | None) -> dict:
        context = context or {}
        context["flow_action_id"] = record["id"]

        return super().get_child_context(record, context)  # type: ignore[no-any-return]

    def post_process(self, row: dict, context: dict) -> dict | None:
        row["flow_id"] = context["flow_id"]
        row["updated"] = row["attributes"]["updated"]
        time.sleep(1)
        return row

    @property
    def is_sorted(self) -> bool:
        return True
r
Sounds like a primary key issue. You might find that when loading to an append-only or non-db target (e.g. `target-jsonl`) you do see all expected flow actions. You could also check by inspecting the tap output manually and looking for flow action `RECORD` messages:
meltano invoke tap-klaviyo > tap.out
grep RECORD tap.out | grep flowactions | wc -l
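If grepping gets noisy, a small script can do the same count per stream. This is only a rough sketch: it assumes `tap.out` contains one Singer message per line as JSON (possibly mixed with plain log lines), and the `count_records` helper and the sample lines are made up for illustration.

```python
import json
from collections import Counter


def count_records(lines):
    """Count Singer RECORD messages per stream, skipping lines that are not JSON."""
    counts = Counter()
    for line in lines:
        line = line.strip()
        if not line:
            continue
        try:
            message = json.loads(line)
        except json.JSONDecodeError:
            continue  # plain log output mixed into the file
        if isinstance(message, dict) and message.get("type") == "RECORD":
            counts[message.get("stream")] += 1
    return counts


# Tiny made-up sample standing in for the contents of tap.out
sample = [
    '{"type": "SCHEMA", "stream": "flows", "schema": {}, "key_properties": ["id"]}',
    '{"type": "RECORD", "stream": "flows", "record": {"id": "f1"}}',
    '{"type": "RECORD", "stream": "flowactions", "record": {"id": "a1", "flow_id": "f1"}}',
    '{"type": "RECORD", "stream": "flowactions", "record": {"id": "a2", "flow_id": "f1"}}',
    "some log line that is not JSON",
]
print(count_records(sample))
```

Against the real file it would be something like `with open("tap.out") as f: print(count_records(f))`. If the `flowactions` count is in the hundreds here, the tap is emitting everything and the records are being lost further downstream.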
Is it possible for more than one flow to reference the same action? If so, you may need to update the flow actions stream primary keys to include the flow ID in addition to the action ID (which also means updating the stream schema with `flow_id` as a top-level property):
primary_keys = ["flow_id", "id"]
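To illustrate why the key choice matters, here is a toy simulation of a target that upserts on its key properties. It is not how any particular target is implemented; the `upsert` helper and the sample rows (including the idea that two flows share an action ID) are hypothetical.

```python
def upsert(rows, key_properties):
    """Simulate a target that keeps one row per unique key (last write wins)."""
    table = {}
    for row in rows:
        key = tuple(row[name] for name in key_properties)
        table[key] = row
    return list(table.values())


rows = [
    {"flow_id": "f1", "id": "a1"},
    {"flow_id": "f2", "id": "a1"},  # hypothetical: same action ID under another flow
    {"flow_id": "f2", "id": "a2"},
]

by_id = upsert(rows, ["id"])
by_flow_and_id = upsert(rows, ["flow_id", "id"])
print(len(by_id), len(by_flow_and_id))
```

With `id` alone, the second row overwrites the first; with the composite key, all three rows survive. An append-only target never overwrites, which is why it would show every record either way.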
r
I tried switching the primary keys to include both `flow_id` and `id`. When I run this with target-jsonl, it works and I see the full output with every `flowaction` record. However, when using target-postgres, I’m running into the same issue of only seeing the 4 records related to the final recorded flow. Also, here’s what my schema looks like for `flowactions`:
{
  "properties": {
    "id": {
      "type": [
        "string"
      ]
    },
    "flow_id": {
      "type": [
        "string"
      ]
    },
    "updated": {
      "format": "date-time",
      "type": [
        "string",
        "null"
      ]
    },
    "attributes": {
      "properties": {
        "action_type": {
          "type": [
            "string",
            "null"
          ]
        },
        "created": {
          "format": "date-time",
          "type": [
            "string",
            "null"
          ]
        },
        "updated": {
          "format": "date-time",
          "type": [
            "string",
            "null"
          ]
        }
      },
      "type": [
        "object",
        "null"
      ]
    }
  }
}
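As a quick sanity check that both key columns are top-level properties in that schema, something like the following could be run. It is a sketch only: the schema is a trimmed copy of the one above, and `missing_key_properties` is a hypothetical helper, not part of the SDK.

```python
import json

# Trimmed copy of the flowactions schema above (top-level properties only)
schema = json.loads("""
{
  "properties": {
    "id": {"type": ["string"]},
    "flow_id": {"type": ["string"]},
    "updated": {"format": "date-time", "type": ["string", "null"]},
    "attributes": {"type": ["object", "null"]}
  }
}
""")


def missing_key_properties(schema, primary_keys):
    """Return the primary keys that are not top-level schema properties."""
    properties = schema.get("properties", {})
    return [key for key in primary_keys if key not in properties]


print(missing_key_properties(schema, ["flow_id", "id"]))
```

An empty list means both `flow_id` and `id` are declared at the top level, so the schema itself should not be what is blocking the composite key.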
Could this perhaps be an issue with the target? Here’s the exact target-postgres repo I’m using: https://github.com/fixdauto/target-postgres-fixd/tree/changeMeltano
r
Did you try dropping the `flowactions` table after changing the primary keys?
r
I’m almost positive I did but I’ll try running it again after explicitly doing so
Welp… it was an issue with the target. I’ll have to dig in deeper to know what exactly is causing the difference, but I used the MeltanoLabs version of target-postgres and it solved the issues I was seeing. I appreciate you helping me out here. If you do happen to have any theories about what would cause that issue with the target-postgres version I linked above, that would be helpful, but no problem if not. Thanks again!
r
Great, I would use the default variant anyway - unless you had a specific reason for using that one.
I wonder if it was a state-related issue? i.e. `meltano run --full-refresh tap-klaviyo target-postgres` works after dropping the table