Haaris Isaac Hussain
10/07/2024, 8:57 AM
Edgar Ramírez (Arch.dev)
10/07/2024, 2:52 PM
Haaris Isaac Hussain
10/07/2024, 3:02 PM
Haaris Isaac Hussain
10/07/2024, 3:03 PM
Haaris Isaac Hussain
10/07/2024, 3:10 PM
def __init__(self, *args, **kwargs):
    super().__init__(*args, **kwargs)
    self.column_filters = kwargs.get('column_filters', None)
And added column_filters config to the metadata catalog:
metadata=MetadataMapping.get_standard_metadata(
    schema_name=schema_name,
    schema=schema,
    replication_method=replication_method,
    key_properties=None,
    valid_replication_keys=None,  # Must be defined by user
    column_filters=column_filters,
),
Also here:
unique_stream_id = self.get_fully_qualified_name(
    db_name=None,
    schema_name=schema_name,
    table_name=table_name,
    column_filters=column_filters,
    delimiter="-",
)
And here:
def discover_catalog_entry(  # noqa: PLR0913
    self,
    engine: Engine,
    inspected: Inspector,
    schema_name: str,
    table_name: str,
    column_filters: str,
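One caveat with the `__init__` override above: if the parent constructor does not accept a `column_filters` keyword, forwarding it through `**kwargs` would raise a `TypeError`. Below is a minimal sketch, assuming exactly that, which removes the keyword with `kwargs.pop` before delegating. `BaseStream` and `FilteredStream` are hypothetical stand-ins, not the SDK's classes.

```python
class BaseStream:
    """Hypothetical stand-in for a parent class that knows nothing about column_filters."""

    def __init__(self, name: str) -> None:
        self.name = name


class FilteredStream(BaseStream):
    """Hypothetical subclass that accepts an extra column_filters keyword."""

    def __init__(self, *args, **kwargs) -> None:
        # Take the custom keyword out of kwargs first, so the parent never sees it.
        self.column_filters = kwargs.pop("column_filters", None)
        super().__init__(*args, **kwargs)


stream = FilteredStream("orders", column_filters="status")
plain = FilteredStream("customers")  # column_filters falls back to None
```

With `kwargs.get` after `super().__init__(*args, **kwargs)`, the same call would fail in this sketch, because `BaseStream.__init__` would receive the unexpected keyword.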
Haaris Isaac Hussain
10/07/2024, 3:11 PM
Haaris Isaac Hussain
10/08/2024, 2:34 PM
if self.column_filter and self.column_value:
    column_filter_col = table.columns[self.column_filter]
    column_value_col = table.columns[self.column_value]
    query = query.filter_by(column_filter_col = column_value_col)
if self.replication_key:
    replication_key_col = table.columns[self.replication_key]
    query = query.order_by(replication_key_col)
    start_val = self.get_starting_replication_key_value(context)
    if start_val:
        query = query.filter(replication_key_col >= start_val)
I added this to the get_records definition, and added the columns to the CatalogEntry:
return CatalogEntry(
    tap_stream_id=unique_stream_id,
    stream=unique_stream_id,
    table=table_name,
    key_properties=None,
    schema=Schema.from_dict(schema),
    is_view=is_view,
    replication_method=replication_method,
    metadata=MetadataMapping.get_standard_metadata(
        schema_name=schema_name,
        schema=schema,
        replication_method=replication_method,
        key_properties=None,
        valid_replication_keys=None,  # Must be defined by user
    ),
    database=None,  # Expects single-database context
    row_count=None,
    stream_alias=None,
    replication_key=None,  # Must be defined by user
    column_filter=None,
    column_value=None,
)
I tried to set this up the same way as the replication_key metadata, but it still gives me this error:
"AttributeError: 'MySQLStream' object has no attribute 'column_filter'"
Edgar Ramírez (Arch.dev)
10/08/2024, 4:08 PM
column_filter as a new setting?
Haaris Isaac Hussain
10/09/2024, 9:06 AM
Edgar Ramírez (Arch.dev)
10/09/2024, 2:34 PM
"column_filter": {
    "stream1": [...],
    "stream2": [...]
}
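A minimal sketch of how a stream might look up its own entry in a per-stream `column_filter` setting shaped like the one above. The thread does not say what goes inside each list, so the `{"column": ..., "value": ...}` entry shape, the stream name `mydb-orders`, and both helper functions are assumptions for illustration only.

```python
from typing import Any

# Hypothetical tap config using the per-stream mapping shape.
# The entry format inside each list is an assumption, not taken from the thread.
config: dict[str, Any] = {
    "column_filter": {
        "mydb-orders": [{"column": "status", "value": "shipped"}],
    }
}


def filters_for_stream(config: dict[str, Any], stream_name: str) -> list[dict[str, Any]]:
    """Return the filter entries configured for one stream, or an empty list."""
    return config.get("column_filter", {}).get(stream_name, [])


def row_matches(row: dict[str, Any], filters: list[dict[str, Any]]) -> bool:
    """Return True when the row satisfies every column == value filter."""
    return all(row.get(f["column"]) == f["value"] for f in filters)


# In a SQLAlchemy-based get_records, each entry could instead become a clause
# such as: query = query.where(table.columns[f["column"]] == f["value"])
order_filters = filters_for_stream(config, "mydb-orders")
```

Looking the filters up from config by stream name means the stream does not depend on a `column_filter` attribute having been set on the object, and streams without an entry simply get no extra filtering.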