hawkar_mahmod
05/08/2025, 9:16 AM
- Parent stream (`SegmentsStream`) correctly fetches all segments, and my override of `get_child_context(record)` returns `{"segment_id": record["id"]}` for the one segment I’m targeting.
- Child stream (`SegmentMembersStream`) has a schema that includes `"segment_id"`, has no `replication_key`, and overrides `parse_response(response)` to yield only the member fields:
```python
def parse_response(self, response):
    # Yield one record per member identifier; only member fields, no segment_id.
    for identifier in response.json().get("identifiers", []):
        yield {
            "member_id": identifier["id"],
            "cio_id": identifier.get("cio_id"),
            "email": identifier.get("email"),
        }
```
- According to the docs, the SDK should automatically merge `segment_id` from the context into the record after `parse_response` (and before emitting the record), as long as it’s in the schema. In practice, though, `segment_id` only shows up in the separate `context` argument; it never appears in the actual record unless I inject it manually in `post_process`:
```python
def post_process(self, row, context):
    # Workaround: copy the parent's segment_id from the context into the record.
    row["segment_id"] = context["segment_id"]
    return row
```
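A minimal plain-Python model of the behavior described above, assuming the SDK (a) copies a context key into a record only when the record doesn't already have it and (b) later drops any property the schema doesn't declare, logging a warning. `merge_context` and `conform_to_schema` are hypothetical names, not SDK functions, and this is a sketch rather than the SDK's actual implementation.

```python
from __future__ import annotations

import logging

# Hypothetical helpers for illustration only; not part of singer_sdk.
logger = logging.getLogger("sketch")


def merge_context(record: dict, context: dict | None) -> dict:
    """Copy parent-context keys into a copy of the record unless already present."""
    merged = dict(record)
    for key, value in (context or {}).items():
        if key not in merged:  # never overwrite a value the API returned
            merged[key] = value
    return merged


def conform_to_schema(record: dict, schema: dict) -> dict:
    """Keep only properties declared in the JSON schema; warn about the rest."""
    allowed = schema.get("properties", {})
    dropped = sorted(set(record) - set(allowed))
    if dropped:
        logger.warning("Properties %s not found in schema. Ignoring.", dropped)
    return {k: v for k, v in record.items() if k in allowed}


schema_with_id = {
    "properties": {
        "member_id": {"type": "string"},
        "email": {"type": ["string", "null"]},
        "segment_id": {"type": "integer"},
    }
}
schema_without_id = {
    "properties": {
        "member_id": {"type": "string"},
        "email": {"type": ["string", "null"]},
    }
}

row = {"member_id": "abc", "email": "a@example.com"}
context = {"segment_id": 42}

with_id = conform_to_schema(merge_context(row, context), schema_with_id)
without_id = conform_to_schema(merge_context(row, context), schema_without_id)
print(with_id)  # {'member_id': 'abc', 'email': 'a@example.com', 'segment_id': 42}
print(without_id)  # {'member_id': 'abc', 'email': 'a@example.com'}
```

The second result is the case visch's question targets: if `segment_id` is missing from the schema, the merged value is discarded and only a warning is logged.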
Has anyone else seen this? Should the SDK be automatically adding parent-context fields to the record dict before emitting it, or is manual injection (in `post_process`) the expected approach here? Any pointers or workaround suggestions are much appreciated! 🙏
Edgar Ramírez (Arch.dev)
05/08/2025, 3:43 PM
hawkar_mahmod
05/09/2025, 9:41 AM
`CustomerIoStream` that looks like this:
```python
from __future__ import annotations

from typing import Any, Iterable

import requests
from singer_sdk.authenticators import BearerTokenAuthenticator
from singer_sdk.helpers.jsonpath import extract_jsonpath
from singer_sdk.pagination import BaseAPIPaginator
from singer_sdk.streams import RESTStream


class CustomerIoStream(RESTStream):
    """CustomerIo stream class."""

    # Set this value or override `get_new_paginator`.
    next_page_token_jsonpath = "$.next"  # noqa: S105

    @property
    def url_base(self) -> str:
        """Return the API URL root, configurable via tap settings."""
        return "https://api.customer.io"

    @property
    def authenticator(self) -> BearerTokenAuthenticator:
        """Return a new authenticator object.

        Returns:
            An authenticator instance.
        """
        return BearerTokenAuthenticator.create_for_stream(
            self,
            token=self.config.get("api_key", ""),
        )

    @property
    def http_headers(self) -> dict:
        """Return the http headers needed.

        Returns:
            A dictionary of HTTP headers.
        """
        headers = {}
        if "user_agent" in self.config:
            headers["User-Agent"] = self.config.get("user_agent")
        # If not using an authenticator, you may also provide inline auth headers:
        # headers["Private-Token"] = self.config.get("auth_token")  # noqa: ERA001
        return headers

    def get_new_paginator(self) -> BaseAPIPaginator:
        """Create a new pagination helper instance.

        If the source API can make use of the `next_page_token_jsonpath`
        attribute, or it contains a `X-Next-Page` header in the response
        then you can remove this method.

        If you need custom pagination that uses page numbers, "next" links, or
        other approaches, please read the guide: https://sdk.meltano.com/en/v0.25.0/guides/pagination-classes.html.

        Returns:
            A pagination helper instance.
        """
        return super().get_new_paginator()

    def get_url_params(
        self,
        context: dict | None,  # noqa: ARG002
        next_page_token: Any | None,  # noqa: ANN401
    ) -> dict[str, Any]:
        """Return a dictionary of values to be used in URL parameterization.

        Args:
            context: The stream context.
            next_page_token: The next page index or value.

        Returns:
            A dictionary of URL query parameters.
        """
        params: dict = {}
        if next_page_token:
            params["start"] = next_page_token
        return params

    def parse_response(self, response: requests.Response) -> Iterable[dict]:
        """Parse the response and return an iterator of result records.

        Args:
            response: The HTTP ``requests.Response`` object.

        Yields:
            Each record from the source.
        """
        yield from extract_jsonpath(self.records_jsonpath, input=response.json())

    def _row_is_updated(self, row: dict, starting_value: int) -> bool:
        """Check if a row is updated.

        Args:
            row: The row to check.
            starting_value: The replication key value (bookmark) to compare against.

        Returns:
            True if the row is updated, False otherwise.
        """
        update_value = row.get(self.replication_key)
        if update_value is None:
            return True
        return update_value > starting_value

    def post_process(
        self,
        row: dict,
        context: dict | None = None,
    ) -> dict | None:
        """Filter out records that are older than the latest record.

        Overcomes the API limitation of not being able to filter by
        updated timestamp. This is a workaround to avoid processing
        all records every time.
        """
        max_updated_timestamp = self.get_starting_replication_key_value(context)
        if max_updated_timestamp is None:
            return super().post_process(row, context=context)
        if self.replication_key and self._row_is_updated(row, max_updated_timestamp):
            return super().post_process(row, context=context)
        return None
```
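The filtering in `post_process` above, and one way a child stream could combine it with the `segment_id` workaround, can be sketched without the SDK. The classes below are hypothetical stand-ins: `bookmark` replaces `get_starting_replication_key_value(context)`, and the child keeps a made-up replication key (`updated`) so the filter has something to do, whereas the real `SegmentMembersStream` has none. The child injects the parent key first and then delegates to `super().post_process`, so the base filter still applies.

```python
from __future__ import annotations


class BaseStreamSketch:
    """Hypothetical stand-in for CustomerIoStream's post_process filter."""

    replication_key: str | None = "updated"  # hypothetical key name

    def __init__(self, bookmark: int | None = None) -> None:
        # Stands in for get_starting_replication_key_value(context).
        self.bookmark = bookmark

    def _row_is_updated(self, row: dict, starting_value: int) -> bool:
        update_value = row.get(self.replication_key)
        if update_value is None:
            return True  # rows without the key are kept
        return update_value > starting_value  # strictly newer than the bookmark

    def post_process(self, row: dict, context: dict | None = None) -> dict | None:
        if self.bookmark is None:
            return row  # no bookmark yet: keep everything
        if self.replication_key and self._row_is_updated(row, self.bookmark):
            return row
        return None  # None means "skip this record"


class SegmentMembersSketch(BaseStreamSketch):
    """Hypothetical child: add the parent key, then reuse the base filter."""

    def post_process(self, row: dict, context: dict | None = None) -> dict | None:
        if context and "segment_id" in context:
            row = {**row, "segment_id": context["segment_id"]}  # copy, don't mutate
        return super().post_process(row, context)


stream = SegmentMembersSketch(bookmark=100)
rows = [
    {"member_id": "a", "updated": 150},  # newer than bookmark: kept
    {"member_id": "b", "updated": 100},  # equal to bookmark: dropped
    {"member_id": "c"},  # no timestamp: kept
]
kept = [r for r in (stream.post_process(row, {"segment_id": 7}) for row in rows) if r]
print([r["member_id"] for r in kept])  # ['a', 'c']
```

Note the strict `>` in `_row_is_updated`: a record whose timestamp equals the stored bookmark is dropped, which matters if several records can share the same timestamp.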
Edgar Ramírez (Arch.dev)
05/09/2025, 5:30 PM
`Stream.get_records` (and thus `Stream.parse_response`) here:
https://github.com/meltano/sdk/blob/86355f8b71be9cbabc08d64ab28ee92bf3e0a363/singer_sdk/streams/core.py#L1116-L1119
Adding a `breakpoint()` there, or around line 1182, might reveal whether the context is coming in empty for some reason.
visch
05/09/2025, 8:39 PM
`segment_id` in your schema for the stream? You'll see a warning message if it isn't.
hawkar_mahmod
05/13/2025, 9:49 AM