sean_glynn
12/06/2021, 5:31 PM
```python
class TimeViewedFlattenedStream(cloudflare_graphqlStream):
    name = "timeViewedFlattenedStream"
    schema_filepath = SCHEMAS_DIR / "timeViewedFlattened.json"
    primary_keys = ["pk"]
    replication_key = "dimensions_datetime"
    last_updated = None
    start_date = date_yesterday()
    page_limit = "10000"
    query = generate_graphql_query(start_date, page_limit)
```
I render the GraphQL query like so:
```graphql
query {
  graphX(
    filter: { date_geq: "2021-12-05" }
    orderBy: [fieldY]
    limit: 10000
  ) {
    count
    avg {
      sampleInterval
    }
    dimensions {
      uid
      clientCountryName
      deviceType
      deviceOs
      datetime
      date
    }
    sum {
      fieldZ
    }
  }
}
```
The query above returns all of the records from yesterday's date until now.
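For context, `generate_graphql_query` is a small templating helper. A minimal sketch of what it could look like is below — the implementation is an assumption; only the function name, fields, and parameters come from the snippets in this thread:

```python
def generate_graphql_query(start_date: str, page_limit: str) -> str:
    """Render the GraphQL query with a date filter and page limit.

    NOTE: hypothetical sketch -- the real helper in this tap may differ.
    """
    return f"""
query {{
  graphX(
    filter: {{ date_geq: "{start_date}" }}
    orderBy: [fieldY]
    limit: {page_limit}
  ) {{
    count
    avg {{ sampleInterval }}
    dimensions {{ uid clientCountryName deviceType deviceOs datetime date }}
    sum {{ fieldZ }}
  }}
}}
"""
```

Because the filter value is baked into the query string, re-rendering the string is the only way to change the filter — which is what the rest of this thread works around.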
The initial meltano invocation works as expected:
```
╰─$ meltano elt --job_id incrementaltest tap_cloudflare_graphql target-bigquery
2021-12-06T17:04:46.725968Z [info ] Running extract & load... job_id=incrementaltest name=meltano run_id=0cbc34a9-f6aa-473d-9549-2f8b780a11bb
2021-12-06T17:04:46.814121Z [warning ] No state was found, complete import.
2021-12-06T17:04:50.804806Z [info ] INFO Pushing state: {} cmd_type=loader job_id=incrementaltest name=target-bigquery run_id=0cbc34a9-f6aa-473d-9549-2f8b780a11bb stdio=stderr
2021-12-06T17:04:50.811995Z [info ] Incremental state has been updated at 2021-12-06 17:04:50.811874.
2021-12-06T17:04:50.973960Z [info ] Installing dependencies from lock file cmd_type=extractor job_id=incrementaltest name=tap_cloudflare_graphql run_id=0cbc34a9-f6aa-473d-9549-2f8b780a
```
But as I want to incrementally load this stream to BigQuery, streaming only the freshest records, I set:
```python
replication_key = "dimensions_datetime"
```
When I run for the first time, I can see that Meltano has taken the `max(replication_key)` and added it to the job state:
```
2021-12-06T17:04:56.921098Z [info ] INFO Updating state with {'bookmarks': {'timeViewedFlattenedStream': {'replication_key_signpost': '2021-12-06T17:04:52.681858+00:00', 'starting_replication_value': None, 'progress_markers': {'Note': 'Progress is not resumable if interrupted.', 'replication_key': 'dimensions_datetime', 'replication_key_value': '2021-12-05T19:02:47Z'}}}} cmd_type=loader job_id=incrementaltest name=target-bigquery run_id=0cbc34a9-f6aa-473d-9549-2f8b780a11bb stdio=stderr
```
This stores the latest load timestamp in the job's state in the job table within MeltanoDB:
```
sqlite> SELECT * FROM job WHERE job_id='incrementaltest';
92|incrementaltest|SUCCESS|2021-12-06 17:04:45.140626|2021-12-06 17:05:03.930382|{"singer_state": {"bookmarks": {"timeViewedFlattenedStream": {"progress_markers": {"Note": "Progress is not resumable if interrupted.", "replication_key": "dimensions_datetime", "replication_key_value": "2021-12-05T19:02:47Z"}, "replication_key_signpost": "2021-12-06T17:04:52.681858+00:00", "starting_replication_value": null, "replicatio…
```
tim_frazer
12/06/2021, 8:10 PM
Does Meltano filter the stream based on the "replication_key" value?

edgar_ramirez_mondragon
12/06/2021, 8:12 PM
Not automatically, so you have to filter the request "manually".
> If Meltano does not handle the incremental filtering logic, I need to derive the last updated timestamp manually and apply the filter myself

You should be able to parameterize your query string to use variables, and override `get_url_params` to retrieve the timestamp value and return the variables dictionary.

sean_glynn
12/06/2021, 8:18 PM
I was thinking of overriding `prepare_request_payload()` and utilizing the context object. I may use the `get_url_params` function as per your suggestion.
I will update my post with my solution soon!

edgar_ramirez_mondragon
12/06/2021, 8:23 PM

sean_glynn
12/07/2021, 4:18 PM
Solved this by overriding the `singer_sdk.streams.graphql.prepare_request_payload` function, adding the following code to the function:
```python
def prepare_request_payload(
    self, context: Optional[dict], next_page_token: Optional[Any] = None
) -> Dict[str, Any]:
    """Override :meth:`singer_sdk.streams.graphql.prepare_request_payload`.

    Appends the GraphQL query filter refresh to the function.
    """
    # START - Query Update
    start_ts = self.get_starting_timestamp(context)
    # Get last_updated ts from Meltano state, else None
    self.last_updated = start_ts if start_ts else None
    self.logger.debug(f"last_updated: {self.last_updated}")
    # If last_updated => update query filter
    if self.last_updated:
        self.start_date = self.last_updated
        self.logger.info(f"UPDATING QUERY FILTER: {self.last_updated}")
        self.query  # re-render the query property with the new start_date
    # Else => use default date filter (derived from config)
    else:
        self.start_date = self.get_start_date
    # END - Query Update
    # ......... REST OF EXISTING FUNCTION .........
```
When the `prepare_request_payload()` function is invoked, the latest class instance vars `start_date` and `page_limit` are rendered within the GraphQL query via the `query` property:
```python
@property
def query(self) -> str:
    """Render GraphQL query with latest `start_date` and `page_limit` values."""
    # Ensure filter value is not empty
    assert self.start_date is not None
    # Render GraphQL query
    return generate_graphql_query(self.start_date, self.page_limit)
```
So basically, whenever the `prepare_request_payload()` function is called, it will either render the GraphQL query with the latest `replication_key` value (from Meltano state) or use the default filter value from the tap config.
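Stripped of the stream plumbing, the decision being made here reduces to a two-line fallback, which is easy to reason about (and test) in isolation. `resolve_start_date` is a hypothetical name used only for this sketch, not part of the tap:

```python
from typing import Optional


def resolve_start_date(state_ts: Optional[str], config_start_date: str) -> str:
    """Prefer the replication-key timestamp from Meltano state; otherwise
    fall back to the tap's configured start_date.

    `state_ts` mirrors the value returned by `get_starting_timestamp(context)`.
    """
    return state_ts if state_ts else config_start_date


# Incremental run: state has a bookmark, so it wins
resolve_start_date("2021-12-05T19:02:47Z", "2021-01-01")  # -> "2021-12-05T19:02:47Z"
# First run: no state yet, so fall back to config
resolve_start_date(None, "2021-01-01")  # -> "2021-01-01"
```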
Full solution:
```python
class GraphQLStream(cloudflare_graphqlStream):
    logger = logging.getLogger("XStream")
    name = "XStream"
    schema_filepath = SCHEMAS_DIR / "schema.json"
    primary_keys = ["pk"]
    replication_key = "dimensions_datetime"

    # Declare vars
    last_updated: Optional[str] = None
    start_date: Optional[str] = None

    @property
    def query(self) -> str:
        """Render GraphQL query with latest `start_date` and `page_limit` values."""
        # Render GraphQL query
        return generate_graphql_query(self.start_date, self.page_limit)

    @property
    def page_limit(self) -> str:
        """Return the request payload page_size, configurable via tap settings."""
        return self.config["page_size"]

    @property
    def get_start_date(self) -> str:
        """Return the initial start_date timestamp value - derived from tap settings."""
        return self.config["start_date"]

    def prepare_request_payload(
        self, context: Optional[dict], next_page_token: Optional[Any] = None
    ) -> Dict[str, Any]:
        """Override :meth:`singer_sdk.streams.graphql.prepare_request_payload`.

        Appends the GraphQL query filter refresh to the function.
        """
        # START - Query Update
        start_ts = self.get_starting_timestamp(context)
        # Get last_updated ts from Meltano state, else None
        self.last_updated = start_ts if start_ts else None
        self.logger.debug(f"last_updated: {self.last_updated}")
        # If last_updated => update query filter
        if self.last_updated:
            self.start_date = self.last_updated
            self.logger.info(f"UPDATING QUERY FILTER: {self.last_updated}")
            self.query  # re-render the query property with the new start_date
        # Else => use default date filter (derived from config)
        else:
            self.start_date = self.get_start_date
        # END - Query Update
        params = self.get_url_params(context, next_page_token)
        if self.query is None:
            raise ValueError("Gra…
```

sean_glynn
12/07/2021, 4:20 PM
Do you think it would be worth adding an `update_query()` function within the `singer_sdk.streams.graphql` class? I can branch off the singer-sdk repo and add something there if you feel it is easily achievable.
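For illustration only, such a hook might look roughly like this on a stream class. Note that `update_query` is NOT part of singer-sdk (it is the proposal being floated above), and `get_starting_timestamp` is stubbed out here so the sketch is self-contained:

```python
from typing import Any, Dict, Optional


class UpdatableQueryStream:
    """Sketch of the proposed `update_query` hook (illustrative only)."""

    # Escaped braces ({{ }}) render as literal GraphQL braces via str.format
    query_template = 'query {{ graphX(filter: {{ date_geq: "{start_date}" }}) {{ count }} }}'
    start_date: Optional[str] = None

    def get_starting_timestamp(self, context: Optional[Dict[str, Any]]) -> Optional[str]:
        # Stub: in the real SDK this reads the replication-key bookmark from state.
        return (context or {}).get("replication_key_value")

    def update_query(self, context: Optional[Dict[str, Any]]) -> str:
        """Re-render the query with the latest replication state before each request."""
        self.start_date = self.get_starting_timestamp(context) or "2021-01-01"
        return self.query_template.format(start_date=self.start_date)
```

Called once per request cycle, this would centralize the "refresh the filter from state" step instead of each tap re-implementing it inside `prepare_request_payload`.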