# singer-tap-development
t
Hi all, I am trying to write a tap that parses the data from the initial API call to extract the data set, and then extracts the next page URL from the response so that it can make subsequent requests. Below is how the API is structured (first screenshot). I was able to extract the next URL using the response's `get` function and placed that in the `get_next_page_token` function; the second screenshot shows how I structured `get_url_params` and `get_next_page_token`. However, when I run the tap, it throws the errors below (third screenshot) after the first iteration (parsing the first response and loading it to the target). I would really appreciate a sample I can replicate: my REST API just returns the next URL, and I only need to take that URL and execute subsequent requests. I am struggling to get such simple functionality working. Any help would be immensely appreciated.
r
In `get_url_params`, you are setting the `page` param to `next_page_token`, which is a full URL (e.g. `https://a.klaviyo.com/api/lists/?page%5Bcursor%5D=bmV4dDo6U2M3CHZR`). I would guess that the tap is trying to make a subsequent request with a query param like `page=https%3A%2F%2Fa.klaviyo.com%2Fapi%2Flists%2F%3Fpage%255Bcursor%255D%3DbmV4dDo6U2M3CHZR`, which is probably not what you intended. You need to parse the query params from `next_page_token`; here is how we do this with `tap-spotify`. In your case, this should look like:
```python
from typing import Any, Dict, Optional
from urllib.parse import parse_qsl, urlsplit

# ...

    def get_url_params(
        self, context: Optional[dict], next_page_token: Optional[Any]
    ) -> Dict[str, Any]:
        """Return a dictionary of values to be used in URL parameterization."""
        params: dict = {}
        if next_page_token:
            # Copy the next link's own query params (e.g. page[cursor]) into this request.
            params.update(dict(parse_qsl(urlsplit(next_page_token).query)))
        if self.replication_key:
            params["sort"] = "asc"
            params["order_by"] = self.replication_key
        self.logger.debug(params)
        return params
```
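Reuben's diagnosis can be reproduced with the standard library alone. This sketch uses `urllib.parse.urlencode` as a stand-in for how the request's query string gets built (requests encodes `params` in a similar way, but treat the exact encoding as an assumption). `wrong_query` reproduces the double-encoded `page=...` value described above, while parsing the next link's own query first recovers the `page[cursor]` param the API expects.

```python
from urllib.parse import parse_qsl, urlencode, urlsplit

next_page_url = "https://a.klaviyo.com/api/lists/?page%5Bcursor%5D=bmV4dDo6U2M3CHZR"

# What the original get_url_params produced: the whole URL stuffed into "page",
# so its "%" signs get encoded again as "%25".
wrong_query = urlencode({"page": next_page_url})

# The fix: pull the next link's query params out of the URL first.
params = dict(parse_qsl(urlsplit(next_page_url).query))
right_query = urlencode(params)

print(wrong_query)  # page=https%3A%2F%2Fa.klaviyo.com%2Fapi%2Flists%2F%3Fpage%255Bcursor%255D%3DbmV4dDo6U2M3CHZR
print(params)       # {'page[cursor]': 'bmV4dDo6U2M3CHZR'}
print(right_query)  # page%5Bcursor%5D=bmV4dDo6U2M3CHZR
```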
d
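I worked on this exact API a while ago (production ready Soon™️ 😄) and solved pagination with the paginator classes (available in singer-sdk v0.10.0 and up), like so:

paginator.py
```python
from __future__ import annotations

from urllib.parse import parse_qs, urlparse

from requests import Response
from singer_sdk.helpers.jsonpath import extract_jsonpath
from singer_sdk.pagination import JSONPathPaginator


class KlaviyoPaginator(JSONPathPaginator):

    def get_next(self, response: Response) -> str | None:
        next_page_url = next(extract_jsonpath(self._jsonpath, response.json()))

        if next_page_url is None:
            return None

        # Return only the cursor value from the next link's query string.
        parameters = urlparse(next_page_url).query
        return parse_qs(parameters)["page[cursor]"][0]
```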
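client.py
```python
from singer_sdk.pagination import BaseAPIPaginator
from singer_sdk.streams import RESTStream

# paginator.py from above, assumed to live in the same package.
from .paginator import KlaviyoPaginator


class KlaviyoStream(RESTStream):

    url_base = "https://a.klaviyo.com/api"

    records_jsonpath: str = "$.data[*]"
    next_page_token_jsonpath: str = "$.links.next"

    def get_new_paginator(self) -> BaseAPIPaginator:
        return KlaviyoPaginator(self.next_page_token_jsonpath)
```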
Maybe this is helpful for you!
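A stdlib-only sketch of what this pair does at run time may help. In `RESTStream`, the value returned by the paginator's `get_next` is what `get_url_params` receives as `next_page_token`; since `KlaviyoPaginator.get_next` returns only the cursor, `get_url_params` has to send it back under the `page[cursor]` key. Everything below is hypothetical: `FAKE_PAGES` and `fake_request` stand in for the Klaviyo API, a plain dict lookup stands in for `extract_jsonpath`, and `sync` is a simplified loop, not the SDK's actual code.

```python
from typing import Any, Dict, List, Optional, Tuple
from urllib.parse import parse_qs, urlencode, urlparse

BASE = "https://a.klaviyo.com/api/lists/"

# Hypothetical stand-in for the API: pages keyed by the cursor that
# requests them (None means "first page, no cursor sent").
FAKE_PAGES: Dict[Optional[str], Dict[str, Any]] = {
    None: {"data": [{"id": "a"}], "links": {"next": BASE + "?page%5Bcursor%5D=c1"}},
    "c1": {"data": [{"id": "b"}], "links": {"next": BASE + "?page%5Bcursor%5D=c2"}},
    "c2": {"data": [{"id": "c"}], "links": {"next": None}},
}


def fake_request(query: str) -> Dict[str, Any]:
    """Serve a page the way the server would: by the page[cursor] query param."""
    cursor = parse_qs(query).get("page[cursor]", [None])[0]
    return FAKE_PAGES[cursor]


def get_next(body: Dict[str, Any]) -> Optional[str]:
    """Same logic as KlaviyoPaginator.get_next: return only the cursor, or None."""
    next_url = body["links"]["next"]  # stands in for extract_jsonpath("$.links.next", ...)
    if next_url is None:
        return None
    return parse_qs(urlparse(next_url).query)["page[cursor]"][0]


def get_url_params(next_page_token: Optional[str]) -> Dict[str, str]:
    """Put the cursor back under 'page[cursor]' (not 'page')."""
    params: Dict[str, str] = {}
    if next_page_token:
        params["page[cursor]"] = next_page_token
    return params


def sync() -> Tuple[List[str], List[str]]:
    """Simplified request loop: build params, request, collect records, advance."""
    records: List[str] = []
    queries: List[str] = []
    token: Optional[str] = None
    while True:
        query = urlencode(get_url_params(token))
        queries.append(query)
        body = fake_request(query)
        records.extend(row["id"] for row in body["data"])
        token = get_next(body)
        if token is None:
            return records, queries


records, queries = sync()
print(records)  # ['a', 'b', 'c']
print(queries)  # ['', 'page%5Bcursor%5D=c1', 'page%5Bcursor%5D=c2']
```

In this fake, sending the cursor as `page=c1` instead would simply return the first page again, because the lookup only reads `page[cursor]`.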
t
@dylan I tried to replicate it exactly by creating the classes you described. However, I am still receiving the same error. I am attaching the code; could you please take a look and see where I am going wrong? I also tried what @Reuben (Matatika) suggested, and it seemed to get as far as the second API call but crashed after that. Anyway, here are client.py, stream.py and paginator.py. As you can see, I am very new to Meltano and Python, but I am just trying to get it working. On a side note: any chance, @dylan, that you intend to publish the Klaviyo tap you are developing to the Meltano community? The current taps on GitHub no longer use the latest Klaviyo API, so it would definitely help our community.
paginator.py
```python
from __future__ import annotations
from urllib.parse import urlparse,parse_qs
from requests import Response
from singer_sdk.helpers.jsonpath import extract_jsonpath
from singer_sdk.pagination import BaseAPIPaginator,JSONPathPaginator

class KlaviyoPaginator(JSONPathPaginator):

    def get_next(self, response: Response) -> str | None:
        next_page_url = next(extract_jsonpath(self._jsonpath, response.json()))

        if next_page_url is None:
            return None

        parameters = urlparse(next_page_url).query
        return parse_qs(parameters)["page[cursor]"][0]
```
client.py
```python
"""REST client handling, including klaviyo_custom_dev_v2Stream base class."""

from __future__ import annotations

import requests
from pathlib import Path
from urllib.parse import urlparse,parse_qsl,urlsplit
from typing import Any, Dict, Optional, Union, List, Iterable

from memoization import cached

from singer_sdk.helpers.jsonpath import extract_jsonpath
from singer_sdk.streams import RESTStream
from singer_sdk.authenticators import APIKeyAuthenticator
from singer_sdk.pagination import BaseAPIPaginator,JSONPathPaginator

from tap_klaviyo_custom_dev_v2.paginator import KlaviyoPaginator

SCHEMAS_DIR = Path(__file__).parent / Path("./schemas")


class klaviyo_custom_dev_v2Stream(RESTStream):
    """klaviyo_custom_dev_v2 stream class."""

    # TODO: Set the API's base URL here:
    url_base = "https://a.klaviyo.com"

    # OR use a dynamic url_base:
    # @property
    # def url_base(self) -> str:
    #     """Return the API URL root, configurable via tap settings."""
    #     return self.config["api_url"]

    records_jsonpath = "$.data[*]"  # Or override `parse_response`.
    next_page_token_jsonpath = "$.links.next"  # Or override `get_next_page_token`.

    def get_new_paginator(self) -> BaseAPIPaginator:
        return KlaviyoPaginator(self.next_page_token_jsonpath)

    @property
    def authenticator(self) -> APIKeyAuthenticator:
        """Return a new authenticator object."""
        return APIKeyAuthenticator.create_for_stream(
            self,
            key="Authorization",
            value=self.config.get("auth_token"),
            location="header"
        )

    @property
    def http_headers(self) -> dict:
        """Return the http headers needed."""
        headers = {}
        if "user_agent" in self.config:
            headers["User-Agent"] = self.config.get("user_agent")
        headers["revision"] = "2022-10-17"
        # Note: this overwrites the User-Agent set above; an Accept header may have been intended.
        headers["User-Agent"] = "application / json"
        return headers

    def get_url_params(
        self, context: Optional[dict], next_page_token: Optional[Any]
    ) -> Dict[str, Any]:
        """Return a dictionary of values to be used in URL parameterization."""
        params: dict = {}
        if next_page_token:
            params["page"] = next_page_token
        if self.replication_key:
            params["sort"] = "asc"
            params["order_by"] = self.replication_key
        return params

    def prepare_request_payload(
        self, context: Optional[dict], next_page_token: Optional[Any]
    ) -> Optional[dict]:
        """Prepare the data payload for the REST API request.
```
r
I think you are unintentionally combining the suggested fixes here: either you implement a `Paginator` and do not set `page` in `get_url_params` (or set it correctly), or you fix your existing `get_url_params` logic for setting `page`. In your `client.py`, in `klaviyo_custom_dev_v2Stream.get_url_params`, you still have:
```python
if next_page_token:
    params["page"] = next_page_token
```
You should remove this if you are using `get_new_paginator`; I assume it's just overwriting the `page` query param that your `Paginator` will set. If you aren't using `get_new_paginator`, then you can just parse the query params from the next page URL manually:
```python
if next_page_token:
    params.update(dict(parse_qsl(urlsplit(next_page_token).query)))
```
t
This is amazing, Reuben!! Thank you so much for helping me understand the concept. I finally got it working. Please let me know when you are in LA; I would love to buy you a beer or coffee 🙂! This has been such a struggle for me, but I am excited that I finally got it working.
r
No problem at all, glad you got it working! I'm in the UK so not planning to be in LA anytime soon, but if I find myself there I'll let you know! 😅 Once you get the hang of the basics, the SDK is really great to work with! Hopefully things are a bit clearer for you now.
d
> On a side note: any chance, Dylan, that you intend to publish the Klaviyo tap you are developing to the Meltano community? The current taps on GitHub no longer use the latest Klaviyo API, so it would definitely help our community.
Yeah, I actually published it in its current state here the other day. Most of the streams are not even tested, so it's not production ready just yet, as I mentioned. I'll let you know when I get around to it (but you might have your own already working flawlessly by then 😄).
t
That's great, I really appreciate you sharing it. Have you figured out how to get the list of customers who are in a list or a segment? This helps with campaign performance measurement. I wrote the tap to get the list of profiles, but it doesn't include the list IDs within the profiles without a separate API call.
p
@trinath @dylan this is exciting! I created https://github.com/meltano/hub/issues/1003 to track getting one of your SDK-based variants onto the Hub as the default. Please let us know when it's ready for use! Have you considered working together on the development of this tap? It sounds like you're both building the same features, so it might be nice to team up on development and maintenance. Also, if you're interested and if it's helpful, we can put the repo in MeltanoLabs so you can share development and maintenance, rather than having it in one of your personal or organization GitHub namespaces. Let me know and I'm happy to help.
t
Absolutely, Pat. I am glad to contribute, but I might need some help with things such as figuring out incremental loads and what testing would need to be done for the tap to be considered production-ready and shareable on GitHub. In addition, @dylan's version uses the Singer coding format (with schemas defined separately), whereas I am using the format shown in the Meltano SDK documentation (where schemas are defined directly in the streams); I am unsure which one is preferred. Happy to merge if we know which is more widely adopted and maintainable. I am still working on other Klaviyo APIs and should be done in a couple more days; once I complete development, I can share it in MeltanoLabs.
p
@trinath awesome! The community is here to help, so if you have specific questions, feel free to ask, or share your development repo and ask others to review it and provide feedback. As for how schemas are defined, both methods are valid and are really just a matter of preference. Putting them in a schemas directory just gives you the option to move them out of your Python module if you have a lot of them and it starts to get cluttered; both evaluate to the same result.
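To make "both evaluate to the same result" concrete, here is a stdlib-only sketch. In an SDK stream, the inline style usually looks like `schema = th.PropertiesList(th.Property("id", th.StringType), ...).to_dict()` (with `from singer_sdk import typing as th`), and the file style like `schema_filepath = SCHEMAS_DIR / "lists.json"`; either way the stream ends up holding a plain JSON Schema dict. The SDK is not used below, and the `lists` stream name and its two fields are hypothetical: the code only shows that a schema written inline and the same schema read back from `schemas/lists.json` are identical dicts.

```python
import json
import tempfile
from pathlib import Path

# Style 1: schema defined directly alongside the stream (here a plain dict).
INLINE_SCHEMA = {
    "type": "object",
    "properties": {
        "id": {"type": ["string", "null"]},
        "name": {"type": ["string", "null"]},
    },
}


def load_schema_file(schemas_dir: Path, stream_name: str) -> dict:
    """Style 2: read the schema from schemas/<stream_name>.json."""
    return json.loads((schemas_dir / f"{stream_name}.json").read_text())


with tempfile.TemporaryDirectory() as tmp:
    schemas_dir = Path(tmp) / "schemas"
    schemas_dir.mkdir()
    # Write the same schema to a separate file, as in a schemas/ directory.
    (schemas_dir / "lists.json").write_text(json.dumps(INLINE_SCHEMA, indent=2))
    FILE_SCHEMA = load_schema_file(schemas_dir, "lists")

print(FILE_SCHEMA == INLINE_SCHEMA)  # True
```

The choice is organisational: a schemas directory keeps long schemas out of the stream classes, while inline definitions keep each stream self-contained.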