david_wallace
04/26/2022, 11:39 PM
ValueError: State file contains duplicate entries for partition: {state_partition_context}.
Matching state values were: [{'context': {'mid': 14709184, 'start': '2022-04-04', 'end': '2022-04-26'}, 'replication_key': 'transaction_time', 'replication_key_value': '2022-04-26T23:25:55+00:00'}, {'context': {'mid': 14709184, 'start': '2022-04-04', 'end': '2022-04-26'}, 'starting_replication_value': '2022-01-01', 'progress_markers': {'Note': 'Progress is not resumable if interrupted.', 'replication_key': 'transaction_time', 'replication_key_value': '2022-04-03T22:43:22+00:00'}}]
code in 🧵
david_wallace
04/26/2022, 11:39 PM
class TransactionsStream(JoryStream):
    """Transactions stream representing Jory Transactions."""

    name = "transactions"
    path = "/transactions"
    primary_keys = ["mid", "txn_id"]
    replication_key = "transaction_time"
    schema = PropertiesList(*TRANSACTION).to_dict()

    @property
    def partitions(self) -> List[Dict[str, int]]:
        """Return a list of partitions to be used by the stream.

        Each stream partition will correspond to a single Merchant ID (MID).
        """
        return [{"mid": mid} for mid in self.config["mids"]]

    def _request_date_range(self, start: datetime) -> Iterable[Tuple[datetime, datetime]]:
        """Return a generator of dates as determined by the start_date config value and current datetime."""
        now = pendulum.now("UTC")
        while start < now:
            end = min(start + timedelta(days=30), now)
            yield start, end
            start = end + timedelta(days=1)

    def get_records(self, context: Optional[dict]) -> Iterable[Dict[str, Any]]:
        """Return a generator of row-type dictionary objects.

        Each row emitted should be a dictionary of property names to their values.
        """
        context = context or {}
        starting_ts = self.get_starting_timestamp(context)
        if starting_ts is not None:
            for start, end in self._request_date_range(starting_ts):
                context.update({"start": start.strftime("%Y-%m-%d"), "end": end.strftime("%Y-%m-%d")})
                for row in self.request_records(context):
                    row = self.post_process(row, context)
                    yield row

    def get_url_params(self, context: Optional[dict], next_page_token: Optional[int]) -> Dict[str, Any]:
        """Override the RESTStream get_url_params method."""
        params = {}
        if context is not None:
            params["mid"] = context["mid"]
            params["startDate"] = context["start"]
            params["endDate"] = context["end"]
        return params

    def post_process(self, row: dict, context: Optional[dict] = None) -> dict:
        """Post-process each transaction record by enriching it with the corresponding Merchant ID."""
        if context is not None:
            row["mid"] = context["mid"]
        return row
david_wallace
04/26/2022, 11:40 PM
JoryStream class is a child class of RESTStream
david_wallace
04/26/2022, 11:41 PM
get_starting_timestamp functionality, but I'm not sure.
edgar_ramirez_mondragon
04/27/2022, 12:29 AM
get_records (multiple times) and get_url_params. On the other hand, get_starting_timestamp seems to be doing the right thing since it encountered two bookmarks with the same partition context. So, the real issue here is how you ended up with a duplicate partition context 🤔.
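One possible source of the altered partition context, judging from the start and end keys that show up in the state values of the error: get_records calls context.update(...) on the same dict for every date window. The sketch below is a minimal, SDK-free illustration of that effect in plain Python. It does not reproduce singer_sdk's state handling, and the helper names windows_mutating and windows_copying are hypothetical.

```python
from typing import Dict, Iterable, List, Tuple

Window = Tuple[str, str]


def windows_mutating(context: Dict, windows: Iterable[Window]) -> List[Dict]:
    """Mimic the posted get_records: write start/end into the shared dict."""
    requests = []
    for start, end in windows:
        context.update({"start": start, "end": end})  # alters the partition context
        requests.append(dict(context))
    return requests


def windows_copying(context: Dict, windows: Iterable[Window]) -> List[Dict]:
    """Hypothetical alternative: build a separate dict for each request."""
    return [{**context, "start": start, "end": end} for start, end in windows]


windows = [("2022-01-01", "2022-01-31"), ("2022-02-01", "2022-02-28")]

mutated = {"mid": 14709184}
windows_mutating(mutated, windows)

untouched = {"mid": 14709184}
per_request = windows_copying(untouched, windows)

print(mutated)    # now also carries the last window's start/end
print(untouched)  # still holds only the mid key
```

If this is what is happening, passing a per-window copy to request_records and post_process would leave the partition context as just the mid. Whether that alone clears the duplicate entries already written to the state file is an assumption that has not been checked here.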
The state partition context is retrieved before get_records is called, so that may be problematic for your use case: https://gitlab.com/meltano/sdk/-/blob/11638bbd4cf20e9c0cdd43884c705c79e6c5aa01/singer_sdk/streams/core.py#L941-946
david_wallace
04/27/2022, 12:51 AM
edgar_ramirez_mondragon
04/27/2022, 11:28 PM
david_wallace
04/27/2022, 11:29 PM