# troubleshooting
i
```
INFO     | target-snowflake     | Emitting completed target state {"bookmarks": {"jd_NewUpdatedTimesheetRecords": {"starting_replication_value": "2022-01-01T00:00:00", "progress_markers": {"Note": "Progress is not resumable if interrupted.", "replication_key": "DATEUPDATED", "replication_key_value": "2024-05-16T10:24:22"}}}} cmd_type=elb consumer=True name=target-snowflake producer=False stdio=stderr string_id=target-snowflake
```
I keep running into this message when running a particular stream in the tap, and each time it prints, it prints with the same `replication_key_value` of `2024-05-16T10:24:22`. Between each message I'll get some standard messages for the record count, such as `JSONPath $.data[*] match count: 1012`, and the total record count appears to be changing with each log message. Anyone run into this behavior before? Perhaps it's getting stuck on that particular request?
e
So there are two things going on here, it seems:
> I keep running into this message when running a particular stream in the tap, and each time it prints, it prints with the same `replication_key_value` of `2024-05-16T10:24:22`.
https://sdk.meltano.com/en/v0.37.0/faq.html#i-m-seeing-note-progress-is-not-resumable-if-interrupted-in-my-state-files
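Roughly paraphrasing that FAQ page: the SDK writes interim `progress_markers` instead of a finalized bookmark when it can't treat the stream as sorted, and those markers are discarded if the sync is interrupted, hence "not resumable"; declaring the stream sorted lets it finalize the bookmark as records come in.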
> such as `JSONPath $.data[*] match count: 1012` and the total record count appears to be changing with each log message
That's expected if the stream changes from message to message, or if it's a different partition each time. Perhaps it's a child stream?
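For context, here's a minimal sketch of the SDK's parent/child pattern (class names, URL, and schemas are invented for illustration); the child stream runs once per parent context, so per-request match counts naturally differ:
```python
from singer_sdk.streams import RESTStream

class ParentStream(RESTStream):
    name = "parents"
    url_base = "https://api.example.com"  # hypothetical API
    path = "/parents"
    schema = {"properties": {"id": {"type": "integer"}}}

    def get_child_context(self, record: dict, context: dict | None) -> dict:
        # Each parent record becomes one partition/context for the child.
        return {"parent_id": record["id"]}

class ChildStream(RESTStream):
    name = "children"
    url_base = "https://api.example.com"
    path = "/parents/{parent_id}/children"  # filled in from the context
    schema = {"properties": {"id": {"type": "integer"}}}
    parent_stream_type = ParentStream  # one sync pass per parent context
```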
i
Sooo, try setting `is_sorted = True` in the class?
I don't care if it's sorted in the downstream table
e
Yeah, set that in the stream class if you can confirm the records come un sorted.
> I don't care if it's sorted in the downstream table
The flag is rather used to determine whether the state can be updated as each record is received; otherwise you'd risk updating to an older bookmark.
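A toy illustration in plain Python (not the SDK's actual internals) of why that matters:
```python
# Records arriving out of order, as they might from an API with no ordering:
records = ["2024-01-03T08:44:02", "2019-08-05T15:47:20"]

# Updating the bookmark on every record would leave it *older* than the true
# max, so the next incremental run could re-fetch or skip data:
bookmark = None
for value in records:
    bookmark = value
print(bookmark)  # 2019-08-05T15:47:20

# For unsorted streams the SDK instead tracks the max in progress markers and
# only finalizes it once the sync completes successfully:
print(max(records))  # 2024-01-03T08:44:02
```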
i
ahhhhhh
makes sense
```python
class jd_NewUpdatedTimesheetRecords(JobDivaStream):
    is_sorted = True
```
so like this?
> That's expected if the stream changes from message to message, or if it's a different partition each time. Perhaps it's a child stream?
I was citing the changing record count to show that I don't suspect it's running the same request every time since the row count for that partition changes, you know?
e
Yeah that's the right place to set it
i
```
singer_sdk.exceptions.InvalidStreamSortException: Unsorted data detected in stream. Latest value '2019-08-05T15:47:20' is smaller than previous max '2024-01-03T08:44:02'.
```
I'm getting this now, so I think I'll use a different column. As a rule of thumb, should I be using the MAX, or "largest"/"soonest", date column as the replication key for a particular stream?
I don't believe the data is sorted
Hmm
The "progress is not resumable" isn't the primary issue though in the log message, but rather the repeating replication key values that occur every 20 or 30 log messages
> Yeah, set that in the stream class if you can confirm the records come un sorted.
Do you mean "come in sorted"? hahaha could mean two very different things
e
Oh yeah, typo! I meant "come in sorted"
> I'm getting this now, so I think I'll use a different column. As a rule of thumb, should I be using the MAX, or "largest"/"soonest", date column as the replication key for a particular stream?
I think the answer depends a lot, but I'd summarize it as "whatever ensures you don't lose any data" 😅. If that means not supporting incremental replication, there's a tradeoff to consider, e.g. "I'll only sync new records and miss updates, which I'll tolerate, but I'll also run a full refresh every so often."
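A toy example of that tradeoff (field names invented): keying on a creation date only picks up new rows, which is exactly the gap the occasional full refresh covers:
```python
rows = [
    {"id": 1, "created": "2024-01-01", "updated": "2024-06-01"},  # old row, recently edited
    {"id": 2, "created": "2024-05-20", "updated": "2024-05-20"},  # genuinely new row
]
bookmark = "2024-05-01"  # last synced position

# Incremental sync keyed on "created": the edit to row 1 is silently missed.
print([r["id"] for r in rows if r["created"] > bookmark])  # [2]

# Keyed on "updated": both changes come through, but only if the API can
# reliably filter or sort on that column.
print([r["id"] for r in rows if r["updated"] > bookmark])  # [1, 2]
```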
i
So I tried using a different column for the replication key, and I'm still running into the same issue. Where I'm getting confused is why my code keeps setting that `replication_key` to the same exact value. This is weird behavior, since my code is supposed to be parsing the previous "toDate" parameter, making that the new "fromDate" and adding 7 days to it for the next request's "toDate". So why would it keep getting stuck on the same request, or printing the same `"replication_key": "DATEUPDATED", "replication_key_value": "2024-05-16T10:24:22"`? My pagination class:
```python
# Imports assumed from context; OUTPUT_DATE_FORMAT is a module-level constant (not shown).
from datetime import date, datetime, timedelta
from urllib.parse import parse_qsl, urlparse

from singer_sdk.pagination import BaseAPIPaginator


class JobDivaPaginator(BaseAPIPaginator):
    def __init__(self, *args, **kwargs):
        super().__init__(None, *args, **kwargs)

    def has_more(self, response):
        # Check the next window's start date to make sure it's before today.
        return self.get_next(response) < date.today()

    def get_next(self, response):
        # Get the parameters used for the previous request, then return its
        # "toDate" param plus 1 second as the new "fromDate".
        params = dict(parse_qsl(urlparse(response.request.url).query))
        return datetime.strptime(params["toDate"], OUTPUT_DATE_FORMAT).date() + timedelta(seconds=1)
```
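One subtlety worth flagging in the paginator above: `datetime.date` arithmetic silently ignores the sub-day parts of a `timedelta`, so adding or subtracting one second from a `date` is a no-op:
```python
from datetime import date, timedelta

d = date(2024, 5, 16)
print(d + timedelta(seconds=1))  # 2024-05-16 -- unchanged
print(d - timedelta(seconds=1))  # 2024-05-16 -- also unchanged
# Second-level window boundaries need datetime objects rather than dates.
```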
My params method:
```python
def get_url_params(
    self,
    context: dict | None,  # noqa: ARG002
    next_page_token: date | None,  # noqa: ANN401
) -> dict[str, Any]:
    """Return a dictionary of values to be used in URL parameterization.

    Args:
        context: The stream context.
        next_page_token: The next page index or value.

    Returns:
        A dictionary of URL query parameters.
    """
    # start_value = self.config["start_date"]
    start_value = self.get_starting_replication_key_value(context)
    from_date = (
        next_page_token
        or datetime.strptime(start_value, INPUT_DATE_FORMAT).date()
    )
    to_date = from_date + timedelta(days=7) - timedelta(seconds=1)

    return {
        "fromDate": from_date.strftime(OUTPUT_DATE_FORMAT),
        "toDate": to_date.strftime(OUTPUT_DATE_FORMAT),
    }
```
Am I going insane? Perhaps it's just getting stuck somewhere and not properly updating the state?
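One way to sanity-check the windowing in isolation is to simulate it in plain Python; the format constants below are guesses, since the real ones aren't shown here:
```python
from datetime import date, datetime, timedelta

INPUT_DATE_FORMAT = "%Y-%m-%dT%H:%M:%S"   # assumed
OUTPUT_DATE_FORMAT = "%Y-%m-%dT%H:%M:%S"  # assumed

from_date = datetime.strptime("2024-05-16T10:24:22", INPUT_DATE_FORMAT).date()
while from_date < date.today():
    to_date = from_date + timedelta(days=7) - timedelta(seconds=1)  # as in get_url_params
    print(from_date.strftime(OUTPUT_DATE_FORMAT), "->", to_date.strftime(OUTPUT_DATE_FORMAT))
    from_date = to_date + timedelta(seconds=1)  # as in get_next
```
If the simulated windows advance cleanly, the repeating value is more likely coming from the state side (what `get_starting_replication_key_value` returns) than from the pagination math.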
e
are you testing this with `meltano run`?
i
Yeah, I'm testing both with `meltano run` and through my container by materializing the tap assets with just that stream (I ignored the other streams in my tap.py within the custom stream and built the image with that saved).
e
Can you try using `meltano run --full-refresh`?
i
Passing the `--full-refresh` flag results in it syncing records up until that one date, then getting stuck at that date again.
e
Have you tried debugging in key places to see where that value is coming from?
i
I have not. I usually don't use my IDE for `meltano run`; I just run commands from my command prompt.
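For what it's worth, no IDE is required to inspect the state itself. Assuming the job's state is exported to a local file with `meltano state get <state-id> > state.json` (the state ID and filename are placeholders), a few lines of Python will show the bookmark per stream:
```python
import json

with open("state.json") as f:
    state = json.load(f)

for stream, bookmark in state.get("bookmarks", {}).items():
    # Unsorted streams keep their interim value under "progress_markers".
    print(stream, "->", bookmark.get("progress_markers", bookmark))
```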