Hey everybody, I'm using tap-mongodb by meltanolab...
# plugins-general
c
Hey everybody, I'm using the meltanolabs variant of tap-mongodb and I get an error when I try to use replication-method LOG_BASED. My definition looks like this:
metadata:
  '*':
    replication-key: replication_key
    replication-method: LOG_BASED
When I execute:
meltano run tap-mongodb target-postgres --full-refresh
the error returned is:
pymongo.errors.OperationFailure: The $changeStream stage is only supported on replica sets, full error: {'ok': 0.0, 'errmsg': 'The $changeStream stage is only supported on replica sets', 'code': 40573, 'codeName': 'Location40573'}
m
How are you running MongoDB? If you are running a standalone instance, the change stream (which is used for log-based replication) is not available.
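One way to check this from Python is to look at the server's `hello` response: replica-set members report a `setName` field and a mongos router reports `msg: "isdbgrid"`, while a standalone instance reports neither. A minimal sketch, assuming pymongo is installed and the server supports the `hello` command; the helper names and the default URI are hypothetical and not part of tap-mongodb:

```python
def supports_change_streams(hello: dict) -> bool:
    """Return True if a `hello` response looks like a replica-set member or a mongos.

    Standalone servers report neither `setName` nor `msg == "isdbgrid"`.
    """
    return "setName" in hello or hello.get("msg") == "isdbgrid"


def check_server(uri: str = "mongodb://127.0.0.1:27017/") -> bool:
    """Ask a live server for its `hello` response (hypothetical helper)."""
    # Imported lazily so the pure check above has no third-party dependency.
    from pymongo import MongoClient

    client = MongoClient(uri, serverSelectionTimeoutMS=5000)
    try:
        return supports_change_streams(client.admin.command("hello"))
    finally:
        client.close()
```

If `check_server()` returns False, the instance is most likely standalone and log-based replication will fail with the `$changeStream` error above.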
c
Thanks! I'll verify it. We're probably using a standalone instance.
I created a local MongoDB instance with a replica set. This is the connection string:
mongodb://127.0.0.1:27017/?replicaSet=rs0
I was able to connect with MongoDB Compass, but the tap returned the error below:
Plugin configuration is invalid
pymongo.errors.OperationFailure: resume token string was not a valid hex string, full error: {'ok': 0.0, 'errmsg': 'resume token string was not a valid hex string', 'code': 9, 'codeName': 'FailedToParse', '$clusterTime': {'clusterTime': Timestamp(1709308449, 1), 'signature': {'hash': b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00', 'keyId': 0}}, 'operationTime': Timestamp(1709308449, 1)}
I used the meltanolabs variant.
m
resume token string was not a valid hex string
Had you run the same Meltano tap previously in incremental replication mode? You might need to rerun it with --full-refresh to ignore any saved state / replication key.
(author of the meltanolabs tap-mongodb variant here, btw)
c
Yes, I was able to fix it. I have a question. When I use replication-method: LOG_BASED, the documents are not loaded, only the replication_key. If I use another replication-method, the documents are loaded. Is it possible to use LOG_BASED and send the document values to the Postgres database?
Here is my meltano.yml:
version: 1
default_environment: dev
project_id: fe37a16b-a105-4c84-8af7-649ff980ebdf
environments:
- name: dev
- name: staging
- name: prod
plugins:
  extractors:
  - name: tap-mongodb
    variant: meltanolabs
    pip_url: git+https://github.com/MeltanoLabs/tap-mongodb.git
    config:
      mongodb_connection_string: mongodb://127.0.0.1:27017/?replicaSet=rs0
      database: checklist_facil
      flattening_enabled: true
      flattening_max_depth: 100
      add_record_metadata: true
      allow_modify_change_streams: true
    select:
    - countries.*
    metadata:
      '*':
        replication-key: replication_key
        replication-method: LOG_BASED
    stream_maps:
      countries:

  loaders:
  - name: target-jsonl
    variant: andyh1203
    pip_url: target-jsonl
  - name: target-postgres
    variant: meltanolabs
    pip_url: meltanolabs-target-postgres
    config:
      add_record_metadata: true
      database: postgres
      default_target_schema: checklist_facil
      flattening_enabled: true
      flattening_max_depth: 100
      host: localhost
      port: 5432
      user: postgres
  mappers:
  - name: meltano-map-transformer
    variant: meltano
    pip_url: git+https://github.com/MeltanoLabs/meltano-map-transform.git
    mappings:
    - name: set_fields_jd
      config:
        stream_maps:
          countries:
            document___id: document___id
m
I have a question. When I use replication-method: LOG_BASED, the documents are not loaded, only the replication_key. If I use another replication-method, the documents are loaded.
Is it possible to use LOG_BASED and send the document values to the Postgres database?
The log-based replication mode sets full_document=updateLookup (source), so it will return the full documents, but the change stream is limited in its scope. You'll only get events that have occurred since the last run of the tap (and the tap must be run frequently enough that the saved replication key is still available in the change stream/operations log). I usually run two versions of the tap: one in incremental replication mode (to load all already-existing records) and one in log-based replication mode (to consume updates from that point on).
If you run the tap in log-based mode, and then update a document in the database, and then run the tap again, you should see it emit the full document that was changed.
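To see the same behaviour outside the tap, here is a rough pymongo sketch of reading a change stream with full_document="updateLookup". It is not the tap's actual implementation; `drain_changes` is a hypothetical helper, and it assumes `collection` is a pymongo Collection on a replica set:

```python
def drain_changes(collection, resume_token=None):
    """Collect the change events available right now, plus the token to resume from.

    With resume_token=None the stream starts at the current point in time,
    so only changes made after it is opened can show up.
    """
    events = []
    with collection.watch(full_document="updateLookup", resume_after=resume_token) as stream:
        while True:
            change = stream.try_next()  # returns None when nothing new is available
            if change is None:
                break
            events.append(change)
        next_token = stream.resume_token  # save this to continue on the next run
    return events, next_token
```

Each update event carries the changed document under its `fullDocument` key. Passing the saved token back in as `resume_token` on the next call continues from where the previous call stopped, as long as that point is still in the oplog.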
c
Thanks, I'll try it.