Stéphane Burwash
05/20/2022, 9:07 PMclass DealsStream(HubspotStream):
"""Define custom stream."""
name = "deals"
path = "/crm/v3/objects/deals"
primary_keys = ["id"]
partitions = [{"archived": True}, {"archived": False}]
def get_url_params(self, context: Optional[dict], next_page_token: Optional[Any]) -> Dict[str, Any]:
params = super().get_url_params(context, next_page_token)
params['properties'] = ','.join(self.properties)
params['archived'] = context['archived']
return params
@property
def schema(self) -> dict:
if self.cached_schema is None:
self.cached_schema, self.properties = self.get_custom_schema()
return self.cached_schema
def get_child_context(self, record: dict, context: Optional[dict]) -> dict:
"""Return a context dictionary for child streams."""
return {
"deal_id": record["id"],
"archived": record["archived"]
}
Child:
class AssociationsDealsToCompaniesStream(HubspotStream):
name="associations_deals_companies"
path = "/crm/v4/objects/deals/{deal_id}/associations/companies"
deal_id = ""
replication_method = "FULL_TABLE"
replication_key = ""
parent_stream_type = DealsStream
ignore_parent_replication_keys = True
def get_url_params(
self, context: Optional[dict], next_page_token: Optional[Any]
) -> Dict[str, Any]:
"""Return a dictionary of values to be used in URL parameterization."""
params = super().get_url_params(context, next_page_token)
self.deal_id = context['deal_id']
return params
def parse_response(self, response: requests.Response) -> Iterable[dict]:
data = response.json()['results']
ret = []
for e in data:
elem = e
elem['id'] = self.deal_id
ret.append(elem)
return ret