joao_torquato
12/06/2021, 7:46 PMjoao_torquato
12/06/2021, 8:22 PMclass ReceivableStream(CustomExtractorStream):
name = "receivable"
primary_keys = ["id"]
replication_key = "s3_key"
schema_filepath = SCHEMAS_DIR / "receivable.json"
and the client.py class:
class CustomExtractorStream(Stream):
"""Stream class for CustomExtractorStream streams."""
>> def get_records(self, context: Optional[dict]) -> Iterable[dict]:
"""Return a generator of row-type dictionary objects.
The optional `context` argument is used to identify a specific slice of the
stream if partitioning is required for the stream. Most implementations do not
require partitioning and should ignore the `context` argument.
"""
# TODO: Write logic to extract data from the upstream source.
aws_access_key_id = self.config.get("aws_access_key_id")
aws_secret_access_key = self.config.get("aws_secret_access_key")
company_slug = self.config.get("company_slug")
replication_key = self.get_starting_replication_key_value(context)
print(replication_key)
But even after ran and populate the database, the replication key value is null.edgar_ramirez_mondragon
12/06/2021, 8:35 PMyour-tap --config config.json --state state.json
joao_torquato
12/06/2021, 9:15 PMmeltano elt tap-custom-extractor target-postgres
edgar_ramirez_mondragon
12/06/2021, 9:25 PMmeltano elt tap-custom-extractor target-postgres --job_id=custom-to-postgres
joao_torquato
12/06/2021, 9:32 PMmeltano elt tap-custom-extractor target-postgres --job_id=custom-to-postgres --dump=state > state.json
Could not find state file for this pipeline
In which case it would not generate a state?joao_torquato
12/07/2021, 4:41 PMmeltano invoke tap-custome-extractor
and the state is correct:
{"type": "STATE", "value": {"bookmarks": {"receivable": {"replication_key": "s3_upload_date", "replication_key_value": "2021-12-06T18:24:22.082797+00:00"}}}}
What could make this not work properly?joao_torquato
12/07/2021, 6:23 PM338 teste1 SUCCESS 2021-12-07 18:13:03.629083 2021-12-07 18:13:10.056404 {"singer_state": {"bookmarks": {"receivable": {"replication_key": "s3_upload_date", "replication_key_value": "2021-12-07T14:52:45.173466+00:00"}}}} 1 5b18edc1bf2448709673d540bffff1ba cli 2021-12-07 18:13:09.214089
But even when I run meltano elt tap-custom-extractor target-postgres --job_id=teste1
more than once, I print the last state variable replication_key_value
and got None
And when I run: meltano elt tap-smart-score-api-json-extractor target-postgres --job_id=teste1 --dump=state
it says:
Could not find state file for this pipeline
Should I try to pass the state manually?joao_torquato
12/07/2021, 6:38 PMmeltano elt tap-custom-extractor target-postgres --job_id=teste1
is working properly, but inside meltano sdk code:
I can't get the last value of the replication_key, this is what I'm trying:
state = self.get_context_state(context)
state_bookmark = state.get("replication_key_value") # it returns None
log:
2021-12-07T18:36:29.719457Z [info ] Incremental state has been updated at 2021-12-07 18:36:29.719331.
2021-12-07T18:36:29.719637Z [debug ] Incremental state: {'bookmarks': {'receivable': {'replication_key': 's3_upload_date', 'replication_key_value': '2021-12-07T14:52:45.173466+00:00'}}}
2021-12-07T18:36:29.734873Z [debug ] Deleted configuration at /home/joaotorquato/cashu_projects/meltano-poc/first-project/.meltano/run/elt/teste1/64ba762b-2bf2-4f91-b756-f83b26c9311a/target.ad2a335c-741a-49c2-a589-bd90eb99cef2.config.json
2021-12-07T18:36:29.735086Z [debug ] Deleted configuration at /home/joaotorquato/cashu_projects/meltano-poc/first-project/.meltano/run/elt/teste1/64ba762b-2bf2-4f91-b756-f83b26c9311a/tap.7e9eea52-5457-4163-aadb-aedc22f92b79.config.json
2021-12-07T18:36:29.735190Z [info ] Extract & load complete! job_id=teste1 name=meltano run_id=64ba762b-2bf2-4f91-b756-f83b26c9311a
2021-12-07T18:36:29.735315Z [info ] Transformation skipped. job_id=teste1 name=meltano run_id=64ba762b-2bf2-4f91-b756-f83b26c9311a
edgar_ramirez_mondragon
12/07/2021, 6:45 PMget_starting_replication_key_value
returns None
too?joao_torquato
12/07/2021, 7:05 PMjoao_torquato
12/07/2021, 7:10 PMmeltano --log-level=debug elt tap-custom-extractor target-postgres --job_id=teste1
2021-12-07T19:02:37.007835Z [debug ] Could not find state.json in /home/joaotorquato/cashu_projects/meltano-poc/first-project/.meltano/extractors/tap-custom-extractor/state.json, skipping.
It should check the state on meltano.db right?(https://meltano.com/docs/integration.html#incremental-replication-state) But in the log it looks for a state.json
fileedgar_ramirez_mondragon
12/07/2021, 7:23 PMmeltano.yml
declares the state
capability?
plugins:
extractors:
my-tap:
capabilities:
- catalog
- discover
- state # <- this guy
joao_torquato
12/07/2021, 7:31 PMedgar_ramirez_mondragon
12/07/2021, 7:34 PMelt
(and job_id
) are used and state is not a declared capability. (this or maybe a new issue)