# singer-tap-development
I have a tap for Egnyte. Egnyte has a REST API that returns its responses in multiple pages, and the pages are batched into time slices. Unfortunately, there seems to be an error in their paging logic: the last record of one page can have a greater value than the first record of the next page. I sort all records by time in `parse_response`, but because records are not sorted across pages I get `singer_sdk.exceptions.InvalidStreamSortException`. Is it possible to retrieve the data from all pages and then perform a single sort by date? This is the code I am using:
```python
class EventsStream(EgnyteStream):
    """Define custom stream."""

    name = "events"
    path = "/pubapi/v2/audit/stream"
    replication_key = "eventDate"
    records_jsonpath = "$.events[*]"
    is_sorted = True
    page = 0

    def get_url_params(
        self,
        context: Optional[dict],
        next_page_token: Optional[Any],
    ) -> dict[str, Any]:
        params: dict = {}
        if next_page_token:
            params["nextCursor"] = next_page_token
        else:
            params["startDate"] = self.start_time
        return params
    
    def parse_response(self, response: requests.Response | None):
        if not response:
            raise RetriableAPIError("No response from API")

        try:
            data = response.json()
        except AttributeError as e:
            <http://logging.info|logging.info>(f"{e} with response {response} and {response.text}")
            return

        events = data.get("events", [])

        for event in events:
            if not event:
                continue

            for key in ["loginDate", "logoutDate"]:
                if event.get(key) == 0:
                    event.pop(key, None)

            for key in ["loginDate", "logoutDate", "date"]:
                if key in event:
                    dt = datetime(1970, 1, 1, tzinfo=timezone.utc) + \
                        timedelta(milliseconds=event[key])
                    event["eventDate"] = dt.isoformat().replace("+00:00", "Z")
                    del event[key]
                    break

        if events:
            events.sort(key=lambda x: x.get("eventDate", ""))
        self.page = self.page + 1
        logging.info(f'Page: {self.page} {events}')
        # If this page has no events but the API reports more pages, yield an
        # empty placeholder record so pagination continues.
        yield from events or ([{}] if data.get("moreEvents") else [])

    def post_process(
        self,
        row: dict,
        context: Context | None = None,
    ) -> dict | None:
        if not row:
            return None

        if 'id' not in row or row['id'] is None:
            row['id'] = str(uuid.uuid4())

        return super().post_process(row, context)

    def get_new_paginator(self) -> EgnyteEventsPaginator:
        return EgnyteEventsPaginator()
```
Error occurs here:
```
'e5deb04e-75ba-4894-8709-001c5f4295d5', 'auditSource': 'FILE_AUDIT', 'eventDate': '2024-12-19T07:14:18.591000Z'}]
2024-12-19 07:44:24,675 | INFO     | target-elasticsearch.events | Starting batch {'batch_id': '720b0104-adee-4855-b3dc-420fe0033a2a', 'batch_start_time': datetime.datetime(2024, 12, 19, 7, 44, 24, 675328, tzinfo=datetime.timezone.utc)} for dev_events_raw_egnyte_000
2024-12-19 07:44:27,219 | INFO     | root                 | Page: 2 [{'sourcePath': '/Shared/Projects/2022/22-0055 Vicksburg National Cemetery Burial Excavation and Stablization/Lab/FORDISC Results/Burial 19_Fordisc Results_updated.docx', 'targetPath': 'N/A', 'user': 'Brittany McClain ( bmcclain@owest.com )', 'userId': '1121', 'action': 'Read', 'access': 'Desktop App', 'ipAddress': '174.26.41.229', 'actionInfo': '', 'auditSource': 'FILE_AUDIT', 'eventDate': '2024-12-19T07:09:36Z'},
```
You might need to override `get_records` to collect all records and sort them before they are further processed.
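A minimal sketch of what that override could look like, assuming `EgnyteStream` ultimately derives from the SDK's `RESTStream` so that `super().get_records()` walks every page; the method below would be added to the existing `EventsStream` class, not used as a standalone module:

```python
from typing import Any, Iterable, Optional


class EventsStream(EgnyteStream):
    # ... existing attributes, parse_response, post_process, etc. ...

    def get_records(self, context: Optional[dict]) -> Iterable[dict[str, Any]]:
        # Drain every page into memory first, then emit the records in
        # replication-key order so the sorted-stream check cannot trip on
        # out-of-order records across page boundaries.
        records = list(super().get_records(context))
        records.sort(key=lambda record: record.get("eventDate", ""))
        yield from records
```

Note the trade-off: this buffers the whole sync window in memory before emitting anything, which is the cost of satisfying the sorted-stream check when the upstream pages overlap.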