Matt Menzenski
01/19/2023, 8:35 PM
… __source__ to split a stream. Can I use __filter__ to split a stream into several?
Use case: I have JSON data in S3 consisting of messages from the MongoDB change stream. Each record has keys database_name, collection_name, and document, corresponding to the source MongoDB database, the source collection, and the JSON message, respectively.
I would like to be able to split this data into multiple streams, one for each combination of database_name and collection_name.
Can I do something like this?
plugins:
  extractors:
  - name: tap-name
    config:
      stream_maps:
        payments:
          __filter__: 'database_name == "payments" and collection_name == "Payment"'
        profiles:
          __filter__: 'database_name == "customers" and collection_name == "Profile"'
alexander_butler
01/19/2023, 9:48 PM
plugins:
  extractors:
  - name: tap-name
    config:
      stream_maps:
        payments:
          __source__: s3_mongo_object_stream_name
          __filter__: 'database_name == "payments" and collection_name == "Payment"'
        profiles:
          __source__: s3_mongo_object_stream_name
          __filter__: 'database_name == "customers" and collection_name == "Profile"'
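(Aside, not from the thread: if only the split streams should reach the target, the Meltano SDK stream-map syntax should also let you suppress the original combined stream by mapping it to __NULL__. A sketch, reusing the source stream name assumed above:)

plugins:
  extractors:
  - name: tap-name
    config:
      stream_maps:
        payments:
          __source__: s3_mongo_object_stream_name
          __filter__: 'database_name == "payments" and collection_name == "Payment"'
        profiles:
          __source__: s3_mongo_object_stream_name
          __filter__: 'database_name == "customers" and collection_name == "Profile"'
        s3_mongo_object_stream_name: __NULL__  # drop the original combined stream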
Matt Menzenski
01/19/2023, 10:21 PM
If no records match the __filter__ expression, does that yield an empty stream gracefully, or will it error?
alexander_butler
01/19/2023, 10:48 PM
meltano run tap-spreadsheets-anywhere split_stream target-whatever, where split_stream is defined in the map transform plugin config as outlined in the repo.
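(Aside, not from the thread: a sketch of what that split_stream mapping could look like in meltano.yml, loosely following the MeltanoLabs/meltano-map-transform README — the stream names and pip_url here are assumptions:)

mappers:
- name: meltano-map-transformer
  variant: meltano
  pip_url: git+https://github.com/MeltanoLabs/meltano-map-transform.git
  mappings:
  - name: split_stream
    config:
      stream_maps:
        payments:
          __source__: s3_mongo_object_stream_name
          __filter__: 'database_name == "payments" and collection_name == "Payment"'
        profiles:
          __source__: s3_mongo_object_stream_name
          __filter__: 'database_name == "customers" and collection_name == "Profile"'

With a mapping like that defined, split_stream can sit between the tap and the target in meltano run, as in the command above.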
alexander_butler
01/19/2023, 11:22 PM
import json, os, sys
prt = ("database_name", "collection_name")
cfg = json.loads(os.environ["STREAM_SPLITS"])  # list of [database_name, collection_name, new_stream_name] triples
for line in sys.stdin:
    try:
        msg = json.loads(line); rec = msg["record"]; old = str(msg["stream"])
    except KeyError:
        sys.stdout.write(line); sys.stdout.flush(); continue  # not a RECORD message; pass it through untouched
    except Exception:
        continue  # unparseable line
    matched = False
    for f in cfg:  # f[0] is the database name, f[1] is the collection name, f[2] is the new stream name
        if f[0] == rec.get(prt[0]) and f[1] == rec.get(prt[1]):
            msg["stream"] = f[2]; sys.stdout.write(json.dumps(msg) + "\n"); sys.stdout.flush(); matched = True
    if not matched:  # no split matched; re-emit on the original stream
        msg["stream"] = old; sys.stdout.write(json.dumps(msg) + "\n"); sys.stdout.flush()
Half-baked attempt at the smallest possible stream filter+splitter, just because it's hilarious 🙃
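(Aside, not from the thread: one way to wire that script into a plain Singer pipeline — the file name splitter.py, the config file names, and the STREAM_SPLITS value are illustrative. Note the script only re-routes RECORD messages; SCHEMA messages for the new stream names would still need to be handled for most targets:)

export STREAM_SPLITS='[["payments", "Payment", "payments"], ["customers", "Profile", "profiles"]]'
tap-spreadsheets-anywhere --config tap_config.json \
  | python splitter.py \
  | target-whatever --config target_config.json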
Matt Menzenski
01/24/2023, 6:11 PM
…the schema_mapping configuration property in the target-redshift loader I'm using: https://github.com/transferwise/pipelinewise-target-redshift
Will Da Silva (Arch)
02/06/2023, 10:04 PM
> need to update meltano-map-transform to allow python 3.11 first
In the meantime, you should be able to run meltano install --force to bypass the Python version constraint if no other changes are required. The update to meltano-map-transform is ideal, of course.
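(Aside: the suggested workaround as a command:)

# re-install project plugins, bypassing the declared Python version constraint
meltano install --force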
Will Da Silva (Arch)
02/06/2023, 10:06 PM
There is a --force flag for meltano add too.
Matt Menzenski
02/06/2023, 10:33 PM
It looks like I can't set config for the mappers plugin key inside an environment's config block, the same way I can for extractors and loaders - is that expected?
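(Aside, not from the thread: a sketch of the environment-scoped config being described, with placeholder plugin names and settings — the open question is whether a mappers key is honored in the same position:)

environments:
- name: prod
  config:
    plugins:
      extractors:
      - name: tap-name
        config:
          start_date: '2023-01-01'            # placeholder setting
      loaders:
      - name: target-name
        config:
          default_target_schema: analytics    # placeholder setting
      # mappers:                              # <- the key in question
      # - name: meltano-map-transformer
      #   mappings: [...]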