# getting-started
m
Hey, everyone. I’m trying to move some data from MongoDB to Snowflake. We already have a lot of dbt transformations running on Snowflake, so I’m just re-implementing the EL part of our ELT. I’ve decided to use the MongoDB tap by MeltanoLabs, and it successfully connected to my MongoDB database. I’m not sure, though, how I’m supposed to:
1. Define which collections (and ideally which of their fields) get moved to Snowflake (I figured I can use `select` in the `meltano.yml`, but I’m not sure if that’s the right way to do it)
2. Define what strategy to use to sync the data - I’d generally like to use LOG_BASED, but I need to do a complete sync first. It seems this particular tap doesn’t support FULL_SYNC. I also couldn’t get `LOG_BASED` to work (I set it up in the `metadata` section of the `meltano.yml` file, where I set `replication-key: replication_key` and `replication-method: LOG_BASED` under `*`) - I get `resume token string was not a valid hex string`, which I assume is because I’ve never done a full sync, but when I try to do one, I get an error that this tap doesn’t work with full sync.

My use case sounds rather standard - I just need to move data from MongoDB (a limited set of collections) to Snowflake, tracking changes, but without applying any transformations or parsing. I need an initial full import and log-based imports after that, but unfortunately I couldn’t get it to work…
Some details - my `meltano.yml` (I’ve removed the Snowflake part to test):
```yaml
version: 1
plugins:
  extractors:
    - name: tap-mongodb
      variant: meltanolabs
      pip_url: git+https://github.com/MeltanoLabs/tap-mongodb.git@main
      metadata:
        '*':
          replication-key: replication_key
          replication-method: LOG_BASED
      select:
        - 'organizations'

project_id: 78647a00-da00-49f1-a8b6-481cd9769235
default_environment: production
environments:
  - name: production
```
My `Dockerfile`:
```dockerfile
# Use the official Meltano image
FROM meltano/meltano:latest-python3.11

RUN apt-get update && apt-get install -y libssl-dev

# Set working directory
WORKDIR /app

# Copy Meltano project
COPY meltano.yml ./
# Install plugins and dependencies
RUN meltano lock --update --all
RUN meltano install

COPY start.sh ./
COPY ./tap.singer_sdk_logging.json /app/.meltano/run/tap-mongodb/tap.singer_sdk_logging.json

# Run commands through a shell
ENTRYPOINT ["/bin/sh", "-c"]

# Entrypoint for ECS or local runs
CMD ["./start.sh"]
```
my `start.sh`:
```bash
#!/bin/bash

set -euo pipefail
echo "🔧 Starting Meltano project..."

meltano --log-level=debug invoke tap-mongodb
```
and my `tap.singer_sdk_logging.json`:
Copy code
{
  "version": 1,
  "disable_existing_loggers": false,
  "formatters": {
    "default": {
      "format": "[%(asctime)s] [%(levelname)s] [%(name)s] - %(message)s"
    }
  },
  "handlers": {
    "console": {
      "class": "logging.StreamHandler",
      "formatter": "default",
      "stream": "<ext://sys.stderr>"
    }
  },
  "root": {
    "level": "INFO",
    "handlers": [
      "console"
    ]
  }
}
I have an `organizations` collection in MongoDB, and all the connection-specific fields are passed via a `.env` file:
```shell
MELTANO_STATE_BACKEND_URI=s3://...
AWS_ACCESS_KEY_ID=AKI...
AWS_SECRET_ACCESS_KEY=r3...

TAP_MONGODB_MONGODB_CONNECTION_STRING=mongodb+srv://user:pass@cluster.qiaea.mongodb.net?authSource=admin&ssl=true&readPreference=secondary
TAP_MONGODB_DATABASE=my-db
TAP_MONGODB_START_DATE=2023-10-01T00:00:00Z
```
I can see the connection to Mongo is working, as discovery is coming through.
e
Does the output of `meltano select tap-mongodb --list --all` look right to you?
m
No, it did not, and I forgot to check it :facepalm: Selecting only the collection does not select all its keys - it selects only the `replication_key`. I’ve added `organizations.*` and the result from the select seems correct now:
```
[selected   ] organizations._sdc_batched_at
[selected   ] organizations._sdc_extracted_at
[selected   ] organizations.cluster_time
[selected   ] organizations.document
[selected   ] organizations.namespace
[selected   ] organizations.namespace.collection
[selected   ] organizations.namespace.database
[selected   ] organizations.object_id
[selected   ] organizations.operation_type
[automatic  ] organizations.replication_key
```
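For reference, the corresponding `select` entry in `meltano.yml` would then look something like this (a sketch - the single `'organizations'` entry replaced with a wildcard over the stream’s fields):

```yaml
select:
  - 'organizations.*'
```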
and a big list of excluded entries (all my other collections). I’ve tried setting `replication-key` to both `_id` and `replication_key`, and `replication-method` to both `LOG_BASED` and `INCREMENTAL`. The `LOG_BASED` sync throws the same error, but `INCREMENTAL` raises a new one:
```
ValueError: Invalid IncrementalId string
```
It seems I either couldn’t configure how the tap should track the sync progress, or I need to somehow do a full sync first and then run it. Can you help me understand whether a preparation step is needed, or whether the tap will handle it automatically if no incremental key/resume token is available? And then, what could be the reason for those to be invalid? I can see nothing was saved to my S3 bucket, so it shouldn’t be an old state file or anything (at least that’s what I think).
Note that I’m running this within a docker container, so no local state is present either, as I run a new instance every time I test and the volume is not mounted locally.
e
> Note that I’m running this within a docker container
Gotcha, that shouldn't be a problem. I'm not too familiar with the metadata handling in this tap, but your config seems correct according to https://hub.meltano.com/extractors/tap-mongodb--meltanolabs/#settings. @Matt Menzenski might know better.
m
Replication key should be set to `replication_key` - it is possible you just need to rerun the incremental load with `--full-refresh`. Oh, I guess that shouldn’t be necessary in the docker setup you describe.
Does it make a difference if the `select` is changed to `organizations.*`?
> Can you help me understand if a preparation step is needed, or the tap will handle it automatically, if no incremental key/resume token is available?
In incremental mode the tap should do a full load in ObjectId (`_id`) ascending order and then pick up new inserts on each run after the full load is complete. In log-based mode it does not load existing data - it opens a change stream and captures events from that initial change stream resume token onwards.
m
Hey guys, thanks for the info. I needed some time to try this out and see where it leads me. I’ve changed the metadata as follows:
```yaml
metadata:
  '*':
    replication-key: replication_key
    replication-method: INCREMENTAL
```
and tried with the command
```shell
meltano --log-level=debug run tap-mongodb target-snowflake --full-refresh
```
and got the first items coming through. Thanks a lot for helping me get the sync working. Ideally, I’d like to:
1. Do a full initial sync
2. Start tracking the log and make sure any change happening in MongoDB results in a change-tracking record on the Snowflake side

What I’ve noticed is that every run I make with the `INCREMENTAL` replication method results in my records being updated in Snowflake, which is not what I need. It also seems to touch all records on the target, which might be quite expensive for bigger databases (which my real one is), even when I run without `--full-refresh`, which I expected to do an incremental sync of only what changed. I’ve tried running `LOG_BASED` right after the initial sync was done, but they don’t seem to work well together, or I might need a different setup. I feel I’m missing something here, but after reading the docs and looking into the code, I still cannot figure out what exactly, so your help would be greatly appreciated.
Just some more info on this - I don’t mind using any of the methods. What I need to achieve is:
1. Have a full sync initially
2. Make sure only the changes are sent afterward (i.e. I don’t update millions of records unnecessarily)
3. My old/deleted rows are left behind, so I can use them, but I know they are no longer relevant
4. My updated rows stay with the old values, but a new one with the updated values is added, and the old one is marked so I know it is no longer relevant

From my understanding, if I’m using incremental, I’ll be getting all the rows every time and, depending on the load method of the target, they might be appended or overwritten, but I need only the updates/deletes/new ones to be added and the rest to eventually be marked as “inactive”.
I believe I’ve started figuring this out. What I should do is:
1. Do a `--full-refresh` with the `INCREMENTAL` method, so I get all the data
2. Start doing `LOG_BASED` syncs at regular intervals to get what has changed
3. Filter out empty records, as `LOG_BASED` seems to generate them when there are no changes
4. Additionally, find a way to properly mark the obsolete rows in the target, but that’s a different topic, I think

That being said, I believe I understand how I should use the tap now. Thanks a lot for your help. Now, let’s focus on the Snowflake target 🙂
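For the recurring log-based runs in step 2, one option is a Meltano job plus schedule in `meltano.yml` - a hypothetical sketch (the job and schedule names and the 15-minute interval are made up, and this assumes the metadata has already been switched to `LOG_BASED` after the initial `--full-refresh` run):

```yaml
jobs:
  - name: mongodb-to-snowflake
    tasks:
      - tap-mongodb target-snowflake

schedules:
  - name: mongodb-to-snowflake-15min
    interval: '*/15 * * * *'
    job: mongodb-to-snowflake
```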
I’m wondering if I should do this with a target, or something else, but the way I see it, once I receive a record, I should:
1. If the operation is `insert` - just add it to the destination
2. If the operation is `update` or `replace` - add it to the destination, then find all other rows in the destination with the same ID and mark them as INACTIVE
3. If the operation is `delete` - skip this row, but find all other rows in the destination with the same ID and mark them as INACTIVE

Can I do things like this with taps, or should I use something else?
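Targets generally just load what the tap emits, so this kind of row-marking is usually done downstream of the load - for example in a dbt model over the raw change-tracking table, much like a type-2 slowly changing dimension. Just to make the three rules above concrete, here is a hedged Python sketch: the `ChangeApplier` class and the event shape (`operation_type`, `object_id`, `document`, mirroring the selected fields listed earlier) are my assumptions for illustration, not part of the tap or any target.

```python
from dataclasses import dataclass

@dataclass
class Row:
    """One row in the (hypothetical) destination table."""
    object_id: str
    document: dict
    active: bool = True

class ChangeApplier:
    """Applies MongoDB change events with the three rules above:
    inserts append a row; updates/replaces append a new active row
    and deactivate older rows with the same id; deletes only deactivate."""

    def __init__(self) -> None:
        self.rows: list[Row] = []

    def _deactivate(self, object_id: str) -> None:
        # Mark every existing row with this id as no longer relevant.
        for row in self.rows:
            if row.object_id == object_id:
                row.active = False

    def apply(self, event: dict) -> None:
        op = event["operation_type"]
        oid = event["object_id"]
        if op == "insert":
            self.rows.append(Row(oid, event["document"]))
        elif op in ("update", "replace"):
            self._deactivate(oid)
            self.rows.append(Row(oid, event["document"]))
        elif op == "delete":
            self._deactivate(oid)
```

In a real pipeline the same logic would run as set-based SQL on Snowflake rather than row by row in Python, but the dispatch on `operation_type` is the core idea.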