# getting-started
ю
Hey Meltano team, we use the tap-salesforce plugin with incremental load, but a DELETE statement is run before the MERGE statement. We found that job_id is None at https://github.com/MeltanoLabs/tap-salesforce/blob/64ca3f9780fb8a5c8a978a6e0aa484d0bdc086b3/tap_salesforce/__init__.py#L452 and, as a result, activate_version_message is sent. Could you help us understand how we can set job_id?
e
Hi @Юлия Герман! Perhaps the stream doesn't support incremental replication? I see there are a few fields that, if present, determine whether the stream can be synced incrementally: https://github.com/MeltanoLabs/tap-salesforce/blob/64ca3f9780fb8a5c8a978a6e0aa484d0bdc086b3/tap_salesforce/__init__.py#L55-L69 Also, which loader are you using this tap with? It might support a different strategy for handling activate_version messages.
ю
@Edgar Ramírez (Arch.dev), the replication key is not the problem, it is defined correctly. We have one of those fields: 'SystemModstamp'. As a loader we use https://github.com/MeltanoLabs/target-snowflake
@Edgar Ramírez (Arch.dev) Since this line, `job_id = singer.get_bookmark(state, catalog_entry['tap_stream_id'], 'JobID')`, returns None (https://github.com/MeltanoLabs/tap-salesforce/blob/64ca3f9780fb8a5c8a978a6e0aa484d0bdc086b3/tap_salesforce/__init__.py#L452), the else branch is triggered. But if we could set JobID in the bookmark, then `if job_id` would be true and no activate_version_message would be sent. Could you help me understand where I can set JobID for tap-salesforce?😊
btw, I could not find where target-snowflake overrides the handling of activate_version_message; it looks like this code is used: https://github.com/meltano/sdk/blob/bfc1bc1e34ed4c6a81feee2b697d3799b780cf7e/singer_sdk/sinks/sql.py#L363
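To illustrate why the lookup comes back as None, here is a minimal sketch. The `get_bookmark` function below is a stand-in that mirrors the nested-dictionary lookup of `singer.get_bookmark` rather than importing the library, and the `Account` stream and its state are hypothetical.

```python
from typing import Any, Optional


def get_bookmark(state: dict, tap_stream_id: str, key: str,
                 default: Optional[Any] = None) -> Any:
    """Stand-in for singer.get_bookmark: a nested lookup under state["bookmarks"]."""
    return state.get("bookmarks", {}).get(tap_stream_id, {}).get(key, default)


# Hypothetical state after a normal incremental run:
# only the replication key is bookmarked, there is no "JobID" entry.
state = {"bookmarks": {"Account": {"SystemModstamp": "2024-01-01T00:00:00Z"}}}

job_id = get_bookmark(state, "Account", "JobID")

# A missing key yields None, so a check like `if job_id:` is false
# and the else branch runs.
branch = "resume bulk job" if job_id else "else branch (fresh sync)"
print(job_id, "->", branch)
```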
e
Ok, so you could set it manually in the state file (see meltano state) as
{
  "bookmarks": {
    "<your-stream-id>": {
      "JobID": "<your-job-id>"
    }
  }
}
but I wouldn't know what value to set it to: https://github.com/MeltanoLabs/tap-salesforce/blob/64ca3f9780fb8a5c8a978a6e0aa484d0bdc086b3/tap_salesforce/__init__.py#L91-L93
So you could try setting `hard_delete: false` in your loader config
ю
Yep, but I do not want the activate_version_message to be sent at all.
If I set `hard_delete: false`, all previous versions will be removed, right?
How do I set `{"bookmarks": {"<your-stream-id>": {"JobID": "<your-job-id>"}}}` manually? How can I add it in the YAML config?
e
> if I set hard_delete: false, all previous version will be removed, right?
No, it will do an upsert: https://github.com/meltano/sdk/blob/bfc1bc1e34ed4c6a81feee2b697d3799b780cf7e/singer_sdk/sinks/sql.py#L404-L409
> How to set it manually, `{"bookmarks": {"<your-stream-id>": {"JobID": "<your-job-id>"}}}`. How I can add it in yaml config?
You can't add it directly in meltano.yml, but you can put it in a JSON file and reference it with https://docs.meltano.com/concepts/plugins/#state-extra
ю
If I set hard_delete to false, a date will be written to the _sdc_deleted_at column. That may be confusing 🙂
e
Gotcha, yeah it may be confusing
ю
https://docs.meltano.com/concepts/plugins/#state-extra Are there any options that don't require adding a file?
btw, if we already have state in the db, would it be merged somehow, or would the state from the db be ignored?
e
> btw, `hard_delete: false` - it is default behaviour
That change was shipped in singer-sdk `v0.35.0`, but target-snowflake is at an older version
> btw, if we already have state in db, would it be merged anyhow or state from db will be ignored?
If you have state in the db, you could try using the `meltano state` command. See `meltano state --help` for options. You probably want to send `meltano state get` output to a file, followed by editing the file manually, followed by `meltano state set`
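The manual editing step between `meltano state get` and `meltano state set` could be scripted roughly as below. This is only a sketch: the `set_job_id` helper, the `Account` stream, and the sample payload are hypothetical, and the `singer_state` wrapper key is an assumption about the exported file's shape (the helper falls back to the top level if it is absent). The JobID value is the same placeholder used earlier in the thread.

```python
import copy
import json


def set_job_id(state: dict, stream_id: str, job_id: str) -> dict:
    """Return a copy of `state` with bookmarks[stream_id]["JobID"] set."""
    new_state = copy.deepcopy(state)
    # Assumption: the exported state may wrap the Singer state in a
    # "singer_state" key; if it does not, edit the top level instead.
    singer_state = new_state.get("singer_state", new_state)
    singer_state.setdefault("bookmarks", {}).setdefault(stream_id, {})["JobID"] = job_id
    return new_state


# Hypothetical content of the file written from `meltano state get` output.
exported = json.loads(
    '{"singer_state": {"bookmarks": {"Account": '
    '{"SystemModstamp": "2024-01-01T00:00:00Z"}}}}'
)

edited = set_job_id(exported, "Account", "<your-job-id>")

# This JSON is what would be written back to the file before `meltano state set`.
print(json.dumps(edited, indent=2))
```

The helper copies the input instead of mutating it, so the exported state stays available for comparison or rollback.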
ю
@Edgar Ramírez (Arch.dev), am I correct that Meltano state is generated automatically? (We do not generate it manually in any way.) If so, maybe we are using an incorrect version in which the state is generated without JobID. Could you help me understand what should be done to have JobID in the state automatically?
e
ю
e
Right
ю
hm, we use Bulk
No, the job_id that is used for Bulk and the job_id from state are two different ids. It looks like the job_id for a batch is used in the API, for example
`endpoint = "job/{}/batch/{}/result".format(job_id, batch_id)`
e
ю
but it works only for `batch_status['state'] == 'Failed':`, right?
So, if that is true, the state will contain JobID only for a failed batch?
e
Correct. Putting the job_id in the state only makes sense if the bulk sync should be resumed. Otherwise, the full table replication is preferred.
ю
But if the table is big, it makes no sense to reload it daily. It is better to use incremental mode, but in the current implementation only the latest data will be in the db; the rest will be deleted.
So, does that mean the incremental replication method is not supported for Salesforce?
e
Could there be a bug in this line? https://github.com/MeltanoLabs/tap-salesforce/blob/64ca3f9780fb8a5c8a978a6e0aa484d0bdc086b3/tap_salesforce/__init__.py#L476C12-L476C27 Should it perhaps be
if not replication_key or bookmark_is_empty:
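A small truth table may help compare the two conditions. This is a sketch, not the tap's code: the "assumed current" condition is inferred from the behaviour described in this thread and is not quoted from the linked line, and both function names are hypothetical.

```python
def emits_assumed_current(replication_key, bookmark_is_empty):
    # Assumption: the condition currently on the linked line, inferred from
    # the thread (the message is sent even though a replication key is set).
    return bool(replication_key or bookmark_is_empty)


def emits_proposed(replication_key, bookmark_is_empty):
    # The condition proposed in the thread.
    return bool(not replication_key or bookmark_is_empty)


cases = [
    ("SystemModstamp", False),  # incremental stream with existing state
    ("SystemModstamp", True),   # incremental stream, first run
    (None, False),              # no replication key, existing state
    (None, True),               # no replication key, first run
]

for replication_key, bookmark_is_empty in cases:
    print(
        f"replication_key={replication_key!r:>17} "
        f"bookmark_is_empty={bookmark_is_empty!s:<5} "
        f"assumed_current={emits_assumed_current(replication_key, bookmark_is_empty)!s:<5} "
        f"proposed={emits_proposed(replication_key, bookmark_is_empty)}"
    )
```

Under the proposed condition, the only case that skips the message is an incremental stream that already has a bookmark, which is the case discussed in this thread.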
ю
btw, that is logical: if there is no replication key, run a full load. Otherwise, how would we know where the previous run ended?
Could you create a bug report for it? 😊
e
ю
@Edgar Ramírez (Arch.dev) Thanks a lot for the help 😊
e
You're welcome!