Miroslav Nedyalkov
06/19/2025, 8:23 AM
1. Define which collections to sync (I did this with `select` in the `meltano.yml`, but I'm not sure if that's the right way to do it)
2. Define what strategy to use to sync the data. I'd like to generally use `LOG_BASED`, but I need to do a complete sync first, and it seems this particular tap doesn't support `FULL_SYNC`. I also couldn't get `LOG_BASED` to work (I set it up with a `metadata` section in the `meltano.yml` file, with a `'*'` entry containing `replication-key: replication_key` and `replication-method: LOG_BASED`): I get `resume token string was not a valid hex string`, which I assume is because I've never done a full sync, but when I try to do one I get an error that this tap doesn't work with full sync.
My use case sounds rather standard: I just need to move data from MongoDB (a limited set of collections) to Snowflake, tracking changes, but without applying any transformations or parsing. I need an initial full import, and log-based imports after that, but unfortunately I couldn't get it to work…
Miroslav Nedyalkov
06/19/2025, 8:29 AM
My `meltano.yml` (I've removed the Snowflake part to test):
version: 1
plugins:
  extractors:
    - name: tap-mongodb
      variant: meltanolabs
      pip_url: git+https://github.com/MeltanoLabs/tap-mongodb.git@main
      metadata:
        '*':
          replication-key: replication_key
          replication-method: LOG_BASED
      select:
        - 'organizations'
project_id: 78647a00-da00-49f1-a8b6-481cd9769235
default_environment: production
environments:
  - name: production
My `Dockerfile`:
# Use the official Meltano image (Python 3.11)
FROM meltano/meltano:latest-python3.11
RUN apt-get update && apt-get install -y libssl-dev
# Set working directory
WORKDIR /app
# Copy Meltano project
COPY meltano.yml ./
# Install plugins and dependencies
RUN meltano lock --update --all
RUN meltano install
COPY start.sh ./
COPY ./tap.singer_sdk_logging.json /app/.meltano/run/tap-mongodb/tap.singer_sdk_logging.json
# Run the start script via a shell (entrypoint for ECS or local runs)
ENTRYPOINT ["/bin/sh", "-c"]
CMD ["./start.sh"]
my `start.sh`:
#!/bin/bash
set -euo pipefail
echo "🔧 Starting Meltano project..."
meltano --log-level=debug invoke tap-mongodb
and my `tap.singer_sdk_logging.json`:
{
  "version": 1,
  "disable_existing_loggers": false,
  "formatters": {
    "default": {
      "format": "[%(asctime)s] [%(levelname)s] [%(name)s] - %(message)s"
    }
  },
  "handlers": {
    "console": {
      "class": "logging.StreamHandler",
      "formatter": "default",
      "stream": "ext://sys.stderr"
    }
  },
  "root": {
    "level": "INFO",
    "handlers": [
      "console"
    ]
  }
}
I have an `organizations` collection in MongoDB, and all the connection-specific fields are passed via a `.env` file:
MELTANO_STATE_BACKEND_URI=s3://...
AWS_ACCESS_KEY_ID=AKI...
AWS_SECRET_ACCESS_KEY=r3...
TAP_MONGODB_MONGODB_CONNECTION_STRING=mongodb+srv://user:pass@cluster.qiaea.mongodb.net?authSource=admin&ssl=true&readPreference=secondary
TAP_MONGODB_DATABASE=my-db
TAP_MONGODB_START_DATE=2023-10-01T00:00:00Z
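As a side note on state handling: a minimal sketch, assuming the default Meltano state backend settings, of how the same S3 setting could be declared directly in `meltano.yml` instead of (or alongside) the `MELTANO_STATE_BACKEND_URI` environment variable; the bucket path is elided here just as it is in the `.env` above.

```yaml
# Sketch only: meltano.yml equivalent of MELTANO_STATE_BACKEND_URI.
state_backend:
  uri: s3://...   # elided, as in the .env file above
```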
I can see the connection to Mongo is working, as discovery is coming through.
Edgar Ramírez (Arch.dev)
06/19/2025, 3:40 PM
Does the output of `meltano select tap-mongodb --list --all` look right to you?
06/20/2025, 6:26 AM
… `replication_key`. I've added `organizations.*`, and the result from the select seems correct now:
[selected ] organizations._sdc_batched_at
[selected ] organizations._sdc_extracted_at
[selected ] organizations.cluster_time
[selected ] organizations.document
[selected ] organizations.namespace
[selected ] organizations.namespace.collection
[selected ] organizations.namespace.database
[selected ] organizations.object_id
[selected ] organizations.operation_type
[automatic ] organizations.replication_key
and a big list of excluded (all my other collections).
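For reference, a minimal sketch of what the corresponding `select` section in `meltano.yml` could look like with the wildcard pattern; only the lines that change from the config shown earlier in the thread are included.

```yaml
plugins:
  extractors:
    - name: tap-mongodb
      select:
        # Select every property of the organizations stream; all other
        # collections remain excluded by default.
        - 'organizations.*'
```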
I've tried setting `replication-key` to both `_id` and `replication_key`, and `replication-method` to both `LOG_BASED` and `INCREMENTAL`. The `LOG_BASED` sync throws the same error, but the `INCREMENTAL` one raises a new one:
ValueError: Invalid IncrementalId string
It seems I either haven't configured how the tap should track the sync progress correctly, or I need to somehow do a full sync first and then run it. Can you help me understand whether a preparation step is needed, or whether the tap will handle it automatically if no incremental key/resume token is available?
Also, what could be causing those values to be invalid? I can see nothing was saved to my S3 bucket, so it shouldn't be an old state file or something (at least that's what I think).
Miroslav Nedyalkov
06/20/2025, 6:28 AM
Note that I'm running this within a docker container.
Edgar Ramírez (Arch.dev)
06/23/2025, 11:30 PM
> Note that I'm running this within a docker container
Gotcha, that shouldn't be a problem. I'm not too familiar with the metadata handling in this tap, but your config seems correct according to https://hub.meltano.com/extractors/tap-mongodb--meltanolabs/#settings. @Matt Menzenski might know better.
Matt Menzenski
06/24/2025, 12:42 AM
… `replication_key` - it is possible you just need to rerun the incremental load with `--full-refresh`
Matt Menzenski
06/24/2025, 12:43 AM
Matt Menzenski
06/24/2025, 12:45 AM
… `select` is changed to `organizations.*`?
06/24/2025, 12:48 AM
> Can you help me understand whether a preparation step is needed, or whether the tap will handle it automatically if no incremental key/resume token is available?
In incremental mode the tap should do a full load in ObjectId (`_id`) ascending order and then pick up new inserts on each run after the full load is complete. In log-based mode it does not do a load of existing data: it opens a change stream and captures events from that initial change stream resume token onwards.
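To make the two modes concrete, here is a rough sketch of the stream metadata for each, based only on the settings already shown in this thread (treat it as an assumption to verify, not a confirmed recipe):

```yaml
plugins:
  extractors:
    - name: tap-mongodb
      metadata:
        '*':
          replication-key: replication_key
          # Incremental: initial full load in _id order, then only new inserts.
          replication-method: INCREMENTAL
          # Log-based alternative: no initial load; a change stream is opened and
          # events are captured from the resume token onwards.
          # replication-method: LOG_BASED
```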
Miroslav Nedyalkov
06/26/2025, 5:26 AM
I updated the metadata to:
  metadata:
    '*':
      replication-key: replication_key
      replication-method: INCREMENTAL
and tried with the command
  meltano --log-level=debug run tap-mongodb target-snowflake --full-refresh
and got the first items coming through. Thanks a lot for helping me get the sync working.
I'd like to ideally do:
1. a full initial sync
2. start tracking the log and make sure any change happening in MongoDB results in a change-tracking record on the Snowflake side
What I've noticed is that every run I make with the INCREMENTAL replication method results in my records being updated in Snowflake, which is not what I need. It also seems to touch all records on the target, which might be quite expensive for bigger databases (which my real one is), even when I run without `--full-refresh`, which I expected to do an incremental sync of only what changed.
I've tried running LOG_BASED right after the initial sync was done, but the two don't seem to work well together, or I might need a different setup. I feel I'm missing something here, but after reading the docs and looking into the code I still cannot figure out what exactly, so your help would be greatly appreciated.
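One possible way to keep "backfill first, log-based afterwards" manageable, sketched here as an assumption rather than a verified recipe: define two extractors that inherit from the same base tap, one pinned to INCREMENTAL for the one-off backfill and one pinned to LOG_BASED for ongoing runs (the names `tap-mongodb-backfill` and `tap-mongodb-changes` are made up for this example). Each inherited plugin keeps its own state, so the log-based job would still start from its own resume token rather than continuing from where the backfill stopped.

```yaml
plugins:
  extractors:
    # Hypothetical names; both inherit connection settings from tap-mongodb.
    - name: tap-mongodb-backfill
      inherit_from: tap-mongodb
      metadata:
        '*':
          replication-key: replication_key
          replication-method: INCREMENTAL
    - name: tap-mongodb-changes
      inherit_from: tap-mongodb
      metadata:
        '*':
          replication-key: replication_key
          replication-method: LOG_BASED
```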
Miroslav Nedyalkov
06/26/2025, 9:34 AM
Miroslav Nedyalkov
06/27/2025, 5:28 AM
Miroslav Nedyalkov
06/27/2025, 5:34 AM
1. If the operation is `insert`, just add it to the destination
2. If the operation is `update` or `replace`, add it to the destination, and find all other rows in the destination with the same ID and mark them as INACTIVE
3. If the operation is `delete`, skip this row, but find all other rows in the destination with the same ID and mark them as INACTIVE
Can I do things like this with taps, or should I use something else?
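As far as this thread establishes, the mark-rows-INACTIVE behaviour described above is not something the tap does on its own; a common place for that kind of logic is a transform step that runs after the load, reading the columns the LOG_BASED stream already emits (`operation_type`, `object_id`, `document`). A sketch of wiring such a step into the Meltano project, assuming a dbt project on Snowflake whose models are hypothetical and not shown here:

```yaml
plugins:
  transformers:
    - name: dbt-snowflake   # assumed to be added via `meltano add transformer dbt-snowflake`
jobs:
  - name: mongo-to-snowflake-with-history
    tasks:
      # Load raw change events, then run the (hypothetical) dbt models that apply
      # the insert/update/delete handling described in the list above.
      - tap-mongodb target-snowflake dbt-snowflake:run
```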