# singer-tap-development
s
I have run into an issue with two taps this week where the streams are outputting records that have null values in primary key columns. They are primary key columns (`id`, for example), but the source system apparently does not have strict limitations on them, or has special cases where they might be omitted. The tap completes successfully, but there are errors on load due to the null primary keys. I'd like to just remove these from the yielded records, and was wondering if others have suggestions. Right now, I'm just adding a filter into `parse_response`:
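```python
from typing import Iterable

import requests
from singer_sdk.helpers.jsonpath import extract_jsonpath

def parse_response(self, response: requests.Response) -> Iterable[dict]:
    records = extract_jsonpath(self.records_jsonpath, input=response.json())
    # Skip any record with a null (or missing) primary key value.
    yield from (
        row for row in records
        if all(row.get(k) is not None for k in self.primary_keys)
    )
```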
But wondering if others have tackled this before, or if this would be a generally useful tap feature?
For a generalizable approach, I was thinking that having some default enforcement on the `required=True` attribute in the catalog may make sense, with something like a `filter_records_with_schema_violations=True` flag.
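A rough sketch of what that enforcement could look like, in plain Python rather than actual SDK code. It assumes the stream schema is a JSON Schema style dict whose top-level `required` list names the mandatory properties (which is presumably where `required=True` ends up). The helper name `drop_schema_violations` is made up for illustration, and `filter_records_with_schema_violations` is still only the flag name proposed above, not an existing setting.

```python
from typing import Any, Dict, Iterable, Iterator


def drop_schema_violations(
    records: Iterable[Dict[str, Any]],
    schema: Dict[str, Any],
) -> Iterator[Dict[str, Any]]:
    """Yield only records with a non-null value for every required property.

    Hypothetical helper: `schema` is assumed to be a JSON Schema style dict
    with an optional top-level "required" list.
    """
    required = schema.get("required", [])
    for record in records:
        if all(record.get(name) is not None for name in required):
            yield record


schema = {
    "type": "object",
    "properties": {"id": {"type": "integer"}, "name": {"type": "string"}},
    "required": ["id"],
}
rows = [{"id": 1, "name": "a"}, {"id": None, "name": "b"}, {"name": "c"}]
kept = list(drop_schema_violations(rows, schema))
```

Only the first row survives here; the other two are dropped because `id` is null or missing. A schema without a `required` list lets everything through.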
e
Hi @stephen_bailey! Have you looked at stream maps? As for validating every record against a schema and dropping mismatches, would you mind creating an issue?
s
No, I haven't looked at that -- is this a config-level setting?
whaaaaaaaaaaaaaaaaaaat
this is amazing
`stream_maps` definitely would have solved my initial "broken pipe" problem and is a great tool for the end user. I'll make an issue that calls out the idea that maybe we could use `stream_maps` + the catalog `required=True` to do this automatically.
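For reference, a sketch of what a config-level stream map filter might look like, written as a Python dict that mirrors the tap's JSON config. The stream name `users` and the key `id` are placeholders, and it assumes the filter expression evaluator accepts `record.get(...)` and `is not None`; worth checking against the SDK's stream maps docs before relying on it.

```python
import json

# Placeholder stream name ("users") and primary key ("id").
# "__filter__" keeps a record only when its expression evaluates to true.
config = {
    "stream_maps": {
        "users": {
            "__filter__": "record.get('id') is not None",
        },
    },
}

# Print the JSON form that would go into the tap's config.
print(json.dumps(config, indent=2))
```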
a
@stephen_bailey - As @edgar_ramirez_mondragon mentions, Stream Maps should give you filtering capability. Since you own this tap, you could also return `None` from `post_process()` to always filter out certain records based on a condition. The behavior of `post_process()` is that you can alter the record or just not return it.
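To make that contract concrete, here is a toy model of it in plain Python. `emit_records` is a made-up stand-in for whatever the SDK does internally, not its actual code; it only shows that a `post_process` result of `None` means the record is skipped, while anything else is passed along.

```python
from typing import Callable, Iterable, Iterator, Optional

Record = dict


def emit_records(
    rows: Iterable[Record],
    post_process: Callable[[Record], Optional[Record]],
) -> Iterator[Record]:
    """Toy stand-in for the SDK's record loop (an assumption, not its code)."""
    for row in rows:
        processed = post_process(row)
        if processed is None:
            # None means "filter this record out", not "emit an empty record".
            continue
        yield processed


def drop_null_ids(row: Record) -> Optional[Record]:
    # Alter the record (add a flag), or drop it when "id" is null or missing.
    if row.get("id") is None:
        return None
    return {**row, "checked": True}


emitted = list(emit_records([{"id": 1}, {"id": None}, {}], drop_null_ids))
```

Of the three input rows, only the one with a real `id` is emitted, and it carries the extra `checked` field added during post-processing.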
s
Oh, I was thinking that it would return an empty record if I went the `post_process` route. But if I return `None` and that simply omits the record, that would be perfect.
a
Yeah - sorry, that might not be 100% clear from the docs. But returning `None` has the effect of filtering out the record.
Definitely not clear from the docs. I'll open an issue.
s
This works nicely
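```python
from typing import Optional

def post_process(self, row: dict, context: Optional[dict] = None) -> Optional[dict]:
    # Returning None drops the record instead of emitting it.
    if any(row.get(k) is None for k in self.primary_keys):
        return None
    return row
```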