# getting-started
m
Hey - new here. I’m unable to connect to MongoDB with Meltano using the existing `tap-mongodb` tap. I’m able to connect via a db connection string using the Python Mongo client, but parsing out that same string doesn’t seem to work when setting the variables in `meltano.yml`. Is there a way to directly use the db connection string URI without having to pass all of those parameters? If not, here is my `meltano.yml` file:
```yaml
version: 1
default_environment: staging
project_id: 90ea9b0c-a807-4f93-9f54-0b8cec9c519f
environments:
- name: dev
- name: staging
- name: prod
plugins:
  extractors:
  - name: tap-mongodb
    variant: transferwise
    pip_url: pipelinewise-tap-mongodb
    config:
      user: staging
      host: <host>
      auth_database: Staging
      database: admin
      srv: mongodb+srv
      port: 27017
      replica_set: atlas-ehcfwa-shard-0
      ssl: 'true'
      select:
      - '*.*'
  - name: tap-csv
    variant: meltanolabs
    pip_url: git+https://github.com/MeltanoLabs/tap-csv.git
    config:
      files:
      - entity: tmp_csv_example
        path: csvs/tmp_csv_example.csv
        keys: [col1, col2]
  loaders:
  - name: target-snowflake
    variant: transferwise
    pip_url: pipelinewise-target-snowflake
    config:
      account: account
      user: MELTANO
      warehouse: MY_WAREHOUSE
      dbname: MELTANO
      file_format: CSV_FORMAT
      role: MELTANO
      default_target_schema: TMP_SCHEMA
  - name: target-jsonl
    variant: andyh1203
    pip_url: target-jsonl
```
When running `meltano config tap-mongodb test` I get a run timeout error:
```
Plugin configuration is invalid
Catalog discovery failed: command ['/Users/melgazar9/scripts/github/DG/data-science/projects/elt/meltano_projects/tap_dg_mongodb/.meltano/extractors/tap-mongodb/venv/bin/tap-mongodb', '--config', '/Users/melgazar9/scripts/github/DG/data-science/projects/elt/meltano_projects/tap_dg_mongodb/.meltano/run/tap-mongodb/tap.fc8b4358-69cb-4558-b8c3-3f6986fbedea.config.json', '--discover'] returned 1 with stderr:
time=2023-04-25 233205 name=tap_mongodb level=ERROR message=No replica set members available for replica set name "atlas-ehcfwa-shard-0", Timeout: 30s, Topology Description: <TopologyDescription id: 6448a927e78fa2444067000f, topology_type: ReplicaSetNoPrimary, servers: []>
Traceback (most recent call last):
  File "/Users/melgazar9/scripts/github/DG/data-science/projects/elt/meltano_projects/tap_dg_mongodb/.meltano/extractors/tap-mongodb/venv/lib/python3.9/site-packages/tap_mongodb/__init__.py", line 322, in main
    main_impl()
  File "/Users/melgazar9/scripts/github/DG/data-science/projects/elt/meltano_projects/tap_dg_mongodb/.meltano/extractors/tap-mongodb/venv/lib/python3.9/site-packages/tap_mongodb/__init__.py", line 305, in main_impl
    client.server_info().get('version', 'unknown'))
  File "/Users/melgazar9/scripts/github/DG/data-science/projects/elt/meltano_projects/tap_dg_mongodb/.meltano/extractors/tap-mongodb/venv/lib/python3.9/site-packages/pymongo/mongo_client.py", line 1994, in server_info
    return self.admin.command("buildinfo",
  File "/Users/melgazar9/scripts/github/DG/data-science/projects/elt/meltano_projects/tap_dg_mongodb/.meltano/extractors/tap-mongodb/venv/lib/python3.9/site-packages/pymongo/database.py", line 757, in command
    with self.__client._socket_for_reads(
  File "/opt/homebrew/Cellar/python@3.9/3.9.16/Frameworks/Python.framework/Versions/3.9/lib/python3.9/contextlib.py", line 119, in enter
    return next(self.gen)
  File "/Users/melgazar9/scripts/github/DG/data-science/projects/elt/meltano_projects/tap_dg_mongodb/.meltano/extractors/tap-mongodb/venv/lib/python3.9/site-packages/pymongo/mongo_client.py", line 1387, in _socket_for_reads
    server = self._select_server(read_preference, session)
  File "/Users/melgazar9/scripts/github/DG/data-science/projects/elt/meltano_projects/tap_dg_mongodb/.meltano/extractors/tap-mongodb/venv/lib/python3.9/site-packages/pymongo/mongo_cl…
```
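Side note for anyone comparing the two approaches: the discrete transferwise-variant settings are essentially the pieces of a standard connection string. A quick stdlib sketch (a hypothetical helper, with a made-up URI - not tap code) of how the two map onto each other, which can help spot a mismatched field:

```python
from urllib.parse import urlsplit, parse_qs

def split_mongo_uri(uri: str) -> dict:
    """Break a MongoDB connection string into the discrete settings
    a tap config typically wants (host, user, port, replica set, ...)."""
    parts = urlsplit(uri)
    query = {k: v[0] for k, v in parse_qs(parts.query).items()}
    return {
        "srv": parts.scheme == "mongodb+srv",
        "user": parts.username,
        "host": parts.hostname,
        "port": parts.port or 27017,          # srv URIs usually omit the port
        "auth_database": parts.path.lstrip("/") or None,
        "replica_set": query.get("replicaSet"),
        "ssl": query.get("ssl"),
    }

# fake credentials/host, shaped like an Atlas srv URI:
cfg = split_mongo_uri(
    "mongodb+srv://staging:secret@cluster0.example.mongodb.net/admin"
    "?replicaSet=atlas-ehcfwa-shard-0&ssl=true"
)
```

If the fields this produces don’t match what’s in `meltano.yml`, that’s usually where the discovery timeout comes from.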
m
I don’t know if this is the issue, but it looks like the `select` key is indented within the `config` options - it should be de-dented one unit so that it’s at the same indentation as the `config` key.
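i.e. (a sketch of the two shapes, using the keys from the config above):

```yaml
# not this:
- name: tap-mongodb
  config:
    host: <host>
    select:
    - '*.*'

# but this:
- name: tap-mongodb
  config:
    host: <host>
  select:
  - '*.*'
```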
p
I'm not familiar with connecting to MongoDB specifically, but one thing that might be relevant is https://meltano.slack.com/archives/C01UGBSJNG5/p1681482241550669. We were discussing making the SDK-based z3z1ma variant the default tap-mongodb tap. I know Matt also has a variant that might be worth testing out
m
Hey @Matt Menzenski I tried moving the select key around and taking it out, but same error. I’m only running `meltano config tap-mongodb test`, which should just test the connection to MongoDB. Not sure what else I can do here. The db connection string works as expected when running in Python natively, but not with Meltano. I tested my connection to `target-snowflake` and those credentials work as expected, so it’s something going on with tap-mongodb
m
FWIW, my version of tap-mongodb does support a regular connection string as a config setting: https://github.com/menzenski/tap-mongodb - maybe that will work for you?
m
Wow nice! I’ll check it out. My only concern with using it is how well maintained it will be
Not sure why the core branch doesn’t support db connection strings
m
yeah, that’s fair - it is very new
but we’re running it in production - our main database is MongoDB/DocumentDB
m
amazing - I’ll check it out for sure. thanks!
but if you do see any obvious things I’m doing wrong in my `meltano.yml` file that would be great as well!
One more question. I’m trying to add an extractor linked to your git repo. Should I simply run `meltano add extractor tap-mongodb` but change the `meltano.yml` to look like this?
```yaml
extractors:
- name: tap-mongodb
  variant: transferwise
  pip_url: git+https://github.com/menzenski/tap-mongodb.git@f51ecab
  config:
    mongodb_connection_string: <long-string>
    database_includes:
      - database: test-db
```
Would this work, or do I need to add an extractor that points at your git repo via the CLI?
m
```yaml
plugins:
  extractors:
    - name: tap-mongodb
      namespace: tap_mongodb
      # variant: menzenski
      pip_url: git+https://github.com/menzenski/tap-mongodb.git@f51ecab72a5d7b2d4dae108356ec395c405dafc6
      executable: tap-mongodb
      capabilities:
        - state
        - catalog
        - discover
        - about
        - stream-maps
      config:
        add_record_metadata: true
        allow_modify_change_streams: true
      settings:
        - name: mongodb_connection_string
          kind: password
        - name: documentdb_credential_json_string
          kind: password
        - name: documentdb_credential_json_extra_options
          kind: string
        - name: prefix
          kind: string
        - name: start_date
          kind: date_iso8601
        - name: database_includes
          kind: array
        - name: add_record_metadata
          kind: boolean
        - name: allow_modify_change_streams
          kind: boolean
        - name: operation_types
          kind: array
```
in my experience, for a tap like this (that’s not on the Hub, but has the same name as a tap on the Hub), it’s necessary to leave out the `variant` and include a `namespace` (which I believe can be anything)
but this works for me (and then I have other taps that inherit from this for individual mongodb databases)
m
FYI I was able to get it working with this config!
```yaml
- name: tap-mongodb
  variant: z3z1ma
  pip_url: git+https://github.com/z3z1ma/tap-mongodb.git
  config:
    mongo:
      host: '<DB Connection String>'
```
Do you know if there is a way to only select a few collections and filter those collections to some fields (and load them incrementally)? I went through the github code and I see a `database_includes` filter but not a `collection_includes` filter. I’m super new to meltano so not sure if this is a setting somewhere? Looking for something like this:
```yaml
config:
  mongo:
    host: <DB connection string>
  strategy: raw
  collections: ['collection1', 'collection2', 'collection3']
  query:
    collection1:
      load_type: full-refresh
      filter: {}
    collection2:
      load_type: incremental
      filter: {}
    collection3:
      load_type: incremental
      filter: {'_id': 1, 'winnerSeatNumber': 1, 'playerActions': 1, 'createdAt': 1, 'updatedAt': 1}
```
m
that’s one of the reasons I made my own tap 🙂 with mine you can specify
```yaml
config:
  mongodb_connection_string: mongodb://admin:password@localhost:27017/
  database_includes:
    - database: test-database
      collection: TestDocument
```
and it will load incrementally by default
m
nice! Can you also filter the fields?
m
I haven’t tested that, but in theory, yes, the `select` syntax should work
m
How is it handled when Mongo documents contain multiple nested JSON objects but the target is a structured DB like Snowflake? Does it flatten every nested value?
m
typically that sort of flattening will happen in the target, not the tap. My tap outputs the entire mongodb document as one JSON object
I don’t have any experience with snowflake so can’t speak to that specifically
p
m
@pat_nadolny Nice! @Matt Menzenski I see. I’m still trying to get your repo working but I am unable to connect. Here’s my YAML
```yaml
- name: tap-mongodb
  namespace: tap_mongodb
  # variant: menzenski
  pip_url: git+https://github.com/menzenski/tap-mongodb.git@f51ecab72a5d7b2d4dae108356ec395c405dafc6
  executable: tap-mongodb
  capabilities:
    - state
    - catalog
    - discover
    - about
    - stream-maps
  config:
    add_record_metadata: true
    allow_modify_change_streams: true
    mongodb_connection_string: <db string>
    database_includes:
      - database: Staging
        collection: collection1, collection2
  settings:
    - name: mongodb_connection_string
      kind: password
    - name: documentdb_credential_json_string
      kind: password
    - name: documentdb_credential_json_extra_options
      kind: string
    - name: prefix
      kind: string
    - name: start_date
      kind: date_iso8601
    - name: database_includes
      kind: array
    - name: add_record_metadata
      kind: boolean
    - name: allow_modify_change_streams
      kind: boolean
    - name: operation_types
      kind: array
```
```
meltano config tap-mongodb test
2023-04-26T18:31:34.852644Z [info     ] The default environment 'staging' will be ignored for `meltano config`. To configure a specific environment, please use the option `--environment=<environment name>`.
Need help fixing this problem? Visit http://melta.no/ for troubleshooting steps, or to
join our friendly Slack community.

Plugin configuration is invalid
ValueError: Unrecognized replication method FULL_TABLE. Only INCREMENTAL and LOG_BASED replication methods are supported.
```
m
add this to your yaml? `metadata` here should be at the same indentation level as `config` and `settings`
```yaml
metadata:
  '*':
    replication-key: _id
    replication-method: INCREMENTAL
```
I guess it must default to FULL_TABLE
u
Question to both of you 😅 - shouldn't you be able to use meltano's select criteria to filter for collections? I haven't used the tap, but I would assume that if you configure a database and then run `meltano select tap-mongodb --list` you'd see all available collections as individual streams, and then you could choose the ones you want?
m
@Matt Menzenski Amazing!
m
I’ve thought a little about this. I think it should be possible to do that, yes. I had wanted a single tap to pull from all databases in our mongodb cluster but after some use of the tap I’m not sure it’s the best approach, partly for that reason.
It’s something I’d like to experiment with
m
Works! However, when I delete what is in `outputs` and run `meltano run tap-mongodb target-jsonl --full-refresh` it says it’s ignoring the state, but no data gets populated in the `outputs` directory. Is this expected?
m
no, that’s not expected
to confirm, `output` or `outputs`? singular or plural? I would expect `output` (singular) to receive that output
m
typo, `output`
maybe I’ll have to manually delete the state if I want to do a full-refresh?
I’m looking through tap.py on your repo. Can I specify multiple collections? Something like `collections_include: [collection1, collection2], select: -collection1: col1, col2`
m
yes:
```yaml
database_includes:
  - database: test-database
    collection: TestDocument
  - database: test-database
    collection: OtherCollection
  - database: other-database
    collection: ThirdCollection
```
m
awesome!
Thanks very much for your help. I think I’m all set!
Ah sorry one more question 😂 How do you configure the different environments: dev / staging / prod?
```yaml
version: 1
default_environment: staging
project_id: 90ea9b0c-a807-4f93-9f54-0b8cec9c519f
environments:
- name: dev
- name: staging
  config:
    plugins:
      extractors:
      - name: tap-csv
        variant: meltanolabs
        pip_url: git+https://github.com/MeltanoLabs/tap-csv.git
        config:
          files:
          - entity: ban_exemptions
            path: csvs/ice_ban_exemptions.csv
            keys: [date_exempted, wallet]
      - name: tap-mongodb
        namespace: tap_mongodb
        # variant: menzenski
        pip_url: git+https://github.com/menzenski/tap-mongodb.git@32f75c0
        executable: tap-mongodb
        capabilities:
        - state
        - catalog
        - discover
        - about
        - stream-maps
        config:
          add_record_metadata: true
          allow_modify_change_streams: true
          mongodb_connection_string: <staging db connection string>
          database_includes:
          - database: Staging
            collection: collection1
          select:
          - _id
          - address
          - type
          metadata:
            '*':
              replication-key: _id
              replication-method: INCREMENTAL
        settings:
        - name: mongodb_connection_string
          kind: password
        - name: documentdb_credential_json_string
          kind: password
        - name: documentdb_credential_json_extra_options
          kind: string
        - name: prefix
          kind: string
        - name: start_date
          kind: date_iso8601
        - name: database_includes
          kind: array
        - name: add_record_metadata
          kind: boolean
        - name: allow_modify_change_streams
          kind: boolean
        - name: operation_types
          kind: array
      loaders:
      - name: target-snowflake
        variant: transferwise
        pip_url: pipelinewise-target-snowflake
        config:
          account: account
          user: MELTANO
          warehouse: SEGMENT_WAREHOUSE
          dbname: MELTANO
          file_format: CSV_FORMAT
          role: MELTANO
          default_target_schema: MONGODB_PROD
      - name: target-jsonl
        variant: andyh1203
        pip_url: target-jsonl
- name: prod
  config:
    plugins:
      extractors:
      - name: tap-mongodb
        namespace: tap_mongodb
        # variant: menzenski
        pip_url: git+https://github.com/menzenski/tap-mongodb.git@32f75c0
        executable: tap-mongodb
        capabilities:
        - state
        - catalog
        - discover
        - about
        - stream-maps
        config:
          add_record_metadata: true
          allow_modify_change_streams: true
          mongodb_connection_string: <prod db connection string>
          database_includes:
          - database: Production
          metadata:
            '*':
              replication-key: _id
              replication-method: INCREMENTAL
        settings:
        - name: mongodb_connection_string
          kind: password
        - name: documentdb_credential_json_string
          kind: password
        - name: documentdb_credential_json_extra_options
          kind: string
        - name: prefix
          kind: string
        - name: start_date
          kind: date_iso8601
        - name: database_includes
          kind: array
        - name: add_record_metadata
          kind: boolean
        - name: allow_modify_change_streams
          kind: boolean
        - name: operation_types
          kind: array
      loader…
```
m
personally, I lean on environment variables for this. You can use the `env` setting in an environment to set a `TAP_MONGODB_MONGODB_CONNECTION_STRING` environment variable per meltano environment, for example: https://docs.meltano.com/guide/configuration#environment-variables
m
I have different db connection strings for dev / staging / prod, and I want to tap MongoDB into the Snowflake schemas `MONGODB_DEV`, `MONGODB_STAGING`, `MONGODB_PROD`. Can I set those extractors and loaders in the yml file within your fork?
m
in the scenario you describe, I would define a `tap-mongodb` and a `target-snowflake` in the top level of your meltano.yml file (not within the `environments`). Then I would add overrides of those values per environment, like in https://docs.meltano.com/guide/configuration#environment-variables:
```yaml
plugins:
  extractors:
    - name: tap-mongodb
      # config here
  loaders:
    - name: target-snowflake
      # config here
environments:
  - name: dev
    config:
      plugins:
        extractors:
          - name: tap-mongodb
            env:
              TAP_MONGODB_MONGODB_CONNECTION_STRING: "dev-connection-string"
        loaders:
          - name: target-snowflake
            env:
              TARGET_SNOWFLAKE_DEFAULT_TARGET_SCHEMA: "dev-schema"
```
and repeat for staging, prod environments etc
m
oh boy this is a big config 😅 I’m almost there! Can you take a look at this? I don’t think I’m doing this right
m
I believe you will need the
```yaml
metadata:
  '*':
    replication-key: _id
    replication-method: INCREMENTAL
```
on your tap-mongodb under the top `plugins:` block
and for the `select` part:
```yaml
select:
  - _id
  - address
  - type
```
again, I haven’t tested `select` behavior myself, but I would expect these to look more like `- collection1._id` or possibly even `- Staging_collection1._id`
m
Wow this is really amazing. Saves a ton of time once you get through the yml file. It works! Huge config LOL
do you put the db connection strings in your `.env`? I don’t see where it’s referenced in the yml file
m
I have it in the `.env` file for local testing, yes - when running meltano non-locally we set them in the runtime environment as environment variables
m
How do you set it in the `.env` so it knows when you run `meltano --environment=dev run tap-csv target-jsonl` it reads the dev db connection string? or do you leave it in the yml file?
m
it’ll read whatever is in the .env file. I typically only have the “dev” environment (our lowest environment) variables in the .env file for testing, but sometimes if needed I’ll comment a value out and add the staging one in the .env file if I need to run in the `staging` environment, etc
m
ah i see, i was thinking i could automatically run all dev, staging, testing, and prod once every 12 hours using one `.env` and one `meltano.yml` file. Is this reasonable?
u
If it's helpful, you can use templating to build your connection string - here's an example of how I access a shared env var as a config value: https://github.com/meltano/squared/blob/37f33c44ba1bc6613108efd575324f09c1c3e9e3/data/extract/extractors.meltano.yml#L7
m
it’s possible that you could use https://docs.meltano.com/guide/plugin-management#plugin-inheritance to define tap-mongodb-dev, tap-mongodb-staging, etc, and then set `TAP_MONGODB_DEV_MONGODB_CONNECTION_STRING` (for dev) and `TAP_MONGODB_STAGING_MONGODB_CONNECTION_STRING` (for staging) environment variables in the .env file. I think that would work, based on my experience. But personally I’d recommend keeping your environments well separated.
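A rough sketch of that inheritance layout (the derived plugin names are illustrative, following the plugin-inheritance docs):

```yaml
plugins:
  extractors:
    - name: tap-mongodb
      # base definition: pip_url, capabilities, settings, shared config
    - name: tap-mongodb-dev
      inherit_from: tap-mongodb
    - name: tap-mongodb-staging
      inherit_from: tap-mongodb
```

with `TAP_MONGODB_DEV_MONGODB_CONNECTION_STRING` and `TAP_MONGODB_STAGING_MONGODB_CONNECTION_STRING` set in `.env` or the runtime environment, since each inheriting plugin gets its own env-var prefix.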
u
or this where I add my username as a prefix to the snowflake target schema when I'm in dev https://github.com/meltano/squared/blob/37f33c44ba1bc6613108efd575324f09c1c3e9e3/data/environments/userdev.meltano.yml#L50
m
Interesting, ok this is very helpful. Thanks again!
Hey @Matt Menzenski if I want to run the ELT on all collections, how would you do that in meltano.yml?
```yaml
config:
  mongodb_connection_string: ${TESTING_MONGODB_CONNECTION_STRING}
  database_includes:
    - database: Testing
      collection: *
```
^ something like that
m
the tap doesn’t support that behavior currently
happy to accept pull requests though 🙂
m
cool! Let me take a look
Any suggestions on how to learn how to develop a tap and contribute to your repo? I was going through the SDK documentation but I can’t figure out how to run the tap in debug mode with breakpoints.
@Matt Menzenski I want to add something like this but I can’t figure out how to test / debug it. I know this below won’t work because it needs to loop through each database name and collection, but you get the idea.
```python
if self.config.get("database_includes", []) != '*.*':
    collections_to_tap = self.config.get("database_includes", [])
else:
    collections_to_tap = client.get_collections()
for included in collections_to_tap:
    db_name = included["database"]
    collection = included["collection"]
```
Location in tap.py: https://github.com/menzenski/tap-mongodb/blob/main/tap_mongodb/tap.py#L227
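For what it’s worth, the rule being sketched above can be made self-contained and testable by stubbing the client - this is an illustration of the wildcard expansion only, not the tap’s actual code (`FakeClient` is a stand-in for pymongo’s `MongoClient`):

```python
def expand_database_includes(config: dict, client) -> list:
    """If database_includes is the wildcard '*.*', discover every
    database/collection pair from the client; otherwise pass the
    configured list through unchanged."""
    includes = config.get("database_includes", [])
    if includes != "*.*":
        return includes
    return [
        {"database": db, "collection": coll}
        for db in client.list_database_names()
        for coll in client[db].list_collection_names()
    ]

# stubs standing in for pymongo objects, so the logic runs without a live db
class FakeDB:
    def __init__(self, colls):
        self._colls = colls
    def list_collection_names(self):
        return self._colls

class FakeClient:
    def __init__(self, dbs):
        self._dbs = dbs
    def list_database_names(self):
        return list(self._dbs)
    def __getitem__(self, name):
        return FakeDB(self._dbs[name])

client = FakeClient({"Testing": ["A", "B"], "Prod": ["C"]})
pairs = expand_database_includes({"database_includes": "*.*"}, client)
```

`list_database_names()` and `list_collection_names()` are the real pymongo method names, so swapping the stub for a `MongoClient` should be mechanical.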
u
m
Yea I was reading that, but when I run `tap.py` in debug mode it breaks at this point
```python
if raise_errors:
    raise ConfigValidationError(summary)
```
`'Config validation failed: \'database_includes\' is a required property'` …etc
u
ah yeah, you need to point it to a real config.json file. Usually in tap repos there's a `.secrets/` directory that's gitignored where people store their config.json containing real credentials
u
so you'd update the debug launch config to look something like:
```json
"args": ["--config", ".secrets/config.json", "--discover"],
```
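In context, the full VS Code `launch.json` looks roughly like this - a sketch only: the `"module"` entry assumes the package is runnable with `python -m tap_mongodb`, which varies by repo, so adjust `module`/`program` to your checkout:

```json
{
  "version": "0.2.0",
  "configurations": [
    {
      "name": "Debug tap-mongodb discovery",
      "type": "python",
      "request": "launch",
      "module": "tap_mongodb",
      "args": ["--config", ".secrets/config.json", "--discover"],
      "justMyCode": false
    }
  ]
}
```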
m
Here’s what I’m doing, I’m pretty sure I misunderstood the docs.
• git pull the repo
• add `{'mongodb_connection_string': <string>}` to `.secrets/config.json`
• go into my IDE and debug `tap.py`
u
That sounds correct, but the tap looks like it requires `database_includes`, so you need that in your config
m
I have that in `meltano.yml` - are you saying that should be in config.json?
u
Yeah, you're calling the tap directly when debugging vs calling it via meltano. You can run `meltano config tap-x` to print your config contents. Also check out https://hub.meltano.com/singer/spec#taps to see more about Singer
m
yeah, you should be able to do `meltano config tap-mongodb > config.json` to create that JSON file IIRC
fwiw I’m working on a refactor of this tap using the “select” logic @pat_nadolny brought up earlier in this thread: https://meltano.slack.com/archives/CMN8HELB0/p1682534466101709?thread_ts=1682484019.956399&cid=CMN8HELB0 I think I’m nearly done with it. I’ll open a PR and share the link here, possibly as early as today, more likely early next week.
m
Each time I open PyCharm and try to debug `tap.py` I get stopped at the block `raise ConfigValidationError(summary)` because it’s missing config params. I think it’s pretty straightforward to add a rule like `if config value == '*.*' then grab all collections in database`, but I am having a hard time getting into debug mode 😂. Sorry, I’ve only been reading about meltano for about 3 days now. Here are my steps:
• `meltano config tap-mongodb > config.json` and added my credentials
• run `tap.py` in debug mode
• get stopped at `raise ConfigValidationError(summary)`
• same steps as above but copied `config.json` to `.secrets/`
p
That debug docs section is for VS Code specifically. I used to use PyCharm also and I sort of remember it having a different mechanism for passing in input parameters 🤔
m
Figured it out 🙂 @Matt Menzenski I submitted a PR. I tested my code changes locally and everything works as expected. However, since I’m still relatively new to Meltano (only been using it for about 3 days), please let me know if you spot any issues or have any feedback. I want to make sure I’m following best practices and not missing anything important.
Hey @Matt Menzenski have you had a chance to review my PR? I'm hoping to deploy a version like that this week, so I can tap all DBs in mongo
m
@matt_elgazar I’ve merged https://github.com/menzenski/tap-mongodb/pull/2 which brings this tap into more standard Meltano behavior around selecting. It does limit the tap to one database, but allows selecting all collections in that database with standard Meltano select syntax. Can you retest with that change?
m
Would this PR allow you to select all collections within a database in the meltano.yml file? For example: • In db Testing: select collections A, B, and C • in db Prod: select all (in my version I set it to '*.*')
m
yes
let me get an example together
if those are separate database names in the same meltano environment:
```yaml
plugins:
  extractors:
    - name: tap-mongodb-testing
      inherit_from: tap-mongodb
      config:
        database: Testing
      select:
        - A.*
        - B.*
        - C.*
    - name: tap-mongodb-prod
      inherit_from: tap-mongodb
      config:
        database: Prod
      select:
        - '*.*'  # this is the default behavior so shouldn't need to be specified explicitly like this
```
m
Killin! I’ll have to look at your code and see what you did. In my example I just wrote a for loop:
```python
if collection == '*.*':
    < tap all collections >
```
m
I copied a lot of the implementation logic from the meltanolabs tap-postgres plugin
m
and in database: Testing, I’m assuming you can also replace `A.*` with `A.field1, A.field2`?
m
as far as discovering streams (= collections, here) dynamically

> and in database: Testing, I’m assuming you can also replace `A.*` with `A.field1, A.field2`?

There’s a caveat for this - see the Settings header on https://hub.meltano.com/extractors/tap-mongodb--menzenski/ where I’ve added some notes on this:

> Individual database collections may be selected using standard Meltano catalog selection. Note, though, that the field values which may be selected are not the fields on the database document, but rather the fields on the schema used by this tap. That is, while it is possible for example to opt out of the `ns` field:
>
> ```
> select:
> - '!*.ns'
> ```
>
> the `document` field will always contain the entirety of the database document. This is true for log-based replication as well, as the change stream in that case is opened with the option `full_document="updateLookup"`. If you would prefer different behavior, please open an issue with the tap.

so you can use the field selection syntax, but the entire MongoDB document will always be present in the `document` field currently. The field selection syntax will control what other fields in the record schema for this tap (documented here) are present.
I’m not sure the best way to handle field selection on the document
m
Ok I see. For my use there’s no need to add a field filter per each document, so I’ll take a look at this and test it out later tonight
I’m confused how Meltano knows what `*.*` means 😅 I looked through your PR and the only place I see it referencing `*.*` is in the config. I know generally `*` means everything, so `*.*` means select everything with any file type. Does it reference this via `glob` or something similar in `tap.py`?
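On the `*.*` question - it isn’t a file glob against disk; it’s Singer stream selection, where patterns are matched against `stream.property` names. A toy illustration of the idea only (not Meltano’s actual implementation, which also handles `!` exclusion and catalog metadata):

```python
import fnmatch

def is_selected(stream: str, prop: str, patterns: list) -> bool:
    """Toy select-pattern matching: a stream property is selected if
    any 'stream.property' glob pattern matches its dotted name."""
    target = f"{stream}.{prop}"
    return any(fnmatch.fnmatch(target, pat) for pat in patterns)

# '*.*' matches every property of every stream
everything = is_selected("bannedusers", "_id", ["*.*"])
# a narrower pattern list drops properties it doesn't name
narrowed = is_selected("bannedusers", "ns", ["bannedusers._id", "bannedusers.document"])
```

So `*.*` is "every property of every stream," not a filesystem concept.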
@Matt Menzenski I was unable to run it because of line 43 in `tap_mongodb/connector.py` here: https://github.com/menzenski/tap-mongodb/blob/main/tap_mongodb/connector.py#L43 Error:
```
TypeError: unsupported operand type(s) for |: 'type' and 'NoneType'
```
I got it to run by changing `prefix: str | None = None` to `prefix=None`, but it does not only run one collection. Here’s my `meltano.yml` file:
```yaml
version: 1
send_anonymous_usage_stats: true
project_id: tap-mongodb

default_environment: dev

plugins:
  extractors:
  - name: tap-mongodb
    namespace: tap_mongodb
    pip_url: git+https://github.com/menzenski/tap-mongodb.git@74c80ab38db6a607b6d121ea9c720bbe1a93241c
    capabilities:
    - state
    - catalog
    - discover
    - about
    - stream-maps

    config:
      add_record_metadata: true
      allow_modify_change_streams: true
      metadata:
        '*':
          replication-key: _id
          replication-method: INCREMENTAL

  loaders:
    - name: target-jsonl
      variant: andyh1203
      pip_url: target-jsonl

    - name: target-snowflake
      variant: transferwise
      pip_url: pipelinewise-target-snowflake
      config:
        account: ${SNOWFLAKE_ACCOUNT}
        user: ${SNOWFLAKE_USER}
        warehouse: ${SNOWFLAKE_WAREHOUSE}
        dbname: ${SNOWFLAKE_DB}
        file_format: CSV_FORMAT
        role: ${SNOWFLAKE_ROLE}
        default_target_schema: MONGODB_DEV

environments:
  - name: dev
    config:
      plugins:
        extractors:
        - name: tap-csv
          files:
          - entity: ban_exemptions
            path: csvs/ice_ban_exemptions.csv
            keys: [date_exempted, wallet]
        - name: tap-mongodb
          config:
            mongodb_connection_string: ${DG_DEV_MONGODB_CONNECTION_STRING}
            database: DG_Dev
            select:
              - tournamentnftinfos.*
          loaders:
            - name: target-snowflake
              env:
                TARGET_SNOWFLAKE_DEFAULT_TARGET_SCHEMA: MONGODB_DEV

  - name: testing
    config:
      plugins:
        extractors:
          - name: tap-mongodb
            config:
              mongodb_connection_string: ${DG_TESTING_MONGODB_CONNECTION_STRING}
              database: DG_Testing
              select:
                - bannedusers.*
        loaders:
          - name: target-snowflake
            env:
              TARGET_SNOWFLAKE_DEFAULT_TARGET_SCHEMA: MONGODB_TESTING
```
m
What version of Python are you using? The `prefix: str | None = None` syntax I think might be Python 3.10+ only. Changing that to `Optional[str]` (with `from typing import Optional`) should work if you’re on an older version of Python. (Feel free to log an issue, I should change this)
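For reference, a sketch of the two spellings side by side (`make_stream_name` is a made-up helper, not tap code). Another fix that keeps the `str | None` spelling working on older versions is adding `from __future__ import annotations` at the top of the module, since the failure comes from the annotation being evaluated eagerly at class/def time:

```python
from typing import Optional

# Python 3.10+ only, when annotations are evaluated eagerly:
#     def f(prefix: str | None = None): ...
# Portable form:
def make_stream_name(collection: str, prefix: Optional[str] = None) -> str:
    """Made-up helper: prepend an optional prefix to a collection name."""
    return f"{prefix}_{collection}" if prefix else collection

print(make_stream_name("bannedusers"))         # bannedusers
print(make_stream_name("bannedusers", "raw"))  # raw_bannedusers
```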
the `select` key should be a sibling of `config`, not nested beneath it. The form is `entity.field`, where “entity” for this tap is equal to the name of the collection in lower case (plus the prefix if you are using it, which you don’t appear to be). So if you have a collection `bannedUsers` in the `DG_Testing` database, you could specify
```yaml
select:
  - bannedusers.*
```
to include that collection with all possible fields.
m
Thanks! Would love to see the update to accommodate older versions of py. I edited my yml file and it’s still selecting all collections.
Hey @Matt Menzenski do you think you can change that one line to support python 3.9? Also do you know why `meltano run tap-mongodb target-jsonl` is hitting all collections given my yml above?
m
This is the specific commit that fixes the syntax for earlier python versions https://github.com/menzenski/tap-mongodb/pull/3/commits/18d902d200274753b3744901635b0b2c42bae416
I’ll merge that PR in a minute, once the checks pass
@matt_elgazar that PR’s been merged - can you be sure you’re on the latest version of the tap and retry?
m
Getting this error: `ModuleNotFoundError: No module named 'backports'`
Here are my steps:
• go into meltano.yml and change `pip_url: git+https://github.com/menzenski/tap-mongodb.git@80341d4a7f6d9ba3f7224e14f7492f17bd6f212f`
• in terminal run `meltano add extractor tap-mongodb`
• run `meltano run tap-mongodb target-jsonl`
@Matt Menzenski it happens at `from backports.cached_property import cached_property` - I’m on python 3.9.15
m
dangit
I’m getting that too 😬
fix incoming
m
Great, fixed that problem! Only thing is when I run it, it still taps ALL collections
m
can you share the relevant part of your meltano.yml again?
m
```yaml
environments:
  - name: dev
    config:
      plugins:
        extractors:
          - name: tap-mongodb
            config:
              mongodb_connection_string: ${DG_DEV_MONGODB_CONNECTION_STRING}
              database: DG_Dev
              select:
                - tournamentnftinfos.*

        loaders:
          - name: target-snowflake
            env:
              TARGET_SNOWFLAKE_DEFAULT_TARGET_SCHEMA: MONGODB_DEV
```
m
```yaml
config:
  mongodb_connection_string: ${DG_DEV_MONGODB_CONNECTION_STRING}
  database: DG_Dev
  select:
    - tournamentnftinfos.*
```
should be
```yaml
config:
  mongodb_connection_string: ${DG_DEV_MONGODB_CONNECTION_STRING}
  database: DG_Dev
select:
  - tournamentnftinfos.*
```
m
Ahh ok I see, fixed it!
thanks!
Hey @Matt Menzenski one more request 🙂 I am getting an error running this tap in production because I was not granted access to ALL collections in the production MongoDB database. The error occurs in this line: https://github.com/menzenski/tap-mongodb/blob/main/tap_mongodb/connector.py#L98 My request is: can we change this line from
• `for collection in self.database.list_collection_names():`
• to `for collection in <get collections from select in yml file>:`
m
Can you share the error message that you got? I thought I was handling the possible permission error via the try/catch that follows that line, but I didn’t know that the
list_collection_names()
would error if you didn’t have permission to all collections.
I will take a look at this
m
Here’s the error
^ it’s much longer but i’ve narrowed it down to that, because when I use the mongo client natively I also get an error from
list_collection_names
So if select is not defined or select == * (or .), keep the logic the same; else only select the collections in the yml select
Man i’ve never had so much trouble entering debug mode
It works when i run
poetry run tap-mongodb --config .secrets/config.json
u
Did you see this article? https://www.arecadata.com/meltano-development-how-to-debug-singer-taps-in-pycharm/ Looks like they describe how to do it with pycharm
m
WOW!!!!! That was fast LOL
No I did not see that article but that did it, lol!! Thank you!!
@Matt Menzenski I think I fixed it. Will open a PR shortly
@Matt Menzenski @pat_nadolny Any reasons you guys can think of why what I did here isn’t working properly? Also tried with
--full-refresh
Here’s the link. https://github.com/melgazar9/tap-mongodb/commit/03c8e978a2f9c286594454e95e6cc5b7abbc04e8
When i run
meltano run tap-mongodb target-jsonl
I get that everything ran successfully, but no data is populated in
output/
m
collections = self.config.get('select')
select is not within the config object, it’s a sibling of config. In theory, that line of code isn’t hit if you have the
select
key defined, because it should be using that provided catalog https://github.com/melgazar9/tap-mongodb/commit/03c8e978a2f9c286594454e95e6cc5b7abbc04e8#diff-a3089305435827b54212373[…]3c254329e112cdd8f4b68686L191
you might add a logger line there to see what the
input_catalog
is being set to
if you specify collections under
select
they should be present in that input_catalog object
m
well, when I ran it shows that it’s selecting those. Check this out: Here’s meltano.yml and here’s the output in the screenshot
Copy code
extractors:
      - name: tap-mongodb
        config:
          mongodb_connection_string: ${MONGODB_CONNECTION_STRING}
          database: Testing
          select:
            - clientConfigs.*
            - bannedusers.*
m
Copy code
config:
          mongodb_connection_string: ${MONGODB_CONNECTION_STRING}
          database: Testing
          select:
            - clientConfigs.*
            - bannedusers.*
should be
Copy code
config:
          mongodb_connection_string: ${MONGODB_CONNECTION_STRING}
          database: Testing
        select:
          - clientConfigs.*
          - bannedusers.*
m
oh really? When I wanted to use debug mode
meltano config tap-mongodb > .secrets/config.json
no
select
appeared in the config that way. It’s going to select everything then
m
it shouldn’t select everything
the
select
is not part of the
config
- it’ll be passed by Meltano to the tap as the
--catalog
argument
m
well it shows it’s
Skipping
it
but the problem is i don’t have access to run
list_collection_names()
m
right, I agree that that’s a bug in the tap
I wonder if
clientConfigs
should be
clientconfigs
(lower case) too
m
oh right that is true too
not sure why it’s designed where
select
is not under the
config
portion?
m
I went down this exact same rabbit hole a few weeks ago when I started working on this tap - it wasn’t clear to me either. This is from the Singer spec (which Meltano is built around): https://github.com/singer-io/getting-started/blob/master/docs/SPEC.md in particular,
tap --config CONFIG [--state STATE] [--catalog CATALOG]
is the Singer interface for calling a tap. The
select
settings that you define in meltano.yml are parsed by Meltano and passed to the tap as that
--catalog
parameter (not as
--config
)
m
dangit
It’s literally impossible to convince devops to grant read access to all mongodb tables lol!!
Is there a way you can think of to get this in there the right way? 🙂
Copy code
collections = self.config.get('select')
if collections:
    result["streams"].extend(self.connector.discover_catalog_entries(collections=collections))
else:
    result["streams"].extend(self.connector.discover_catalog_entries())
m
when you pass the
select
options to meltano, it should pass them to the tap-mongodb tap as that
self.input_catalog
property. In theory, if that input_catalog property exists, the tap will use that and not execute that
discover_catalog_entries
at all
you might log out the value of
self.input_catalog
just before the return in
return self.input_catalog.to_dict()
on line 192 here https://github.com/melgazar9/tap-mongodb/commit/03c8e978a2f9c286594454e95e6cc5b7abbc04e8#diff-a3089305435827b54212373[…]3c254329e112cdd8f4b68686L191
if you have the
select
key on the same level as
config
(as a sibling of it, not as a child), does that
self.input_catalog
show those collections ?
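Matt’s suggestion of logging the input catalog can be sketched as a small helper (function name is hypothetical; the assumption here is only that the SDK’s input catalog serializes to a plain Singer catalog dict with a `streams` list):

```python
import json
import logging
from typing import List, Optional

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("tap-mongodb")


def log_selected_streams(input_catalog: Optional[dict]) -> List[str]:
    """Log and return the stream names found in a Singer catalog dict.

    If Meltano passed no --catalog (e.g. because `select` was nested under
    `config` by mistake), input_catalog is None and the tap falls back to
    discovering every collection.
    """
    if not input_catalog:
        logger.info("No input catalog - tap will discover all collections")
        return []
    names = [s.get("tap_stream_id", s.get("stream")) for s in input_catalog.get("streams", [])]
    logger.info("Input catalog streams: %s", json.dumps(names))
    return names
```

If `select` sits as a sibling of `config`, the logged list should contain only the selected collections; an empty list points at the misplacement.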
m
Just ran it - yep it shows ALL of the collections
hmmm i’m in debug mode now.. I don’t see where it’s getting the ‘select’ criteria. It’s gotta be in here somewhere
m
@matt_elgazar I need to step away from the computer for a bit, but I’ll try to get a fix out tonight. I believe that if you change
Copy code
for collection in self.database.list_collection_names():
to
Copy code
for collection in self.database.list_collection_names(authorizedCollections=True, nameOnly=True):
that this should resolve your permissions error.
Users without the required privilege can run the command with both
authorizedCollections
and
nameOnly
options set to
true
. In this case, the command returns just the name and type of the collection(s) to which the user has privileges.
per the docs:
• https://www.mongodb.com/docs/manual/reference/command/listCollections/#required-access
• https://pymongo.readthedocs.io/en/stable/api/pymongo/database.html#pymongo.database.Database.list_collection_names
• https://www.mongodb.com/docs/manual/reference/command/listCollections/#command-fields
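The fix can be exercised without a live cluster by wrapping the call in a small function and passing in any object with a pymongo-compatible `list_collection_names` method (the fake database below is purely for illustration):

```python
def discover_collection_names(database):
    """List only the collections the connected user is authorized to read.

    With authorizedCollections=True and nameOnly=True, MongoDB returns the
    names of accessible collections instead of raising a permissions error
    when the user lacks the database-wide listCollections privilege.
    `database` can be a real pymongo.database.Database object.
    """
    return sorted(
        database.list_collection_names(authorizedCollections=True, nameOnly=True)
    )


class FakeDatabase:
    """Minimal stand-in for pymongo's Database, used here for illustration."""

    def list_collection_names(self, authorizedCollections=False, nameOnly=False):
        # A real server would filter this list by the caller's privileges.
        assert authorizedCollections and nameOnly
        return ["clientconfigs", "bannedusers"]
```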
m
Yep that did solve the permission error. In a perfect world I would still like it to not hit all collections though 🙂 - but if you can push that it’d be great!
@Matt Menzenski I just did it, created a PR if you want to just merge it here https://github.com/menzenski/tap-mongodb/pull/9/files
m
merged, thank you!
m
Hey @Matt Menzenski I just ran 2 tables on production and noticed the
_id
objectId column in mongo is not set properly. Take a look at the screen shot. I thought this was the section in
meltano.yml
that accounts for it?
Copy code
config:
      add_record_metadata: true
      allow_modify_change_streams: true
      metadata:
        '*':
          replication-key: _id
          replication-method: INCREMENTAL
Here is the prod section of meltano.yml
Copy code
- name: production
    config:
      plugins:
        extractors:
          - name: tap-mongodb
            config:
              mongodb_connection_string: ${PRODUCTION_MONGODB_CONNECTION_STRING}
              database: Production
            select:
              - bannedusers.*
              - arcadehandanalyticsdata.*
        loaders:
          - name: target-snowflake
            env:
              TARGET_SNOWFLAKE_DEFAULT_TARGET_SCHEMA: MONGODB_PRODUCTION
Maybe there’s something I’m not setting properly in the config?
m
this is something I’ve been fighting with. If the
_id
column is set to the string value of the document’s object ID (that is, to
str(document["_id"])
), the tap doesn’t work well - that hex string is not alphanumerically sortable, and that means that the tap cannot resume from a saved checkpoint. It must start over from the beginning each time it’s run. My first attempt at a workaround (which you’re seeing there) involved using an ISO-8601 datetime string as the _id field, and setting that to the timestamp value of the document’s object ID (that is, to
document["_id"].generation_time.isoformat()
). That works, in that it is sortable (so the tap can now resume from a saved checkpoint if it errors out, for instance), but the timestamp component of the ObjectID is only granular to whole seconds. This is unlikely to cause an issue, but I was concerned about the potential for an individual record to be missed, if multiple records were added during the same second and the tap had an error when processing those records. I pushed a change last night that sets the
_id
field (which probably should have been named
replication_key
or something instead, in hindsight) to a string with the format
2021-09-22T01:02:48+00:00|614a80b81ad8c60001b7d5f3
- this is the timestamp, then a pipe
|
delimiter, then the hex object ID. This should account for everything from the perspective of the tap’s replication key - it is both alphanumerically sortable and it uniquely identifies a record. Last night I also updated the schema to add an explicit
object_id
field in the tap’s output. This column will always contain the hex ObjectID string, like
614a80b81ad8c60001b7d5f3
.
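The composite key format described above can be reproduced from the hex ObjectId alone, since its first four bytes are the big-endian creation time in epoch seconds (a sketch with an illustrative function name, not the tap’s actual implementation):

```python
from datetime import datetime, timezone


def replication_key_from_object_id(object_id_hex: str) -> str:
    """Build the '<ISO-8601 timestamp>|<hex ObjectId>' replication key.

    The first 8 hex characters of an ObjectId encode its creation time in
    whole seconds since the Unix epoch (the same value bson's ObjectId
    exposes as generation_time), so the resulting key is both
    alphanumerically sortable and unique per document.
    """
    epoch_seconds = int(object_id_hex[:8], 16)
    timestamp = datetime.fromtimestamp(epoch_seconds, tz=timezone.utc).isoformat()
    return f"{timestamp}|{object_id_hex}"
```

For the ObjectId in the example above this yields `2021-09-22T01:02:48+00:00|614a80b81ad8c60001b7d5f3`.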
Also, I incremented the tap’s version to 1.0.0 with that change last night. I am intending that the interface/behavior will be more stable now.
So, my suggestion is to update to the latest version of the tap, and then the
object_id
column will have the value you expect
m
Ok great to know. Interesting. For my use case, seconds are not granular enough and I will definitely be missing records. I can research to see if there is a way to convert the
_id
object in mongodb to a time sensitive format in python
Is your most recent branch merged? What’s the most recent commit?
m
most recent commit is 6656fe34e3a8ebf2d7b8e26552f0ca9d591444b3
m
after the change I made last night, there shouldn’t be a concern about any possibility of missed records
m
Oh i see, misread it! Testing it now
Getting an error without full-refresh mode, but no error when running in full-refresh mode
m
is there an error message in
.meltano/logs
?
m
I don’t see an error, but here are my steps - may be useful.
1. deleted schema
mongodb_staging
in snowflake
2. ran
meltano --environment=staging run tap-mongodb target-snowflake
- errored out
3. ran
meltano --environment=staging run tap-mongodb target-snowflake --full-refresh
- no error
4. ran
meltano --environment=staging run tap-mongodb target-snowflake
- no error
I do see this in snowflake as the
_id
column: `2023-03-13T20:19:34+00:00|640f855606ce35ab100be533` so it pipes the timestamp and the object ID
so it looks like on new tables that first step will error out if it’s not run in full-refresh mode
raise ValueError("Invalid IncrementalId string")
m
ok dang, thank you for the info. I will look
before you updated to the latest version of the tap, which version (commit) were you using?
was the value of the
_id
column set to the hex string like
640f855606ce35ab100be533
the last time you ran the tap?
if so, that would explain this behavior
m
I’m using this commit
6656fe34e3a8ebf2d7b8e26552f0ca9d591444b3
Ah yea the last time i ran the tap I was on a previous commit. That could explain it
let me try again by dropping the schema and re-running
m
if you had the _id field set to a string like 640f855606ce35ab100be533 coming out of the tap, that is something I should be able to push a change to handle
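One way such a change could look (a sketch only, with a hypothetical function name, not the tap’s actual fix): accept both the new piped format and a legacy bare-hex bookmark left by an older commit, recovering the timestamp from the ObjectId’s leading four bytes in the legacy case:

```python
from datetime import datetime, timezone
from typing import Tuple


def parse_incremental_id(value: str) -> Tuple[datetime, str]:
    """Parse a saved replication-key bookmark into (timestamp, hex ObjectId).

    Handles the new 'timestamp|hex' format, and falls back to a bare
    24-character hex ObjectId as written by an older tap version.
    """
    if "|" in value:
        timestamp_str, object_id_hex = value.split("|", 1)
        return datetime.fromisoformat(timestamp_str), object_id_hex
    if len(value) == 24:
        # Legacy bookmark: the ObjectId's first 8 hex chars are epoch seconds.
        epoch_seconds = int(value[:8], 16)
        return datetime.fromtimestamp(epoch_seconds, tz=timezone.utc), value
    raise ValueError("Invalid IncrementalId string")
```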
m
just reran
meltano --environment=staging run tap-mongodb target-snowflake
and it worked after dropping the schema. I think the issue was the previous run was from a prior commit.
@Matt Menzenski Do you think you can also implement getting up to microseconds on the timestamp? I know it’s a small chance of the same second + _id having different records
m
I don’t believe it’s possible to get greater granularity out of the ObjectID’s generation time - that timestamp is just “seconds since epoch”. That said, the hex string form of the ObjectId (640f855606ce35ab100be533) is going to be globally unique/specific to a single document. It includes that epoch seconds timestamp value as well as a random value component and an incrementing counter component.
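Matt’s point about the ObjectId layout can be verified directly from the hex string (per the documented MongoDB ObjectId structure; the helper name is illustrative):

```python
def decompose_object_id(object_id_hex: str) -> dict:
    """Split a 24-char hex ObjectId into its three documented components.

    Layout: 4-byte big-endian epoch-seconds timestamp, 5-byte random value
    generated once per process, 3-byte incrementing counter. The timestamp
    is whole seconds only, but the random + counter bytes make the full ID
    unique even for documents created within the same second.
    """
    return {
        "epoch_seconds": int(object_id_hex[:8], 16),
        "random": object_id_hex[8:18],
        "counter": int(object_id_hex[18:24], 16),
    }
```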
m
Ahh ok good to know. Running now on a pretty big production table. If there’s no missing rows I think we’ll be good to go!
m
👍🏼
I’ve had my big production table load running since friday, lol. About 50% done now and so far so good 🙏🏼
m
All looks pretty good to me so far 👍
hey @Matt Menzenski saw you made some recent pushes. If I re-pull to the most up-to-date commit, is it necessary to re-run all models with --full-refresh?
m
@matt_elgazar what commit are you on currently?
m
6656fe34e3a8ebf2d7b8e26552f0ca9d591444b3
m
these are the changes since that version https://github.com/menzenski/tap-mongodb/compare/6656fe34e3a8ebf2d7b8e26552f0ca9d591444b3..08585f2109ed1b0bf1ff40c16d365e4218190c46 the
_id
column in the output has been renamed to
replication_key
likewise:
• operationType -> operation_type
• clusterTime -> cluster_time
• ns -> namespace (and its child fields db -> database, coll -> collection)
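For downstream models built against the old column names, a small mapping (illustrative only, not part of the tap) can translate records across the rename:

```python
TOP_LEVEL_RENAMES = {
    "_id": "replication_key",
    "operationType": "operation_type",
    "clusterTime": "cluster_time",
    "ns": "namespace",
}
NAMESPACE_RENAMES = {"db": "database", "coll": "collection"}


def upgrade_record(record: dict) -> dict:
    """Rewrite a record emitted by an older tap commit to the renamed schema."""
    out = {TOP_LEVEL_RENAMES.get(key, key): value for key, value in record.items()}
    if isinstance(out.get("namespace"), dict):
        out["namespace"] = {
            NAMESPACE_RENAMES.get(key, key): value
            for key, value in out["namespace"].items()
        }
    return out
```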
m
@Matt Menzenski just seeing this.. ok sounds good. I will probably upgrade sometime next week. Not sure if anyone else has had this problem but one thing VERY strange I am noticing is when i call
meltano run
sometimes it updates the table but sometimes no rows are sent to the target. Is this just me?
i have to call it 2-3 times with full-refresh and then it will populate snowflake
Matt do you plan on making a lot more changes or is it safe to put the most recent branch in production and not touch it for a long time?
m
I bumped the version to 1.0.0 recently - I am considering the package to be “stable” at this point
I am planning to transfer the repo to the MeltanoLabs GitHub organization soon though, for what that’s worth.
m
nice - no it’s working well. Have you ever experienced that issue with meltano not populating the target, having to run it 2-3 times?
hey @Matt Menzenski I’m getting an error on the newest commit.
KeyError: '_id'
m
in your metadata do you have
replication-key: replication_key
?
m
ah yea that fixed the problem, but now i got this
m
that looks to me like a database connection issue 😕
m
hmm ok, i’ll look into it. Thanks! i’ll be using this commit from now on when i get this sorted out
@Matt Menzenski I keep getting this event loop is closed error. Do you think this is on the target-snowflake side or mongodb side?
it’s happening on bigger tables
m
the error is saying “Loader failed” so I assume it’s on the Snowflake side
FWIW I’ve run this tap for days continuously without issue
(I’ve only tried redshift and postgres targets, though, i’m not familiar with snowflake)
m
ah yea i see that, ok makes sense
a
Thank you for informative thread!
👍 1