Stéphane Burwash
04/11/2022, 7:26 PM
Stéphane Burwash
04/11/2022, 7:27 PM
class OwnersStream(HubspotStream):
"""Stream definition for the `/crm/v3/owners` endpoint."""
# NOTE(review): presumably makes the SDK include request URLs in its metric
# logs; the flag's effect is not visible here — TODO confirm.
_LOG_REQUEST_METRIC_URLS=True
# Stream name; the base class also uses it to pick the schema file
# (`<name>.json`).
name = "owners"
# Endpoint path for this stream.
path = "/crm/v3/owners"
# Field(s) identifying a record.
primary_keys = ["id"]
# Replication key; the base class's `get_url_params` also sends it as
# `order_by` together with `sort=asc`.
replication_key = "updatedAt"
# JSONPath handed to `extract_jsonpath` by the base class's `parse_response`
# to pull the records out of the response body.
records_jsonpath = "$.results[*]"
# JSONPath used by the base class's `get_next_page_token`.
# NOTE(review): `$.paging.next.link` looks like a full next-page URL rather
# than a bare cursor, while the base class sends the extracted value as the
# `page` query parameter — confirm against an actual response.
next_page_token_jsonpath = "$.paging.next.link"
visch
04/11/2022, 7:31 PM
HubspotStream
Stéphane Burwash
04/11/2022, 7:35 PM
Stéphane Burwash
04/11/2022, 7:35 PM
class HubspotStream(RESTStream):
"""Hubspot stream class."""

# Root URL of the HubSpot API. It must be a bare URL: angle brackets around
# it (as added by chat auto-linking) would become part of the string.
url_base = "https://api.hubapi.com"

# Alternative: make the root configurable through the tap settings.
# @property
# def url_base(self) -> str:
#     """Return the API URL root, configurable via tap settings."""
#     return self.config["api_url"]

# Default JSONPath for the records in a response body; override it here or
# override `parse_response`.
records_jsonpath = "$[*]"

# Default JSONPath for the next-page token in a response body; override it
# here or override `get_next_page_token`.
next_page_token_jsonpath = "$.next_page"
@property
def schema_filepath(self) -> Path:
    """Return the path of this stream's schema file.

    The file name is the stream's ``name`` with a ``.json`` suffix, resolved
    inside ``SCHEMAS_DIR``.
    """
    return SCHEMAS_DIR / f"{self.name}.json"
@property
def authenticator(self) -> BearerTokenAuthenticator:
    """Return a new authenticator object.

    The bearer token is read from the ``access_token`` config entry with
    ``.get``, so a missing entry is passed on as ``None`` and does not
    raise here.
    """
    return BearerTokenAuthenticator.create_for_stream(
        self,
        token=self.config.get("access_token"),
    )
@property
def http_headers(self) -> dict:
    """Return the http headers needed.

    Returns:
        A new dict holding ``User-Agent`` when ``user_agent`` is present in
        the config, otherwise an empty dict.
    """
    headers = {}
    if "user_agent" in self.config:
        headers["User-Agent"] = self.config.get("user_agent")
    # If not using an authenticator, you may also provide inline auth headers:
    # headers["Private-Token"] = self.config.get("auth_token")
    return headers
def get_next_page_token(
    self, response: requests.Response, previous_token: Optional[Any]
) -> Optional[Any]:
    """Return a token for identifying next page or None if no more pages.

    Args:
        response: The HTTP response of the page that was just fetched.
        previous_token: Token used for that page; not consulted here.

    Returns:
        The first match of ``next_page_token_jsonpath`` in the JSON body, or
        ``None`` when nothing matches. When no JSONPath is configured, the
        value of the ``X-Next-Page`` response header (``None`` if absent).
    """
    if self.next_page_token_jsonpath:
        matches = extract_jsonpath(
            self.next_page_token_jsonpath, response.json()
        )
        # Only the first match is used; no match means this was the last page.
        return next(iter(matches), None)
    return response.headers.get("X-Next-Page", None)
def get_url_params(
    self, context: Optional[dict], next_page_token: Optional[Any]
) -> Dict[str, Any]:
    """Return a dictionary of values to be used in URL parameterization.

    Args:
        context: Stream context; not used here.
        next_page_token: Token for the page to request; a falsy value adds
            no paging parameter.

    Returns:
        A dict with ``page`` set to the token when one is given, plus
        ``sort="asc"`` and ``order_by=<replication_key>`` when the stream
        defines a replication key.
    """
    params: dict = {}
    if next_page_token:
        # NOTE(review): the token is forwarded verbatim as `page`. That only
        # works if the token is a page identifier; a JSONPath that yields a
        # full next-page URL would put the whole URL into `page` — confirm
        # against the API's actual paging fields.
        params["page"] = next_page_token
    if self.replication_key:
        params["sort"] = "asc"
        params["order_by"] = self.replication_key
    return params
def prepare_request_payload(
    self, context: Optional[dict], next_page_token: Optional[Any]
) -> Optional[dict]:
    """Prepare the data payload for the REST API request.

    By default, no payload will be sent (return None).

    Args:
        context: Stream context; not used here.
        next_page_token: Paging token; not used here.

    Returns:
        Always ``None``.
    """
    # TODO: Delete this method if no payload is required. (Most REST APIs.)
    return None
def parse_response(self, response: requests.Response) -> Iterable[dict]:
    """Parse the response and return an iterator of result rows.

    Args:
        response: The HTTP response whose JSON body holds the records.

    Yields:
        Each match of ``records_jsonpath`` in the decoded JSON body.
    """
    yield from extract_jsonpath(self.records_jsonpath, input=response.json())
def post_process(self, row: dict, context: Optional[dict]) -> dict:
    """As needed, append or transform raw data to match expected structure.

    Args:
        row: A single parsed record.
        context: Stream context; not used here.

    Returns:
        The same ``row`` object, unchanged.
    """
    # TODO: Delete this method if not needed.
    return row
visch
04/11/2022, 7:39 PM
next_page_token_jsonpath = "$.paging.next.link"
Have you looked at the response? It looks like `$.paging.next.link` is a full URL for the next page. You add that URL as the value of the `page` parameter (which HubSpot's API probably ignores), so HubSpot's API gives you the same response back, and you then add `page` again to the full URL (including the old `page`).
visch
04/11/2022, 7:40 PM
def get_url_params(
self, context: Optional[dict], next_page_token: Optional[Any]
) -> Dict[str, Any]:
"""Return a dictionary of values to be used in URL parameterization."""
params: dict = {}
if next_page_token:
params["page"] = next_page_token
This is where `page` gets set. Hope that helps!
Stéphane Burwash
04/11/2022, 7:40 PM
Stéphane Burwash
04/11/2022, 7:41 PM
visch
04/11/2022, 7:41 PM
visch
04/11/2022, 7:41 PM
visch
04/11/2022, 7:42 PM
Stéphane Burwash
04/11/2022, 7:43 PM
visch
04/11/2022, 7:44 PM
visch
04/11/2022, 7:45 PM
params["page"] = next_page_token
is wrong
visch
04/11/2022, 7:45 PM
Stéphane Burwash
04/11/2022, 8:01 PM