# troubleshooting
m
The docs for Stream Maps mention using `__source__` to split a stream. Can I use `__filter__` to split a stream into several? Use case: I have JSON data in S3 that consists of messages from the MongoDB change stream. Each record has keys `database_name`, `collection_name`, and `document`, corresponding to the source MongoDB database, the source collection, and the JSON message, respectively. I would like to split this data into multiple streams, one for each combination of `database_name` and `collection_name`. Can I do something like this?
```yaml
plugins:
  extractors:
  - name: tap-name
    config:
      stream_maps:
        payments:
          __filter__: 'database_name == "payments" and collection_name == "Payment"'
        profiles:
          __filter__: 'database_name == "customers" and collection_name == "Profile"'
```
a
Did you try this:
```yaml
plugins:
  extractors:
  - name: tap-name
    config:
      stream_maps:
        payments:
          __source__: s3_mongo_object_stream_name
          __filter__: 'database_name == "payments" and collection_name == "Payment"'
        profiles:
          __source__: s3_mongo_object_stream_name
          __filter__: 'database_name == "customers" and collection_name == "Profile"'
```
m
Ooh, no I haven't - I will give this a shot, thank you
If there are no records that match the `__filter__` expression, does that yield an empty stream gracefully, or will it error?
If that yields an empty stream, this should work perfectly for my use case.
Ugh, this will only work if the tap uses the Meltano SDK, won't it? I'm not sure the tap I'm using was built with the SDK
`tap-spreadsheets-anywhere`
a
There is a standalone version that may get the job done: https://github.com/MeltanoLabs/meltano-map-transform
`meltano run tap-spreadsheets-anywhere split_stream target-whatever`
where `split_stream` is defined in the map transform plugin config as outlined in the repo
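For context, a sketch of what that plugin definition could look like in `meltano.yml`, combining the repo's README layout with the stream and filter values from the earlier messages - field names and the `s3_mongo_object_stream_name` source are assumptions, so double-check against the repo:

```yaml
plugins:
  mappers:
  - name: meltano-map-transformer
    variant: meltano
    pip_url: meltano-map-transform
    mappings:
    - name: split_stream  # the name referenced in `meltano run`
      config:
        stream_maps:
          payments:
            __source__: s3_mongo_object_stream_name
            __filter__: 'database_name == "payments" and collection_name == "Payment"'
          profiles:
            __source__: s3_mongo_object_stream_name
            __filter__: 'database_name == "customers" and collection_name == "Profile"'
```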
```python
import json, os, sys

# STREAM_SPLITS is a JSON list of [database_name, collection_name, new_stream_name] triples
cfg = json.loads(os.environ["STREAM_SPLITS"])
for line in sys.stdin:
    try:
        msg = json.loads(line)
        rec = msg["record"]  # only RECORD messages carry a "record" key
    except KeyError:  # SCHEMA/STATE messages: pass through unchanged
        sys.stdout.write(line); sys.stdout.flush(); continue
    except ValueError:  # skip unparseable lines
        continue
    for db, coll, new_stream in cfg:
        if rec.get("database_name") == db and rec.get("collection_name") == coll:
            msg["stream"] = new_stream  # route matched records to the new stream
            break
    sys.stdout.write(json.dumps(msg) + "\n"); sys.stdout.flush()
```
Half-baked attempt at the smallest possible stream filter+splitter, just because it's hilarious 🙃
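To sanity-check the routing rule in isolation, here's a minimal in-process version of the same lookup - `route` is a made-up helper for illustration, not part of any tap, and the stream/key names are taken from the earlier messages:

```python
# [database_name, collection_name, new_stream_name] triples, as in STREAM_SPLITS
SPLITS = [["payments", "Payment", "payments"], ["customers", "Profile", "profiles"]]

def route(message, splits):
    """Return the stream name a Singer RECORD message should be emitted on."""
    record = message.get("record", {})
    for db, coll, new_stream in splits:
        if record.get("database_name") == db and record.get("collection_name") == coll:
            return new_stream
    return message["stream"]  # unmatched records stay on the original stream

msg = {"type": "RECORD", "stream": "s3_mongo_object_stream_name",
       "record": {"database_name": "payments", "collection_name": "Payment", "document": "{}"}}
print(route(msg, SPLITS))  # → payments
```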
m
I'm actually wondering now if I can accomplish at least some of this behavior with the `schema_mapping` configuration property in the target-redshift loader I'm using: https://github.com/transferwise/pipelinewise-target-redshift
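For what it's worth, my reading of that repo's README is that `schema_mapping` maps source schemas to Redshift schemas rather than routing individual records by their contents, so it may only cover part of this use case. A rough sketch of its shape as loader config (the schema names here are hypothetical; verify the exact structure against the README):

```yaml
plugins:
  loaders:
  - name: target-redshift
    config:
      schema_mapping:
        # source schema name -> target Redshift schema
        tap_spreadsheets_anywhere:
          target_schema: mongo_raw
```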
Finally getting around to testing these options - I need to update meltano-map-transform to allow Python 3.11 first: https://github.com/MeltanoLabs/meltano-map-transform/pull/102
w
> need to update meltano-map-transform to allow Python 3.11 first
In the meantime you should be able to run `meltano install --force` to bypass the Python version constraint if no other changes are required. The update to `meltano-map-transform` is ideal, of course.
m
Whoa, good tip!
I swear, there’s some sort of easy workaround like this for almost every issue I encounter
w
(To do: support the `--force` flag for `meltano add` too - I should log an issue for that if there isn't one. EDIT: https://github.com/meltano/meltano/issues/7277)
m
I seem not to be able to configure a `mappers` plugin key inside an environment's `config` block, the same way I can for `extractors` and `loaders` - is that expected?
@alexander_butler thank you for the link to the standalone meltano-map-transform, that worked perfectly 🦜