# troubleshooting
m
I had a long running `meltano run tap-mongodb target-postgres` job that got ~85% of the way through an incremental load yesterday and then errored out. When I ran that job again today, I was surprised to see it run from the beginning, rather than starting from that 85% mark. Is the state not saved in a resumable way if the job errors out? Or is there potentially something I need to be doing differently in the tap implementation?
sadly I didn’t retrieve the state value before beginning the new job
well, the rerun failed too, and I have now captured the state:
```json
{
  "singer_state": {
    "bookmarks": {
      "backfill_talk_box_conversation": {
        "starting_replication_value": "1970-01-01",
        "progress_markers": {
          "Note": "Progress is not resumable if interrupted.",
          "replication_key": "_id",
          "replication_key_value": "5e15ef2c18fdcf0001eaf01e"
        }
      }
    }
  }
}
```
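(For reference, that state shape itself shows the problem: the `Note` says so explicitly, and the replication key value is still sitting under `progress_markers` rather than being promoted to the top of the bookmark. A small stdlib-only sketch of telling the two shapes apart, assuming the SDK convention that a finalized, resumable bookmark carries `replication_key_value` at the top level — `is_resumable` is a hypothetical helper name, not an SDK API:)

```python
def is_resumable(state: dict) -> bool:
    """Return True if every bookmark in a Singer state blob is finalized.

    Assumed convention: an interrupted stream keeps its position under
    "progress_markers" (not safe to resume from), while a completed stream
    promotes "replication_key_value" to the top level of the bookmark.
    """
    bookmarks = state.get("singer_state", {}).get("bookmarks", {})
    return all(
        "replication_key_value" in bm and "progress_markers" not in bm
        for bm in bookmarks.values()
    )


# The captured state from above: position exists, but only as a progress marker.
interrupted_state = {
    "singer_state": {
        "bookmarks": {
            "backfill_talk_box_conversation": {
                "starting_replication_value": "1970-01-01",
                "progress_markers": {
                    "Note": "Progress is not resumable if interrupted.",
                    "replication_key": "_id",
                    "replication_key_value": "5e15ef2c18fdcf0001eaf01e",
                },
            }
        }
    }
}

print(is_resumable(interrupted_state))  # this state is not resumable
```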
v
If you share your `meltano.yml`, that would be helpful so we can see which taps/targets you're using, etc.
m
```yaml
plugins:
  extractors:
    - name: tap-mongodb
      variant: menzenski
      pip_url: git+https://github.com/menzenski/tap-mongodb.git@1712b4f0d59db434413cc6e4a01a4f199ef0164f
      config:
        add_record_metadata: true
        allow_modify_change_streams: true
    - name: tap-talk-box-backfill
      inherit_from: tap-mongodb
      config:
        database: talk-box
        prefix: backfill_talk_box
      select:
        - backfill_talk_box_conversation.*
        - backfill_talk_box_conversationdefinition.*
      metadata:
        '*':
          replication-key: _id
          replication-method: INCREMENTAL
  loaders:
    - name: target-postgres
      variant: meltanolabs
      pip_url: git+https://github.com/MeltanoLabs/target-postgres.git@85d932ab14b94f9595a84ade39f9a8e7fa0c5213
      config:
        add_record_metadata: true
        database: paw_crucible
    - name: target-postgres-staging-payit
      inherit_from: target-postgres
      config:
        default_target_schema: staging_payit
```
The specific command run was `meltano run tap-talk-box-backfill target-postgres-staging-payit`.
I’m trying to understand why it’s not resuming from that `"replication_key_value": "5e15ef2c18fdcf0001eaf01e"` value (which is the behavior that I want).
It’s not clear to me whether this is just the behavior of `meltano run` and I should be using `meltano elt` instead, or whether the issue is that these BSON ObjectId strings aren’t alphanumerically sortable, or whether it’s something else.
p
One aspect I'm aware of is the `is_sorted` attribute, which defaults to False. See https://sdk.meltano.com/en/latest/incremental_replication.html#example-code-timestamp-based-incremental-replication. If the tap thinks the output is unsorted, the state won't be resumable, because the tap can't be sure that all records up to the replication key value have been retrieved until the entire stream completes successfully.
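(To illustrate that trade-off, here's a toy stdlib-only model of the behavior described above — it is *not* the actual `singer_sdk` code, and `check_order`/`InvalidStreamSortException` are hypothetical names. With `is_sorted=True` an out-of-order value fails fast, which is exactly what makes it safe to persist the max key seen as resumable state mid-stream:)

```python
class InvalidStreamSortException(Exception):
    """Toy stand-in for the SDK's out-of-order error."""


def check_order(values, is_sorted: bool):
    """Toy model of the sort check: with is_sorted=True, raise on any
    out-of-order replication key value; otherwise just track the max,
    which is only trustworthy once the whole stream has completed."""
    last = None
    for value in values:
        if is_sorted and last is not None and value < last:
            raise InvalidStreamSortException(f"{value!r} arrived after {last!r}")
        if last is None or value > last:
            last = value
    return last  # the max replication key value seen
```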
m
ooh, thank you! That seems easy enough to change and retest.
hmm, this line on that page may be an issue though:
• The SDK will throw an error if records come out of order when `is_sorted` is true.
But I suppose I don’t technically need to use the hex string representation of the ObjectId as the replication key. An ObjectId has a timestamp component, and that’s really all I’m using it for, so maybe I should just use that timestamp component as the replication key value instead.
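(For anyone landing here later: that timestamp component is the first 4 bytes of the ObjectId, a big-endian Unix timestamp in seconds. With pymongo installed you'd normally reach for `bson.ObjectId(...).generation_time`; here's a stdlib-only sketch of the same idea — `objectid_timestamp_iso` is a made-up helper name:)

```python
from datetime import datetime, timezone


def objectid_timestamp_iso(oid_hex: str) -> str:
    """Extract the creation timestamp embedded in a BSON ObjectId hex string.

    The first 8 hex characters (4 bytes) are a big-endian Unix timestamp in
    seconds; the remaining 16 characters are machine/process/counter data.
    Returns an ISO-8601 string suitable for use as a sortable replication key.
    """
    seconds = int(oid_hex[:8], 16)
    return datetime.fromtimestamp(seconds, tz=timezone.utc).isoformat()


# The ObjectId from the captured state decodes to an early-January-2020 timestamp.
print(objectid_timestamp_iso("5e15ef2c18fdcf0001eaf01e"))
```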
p
Yeah, if it's set to `is_sorted=True` then the SDK tries to be safe and errors when it detects unsorted data. I do some weird batching/pagination stuff with CloudWatch in https://github.com/MeltanoLabs/tap-cloudwatch/blob/c83a222be106ac251af39fc2212b78a8b368af70/tap_cloudwatch/client.py#L14 and ended up using the `check_sorted` method too, to disable those checks. In my case I had to fetch sub-batches that are each sorted, but I use `>=`, so sometimes there's an overlap at the edge of a batch where a duplicate record with an earlier timestamp gets sent again, and the SDK was throwing an error because it thought the stream was unsorted when it wasn't.
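(A generic toy sketch of that boundary-overlap situation — hypothetical helper, not the actual tap-cloudwatch code. Each batch is fetched with `key >= last_seen`, so the last record(s) of one batch can reappear at the start of the next; dropping those duplicates keeps the merged stream sorted:)

```python
def merge_batches(batches):
    """Merge key-sorted batches fetched with `key >= last_seen`, dropping
    duplicates re-fetched at batch boundaries so the merged stream stays
    sorted. Each record is a (key, record_id) pair."""
    last_key = None
    ids_at_last_key = set()
    for batch in batches:
        for key, record_id in batch:
            if key == last_key and record_id in ids_at_last_key:
                continue  # boundary duplicate from the >= overlap
            if key != last_key:
                last_key = key
                ids_at_last_key = set()
            ids_at_last_key.add(record_id)
            yield key, record_id
```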
^^idk if thats helpful context
m
it is, thanks
Setting `is_sorted` to True (when running in incremental replication mode) and setting the replication key value to an ISO-8601 string representation of the ObjectId’s timestamp component (rather than the ObjectId’s hex string representation that I had been using previously) fixed this issue. Now, when the tap errors, the state is saved in a way that can be resumed on the next run.