# troubleshooting
j
hello, I’m having trouble getting upsert loading to work with tap-postgres (transferwise variant) and target-redshift (ticketswap variant). What I mean is that records are landing in the right schema/table, but for some reason it behaves like append-only, giving multiple versions of each record instead of upserting a single entry per record. I’m wondering what the requirements are to get upsert to work on target-redshift. Here is a snippet of my meltano.yml:
environments:
- name: prod
  config:
    plugins:
      loaders:
      - name: target-redshift
        config: 
          load_method: upsert
plugins:
  extractors:
  - name: tap-postgres
    variant: transferwise
    pip_url: git+https://github.com/transferwise/pipelinewise.git#subdirectory=singer-connectors/tap-postgres
    config:
      dbname: ${src_dbname}
      default_replication_method: LOG_BASED
      filter_schemas: public
      host: ${src_host}
      port: ${src_port}
      user: ${src_username}
      password: ${src_password}
    metadata:
      public-new_risk_decision:
        replication-method: LOG_BASED
        replication-key: _sdc_lsn
        table-key-properties: ["id"]
  loaders:
  - name: target-redshift
    variant: ticketswap
    pip_url: git+https://github.com/TicketSwap/target-redshift.git
    config:
      dbname: ${DB_ENV}
      default_target_schema: ...
      host: ${REDSHIFT_HOST}
      load_method: upsert
      user: ${tgt_username}
      password: ${tgt_password}
      s3_bucket: ...
      s3_key_prefix: ...
      aws_redshift_copy_role_arn:  ...
      port: '5439'
      s3_region: us-east-1
      ssl_enable: true
      ssl_mode: require
      validate_records: false
      batch_size_rows: 1000000
      activate_version: true
      add_record_metadata: true
h
I've seen this too, for mssql log_based into target-redshift. I had to fork to fix it. Basically, it is attempting to upsert, but the messages coming from the tap are not unique by the key property. With the log_based replication method, a single batch can contain multiple messages for the same key property, and they are not reduced to one message before the upsert, nor does the upsert itself remove the duplicates. Here is the full explanation of the issue and the fix on my end:
## Problem
The resulting tables are not unique by the key property field.

## Why
This is due to the style of upserts done in target-redshift. It works as expected when the tap data is unique by the primary key, but with log-based CDC into target-redshift we are sending non-unique ids while the target expects uniqueness:

- more than one record arrives for a given id in the source, one for each change made since the latest run (each change is captured with CDC - change data capture)
- we still configure a primary key of id, implying uniqueness
- records are not loaded one by one, as other targets often do, but in bulk

In the target, Redshift's MERGE INTO is set to use REMOVE DUPLICATES; however, Redshift's documentation notes that

> Rows in target_table can't match multiple rows in source_table.

source_table here means the records we are loading and target_table means the redshift table we want to upsert to.

## Fix
We need target-redshift to apply only the final change from the change data capture records, but we still want all the records to land in S3 for our reference. Each record contains the state of the db row at the point of that change, and a few columns noting what the type of change was: insert, update, delete. The latest row is the current state, and what we want in the redshift table.

The target's record metadata numbers the records that come through in _sdc_sequence. We choose the record with the highest _sdc_sequence within each primary key partition, which ensures that only one source row matches each target row in the merge command.
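In other words, collapse each batch to the latest change per key before the merge. A minimal Python sketch of the idea (illustrative only, not the target's actual code):

# Collapse a batch of CDC records to one row per primary key,
# keeping the record with the highest _sdc_sequence (the latest change).
batch = [
    {"id": 1, "status": "created", "_sdc_sequence": 100},
    {"id": 1, "status": "updated", "_sdc_sequence": 101},
    {"id": 2, "status": "created", "_sdc_sequence": 100},
]

latest = {}
for record in batch:
    current = latest.get(record["id"])
    if current is None or record["_sdc_sequence"] > current["_sdc_sequence"]:
        latest[record["id"]] = record

print(list(latest.values()))
# Only the "updated" version of id 1 survives, plus the single row for id 2.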
With this, I also had to increase the precision of _sdc_sequence to actually be unique.
We deal with multiple changes to one primary key in the same run by ordering on _sdc_sequence. However, _sdc_sequence is time-based and can have the same value for two entries if they are processed in the same millisecond. Increasing this to nanoseconds makes _sdc_sequence unique.
Code changes: in target_redshift/sinks.py::RedshiftSink there are two changes. First, dedupe by key property before merging:
    def upsert(
        self,
        from_table: sqlalchemy.Table,
        to_table: sqlalchemy.Table,
        join_keys: list[str],
        cursor: Cursor,
    ) -> None:
        """Merge upsert data from one table to another.
        Args:
            from_table: The source table.
            to_table: The destination table.
            join_keys: The merge upsert keys; if empty, records are appended instead.
            cursor: The database cursor.
        """
        if len(join_keys) > 0:
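            # Rank the rows within each primary key, newest change first (highest _sdc_sequence).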
            primary_key_filter = sqlalchemy.func.row_number().over(
                partition_by=[from_table.columns[key] for key in join_keys],
                order_by=from_table.columns["_sdc_sequence"].desc(),
            )

            from_table_unique: sqlalchemy.Table = self.connector.copy_table_structure(
                full_table_name=f"{self.temp_table_name}_unique",
                from_table=from_table,
                as_temp_table=True,
                cursor=cursor,
            )

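            # Copy only the newest row per key into the temp table: QUALIFY row_number() = 1
            # drops older duplicates, so the MERGE sees at most one source row per target row.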
            sql = f"""
                insert into {self.connector.quote(str(from_table_unique))}
                select *
                from {self.connector.quote(str(from_table))} as {self.connector.quote(str(from_table))}
                qualify {primary_key_filter} = 1
                """  # noqa: S608
            cursor.execute(sql)

            join_predicates = []
            for key in join_keys:
                from_table_key: sqlalchemy.Column = from_table_unique.columns[key]
                to_table_key: sqlalchemy.Column = to_table.columns[key]
                join_predicates.append(from_table_key == to_table_key)

            join_condition = sqlalchemy.and_(*join_predicates)

            sql = f"""
                MERGE INTO {self.connector.quote(str(to_table))}
                USING {self.connector.quote(str(from_table_unique))}
                ON {join_condition}
                REMOVE DUPLICATES
                """
            cursor.execute(sql)
        else:
            sql = f"""
                INSERT INTO {self.connector.quote(str(to_table))}
                SELECT * FROM {self.connector.quote(str(from_table))}
                """  # noqa: S608
            cursor.execute(sql)
Second, added this to increase the precision of _sdc_sequence (note it needs import time at the top of sinks.py if that is not already imported):
    def _add_sdc_metadata_to_record(
        self,
        record: dict,
        message: dict,
        context: dict,
    ) -> None:
        """Populate metadata _sdc columns from incoming record message.
        Record metadata specs documented at:
        https://sdk.meltano.com/en/latest/implementation/record_metadata.html
        Args:
            record: Individual record in the stream.
            message: The record message.
            context: Stream partition or context dictionary.
        """
        super()._add_sdc_metadata_to_record(record, message, context)

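        # Overwrite the default millisecond-based sequence with a nanosecond timestamp,
        # so two changes processed in the same millisecond still get distinct values.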
        record["_sdc_sequence"] = time.time_ns()
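To see why the precision bump matters, here's a tiny standalone check (illustrative only) comparing millisecond timestamps, which is the granularity _sdc_sequence had before, with time.time_ns():

import time

# Two records observed back to back often land in the same millisecond...
ms_a = int(time.time() * 1000)
ms_b = int(time.time() * 1000)
print(ms_a == ms_b)  # frequently True: identical millisecond-based sequence values

# ...while nanosecond timestamps keep the ordering unambiguous in practice.
ns_a = time.time_ns()
ns_b = time.time_ns()
print(ns_a == ns_b)  # almost always False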
j
oh wow. Ok
h
Without forking, you could try a different replication_method: incremental or full_table.
j
I actually want to try to fork the repo, possibly contribute. Never had to before, so this would be cool if it's ok.
👍 1
h
yeah go for it! it's great to contribute
j
Got the code to work. The changes were perfect.
Reading into it though, I do wonder whether this would also work if you use incremental upsert, or if you do log-based without record metadata (though I think you would always need record metadata if you do log-based, right?)
h
Yes, the incremental upsert does not have this issue, since it's pulling from the database table as opposed to the CDC table; the latter is a list of changes which is not unique by the primary key, whereas the former would be unique. Record metadata is not optional AFAIK when you want it to upsert instead of append; it has to know what makes the rows unique.
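If you want to sanity-check what the tap is declaring, capture its output and look at the key_properties on each SCHEMA message; those are what the target uses as the merge join keys. A minimal sketch, assuming the stream was saved to a file such as tap_output.jsonl (e.g. meltano invoke tap-postgres > tap_output.jsonl):

import json

# Print the declared key_properties for every SCHEMA message in the captured output.
# A stream with no key_properties can only be appended, never upserted.
with open("tap_output.jsonl") as f:
    for line in f:
        if not line.strip():
            continue
        msg = json.loads(line)
        if msg.get("type") == "SCHEMA":
            print(msg["stream"], msg.get("key_properties"))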