# singer-tap-development
s
I have a tap with three child streams of a parent endpoint (I don't want to sync the parent). Let's call it a `list_table` endpoint with `table_detail`, `table_viewers`, and `table_columns` child streams. What is the best way to handle this? The way I have it built right now is with a `ParentBaseStream` that I don't want to sync (but which I wanted to define the pagination logic for), and then I build the child streams off of it. It works for running the tap, but I can't figure out how to exclude it from being treated like a "normal" stream.
Current code is here: https://github.com/immuta/tap-immuta/blob/6275d15fa84bc27deff1493d93f6c58273a0f556/tap_immuta/streams.py#L29
For example: `ParentBaseStream <--subclass-- DataSourceBaseStream <--child_of-- DataSourceStream`
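(For reference, a minimal sketch of how a parent/child hierarchy like this is typically wired with the Meltano SDK's `parent_stream_type` / `get_child_context` hooks. The class names, paths, and schemas below are illustrative placeholders, not taken from tap-immuta.)
```python
from typing import Optional

from singer_sdk import typing as th
from singer_sdk.streams import RESTStream


class ImmutaStream(RESTStream):
    """Hypothetical base stream holding shared auth/pagination logic."""

    url_base = "https://example.immuta.com"  # placeholder


class ListTableStream(ImmutaStream):
    """Parent stream; each record seeds the child streams' contexts."""

    name = "list_table"
    path = "/list_table"
    primary_keys = ["id"]
    schema = th.PropertiesList(th.Property("id", th.IntegerType)).to_dict()

    def get_child_context(self, record: dict, context: Optional[dict]) -> dict:
        # Child streams receive this dict as their `context`.
        return {"table_id": record["id"]}


class TableDetailStream(ImmutaStream):
    """Child stream; `{table_id}` in `path` is filled in from the parent context."""

    name = "table_detail"
    path = "/table/{table_id}"
    primary_keys = ["id"]
    parent_stream_type = ListTableStream
    schema = th.PropertiesList(th.Property("id", th.IntegerType)).to_dict()
```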
a
Hi, @stephen_bailey. In theory, you may be able to override enough class members to make the stream class "silent" but it's not yet a handled use case. I've actually been talking to @ken_payne recently about a similar case, and we were imagining a silent parent "MultiStream" class which would emit only child stream data. https://gitlab.com/meltano/sdk/-/issues/167
Do you mind taking a look at this and seeing if it matches your use case? If so, perhaps we can pilot something for your tap and incorporate it back into the SDK.
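(As an aside: one way to approximate the "silent parent" idea described above, until something like issue 167 lands, might be to suppress the parent's own RECORD output while still generating child contexts. The sketch below builds on the hypothetical `ImmutaStream` base from the earlier sketch and assumes the SDK's private `_write_record_message` hook is the single place RECORD messages are written; that is an assumption about internals and could change between releases.)
```python
class SilentParentStream(ImmutaStream):
    """Hypothetical parent that drives child streams but emits no records itself.

    NOTE: a sketch, not an SDK-supported pattern. It assumes the private
    `_write_record_message` hook exists and is the only place this stream
    writes RECORD messages, which may not hold across SDK versions.
    """

    def _write_record_message(self, record: dict) -> None:
        # Swallow the parent's own RECORD messages; child streams still
        # receive contexts via `get_child_context()` as usual.
        pass
```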
s
Thanks, AJ. I think a `SilentStream` is exactly what I'm looking for, but after some thinking, I'm actually not sure it's the best approach for me. I think the best approach might be to add a `post_process` call so that the `list_tables` stream enriches each record that it returns using `table_detail`, so that the parent is not silent but rather just requires an extra call to retrieve the full record.
What I ended up doing for this tap was adding the "real" information for the table into the `post_process` call. I don't love it, because I would prefer a 1:1 relationship between a stream and an API endpoint, but I think it works in this case.
```python
def post_process(self, row: dict, context: Optional[dict] = None) -> dict:
    """Append data source and connection string to record."""
    # Get additional data from direct endpoint
    prepared_request = self.prepare_request(context=context, next_page_token=None)
    prepared_request.url = f"{self.url_base}/project/{row['id']}"
    prepared_request.params = {}
    response = self._request_with_backoff(prepared_request, context)

    # Set emitted record to be the detailed record
    record = response.json()
    return record
```
e
Hi @stephen_bailey! Is your use case more or less in line with what's described in this issue: https://gitlab.com/meltano/sdk/-/issues/93?
s
Yes, this is exactly what I ended up doing:
> During data-append - The tap may need to supplement the core data with additional calls in the post_process() handler.
and this is what I tried to avoid by using the private method `_request_with_backoff`:
> the developer would likely implement calls against the requests library directly.
🙂 It's not ideal, because I'm not sure what the behavior would be if one of the `post_process` calls failed -- would the row fail, or would the whole stream break? -- but it allowed me to stay within the built-in logic of the SDK.