Ian OLeary
05/22/2024, 3:59 PM:
INFO | target-snowflake | Emitting completed target state {"bookmarks": {"jd_NewUpdatedTimesheetRecords": {"starting_replication_value": "2022-01-01T00:00:00", "progress_markers": {"Note": "Progress is not resumable if interrupted.", "replication_key": "DATEUPDATED", "replication_key_value": "2024-05-16T10:24:22"}}}} cmd_type=elb consumer=True name=target-snowflake producer=False stdio=stderr string_id=target-snowflake

I keep running into this message when running a particular stream in my tap, and each time it prints, it prints with the same replication_key_value of 2024-05-16T10:24:22. Between each message I'll get some standard record-count messages, such as "JSONPath $.data[*] match count: 1012", and the total record count appears to be changing with each log message. Has anyone run into this behavior before? Perhaps it's getting stuck on that particular request?

Edgar Ramírez (Arch.dev)
05/22/2024, 6:23 PM:
> I keep running into this message when running a particular stream in tap, and each time it prints - it prints with the same replication_key_value of 2024-05-16T10:24:22.
https://sdk.meltano.com/en/v0.37.0/faq.html#i-m-seeing-note-progress-is-not-resumable-if-interrupted-in-my-state-files

> and the total record count appears to be changing with each log message, such as JSONPath $.data[*] match count: 1012
That's expected if the stream changes from message to message, or if it's a different partition each time. Perhaps it's a child stream?
Ian OLeary
05/22/2024, 6:38 PM:
is_sorted = True in the class?

Ian OLeary
05/22/2024, 6:39 PM:

Edgar Ramírez (Arch.dev)
05/22/2024, 6:39 PM:

Edgar Ramírez (Arch.dev)
05/22/2024, 6:41 PM:
> i don't care if it's sorted in the downstream table
The flag is rather used to determine whether the state can be updated when any record is received; otherwise you'd risk updating to an older bookmark.
Ian OLeary
05/22/2024, 6:41 PM:

Ian OLeary
05/22/2024, 6:41 PM:

Ian OLeary
05/22/2024, 6:42 PM:
class jd_NewUpdatedTimesheetRecords(JobDivaStream):
    is_sorted = True
Ian OLeary
05/22/2024, 6:42 PM:

Ian OLeary
05/22/2024, 6:46 PM:
> That's expected if the stream changes from message to message, or if it's a different partition each time. Perhaps it's a child stream?
I was citing the changing record count to show that I don't suspect it's running the same request every time, since the row count for that partition changes, you know?
Edgar Ramírez (Arch.dev)
05/22/2024, 7:01 PM:

Ian OLeary
05/22/2024, 8:29 PM:
singer_sdk.exceptions.InvalidStreamSortException: Unsorted data detected in stream. Latest value '2019-08-05T15:47:20' is smaller than previous max '2024-01-03T08:44:02'.
I'm getting this now, so I think I'll use a different column. As a rule of thumb, should I be using the MAX or "largest" or "soonest" date column as a replication key for a particular stream?

Ian OLeary
05/22/2024, 8:34 PM:

Ian OLeary
05/22/2024, 8:34 PM:

Ian OLeary
05/22/2024, 8:41 PM:

Ian OLeary
05/22/2024, 8:42 PM:
> Yeah, set that in the stream class if you can confirm the records come un sorted.
Do you mean "come in sorted"? hahaha, could mean two very different things
Edgar Ramírez (Arch.dev)
05/22/2024, 8:43 PM:
come in sorted
Edgar Ramírez (Arch.dev)
05/22/2024, 8:46 PM:
> I'm getting this now so I think I'll use a different column. As a rule of thumb should I be using the MAX or "largest" or "soonest" date column as a replication key for a particular stream?
I think the answer depends a lot, but I'd summarize it as "whatever ensures you don't lose any data" 😅. If that means not supporting incremental replication, then there's a tradeoff to consider, e.g. I'll only sync new records and miss updates, which I'll tolerate, but I'll also run a full refresh every so often.
Ian OLeary
05/23/2024, 1:46 PM:
It's still setting the replication_key to the same exact value. This is weird behavior, since my code is supposed to be parsing the previous "toDate" parameter for the next request, making that the new "fromDate", and adding 7 days to that for the "toDate" of the next request. So why would it just keep getting stuck on the same request, printing the same "replication_key": "DATEUPDATED", "replication_key_value": "2024-05-16T10:24:22"?

My pagination class:
class JobDivaPaginator(BaseAPIPaginator):
    def __init__(self, *args, **kwargs):
        super().__init__(None, *args, **kwargs)

    def has_more(self, response):
        # Check the get_next() value to make sure it's before today.
        return self.get_next(response) < date.today()

    def get_next(self, response):
        # Get the parameters used for the previous request, then return the
        # ["toDate"] param plus 1 second as the new ["fromDate"].
        params = dict(parse_qsl(urlparse(response.request.url).query))
        return datetime.strptime(params["toDate"], OUTPUT_DATE_FORMAT).date() + timedelta(seconds=1)
My params method:
def get_url_params(
    self,
    context: dict | None,  # noqa: ARG002
    next_page_token: date | None,  # noqa: ANN401
) -> dict[str, Any]:
    """Return a dictionary of values to be used in URL parameterization.

    Args:
        context: The stream context.
        next_page_token: The next page index or value.

    Returns:
        A dictionary of URL query parameters.
    """
    # start_value = self.config["start_date"]
    start_value = self.get_starting_replication_key_value(context)
    from_date = (
        next_page_token
        or datetime.strptime(start_value, INPUT_DATE_FORMAT).date()
    )
    to_date = from_date + timedelta(days=7) - timedelta(seconds=1)
    return {
        "fromDate": from_date.strftime(OUTPUT_DATE_FORMAT),
        "toDate": to_date.strftime(OUTPUT_DATE_FORMAT),
    }
Am I going insane?

Ian OLeary
05/23/2024, 1:53 PMEdgar Ramírez (Arch.dev)
05/23/2024, 2:36 PMmeltano run
?Ian OLeary
05/23/2024, 2:37 PM:

Edgar Ramírez (Arch.dev)
05/23/2024, 3:01 PM:
meltano run --full-refresh?

Ian OLeary
05/23/2024, 3:02 PM:

Edgar Ramírez (Arch.dev)
05/23/2024, 3:07 PM:

Ian OLeary
05/23/2024, 3:20 PM: