gemma_down
03/21/2023, 2:25 PM
# New implementation
from singer_sdk.pagination import BaseHATEOASPaginator

class MyPaginator(BaseHATEOASPaginator):
    def get_next_url(self, response, previous_token):
        data = response.json()
        return data.get("next")

class MyStream(RESTStream):
    def get_new_paginator(self) -> RESTPaginator:
        return BaseHATEOASPaginator()

    def get_url_params(self, context, next_page_token):
        params = {}
        # Next page token is a URL, so we can parse it to extract the query string
        if next_page_token:
            params.update(parse_qsl(next_page_token.query))
        return params
• MyPaginator doesn’t appear to be used after it is defined - should it be used in the get_new_paginator method?
• RESTPaginator isn’t imported - I’m assuming this is from singer_sdk.pagination too?
• parse_qsl isn’t imported in this example - including for completeness but this is an easy fix.
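On the last point, parse_qsl comes from the standard library's urllib.parse module. Below is a minimal standalone sketch of what get_url_params appears to do with the token, assuming the token is a parsed URL (as the `.query` access in the example implies). The helper name `url_params_from_token` and the example.com URL are made up for illustration and are not part of the SDK.

```python
from urllib.parse import parse_qsl, urlparse


def url_params_from_token(next_page_token):
    """Mirror the body of get_url_params from the example above.

    Assumption: next_page_token is either None (first page) or the
    result of urlparse() on the "next" URL returned by the API.
    """
    params = {}
    if next_page_token:
        # parse_qsl turns "page=2&per_page=50" into a list of (key, value) pairs
        params.update(parse_qsl(next_page_token.query))
    return params


# Hypothetical "next" link, used only to exercise the helper
token = urlparse("https://api.example.com/items?page=2&per_page=50")
print(url_params_from_token(token))
```

On the first request there is no token yet, so the helper returns an empty dict and no query parameters are added.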
Thanks in advance 😄
omar_abed
03/21/2023, 2:39 PM
• The get_new_paginator method should return a MyPaginator object, not the BaseHATEOASPaginator.
• I'm not sure about the RESTPaginator. I was able to return a BaseHATEOASPaginator object from my get_new_paginator method, because that's what the MyPaginator class is derived from.
gemma_down
03/21/2023, 2:57 PM
edgar_ramirez_mondragon
03/21/2023, 3:01 PM