Mishank Gehlot
03/19/2024, 7:55 AM
When I run `meltano --log-level=debug run tap-mysql custom-transformer target-postgres`, I encounter the error. However, when I run `meltano --log-level=debug run tap-mysql target-postgres`, the pipeline executes successfully. This suggests that there might be an issue with the custom transformer.

Reuben (Matatika)
03/19/2024, 9:34 AM
`transformer` or `mapper`? Only mappers can be placed between a tap and a target with `meltano run` to constitute a valid block (hence your `bad block sequence` error).

Mishank Gehlot
03/19/2024, 11:45 AM

Reuben (Matatika)
03/19/2024, 12:30 PM
The only `transformer` plugin available is `dbt` currently, and you can execute it in an extract-load-transform (ELT) fashion, like
```
meltano run <tap> <target> dbt
```
or separate out the extract-load and transform processes:
```
meltano run <tap> <target>
meltano run dbt  # or `meltano invoke dbt`
```
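For those dbt commands to resolve, a transformer plugin also has to be declared in the project. A minimal meltano.yml sketch (the plugin name, variant, and pip_url below are assumptions, not taken from this thread):
```yaml
# Hypothetical fragment: declare dbt as a transformer plugin so that
# `meltano run <tap> <target> dbt` is a valid block sequence.
# The variant and pip_url shown are assumed, not from this thread.
plugins:
  transformers:
  - name: dbt-postgres
    variant: dbt-labs
    pip_url: dbt-core~=1.3.0 dbt-postgres~=1.3.0
```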
It really depends on what you mean by "complex" though - what's your case for using a custom transformer over using a tool like dbt? It sounds to me like you are trying to implement an ETL (as opposed to an ELT), in which case I think your only option with Meltano would be to use a mapper.
https://meltano.com/blog/the-benefits-of-elt-vs-etl-what-you-need-to-know/

Mishank Gehlot
03/19/2024, 12:48 PM

Reuben (Matatika)
03/19/2024, 1:05 PM
SDK tap
meltano.yml:
```yaml
plugins:
  extractors:
  - name: <tap>
    # ...
    config:
      stream_maps:
        # ...
  loaders:
  - name: <target>
    # ...
```
```
meltano run <tap> <target>
```
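As a concrete illustration of what can go under `stream_maps` for an SDK tap (the stream and property names below are invented for this sketch, not taken from the thread):
```yaml
# Hypothetical stream_maps example for an SDK tap: alias the `user`
# stream and drop a column. All names below are placeholders.
plugins:
  extractors:
  - name: tap-mysql
    config:
      stream_maps:
        user:
          __alias__: app_user   # load as `app_user` instead of `user`
          password: __NULL__    # drop the `password` property
  loaders:
  - name: target-postgres
```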
Non-SDK tap
meltano.yml:
```yaml
plugins:
  extractors:
  - name: <tap>
    # ...
  mappers:
  - name: meltano-map-transformer
    # ...
    mappings:
    - name: my-mapping
      config:
        stream_maps:
          # ...
  loaders:
  - name: <target>
    # ...
```
```
meltano run <tap> my-mapping <target>
```
Mishank Gehlot
03/19/2024, 1:29 PM

Mishank Gehlot
03/19/2024, 8:27 PM

Reuben (Matatika)
03/19/2024, 10:19 PM
```yaml
config:
  stream_maps:
    user: __NULL__  # don't sync the base `user` table
    user_1:
      __source__: user
      __filter__: companyId == 1
    user_2:
      __source__: user
      __filter__: companyId == 2
    user_3:
      __source__: user
      __filter__: companyId == 3
```
Mishank Gehlot
03/20/2024, 5:13 AM
```yaml
- name: custom-mapper
  namespace: custom_mapper
  pip_url: ./custom_mapper/dist/custom_mapper-0.1.tar.gz
  executable: custom_mapper
  mapping: transform_external_share_links
  config:
    database_connection:
      host: abc
      port: 5432
      dbname: dbwarehouse
      user: user
      password: pass
  additional_data_query: |
    SELECT companyId, shortName FROM Company
```
Can you help me find out what's wrong?

Reuben (Matatika)
03/20/2024, 10:54 AM
`user` table ends up in Postgres as a source.

Reuben (Matatika)
03/20/2024, 10:56 AM
Is `additional_data_query` meant to be under `config` - not at the same level?

Mishank Gehlot
03/20/2024, 11:20 AM

Reuben (Matatika)
03/20/2024, 1:13 PM
```yaml
config:
  stream_maps:
    Link:
      __partition__: companyId
```
where your input is
```
# `Link` table
id | companyId
 0 | 44
 1 | 45
 2 | 46
```
and your expected output is
```
# `Link_44` table
id | companyId
 0 | 44

# `Link_45` table
id | companyId
 1 | 45

# `Link_46` table
id | companyId
 2 | 46
```
I'm not sure how you would join records from `Link` and `company` streams to make the company name available in the partitioned stream, but this is already quite complicated...
Another thought: is it possible you can run the transformation before the extract happens?

Mishank Gehlot
03/20/2024, 1:35 PM

Reuben (Matatika)
03/20/2024, 1:40 PM
`__source__` and `__filter__`. If you are interested, maybe it's at least worth opening an issue on the SDK though?

Reuben (Matatika)
03/20/2024, 1:44 PM