# troubleshooting
i
I posted this in #C068YBV6KEK, but does anyone know how to use dates for pagination? I've got my tap working as far as getting one range of dates through, but I'm stuck on what to set as the replication key, how to get that replication key from the context, and which methods to override. My current `get_url_params` looks like this:
```python
def get_url_params(
    self,
    context: dict | None,  # noqa: ARG002
    next_page_token: Any | None,  # noqa: ANN401
) -> dict[str, Any]:
    params: dict = {}
    params["fromDate"] = "02/01/2022 00:00:00"
    params["toDate"] = "02/08/2022 00:00:00"
    return params
```
Does the replication key need to be one of my columns within the dataset? Perhaps one of the parameters? Or is it its own separate thing?
r
How are you determining where `fromDate` and `toDate` are coming from initially? From user config (both the start date and end date, or just the start date with a dynamic default for the end date)?
I think you are conflating pagination and state concepts here - I would focus on one at a time. Do you have to make multiple requests to the endpoint you are providing `fromDate` and `toDate` to in order to get the data you need? If yes, does the API return anything that you can use to determine the next values for `fromDate` and `toDate`, or do you just want to paginate up until a date range inclusive of the current date?
i
Right now I just hardcoded the `fromDate` and `toDate` to those values to test whether or not the tap works - and it does. The API only accepts a 14-day range per request, so I'll need to make multiple requests to get a full refresh of the table. My ultimate goal is to get the initial start date from the user's `start_date` config, so I can set the initial `fromDate` to that date, add 14 days to determine the `toDate`, and use that range for the first request - then query data for the next 14-day range, and so on until I hit `current_date()` so it doesn't run infinitely. My API only returns the data, so that's where I'm getting confused.
r
OK, great. So you're probably going to want to add a custom pagination class like this (registered here) and then resolve `startDate` in `get_url_params` with some logic like this. Bear in mind that example is dealing with a single date from a URL path, rather than a range from URL params. I suggest having a read through this thread, as I think the objective was pretty similar to yours.
👍 1
i
I actually ran across this earlier - thank you. I'll take a look at it.
in your caiso tap, is the `current_date` the date for that instance of the class, or the literal current date like `get_date()`?
```python
# imports added for completeness
from datetime import date, datetime, timedelta
from typing import Any

from singer_sdk.pagination import BaseAPIPaginator
from singer_sdk.streams import RESTStream


class JobDivaPaginator(BaseAPIPaginator):
    def __init__(self, *args, **kwargs):
        super().__init__(None, *args, **kwargs)

    def has_more(self, response):
        return self.get_next(response) < date.today()

    def get_next(self, response):
        # increment by 14 days
        return JobDivaStream.get_from_date(response) + timedelta(days=14)


class JobDivaStream(RESTStream):
    """JobDiva stream class."""

    default_start_date_str = "02/01/2022"
    default_start_date = datetime.strptime(default_start_date_str, "%m/%d/%Y")

    @classmethod
    def get_from_date(cls, response):
        # placeholder: always returns the default start date for now
        return cls.default_start_date

    def get_new_paginator(self):
        return JobDivaPaginator()

    def get_url_params(
        self,
        context: dict | None,  # noqa: ARG002
        next_page_token: date | None,  # noqa: ANN401
    ) -> dict[str, Any]:
        """Return a dictionary of values to be used in URL parameterization.

        Args:
            context: The stream context.
            next_page_token: The next page index or value.

        Returns:
            A dictionary of URL query parameters.
        """
        params: dict = {}

        fromDate = next_page_token or self.default_start_date
        params["fromDate"] = str(fromDate)

        toDate = fromDate + timedelta(days=14) - timedelta(seconds=1)
        params["toDate"] = str(toDate)

        return params
```
this is sort of pseudo-code, but is this a decent start? I think this will create a `fromDate` and use that by default, then add 14 days to determine the next `fromDate`. I'll just handle the `toDate` logic in the params, because it will always be 1 second less than 14 days after the `fromDate`. Where I'm stuck is: say I have a default date of 02/01/2022, and with `get_next` I can set 02/15/2022 as the next start date. How do I then get that date to be used as the base date to add to for the next request? Hope that makes sense
r
> in your caiso tap, is the current_date the date for that instance of the class
Yeah, current date as in the date to make the request with.
> Where I'm stuck is: say I have a default date of 02/01/2022 and with the get_next I can set a date of 02/15/2022 to be the start date. How do I then get that date to be used as my base date to add to for the next request? Hope that makes sense
```python
from urllib.parse import parse_qsl, urlparse

...

DATE_FORMAT = ...  # define date format e.g. "%Y-%m-%d"

...

class JobDivaPaginator(BaseAPIPaginator):

    ...

    def get_next(self, response):
        params = dict(parse_qsl(urlparse(response.request.url).params))
        return datetime.strptime(params["toDate"], DATE_FORMAT).date() + timedelta(days=1)

...

class JobDivaStream(RESTStream):

    ...

    def get_url_params(self, context, next_page_token):
        start_value = self.get_starting_replication_key_value(context)  # will read `start_date` from config if no state set
        from_date = next_page_token or datetime.strptime(start_value, DATE_FORMAT).date()
        to_date = from_date + timedelta(days=14)

        return {
            "fromDate": from_date.strftime(DATE_FORMAT),
            "toDate": to_date.strftime(DATE_FORMAT),
        }
```
🙌 1
i
should I define `get_starting_replication_key_value` as a method in the client stream class and just write it myself to return the date set in config?
You're the goat btw thank you so much
🫡 1
r
> should I define get_starting_replication_key_value as a method in the client stream class and just write that myself to return the date set in config??
`get_starting_replication_key_value` will be available to you already, since your stream inherits from `RESTStream` (which inherits from `Stream`). It will handle the lookup of `start_date` from config automatically if it can't find a value in state to use - I expect this is one of the next things you're going to bump into.
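[note] The fallback behaviour described here can be modeled roughly as follows - a conceptual sketch only, not the SDK's actual implementation (the `state` and `config` dicts are stand-ins):

```python
def starting_replication_value(state: dict, config: dict):
    """Rough model of the lookup: prefer the bookmark persisted in state,
    and fall back to start_date from config if no bookmark exists yet."""
    bookmark = state.get("replication_key_value")
    return bookmark if bookmark is not None else config.get("start_date")

# First sync - no state yet, so config wins:
print(starting_replication_value({}, {"start_date": "02/01/2022"}))
# 02/01/2022

# Later sync - the persisted bookmark wins:
print(starting_replication_value(
    {"replication_key_value": "2024-02-14T13:23:43"},
    {"start_date": "02/01/2022"},
))
# 2024-02-14T13:23:43
```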
i
You can see the future:
```
line 93, in get_url_params
    or datetime.strptime(start_value, DATE_FORMAT).date()
TypeError: strptime() argument 1 must be str, not None
```
I get that the `get_starting_replication_key_value` method reads from the context, but I suspect it's reading from here by default, since the `replication_key` set in the stream class is `None`.
```python
class NewUpdatedCandidateRecordsStream(JobDivaStream):
    """Define custom stream."""

    name = "NewUpdatedCandidateRecordsStream"
    path = "bi/NewUpdatedCandidateRecords"
    primary_keys = ["CANDIDATEID"]
    replication_key = None
```
Should I set that replication key to `replication_key = self.context["start_date"]`? Is that what you mean? Manually setting it to some value throws "Field '02/01/2022' is not in schema for stream {stream}"
r
Do your records contain a date property? Ideally, that should be set as the replication key (i.e. `replication_key = "date"`) - that way, on the next sync the tap will start from the last encountered date, rather than the initial date set in config. Having said that, `get_starting_replication_key_value` should not be returning `None` in your case, unless `start_date` is not set in config (i.e. in `meltano.yml` or `.env`). I'd expect it to return the same value from config every time, as you have no replication behaviour defined yet.
🙌 1
Just to get things going, you could replace
```python
start_value = self.get_starting_replication_key_value(context)
```
with
```python
start_value = self.config["start_date"]
```
for the time being and deal with the state implementation later.
🙌 1
i
There's `"DATEUPDATED"`, which comes out like `"2022-01-22T11:58:22"` - as a string, I think.
So the max DATEUPDATED would ideally be the replication key
```
line 222, in is_timestamp_replication_key
    type_dict = self.schema.get("properties", {}).get(self.replication_key)
TypeError: unhashable type: 'list'
```
`replication_key = ["DATEUPDATED"]`
is this caused because I'm not specifying a single value for the replication key, but rather the whole column itself?
r
Yeah, you just want to supply a single property name as a string for `replication_key`.
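[note] The `TypeError: unhashable type: 'list'` above is just what happens when a list is used as a dict key; a minimal reproduction of the schema lookup in the traceback:

```python
schema = {"properties": {"DATEUPDATED": {"type": ["string", "null"]}}}

# A string key works - this is what the lookup expects:
print(schema["properties"].get("DATEUPDATED"))  # {'type': ['string', 'null']}

# A list key raises, because lists are unhashable:
try:
    schema["properties"].get(["DATEUPDATED"])
except TypeError as exc:
    print(exc)  # unhashable type: 'list'
```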
i
so set it to `self.config["start_date"]`? is that what you mean by a property?
r
No, a top-level property of the schema you have defined for your stream:
```python
replication_key = "DATEUPDATED"
```
i
```
return datetime.strptime(params["toDate"], DATE_FORMAT).date() + timedelta(seconds=1)
KeyError: 'toDate'
```
It looks like it's not getting the `toDate` param from the returned request. In `get_next(self, response)`, where exactly is that request coming from, or what does it represent? The whole instance of the previous request that just returned the dataset? I'm getting the first date range to run fine
r
```python
def get_next(self, response):
    params = dict(parse_qsl(urlparse(response.request.url).params))
    return datetime.strptime(params["toDate"], DATE_FORMAT).date() + timedelta(days=1)
```
will fire after each request (`has_more` dictates whether or not to keep making requests), so you have access to its response and therefore the request itself. You should be able to see the request path in the log output, which might help you see if you are setting your URL params correctly.
i
Right, that makes sense - pagination won't start until after a successful initial run. Are you talking about the Meltano logs? Because I can't seem to find the parameters in the log output. I get the URL path back (or at least the endpoint extension of it), but that doesn't contain the params.
r
Hmm... I don't see them either, actually, so I was probably wrong about that. What does your `get_url_params` look like now?
i
```python
def get_url_params(
    self,
    context: dict | None,  # noqa: ARG002
    next_page_token: date | None,  # noqa: ANN401
) -> dict[str, Any]:
    """Return a dictionary of values to be used in URL parameterization.

    Args:
        context: The stream context.
        next_page_token: The next page index or value.

    Returns:
        A dictionary of URL query parameters.
    """
    start_value = self.config["start_date"]  # self.get_starting_replication_key_value(context)
    from_date = next_page_token or datetime.strptime(start_value, DATE_FORMAT).date()
    to_date = from_date + timedelta(days=14) - timedelta(seconds=1)

    return {
        "fromDate": from_date.strftime(DATE_FORMAT),
        "toDate": to_date.strftime(DATE_FORMAT),
    }
```
the initial params seem to be working fine, so it mustn't be an issue with the return statement
r
Maybe it's an issue with parsing the request URL in the `JobDivaPaginator`... You could override `prepare_request` temporarily to log it out:
```python
def prepare_request(self, context, next_page_token):
    request = super().prepare_request(context, next_page_token)
    self.logger.info("Prepared request URL: %s", request.url)

    return request
```
i
```
?fromDate=02%2F01%2F2022&toDate=02%2F15%2F2022
```
This is how the parameters are appended to the URL
r
Oh, it should be `query` - not `params`.
👍 1
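[note] For reference, `urlparse` splits a URL into six parts, and the query string lives in `.query`; `.params` refers to the rarely-used semicolon path parameters, which is why the lookup above came back empty. A quick demonstration (the URL is a made-up example):

```python
from urllib.parse import parse_qsl, urlparse

url = "https://api.example.com/bi/Endpoint?fromDate=02%2F01%2F2022&toDate=02%2F15%2F2022"
parts = urlparse(url)

print(repr(parts.params))  # '' - semicolon path parameters, not the query string
print(parts.query)         # fromDate=02%2F01%2F2022&toDate=02%2F15%2F2022

# parse_qsl also percent-decodes the values:
print(dict(parse_qsl(parts.query)))
# {'fromDate': '02/01/2022', 'toDate': '02/15/2022'}
```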
i
Got it - retesting now
ran the first request:
```
WARNING | singer_sdk | Stream is assumed to be unsorted, progress is not resumable if interrupted
```
could this be an issue? it just ran 2 sequential requests successfully
r
Not seen that before, but it's probably something to do with replication. From here if you want to work backwards.
i
THE WHOLE STREAM WORKED
🙌 1
WOOOOO
You're the GOAT Reuben
😂 1
```
2024-02-14T18:25:54.322518Z [info ] Incremental state has been updated at 2024-02-14 18:25:54.322518.
```
where can i check where this was updated???
r
```shell
meltano state list
meltano state get <state id>
```
i
bro is carrying my career
```json
{
  "singer_state": {
    "bookmarks": {
      "NewUpdatedCandidateRecordsStream": {
        "replication_key": "DATEUPDATED",
        "replication_key_value": "2024-02-14T13:23:43"
      }
    }
  }
}
```
awesome: looks like the right value.
r
> bro is carrying my career
😂😂😂
Glad you got it working!
i
and that replication key differs per target, right?
r
The state ID differs, yes.