# singer-taps
j
@Edgar Ramírez (Arch.dev) Could you please guide me on how to flatten the JSON data from the MySQL tap configuration before it’s loaded into BigQuery? I’ve already set `flattening_enabled: true` in the BigQuery target configuration, but I’m not achieving the desired result. Here's my current `meltano.yml`:
```yaml
plugins:
  extractors:
  - name: tap-mysql
    variant: transferwise
    pip_url: git+https://github.com/transferwise/pipelinewise.git#subdirectory=singer-connectors/tap-mysql
    select:
    - '*.*'      # Select all tables first
    - '!*_audit*'      # Then exclude audit tables
    metadata:
      '*.*':    # Apply metadata to all non-excluded tables
        replication-method: INCREMENTAL
        replication_key: update_time
        key_properties:
        - id

  mappers:
  - name: meltano-map-transformer
    variant: meltano
    pip_url: git+https://github.com/MeltanoLabs/meltano-map-transform.git
    executable: meltano-map-transform
    mappings:
    - name: mysql_to_bq
      config:
        flattening_enabled: true
        flattening_max_depth: 3   # flatten only top-level properties
        stream_maps:
          "*":
            __alias: "{{ stream_name | regex_replace('^smartterm_', '') }}"

  loaders:
  - name: target-bigquery
    variant: z3z1ma
    pip_url: git+https://github.com/z3z1ma/target-bigquery.git
    config:
      batch_size: 10000
      flattening_enabled: true
      flattening_max_depth: 3
      partition_granularity: day
      dedupe_before_upsert: true
      timeout: 3600
      upsert: true
```
e
You probably need to make sure the type of the field emitted by tap-mysql can be flattened. If you run `meltano invoke tap-mysql --discover > catalog.json` and inspect the contents of the file, the type of the JSON field should be `object`.
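For reference, a property that can be flattened would show `object` in its `type` and carry nested `properties` in `catalog.json`, roughly like the sketch below (the `medical_history` column and its nested keys are hypothetical, only for illustration):
```json
{
  "medical_history": {
    "inclusion": "available",
    "type": ["null", "object"],
    "properties": {
      "condition": {"type": ["null", "string"]},
      "diagnosed_at": {"type": ["null", "string"], "format": "date-time"}
    }
  }
}
```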
j
Yes, it is a JSON object, and below is a snippet of the JSON file:
```json
{
  "streams": [
    {
      "tap_stream_id": "smartterm-account_medical_history",
      "table_name": "account_medical_history",
      "schema": {
        "properties": {
          "id": {
            "inclusion": "automatic",
            "maxLength": 45,
            "type": [
              "null",
              "string"
            ]
          },
          "account_id": {
            "inclusion": "available",
            "maxLength": 45,
            "type": [
              "null",
              "string"
            ]
          },
          // More fields...
        },
        "type": "object"
      }
    }
  ]
}
```
@Reuben (Matatika) please, do you know how I can resolve this?
r
Which properties are you expecting to be flattened? You are only showing two above and they are both `string`, so they won't be flattened (as Edgar said, only `object` can be flattened).
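As a rough illustration of what flattening produces (a sketch, not output from this pipeline; the `medical_history` object and its keys are hypothetical): an `object` property is expanded into one column per nested key, joined with a separator (the Meltano SDK uses a double underscore), while scalar properties such as `id` pass through unchanged.
```json
{
  "before": {"id": "1", "medical_history": {"condition": "asthma", "diagnosed_at": "2020-01-01"}},
  "after": {"id": "1", "medical_history__condition": "asthma", "medical_history__diagnosed_at": "2020-01-01"}
}
```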
j
The JSON output is large, that's why I only shared a snippet. It means I will have to flatten the tables when performing the transformation.
r
Still not sure I follow. What happens when you load without flattening? What exactly are you expecting to be loaded? From the above, I'd expect a BigQuery table with at least two columns: `id` and `account_id`.
j
@Reuben (Matatika) When the schema is loaded into BigQuery without being flattened, it uses the `stream_id` as the table name. However, what I need is to load the exact `table_name` that is returned. I believe achieving this would require flattening the `stream`.
r
Don't think that has anything to do with flattening. If you want the exact same table name from MySQL mirrored in BigQuery, then you need to manipulate the stream ID using `stream_maps`. It looks like you are trying to do something like that here:
```yaml
stream_maps:
  "*":
    __alias: "{{ stream_name | regex_replace('^smartterm_', '') }}"
but the correct key is `__alias__`. Also, that expression syntax is incorrect - it should be Python:
```yaml
stream_maps:
  "*":
    __alias__: '__stream_name__.replace("smartterm_", "")'
```
https://sdk.meltano.com/en/latest/stream_maps.html#constructing-expressions
Curious where you got that expression syntax from?
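As an aside, a mapping defined under `mappers` is applied by adding its mapping name as a step between the tap and the target in `meltano run` - a minimal sketch, assuming the plugin and mapping names from the config shared above:
```bash
# Apply the mysql_to_bq mapping between the extractor and the loader
meltano run tap-mysql mysql_to_bq target-bigquery
```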
j
Thanks @Reuben (Matatika), that has been resolved. However, I'm now facing another error: the extractor `tap-mysql` throws back this error:
```
2025-01-27T18:15:47.090749Z [error    ] Extractor failed              
2025-01-27T18:15:47.091211Z [error    ] Block run completed.           block_type=ExtractLoadBlocks err=RunnerError('Extractor failed') exit_codes={<PluginType.EXTRACTORS: 'extractors'>: 1} set_number=0 success=False

Run invocation could not be completed as block failed: Extractor failed
```
r
Nothing else in the error trace? You can always isolate the tap process to debug:
```
meltano invoke tap-mysql
```
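To capture more detail when isolating the tap, the log level can be raised and stderr redirected to a file for inspection - a sketch using standard Meltano CLI options (the log file name is arbitrary):
```bash
# Invoke only the extractor with verbose logging; keep stderr for later inspection
meltano --log-level=debug invoke tap-mysql > /dev/null 2> tap-mysql-debug.log
```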
j
It returns the streams and the nested schemas, as well as the tables. I think the extractor timer is exceeded: `block_type=ExtractLoadBlocks err=RunnerError('Extractor failed') exit_codes={<PluginType.EXTRACTORS: 'extractors'>: 1} set_number=0 success=False`
r
Don't think so. Do you see an error trace in the output?
j
I'm not quite sure how else to explain it... I will try and troubleshoot in the meantime. Thanks for your assistance so far.
@Reuben (Matatika) @Edgar Ramírez (Arch.dev) I'm using `tap-mysql` with `LOG_BASED` replication to extract ~100GB of data and load it into BigQuery via `target-bigquery`. My current config:
```yaml
version: 1
default_environment: dev
project_id: 76220d7a-72d0-4a02-a8df-2e5012c509aa
environments:
- name: dev
- name: staging
- name: prod
plugins:
  extractors:
  - name: tap-mysql
    variant: transferwise
    pip_url: git+https://github.com/transferwise/pipelinewise.git#subdirectory=singer-connectors/tap-mysql
    config:
      session_sqls:
        - SET @@session.max_execution_time=0       # No limit
        - SET @@session.net_read_timeout=3600      # 1 hour
        - SET @@session.net_write_timeout=3600     # 1 hour

          # Set other session variables to the default PPW ones
        - SET @@session.time_zone="+0:00"
        - SET @@session.wait_timeout=28800
        - SET @@session.innodb_lock_wait_timeout=3600
    select:
      - '*.*'          # Select all tables first
      - '!*_audit*'          # Then exclude audit tables
    metadata:
      '*.*':    # Apply metadata to all non-excluded tables
        replication-method: LOG_BASED
        # replication_key: update_time
        key_properties:
         - id

  mappers:
  - name: meltano-map-transformer
    variant: meltano
    pip_url: git+https://github.com/MeltanoLabs/meltano-map-transform.git
    executable: meltano-map-transform
    mappings:
    - name: rename_stream
      config:
        stream_maps:
          '*':
            __alias__: '__stream_name__.replace("smartterm-", "")'

  loaders:
  - name: target-bigquery
    variant: z3z1ma
    pip_url: git+https://github.com/z3z1ma/target-bigquery.git
    config:
      batch_size: 500
      partition_granularity: month
      dedupe_before_upsert: true
      timeout: 3600
      upsert: true
      cluster_on_key_properties: false
      options:
        max_workers: 50  # Optimized for parallel processing
        process_pool: false
        storage_write_batch_mode: false
      method: storage_write_api
      flattening_enabled: true
      flattening_max_depth: 10
  - name: target-jsonl
    variant: andyh1203
    pip_url: target-jsonl
```
Despite this, I'm getting duplicate records in BigQuery. What's the best way to ensure proper deduplication? Should I switch to `INCREMENTAL` replication, or is there something I’m missing in my config? Appreciate any insights!
e
That particular target-bigquery has a number of ways of dealing with duplicates, so I'll just link to the readme here 😅 https://github.com/z3z1ma/target-bigquery?tab=readme-ov-file#upsert-aka-merge
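For context, the readme's upsert/merge path maps onto options already present in the config above; a minimal sketch of the dedup-relevant settings (assuming, per the linked readme, that merging relies on key properties being defined for each stream, which the `key_properties: [id]` metadata on the extractor provides):
```yaml
loaders:
- name: target-bigquery
  variant: z3z1ma
  config:
    upsert: true                # use a MERGE into the final table instead of a plain append
    dedupe_before_upsert: true  # deduplicate staged rows before the MERGE (see the linked readme)
```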