# getting-started
m
Hello, I have been struggling with a custom tap for some time now. I want to pull archived energy data from the CAISO website. The API URL contains a string for the date you are trying to access. For example, yesterday's data would be located at "https://www.caiso.com/outlook/SP/History/20230815/demand.csv". I have been looking through tutorials and the docs for a while, trying different methods unsuccessfully. Would this be accomplished by overriding some functions in a new caisoPaginator class, or perhaps by returning a dynamic URL base? Any insight, direction, or specific methods to look into would be greatly appreciated. I have attached the Postman preview.
r
Is the objective to pull data from a specific date up to (and including) the current date? In my mind, you want that value (e.g. `20230815`) stored in state, so you know where to continue from on subsequent syncs. Unless you are getting some token from the response that you can use to construct the next URL, you will need to parse and increment the date value used in the previous request manually in a custom `Paginator` class (like you said) - that logic should probably sit in the `get_next` implementation. I think the `has_more` implementation can simply check the date from the previous request against the current date (e.g. `20230816`), and just assume there are no more records to process if they match. Bear in mind that you probably want a sensible default date value to sync from if no other is provided, either from state or config. This could be dynamic (e.g. the last two weeks).
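The date arithmetic described here can be sketched outside the SDK. This is a minimal, hypothetical version of just the increment/stop logic - the function names are illustrative, not SDK API:

```python
from datetime import datetime, timedelta

def next_date(prev: str) -> str:
    # `get_next` logic: increment the previous request's YYYYMMDD date by one day
    return (datetime.strptime(prev, "%Y%m%d") + timedelta(days=1)).strftime("%Y%m%d")

def has_more(prev: str, today: str) -> bool:
    # `has_more` logic: stop once the previous request's date reaches today
    return prev < today
```

In the real `Paginator`, `prev` would come from the previous request and `today` from `datetime.now().strftime("%Y%m%d")`.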
m
Yes, eventually I would like to be able to store the most recent date pulled in state. For now, though, I just want to get something working exactly how you understood it. Thank you so much for your response - it was extremely helpful. It has been difficult trying to understand how the tap, streams, and client files actually interact and where the functions are called to actually make the requests. Hopefully I can construct some coherent functions to accomplish my goals.
r
No problem - glad I could help! 🙂 IMO, the SDK is pretty intuitive once you get into it.
m
I am working on it now and have a couple of questions you might be able to help with. The first: should the return value of the `get_next` function remain as `TPageToken`, or switch to `str`? The other is: where is this `get_next()` being called, and how can I make sure its result is put in the URL before the request is made? I have the "/demand.csv" in streams.py - I'm assuming that will go at the very end like it should, but will the date being returned automatically be placed at the end of what is returned by `url_base`?
message has been deleted
More info: this is my `url_base()` - I start with yesterday's date already in there.
r
If you are using Python with VS Code, you don't have to declare parameter or return types for a method you are overriding - IntelliSense will infer these automatically. `get_next` will be called by the default implementation of `RESTStream`, given that you are registering your `Paginator` in your stream class inheriting from `RESTStream`. To make sure it is set in the URL, you will probably have to override `prepare_request` like so:
```python
def prepare_request(self, context, next_page_token):
    prepared_request = super().prepare_request(context, next_page_token)

    # `next_page_token`: from paginator
    # `self.get_starting_timestamp(context)`: from state (rest of state logic needs implementing)
    # `self.config["start_date"]`: from config
    # `self.default_start_date`: from default (needs implementing)
    date = next_page_token or self.get_starting_timestamp(context) or self.config["start_date"] or self.default_start_date

    prepared_request.prepare_url(
        self.url_base.format(date=date),  # where `url_base` is something like `https://www.caiso.com/outlook/SP/History/{date}`
        self.get_url_params(context, next_page_token),
    )

    return prepared_request
```
Also, you don't call `url_base` - it is a property (`@property`), so just access it like:

```python
url = self.url_base
```
m
Slightly modified implementation, just because I didn't really understand where the `.format` was coming from and what would happen with the original `url_base`. I've been running `poetry run pytest` periodically and keep getting this `JSONDecodeError`. I tried messing with the stream property types and it's still showing up. Attaching my `prepare_request` along with the pytest summary. Again, thank you so much for taking the time to help me - if you're busy or if I'm taking too much of your time, please don't hesitate to let me know. It has just been extremely helpful to talk to someone who knows what they're doing.
message has been deleted
r
I would personally test with `meltano invoke tap-caiso` from the root of your project. That will probably give you a stack trace to follow. `format` is a Python string method that replaces keys enclosed in curly braces with kwarg values:

```python
>>> test = "hello world: {my_key}"
>>> test
'hello world: {my_key}'
>>> test.format(my_key="test")
'hello world: test'
```
m
Bottom of the stack trace from `meltano invoke tap-caiso`. Maybe delete the `parse_response` method? I'm not too sure.
r
Isn't this because the response body isn't JSON data? (from your initial screenshot)
m
Hm, actually you're definitely onto something there
It's coming back as CSV - how would I specify that?
r
Does it come back as JSON if you set the `Accept` header in the request to `application/json`? Otherwise you can probably override `parse_response` to deserialise the CSV.
m
So change the `http_headers` function to set that?
r
Try it in Postman first? Otherwise yes.
m
I think it didn't work unless I did it wrong
r
Yeah, doesn't look like it. Maybe check the API docs to see if it supports JSON in some other way? Otherwise you will probably have to parse the CSV:

```python
import csv
from io import StringIO

from singer_sdk.helpers.jsonpath import extract_jsonpath

# ...

    def parse_response(self, response):
        data = list(csv.DictReader(StringIO(response.text)))
        yield from extract_jsonpath(self.records_jsonpath, input=data)
```

Didn't notice the URL was targeting a CSV file - makes more sense now! 😅
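For reference, `csv.DictReader` yields one dict per row, keyed by the header line - a quick check with made-up sample data (the column names here are invented, not CAISO's actual schema):

```python
import csv
from io import StringIO

sample = "Time,Demand\n00:00,21000\n00:05,20950\n"
rows = list(csv.DictReader(StringIO(sample)))
# Each row is a dict like {"Time": "00:00", "Demand": "21000"}
```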
m
I think that worked, since I'm getting a more local error now. It has an issue with how I'm trying to access the URL in the paginator class methods (I tried both with and without the parentheses). Is there a function to return it from the tap class?
r
If you define a custom `__init__` for `caisoPaginator`, you can instantiate it passing in `self.url_base` from `get_new_paginator` in your stream class. See this too.
m
Okay, I copied over the `__init__` from `BaseAPIPaginator`, just added a `url` parameter and one line saying `self.url_base = url`, and then in `get_new_paginator` return `caisoPaginator(0, self.url_base)`.
r
Sure - you are extending from `BaseAPIPaginator`, though, so you could just call `super().__init__` to achieve the same thing.
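To illustrate the `super().__init__` pattern with plain stand-in classes (these are not the real singer_sdk classes, just a sketch of the inheritance mechanics):

```python
class BasePaginator:
    """Stand-in for BaseAPIPaginator."""
    def __init__(self, start_value):
        self._value = start_value

class caisoPaginator(BasePaginator):
    def __init__(self, start_value, url):
        # Forward start_value to the parent initialiser instead of copying its code
        super().__init__(start_value)
        self.url_base = url

p = caisoPaginator(0, "https://www.caiso.com/outlook/SP/History")
```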
m
Okay, now this one is just stupid: it says an integer is required, so I type-cast; that gave the same error, so I `.astype(int)`-ed all of them in the actual function call, and now it says `int` has no method `.astype`!
Not familiar with the `super` function either - that one also seems useful.
r
Yeah, a lot of using the SDK is about knowing some stuff about Python inheritance. It would be good to have a little primer on those concepts in the docs...
You probably want to instantiate `caisoPaginator` with an initial start date instead of the URL, then use `self.current_value` to extract `year`, `month` and `day` from in `has_more` and `get_next`.
Scratch that. Get rid of the `__init__` and just parse out the date from `response.request.url` in `has_more` and `get_next`.
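Parsing the date back out of `response.request.url` could look something like this - a sketch assuming the URL shape from earlier in the thread; the regex and function name are illustrative:

```python
import re

def date_from_url(url: str) -> str:
    # Pull the YYYYMMDD segment out of a CAISO history URL
    match = re.search(r"/History/(\d{8})/", url)
    if match is None:
        raise ValueError(f"no date segment in {url!r}")
    return match.group(1)
```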
m
That works much better; however, my `prepare_request` seems to be messing with the date. I int-cast the entire URL so it shows up in the stack trace, and it looks like this.
Commented out everything but `next_page_token`, and now it's just a zero. I suspect it's from my instantiating `caisoPaginator(0)`, but it was asking for a start value. I guess I should put the yesterday's-date part in `get_new_paginator` and pass that as the start value?
r
Nah, just instantiate the paginator in `get_new_paginator` without any arguments like before. You need to figure out how you can get your date back from the paginator `get_next` in the `YYYYMMDD` format expected for the URL. Also, this is where your logic for reforming the URL is not correct, since you are missing the `/demand.csv` path segment. You can also call `self.logger.info` from the stream class methods to print out runtime information - even if it's just for debugging during development. Here, I'd recommend you log out `next_page_token`, `self.url_base` and `url` in `prepare_request`, just so we can see how they are changing together.
That `format` stuff I shared earlier is probably relevant here too now.
m
Okay, I had the last "/demand.csv" piece in streams.py in my demandStream class - should I remove it from there?
Also, `get_next` should already be returning a correctly formatted string; if I understand datetime's `strftime("%Y%m%d")` correctly, it should be returning that format as a string.
r
If `demandStream` is extending `caisoStream`, then you can override the URL from there:

client.py

```python
class caisoStream(RESTStream):

    url_base = "https://www.caiso.com/outlook/SP/History/{date}"
```

streams.py

```python
class demandStream(caisoStream):

    @property
    def url_base(self):
        return super().url_base + "/demand.csv"
```
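As a sanity check, the `super().url_base` lookup in a property does resolve a plain class attribute on the parent - here's a standalone version of the same pattern, stripped of the SDK base class:

```python
class caisoStream:
    url_base = "https://www.caiso.com/outlook/SP/History/{date}"

class demandStream(caisoStream):
    @property
    def url_base(self):
        # super() resolves the parent's plain class attribute here
        return super().url_base + "/demand.csv"
```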
m
Okay, it still won't let me create an instance of `caisoPaginator` without a `start_value`.
r
Ok, so your date logic looks good:

```python
>>> datetime.now()
datetime.datetime(2023, 8, 17, 20, 41, 31, 978053)
>>> datetime.now().strftime("%Y%m%d")
'20230817'
```

so you probably need to do the same `strftime` call on `date` in `prepare_request` (it's still a `datetime` object) - most likely it's making the first request with the default `datetime` string representation:

```python
>>> str(datetime.now())
'2023-08-17 20:43:34.482836'
```

> Okay, it still won't let me create an instance of caisoPaginator without a start_value

Pass it `None` when instantiating the paginator

```python
def get_new_paginator(self):
    return caisoPaginator(None)
```

or declare

```python
def __init__(self, *args, **kwargs):
    super().__init__(None, *args, **kwargs)
```

so you don't have to pass `None` explicitly on instantiation (a lot of the inbuilt paginator classes do this):

```python
def get_new_paginator(self):
    return caisoPaginator()
```
m
Added a type check, because if it uses `next_page_token` it comes back as a `str` from `get_next`. Still coming out as the full format, though.
r
Yeah, that makes sense, although for type checking in Python you generally want to use `isinstance`:

```python
>>> date = datetime.now()
>>> isinstance(date, datetime)
True
>>> isinstance(date, str)
False
```

It's probably better to do this, though:

```python
start_date = self.get_starting_timestamp(context) or self.config["start_date"] or self.default_start_date
date = next_page_token or start_date.strftime("%Y%m%d")
```

Wait, isn't this problem coming from your `url_base` implementation?
Nevermind, unless you changed it from before.
m
It very well may be, what makes you say that?
message has been deleted
r
Yeah, looks fine. (You need to call `self.logger.info(url)` to print the URL out.)
m
I was wondering why it wasn't doing anything
r
Do you have this on GitHub yet?
m
No not yet
r
Was just thinking it would probably be faster getting it working if I could look through the code. Happy to go on a call if you want, or what we've been doing is fine too. 🙂
m
Currently trying to figure out how to push it to github but I'm in office right now so messaging is preferable
r
Fair enough, I'm happy to keep on with the back and forth here if that's better for you. Nothing happening here this evening, so fire away! 😅
m
I'm using a remote repo from an Azure ML compute instance, and I think GitHub has a problem with that.
Going to download it locally and attach it in here - that's the only way I can think of.
client.py, streams.py, tap.py
r
Can you just zip the whole directory? I will need most of the repo files. Excluding sensitive credentials obviously.
m
This is the whole meltano project currently I don't think it has anything sensitive. And I just have to say, Reuben, never in my life have I been treated with such kindness and patience by a complete stranger like I have here today. I simply cannot overstate how grateful I am.
r
No problem at all, you're very welcome! I find this kind of stuff really interesting, so there is definitely a selfish element! 😂 I'll have a look through now and let you know what I find here.
Do you have the `poetry.lock` and `pyproject.toml` files? They weren't included in the zip.
m
They're in tap-caiso, unless there are different ones for the project as a whole.
Here are the ones I'm seeing.
r
Oh right, I see!
My bad!
m
No problem, but unfortunately, I have to head home in about 15-30 minutes. However, I will be back early tomorrow.
r
Alright, check here tomorrow - I should have made some progress by then. I'll try to explain what I did as well.
m
Alright, will do. Also, if you have Venmo, I would love to buy you a beer or something.
r
https://github.com/ReubenFrankel/tap-caiso

The majority of changes are in client.py - I left a bunch of comments that (hopefully) explain the important parts. FYI, if you run without any explicit `start_date` config now, the tap will get the last 4 weeks of data as a fallback.

```shell
meltano invoke tap-caiso
```

I've only got the request and pagination logic working here intentionally, since I didn't want to step on your toes in learning the SDK and how it works with the Singer spec. There's definitely still some things to do, like:
• Define your config
• Define your stream schemas
• Last processed date state implementation (i.e. start from where you left off on the next sync)
• More streams!
m
Thank you so much Reuben, this has been unbelievably helpful. Hope you got some enjoyment out of figuring this out. I was going on a week with nothing to show for my efforts.
r
Haha, I did! If you want, I can transfer repo ownership to you - otherwise I'll remove it at some point whenever you don't need it anymore.
m
That's alright - I think I have all that I need, and will merge with my own after defining some more streams and trying to figure out how to get the last date record from state.