# singer-tap-development
d
got a weird issue when trying to use stateful stream partitioning. Partitioned stream works just fine when run from scratch, but when trying to run using a state file, I’m getting the following error a few partitions in:
```
ValueError: State file contains duplicate entries for partition: {state_partition_context}.
Matching state values were: [{'context': {'mid': 14709184, 'start': '2022-04-04', 'end': '2022-04-26'}, 'replication_key': 'transaction_time', 'replication_key_value': '2022-04-26T23:25:55+00:00'}, {'context': {'mid': 14709184, 'start': '2022-04-04', 'end': '2022-04-26'}, 'starting_replication_value': '2022-01-01', 'progress_markers': {'Note': 'Progress is not resumable if interrupted.', 'replication_key': 'transaction_time', 'replication_key_value': '2022-04-03T22:43:22+00:00'}}]
```
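For context, the SDK resolves a partition's bookmark by comparing stored context dicts against the current partition context, and the error fires when that comparison matches more than one entry. A rough, illustrative sketch of that lookup (the function name and exact state shape are assumptions, not the actual `singer_sdk` internals):

```python
# Illustrative sketch of the bookmark lookup that raises the error above.
# Name and state shape are assumptions, not singer_sdk internals.
def find_partition_state(stream_state: dict, partition_context: dict) -> dict:
    matches = [
        entry
        for entry in stream_state.get("partitions", [])
        if entry.get("context") == partition_context
    ]
    if len(matches) > 1:
        # The situation in the traceback: two entries share one context.
        raise ValueError(
            f"State file contains duplicate entries for partition: {partition_context}."
        )
    return matches[0] if matches else {}


ctx = {"mid": 14709184, "start": "2022-04-04", "end": "2022-04-26"}
state = {
    "partitions": [
        {"context": dict(ctx), "replication_key": "transaction_time"},
        {"context": dict(ctx), "progress_markers": {"replication_key": "transaction_time"}},
    ]
}
```

With this state, `find_partition_state(state, ctx)` raises the same `ValueError` shape shown above.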
code in 🧵
```python
from datetime import datetime, timedelta
from typing import Any, Dict, Iterable, List, Optional, Tuple

import pendulum
from singer_sdk.typing import PropertiesList

# JoryStream (a RESTStream subclass) and TRANSACTION are defined elsewhere in the tap.


class TransactionsStream(JoryStream):
    """Transactions stream representing Jory Transactions."""

    name = "transactions"
    path = "/transactions"
    primary_keys = ["mid", "txn_id"]
    replication_key = "transaction_time"
    schema = PropertiesList(*TRANSACTION).to_dict()

    @property
    def partitions(self) -> List[Dict[str, int]]:
        """Return a list of partitions to be used by the stream.

        Each stream partition will correspond to a single Merchant ID (MID).
        """
        return [{"mid": mid} for mid in self.config["mids"]]

    def _request_date_range(self, start: datetime) -> Iterable[Tuple[datetime, datetime]]:
        """Return a generator of date windows from the given start up to the current datetime."""
        now = pendulum.now("UTC")
        while start < now:
            end = min(start + timedelta(days=30), now)
            yield start, end
            start = end + timedelta(days=1)

    def get_records(self, context: Optional[dict]) -> Iterable[Dict[str, Any]]:
        """Return a generator of row-type dictionary objects.

        Each row emitted should be a dictionary of property names to their values.
        """
        context = context or {}
        starting_ts = self.get_starting_timestamp(context)
        if starting_ts is not None:
            for start, end in self._request_date_range(starting_ts):
                context.update(
                    {"start": start.strftime("%Y-%m-%d"), "end": end.strftime("%Y-%m-%d")}
                )
                for row in self.request_records(context):
                    yield self.post_process(row, context)

    def get_url_params(
        self, context: Optional[dict], next_page_token: Optional[int]
    ) -> Dict[str, Any]:
        """Override the RESTStream get_url_params method."""
        params: Dict[str, Any] = {}
        if context is not None:
            params["mid"] = context["mid"]
            params["startDate"] = context["start"]
            params["endDate"] = context["end"]
        return params

    def post_process(self, row: dict, context: Optional[dict] = None) -> dict:
        """Post-process each transaction record by enriching it with the corresponding Merchant ID."""
        if context is not None:
            row["mid"] = context["mid"]
        return row
```
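One plausible mechanism for the duplicates, illustrated outside the SDK (this is a standalone demonstration of dict aliasing, not `singer_sdk` code): the state entry is registered with a reference to the same `context` dict before `get_records` runs, so the in-place `context.update(...)` above also rewrites the context stored in state.

```python
# Standalone demonstration: the state entry holds a reference to the same
# context object the stream later mutates, so the stored partition context
# grows extra keys it was not registered with.
state: dict = {"partitions": []}
context = {"mid": 14709184}

# SDK-side (simplified): register a state entry keyed by the context.
state["partitions"].append({"context": context})

# Tap-side get_records (simplified): mutate the same dict in place.
context.update({"start": "2022-04-04", "end": "2022-04-26"})

# The entry destined for the state file now carries start/end, even though
# the partition was registered with just {"mid": ...}.
print(state["partitions"][0]["context"])
# → {'mid': 14709184, 'start': '2022-04-04', 'end': '2022-04-26'}
```

On a later run, a context that no longer matches what was written to disk is exactly the kind of mismatch that can leave multiple entries behind for one logical partition.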
and the `JoryStream` class is a child class of `RESTStream`. my gut tells me it has something to do with some strange interaction between partitioned state and the `get_starting_timestamp` functionality, but I'm not sure.
e
Hi @david_wallace! I'm not sure how safe it'd be to update the context dynamically as you're doing in `get_records` (multiple times) and `get_url_params`. On the other hand, `get_starting_timestamp` seems to be doing the right thing, since it encountered two bookmarks with the same partition context. So the real issue here is how you ended up with a duplicate partition context 🤔. The state partition context is retrieved before `get_records` is called, so that may be problematic for your use case: https://gitlab.com/meltano/sdk/-/blob/11638bbd4cf20e9c0cdd43884c705c79e6c5aa01/singer_sdk/streams/core.py#L941-946
d
cool. as far as I can tell, context is only being updated a single time (once in `get_records`). I actually didn't know that updating context was considered an anti-pattern in the SDK. I think I've seen that elsewhere. what's the recommended way to supply dynamic context to downstream methods?
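One pattern that sidesteps the mutation question entirely (a sketch of a possible workaround, not official SDK guidance): build a per-window copy of the context for each date range and pass that downstream, leaving the shared partition context untouched.

```python
# Minimal demonstration: derive per-window dicts by copying rather than
# calling context.update(), so the original partition context is never mutated.
partition_context = {"mid": 14709184}

windows = []
for start, end in [("2022-01-01", "2022-01-31"), ("2022-02-01", "2022-02-28")]:
    window_context = {**partition_context, "start": start, "end": end}  # copy
    windows.append(window_context)

# The shared partition context is unchanged.
print(partition_context)
# → {'mid': 14709184}
```

Whether this fully avoids the duplicate-state entries depends on SDK internals, but it at least guarantees the dict the SDK captured as the partition context is never rewritten by the tap.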
e
Actually, it might be fine to update the context, but I don't think we are testing those sorts of flows (cc @aaronsteers). One thing we could do to avoid getting into a situation with duplicate contexts is to check for them when a new partition is added here: https://gitlab.com/meltano/sdk/-/blob/72e7e2be7271d37c9d2c40ae0d0be3785c243876/singer_sdk/helpers/_state.py#L95-101
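The proposed check could look roughly like this (illustrative only; the real helper lives at the `_state.py` link above and this sketch does not mirror its actual signature):

```python
def get_or_create_partition_state(stream_state: dict, context: dict) -> dict:
    """Return the state entry for a partition context, creating it if missing.

    Raises if state already holds more than one entry for the context,
    instead of silently letting duplicates accumulate.
    """
    partitions = stream_state.setdefault("partitions", [])
    existing = [p for p in partitions if p.get("context") == context]
    if len(existing) > 1:
        raise ValueError(f"Duplicate state entries for partition: {context}")
    if existing:
        return existing[0]
    new_entry: dict = {"context": context}
    partitions.append(new_entry)
    return new_entry
```

Failing fast at insertion time would surface the bug when the duplicate is created, rather than several partitions later when the bookmark is read back.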
d
ah, interesting. yeah I tried deep diving on this last night but tbh I still can’t understand why multiple states are being stored for the same partition context. it always looks like one of them is an “in progress” state that gets stored alongside the previous state
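That matches how the SDK tracks resumability: while a partition is mid-sync, progress lives under `progress_markers`, and only a clean finish promotes it to a top-level `replication_key_value`. A rough sketch of that promotion step (an illustration, not the actual `singer_sdk` helper):

```python
# Sketch: promote an in-progress bookmark to a finalized one. If a sync is
# interrupted before this runs, the progress_markers entry stays in state,
# which is why an "in progress" shape can sit alongside a finalized one.
def finalize_bookmark(entry: dict) -> dict:
    markers = entry.pop("progress_markers", None)
    if markers:
        entry["replication_key"] = markers["replication_key"]
        entry["replication_key_value"] = markers["replication_key_value"]
    return entry


# The second entry from the traceback above is the un-finalized shape:
in_progress = {
    "context": {"mid": 14709184},
    "starting_replication_value": "2022-01-01",
    "progress_markers": {
        "Note": "Progress is not resumable if interrupted.",
        "replication_key": "transaction_time",
        "replication_key_value": "2022-04-03T22:43:22+00:00",
    },
}
finalize_bookmark(in_progress)
```

After finalization the entry looks like the first state value in the traceback; an interrupted run leaves it in the second shape instead.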