I find the role of `def discover_streams` unclear,...
# singer-taps
r
I find the role of
def discover_streams
unclear, if it's called even if --catalog is passed. I'd expect it to be called only without the catalog or with --discover, when a discovery is actually needed
e
Yeah, a more accurate name would be
register_streams
.
r
now I'm more confused 🤷
e
So what's the unexpected behavior of
discover_streams
when
--catalog
is passed that you're noticing? We have to register the streams at some point, either to generate the catalog for
--discover
or to sync them.
SQL (or other dynamic taps) behave different of course, where the catalog does take priority for registering only the streams that were selected.
r
register_streams
suggest a side effect. Is it to pass its streams somewhere? I thought the tap would do actual discovery when either in discovery mode or sync mode without input catalog, but I find it's called in sync mode with catalog, in which case the streams are not really discovered, but instead tap must return it. I thought in this case SDK would infer them from catalog
e
> I thought the tap would do actual discovery when either in discovery mode or sync mode without input catalog That's the case already: https://github.com/meltano/sdk/blob/85fd1551f6feb8ed7ca0d5842a84c95c0827a46a/singer_sdk/tap_base.py#L312-L320. The confusing part is that
discover_streams
has to run regardless of whether the tap is running in DISCOVERY or SYNC mode. But I'm curious if there's any unexpected behavior or bug that you're running into. Perhaps you're expecting streams that are not in the catalog to not be instantiated?
r
when in SYNC mode with catalog it still calls discover_streams:
sync_all
(either directly or via ->
_set_compatible_replication_methods
) ->
streams
->
load_streams
->
discover_streams
I don't see any conditions in that call hierarchy that would avoid it
e
I guess my question is why would you want to avoid calling it beyond the semantics of the method name 😅
r
in sync mode with catalog there's no need for discovery, or is there? So the SDK should only conditionally call discover_streams, no?
r
I think the point is that
discover_streams
isn't actually responsible for running the discovery process, despite the method name (IIRC that falls to
discover_catalog_entries
). I do agree it is a bit confusing.
r
so what should it do, what does it mean to register streams?
r
To instantiate stream class instances?
r
sure, but based on what? upstream data source or the user-provided catalog?
r
Based on the stream classes the tap declares (usually in
streams.py
). I guess you could think of it as a reflection of the upstream source.
r
my tap doesn't have classes per stream, it's dynamic, based on excel file it reads.
same case with any sql tap
r
Then you need to "register" an
ExcelStream
(or whatever client stream class you have) for each sheet in the file. I imagine it would be very similar to our
tap-google-sheets
implementation: https://github.com/Matatika/tap-google-sheets/blob/7994d3ad40c0bb5f1400b1539a4cf4a8d5f59880/tap_google_sheets/tap.py#L94:L125
1
r
but when user (meltano) passes a catalog the tap already knows streams, why should it additionally read any files? The thing is we have two sources of truth, and why it's the tap to decide which one to use and not SDK
r
OK, I think I see what you mean actually. I think it might be because the input catalog properties are treated as overrides for the catalog the tap generates, rather than a complete replacement: https://github.com/meltano/sdk/blob/85fd1551f6feb8ed7ca0d5842a84c95c0827a46a/singer_sdk/tap_base.py#L158:L159 https://github.com/meltano/sdk/blob/85fd1551f6feb8ed7ca0d5842a84c95c0827a46a/singer_sdk/streams/core.py#L1316:L1348
r
then there's get_effective_schema that uses input schema if available, or the stream schema otherwise, without merging/overriding...
r
Yeah, all quite confusing. I think this would work though:
Copy code
def discover_streams(self) -> list[streams.ExcelStream]:
        if self.input_catalog:
            streams = []
            for name, entry in self.input_catalog.items():
                stream = streams.ExcelStream(self, name, entry.schema)
                stream.apply_catalog(self.input_catalog)
                streams.append(stream)
            return streams

        # else dynamic discovery logic
r
but that's pretty much
if self.input_catalog:
# build streams from catalog
else:
# discover streams
looks to me it should be done in SDK and
discover_streams
only used for discovery
r
Yes, I don't see why that couldn't be the default behaviour.
e
Ok, I think I get where you guy are coming from. PRs are welcome by all means.
👍 2
r