Matt Menzenski
01/19/2023, 8:35 PM
I see that I can use __source__ to split a stream. Can I use __filter__ to split a stream into several?
Use case: I have JSON data in S3 that consists of messages from the MongoDB change stream. Each record has the keys database_name, collection_name, and document, corresponding to the source MongoDB database, the collection, and the JSON message, respectively.
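For example, a single record looks roughly like this (the values here are made up):

{"database_name": "payments", "collection_name": "Payment", "document": {"amount": 100, "currency": "USD"}}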
I would like to be able to split this data into multiple streams, one for each combination of database_name and collection_name.
Can I do something like this?
plugins:
  extractors:
  - name: tap-name
    config:
      stream_maps:
        payments:
          __filter__: 'database_name == "payments" and collection_name == "Payment"'
        profiles:
          __filter__: 'database_name == "customers" and collection_name == "Profile"'

alexander_butler
01/19/2023, 9:48 PM
plugins:
  extractors:
  - name: tap-name
    config:
      stream_maps:
        payments:
          __source__: s3_mongo_object_stream_name
          __filter__: 'database_name == "payments" and collection_name == "Payment"'
        profiles:
          __source__: s3_mongo_object_stream_name
          __filter__: 'database_name == "customers" and collection_name == "Profile"'

Matt Menzenski
01/19/2023, 9:48 PM

Matt Menzenski
01/19/2023, 10:21 PM
If nothing matches the __filter__ expression, does that yield an empty stream gracefully, or will it error?

Matt Menzenski
01/19/2023, 10:21 PM

Matt Menzenski
01/19/2023, 10:29 PM

Matt Menzenski
01/19/2023, 10:29 PM

alexander_butler
01/19/2023, 10:48 PM
meltano run tap-spreadsheets-anywhere split_stream target-whatever, where split_stream is defined in the map transform plugin config as outlined in the repo.
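Something like this in meltano.yml, going off the meltano-map-transform README (the pip_url, variant, and stream names here are placeholders, adjust to your setup):

plugins:
  mappers:
  - name: meltano-map-transformer
    variant: meltano
    pip_url: git+https://github.com/MeltanoLabs/meltano-map-transform.git
    executable: meltano-map-transform
    mappings:
    - name: split_stream
      config:
        stream_maps:
          payments:
            __source__: s3_mongo_object_stream_name
            __filter__: 'database_name == "payments" and collection_name == "Payment"'
          profiles:
            __source__: s3_mongo_object_stream_name
            __filter__: 'database_name == "customers" and collection_name == "Profile"'

alexander_butler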
01/19/2023, 11:22 PM
import json, os, sys
keys = ("database_name", "collection_name")
cfg = json.loads(os.environ["STREAM_SPLITS"])  # list of [database_name, collection_name, new_stream_name] triples
for line in sys.stdin:
    try:
        msg = json.loads(line); rec = msg["record"]; old = str(msg["stream"])
    except KeyError:  # not a RECORD message (SCHEMA, STATE, ...): pass it through untouched
        sys.stdout.write(json.dumps(msg) + "\n"); sys.stdout.flush(); continue
    except ValueError:
        continue  # unparseable line
    for db, coll, new_stream in cfg:
        if db == rec.get(keys[0]) and coll == rec.get(keys[1]):
            msg["stream"] = new_stream; sys.stdout.write(json.dumps(msg) + "\n"); sys.stdout.flush()  # emit on the split-out stream
    msg["stream"] = old; sys.stdout.write(json.dumps(msg) + "\n"); sys.stdout.flush()  # and again on the original stream
Half-baked attempt at the smallest possible stream filter + splitter, just because it's hilarious 🙃

Matt Menzenski
01/24/2023, 6:11 PM
I ended up using the schema_mapping configuration property in the target-redshift loader I'm using: https://github.com/transferwise/pipelinewise-target-redshift
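In the meltano.yml loader config it looks roughly like this - the shape follows the schema_mapping example in the pipelinewise-target-redshift README, and the schema names here are just examples:

plugins:
  loaders:
  - name: target-redshift
    variant: transferwise
    config:
      default_target_schema: public
      schema_mapping:
        payments:
          target_schema: payments
        customers:
          target_schema: customers

Matt Menzenski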
02/06/2023, 9:58 PM

Will Da Silva (Arch)
02/06/2023, 10:04 PM
> need to update meltano-map-transform to allow python 3.11 first
In the meantime you should be able to run meltano install --force to bypass the Python version constraint if no other changes are required. The update to meltano-map-transform is ideal, of course.

Matt Menzenski
02/06/2023, 10:05 PM

Matt Menzenski
02/06/2023, 10:05 PM

Will Da Silva (Arch)
02/06/2023, 10:06 PM
There is a --force flag for meltano add too.

Matt Menzenski
02/06/2023, 10:33 PM
It doesn't look like I can set the mappers plugin key inside an environment's config block, the same way I can for extractors and loaders - is that expected?
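i.e. trying something like this (plugin names and values here are just placeholders):

environments:
- name: dev
  config:
    plugins:
      extractors:
      - name: tap-name
        config:
          start_date: '2023-01-01'
      mappers:
      - name: meltano-map-transformer
        mappings:
        - name: split_stream

Matt Menzenski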
02/06/2023, 10:41 PM