Having issues properly separating my custom taps d...
# singer-tap-development
e
Having issues properly separating my custom tap's different streams when instantiating pipelines. Has anyone who has written a custom tap successfully set it up so that one pipeline gets one stream and another pipeline grabs a different stream?
My meltano.yml so far:
plugins:
  extractors:
  - name: tap-ibkr-tickers
    namespace: tap_ibkr
    executable: /home/tap-ibkr/tap-ibkr.sh
    config:
      host_tws_thrift: 127.0.0.1
      tws_thrift_port: 9090
    select:
    - ib_tickers.*
  - name: tap-ibkr-news
    namespace: tap_ibkr
    executable: /home/tap-ibkr/tap-ibkr.sh
    config:
      host_tws_thrift: 127.0.0.1
      tws_thrift_port: 9090
      target_host: 1.2.3.4
      target_username: myusername
      target_password: 1234
    select:
    - ib_news.*
As you can see, I have one tap and am trying to select the different streams.
streams.py
class IBTickersStream(IBKRStream):

    name = "ib_tickers"
    primary_keys = ["contract_id", "query_start_time"]
    replication_key = None

    partitions = get_all_tickers()

    schema = th.PropertiesList(
        th.Property("symbol", th.StringType),
        th.Property("sec_type", th.StringType),
        th.Property("primary_exchange", th.StringType),
        th.Property("exchange", th.StringType),
        th.Property("currency", th.StringType),
        th.Property("contract_id", th.IntegerType),
        th.Property("query_start_time", th.DateTimeType)
    ).to_dict()


class IBNewsStream(NewsStream):

    name = "ib_news"
    primary_keys = ["article_id", "provider_code"]
    replication_key = None

    schema = th.PropertiesList(
        th.Property("symbol", th.StringType),
        th.Property("contract_id", th.IntegerType),
        th.Property("provider_code", th.StringType),
        th.Property("article_id", th.StringType),
        th.Property("article_timestamp", th.DateTimeType),
        th.Property("headline", th.StringType),
        th.Property("extra_data", th.StringType),
    ).to_dict()
Any advice appreciated. As of now, the behavior I see is that if I run a pipeline that attempts to select ONLY tickers, it calls news first and then the tickers. I do not understand how to properly segregate the streams without writing an entire second tap, which makes little sense to me, so I must be doing something wrong.
client.py
from typing import Iterable, Optional

from singer_sdk import Stream


class IBKRStream(Stream):

    def get_records(self, context: Optional[dict]) -> Iterable[dict]:
        # GETS THE TICKERS
        ...


class NewsStream(Stream):

    def get_records(self, context: Optional[dict]) -> Iterable[dict]:
        # GETS THE NEWS
        ...
v
Yes, I've selected different streams. Your method should work. Using select_filter via environment variables also saves the need for inherit_from, but that may not be needed here since you only need a few streams. https://docs.meltano.com/concepts/plugins#select_filter-extra
This comes down to the tap implementation. Since you're using the Singer SDK, the only way additional streams get pulled is if there's a parent/child relationship. https://gitlab.com/meltano/sdk/-/blob/main/singer_sdk/tap_base.py#L347 https://gitlab.com/meltano/sdk/-/blob/main/singer_sdk/streams/core.py#L920 and specifically https://gitlab.com/meltano/sdk/-/blob/main/singer_sdk/streams/core.py#L962
I can confirm I've used select filters to select only a parent stream and had the child streams not be synced.
Nothing jumped out at me from the code you posted as to why it'd happen. I'd want to see get_all_tickers(), IBKRStream, and NewsStream.
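To make the selection idea above concrete, here is a minimal sketch of how a `stream.attribute` pattern such as `ib_tickers.*` narrows a list of stream names. It is a simplified illustration using glob matching from Python's standard library, not Meltano's actual implementation; the helper name `selected_streams` is hypothetical, and negated (`!`) patterns are ignored.

```python
from fnmatch import fnmatchcase
from typing import List


def selected_streams(stream_names: List[str], select_patterns: List[str]) -> List[str]:
    """Return the stream names matched by at least one `stream.attribute` pattern."""
    selected = []
    for name in stream_names:
        for pattern in select_patterns:
            # Only the stream part of the pattern (before the first dot) is used here.
            stream_glob = pattern.split(".", 1)[0]
            if fnmatchcase(name, stream_glob):
                selected.append(name)
                break
    return selected


# Stream names taken from the streams.py posted above.
streams = ["ib_tickers", "ib_news"]
tickers_only = selected_streams(streams, ["ib_tickers.*"])
news_only = selected_streams(streams, ["ib_news.*"])
```

Under these assumptions, each of the two extractor entries in the meltano.yml would end up with exactly one of the two streams selected.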
e
Thanks Derik, I will open source the entire tap this week and come back. I've been meaning to chase down this aspect and finally got some time.
get_all_tickers() simply returns partitions made of combinations of the alphabet (a, b, c, ... zzzz) up to length 4. These partitions are fed to, and meant only for, the Tickers stream. For news, we have to look up the last valid tickers already in the target (PostgreSQL).
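As a rough sketch of the partition generation described above, assuming each partition is a small dict with a hypothetical `symbol_prefix` key (the real tap's key names were not posted), the alphabet combinations up to length 4 could be produced like this:

```python
from itertools import product
from string import ascii_lowercase
from typing import Dict, Iterator, List


def iter_ticker_partitions(max_length: int = 4) -> Iterator[Dict[str, str]]:
    """Lazily yield one partition per lowercase letter combination: a, b, ..., zzzz."""
    for length in range(1, max_length + 1):
        for letters in product(ascii_lowercase, repeat=length):
            # "symbol_prefix" is a hypothetical key name used for illustration.
            yield {"symbol_prefix": "".join(letters)}


def get_all_tickers(max_length: int = 4) -> List[Dict[str, str]]:
    """Materialize every partition as a list."""
    return list(iter_ticker_partitions(max_length))
```

Because `partitions = get_all_tickers()` sits in the class body in streams.py, Python evaluates it when the module is imported, regardless of which stream is selected. Moving the call behind a `partitions` property on the stream class is one possible way to defer that work until the Tickers stream is actually synced.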