# singer-taps
Are there any examples of REST taps that stream responses? I'm trying to retry the request on an exception raised by the following `iter_lines` call, but I'm not having much success...
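```python
import decimal
import json

import requests
from singer_sdk.streams import RESTStream
from typing_extensions import override


class MyStream(RESTStream):  # hypothetical class name; other stream attributes omitted
    @override
    def _request(self, prepared_request: requests.PreparedRequest, context):
        response = self.requests_session.send(
            prepared_request,
            stream=True,  # streaming request: the body is read lazily
            timeout=self.timeout,
            allow_redirects=self.allow_redirects,
        )
        self._write_request_duration_log(
            endpoint=self.path,
            response=response,
            context=context,
            extra_tags={"url": prepared_request.path_url}
            if self._LOG_REQUEST_METRIC_URLS
            else None,
        )
        self.validate_response(response)

        return response

    @override
    def parse_response(self, response: requests.Response):
        with response:  # ensure connection is eventually released
            yield from (
                json.loads(line, parse_float=decimal.Decimal)
                for line in response.iter_lines()
                if line  # skip blank keep-alive lines; json.loads(b"") would fail
            )
```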
Namely:

```
urllib3.exceptions.ProtocolError: Response ended prematurely
    (during handling of above)
requests.exceptions.ChunkedEncodingError: Response ended prematurely
```

and:

```
http.client.IncompleteRead: IncompleteRead(506 bytes read, 6 more expected)
    (directly caused by above)
urllib3.exceptions.ProtocolError: ('Connection broken: IncompleteRead(506 bytes read, 6 more expected)', IncompleteRead(506 bytes read, 6 more expected))
    (during handling of above)
requests.exceptions.ChunkedEncodingError: ('Connection broken: IncompleteRead(506 bytes read, 6 more expected)', IncompleteRead(506 bytes read, 6 more expected))
```
Also, I can't reproduce these exceptions locally. They only seem to occur in the cloud runtime, which makes me think this could be a separate issue. Either way, it would be nice to have some resilience to failures like this.
So I can pretty easily suppress the error with
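```python
# Method on the same stream class as above; only parse_response changes.
@override
def parse_response(self, response: requests.Response):
    with response:  # ensure connection is eventually released
        try:
            for line in response.iter_lines():
                if line:  # skip blank keep-alive lines
                    yield json.loads(line, parse_float=decimal.Decimal)
        except requests.exceptions.ChunkedEncodingError as e:
            # Reading stops here: records after the broken chunk are not yielded.
            self.logger.warning("Invalid chunk received, skipping", exc_info=e)
```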
But I'd really like to retry the request starting from the last encountered replication key value, which to me sounds like a back-off/retry implementation for `request_records`. I did try to implement this but wasn't able to get it working, maybe because of https://github.com/litl/backoff/issues/171.
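One way to get resume-from-last-key behaviour without a retry decorator is to put the retry loop around the iteration itself. With `stream=True`, the request call returns once the headers arrive, so the `ChunkedEncodingError` is raised later, while `iter_lines()` is being consumed; a retry decorator that only wraps the call (or the creation of a generator) generally won't see it. The sketch below is plain Python, not a Singer SDK API: `iter_with_resume` and the `fetch(start_after)` callable are hypothetical, and it assumes the API can filter to records after a given replication key value and returns them sorted by that key.

```python
import decimal
import json
import time
from typing import Any, Callable, Iterable, Iterator, Optional


def iter_with_resume(
    fetch: Callable[[Optional[Any]], Iterable[bytes]],
    replication_key: str,
    retry_on: tuple[type[BaseException], ...],
    max_retries: int = 3,
    backoff_seconds: float = 1.0,
) -> Iterator[dict]:
    """Yield NDJSON records, re-requesting from the last seen key on failure.

    `fetch(start_after)` (hypothetical) must open a *new* streaming request
    for records whose replication key is greater than `start_after`
    (None means from the beginning) and return its lines,
    e.g. `response.iter_lines()`.
    """
    last_key: Optional[Any] = None
    attempts = 0
    while True:
        try:
            for line in fetch(last_key):
                if not line:  # skip blank keep-alive lines
                    continue
                record = json.loads(line, parse_float=decimal.Decimal)
                last_key = record[replication_key]  # resume point if the stream breaks
                yield record
            return  # the stream ended cleanly
        except retry_on:
            attempts += 1
            if attempts > max_retries:
                raise  # give up and surface the original error
            # Exponential back-off before reopening the stream after last_key.
            time.sleep(backoff_seconds * 2 ** (attempts - 1))
```

In a tap, `fetch` could be a small generator that prepares a request with the API's own "since"/"after"-style filter (the parameter name depends on the API), wraps the response in `with response:` so the broken connection is closed, and yields from `response.iter_lines()`; you would then pass `retry_on=(requests.exceptions.ChunkedEncodingError,)`. The strict "greater than" resume assumes keys are unique as well as sorted; if several records can share a key value, resuming at "greater than or equal" and de-duplicating is safer.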
Turns out the API really doesn't like the tap sitting idle with an open connection while records are processed by the target. I ended up just streaming the response to a temporary file first and then continuing with the sync (halfway to a `BATCH` implementation, but still compatible with `stream_maps`/mapper plugins). Not sure there is much that can be done about the underlying issue...
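The temporary-file workaround can be sketched with the standard library alone. This is a minimal illustration under stated assumptions, not the poster's actual code: `spool_chunks` and `iter_spooled_records` are hypothetical names, and the chunks would come from something like `response.iter_content(chunk_size=64 * 1024)`. The whole body is downloaded before the first record is emitted, so the connection is not left idle while the target works.

```python
import decimal
import json
import tempfile
from typing import IO, Iterable, Iterator


def spool_chunks(chunks: Iterable[bytes]) -> IO[bytes]:
    """Drain the whole response body into an anonymous temporary file."""
    tmp = tempfile.TemporaryFile()  # deleted automatically once closed
    for chunk in chunks:
        tmp.write(chunk)
    tmp.seek(0)  # rewind so the records can be read back
    return tmp


def iter_spooled_records(tmp: IO[bytes]) -> Iterator[dict]:
    """Parse NDJSON records from the spooled file, closing it when done."""
    with tmp:
        for raw in tmp:  # binary files iterate line by line
            line = raw.strip()
            if line:  # skip blank lines
                yield json.loads(line, parse_float=decimal.Decimal)
```

Inside `parse_response`, you would call `spool_chunks(...)` within `with response:` and then `yield from iter_spooled_records(tmp)` after that block, so the connection is released before any record reaches the target. Since nothing has been emitted until the download finishes, a failed download could also simply be retried from scratch.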