#announcements

astonishing-alarm-71586

01/07/2021, 5:59 PM
I'm struggling a bit to get a mongodb -> postgres EL working and could really use some tips from the community. If I run just the tap-mongodb portion with
meltano invoke tap-mongodb > output/b.jsonl
I can see that the tap generates a schema of type object which makes sense.
{
    "type": "STATE",
    "value": {
        "bookmarks": {"junk-stuff": {"last_replication_method": "FULL_TABLE"}},
        "currently_syncing": "junk-stuff",
    },
}
{
    "type": "SCHEMA",
    "stream": "stuff",
    "schema": {"type": "object"},
    "key_properties": ["_id"],
}
{
    "type": "STATE",
    "value": {
        "bookmarks": {
            "junk-stuff": {
                "last_replication_method": "FULL_TABLE",
                "version": 1610041761516,
            }
        },
        "currently_syncing": "junk-stuff",
    },
}
{"type": "ACTIVATE_VERSION", "stream": "stuff", "version": 1610041761516}
{
    "type": "RECORD",
    "stream": "stuff",
    "record": {
        "_id": "5ff64e1191e2891d8b6ba683",
        "id": 4711,
        "tasks": [
            {"created_on": "2021-01-06T16:56:01.058005", "value": 657},
            {"created_on": "2021-01-06T16:56:01.058005", "value": 353},
            {"created_on": "2021-01-06T16:56:01.058005", "value": 839},
            {"created_on": "2021-01-06T16:56:01.058005", "value": 214},
            {"created_on": "2021-01-06T16:56:01.058005", "value": 905},
            {"created_on": "2021-01-06T16:56:01.058005", "value": 852},
            {"created_on": "2021-01-06T16:56:01.058005", "value": 47},
            {"created_on": "2021-01-06T16:56:01.058005", "value": 858},
        ],
        "updated_on": "2021-01-06T16:57:39.025265",
    },
    "version": 1610041761516,
    "time_extracted": "2021-01-07T17:49:21.517186Z",
}
{"type": "ACTIVATE_VERSION", "stream": "stuff", "version": 1610041762219}
{
    "type": "STATE",
    "value": {
        "bookmarks": {
            "junk-stuff": {
                "last_replication_method": "FULL_TABLE",
                "version": 1610041762219,
                "initial_full_table_complete": true,
            }
        },
        "currently_syncing": null,
    },
}
When I run the full EL
meltano elt tap-mongodb target-postgres
I get the following error.
meltano         | Running extract & load...
meltano         | No state was found, complete import.
tap-mongodb     | INFO Connected to MongoDB host: localhost, version: 4.2.11
tap-mongodb     | INFO Starting full table sync for junk-stuff
tap-mongodb     | INFO Querying junk-stuff with:
tap-mongodb     | 	Find Parameters: {'$lte': ObjectId('5ff64e7891e2891d8b6c5551')}
target-postgres | Traceback (most recent call last):
target-postgres |   File "/home/nate/projects/wemlo-data-engineering/experiment/ELT-meltano/demo-project/.meltano/loaders/target-postgres/venv/bin/target-postgres", line 10, in <module>
target-postgres |     sys.exit(main())
target-postgres |   File "/home/nate/projects/wemlo-data-engineering/experiment/ELT-meltano/demo-project/.meltano/loaders/target-postgres/venv/lib/python3.7/site-packages/target_postgres/__init__.py", line 188, in main
target-postgres |     state = persist_lines(config, input)
target-postgres |   File "/home/nate/projects/wemlo-data-engineering/experiment/ELT-meltano/demo-project/.meltano/loaders/target-postgres/venv/lib/python3.7/site-packages/target_postgres/__init__.py", line 150, in persist_lines
target-postgres |     stream_to_sync[stream] = DbSync(config, o)
target-postgres |   File "/home/nate/projects/wemlo-data-engineering/experiment/ELT-meltano/demo-project/.meltano/loaders/target-postgres/venv/lib/python3.7/site-packages/target_postgres/db_sync.py", line 121, in __init__
target-postgres |     self.flatten_schema = flatten_schema(stream_schema_message['schema'])
target-postgres |   File "/home/nate/projects/wemlo-data-engineering/experiment/ELT-meltano/demo-project/.meltano/loaders/target-postgres/venv/lib/python3.7/site-packages/target_postgres/db_sync.py", line 66, in flatten_schema
target-postgres |     for k, v in d['properties'].items():
target-postgres | KeyError: 'properties'
tap-mongodb     | INFO METRIC: {"type": "timer", "metric": "job_duration", "value": 0.10386991500854492, "tags": {"job_type": "sync_table", "database": "junk", "table": "stuff", "status": "failed"}}
tap-mongodb     | CRITICAL [Errno 32] Broken pipe
tap-mongodb     | Traceback (most recent call last):
tap-mongodb     |   File "/home/nate/projects/wemlo-data-engineering/experiment/ELT-meltano/demo-project/.meltano/extractors/tap-mongodb/venv/bin/tap-mongodb", line 10, in <module>
tap-mongodb     |     sys.exit(main())
tap-mongodb     |   File "/home/nate/projects/wemlo-data-engineering/experiment/ELT-meltano/demo-project/.meltano/extractors/tap-mongodb/venv/lib/python3.7/site-packages/tap_mongodb/__init__.py", line 393, in main
tap-mongodb     |     raise exc
tap-mongodb     |   File "/home/nate/projects/wemlo-data-engineering/experiment/ELT-meltano/demo-project/.meltano/extractors/tap-mongodb/venv/lib/python3.7/site-packages/tap_mongodb/__init__.py", line 390, in main
tap-mongodb     |     main_impl()
tap-mongodb     |   File "/home/nate/projects/wemlo-data-engineering/experiment/ELT-meltano/demo-project/.meltano/extractors/tap-mongodb/venv/lib/python3.7/site-packages/tap_mongodb/__init__.py", line 386, in main_impl
tap-mongodb     |     do_sync(client, args.catalog.to_dict(), state)
tap-mongodb     |   File "/home/nate/projects/wemlo-data-engineering/experiment/ELT-meltano/demo-project/.meltano/extractors/tap-mongodb/venv/lib/python3.7/site-packages/tap_mongodb/__init__.py", line 347, in do_sync
tap-mongodb     |     sync_stream(client, stream, state)
tap-mongodb     |   File "/home/nate/projects/wemlo-data-engineering/experiment/ELT-meltano/demo-project/.meltano/extractors/tap-mongodb/venv/lib/python3.7/site-packages/tap_mongodb/__init__.py", line 328, in sync_stream
tap-mongodb     |     full_table.sync_collection(client, stream, state, stream_projection)
tap-mongodb     |   File "/home/nate/projects/wemlo-data-engineering/experiment/ELT-meltano/demo-project/.meltano/extractors/tap-mongodb/venv/lib/python3.7/site-packages/tap_mongodb/sync_strategies/full_table.py", line 126, in sync_collection
tap-mongodb     |     singer.write_message(record_message)
tap-mongodb     |   File "/home/nate/projects/wemlo-data-engineering/experiment/ELT-meltano/demo-project/.meltano/extractors/tap-mongodb/venv/lib/python3.7/site-packages/singer/messages.py", line 227, in write_message
tap-mongodb     |     sys.stdout.flush()
tap-mongodb     | BrokenPipeError: [Errno 32] Broken pipe
tap-mongodb     | Exception ignored in: <_io.TextIOWrapper name='<stdout>' mode='w' encoding='UTF-8'>
tap-mongodb     | BrokenPipeError: [Errno 32] Broken pipe
meltano         | Extraction failed (120): BrokenPipeError: [Errno 32] Broken pipe
meltano         | Loading failed (1): KeyError: 'properties'
meltano         | ELT could not be completed: Tap and target failed
meltano.yml
version: 1
send_anonymous_usage_stats: true
project_id: 02ab68b4-8b76-48e2-9c30-234e8aec9d30
plugins:
  extractors:
  - name: tap-mongodb
    variant: singer-io
    pip_url: tap-mongodb
    metadata:
      '*':
        replication-method: FULL_TABLE
  loaders:
  - name: target-jsonl
    variant: andyh1203
    pip_url: target-jsonl
  - name: target-postgres
    variant: meltano
    pip_url: git+https://github.com/meltano/target-postgres.git

ripe-musician-59933

01/07/2021, 6:08 PM
@astonishing-alarm-71586 The problem appears to be that target-postgres requires the SCHEMA message to define all of the record's properties so that it can create the appropriate tables, which tap-mongodb isn't doing (for understandable reasons). To work around this, you can explicitly specify the schema for each stream in meltano.yml using https://meltano.com/docs/plugins.html#schema-extra
In your case that would look something like:
schema:
  stuff:
    _id:
      type: string
    id:
      type: integer
    tasks:
      type: array
      items:
        type: object
        properties:
          created_on:
            type: string
            format: date-time
          value:
            type: integer
    updated_on:
      type: string
      format: date-time
Then if you run meltano invoke tap-mongodb, you should see a more complete SCHEMA message.
You'll want to do this for every stream, of course.
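To illustrate why the bare schema breaks the loader, here is a minimal sketch. The `flatten_schema` below is a simplified stand-in written for this example, not the real target-postgres function; it only mirrors the one line visible in the traceback (`d['properties'].items()`). The trimmed `full_schema` is likewise an assumption based on the record shown earlier in the thread.

```python
def flatten_schema(schema):
    """Simplified stand-in: iterate the top-level properties, as the traceback line does."""
    flat = {}
    for name, prop in schema["properties"].items():
        flat[name] = prop
    return flat


# What tap-mongodb emits by default: an object with no declared properties.
bare_schema = {"type": "object"}

# What the loader needs: every column spelled out under "properties".
full_schema = {
    "type": "object",
    "properties": {
        "_id": {"type": "string"},
        "id": {"type": "integer"},
        "updated_on": {"type": "string", "format": "date-time"},
    },
}

try:
    flatten_schema(bare_schema)
    outcome = "ok"
except KeyError as exc:
    outcome = f"KeyError: {exc}"

columns = sorted(flatten_schema(full_schema))

print(outcome)   # the same error message the loader reported
print(columns)   # column names the loader could derive from the fuller schema
```

With the bare schema there is nothing to iterate, so the lookup fails before any table can be created; once `properties` is present, the column names fall out directly.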

astonishing-alarm-71586

01/07/2021, 6:12 PM
Thanks, I'll give it a try.
No luck. Did I get it in the right place? meltano.yml
version: 1
send_anonymous_usage_stats: true
project_id: 02ab68b4-8b76-48e2-9c30-234e8aec9d30
plugins:
  extractors:
  - name: tap-mongodb
    variant: singer-io
    pip_url: tap-mongodb
    metadata:
      '*':
        replication-method: FULL_TABLE
    schema:
      stuff:
        _id:
          type: string
        id:
          type: integer
        tasks:
          type: array
          items:
            type: object
            properties:
              created_on:
                type: string
                format: date-time
              value:
                type: integer
        updated_on:
          type: string
          format: date-time
  loaders:
  - name: target-jsonl
    variant: andyh1203
    pip_url: target-jsonl
  - name: target-postgres
    variant: meltano
    pip_url: git+https://github.com/meltano/target-postgres.git
tap.properties.json
{
  "streams": [
    {
      "table_name": "stuff",
      "stream": "stuff",
      "metadata": [
        {
          "breadcrumb": [],
          "metadata": {
            "table-key-properties": [
              "_id"
            ],
            "database-name": "junk",
            "row-count": 4995,
            "is-view": false,
            "valid-replication-keys": [
              "_id"
            ],
            "selected": true,
            "replication-method": "FULL_TABLE"
          }
        }
      ],
      "tap_stream_id": "junk-stuff",
      "schema": {
        "type": "object"
      },
      "selected": true,
      "replication_method": "FULL_TABLE"
    }
  ]
}
tap output
{
  "type": "SCHEMA",
  "stream": "stuff",
  "schema": {
    "type": "object"
  },
  "key_properties": [
    "_id"
  ]
}

ripe-musician-59933

01/07/2021, 6:38 PM
@astonishing-alarm-71586 Ah, try using junk-stuff in meltano.yml instead of stuff: the identifier needs to match the stream's tap_stream_id, not its stream property!
(There's really no great reason why those are different in the first place, but so be it!)
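One way to avoid guessing the identifier is to read it straight out of the catalog. The sketch below is a rough helper, not part of Meltano: `schema_keys` is a hypothetical name, and the inline JSON is a trimmed copy of the tap.properties.json shown earlier in the thread. In a real project you would load the file from wherever your project keeps it.

```python
import json

# Trimmed copy of the catalog from the thread; only the fields used here are kept.
CATALOG_JSON = """
{
  "streams": [
    {
      "table_name": "stuff",
      "stream": "stuff",
      "tap_stream_id": "junk-stuff",
      "schema": {"type": "object"}
    }
  ]
}
"""


def schema_keys(catalog):
    """Return (tap_stream_id, stream) pairs for every stream in the catalog."""
    return [(s["tap_stream_id"], s["stream"]) for s in catalog["streams"]]


pairs = schema_keys(json.loads(CATALOG_JSON))
for tap_stream_id, stream in pairs:
    # The first value is the key to use under `schema:` in meltano.yml.
    print(f"use {tap_stream_id!r} in meltano.yml (stream name is {stream!r})")
```

Copying the printed identifier verbatim also sidesteps hyphen-versus-underscore slips.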

astonishing-alarm-71586

01/07/2021, 6:39 PM
I tried that before and just now. Still no luck.
doooo! got it. junk-stuff not junk_stuff

ripe-musician-59933

01/07/2021, 6:40 PM
😄 Does it work now?

astonishing-alarm-71586

01/07/2021, 6:42 PM
need to sort out my docker-compose file to get the right postgres database running.
👍 thanks a bunch. I knew it had something to do with that, I just couldn't see where to make the change.
Now to work out a schema that works across 100+ collections. I already have all the dbt to unpack the nested mess. I might do some refactoring to handle some of it here and some there.
Cool, I can see where that puts it into the tap.properties.json file. One thing I've noticed there is that it actually creates what appears to be duplicate entries for each stream. When the EL runs it generates two rows of data as a result. I can go in and delete the row out of the catalog, but something seems fishy there.
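For 100+ collections, writing each schema by hand gets tedious, so a first draft could be inferred from a sample document per collection. This is only a sketch under stated assumptions: `infer_schema` is a hypothetical helper, it assumes arrays are homogeneous (the first element speaks for all), it treats any ISO-8601 string containing a `T` as a date-time, and one sample may not cover optional fields. The sample record is the one from the tap output earlier in the thread, with `tasks` cut to a single entry.

```python
from datetime import datetime


def infer_schema(value):
    """Guess a JSON Schema fragment from one sample value."""
    if isinstance(value, bool):  # check bool before int: bool is a subclass of int
        return {"type": "boolean"}
    if isinstance(value, int):
        return {"type": "integer"}
    if isinstance(value, float):
        return {"type": "number"}
    if isinstance(value, str):
        if "T" in value:
            try:
                datetime.fromisoformat(value)
                return {"type": "string", "format": "date-time"}
            except ValueError:
                pass
        return {"type": "string"}
    if isinstance(value, list):
        # Assumption: every element looks like the first one.
        return {"type": "array", "items": infer_schema(value[0]) if value else {}}
    if isinstance(value, dict):
        return {
            "type": "object",
            "properties": {k: infer_schema(v) for k, v in value.items()},
        }
    return {}  # None or an unrecognised type: leave unconstrained


record = {
    "_id": "5ff64e1191e2891d8b6ba683",
    "id": 4711,
    "tasks": [{"created_on": "2021-01-06T16:56:01.058005", "value": 657}],
    "updated_on": "2021-01-06T16:57:39.025265",
}

# Keyed by tap_stream_id, with one entry per top-level property.
schema_extra = {"junk-stuff": infer_schema(record)["properties"]}
print(schema_extra)
```

The result has the same shape as the hand-written schema earlier in the thread and would still need review before being copied into meltano.yml.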

ripe-musician-59933

01/07/2021, 7:01 PM
One thing I've noticed there is that it actually creates what appears to be duplicate entries for each stream.
Hmm, that sounds like a bug! Can you please file an issue with some more details/examples?
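For the bug report, a quick check of the catalog can confirm which streams are listed more than once. The sketch below is an illustration only: `duplicate_stream_ids` is a hypothetical helper, and the catalog dict is made-up sample data (the `junk-other` stream does not appear in the thread) standing in for a loaded tap.properties.json.

```python
from collections import Counter


def duplicate_stream_ids(catalog):
    """Return the tap_stream_id values that occur more than once, sorted."""
    counts = Counter(s["tap_stream_id"] for s in catalog["streams"])
    return sorted(stream_id for stream_id, n in counts.items() if n > 1)


# Made-up catalog with one duplicated entry, for illustration.
catalog = {
    "streams": [
        {"tap_stream_id": "junk-stuff", "stream": "stuff"},
        {"tap_stream_id": "junk-stuff", "stream": "stuff"},
        {"tap_stream_id": "junk-other", "stream": "other"},
    ]
}

dups = duplicate_stream_ids(catalog)
print(dups)
```

Running it against the catalog generated with and without the `schema` setting would help show which side introduces the duplicates.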

astonishing-alarm-71586

01/07/2021, 7:03 PM
I got it in the meltano repository where your other mongodb issues are.

ripe-musician-59933

01/07/2021, 8:51 PM
@astonishing-alarm-71586 Ah, does tap-mongodb always create two entries for each stream in the catalog and the actual sync output? Then it's an issue for https://github.com/singer-io/tap-mongodb. If it only does so when you set schema in meltano.yml, it's a Meltano issue.
@astonishing-alarm-71586 I've also created a new issue about tap-mongodb requiring the schema to be specified explicitly: https://gitlab.com/meltano/meltano/-/issues/2517