I was wondering if anyone has successfully applied...
# singer-tap-development
h
I was wondering if anyone has successfully applied column filters in tapmyql - I'm using the MeltanoLabs variant and extracting data from MySQL and want to filter records. If not what would be required in the repo to have this work?
e
Hi @Haaris Isaac Hussain! What have you tried so far?
h
@Edgar Ramírez (Arch.dev) I've added this to the get_records defintion: if self.column_filters: query = query.filter_by(**self.column_filters) self.logger.info(f"Applied filters to query: {self.column_filters}")
testing it locally at the moment but can't seem to get it to work
Got this in my SQLStream Class:
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.column_filters = kwargs.get('column_filters', None)
And added column_filters config to the metadata catalog:
metadata=MetadataMapping.get_standard_metadata(
schema_name=schema_name,
schema=schema,
replication_method=replication_method,
key_properties=None,
valid_replication_keys=None, # Must be defined by user
column_filters=column_filters
Also here:
unique_stream_id = self.get_fully_qualified_name(
db_name=None,
schema_name=schema_name,
table_name=table_name,
column_filters=column_filters,
delimiter="-",
)
And here:
def discover_catalog_entry(  # noqa: PLR0913
self,
engine: Engine,
inspected: Inspector,
schema_name: str,
table_name: str,
column_filters: str,
So in meltano.yml I have set it up as: metadata: <schema>-<table>: replication-method: FULL_TABLE column_filters: "<column>= '<value>'" However it's just bringing in all the columns without applying my filter
@Edgar Ramírez (Arch.dev) Can I not just add
if self.column_filter and self.column_value:
column_filter_col = table.columns[self.column_filter]
column_value_col = table.columns[self.column_value]
query = query.filter_by(column_filter_col = column_value_col)
if self.replication_key:
replication_key_col = table.columns[self.replication_key]
query = query.order_by(replication_key_col)
start_val = self.get_starting_replication_key_value(context)
if start_val:
query = query.filter(replication_key_col >= start_val)
to the get_records definition and the columns to get the CatalogEntry:
return CatalogEntry(
tap_stream_id=unique_stream_id,
stream=unique_stream_id,
table=table_name,
key_properties=None,
schema=Schema.from_dict(schema),
is_view=is_view,
replication_method=replication_method,
metadata=MetadataMapping.get_standard_metadata(
schema_name=schema_name,
schema=schema,
replication_method=replication_method,
key_properties=None,
valid_replication_keys=None,  # Must be defined by user
),
database=None,  # Expects single-database context
row_count=None,
stream_alias=None,
replication_key=None,  # Must be defined by user
column_filter=None,
column_value=None,
)
I have tried to make this similar to the replication_key metadata and it gives me this error although it is set up the same as replication_key: "AttributeError: 'MySQLStream' object has no attribute 'column_filter'"
e
Rather than making it part of the metadata, would it make more sense to add
column_filter
as a new setting?
h
@Edgar Ramírez (Arch.dev) the reason behind me adding it to the metadata is because each table can have a unique column filter and the code parses through metadata specifically for each table right? I don't know how to make it work as a stand alone setting
e
Right, so you could make it work as a mapping, e.g.
Copy code
"column_filter": {
  "stream1": [...],
  "stream2": [...]
}
1