# troubleshooting
sean_glynn
Hello all, I am a Meltano newbie and I am currently trying to develop a custom GraphQL tap (tap_cloudflare_graphql) that hits a GraphQL endpoint and incrementally loads records to the relevant target (using target-bigquery right now). This tap currently works with the following stream definition:
```python
class TimeViewedFlattenedStream(cloudflare_graphqlStream):
    name = "timeViewedFlattenedStream"
    schema_filepath = SCHEMAS_DIR / "timeViewedFlattened.json"

    primary_keys = ["pk"]
    replication_key = "dimensions_datetime"
    last_updated = None
    start_date = date_yesterday()
    page_limit = "10000"
    query = generate_graphql_query(start_date, page_limit)
```
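For context, `date_yesterday()` and `generate_graphql_query()` are the tap's own helpers rather than SDK functions, and they are not shown in the thread. Here is a minimal sketch of what they might look like, assuming the query template rendered below (field names such as `graphX` and `fieldY` are the thread's placeholders, and the exact template lives in the author's tap):

```python
from datetime import datetime, timedelta, timezone


def date_yesterday() -> str:
    """Return yesterday's date (UTC) as an ISO-8601 date string."""
    return (datetime.now(timezone.utc) - timedelta(days=1)).strftime("%Y-%m-%d")


def generate_graphql_query(start_date, page_limit) -> str:
    """Render the GraphQL query, interpolating the date filter and page limit.

    Hypothetical sketch only -- the real template is defined inside the tap.
    """
    return f"""
    query {{
      graphX(
        filter: {{ date_geq: "{start_date}" }}
        orderBy: [fieldY]
        limit: {page_limit}
      ) {{
        count
        avg {{ sampleInterval }}
        dimensions {{ uid clientCountryName deviceType deviceOs datetime date }}
        sum {{ fieldZ }}
      }}
    }}
    """
```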
I render the GraphQL query like so:
```graphql
query {
  {
    graphX(
      filter: { date_geq: "2021-12-05" }
      orderBy: [fieldY]
      limit: 10000
    ) {
      count
      avg {
        sampleInterval
      }
      dimensions {
        uid
        clientCountryName
        deviceType
        deviceOs
        datetime
        date
      }
      sum {
        fieldZ
      }
    }
  }
}
```
This returns all of the records from yesterday's date until now.

The initial Meltano invocation works as expected:
```
╰─$ meltano elt --job_id incrementaltest tap_cloudflare_graphql target-bigquery
2021-12-06T17:04:46.725968Z [info     ] Running extract & load...      job_id=incrementaltest name=meltano run_id=0cbc34a9-f6aa-473d-9549-2f8b780a11bb
2021-12-06T17:04:46.814121Z [warning  ] No state was found, complete import.
2021-12-06T17:04:50.804806Z [info     ] INFO Pushing state: {}         cmd_type=loader job_id=incrementaltest name=target-bigquery run_id=0cbc34a9-f6aa-473d-9549-2f8b780a11bb stdio=stderr
2021-12-06T17:04:50.811995Z [info     ] Incremental state has been updated at 2021-12-06 17:04:50.811874.
2021-12-06T17:04:50.973960Z [info     ] Installing dependencies from lock file cmd_type=extractor job_id=incrementaltest name=tap_cloudflare_graphql run_id=0cbc34a9-f6aa-473d-9549-2f8b780a
```
But since I want to incrementally load this stream to BigQuery, streaming only the freshest records to BQ on each run, I set `replication_key = "dimensions_datetime"`.
When I run for the first time, I can see that Meltano has taken the max(`replication_key`) value and added it to the job state:
```
2021-12-06T17:04:56.921098Z [info    ] INFO Updating state with {'bookmarks': {'timeViewedFlattenedStream': {'replication_key_signpost': '2021-12-06T17:04:52.681858+00:00', 'starting_replication_value': None, 'progress_markers': {'Note': 'Progress is not resumable if interrupted.', 'replication_key': 'dimensions_datetime', 'replication_key_value': '2021-12-05T19:02:47Z'}}}} cmd_type=loader job_id=incrementaltest name=target-bigquery run_id=0cbc34a9-f6aa-473d-9549-2f8b780a11bb stdio=stderr
```
This stores the latest load timestamp in the job's state, in the `job` table within MeltanoDB:

```
sqlite> SELECT * FROM job WHERE job_id='incrementaltest';
92|incrementaltest|SUCCESS|2021-12-06 17:04:45.140626|2021-12-06 17:05:03.930382|{"singer_state": {"bookmarks": {"timeViewedFlattenedStream": {"progress_markers": {"Note": "Progress is not resumable if interrupted.", "replication_key": "dimensions_datetime", "replication_key_value": "2021-12-05T19:02:47Z"}, "replication_key_signpost": "2021-12-06T17:04:52.681858+00:00", "starting_replication_value": null, "replicatio…
```
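For reference, that stored bookmark is what the Singer SDK hands back to the stream on the next run via `get_starting_timestamp(context)`, which the solution further down relies on. A minimal sketch of the lookup logic (`resolve_start_date` is a hypothetical helper name used only for illustration):

```python
def resolve_start_date(stream, context):
    """Pick the date filter for the next request.

    `stream` is the tap's stream instance; `context` is the dict the SDK passes
    into stream methods. Hypothetical helper for illustration only.
    """
    bookmark = stream.get_starting_timestamp(context)  # replication_key_value from state, or None
    if bookmark is not None:
        # Subsequent runs: resume from the stored replication_key_value.
        return bookmark
    # First run: no state yet, fall back to the tap's configured start_date.
    return stream.config["start_date"]
```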
tim_frazer
If you managed to solve the issue, could you post the solution for others?
edgar_ramirez_mondragon
Hi @sean_glynn! First of all, thanks for trying out the GraphQL capabilities in the Meltano SDK 😄.
"Does Meltano filter the stream based on the `replication_key` value?"
Not automatically, so you have to filter the request "manually".

"If Meltano does not handle the incremental filtering logic, do I need to derive the last updated timestamp manually and apply the filter myself?"
You should be able to parameterize your query string to use variables, and override `get_url_params` to retrieve the timestamp value and return the variables dictionary.
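For what that suggestion might look like in practice, here is a hedged sketch (not the author's actual tap code): the query declares GraphQL variables, and `get_url_params` returns the dictionary that is sent as the `variables` payload. Field and variable names (`graphX`, `$startDate`, `$limit`) are illustrative, and `page_size`/`start_date` are the tap settings assumed elsewhere in this thread:

```python
from typing import Any, Dict, Optional


class TimeViewedFlattenedStream(cloudflare_graphqlStream):  # base class from this tap
    name = "timeViewedFlattenedStream"
    replication_key = "dimensions_datetime"

    # Declare GraphQL variables instead of interpolating values into the query string.
    query = """
    query ($startDate: String!, $limit: Int!) {
      graphX(
        filter: { date_geq: $startDate }
        orderBy: [fieldY]
        limit: $limit
      ) {
        count
        dimensions { uid datetime date }
      }
    }
    """

    def get_url_params(
        self, context: Optional[dict], next_page_token: Optional[Any]
    ) -> Dict[str, Any]:
        # The returned dict becomes the `variables` object in the GraphQL request body.
        start = self.get_starting_timestamp(context)
        start_date = start.strftime("%Y-%m-%d") if start else self.config["start_date"]
        return {"startDate": start_date, "limit": int(self.config["page_size"])}
```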
sean_glynn
I will certainly post up the solution once I test everything, just to be sure @tim_frazer! Thanks for answering these questions @edgar_ramirez_mondragon. Yes, I managed to pull the latest timestamp by overriding `prepare_request_payload()` and utilizing the context object. I may use the `get_url_params` function as per your suggestion. I will update my post with my solution soon!
edgar_ramirez_mondragon
For sure, let us know and share your working solution! We definitely need to give GraphQL taps some love, as we've been focusing a lot of effort on conventional REST APIs 😅
sean_glynn
Hey @edgar_ramirez_mondragon, I can confirm my solution works as expected. I first overrode the `singer_sdk.streams.graphql.prepare_request_payload` function, adding the following code to it:
```python
def prepare_request_payload(
    self, context: Optional[dict], next_page_token: Optional[Any] = None
) -> Dict[str, Any]:
    """Override :meth:`singer_sdk.streams.graphql.prepare_request_payload`.

    Appends a GraphQL query filter refresh to the function.
    """
    # START - Query Update
    start_ts = self.get_starting_timestamp(context)
    # Get last_updated ts from Meltano state, else None
    self.last_updated = start_ts if start_ts else None
    self.logger.debug(f"last_updated: {self.last_updated}")

    # If last_updated => Update query filter
    if self.last_updated:
        self.start_date = self.last_updated
        self.logger.info(f"UPDATING QUERY FILTER: {self.last_updated}")
        self.query  # the `query` property re-renders with the new start_date
    # Else => Use default date filter (derived from conf)
    else:
        self.start_date = self.get_start_date
    # END - Query Update

    # ... rest of the existing function ...
```
When the `prepare_request_payload()` function is invoked, the latest class instance vars `start_date` and `page_limit` are rendered within the GraphQL query via the `query` property:
```python
@property
def query(self) -> str:
    """Render GraphQL Query with latest `start_date` and `page_limit` values."""
    # Ensure filter value is not empty
    assert self.start_date is not None

    # Render GraphQL query
    return generate_graphql_query(self.start_date, self.page_limit)
```
So basically, whenever the `prepare_request_payload()` function is called, it will either render the GraphQL query with the latest `replication_key` value (from Meltano state) OR use the default filter value from the tap config.

Full solution:

```python
class GraphQLStream(cloudflare_graphqlStream):
    logger = logging.getLogger("XStream")
    name = "XStream"
    schema_filepath = SCHEMAS_DIR / "schema.json"
    primary_keys = ["pk"]
    replication_key = "dimensions_datetime"

    # Declare vars
    last_updated: Optional[str] = None
    start_date: Optional[str] = None

    @property
    def query(self) -> str:
        """Render GraphQL Query with latest `start_date` and `page_limit` values."""
        # Render GraphQL query
        return generate_graphql_query(self.start_date, self.page_limit)

    @property
    def page_limit(self) -> str:
        """Return the request payload page_size, configurable via tap settings."""
        return self.config["page_size"]

    @property
    def get_start_date(self) -> str:
        """Return the initial start_date timestamp value - derived from tap settings."""
        return self.config["start_date"]

    def prepare_request_payload(
        self, context: Optional[dict], next_page_token: Optional[Any] = None
    ) -> Dict[str, Any]:
        """Override :meth:`singer_sdk.streams.graphql.prepare_request_payload`.

        Appends a GraphQL query filter refresh to the function.
        """
        # START - Query Update
        start_ts = self.get_starting_timestamp(context)
        # Get last_updated ts from Meltano state, else None
        self.last_updated = start_ts if start_ts else None
        self.logger.debug(f"last_updated: {self.last_updated}")

        # If last_updated => Update query filter
        if self.last_updated:
            self.start_date = self.last_updated
            self.logger.info(f"UPDATING QUERY FILTER: {self.last_updated}")
            self.query  # the `query` property re-renders with the new start_date
        # Else => Use default date filter (derived from conf)
        else:
            self.start_date = self.get_start_date
        # END - Query Update

        params = self.get_url_params(context, next_page_token)
        if self.query is None:
            raise ValueError("Gra…
```
Would it be worth adding a new `update_query()` function within the `singer_sdk.streams.graphql` class? I can branch off the Singer SDK repo and add something there if you feel it is easily achievable.
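For what it's worth, here is a speculative sketch of where such a hook could live. `update_query()` is the name proposed above, `RefreshableQueryStream` is a hypothetical class name, and none of this exists in the Singer SDK today:

```python
from typing import Any, Optional

from singer_sdk.streams import GraphQLStream


class RefreshableQueryStream(GraphQLStream):
    """Hypothetical base class sketching the proposed `update_query()` hook."""

    def update_query(self, context: Optional[dict]) -> None:
        """Refresh query filters (e.g. from stream state); no-op by default."""
        return None

    def prepare_request_payload(
        self, context: Optional[dict], next_page_token: Optional[Any] = None
    ) -> Optional[dict]:
        # Let subclasses refresh their filters first, then build the request
        # payload exactly as the SDK already does.
        self.update_query(context)
        return super().prepare_request_payload(context, next_page_token)
```

With a hook like this, taps such as tap_cloudflare_graphql would only override `update_query()` instead of re-implementing `prepare_request_payload()` to refresh a date filter.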