I have a Parent/Child stream and the partitions array in the state object is growing indefinitely (u...
a
I have a Parent/Child stream and the partitions array in the state object is growing indefinitely (up to 206+KB), any ideas on what could be causing this? Is this expected or am I doing something wrong?
https://sdk.meltano.com/en/latest/implementation/state.html#partitioned-state
For parent-child streams, the SDK will automatically use the parent’s context as the default state partition.
I understand the objects are being generated by the parent stream via
get_child_context
, but shouldn’t they be removed from the state automatically once the child stream has finished?
1
Here are the parent and child streams:
Copy code
class SegmentationContactsStream(RDStationStream):
    """Contacts that belong to a single RD Station segmentation.

    One stream instance is bound to one ``segmentation_id``. Each record is
    also handed to child streams through :meth:`get_child_context`.
    """

    def __init__(self, tap: Tap, segmentation_id: int) -> None:
        """Bind the stream to a segmentation.

        Args:
            tap: The tap instance this stream belongs to.
            segmentation_id: Identifier interpolated into the request path.
        """
        # Assigned before super().__init__ so `path` is usable during base
        # class initialisation.
        self.segmentation_id = segmentation_id
        super().__init__(tap)

    name = "segmentation_contacts"

    @property
    def path(self) -> str:
        """Return the endpoint path for this stream's segmentation."""
        return f"/platform/segmentations/{self.segmentation_id}/contacts"

    # The schema below only declares `uuid`, so the key must be `uuid`:
    # a key property missing from the schema cannot be honoured downstream.
    primary_keys: t.ClassVar[list[str]] = ["uuid"]
    is_sorted = False
    replication_key = None
    records_jsonpath = "$.contacts[*]"
    schema = th.PropertiesList(
        th.Property("uuid", th.UUIDType),
    ).to_dict()

    def get_child_context(self, record: dict, context: dict | None) -> dict:
        """Build the context passed to child streams for one contact.

        Args:
            record: A contact record; must contain ``uuid``.
            context: The context of this stream (unused).

        Returns:
            A dict exposing the contact's UUID as ``contact_uuid``.

        Raises:
            KeyError: If ``record`` has no ``uuid`` entry.
        """
        return {"contact_uuid": record["uuid"]}
Copy code
class ContactEventsStream(RDStationStream):
    """Conversion events of each contact emitted by the parent stream.

    ``{contact_uuid}`` in ``path`` is filled from the context produced by
    ``SegmentationContactsStream.get_child_context``.
    """

    name = "contact_events"
    path = "/platform/contacts/{contact_uuid}/events?event_type=CONVERSION"
    parent_stream_type = SegmentationContactsStream

    primary_keys: t.ClassVar[list[str]] = []
    is_sorted = False
    replication_key = None
    records_jsonpath = "$[*]"

    # By default the SDK uses the parent's context as the state partition,
    # which adds one `partitions` entry per contact and makes the state grow
    # without bound. This stream has no replication key, so there is nothing
    # worth bookmarking per contact: disable state partitioning.
    state_partitioning_keys: t.ClassVar[list[str]] = []

    @cached_property
    def schema(self) -> dict:
        """Build the stream schema, including the account's custom fields.

        The custom contact fields are fetched from the
        ``/platform/contacts/fields`` endpoint and added to the ``payload``
        object next to the fixed conversion properties. The result is cached
        for the lifetime of the instance.

        Returns:
            The JSON schema of the stream as a dict.

        Raises:
            requests.HTTPError: If the fields endpoint answers with an error
                status.
            requests.Timeout: If the fields endpoint does not answer in time.
        """
        response = requests.get(
            f"{self.url_base}/platform/contacts/fields",
            headers=self.authenticator.auth_headers,
            # Without a timeout a stalled connection blocks discovery forever.
            timeout=60,
        )
        # Fail loudly rather than silently building a schema with no custom
        # fields out of an error payload.
        response.raise_for_status()

        # Maps the API's `data_type` to a type helper; unknown or missing
        # types fall back to a string.
        th_type_by_data_type = {
            "STRING": th.StringType,
            "TIMESTAMP": th.DateTimeType,
            "INTEGER": th.NumberType,
            "SET": th.ArrayType(th.StringType),
            "STRING[]": th.ArrayType(th.StringType),
        }

        custom_fields: list[th.Property] = [
            th.Property(
                field["api_identifier"],
                th_type_by_data_type.get(field.get("data_type"), th.StringType),
            )
            for field in response.json().get("fields", [])
            # A field without an identifier cannot be named in the schema.
            if field.get("api_identifier")
        ]

        return th.PropertiesList(
            th.Property("contact_uuid", th.UUIDType),
            th.Property("event_type", th.StringType),
            th.Property("event_family", th.StringType),
            th.Property("event_identifier", th.StringType),
            th.Property("event_timestamp", th.DateTimeType),
            th.Property(
                "payload",
                th.ObjectType(
                    th.Property("event_timestamp", th.StringType),
                    th.Property("conversion_identifier", th.StringType),
                    th.Property("traffic_source", th.StringType),
                    th.Property("traffic_medium", th.StringType),
                    th.Property("traffic_campaign", th.StringType),
                    *custom_fields,
                ),
            ),
        ).to_dict()
For reference: Following the suggestion on this thread, I’ve set
state_partitioning_keys = []
and it removed all the noise from the state object 🙌 https://meltano.slack.com/archives/C069CQNHDNF/p1664301145547619
👍 1
e
Hi @andrio_frizon! Glad you figured it out. I think https://sdk.meltano.com/en/latest/parent_streams.html#if-you-do-want-to-utilize-parent-child-streams could be clearer, or the potential issue should be better called out.