# singer-tap-development
h
Hey everyone 👋 I’m working on a Customer.io tap using the Meltano Singer SDK and running into some odd parent-child behavior. Here’s the gist:

- Parent stream (`SegmentsStream`) correctly fetches all segments, and my override of `get_child_context(record)` returns `{"segment_id": record["id"]}` for the one segment I’m targeting.
- Child stream (`SegmentMembersStream`) has a `schema` that includes `"segment_id"`, has no `replication_key`, and overrides `parse_response(response)` to yield only the member fields:
```python
def parse_response(self, response):
    for identifier in response.json().get("identifiers", []):
        yield {
            "member_id": identifier["id"],
            "cio_id": identifier.get("cio_id"),
            "email": identifier.get("email"),
        }
```
- According to the docs, the SDK should automatically merge the `segment_id` from the context into the record after `parse_response` (and before emitting the record), as long as the field is in the schema. In practice, though, I only see `segment_id` in the separate `context` argument; it never appears in the actual record unless I inject it manually in `post_process`:
```python
def post_process(self, row, context):
    row["segment_id"] = context["segment_id"]
    return row
```
Has anyone else seen this? Should the SDK automatically add parent-context fields to the record dict before emitting it, or is manual injection (in `post_process`) the expected approach here? Any pointers or workaround suggestions are much appreciated! 🙏
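To make the expected behaviour concrete, here is a plain-Python model of the flow described above: the child's `parse_response` logic applied to an already-decoded JSON body, followed by a merge step that copies the parent context into each record. This is not the SDK's code. `parse_identifiers` and `merge_context` are hypothetical names, and the rule that keys already present in a record are left untouched is an assumption of this sketch.

```python
from __future__ import annotations

from collections.abc import Iterable, Iterator
from typing import Any


def parse_identifiers(payload: dict[str, Any]) -> Iterator[dict[str, Any]]:
    """Same field mapping as the child's parse_response, but on a decoded JSON dict."""
    for identifier in payload.get("identifiers", []):
        yield {
            "member_id": identifier["id"],
            "cio_id": identifier.get("cio_id"),
            "email": identifier.get("email"),
        }


def merge_context(
    records: Iterable[dict[str, Any]],
    context: dict[str, Any] | None,
) -> Iterator[dict[str, Any]]:
    """Hypothetical model of the merge step: copy context values into each record.

    Assumption: a key the record already has is not overwritten.
    """
    for record in records:
        merged = dict(record)  # copy so the caller's dict is not mutated
        for key, value in (context or {}).items():
            merged.setdefault(key, value)
        yield merged


payload = {"identifiers": [{"id": 1, "cio_id": "a", "email": "x@example.com"}]}
for row in merge_context(parse_identifiers(payload), {"segment_id": 42}):
    print(row)
```

If the SDK behaves like this model, `segment_id` should show up on every emitted record without any `post_process` override; seeing it only in `context` suggests the merge step is being skipped or receiving an empty context.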
e
Hey @hawkar_mahmod 👋 Which SDK class do those stream classes respectively inherit from?
h
A `CustomerIoStream` that looks like this:
```python
from __future__ import annotations

from typing import Any, Iterable

import requests
from singer_sdk.authenticators import BearerTokenAuthenticator
from singer_sdk.helpers.jsonpath import extract_jsonpath
from singer_sdk.pagination import BaseAPIPaginator
from singer_sdk.streams import RESTStream


class CustomerIoStream(RESTStream):
    """CustomerIo stream class."""

    # Set this value or override `get_new_paginator`.
    next_page_token_jsonpath = "$.next"  # noqa: S105

    @property
    def url_base(self) -> str:
        """Return the API URL root."""
        return "https://api.customer.io"

    @property
    def authenticator(self) -> BearerTokenAuthenticator:
        """Return a new authenticator object.

        Returns:
            An authenticator instance.
        """
        return BearerTokenAuthenticator.create_for_stream(
            self,
            token=self.config.get("api_key", ""),
        )

    @property
    def http_headers(self) -> dict:
        """Return the http headers needed.

        Returns:
            A dictionary of HTTP headers.
        """
        headers = {}
        if "user_agent" in self.config:
            headers["User-Agent"] = self.config.get("user_agent")
        # If not using an authenticator, you may also provide inline auth headers:
        # headers["Private-Token"] = self.config.get("auth_token")  # noqa: ERA001
        return headers

    def get_new_paginator(self) -> BaseAPIPaginator:
        """Create a new pagination helper instance.

        If the source API can make use of the `next_page_token_jsonpath`
        attribute, or it contains a `X-Next-Page` header in the response
        then you can remove this method.

        If you need custom pagination that uses page numbers, "next" links, or
        other approaches, please read the guide: https://sdk.meltano.com/en/v0.25.0/guides/pagination-classes.html.

        Returns:
            A pagination helper instance.
        """
        return super().get_new_paginator()

    def get_url_params(
        self,
        context: dict | None,  # noqa: ARG002
        next_page_token: Any | None,  # noqa: ANN401
    ) -> dict[str, Any]:
        """Return a dictionary of values to be used in URL parameterization.

        Args:
            context: The stream context.
            next_page_token: The next page index or value.

        Returns:
            A dictionary of URL query parameters.
        """
        params: dict = {}
        if next_page_token:
            params["start"] = next_page_token
        return params

    def parse_response(self, response: requests.Response) -> Iterable[dict]:
        """Parse the response and return an iterator of result records.

        Args:
            response: The HTTP ``requests.Response`` object.

        Yields:
            Each record from the source.
        """
        yield from extract_jsonpath(self.records_jsonpath, input=response.json())

    def _row_is_updated(self, row: dict, starting_value: int) -> bool:
        """Check if a row is updated.

        Args:
            row: The row to check.
            starting_value: The replication key value to compare against.

        Returns:
            True if the row is updated, False otherwise.
        """
        update_value = row.get(self.replication_key)
        if update_value is None:
            return True

        return update_value > starting_value

    def post_process(
        self,
        row: dict,
        context: dict | None = None,
    ) -> dict | None:
        """Filter out records that are older than the latest record.

        Overcomes the API limitation of not being able to filter by
        updated timestamp. This is a workaround to avoid processing
        all records every time.
        """
        max_updated_timestamp = self.get_starting_replication_key_value(context)

        if max_updated_timestamp is None:
            return super().post_process(row, context=context)

        if self.replication_key and self._row_is_updated(row, max_updated_timestamp):
            return super().post_process(row, context=context)

        return None
```
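If manual injection does turn out to be the answer, one option is for the child's `post_process` to chain through `super()` first, because `CustomerIoStream.post_process` above can return `None` to drop a row, while the override posted at the top of the thread replaces it outright. A minimal sketch: `StandInBaseStream` is a hypothetical stand-in for `CustomerIoStream` so the snippet runs without `singer_sdk`, and `context_keys` is an illustrative class attribute, not an SDK setting.

```python
from __future__ import annotations


class StandInBaseStream:
    """Hypothetical stand-in for CustomerIoStream; the real one may return None."""

    def post_process(self, row: dict, context: dict | None = None) -> dict | None:
        return row


class SegmentMembersSketch(StandInBaseStream):
    # Hypothetical: context keys to copy into each record.
    context_keys = ("segment_id",)

    def post_process(self, row: dict, context: dict | None = None) -> dict | None:
        # Let the base class filter first; a None result means "drop this row".
        row = super().post_process(row, context)
        if row is None or not context:
            return row
        for key in self.context_keys:
            # Only fill missing keys so a value already on the record wins.
            if key in context:
                row.setdefault(key, context[key])
        return row


stream = SegmentMembersSketch()
print(stream.post_process({"member_id": 1}, {"segment_id": 7}))
```

Guarding on `context` also avoids the `TypeError` that `context["segment_id"]` would raise if the stream were ever synced without a parent context.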
e
Gotcha. I see no reason why the context fields wouldn't be added to the record. That happens downstream of `Stream.get_records` (and thus `Stream.parse_response`), here:
https://github.com/meltano/sdk/blob/86355f8b71be9cbabc08d64ab28ee92bf3e0a363/singer_sdk/streams/core.py#L1116-L1119
Adding a `breakpoint()` there or around line 1182 might reveal whether the context is coming in empty for some reason.
v
Is `segment_id` in your schema for the stream? You'll see a warning message if it isn't.
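To double-check the schema side independently of the SDK's warning, the stream's `schema` dict (a JSON Schema object) can be inspected directly, for example from a REPL. A minimal sketch: the helper name `context_keys_missing_from_schema` is hypothetical, and it only looks at top-level `properties`.

```python
from __future__ import annotations

from typing import Any


def context_keys_missing_from_schema(
    schema: dict[str, Any],
    context: dict[str, Any],
) -> list[str]:
    """Return context keys with no matching top-level property in the schema."""
    properties = schema.get("properties", {})
    return sorted(key for key in context if key not in properties)


schema = {
    "type": "object",
    "properties": {
        "member_id": {"type": "integer"},
        "segment_id": {"type": "integer"},
    },
}
print(context_keys_missing_from_schema(schema, {"segment_id": 42}))
```

An empty list means every context key has a home in the schema, which matches what was reported here and points the investigation back at the context itself.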
h
Yes it is, that's why this puzzled me.
I'll try the breakpoint and report back.