# troubleshooting
**Sac:**
Hi everyone, I'm a career switcher without an IT background, so bear with me on this one. 😉 I ran into a problem while working with a stream that uses offset pagination. I set it up by modifying the `get_new_paginator` method in `client.py` to return a `BaseOffsetPaginator` instance with the appropriate `page_size` according to the API docs. It worked fine, and I was able to fetch all the data from the source. However, when I defined a child stream using this stream as a parent, it ended up in an endless loop, repeatedly fetching the same child records. It seemed like the parent key wasn't updating correctly, causing the same data to be fetched over and over. I couldn't figure out exactly why this was happening, but I ended up creating my own `Paginator` class by subclassing `BaseOffsetPaginator`. I modified the `has_more` method like this:
```python
def has_more(self, response: requests.Response) -> bool:
    data = response.json()
    has_more = data.get("has_more", False)
    return has_more
```
I can't recall if I got this from the docs or an AI suggestion, but it worked, up to a point. Once the parent stream reached its `page_size`, the `data` was no longer a `dict` but a `list`, which raised an `AttributeError` on `data.get()`. I fixed it by wrapping it in a try-except block:
```python
def has_more(self, response: requests.Response) -> bool:
    data = response.json()
    try:
        has_more = data.get("has_more", False)
    except AttributeError:
        has_more = super().has_more(response)
    return has_more
```
This works, but I still don’t fully understand what’s happening under the hood. Can anyone help explain this behavior? Also, what’s the best practice for handling offset pagination in parent-child stream setups? Thanks in advance!
**Edgar:**
Hi @Sac! There's no special setup required to handle any type of pagination for child streams. If the parent stream is generating duplicate contexts, that might explain why you're seeing duplicate child syncs. What do your stream definitions look like?
**Sac:**
Hi Edgar, thanks for your reply. This is my parent stream:
```python
class InvoicesStream(MyBaseStream):
    """Invoices stream (Parent Stream)."""
    name = "invoices"
    path = "/2.0/kb_invoice"
    primary_keys = ["id"]
    replication_key = None
    schema_filepath = SCHEMAS_DIR / "invoices.json"  

    def get_child_context(self, record: dict, context: Optional[dict]) -> dict:
        """Return a context dictionary for child streams."""
        new_context = {"invoice_id": record["id"]}
        if context:
            return {**context, **new_context}
        return new_context
```
and this is the corresponding child stream:
```python
class InvoicePositionsStream(MyBaseStream):
    """Invoice Positions stream."""
    name = "invoice_positions"
    parent_stream_type = InvoicesStream
    path = "/2.0/kb_invoice/{invoice_id}"
    primary_keys = ["invoice_id", "internal_pos"]
    replication_key = None
    records_jsonpath = "$.positions[*]"  
    schema_filepath = SCHEMAS_DIR / "invoice_positions.json"
```
And these are the paginator and the base class (removed the docstrings and comments to make it more compact):
```python
# Imports elided in the original post; these follow the SDK cookiecutter template.
from __future__ import annotations

import decimal
import typing as t

import requests
from singer_sdk.authenticators import BearerTokenAuthenticator
from singer_sdk.helpers.jsonpath import extract_jsonpath
from singer_sdk.pagination import BaseOffsetPaginator
from singer_sdk.streams import RESTStream

if t.TYPE_CHECKING:
    from singer_sdk.helpers.types import Context

class MyPaginator(BaseOffsetPaginator):

    def has_more(self, response: requests.Response) -> bool:
        data = response.json()
        try:
            has_more = data.get("has_more", False)
        except AttributeError:
            has_more = super().has_more(response)
        return has_more


class MyBaseStream(RESTStream):
    records_jsonpath = "$[*]"
    next_page_token_jsonpath = "$.next_page"  # noqa: S105

    @property
    def url_base(self) -> str:
        return "<https://api.bexio.com>"

    @property
    def authenticator(self) -> BearerTokenAuthenticator:
        return BearerTokenAuthenticator.create_for_stream(
            self,
            token=self.config.get("auth_token", ""),
        )

    @property
    def http_headers(self) -> dict:
        return {"Accept": "application/json"}

    def get_new_paginator(self) -> MyPaginator:
        return MyPaginator(start_value=0, page_size=500)

    def get_url_params(
        self,
        context: Context | None,  # noqa: ARG002
        next_page_token: t.Any | None,  # noqa: ANN401
    ) -> dict[str, t.Any]:

        params: dict = super().get_url_params(context, next_page_token)
        if next_page_token:
            params["offset"] = next_page_token
        if self.replication_key:
            params["sort"] = "asc"
            params["order_by"] = self.replication_key
        return params

    def prepare_request_payload(
        self,
        context: Context | None,  # noqa: ARG002
        next_page_token: t.Any | None,  # noqa: ARG002, ANN401
    ) -> dict | None:
        return None

    def parse_response(self, response: requests.Response) -> t.Iterable[dict]:
        yield from extract_jsonpath(
            self.records_jsonpath,
            input=response.json(parse_float=decimal.Decimal),
        )

    def post_process(
        self,
        row: dict,
        context: Context | None = None,  # noqa: ARG002
    ) -> dict | None:
        return row
```
So this is the version that is working now; however, I don't understand why I need the updated `has_more` method in this form. Without the added
```python
has_more = data.get("has_more", False)
```
the child stream fetches the same `invoice_id` over and over again in an endless loop, and without the try-except block it raises the `AttributeError` mentioned above. Thank you for looking into it; maybe you can give me a clue about what I'm missing. Cheers!
**Reuben (Matatika):**
You have to override `has_more` because the default implementation always returns `True`, and the `next_page_token` keeps incrementing because of that logic, so you will never hit the "Loop detected in pagination" error, i.e.:
1. https://api.bexio.com/2.0/kb_invoice/<invoice_id>?offset=0
2. https://api.bexio.com/2.0/kb_invoice/<invoice_id>?offset=500
3. https://api.bexio.com/2.0/kb_invoice/<invoice_id>?offset=1000
4. https://api.bexio.com/2.0/kb_invoice/<invoice_id>?offset=1500
5. https://api.bexio.com/2.0/kb_invoice/<invoice_id>?offset=2000

and so on. Maybe you want to disable pagination for `InvoicePositionsStream` if the endpoint doesn't support it? That certainly appears to be the case looking at https://docs.bexio.com/#tag/Invoices/operation/v2ShowInvoice (the docs for your child stream request).
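For intuition, the stock offset paginator behaves roughly like this; a simplified sketch rather than the SDK's exact source, so check the version you have installed:
```python
import requests
from singer_sdk.pagination import BaseAPIPaginator


class OffsetPaginatorSketch(BaseAPIPaginator[int]):
    """Simplified stand-in for BaseOffsetPaginator to show why the offset never stops."""

    def __init__(self, start_value: int, page_size: int) -> None:
        super().__init__(start_value)
        self._page_size = page_size

    def has_more(self, response: requests.Response) -> bool:
        # Default behaviour: the paginator has no stop condition of its own.
        return True

    def get_next(self, response: requests.Response) -> int:
        # Each page bumps the offset by one page size: 0, 500, 1000, ...
        return self.current_value + self._page_size
```
So unless `has_more` is overridden with a real stop condition, the offset just keeps growing.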
> the `data` was no longer a `dict` but a `list`, which raised an `AttributeError` on `data.get()`

Again, this is because you are using the same paginator for the parent and child stream: `data` is an array for the parent and an object for the child. Disabling pagination for the child would fix this, and you would only have to handle `data` as an array for the parent. Having said that, I don't see anything in the docs about a `has_more` property in either response; I would imagine instead you would be able to determine this by comparing the length of `data` vs the configured page size, i.e.
```python
def has_more(self, response: requests.Response) -> bool:
    data = response.json()
    return len(data) == self._page_size
```
Unfortunately (as far as I can see), disabling pagination for a stream isn't as simple as overriding `get_new_paginator` again to return `None`. Your best bet is probably to defer to `SimpleHeaderPaginator("X-Next-Page")` as the SDK does by default. EDIT: probably more optimal to defer to `SinglePagePaginator`.
Or just put `get_new_paginator` on `InvoicesStream` only, rather than on `MyBaseStream`.
**Sac:**
Hi @Reuben (Matatika), thank you so much; I really appreciate your replies. I feel like I'm getting closer to understanding. I'll look into it in more detail later and try your suggestions.
👍 1
**Edgar:**
(thanks for the PR @Reuben (Matatika)!!)
🫡 1