# troubleshooting
m
Hello everyone, I'm currently working on developing a custom transformer. However, I've encountered an error when integrating the transformer with tap-mysql (for retrieving data from a MySQL DB) and target-postgres (for storing data in a PostgreSQL DB), as shown in the attached image. Can someone assist me with this? Interestingly, when I execute `meltano --log-level=debug run tap-mysql custom-transformer target-postgres`, I encounter the error. However, when I run `meltano --log-level=debug run tap-mysql target-postgres`, the pipeline executes successfully. This suggests that the issue lies with the custom transformer.
r
Is this a custom transformer or mapper? Only mappers can be placed between a tap and a target with `meltano run` to constitute a valid block (hence your `bad block sequence` error).
m
This is a transformer. @Reuben (Matatika) So can we do some complex transformations using a custom mapper, or do we need a transformer for that? Also, can you tell me where the transformer sits in the sequence relative to the extractor and loader?
r
Generally not, no. I've mostly seen mappers used for small transformations to normalise data or add properties (e.g. map-gpt-embeddings), but not much beyond that. Mappers don't really suit themselves to more complex transformations, mostly because you are essentially just modifying the stdout of the tap before it is sent to the target. The only other `transformer` plugin available is `dbt` currently, and you can execute it in an extract-load-transform (ELT) fashion, like

```
meltano run <tap> <target> dbt
```

or separate out the extract-load and transform processes:

```
meltano run <tap> <target>
meltano run dbt  # or `meltano invoke dbt`
```

It really depends on what you mean by "complex" though - what's your case for using a custom transformer over a tool like dbt? It sounds to me like you are trying to implement an ETL (as opposed to an ELT), in which case I think your only option with Meltano would be to use a mapper. https://meltano.com/blog/the-benefits-of-elt-vs-etl-what-you-need-to-know/
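To make that concrete: a mapper is conceptually just a filter over the Singer messages flowing from tap to target. A minimal sketch (assuming newline-delimited Singer JSON on stdin/stdout; the `users` stream and `full_name` property are made-up examples, and this is not how meltano-map-transform is actually implemented):

```python
import json
import sys

# Pass every Singer message through, tweaking RECORD messages for one stream.
for line in sys.stdin:
    message = json.loads(line)
    if message.get("type") == "RECORD" and message.get("stream") == "users":
        record = message["record"]
        # Derive a new property from existing ones (hypothetical example).
        full = f"{record.get('first_name', '')} {record.get('last_name', '')}"
        record["full_name"] = full.strip()
    sys.stdout.write(json.dumps(message) + "\n")
```

Note that a real mapper would also need to patch the corresponding SCHEMA message so the target knows about `full_name`.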
m
@Reuben (Matatika) Can we do something like duplicating rows of specific tables, and in those duplicated rows add/update some of the row's properties using a mapper? All operations would be done on a single row, so it's not really a complex transformation - just changing some name/value pairs in the existing JSON stdout while duplicating rows.
r
If that's the requirement, then there's not much need for a custom mapper, since you can use stream maps with an SDK tap or the existing meltano-map-transform mapper (a compatibility layer to leverage stream maps for non-SDK taps); a filled-in example follows the two setups below:
• Duplicating a row: https://sdk.meltano.com/en/latest/stream_maps.html#duplicating-or-splitting-a-stream-using-source
• Adding/updating properties: https://sdk.meltano.com/en/latest/stream_maps.html#constructing-expressions

SDK tap
meltano.yml
```yaml
plugins:
  extractors:
  - name: <tap>
    # ...
    config:
      stream_maps:
        # ...
  loaders:
  - name: <target>
    # ...
```
```
meltano run <tap> <target>
```

Non-SDK tap
meltano.yml
```yaml
plugins:
  extractors:
  - name: <tap>
    # ...
  mappers:
  - name: meltano-map-transformer
    # ...
    mappings:
    - name: my-mapping
      config:
        stream_maps:
          # ...
  loaders:
  - name: <target>
    # ...
```
```
meltano run <tap> my-mapping <target>
```
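For instance, a filled-in `stream_maps` block (hypothetical - the `users` stream and its property names are made up) could duplicate a stream and add a derived property like so:

```yaml
config:
  stream_maps:
    users_copy:
      __source__: users  # duplicate the `users` stream under a new name
      full_name: first_name + ' ' + last_name  # derived property expression
```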
m
Thanks for the help!
@Reuben (Matatika), In my use case, I aim to customize data streams containing a 'companyId' field. Consider the 'user' table, which includes data from three different companies with companyId values of 1, 2, and 3 in MySQL. Initially, the data will be extracted using tap-mysql and then loaded into PostgreSQL using target-postgres. In PostgreSQL, I require separate tables for each company alongside the main 'user' table. For instance, I need tables named 'user', 'user_1', 'user_2', and 'user_3' to store data specific to each company. How can I achieve this customization?
r
With stream maps:
```yaml
config:
  stream_maps:
    user: __NULL__  # don't sync the base `user` table
    user_1:
      __source__: user
      __filter__: companyId == 1
    user_2:
      __source__: user
      __filter__: companyId == 2
    user_3:
      __source__: user
      __filter__: companyId == 3
```
m
@Reuben (Matatika) The thing is, we don't know how many companies there are - the number can increase over time - so we need generic logic that duplicates the rows of any stream that has a companyId after extraction from MySQL with tap-mysql and, based on companyId, creates company-specific tables in Postgres via target-postgres. My current meltano.yml config for the above use case has some issue:
```yaml
- name: custom-mapper
  namespace: custom_mapper
  pip_url: ./custom_mapper/dist/custom_mapper-0.1.tar.gz
  executable: custom_mapper
  mapping: transform_external_share_links
  config:
    database_connection:
      host: abc
      port: 5432
      dbname: dbwarehouse
      user: user
      password: pass
  additional_data_query: |
    SELECT companyId, shortName FROM Company
```
Can you help me find out what's wrong?
r
I don't know anything about your custom mapper, but trying to execute SQL in between a tap and target doesn't seem like the right approach to me. I would consider using dbt for this kind of work (where you can write your transform logic in layers using SQL), if it's not critical that your `user` table ends up in Postgres as a source. If you want to continue trying with a custom mapper, then I assume in your example `additional_data_query` is meant to be under `config` - not at the same level?
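That is, assuming the query does belong to the mapper's config, the corrected nesting would be (values copied from the snippet above):

```yaml
- name: custom-mapper
  namespace: custom_mapper
  pip_url: ./custom_mapper/dist/custom_mapper-0.1.tar.gz
  executable: custom_mapper
  mapping: transform_external_share_links
  config:
    database_connection:
      host: abc
      port: 5432
      dbname: dbwarehouse
      user: user
      password: pass
    additional_data_query: |
      SELECT companyId, shortName FROM Company
```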
m
@Reuben (Matatika) I aim to retrieve records from tap-mysql and perform minor transformations on them before sending the adjusted JSON to target-postgres. I checked the existing plugins, but it seems they won't fit my use case, so I need to create a custom one. Regarding the SQL: as you suggested, it could be moved to the dbt side, but some transformations need to happen during Meltano execution. Use case: we have some tables in MySQL with a companyId column, and we need to segregate them by companyId and store them in Postgres with table names of the form tablename_companyName. Example: Link table: linkId, Link, companyId (example row: 1, abc.com, 44); company table: id, name (example row: 44, company44). From these, I want a Link_company44 table in Postgres. Please note that the number of companies in the company table can vary, and all tables containing a companyId column in MySQL are provided as input. The output will consist of company-specific tables created in PostgreSQL.
r
OK, I understand what you want to do and why you are trying to create a custom mapper. Fundamentally, you are describing an ETL process, and if you are locked to that requirement, a mapper is really your only option with Meltano (as far as I am aware). On SQL in a mapper: bear in mind that the data has already been extracted by the tap, so it is probably undesirable/unnecessary to run a supplemental query like you are trying to do. I would have a look at how to implement some kind of record partitioning logic in your custom mapper Python code, or some enhancement to stream maps to add partitioning support, so that you would be able to do something like

```yaml
config:
  stream_maps:
    Link:
      __partition__: companyId
```
where your input is

```
# `Link` table
id | companyId
0  | 44
1  | 45
2  | 46
```

and your expected output is

```
# `Link_44` table
id | companyId
0  | 44

# `Link_45` table
id | companyId
1  | 45

# `Link_46` table
id | companyId
2  | 46
```
I'm not sure how you would join records from the `Link` and `company` streams to make the company `name` available in the partitioned stream, but this is already quite complicated... Another thought: is it possible you can run the transformation before the extract happens?
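For illustration, the partitioning idea might look something like this in custom mapper code (a sketch only - it assumes Singer messages on stdin/stdout, reuses the `Link`/`companyId` names from the example above, and ignores state handling and the `company` join a real plugin would need):

```python
import json
import sys

schemas = {}  # original SCHEMA messages, keyed by stream name
seen = set()  # partitioned streams we have already emitted a schema for

def emit(message: dict) -> None:
    sys.stdout.write(json.dumps(message) + "\n")

for line in sys.stdin:
    message = json.loads(line)
    kind = message.get("type")
    if kind == "SCHEMA" and message.get("stream") == "Link":
        schemas["Link"] = message  # hold back; re-emit once per partition
    elif kind == "RECORD" and message.get("stream") == "Link":
        # Route each record to a per-company stream, e.g. `Link` -> `Link_44`.
        partition = f"Link_{message['record']['companyId']}"
        if partition not in seen:
            seen.add(partition)
            emit(dict(schemas["Link"], stream=partition))
        emit(dict(message, stream=partition))
    else:
        emit(message)  # STATE and other streams pass through untouched
```

Joining in the company `name` would presumably mean buffering `company` records before any `Link` records are routed, which is part of why this gets complicated.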
m
@Reuben (Matatika), For now, it is not possible to run the transformation before extraction. Also, just to confirm: are you planning to enhance stream maps to include a config called `partition`?
r
No, it was just an idea - I don't have a requirement for it at the moment. I'm also not fully aware of the implications something like this has downstream; in theory, it's just a dynamic way of using `__source__` and `__filter__`. If you are interested, maybe it's at least worth opening an issue on the SDK though?
@Edgar Ramírez (Arch.dev) Any thoughts here? Just wanna be sure I haven't missed anything obvious.