Jacob Ukokobili
01/23/2025, 5:55 PM
Is there a way to flatten JSON data from the MySQL tap before it's loaded into BigQuery? I've already set flattening_enabled: true in the BigQuery target configuration, but I'm not achieving the desired result.
Here's my current `meltano.yml`:
plugins:
  extractors:
    - name: tap-mysql
      variant: transferwise
      pip_url: git+https://github.com/transferwise/pipelinewise.git#subdirectory=singer-connectors/tap-mysql
      select:
        - '*.*'        # Select all tables first
        - '!*_audit*'  # Then exclude audit tables
      metadata:
        '*.*':  # Apply metadata to all non-excluded tables
          replication-method: INCREMENTAL
          replication_key: update_time
          key_properties:
            - id
  mappers:
    - name: meltano-map-transformer
      variant: meltano
      pip_url: git+https://github.com/MeltanoLabs/meltano-map-transform.git
      executable: meltano-map-transform
      mappings:
        - name: mysql_to_bq
          config:
            flattening_enabled: true
            flattening_max_depth: 3  # flatten only top-level properties
            stream_maps:
              "*":
                __alias: "{{ stream_name | regex_replace('^smartterm_', '') }}"
  loaders:
    - name: target-bigquery
      variant: z3z1ma
      pip_url: git+https://github.com/z3z1ma/target-bigquery.git
      config:
        batch_size: 10000
        flattening_enabled: true
        flattening_max_depth: 3
        partition_granularity: day
        dedupe_before_upsert: true
        timeout: 3600
        upsert: true
Edgar Ramírez (Arch.dev)
01/23/2025, 8:00 PM
If you run meltano invoke tap-mysql --discover > catalog.json and inspect the contents of the file for the type of the JSON field, it should be object.
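For reference, here's a minimal sketch of what an object-typed (JSON) column would look like in catalog.json — the metadata column name is hypothetical:

"metadata": {
  "inclusion": "available",
  "type": [
    "null",
    "object"
  ]
}

Only properties discovered with type object like this are candidates for flattening.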
Jacob Ukokobili
01/24/2025, 7:35 PM
{
  "streams": [
    {
      "tap_stream_id": "smartterm-account_medical_history",
      "table_name": "account_medical_history",
      "schema": {
        "properties": {
          "id": {
            "inclusion": "automatic",
            "maxLength": 45,
            "type": [
              "null",
              "string"
            ]
          },
          "account_id": {
            "inclusion": "available",
            "maxLength": 45,
            "type": [
              "null",
              "string"
            ]
          }
          // More fields...
        },
        "type": "object"
      }
    }
  ]
}
Jacob Ukokobili
01/24/2025, 8:19 PM

Reuben (Matatika)
01/24/2025, 11:30 PMstring
, so they won't be flattened (as Edgar said, only object
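To illustrate what flattening does when a property is typed as object (a sketch, with a hypothetical address column), a record like

{"id": "1", "address": {"city": "Lagos", "zip": "100001"}}

would be expanded into separate columns joined with double underscores, roughly:

{"id": "1", "address__city": "Lagos", "address__zip": "100001"}

String-typed columns like id and account_id are left untouched.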
Jacob Ukokobili
01/25/2025, 5:48 PM

Reuben (Matatika)
01/25/2025, 7:29 PM
id and account_id.

Jacob Ukokobili
01/26/2025, 8:45 AM
The loader is using the stream_id as the table name. However, what I need is to load the exact table_name that is returned. I believe achieving this would require flattening the stream.

Reuben (Matatika)
01/26/2025, 9:21 AM
You can do that with stream_maps. It looks like you are trying to do something like that here

stream_maps:
  "*":
    __alias: "{{ stream_name | regex_replace('^smartterm_', '') }}"

but the correct key is __alias__. Also, that expression syntax is incorrect - it should be Python:

stream_maps:
  "*":
    __alias__: '__stream_name__.replace("smartterm_", "")'

https://sdk.meltano.com/en/latest/stream_maps.html#constructing-expressions
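For context, a minimal sketch of how that correction would sit in the mapper definition of your meltano.yml (note your catalog's tap_stream_id uses a hyphen, smartterm-, so the prefix in the expression may need to match that):

mappers:
  - name: meltano-map-transformer
    variant: meltano
    pip_url: git+https://github.com/MeltanoLabs/meltano-map-transform.git
    executable: meltano-map-transform
    mappings:
      - name: rename_stream
        config:
          stream_maps:
            '*':
              __alias__: '__stream_name__.replace("smartterm-", "")'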
Curious where you got that expression syntax from?

Jacob Ukokobili
01/27/2025, 6:50 PM
tap-mysql threw this error:
2025-01-27T18:15:47.090749Z [error ] Extractor failed
2025-01-27T18:15:47.091211Z [error ] Block run completed. block_type=ExtractLoadBlocks err=RunnerError('Extractor failed') exit_codes={<PluginType.EXTRACTORS: 'extractors'>: 1} set_number=0 success=False
Run invocation could not be completed as block failed: Extractor failed
Reuben (Matatika)
01/27/2025, 7:04 PM
meltano invoke tap-mysql
Jacob Ukokobili
01/27/2025, 8:05 PM

Jacob Ukokobili
01/27/2025, 8:07 PM
block_type=ExtractLoadBlocks err=RunnerError('Extractor failed') exit_codes={<PluginType.EXTRACTORS: 'extractors'>: 1} set_number=0 success=False
Reuben (Matatika)
01/27/2025, 8:52 PM

Jacob Ukokobili
01/27/2025, 10:11 PM

Jacob Ukokobili
01/29/2025, 7:19 PM
I'm using tap-mysql with LOG_BASED replication to extract ~100GB of data and load it into BigQuery via target-bigquery. My current config:
version: 1
default_environment: dev
project_id: 76220d7a-72d0-4a02-a8df-2e5012c509aa
environments:
  - name: dev
  - name: staging
  - name: prod
plugins:
  extractors:
    - name: tap-mysql
      variant: transferwise
      pip_url: git+https://github.com/transferwise/pipelinewise.git#subdirectory=singer-connectors/tap-mysql
      config:
        session_sqls:
          - SET @@session.max_execution_time=0   # No limit
          - SET @@session.net_read_timeout=3600  # 1 hour
          - SET @@session.net_write_timeout=3600 # 1 hour
          # Set other session variables to the default PPW ones
          - SET @@session.time_zone="+0:00"
          - SET @@session.wait_timeout=28800
          - SET @@session.innodb_lock_wait_timeout=3600
      select:
        - '*.*'        # Select all tables first
        - '!*_audit*'  # Then exclude audit tables
      metadata:
        '*.*':  # Apply metadata to all non-excluded tables
          replication-method: LOG_BASED
          # replication_key: update_time
          key_properties:
            - id
  mappers:
    - name: meltano-map-transformer
      variant: meltano
      pip_url: git+https://github.com/MeltanoLabs/meltano-map-transform.git
      executable: meltano-map-transform
      mappings:
        - name: rename_stream
          config:
            stream_maps:
              '*':
                __alias__: '__stream_name__.replace("smartterm-", "")'
  loaders:
    - name: target-bigquery
      variant: z3z1ma
      pip_url: git+https://github.com/z3z1ma/target-bigquery.git
      config:
        batch_size: 500
        partition_granularity: month
        dedupe_before_upsert: true
        timeout: 3600
        upsert: true
        cluster_on_key_properties: false
        options:
          max_workers: 50  # Optimized for parallel processing
          process_pool: false
          storage_write_batch_mode: false
        method: storage_write_api
        flattening_enabled: true
        flattening_max_depth: 10
    - name: target-jsonl
      variant: andyh1203
      pip_url: target-jsonl
Despite this, I'm getting duplicate records in BigQuery. What's the best way to ensure proper deduplication? Should I switch to INCREMENTAL
replication, or is there something I’m missing in my config?
Appreciate any insights!

Edgar Ramírez (Arch.dev)
01/29/2025, 10:58 PM