Justin Yang
08/05/2025, 4:42 PM
```yaml
environments:
- name: prod
  config:
    plugins:
      loaders:
      - name: target-redshift
        config:
          load_method: upsert
plugins:
  extractors:
  - name: tap-postgres
    variant: transferwise
    pip_url: git+https://github.com/transferwise/pipelinewise.git#subdirectory=singer-connectors/tap-postgres
    config:
      dbname: ${src_dbname}
      default_replication_method: LOG_BASED
      filter_schemas: public
      host: ${src_host}
      port: ${src_port}
      user: ${src_username}
      password: ${src_password}
    metadata:
      public-new_risk_decision:
        replication-method: LOG_BASED
        replication-key: _sdc_lsn
        table-key-properties: ["id"]
  loaders:
  - name: target-redshift
    variant: ticketswap
    pip_url: git+https://github.com/TicketSwap/target-redshift.git
    config:
      dbname: ${DB_ENV}
      default_target_schema: ...
      host: ${REDSHIFT_HOST}
      load_method: upsert
      user: ${tgt_username}
      password: ${tgt_password}
      s3_bucket: ...
      s3_key_prefix: ...
      aws_redshift_copy_role_arn: ...
      port: '5439'
      s3_region: us-east-1
      ssl_enable: true
      ssl_mode: require
      validate_records: false
      batch_size_rows: 1000000
      activate_version: true
      add_record_metadata: true
```
Holly Evans
08/05/2025, 5:50 PM
## Problem
The resulting tables are not unique by the key property field.
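As a point of reference, the symptom shows up with a check along these lines (schema and table names here are hypothetical, and `id` stands in for whatever the key property is):

```sql
-- Hypothetical target schema/table keyed on id; any rows returned indicate the problem.
SELECT id, COUNT(*) AS copies
FROM analytics.new_risk_decision
GROUP BY id
HAVING COUNT(*) > 1;
```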
## Why
This is due to the style of upserts done in target-redshift. It works as expected when the tap data is unique by the primary key, but with log-based CDC into target-redshift we send non-unique ids while the target expects uniqueness:
- more than one record is produced by the tap for a given id in the source, one for each change made since the latest run (with CDC, change data capture, every change is captured)
- we still configure a primary key of id, implying uniqueness
- records are not loaded one by one, as other targets often do, but in bulk

In the target, Redshift's MERGE INTO is set to use REMOVE DUPLICATES; however, Redshift's documentation notes that
> Rows in target_table can't match multiple rows in source_table.
source_table here means the records we are loading, and target_table means the Redshift table we want to upsert to.
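To make the constraint concrete, the merge the target issues has roughly this shape (table names here are hypothetical, not the target's actual identifiers). If the staging table still contains two CDC records for the same id, the one matching target row matches multiple source rows, which is exactly what the documentation forbids:

```sql
-- Rough shape of the merge target-redshift runs; identifiers are hypothetical.
-- Two staged CDC records for id = 42 both match the single target row with id = 42.
MERGE INTO analytics.new_risk_decision
USING temp_new_risk_decision
ON analytics.new_risk_decision.id = temp_new_risk_decision.id
REMOVE DUPLICATES;
```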
## Fix
We need target-redshift to apply only the final change from the change data capture records, while still landing all the records in S3 for our reference. Each record contains the state of the db row at the point of that change, plus a few columns noting what type of change it was: insert, update, or delete. The latest row is the current state, and that is what we want in the Redshift table.
Meltano numbers the records that come through the tap/target in _sdc_sequence. We choose the record with the highest _sdc_sequence within each primary-key partition, which ensures only one source row matches each target row in the merge command.
With this, I also had to increase the precision of _sdc_sequence so that it is actually unique. We handle multiple changes to one primary key in the same run by ordering on _sdc_sequence, but _sdc_sequence is time-based and can take the same value for two records processed in the same millisecond. Increasing the precision to nanoseconds makes _sdc_sequence unique.
Holly Evans
08/05/2025, 5:51 PM
```python
def upsert(
    self,
    from_table: sqlalchemy.Table,
    to_table: sqlalchemy.Table,
    join_keys: list[str],
    cursor: Cursor,
) -> None:
    """Merge upsert data from one table to another.

    Args:
        from_table: The source table.
        to_table: The destination table.
        join_keys: The merge upsert keys, or `None` to append.
        cursor: The database cursor.

    Return:
        The number of records copied, if detectable, or `None` if the API does not
        report number of records affected/inserted.
    """
    if len(join_keys) > 0:
        # Rank the staged records within each primary-key partition,
        # newest _sdc_sequence first.
        primary_key_filter = sqlalchemy.func.row_number().over(
            partition_by=[from_table.columns[key] for key in join_keys],
            order_by=from_table.columns["_sdc_sequence"].desc(),
        )
        # Temp table with the same structure to hold the de-duplicated records.
        from_table_unique: sqlalchemy.Table = self.connector.copy_table_structure(
            full_table_name=f"{self.temp_table_name}_unique",
            from_table=from_table,
            as_temp_table=True,
            cursor=cursor,
        )
        # Keep only the latest record per primary key.
        sql = f"""
            insert into {self.connector.quote(str(from_table_unique))}
            select *
            from {self.connector.quote(str(from_table))} as {self.connector.quote(str(from_table))}
            qualify {primary_key_filter} = 1
        """  # noqa: S608
        cursor.execute(sql)
        # Build the merge condition from the join keys.
        join_predicates = []
        for key in join_keys:
            from_table_key: sqlalchemy.Column = from_table_unique.columns[key]
            to_table_key: sqlalchemy.Column = to_table.columns[key]
            join_predicates.append(from_table_key == to_table_key)
        join_condition = sqlalchemy.and_(*join_predicates)
        # Each target row now matches at most one source row, as MERGE requires.
        sql = f"""
            MERGE INTO {self.connector.quote(str(to_table))}
            USING {self.connector.quote(str(from_table_unique))}
            ON {join_condition}
            REMOVE DUPLICATES
        """
        cursor.execute(sql)
    else:
        sql = f"""
            INSERT INTO {self.connector.quote(str(to_table))}
            SELECT * FROM {self.connector.quote(str(from_table))}
        """  # noqa: S608
        cursor.execute(sql)
```
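For a single-column key of id, the generated dedup statement renders to roughly the following (table names are hypothetical); QUALIFY keeps only the newest record per primary key before the merge:

```sql
-- Sketch of what the f-string above produces; identifiers are hypothetical.
INSERT INTO temp_new_risk_decision_unique
SELECT *
FROM temp_new_risk_decision
QUALIFY ROW_NUMBER() OVER (
    PARTITION BY id
    ORDER BY _sdc_sequence DESC
) = 1;
```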
Added to increase the precision of _sdc_sequence:
```python
def _add_sdc_metadata_to_record(
    self,
    record: dict,
    message: dict,
    context: dict,
) -> None:
    """Populate metadata _sdc columns from incoming record message.

    Record metadata specs documented at:
    https://sdk.meltano.com/en/latest/implementation/record_metadata.html

    Args:
        record: Individual record in the stream.
        message: The record message.
        context: Stream partition or context dictionary.
    """
    super()._add_sdc_metadata_to_record(record, message, context)
    # Override the default time-based sequence with a nanosecond timestamp
    # so two changes in the same millisecond still get distinct values.
    record["_sdc_sequence"] = time.time_ns()
```
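As a sanity check on the precision change, something like the following (staging table name is hypothetical) should come back empty once _sdc_sequence is nanosecond-based; with the millisecond-based value it can return rows, which makes the ORDER BY _sdc_sequence DESC tie-break ambiguous:

```sql
-- Hypothetical staging table; duplicate (id, _sdc_sequence) pairs would make
-- the row_number() ordering in the dedup step non-deterministic.
SELECT id, _sdc_sequence, COUNT(*) AS copies
FROM temp_new_risk_decision
GROUP BY id, _sdc_sequence
HAVING COUNT(*) > 1;
```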
Justin Yang
08/05/2025, 5:52 PM

Holly Evans
08/05/2025, 5:53 PM

Justin Yang
08/05/2025, 5:57 PM

Holly Evans
08/05/2025, 7:58 PM

Justin Yang
08/06/2025, 2:54 PM

Justin Yang
08/06/2025, 2:55 PM

Holly Evans
08/11/2025, 4:47 PM