astonishing-alarm-71586
01/07/2021, 5:59 PM
meltano invoke tap-mongodb > output/b.jsonl
I can see that the tap generates a schema of type object, which makes sense:
{
"type": "STATE",
"value": {
"bookmarks": {"junk-stuff": {"last_replication_method": "FULL_TABLE"}},
"currently_syncing": "junk-stuff"
}
}
{
"type": "SCHEMA",
"stream": "stuff",
"schema": {"type": "object"},
"key_properties": ["_id"]
}
{
"type": "STATE",
"value": {
"bookmarks": {
"junk-stuff": {
"last_replication_method": "FULL_TABLE",
"version": 1610041761516
}
},
"currently_syncing": "junk-stuff"
}
}
{"type": "ACTIVATE_VERSION", "stream": "stuff", "version": 1610041761516}
{
"type": "RECORD",
"stream": "stuff",
"record": {
"_id": "5ff64e1191e2891d8b6ba683",
"id": 4711,
"tasks": [
{"created_on": "2021-01-06T16:56:01.058005", "value": 657},
{"created_on": "2021-01-06T16:56:01.058005", "value": 353},
{"created_on": "2021-01-06T16:56:01.058005", "value": 839},
{"created_on": "2021-01-06T16:56:01.058005", "value": 214},
{"created_on": "2021-01-06T16:56:01.058005", "value": 905},
{"created_on": "2021-01-06T16:56:01.058005", "value": 852},
{"created_on": "2021-01-06T16:56:01.058005", "value": 47},
{"created_on": "2021-01-06T16:56:01.058005", "value": 858}
],
"updated_on": "2021-01-06T16:57:39.025265"
},
"version": 1610041761516,
"time_extracted": "2021-01-07T17:49:21.517186Z"
}
{"type": "ACTIVATE_VERSION", "stream": "stuff", "version": 1610041762219}
{
"type": "STATE",
"value": {
"bookmarks": {
"junk-stuff": {
"last_replication_method": "FULL_TABLE",
"version": 1610041762219,
"initial_full_table_complete": true
}
},
"currently_syncing": null
}
}
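To check the captured output for this directly, you can scan it for SCHEMA messages that declare no properties. This is a minimal sketch that assumes the file written by meltano invoke tap-mongodb > output/b.jsonl holds one Singer message per line; the helper name streams_missing_properties is hypothetical.

```python
import json
from typing import Iterable, List


def streams_missing_properties(lines: Iterable[str]) -> List[str]:
    """Return the names of streams whose SCHEMA message declares no properties."""
    missing = []
    for line in lines:
        line = line.strip()
        if not line:
            continue  # skip blank lines
        message = json.loads(line)  # each line is one Singer message
        if message.get("type") == "SCHEMA" and "properties" not in message.get("schema", {}):
            missing.append(message["stream"])
    return missing


# Two lines in the shape tap-mongodb wrote above (one message per line)
sample = [
    '{"type": "SCHEMA", "stream": "stuff", "schema": {"type": "object"}, "key_properties": ["_id"]}',
    '{"type": "ACTIVATE_VERSION", "stream": "stuff", "version": 1610041761516}',
]
print(streams_missing_properties(sample))  # ['stuff']

# Against the real capture (path taken from the command above):
# with open("output/b.jsonl") as f:
#     print(streams_missing_properties(f))
```

Any stream listed here gives a loader that builds tables from the schema nothing to create columns from.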
When I run the full EL with meltano elt tap-mongodb target-postgres, I get the following error:
meltano | Running extract & load...
meltano | No state was found, complete import.
tap-mongodb | INFO Connected to MongoDB host: localhost, version: 4.2.11
tap-mongodb | INFO Starting full table sync for junk-stuff
tap-mongodb | INFO Querying junk-stuff with:
tap-mongodb | Find Parameters: {'$lte': ObjectId('5ff64e7891e2891d8b6c5551')}
target-postgres | Traceback (most recent call last):
target-postgres | File "/home/nate/projects/wemlo-data-engineering/experiment/ELT-meltano/demo-project/.meltano/loaders/target-postgres/venv/bin/target-postgres", line 10, in <module>
target-postgres | sys.exit(main())
target-postgres | File "/home/nate/projects/wemlo-data-engineering/experiment/ELT-meltano/demo-project/.meltano/loaders/target-postgres/venv/lib/python3.7/site-packages/target_postgres/__init__.py", line 188, in main
target-postgres | state = persist_lines(config, input)
target-postgres | File "/home/nate/projects/wemlo-data-engineering/experiment/ELT-meltano/demo-project/.meltano/loaders/target-postgres/venv/lib/python3.7/site-packages/target_postgres/__init__.py", line 150, in persist_lines
target-postgres | stream_to_sync[stream] = DbSync(config, o)
target-postgres | File "/home/nate/projects/wemlo-data-engineering/experiment/ELT-meltano/demo-project/.meltano/loaders/target-postgres/venv/lib/python3.7/site-packages/target_postgres/db_sync.py", line 121, in __init__
target-postgres | self.flatten_schema = flatten_schema(stream_schema_message['schema'])
target-postgres | File "/home/nate/projects/wemlo-data-engineering/experiment/ELT-meltano/demo-project/.meltano/loaders/target-postgres/venv/lib/python3.7/site-packages/target_postgres/db_sync.py", line 66, in flatten_schema
target-postgres | for k, v in d['properties'].items():
target-postgres | KeyError: 'properties'
tap-mongodb | INFO METRIC: {"type": "timer", "metric": "job_duration", "value": 0.10386991500854492, "tags": {"job_type": "sync_table", "database": "junk", "table": "stuff", "status": "failed"}}
tap-mongodb | CRITICAL [Errno 32] Broken pipe
tap-mongodb | Traceback (most recent call last):
tap-mongodb | File "/home/nate/projects/wemlo-data-engineering/experiment/ELT-meltano/demo-project/.meltano/extractors/tap-mongodb/venv/bin/tap-mongodb", line 10, in <module>
tap-mongodb | sys.exit(main())
tap-mongodb | File "/home/nate/projects/wemlo-data-engineering/experiment/ELT-meltano/demo-project/.meltano/extractors/tap-mongodb/venv/lib/python3.7/site-packages/tap_mongodb/__init__.py", line 393, in main
tap-mongodb | raise exc
tap-mongodb | File "/home/nate/projects/wemlo-data-engineering/experiment/ELT-meltano/demo-project/.meltano/extractors/tap-mongodb/venv/lib/python3.7/site-packages/tap_mongodb/__init__.py", line 390, in main
tap-mongodb | main_impl()
tap-mongodb | File "/home/nate/projects/wemlo-data-engineering/experiment/ELT-meltano/demo-project/.meltano/extractors/tap-mongodb/venv/lib/python3.7/site-packages/tap_mongodb/__init__.py", line 386, in main_impl
tap-mongodb | do_sync(client, args.catalog.to_dict(), state)
tap-mongodb | File "/home/nate/projects/wemlo-data-engineering/experiment/ELT-meltano/demo-project/.meltano/extractors/tap-mongodb/venv/lib/python3.7/site-packages/tap_mongodb/__init__.py", line 347, in do_sync
tap-mongodb | sync_stream(client, stream, state)
tap-mongodb | File "/home/nate/projects/wemlo-data-engineering/experiment/ELT-meltano/demo-project/.meltano/extractors/tap-mongodb/venv/lib/python3.7/site-packages/tap_mongodb/__init__.py", line 328, in sync_stream
tap-mongodb | full_table.sync_collection(client, stream, state, stream_projection)
tap-mongodb | File "/home/nate/projects/wemlo-data-engineering/experiment/ELT-meltano/demo-project/.meltano/extractors/tap-mongodb/venv/lib/python3.7/site-packages/tap_mongodb/sync_strategies/full_table.py", line 126, in sync_collection
tap-mongodb | singer.write_message(record_message)
tap-mongodb | File "/home/nate/projects/wemlo-data-engineering/experiment/ELT-meltano/demo-project/.meltano/extractors/tap-mongodb/venv/lib/python3.7/site-packages/singer/messages.py", line 227, in write_message
tap-mongodb | sys.stdout.flush()
tap-mongodb | BrokenPipeError: [Errno 32] Broken pipe
tap-mongodb | Exception ignored in: <_io.TextIOWrapper name='<stdout>' mode='w' encoding='UTF-8'>
tap-mongodb | BrokenPipeError: [Errno 32] Broken pipe
meltano | Extraction failed (120): BrokenPipeError: [Errno 32] Broken pipe
meltano | Loading failed (1): KeyError: 'properties'
meltano | ELT could not be completed: Tap and target failed
ELT could not be completed: Tap and target failed
meltano.yml
version: 1
send_anonymous_usage_stats: true
project_id: 02ab68b4-8b76-48e2-9c30-234e8aec9d30
plugins:
  extractors:
  - name: tap-mongodb
    variant: singer-io
    pip_url: tap-mongodb
    metadata:
      '*':
        replication-method: FULL_TABLE
  loaders:
  - name: target-jsonl
    variant: andyh1203
    pip_url: target-jsonl
  - name: target-postgres
    variant: meltano
    pip_url: git+https://github.com/meltano/target-postgres.git
ripe-musician-59933
01/07/2021, 6:08 PM
target-postgres requires the SCHEMA message to define all of the record's properties so that it can create the appropriate tables, which tap-mongodb isn't doing (for understandable reasons).
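The KeyError: 'properties' in the traceback is raised in flatten_schema in target_postgres/db_sync.py, at the line for k, v in d['properties'].items(). The sketch below is a simplified, hypothetical stand-in for that lookup, not the target's actual code. It shows that a bare {"type": "object"} schema fails the same way, while a schema with explicit properties yields one entry per column.

```python
def column_types(schema: dict) -> dict:
    """Map each top-level property name to its JSON Schema type.

    Hypothetical, simplified stand-in for the target's flatten_schema: like the
    line in the traceback, it indexes schema["properties"] directly.
    """
    return {name: prop.get("type") for name, prop in schema["properties"].items()}


# What tap-mongodb emits for the collection without a schema override
bare = {"type": "object"}

# The same stream once its properties are declared explicitly (abridged)
explicit = {
    "type": "object",
    "properties": {
        "_id": {"type": "string"},
        "id": {"type": "integer"},
        "updated_on": {"type": "string", "format": "date-time"},
    },
}

print(column_types(explicit))  # {'_id': 'string', 'id': 'integer', 'updated_on': 'string'}

try:
    column_types(bare)
except KeyError as exc:
    print("KeyError:", exc)  # KeyError: 'properties'
```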
To work around this, you can explicitly specify the schema for each stream in meltano.yml using the schema extra (https://meltano.com/docs/plugins.html#schema-extra):
schema:
  stuff:
    _id:
      type: string
    id:
      type: integer
    tasks:
      type: array
      items:
        type: object
        properties:
          created_on:
            type: string
            format: date-time
          value:
            type: integer
    updated_on:
      type: string
      format: date-time
Then, if you run meltano invoke tap-mongodb, you should see a more complete SCHEMA message.
astonishing-alarm-71586
01/07/2021, 6:12 PM
version: 1
send_anonymous_usage_stats: true
project_id: 02ab68b4-8b76-48e2-9c30-234e8aec9d30
plugins:
  extractors:
  - name: tap-mongodb
    variant: singer-io
    pip_url: tap-mongodb
    metadata:
      '*':
        replication-method: FULL_TABLE
    schema:
      stuff:
        _id:
          type: string
        id:
          type: integer
        tasks:
          type: array
          items:
            type: object
            properties:
              created_on:
                type: string
                format: date-time
              value:
                type: integer
        updated_on:
          type: string
          format: date-time
  loaders:
  - name: target-jsonl
    variant: andyh1203
    pip_url: target-jsonl
  - name: target-postgres
    variant: meltano
    pip_url: git+https://github.com/meltano/target-postgres.git
tap.properties.json
{
"streams": [
{
"table_name": "stuff",
"stream": "stuff",
"metadata": [
{
"breadcrumb": [],
"metadata": {
"table-key-properties": [
"_id"
],
"database-name": "junk",
"row-count": 4995,
"is-view": false,
"valid-replication-keys": [
"_id"
],
"selected": true,
"replication-method": "FULL_TABLE"
}
}
],
"tap_stream_id": "junk-stuff",
"schema": {
"type": "object"
},
"selected": true,
"replication_method": "FULL_TABLE"
}
]
}
tap output
{
"type": "SCHEMA",
"stream": "stuff",
"schema": {
"type": "object"
},
"key_properties": [
"_id"
]
}
ripe-musician-59933
01/07/2021, 6:38 PM
Try junk-stuff in meltano.yml instead of stuff: the identifier needs to match the stream's tap_stream_id, not its stream property!
astonishing-alarm-71586
01/07/2021, 6:39 PM
junk-stuff, not junk_stuff
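To make the matching rule concrete, here is a rough model of applying a schema override keyed by stream identifier to a catalog like tap.properties.json. It is a sketch under stated assumptions, not Meltano's implementation: apply_schema_overrides is a hypothetical name, glob-style matching is assumed (as the '*' metadata key suggests), and listed properties are assumed to be added to, or to replace, those already in the stream's schema.

```python
import copy
from fnmatch import fnmatchcase


def apply_schema_overrides(catalog: dict, overrides: dict) -> dict:
    """Return a copy of `catalog` with override properties applied to every
    stream whose tap_stream_id matches an override key (hypothetical model)."""
    patched = copy.deepcopy(catalog)
    for stream in patched["streams"]:
        for pattern, props in overrides.items():
            # The key is compared with tap_stream_id, not with the `stream` name.
            if fnmatchcase(stream["tap_stream_id"], pattern):
                schema = stream.setdefault("schema", {"type": "object"})
                schema.setdefault("properties", {}).update(copy.deepcopy(props))
    return patched


# Trimmed-down version of the catalog entry from tap.properties.json
catalog = {
    "streams": [
        {"tap_stream_id": "junk-stuff", "stream": "stuff", "schema": {"type": "object"}}
    ]
}
props = {"_id": {"type": "string"}, "id": {"type": "integer"}}

by_stream_name = apply_schema_overrides(catalog, {"stuff": props})
by_underscore = apply_schema_overrides(catalog, {"junk_stuff": props})
by_stream_id = apply_schema_overrides(catalog, {"junk-stuff": props})

print("properties" in by_stream_name["streams"][0]["schema"])  # False: "stuff" matches nothing
print("properties" in by_underscore["streams"][0]["schema"])  # False: "_" is not "-"
print(sorted(by_stream_id["streams"][0]["schema"]["properties"]))  # ['_id', 'id']
```

Under this model, an override keyed by the stream name is silently ignored, which matches the unchanged {"type": "object"} SCHEMA message above.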
ripe-musician-59933
01/07/2021, 6:40 PM
astonishing-alarm-71586
01/07/2021, 6:42 PM
ripe-musician-59933
01/07/2021, 7:01 PM
"One thing I've noticed there is that it actually creates what appears to be duplicate entries for each stream."
Hmm, that sounds like a bug! Can you please file an issue with some more details/examples?
astonishing-alarm-71586
01/07/2021, 7:03 PM
ripe-musician-59933
01/07/2021, 8:51 PM
… schema in meltano.yml, it's a Meltano issue. … schema to be specified explicitly: https://gitlab.com/meltano/meltano/-/issues/2517