Reuben (Matatika)
02/14/2025, 2:39 PM
`iter_lines` call, but not having much success...
@override
def _request(self, prepared_request, context):
    response = self.requests_session.send(
        prepared_request,
        stream=True,  # streaming request
        timeout=self.timeout,
        allow_redirects=self.allow_redirects,
    )
    self._write_request_duration_log(
        endpoint=self.path,
        response=response,
        context=context,
        extra_tags={"url": prepared_request.path_url}
        if self._LOG_REQUEST_METRIC_URLS
        else None,
    )
    self.validate_response(response)
    return response

@override
def parse_response(self, response):
    with response:  # ensure connection is eventually released
        yield from (
            json.loads(line, parse_float=decimal.Decimal)
            for line in response.iter_lines()
        )
Namely
• urllib3.exceptions.ProtocolError: Response ended prematurely
• (during handling of above) requests.exceptions.ChunkedEncodingError: Response ended prematurely
and
• http.client.IncompleteRead: IncompleteRead(506 bytes read, 6 more expected)
• (directly caused by above) urllib3.exceptions.ProtocolError: ('Connection broken: IncompleteRead(506 bytes read, 6 more expected)', IncompleteRead(506 bytes read, 6 more expected))
• (during handling of above) requests.exceptions.ChunkedEncodingError: ('Connection broken: IncompleteRead(506 bytes read, 6 more expected)', IncompleteRead(506 bytes read, 6 more expected))
Reuben (Matatika)
02/14/2025, 3:24 PM
Reuben (Matatika)
02/14/2025, 4:27 PM
@override
def parse_response(self, response):
    with response:  # ensure connection is eventually released
        try:
            for line in response.iter_lines():
                yield json.loads(line, parse_float=decimal.Decimal)
        except requests.exceptions.ChunkedEncodingError as e:
            self.logger.warning("Invalid chunk received, skipping", exc_info=e)
but I'd really like to retry the request starting from the last encountered replication key value, which to me sounds like a back-off/retry implementation for `request_records`. I did try to implement this but wasn't able to get it working, maybe because of https://github.com/litl/backoff/issues/171.
Reuben (Matatika)
02/20/2025, 11:42 AM
`BATCH` implementation but still compatible with `stream_maps`/mapper plugins). Not sure there is much that can be done about the underlying issue...
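
For the retry idea raised earlier in the thread (resuming from the last encountered replication key value), here is a minimal sketch of what such a loop might look like. It is a hand-rolled generator and does not use the `backoff` decorator or any SDK hook. Everything named here is hypothetical: `iter_records_with_resume`, the `fetch_lines` callable, and its parameters are illustration only. It assumes the API returns records sorted by the replication key and can filter to records strictly after a given value; if the filter is inclusive, or several records share a key value, duplicates or gaps would need handling separately.

```python
import decimal
import json
import time
from typing import Any, Callable, Iterable, Iterator, Optional, Tuple, Type


def iter_records_with_resume(
    fetch_lines: Callable[[Optional[Any]], Iterable[bytes]],  # hypothetical callable
    replication_key: str,
    start_value: Optional[Any] = None,
    retry_on: Tuple[Type[BaseException], ...] = (ConnectionError,),
    max_retries: int = 3,
    base_delay: float = 1.0,
) -> Iterator[dict]:
    """Yield parsed records, re-fetching after the last seen key on failure.

    `fetch_lines(after)` is assumed to open a new streaming request that
    returns only records whose replication key is greater than `after`
    (or all records when `after` is None).
    """
    last_value = start_value
    failures = 0
    while True:
        try:
            for line in fetch_lines(last_value):
                if not line:  # skip blank keep-alive lines
                    continue
                record = json.loads(line, parse_float=decimal.Decimal)
                last_value = record[replication_key]
                failures = 0  # progress was made, so reset the retry budget
                yield record
            return  # stream finished cleanly
        except retry_on:
            failures += 1
            if failures > max_retries:
                raise
            # exponential back-off before resuming from last_value
            time.sleep(base_delay * 2 ** (failures - 1))
```

In a tap, `fetch_lines` could wrap the streaming request plus `response.iter_lines()`, and `retry_on` could be set to `(requests.exceptions.ChunkedEncodingError,)` to match the errors listed above. The failure counter resets whenever a record is yielded, so only consecutive failures without progress count against `max_retries`.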