# singer-tap-development
j
Hi guys, I'm developing a custom tap and I want to use the last replication-key value inside the Stream class in the Meltano SDK. How can I do that?
j
Hi Derek, thanks for the reply, but I've already set it up like this:
class ReceivableStream(CustomExtractorStream):                                                
      name = "receivable"                                                                                  
      primary_keys = ["id"]                                                                                
      replication_key = "s3_key"                                                                           
      schema_filepath = SCHEMAS_DIR / "receivable.json"
and the client.py class:
class CustomExtractorStream(Stream):                                                          
      """Stream class for CustomExtractorStream streams."""                                           
                                                                                                           
      def get_records(self, context: Optional[dict]) -> Iterable[dict]:
          """Return a generator of row-type dictionary objects.                                            
                                                                                                           
          The optional `context` argument is used to identify a specific slice of the                      
          stream if partitioning is required for the stream. Most implementations do not                   
          require partitioning and should ignore the `context` argument.                                   
          """                                                                                              
          # TODO: Write logic to extract data from the upstream source.                                    
          aws_access_key_id = self.config.get("aws_access_key_id")                                         
          aws_secret_access_key = self.config.get("aws_secret_access_key")                                 
          company_slug = self.config.get("company_slug")                                                   
          replication_key = self.get_starting_replication_key_value(context)
          print(replication_key)
But even after I ran it and populated the database, the replication key value is null.
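For context, here is a minimal sketch of what the starting value is typically used for once it is available. It is plain Python rather than SDK code: `records_after_bookmark` is a hypothetical helper, and in a real stream `start_value` would be whatever `self.get_starting_replication_key_value(context)` returns. It also assumes the replication key holds ISO-8601 timestamps in one consistent format, so string comparison matches chronological order.

```python
from typing import Iterable, Iterator, Optional


def records_after_bookmark(
    records: Iterable[dict],
    replication_key: str,
    start_value: Optional[str],
) -> Iterator[dict]:
    """Yield only records whose replication key is greater than the bookmark.

    Hypothetical helper for illustration; not part of the Meltano SDK.
    """
    for record in records:
        # No bookmark (first run, or no state was passed in): emit everything.
        if start_value is None or record[replication_key] > start_value:
            yield record


rows = [
    {"id": 1, "s3_upload_date": "2021-12-06T18:24:22+00:00"},
    {"id": 2, "s3_upload_date": "2021-12-07T14:52:45+00:00"},
]

# First run: no state yet, so the starting value is None.
first_run = list(records_after_bookmark(rows, "s3_upload_date", None))

# Later run: only rows newer than the stored bookmark are emitted.
next_run = list(
    records_after_bookmark(rows, "s3_upload_date", "2021-12-06T18:24:22+00:00")
)
```

A `None` starting value is expected on the very first run; it only points to a problem when it persists after a run that saved state.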
e
@joao_torquato (assuming you're not using Meltano but running the tap directly) are you passing a state file to the invocation?
your-tap --config config.json --state state.json
j
Hi @edgar_ramirez_mondragon! No, I'm running with meltano:
meltano elt tap-custom-extractor target-postgres
e
@joao_torquato thanks for confirming! Then make sure to pass a job_id so state can be retrieved:
meltano elt tap-custom-extractor target-postgres --job_id=custom-to-postgres
j
Thanks Edgar, I got it, but even when I set a job_id and dump the state to check it, it is not working:
meltano elt tap-custom-extractor target-postgres --job_id=custom-to-postgres --dump=state > state.json                                                        
Could not find state file for this pipeline
In which cases would it not generate a state?
I ran
meltano invoke tap-custom-extractor
and the state is correct:
{"type": "STATE", "value": {"bookmarks": {"receivable": {"replication_key": "s3_upload_date", "replication_key_value": "2021-12-06T18:24:22.082797+00:00"}}}}
What could make this not work properly?
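To make the shape of that STATE message concrete, here is a small sketch that pulls one stream's bookmark out of a single line of tap output. The helper name `bookmark_from_state_line` is an assumption for illustration; the message layout is taken from the output shown above.

```python
import json
from typing import Optional


def bookmark_from_state_line(line: str, stream_name: str) -> Optional[str]:
    """Return the replication_key_value for a stream, or None if absent.

    Hypothetical helper for illustration; not part of Meltano or the SDK.
    """
    message = json.loads(line)
    if message.get("type") != "STATE":
        return None
    bookmarks = message.get("value", {}).get("bookmarks", {})
    return bookmarks.get(stream_name, {}).get("replication_key_value")


# The STATE line printed by the tap in this thread.
line = (
    '{"type": "STATE", "value": {"bookmarks": {"receivable": '
    '{"replication_key": "s3_upload_date", '
    '"replication_key_value": "2021-12-06T18:24:22.082797+00:00"}}}}'
)

bookmark = bookmark_from_state_line(line, "receivable")
```

If this returns a value, the tap itself is emitting state correctly, which narrows the problem to how state is stored or passed back on the next run.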
@edgar_ramirez_mondragon After some debugging, I checked that the state is being saved properly in the database:
338	teste1	SUCCESS	2021-12-07 18:13:03.629083	2021-12-07 18:13:10.056404	{"singer_state": {"bookmarks": {"receivable": {"replication_key": "s3_upload_date", "replication_key_value": "2021-12-07T14:52:45.173466+00:00"}}}}	1	5b18edc1bf2448709673d540bffff1ba	cli	2021-12-07 18:13:09.214089
But even when I run
meltano elt tap-custom-extractor target-postgres --job_id=teste1
more than once, when I print the last state value replication_key_value I still get None.
And when I run:
meltano elt tap-smart-score-api-json-extractor target-postgres --job_id=teste1 --dump=state
it says:
Could not find state file for this pipeline
Should I try to pass the state manually?
It seems that the command:
meltano elt tap-custom-extractor target-postgres --job_id=teste1
is working properly, but inside the Meltano SDK code I can't get the last value of the replication_key. This is what I'm trying:
state = self.get_context_state(context)
state_bookmark = state.get("replication_key_value") # it returns None
log:
2021-12-07T18:36:29.719457Z [info     ] Incremental state has been updated at 2021-12-07 18:36:29.719331.
2021-12-07T18:36:29.719637Z [debug    ] Incremental state: {'bookmarks': {'receivable': {'replication_key': 's3_upload_date', 'replication_key_value': '2021-12-07T14:52:45.173466+00:00'}}}
2021-12-07T18:36:29.734873Z [debug    ] Deleted configuration at /home/joaotorquato/cashu_projects/meltano-poc/first-project/.meltano/run/elt/teste1/64ba762b-2bf2-4f91-b756-f83b26c9311a/target.ad2a335c-741a-49c2-a589-bd90eb99cef2.config.json
2021-12-07T18:36:29.735086Z [debug    ] Deleted configuration at /home/joaotorquato/cashu_projects/meltano-poc/first-project/.meltano/run/elt/teste1/64ba762b-2bf2-4f91-b756-f83b26c9311a/tap.7e9eea52-5457-4163-aadb-aedc22f92b79.config.json
2021-12-07T18:36:29.735190Z [info     ] Extract & load complete!       job_id=teste1 name=meltano run_id=64ba762b-2bf2-4f91-b756-f83b26c9311a
2021-12-07T18:36:29.735315Z [info     ] Transformation skipped.        job_id=teste1 name=meltano run_id=64ba762b-2bf2-4f91-b756-f83b26c9311a
e
@joao_torquato does get_starting_replication_key_value return None too?
j
@edgar_ramirez_mondragon yes!
meltano --log-level=debug elt tap-custom-extractor target-postgres --job_id=teste1                                    
                                                                                                                                                                
2021-12-07T19:02:37.007835Z [debug    ] Could not find state.json in /home/joaotorquato/cashu_projects/meltano-poc/first-project/.meltano/extractors/tap-custom-extractor/state.json, skipping.
It should check the state in meltano.db, right? (https://meltano.com/docs/integration.html#incremental-replication-state) But in the log it looks for a state.json file.
e
@joao_torquato Ok, this may be another type of error. Can you confirm that your custom plugin in meltano.yml declares the state capability?
plugins:
  extractors:
    my-tap:
      capabilities:
        - catalog
        - discover
        - state # <- this guy
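A quick way to catch this kind of omission is to check a plugin definition for the capabilities you rely on. The sketch below is only an illustration and not Meltano's own validation: `missing_capabilities` is a hypothetical helper, and the dictionaries stand in for one extractor entry from meltano.yml after it has been parsed.

```python
from typing import Iterable, List


def missing_capabilities(
    plugin: dict, required: Iterable[str] = ("state",)
) -> List[str]:
    """List required capabilities that the plugin definition does not declare.

    Hypothetical helper for illustration; not part of Meltano.
    """
    declared = set(plugin.get("capabilities") or [])
    return [cap for cap in required if cap not in declared]


# Stand-ins for a parsed extractor entry, before and after the fix.
plugin_without_state = {"capabilities": ["catalog", "discover"]}
plugin_with_state = {"capabilities": ["catalog", "discover", "state"]}

before_fix = missing_capabilities(plugin_without_state)
after_fix = missing_capabilities(plugin_with_state)
```

Here `before_fix` is `["state"]`, the situation in this thread, while `after_fix` is empty.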
j
@edgar_ramirez_mondragon it was that! We didn't set the state as a capability. Thank you very much!
e
@joao_torquato Oh, I'm glad it was as simple as that! cc @taylor we def need to print a clear warning when elt (and job_id) are used and state is not a declared capability. (this or maybe a new issue)