Ian OLeary
02/13/2024, 4:48 PM
get_url_params looks like this:
def get_url_params(
    self,
    context: dict | None,  # noqa: ARG002
    next_page_token: Any | None,  # noqa: ANN401
) -> dict[str, Any]:
    params: dict = {}
    params["fromDate"] = "02/01/2022 00:00:00"
    params["toDate"] = "02/08/2022 00:00:00"
    return params
Does the replication key need to be one of my columns within the dataset? Perhaps one of the parameters? Or is it its own separate thing?
Reuben (Matatika)
02/13/2024, 5:00 PM
fromDate and toDate are coming from initially? From user config (both the start date and end date, or just the start date with a dynamic default for end date)?
Reuben (Matatika)
02/13/2024, 5:04 PM
fromDate and toDate to get the data you need? If yes, does the API return anything that you can use to determine the next values for fromDate and toDate, or do you just want to paginate up until a date range inclusive of the current date?
Ian OLeary
02/13/2024, 5:14 PM
fromDate and toDate to those values to test whether or not the tap works - and it does. The API only accepts a 14-day range per request, so I'll need to do multiple requests to get a full refresh of the table. My ultimate goal is to get the initial start date config from user input start_date, so I can set the initial fromDate to that date, add 14 days to determine the toDate, and use that range for the first request - then query data for the next 14 day range, etc... until I hit the current_date() so it doesn't run infinitely. My API only returns the data, so that's where I'm getting confused.
Reuben (Matatika)
02/13/2024, 5:28 PM
startDate in get_url_params with some logic like this. Bear in mind that it is dealing with a single date from a URL path, rather than a range from URL params.
I suggest having a read through this thread as I think the objective was pretty similar to yours.
Ian OLeary
02/13/2024, 5:32 PM
Ian OLeary
02/13/2024, 7:54 PM
Ian OLeary
02/13/2024, 7:54 PM
Ian OLeary
02/13/2024, 8:13 PM
class JobDivaPaginator(BaseAPIPaginator):
    def __init__(self, *args, **kwargs):
        super().__init__(None, *args, **kwargs)
    def has_more(self, response):
        return self.get_next(response) < date.today()
    def get_next(self, response):
        # increment by 14 days
        return JobDivaStream.get_from_date(response) + timedelta(days=14)
class JobDivaStream(RESTStream):
    """JobDiva stream class."""
    default_start_date_str = "02/01/2022"
    default_start_date = datetime.strptime(default_start_date_str, "%m/%d/%Y")
    @staticmethod
    def get_from_date(self):
        fromDate = self.default_start_date
        return fromDate
    def get_new_paginator(self):
        return JobDivaPaginator()
    def get_url_params(
        self,
        context: dict | None,  # noqa: ARG002
        next_page_token: date | None,  # noqa: ANN401
    ) -> dict[str, Any]:
        """Return a dictionary of values to be used in URL parameterization.
        Args:
            context: The stream context.
            next_page_token: The next page index or value.
        Returns:
            A dictionary of URL query parameters.
        """
        params: dict = {}
        fromDate = (
            next_page_token
            or self.default_start_date
        )
        params["fromDate"] = str(fromDate)
        toDate = fromDate + timedelta(days=14) - timedelta(seconds=1)
        params["toDate"] = str(toDate)
        return params
this is sort of pseudo-code, but is this a decent start? I think this will create a fromDate and use that by default, then add 14 days to determine the next fromDate. I'll just handle the toDate logic in the params because it will always be 1 second less than 14 days from the fromDate.
Where I'm stuck is: say I have a default date of 02/01/2022 and with the get_next I can set a date of 02/15/2022 to be the start date. How do I then get that date to be used as my base date to add to for the next request? Hope that makes sense
Reuben (Matatika)
02/13/2024, 10:25 PM
"in your caiso tap, is the current_date the date for that instance of the class"
Yeah, current date as in the date to make the request with.
Reuben (Matatika)
02/13/2024, 11:27 PM
from urllib.parse import parse_qsl, urlparse
...
DATE_FORMAT = ...  # define date format e.g. "%Y-%m-%d"
...
class JobDivaPaginator(BaseAPIPaginator):
    ...
    def get_next(self, response):
        params = dict(parse_qsl(urlparse(response.request.url).params))
        return datetime.strptime(params["toDate"], DATE_FORMAT).date() + timedelta(days=1)
...
class JobDivaStream(RESTStream):
    ...
    def get_url_params(self, context, next_page_token):
        start_value = self.get_starting_replication_key_value(context)  # will read `start_date` from config if no state set
        from_date = next_page_token or datetime.strptime(start_value, DATE_FORMAT).date()
        to_date = from_date + timedelta(days=14)
        return {
            "fromDate": from_date.strftime(DATE_FORMAT),
            "toDate": to_date.strftime(DATE_FORMAT),
        }
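The windowing idea in the snippet above can be tried on its own, outside the SDK classes. This is only a minimal sketch under a few assumptions: the date format is taken to be `%m/%d/%Y` (matching the `02/01/2022` values earlier in the thread), `date_windows` is a hypothetical helper rather than anything from the Singer SDK, and capping the last window at the end date is a choice made here, not something the thread specifies.

```python
from datetime import date, datetime, timedelta
from typing import Iterator

DATE_FORMAT = "%m/%d/%Y"  # assumed format, matching "02/01/2022" in the thread
WINDOW = timedelta(days=14)  # step used in the thread's get_url_params


def date_windows(start: date, end: date) -> Iterator[tuple[date, date]]:
    """Yield (from_date, to_date) pairs covering start..end in 14-day steps."""
    from_date = start
    while from_date <= end:
        # Cap the final window so it never runs past the end date (assumption).
        to_date = min(from_date + WINDOW, end)
        yield from_date, to_date
        # Mirror get_next above: the next window starts the day after toDate.
        from_date = to_date + timedelta(days=1)


start = datetime.strptime("02/01/2022", DATE_FORMAT).date()
for from_date, to_date in date_windows(start, date(2022, 3, 10)):
    print(from_date.strftime(DATE_FORMAT), "->", to_date.strftime(DATE_FORMAT))
```

In a real tap, `end` would typically be `date.today()`, which plays the same role as the `has_more` check in the paginator.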
Ian OLeary
02/14/2024, 12:23 AM
Ian OLeary
02/14/2024, 12:23 AM
Reuben (Matatika)
02/14/2024, 12:32 AM
"should I define get_starting_replication_key_value as a method in the client stream class and just write that myself to return the date set in config??"
get_starting_replication_key_value will be available to you already since your stream is inheriting from RESTStream (which inherits from Stream). It will handle the lookup of start_date from config automatically if it can't find a value in state to use - I expect this is one of the next things you're going to bump into.
Ian OLeary
02/14/2024, 12:38 AM
line 93, in get_url_params
    or datetime.strptime(start_value, DATE_FORMAT).date()
TypeError: strptime() argument 1 must be str, not None
I get that the get_starting_replication_key_value method reads from the context but I suspect it's reading it from here by default, since the replication_key set in the stream class is "None".
class NewUpdatedCandidateRecordsStream(JobDivaStream):
    """Define custom stream."""
    name = "NewUpdatedCandidateRecordsStream"
    path = "bi/NewUpdatedCandidateRecords"
    primary_keys = ["CANDIDATEID"]
    replication_key = None
Should I set that replication key to replication_key = self.context["start_date"]? Is that what you mean? Manually setting it to some value throws "Field '02/01/2022' is not in schema for stream {stream}"
Reuben (Matatika)
02/14/2024, 12:45 AM
replication_key = "date") - that way, on the next sync the tap will start from the last encountered date, rather than the initial date set in config. Having said that, get_starting_replication_key_value should not be returning None in your case, unless start_date is not set in config (i.e. in meltano.yml or .env). I'd expect it to return the same value from config every time, as you have no replication behaviour defined yet.
Reuben (Matatika)
02/14/2024, 12:48 AM
start_value = self.get_starting_replication_key_value(context)
with
start_value = self.config["start_date"]
for the time being and deal with the state implementation later.
Ian OLeary
02/14/2024, 1:05 AM
"DATEUPDATED": which comes out like this "2022-01-22T11:58:22" as a string I think.
Ian OLeary
02/14/2024, 1:06 AM
Ian OLeary
02/14/2024, 1:07 AM
line 222, in is_timestamp_replication_key
    type_dict = self.schema.get("properties", {}).get(self.replication_key)
TypeError: unhashable type: 'list'
replication_key = ["DATEUPDATED"]
is this caused because I'm not specifying a single value for the replication key but rather the whole column itself?
Reuben (Matatika)
02/14/2024, 1:11 AM
replication_key.
Ian
02/14/2024, 1:31 AM
Reuben (Matatika)
02/14/2024, 1:37 AM
replication_key = "DATEUPDATED"
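The `unhashable type: 'list'` error in the traceback above comes from using the replication key as a dictionary key. A small sketch of the same lookup in isolation, assuming a made-up schema entry for `DATEUPDATED` (the real schema is not shown in the thread) and a hypothetical `lookup` helper:

```python
# Assumed schema fragment for illustration only.
schema = {"properties": {"DATEUPDATED": {"type": "string", "format": "date-time"}}}


def lookup(replication_key):
    # Same shape as the line in the traceback: the key is used as a dict key.
    return schema.get("properties", {}).get(replication_key)


# A plain string is hashable, so the property is found.
print(lookup("DATEUPDATED"))

# A list is not hashable, so the dict lookup itself raises TypeError.
try:
    lookup(["DATEUPDATED"])
except TypeError as exc:
    print(f"TypeError: {exc}")
```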
Reuben (Matatika)
02/14/2024, 1:43 AM
Ian OLeary
02/14/2024, 3:19 AM
return datetime.strptime(params["toDate"], DATE_FORMAT).date() + timedelta(seconds=1)
KeyError: 'toDate'
It looks like it's not getting the 'toDate' param from the returned request. In the (self, request), where exactly is that request coming from or what does it represent? The whole instance of the previous request that just returned the dataset? I'm getting the first date range to run fine.
Reuben (Matatika)
02/14/2024, 10:15 AM
def get_next(self, response):
    params = dict(parse_qsl(urlparse(response.request.url).params))
    return datetime.strptime(params["toDate"], DATE_FORMAT).date() + timedelta(days=1)
will fire after each request (has_more dictates whether or not to keep making requests), so you have access to its response and therefore the request itself. You should be able to see the request path in the log output, which might help you see if you are setting your URL params correctly.
Ian OLeary
02/14/2024, 1:49 PM
Reuben (Matatika)
02/14/2024, 3:29 PM
get_url_params look like now?
Ian OLeary
02/14/2024, 4:00 PM
def get_url_params(
    self,
    context: dict | None,  # noqa: ARG002
    next_page_token: date | None,  # noqa: ANN401
) -> dict[str, Any]:
    """Return a dictionary of values to be used in URL parameterization.
    Args:
        context: The stream context.
        next_page_token: The next page index or value.
    Returns:
        A dictionary of URL query parameters.
    """
    start_value = self.config["start_date"]  # self.get_starting_replication_key_value(context)
    from_date = (
        next_page_token
        or datetime.strptime(start_value, DATE_FORMAT).date()
    )
    to_date = from_date + timedelta(days=14) - timedelta(seconds=1)
    return {
        "fromDate": from_date.strftime(DATE_FORMAT),
        "toDate": to_date.strftime(DATE_FORMAT),
    }
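One detail worth checking in the snippet above: `from_date` is a `date` (because of `.date()`), and `date` arithmetic only applies the whole-day part of a `timedelta`, so `- timedelta(seconds=1)` has no effect there. That would be consistent with the `toDate=02%2F15%2F2022` value that shows up in the logged URL later in the thread. A minimal sketch, assuming a `%m/%d/%Y` format:

```python
from datetime import date, datetime, timedelta

DATE_FORMAT = "%m/%d/%Y"  # assumed; the thread leaves the exact format open

from_date = datetime.strptime("02/01/2022", DATE_FORMAT).date()

# On a date, only timedelta.days is used, so subtracting one second is a no-op.
to_date = from_date + timedelta(days=14) - timedelta(seconds=1)
print(to_date)  # 2022-02-15

# On a datetime, the one-second subtraction does take effect.
from_dt = datetime.strptime("02/01/2022", DATE_FORMAT)
to_dt = from_dt + timedelta(days=14) - timedelta(seconds=1)
print(to_dt)  # 2022-02-14 23:59:59
```

If the intent is an end bound one second short of the next window, working with `datetime` values (and a format that includes the time) is one way to get it; whether the API needs that is not stated in the thread.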
Ian OLeary
02/14/2024, 4:01 PM
Reuben (Matatika)
02/14/2024, 5:18 PM
JobDivaPaginator... You could override prepare_request temporarily to log it out:
def prepare_request(self, context, next_page_token):
    request = super().prepare_request(context, next_page_token)
    self.logger.info("Prepared request URL: %s", request.url)
    return request
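When reading a logged URL, it can help to see how `urllib.parse.urlparse` splits it up. In the standard library, `.query` holds everything after `?`, while `.params` only holds `;`-style parameters on the last path segment and is usually empty. A short sketch, assuming a hypothetical host (`api.example.com`) with the stream path from this thread:

```python
from urllib.parse import parse_qsl, urlparse

# Hypothetical host; the path and query values follow the ones used in the thread.
url = (
    "https://api.example.com/bi/NewUpdatedCandidateRecords"
    "?fromDate=02%2F01%2F2022&toDate=02%2F15%2F2022"
)

parts = urlparse(url)

# .params is empty here, so parsing it gives no keys at all.
print(dict(parse_qsl(parts.params)))  # {}

# .query is the part after "?", and parse_qsl also decodes %2F back to "/".
query = dict(parse_qsl(parts.query))
print(query["toDate"])  # 02/15/2022
```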
Ian OLeary
02/14/2024, 5:50 PM
?fromDate=02%2F01%2F2022&toDate=02%2F15%2F2022
This is how the parameters are appended to the url
Reuben (Matatika)
02/14/2024, 5:55 PM
query - not params.
Ian OLeary
02/14/2024, 6:00 PM
Ian OLeary
02/14/2024, 6:00 PM
Ian OLeary
02/14/2024, 6:01 PM
Ian OLeary
02/14/2024, 6:01 PM
Reuben (Matatika)
02/14/2024, 6:06 PM
Reuben (Matatika)
02/14/2024, 6:07 PM
Ian OLeary
02/14/2024, 6:34 PM
Ian OLeary
02/14/2024, 6:34 PM
Ian OLeary
02/14/2024, 6:34 PM
Ian OLeary
02/14/2024, 6:36 PM
Reuben (Matatika)
02/14/2024, 6:42 PM
meltano state list
meltano state get <state id>
Ian OLeary
02/14/2024, 7:08 PM
Ian OLeary
02/14/2024, 7:09 PM
{
  "singer_state": {
    "bookmarks": {
      "NewUpdatedCandidateRecordsStream": {
        "replication_key": "DATEUPDATED",
        "replication_key_value": "2024-02-14T13:23:43"
      }
    }
  }
}
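Note that the bookmark value in this state is an ISO 8601 timestamp, while the URL parameters in this thread use a month/day/year format. A minimal sketch of reading the bookmark and converting it, assuming `%m/%d/%Y` for the request format; this is plain `json`/`datetime` handling, not the SDK's own state logic:

```python
import json
from datetime import datetime

DATE_FORMAT = "%m/%d/%Y"  # assumed request format, as seen in the logged URL

state_json = """
{
  "singer_state": {
    "bookmarks": {
      "NewUpdatedCandidateRecordsStream": {
        "replication_key": "DATEUPDATED",
        "replication_key_value": "2024-02-14T13:23:43"
      }
    }
  }
}
"""

state = json.loads(state_json)
bookmark = state["singer_state"]["bookmarks"]["NewUpdatedCandidateRecordsStream"]

# The bookmark is ISO 8601, so it needs its own parsing step
# before it can be formatted as a fromDate value.
last_seen = datetime.fromisoformat(bookmark["replication_key_value"])
print(last_seen.date().strftime(DATE_FORMAT))  # 02/14/2024
```

So if `get_url_params` goes back to using `get_starting_replication_key_value`, the value it gets from state may not parse with the same `DATE_FORMAT` as the `start_date` from config.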
Ian OLeary
02/14/2024, 7:09 PM
Reuben (Matatika)
02/14/2024, 7:20 PM
bro is carrying my career
😂😂😂
Reuben (Matatika)
02/14/2024, 7:21 PM
Ian OLeary
02/14/2024, 7:21 PM
Reuben (Matatika)
02/14/2024, 9:36 PM