# troubleshooting
**erkan_ciftci**
Hi all, I'm trying to catch HARD DELETED records from the source DB and apply the same change to the target DB. Both source and target are PostgreSQL, same version, 14.5. I have a table *cus.customer* in the source DB with 2500 records, and I synced it to the target database as *raw_cus.customer* via Meltano in the initial step:
```sh
meltano run tap-postgres target-postgres
```
It works perfectly. Then I hard-deleted some rows (210 records) from the source table and re-ran the same pipeline, expecting it to truncate or delete all records in the target and insert everything from scratch, because `replication_method=FULL_TABLE` (the `hard_delete`, `add_metadata_columns`, and `primary_key_required` configs are all true):
```sh
meltano --log-level=debug run tap-postgres target-postgres
```
but it replicates 2290 rows again and then tries to delete the records whose `_sdc_deleted_at` column is not null. Unfortunately, none of the records have any value in `_sdc_deleted_at`, so it can't delete anything (DELETE 0) from the target table, and the total row count stays at 2500 instead of 2290. Can you please guide or help me with this issue, or am I missing something? By the way, I also tried the `--full-refresh` parameter, and it doesn't change anything either:
```sh
meltano run tap-postgres target-postgres --full-refresh
```
meltano.yaml =============
```yaml
plugins:
  extractors:
  - name: tap-postgres
    variant: transferwise
    pip_url: pipelinewise-tap-postgres
    config:
      host: localhost
      port: 54321
      dbname: aipg_test_db
      user: postgres
      default_replication_method: FULL_TABLE
      filter_schemas: cus
      max_run_seconds: 43200
      ssl: false
      break_at_end_lsn: true
    select:
    - cus-customer.*
  loaders:
  - name: target-postgres
    variant: transferwise
    pip_url: pipelinewise-target-postgres
    config:
      host: localhost
      port: 54322
      dbname: pg2pg_meltano
      user: postgres
      schema_mapping:
        cus.target_schema: raw_cus
      default_target_schema: test
      # to catch the hard-deleted records
      hard_delete: true
      add_metadata_columns: true
      primary_key_required: true
      validate_records: true
      temp_dir:
```
etl.log (after delete from source table) =======
```
[info ] {"type": "ACTIVATE_VERSION", "stream": "cus-customer", "version": 1665658359861} cmd_type=elb consumer=False name=tap-postgres producer=True stdio=stdout string_id=tap-postgres
[info ] {"type": "STATE", "value": {"bookmarks": {"cus-customer": {"last_replication_method": "FULL_TABLE", "version": 1665658359861, "xmin": null}}, "currently_syncing": null}} cmd_type=elb consumer=False name=tap-postgres producer=True stdio=stdout string_id=tap-postgres
[info ] time=2022-10-13 135241 name=singer level=INFO message=METRIC: {"type": "counter", "metric": "record_count", "value": 2290, "tags": {}} cmd_type=elb consumer=False name=tap-postgres producer=True stdio=stderr string_id=tap-postgres
[debug ] head producer completed first as expected name=tap-postgres
[debug ] tail consumer is next block, wrapping up
[info ] time=2022-10-13 135241 name=target_postgres level=INFO message=Loading 2290 rows into 'raw_cus."customer"' cmd_type=elb consumer=True name=target-postgres producer=False stdio=stderr string_id=target-postgres
[info ] time=2022-10-13 135242 name=target_postgres level=INFO message=Loading into raw_cus."customer": {"inserts": 0, "updates": 2290, "size_bytes": 632142} cmd_type=elb consumer=True name=target-postgres producer=False stdio=stderr string_id=target-postgres
[info ] time=2022-10-13 135242 name=target_postgres level=INFO message=Creating index on 'raw_cus."customer"' table on '_sdc_deleted_at' column(s)... CREATE INDEX IF NOT EXISTS i_customer__sdc_deleted_at ON raw_cus."customer" (_sdc_deleted_at) cmd_type=elb consumer=True name=target-postgres producer=False stdio=stderr string_id=target…
```
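A quick check directly on the target confirms what the log shows: nothing is ever marked as deleted (a sketch, using the schema/table names from the config above):

```sql
-- Count all rows and the rows marked as soft-deleted by the target.
-- Here it returns 2500 total and 0 marked, matching the DELETE 0 above.
SELECT count(*) AS total_rows,
       count(*) FILTER (WHERE _sdc_deleted_at IS NOT NULL) AS marked_deleted
FROM raw_cus."customer";
```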
**thomas_briggs**
Hi @erkan_ciftci. Two things here:
1. The `--full-refresh` option resets Meltano's internal state for the pipeline, which causes the tap to retrieve all data from the source. It has no impact on the target, however. If you want all the rows in the destination DB to be removed, you'll have to remove them somehow first (see the sketch after this list).
   a. Think of it this way: if the target were JSON files, what would you expect to happen? Nothing, right? Because there's no way to remove data from JSON files.
2. Detecting hard-deleted rows requires using LOG_BASED replication mode. Rows that have been hard deleted no longer exist in the database, so there's no way for INCREMENTAL or FULL_TABLE modes to know they were ever there.
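A rough sketch of that manual cleanup, run against the target DB using the connection details from the meltano.yaml above (adjust to your setup):

```sql
-- Run on the TARGET database (port 54322 in the config above)
-- before re-running the pipeline, so the full-table load starts clean.
TRUNCATE TABLE raw_cus."customer";
```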
**erkan_ciftci**
Hi @thomas_briggs, thanks for your answers, but I think I misunderstood or explained the case a bit wrong.
1. I used the `--full-refresh` parameter just for testing purposes. If `replication_method=FULL_TABLE`, I'd expect every pipeline execution to delete/truncate the destination table and load/overwrite it from scratch, without looking at the stream's state, wouldn't I? (If the target were JSONL, it would overwrite the records in the existing file, so I'd see the latest 2290 rows instead of 2500.)
2. If FULL_TABLE doesn't support hard deletes ("no way for INCREMENTAL or FULL_TABLE modes to know they were ever there"), why is it executing these queries in the logs?
```
Creating index on 'raw_cus."customer"' table on '_sdc_deleted_at' column(s)... CREATE INDEX IF NOT EXISTS i_customer__sdc_deleted_at ON raw_cus."customer" (_sdc_deleted_at)
Deleting rows from 'raw_cus."customer"' table... DELETE FROM raw_cus."customer" WHERE _sdc_deleted_at IS NOT NULL RETURNING _sdc_deleted_at
DELETE 0
```
**thomas_briggs**
FULL_TABLE mode tells the tap to retrieve all the data in the table. The target doesn't know anything about the replication mode; it just gets records from the tap and writes them to the DB. Note that the replication-method option is defined on the tap, so the target has no way to see that value.
The target is executing a DELETE because the `hard_delete` option is enabled. That will remove any rows marked as deleted. They'll only be marked as deleted if the tap says they were deleted, though, and that can only happen when using LOG_BASED replication.
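If you do move to LOG_BASED, a minimal sketch of the extractor change (one assumption here: as far as I know, LOG_BASED with the pipelinewise variant also requires `wal_level = logical` and the wal2json plugin on the source server):

```yaml
plugins:
  extractors:
  - name: tap-postgres
    variant: transferwise
    pip_url: pipelinewise-tap-postgres
    config:
      host: localhost
      port: 54321
      dbname: aipg_test_db
      user: postgres
      # switch from FULL_TABLE so deletes are picked up from the WAL
      default_replication_method: LOG_BASED
      filter_schemas: cus
```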
**erkan_ciftci**
OK @thomas_briggs, I got the idea. I was already thinking of using the LOG_BASED option for all the pipelines, but I'm trying to understand and clarify where I can use the FULL_TABLE option, e.g. for simple data transfers or initial loads.
**thomas_briggs**
I don't think there are a lot of uses for FULL_TABLE mode, because unless you're only going to run the pipeline once, you need something outside Meltano to clean up the destination each time. 😕
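For example, a hypothetical wrapper script that does the cleanup and then the load (reusing the TRUNCATE from above; names and ports are taken from the config in this thread):

```sh
#!/bin/sh
# Hypothetical wrapper: clear the destination table, then run the
# full-table pipeline with Meltano's internal state reset.
set -e
psql -h localhost -p 54322 -U postgres -d pg2pg_meltano \
  -c 'TRUNCATE TABLE raw_cus."customer";'
meltano run --full-refresh tap-postgres target-postgres
```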
**felix_kenton**
@erkan_ciftci I've just been running into the same problem, but with `tap-postgres` and `target-snowflake`. The documentation for ACTIVATE_VERSION states precisely that it is designed to be used with the FULL_TABLE option for this use case:
> This is where the `ACTIVATE_VERSION` message comes in! The `ACTIVATE_VERSION` message type is a mechanism that is used in full table sync mode which tells the target to disregard all previously received records and only consider the current version as “active”. By using this mechanism the sync between the tap and target can properly delete inactive versions of the data in the destination to clean up any stale records that were hard deleted.
In theory the target should be able to handle “delete everything that wasn’t sent in this run”, but after trying various combinations of parameters I’ve given up trying to get deletes working (either soft or hard!) 😞
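For what it's worth, the tap side does its part; the etl.log above shows it emitting exactly this message:

```json
{"type": "ACTIVATE_VERSION", "stream": "cus-customer", "version": 1665658359861}
```

So in theory the target receives everything it needs to drop every row not tagged with that version.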
**thomas_briggs**
I don't think the Pipelinewise targets support ACTIVATE_VERSION 😞 Hence the need to manually clear the table before running the Meltano pipeline.
**felix_kenton**
Ah - that would certainly explain it! That might also explain @erkan_ciftci’s lack of success if he’s using the default “transferwise” variant of target-postgres.
**erkan_ciftci**
Hi @felix_kenton, I saw and checked the ACTIVATE_VERSION implementation, but it doesn't work with the standard targets, so I tested the Wise variant; unfortunately it doesn't work as expected either. So I switched to PipelineWise itself, which works much better, and left Meltano. Thanks for all the help and answers.
**felix_kenton**
For anyone running into this in future, I got a definitive confirmation that it doesn’t support it from a maintainer here: https://github.com/transferwise/pipelinewise-target-snowflake/issues/327