Hey everyone, I'm looking to move from pipelinewis...
# troubleshooting
x
Hey everyone, I'm looking to move from pipelinewise version of
tap-postgres
to the meltano labs version. When running
log_based
replication, I am getting the following error:
psycopg2.errors.ObjectInUse: replication slot "XXXX" is active for PID XXXX
It seems to be quite random when it occurs and some streams are able to complete syncing successfully.
Copy code
- name: tap-postgres
    variant: transferwise
    pip_url: git+<https://github.com/XiaozhouWang85/pipelinewise-tap-postgres.git> #Forked version of pipelinewise to bypass some bugs
    config:
      host: XXXXX
      user: XXXX
      dbname: XXXX
      ssl: false
      default_replication_method: LOG_BASED
      max_run_seconds: 43200
      logical_poll_total_seconds: 100
      break_at_end_lsn: true
      itersize: 50000
    select:
    - public-foobar.*
     ........

  - name: tap-postgres-new
    inherit_from: tap-postgres
    pip_url: git+<https://github.com/MeltanoLabs/tap-postgres.git@v0.1.0>
    config:
      host: XXXXXX
      user: XXXX
      ssl: false
      max_run_seconds: 43200
      database: XXXX
      replication_slot_name: XXXX
      default_replication_method: LOG_BASED
    metadata:
      '*':
        replication_method: $MELTANO_POSTGRES_REPLICATION_METHOD
        replication_key: $MELTANO_POSTGRES_REPLICATION_KEY
Calling the following command to run
meltano run tap-postgres-new target-s3-jsonl
So we only have one process reading the replication slot at any point in time. There's a few discussions threads talking about changing the
wal_receiver_timeout
wal_sender_timeout
settings. Is this explicitly defined anywhere in the code somewhere? It would be help a lot if someone can point me at the right place in the code for this
e
Those can be changed on the server config end, e.g.
postgresql.conf
x
Thanks a lot! I'll look into changing that on the postgres side and see if it helps
In addition, I notice that it seems to be reading the replication slot for changes once for each stream instead of reading it once and then pushing that data into each stream. This ends up being very inefficient. Going from pipelinewise version to meltano version results in my run going from 1-2minutes to ~15 minutes. Is this something on your roadmap to improve on? If not, would you be open to a PR adding this feature in?
Only had a quick look at the code but thinking maybe this can be a strategy for implementing it: 1. Replace the inherited
sync_all
and re-specify it in
tap-postgres
2. Insert a step that says "If log_based, get the log data and store it into an in-memory data structure against the
TapPostgres
class" - using list of streams filter the data such that only selected streams are stored into the in-memory structure 3. Switch each stream to read from the in-memory data structure instead of from the replication slot
e
Is this something on your roadmap to improve on? If not, would you be open to a PR adding this feature in?
I personally probably won't have time to look into it for a while, so I'm definitely happy to review a PR!
👍 1
x
Will ping you here and on Github once I have something. It might be a while before I get round to it
ty 1