annamarie
10/03/2023, 4:45 PM
schema defined in streams.py?
For example, if the config has a name that is used to hit an API endpoint, is there a way to pass this name into the return schema so it can be persisted in the db?
edgar_ramirez_mondragon
10/03/2023, 4:58 PM
name as a schema field name or as a value in a field with a fixed name (e.g. name_from_config)?
annamarie
10/03/2023, 5:02 PM
name but the field name itself is not particularly crucial, I just need it in the schema for a join I'll be performing later on
edgar_ramirez_mondragon
10/03/2023, 5:11 PM
from singer_sdk import typing as th

class MyStream:
    schema = th.PropertiesList(
        th.Property("name_from_config", th.String),
        ...
    ).to_dict()

    def post_process(self, row, context):
        # Copy the config value into every record so it is persisted downstream.
        row["name_from_config"] = self.config["name"]
        return row
annamarie
10/03/2023, 5:18 PM
annamarie
11/06/2023, 6:17 PM
row["name_from_config"] = self.config["name"] no longer solves my problem.
But this does not work, context is empty
def post_process(
    self,
    row: dict,
    context: dict | None,
) -> dict | None:
    print("CONTEXT:", context)
    row['input_domain'] = context['website_domain']
    return row
despite setting the context here, and successfully using this context to search against the correct domain:
def request_records(self, context: dict | None) -> t.Iterable[dict]:
    """Request records from REST endpoint(s), returning response records.

    If pagination is detected, pages will be recursed automatically.

    Args:
        context: Stream partition or context dictionary.

    Yields:
        An item for every record in the response.
    """
    decorated_request = self.request_decorator(self._request)

    with metrics.http_request_counter(self.name, self.path) as request_counter:
        request_counter.context = context
        domains = self.config["domains"].split(",")
        if context is None:
            context = {}
        for domain in domains:
            context["website_domain"] = domain
            prepared_request = self.prepare_request(
                context,
                next_page_token=None,
            )
            resp = decorated_request(prepared_request, context)
            request_counter.increment()
            self.update_sync_costs(prepared_request, resp, context)
            yield from self.parse_response(resp)
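
A possible explanation for the empty context, sketched in plain Python rather than with the SDK itself: when request_records receives context=None, the line `context = {}` only rebinds the local name, so whoever called request_records still holds None and would pass that None on to post_process. The class below is a hypothetical stand-in (FakeDomainStream, fetch, seen_contexts and get_records as written here are illustrative names, not singer_sdk code), and it assumes the caller hands the same context value to both methods. It also shows one workaround: attach the domain to each record inside the loop instead of relying on context.

```python
from __future__ import annotations

from typing import Iterator


class FakeDomainStream:
    """Hypothetical stand-in for a REST stream; not part of singer_sdk."""

    def __init__(self, config: dict) -> None:
        self.config = config
        # Records every context value post_process receives, for inspection.
        self.seen_contexts: list[dict | None] = []

    def fetch(self, domain: str) -> list[dict]:
        # Placeholder for the real HTTP request plus parse_response().
        return [{"url": f"https://{domain}/"}]

    def request_records(self, context: dict | None) -> Iterator[dict]:
        if context is None:
            # Rebinds the local name only; the caller's variable stays None.
            context = {}
        for domain in self.config["domains"].split(","):
            context["website_domain"] = domain
            for record in self.fetch(domain):
                # Workaround: stamp the domain on the record itself.
                yield {**record, "input_domain": domain}

    def post_process(self, row: dict, context: dict | None) -> dict | None:
        self.seen_contexts.append(context)
        # No context lookup needed; the row already carries input_domain.
        return row

    def get_records(self, context: dict | None) -> Iterator[dict]:
        # Assumed call pattern: the same caller-side context goes to both methods.
        for record in self.request_records(context):
            row = self.post_process(record, context)
            if row is not None:
                yield row


stream = FakeDomainStream({"domains": "a.com,b.com"})
rows = list(stream.get_records(None))
```

After this runs, every entry in stream.seen_contexts is None even though request_records filled in its own local dictionary, while each row still carries the right input_domain. If the schema declares input_domain, the field can then be persisted for the later join. Another route that may fit better is the SDK's partitions property, returning one context dictionary per domain, so the SDK itself passes a populated context to both methods; whether that suits this tap is a judgment call.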