# troubleshooting
m
I’d like to split the stream of records produced by a tap into individual streams based on a property on the records. I believe this is possible (using `__source__` and/or `__filter__`, not sure exactly how yet). After splitting into individual streams, I’d like to write the records to Snowflake using the MeltanoLabs variant of the target-snowflake loader. But I’d like to write the records to multiple schemas, and I’m not sure if this is possible: the `schema` (and `default_schema`) property gets set as part of the target configuration, and it doesn’t seem like there’s a way to configure it so that records with `source_name=service_a` get written to schema `service_a` while records with `source_name=service_b` get written to schema `service_b`. Is there any way to accomplish this kind of behavior?
Apparently this sort of thing is possible with the `schema_mapping` config property in the transferwise variant 🤔 But we’re already using the MeltanoLabs variant in production and I don’t want to switch.
e
`schema_mapping` isn't yet a built-in SDK setting, but target-snowflake supports creating schemas based on the `<schema>-<table>` stream name pattern. Maybe that's enough for a workaround?
m
Ooh, yeah, thank you - that should work fine
Although this maybe makes the stream splitting the big unknown step then… Can I use an inline stream map to dynamically split a stream? A record in the stream will have a property `namespace`, which is a key-value dict: `"namespace": {"database": "customer_service", "collection": "Customer"}`. I would like to be able to split the single stream into a stream for each database and collection (so this example would be split into a new `customer_service-customer` stream) without having to hardcode a list of all possible databases and collections (as new ones will be created and I want them to be picked up automatically).
I am now thinking that defining a standalone mapper plugin that can set stream_id is maybe the best way to do this
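For instance, the name-derivation logic such a mapper would need could be a small helper along these lines (a hypothetical sketch, not part of the SDK or any existing mapper; the function name and lowercasing convention are assumptions):

```python
def stream_name_for(record: dict) -> str:
    """Derive a `<schema>-<table>`-style stream name from the record's
    `namespace` dict, so new databases and collections are picked up
    automatically without hardcoding a list of them."""
    ns = record["namespace"]
    return f'{ns["database"]}-{ns["collection"].lower()}'
```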
e
Ah, you're right, it'd be nice to automate the renaming based on the actual records. Something like this but in reverse, i.e. splitting instead of merging, so not even `schema_mapping` would help with that.
👍 1
m
If I split a stream (by associating a record with a new stream_id), do I need to do something with schema messages as well? The inline stream maps docs don’t make any mention of having to add a new schema record for the new stream - is there something happening under the hood that allows a target to “know” that the new stream uses the same schema as the original stream?
e
A mapper is expected to yield zero or more messages for each one in the input. That's how you can, for example, alias streams. At the implementation level, zero or more messages are yielded for each message of every type: https://github.com/MeltanoLabs/meltano-map-transform/blob/2d57a57e594e0c987f825a0f04e54e92ffeb16e7/meltano_map_transform/mapper.py#L109-L115
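The "zero or more messages per input" pattern can be sketched in a few lines, here for the aliasing case (a hypothetical illustration, not the meltano-map-transform API; the function name is made up):

```python
from typing import Iterator


def map_message(msg: dict, alias: str) -> Iterator[dict]:
    """Yield zero or more output messages for one input message.
    Here: alias a stream by rewriting `stream` on SCHEMA and RECORD
    messages, passing STATE through untouched and dropping the rest."""
    if msg["type"] in ("SCHEMA", "RECORD"):
        yield {**msg, "stream": alias}
    elif msg["type"] == "STATE":
        yield msg  # state is opaque to the mapper; pass it through
    # any other message type: yield nothing, i.e. drop it
```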
m
Thanks! To confirm I’m understanding that particular block of code correctly: that is yielding a schema message to all known streams each time one is received? If so, that sounds like exactly what I need to do.
e
Hmm, not sure what "all known streams" refers to specifically
m
I am maybe misinterpreting the `for stream_map in self.mapper.stream_maps[stream_id]:` but, if I split one stream into ten, dynamically, it sounds like I should be emitting a schema message to each of the ten new streams
> target-snowflake supports creating schemas based on the `<schema>-<table>` stream name pattern. Maybe that’s enough for a workaround?
Specifically, I am going to try to split one stream into many using this naming pattern for the new streams so that I can make target-snowflake land the data into the tables that we’re already using, rather than into a single giant table containing all events.
e
> but, if I split one stream into ten, dynamically, it sounds like I should be emitting a schema message to each of the ten new streams
that is correct
essentially each new stream should have SCHEMA and RECORD messages. STATE doesn't matter because it's just the future input to the un-split tap, so it can be left untouched.
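Putting that together, the splitting behavior could look something like this stdlib-only sketch over raw Singer message lines (a hypothetical illustration of the idea, not the SDK mapper API; the function name and the `<database>-<collection>` naming are assumptions from the conversation above):

```python
import json
from typing import Iterable, Iterator


def split_stream(lines: Iterable[str]) -> Iterator[str]:
    """Split one Singer stream into many, naming each new stream from the
    record's `namespace` dict and emitting a SCHEMA message the first time
    each new stream name is seen, so every new stream gets both SCHEMA and
    RECORD messages."""
    schemas = {}  # original stream name -> its SCHEMA message
    seen = set()  # new stream names already announced
    for line in lines:
        msg = json.loads(line)
        if msg["type"] == "SCHEMA":
            schemas[msg["stream"]] = msg  # hold until the split names are known
        elif msg["type"] == "RECORD":
            ns = msg["record"]["namespace"]
            new_stream = f'{ns["database"]}-{ns["collection"].lower()}'
            if new_stream not in seen:
                seen.add(new_stream)
                # re-emit the original schema under the new stream name
                yield json.dumps({**schemas[msg["stream"]], "stream": new_stream})
            yield json.dumps({**msg, "stream": new_stream})
        else:
            yield line  # STATE etc. pass through untouched
```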
m
Thanks Edgar! FWIW I’ve opened https://github.com/meltano/sdk/issues/2502 for this (for this idea specifically)
👀 1
I’m going to continue to noodle on this. In this case, we’re using a tap built with the SDK (a tap that we wrote/we control) so I’m wondering if maybe it’s easier to push this splitting up into the tap and do dynamic stream names there 🤔
e
Thanks, left a comment on the issue.
> I’m wondering if maybe it’s easier to push this splitting up into the tap and do dynamic stream names there
Yeah, if it's not ever meant to be public or generally useful outside your org, it may well make sense to do it in the tap.
m
Is this an appropriate use case for stream partitioning? https://sdk.meltano.com/en/latest/partitioning.html
Parent-child streams almost seem like they would work, except that I don’t really want the “parent” stream, just the “children”, and I want to determine the “child” streams dynamically 🤔