Nir Diwakar (Nir)
12/19/2024, 8:18 AMclass EventsStream(EgnyteStream):
"""Define custom stream."""
name = "events"
path = "/pubapi/v2/audit/stream"
replication_key = "eventDate"
records_jsonpath = "$.events[*]"
is_sorted = True
page = 0
def get_url_params(
self,
context: Optional[dict],
next_page_token: Optional[Any],
) -> dict[str, Any]:
params: dict = {}
if next_page_token:
params["nextCursor"] = next_page_token
else:
params["startDate"] = self.start_time
return params
def parse_response(self, response: requests.Response | None):
if not response:
raise RetriableAPIError("No response from API")
try:
data = response.json()
except AttributeError as e:
<http://logging.info|logging.info>(f"{e} with response {response} and {response.text}")
return
events = data.get("events", [])
for event in events:
if not event:
continue
for key in ["loginDate", "logoutDate"]:
if event.get(key) == 0:
event.pop(key, None)
for key in ["loginDate", "logoutDate", "date"]:
if key in event:
dt = datetime(1970, 1, 1, tzinfo=timezone.utc) + \
timedelta(milliseconds=event[key])
event["eventDate"] = dt.isoformat().replace("+00:00", "Z")
del event[key]
break
if events:
events.sort(key=lambda x: x.get("eventDate", ""))
self.page = self.page + 1
<http://logging.info|logging.info>(f'Page: {self.page} {events}')
yield from events or ([{}] if data.get("moreEvents") else [])
def post_process(
self,
row: dict,
context: Context | None = None,
) -> dict | None:
if not row:
return None
if 'id' not in row or row['id'] is None:
row['id'] = str(uuid.uuid4())
return super().post_process(row, context)
def get_new_paginator(self) -> EgnyteEventsPaginator:
return EgnyteEventsPaginator()
Error occurs here:
'e5deb04e-75ba-4894-8709-001c5f4295d5', 'auditSource': 'FILE_AUDIT', 'eventDate': '2024-12-19T07:14:18.591000Z'}]
2024-12-19 07:44:24,675 | INFO | target-elasticsearch.events | Starting batch {'batch_id': '720b0104-adee-4855-b3dc-420fe0033a2a', 'batch_start_time': datetime.datetime(2024, 12, 19, 7, 44, 24, 675328, tzinfo=datetime.timezone.utc)} for dev_events_raw_egnyte_000
2024-12-19 07:44:27,219 | INFO | root | Page: 2 [{'sourcePath': '/Shared/Projects/2022/22-0055 Vicksburg National Cemetery Burial Excavation and Stablization/Lab/FORDISC Results/Burial 19_Fordisc Results_updated.docx', 'targetPath': 'N/A', 'user': 'Brittany McClain ( <mailto:bmcclain@owest.com|bmcclain@owest.com> )', 'userId': '1121', 'action': 'Read', 'access': 'Desktop App', 'ipAddress': '174.26.41.229', 'actionInfo': '', 'auditSource': 'FILE_AUDIT', 'eventDate': '2024-12-19T07:09:36Z'},
Edgar Ramírez (Arch.dev)
12/19/2024, 5:40 PMget_records
to collect all records and sort them before they are further processed