fred_reimer
01/28/2022, 4:18 PMdef get_child_context(self, record: dict, context: Optional[dict]) -> dict:
# Create child context for each account
return {"accountId": record["accountId"]}
and this in the child:
class AccountUtilizationStream(BaseStream):
"""Account Utilization."""
name = "account_utilization"
parent_stream_type = AccountsStream
ignore_parent_replication_keys = True
path = "/v1/search?accountIds={accountId}"
primary_keys = ["account_id"]
It is failing, because it is not replacing {accountId}
in the path. I do see the following in the logs:
Beginning incremental sync of 'account_utilization' with context: {'accountId': 123456}
and the following METRIC:
METRIC: {'type': 'timer', 'metric': 'http_request_duration', 'value': 0.190973, 'tags': {'endpoint': '/v1/search?accountIds={accountId}', 'http_status_code': 404, 'status': 'failed', 'context': {'accountId': 123456}}}
Any ideas?aaronsteers
01/28/2022, 4:32 PM?
in the above, and then the account id can be passed in get_params, with assist from context
.fred_reimer
01/28/2022, 4:33 PMaaronsteers
01/28/2022, 4:35 PMaaronsteers
01/28/2022, 4:37 PMpath
. (Open to an MR or issue though if you think this could be improved.)fred_reimer
01/28/2022, 4:47 PMdef get_url(self, context: Optional[dict]) -> str:
"""Get stream entity URL.
Developers override this method to perform dynamic URL generation.
Args:
context: Stream partition or context dictionary.
Returns:
A URL, optionally targeted to a specific partition or context.
"""
url = "".join([self.url_base, self.path or ""])
vals = copy.copy(dict(self.config))
vals.update(context or {})
for k, v in vals.items():
search_text = "".join(["{", k, "}"])
if search_text in url:
url = url.replace(search_text, self._url_encode(v))
return url
fred_reimer
01/28/2022, 5:17 PMaaronsteers
01/28/2022, 5:44 PM