david_gauthier-dussureault
01/27/2023, 8:15 PM
replication_key is set:
from singer_sdk import typing as th  # JSON schema typing helpers

# BrandMentionsStream is the tap's own base stream class, defined elsewhere in the tap.
class GetProjectMentionsStream(BrandMentionsStream):
    """Define custom stream."""

    name = "GetProjectMentions"
    primary_keys = ["project_id", "url"]
    replication_key = "published"  # field used for the incremental bookmark
    schema = th.PropertiesList(
        th.Property("project_id", th.IntegerType),
        th.Property("published", th.DateTimeType),
        th.Property("url", th.StringType),
        th.Property("title", th.StringType),
        th.Property("image_url", th.StringType),
        th.Property("performance", th.IntegerType),
        th.Property("language", th.StringType),
        th.Property("country", th.StringType),
        th.Property("social_network", th.StringType),
        th.Property("text", th.StringType),
        th.Property("sentiment", th.StringType),
        th.Property("linked", th.IntegerType),
        th.Property("hashtags", th.ArrayType(th.StringType)),
        th.Property("author", th.ObjectType()),
        th.Property("social", th.ObjectType()),
    ).to_dict()
When I run meltano state get dev:tap-brandmentions-to-target-jsonl, I see that the state bookmark is saved properly:
{
  "singer_state": {
    "bookmarks": {
      "GetProjectMentions": {
        "replication_key": "published",
        "replication_key_value": "2023-01-27 00:04:00"
      }
    }
  }
}
But when I do subsequent runs with meltano run tap-brandmentions target-jsonl, the full dataset is always sent to my target-jsonl.
What am I missing for my stream to only send incremental data to my target?
stephen_sciortino
01/27/2023, 8:25 PM
--state-id dev:tap-brandmentions-to-target-jsonl
elt
😅, thanks Pat!
pat_nadolny
01/27/2023, 9:10 PM
elt 😄. Using run should automatically get the correct state from the system db without specifying a state ID. @david_gauthier-dussureault what base stream class are you inheriting from in BrandMentionsStream? Check out AJ's thread here where he explains how to use your replication key in your tap implementation; the SDK mostly just helps track and store it, but you'll need to use it for your custom filtering in requests depending on your source. Also make sure your meltano.yml has the state capability listed in its configuration so Meltano knows to track and use your state between runs, although it looks like that's probably working for you already based on your question.
aaronsteers
01/27/2023, 9:20 PM
daniel_luftspring
01/27/2023, 10:52 PM
context is always None when accessed in the get_url_params method. I was reading through that other thread and I'm sure the field is declared as a timestamp in the schema. Are there any troubleshooting steps you'd recommend when context isn't being properly captured?
edgar_ramirez_mondragon
01/27/2023, 11:47 PM
2023-01-27 17:43:35,081 | INFO | tap-stackexchange | Beginning incremental sync of 'answers' with context: {'question_id': 69025024}...
daniel_luftspring
01/27/2023, 11:59 PM
self.logger.info(context) the context was always None even when the state was being properly set in the db 🤷. I'm just trying to get a sense of the possible reasons for context being empty
david_gauthier-dussureault
01/30/2023, 10:05 PM
• On my tap, I had not configured the state capability in my meltano.yml and this is why the get_starting_timestamp() method was returning None. Don't forget to set it, don't be me.
◦ On that point, I have a follow-up question. Why does it matter that I list this capability in my meltano.yml? I mean, the capability is already in my tap's code and the base classes of the SDK, etc. Right? Why do I need to explicitly tell Meltano that it has the state capability even though it's already more or less baked into the tap's code?
• Now that I have the state from the get_starting_timestamp() method, I can pass it in the get_url_params() method to filter my API calls so they only return records that are greater than or equal to my saved state. This is great!!
• On why the context is None, it seems like it's because I'm not using Stream Partitioning or Parent-Child Streams, so there is no context. It was a bit confusing because the get_starting_timestamp() method has context as an argument, but if you go down the rabbit hole of all the nested methods, you will find that it does not require a context in order to provide self.stream_state. Mystery solved.
• Getting back to the actual reason for my initial question, I was under the impression that the tap would be able to filter the incremental records "in memory" based on the replication_key, independently of me overriding get_url_params(). As an example:
a. Let's say I have my replication_key set up properly as "updated_at" and the first time I run my pipeline, I get 1000 records from the API with dates ranging from 2023-01-01 to 2023-01-30 in the "updated_at" field.
b. 1000 records are sent to the target.
c. The state is saved properly with a value of 2023-01-30 for this stream.
d. The following day, I rerun the pipeline and get 1100 records from the API with dates ranging from 2023-01-01 to 2023-01-31 in the "updated_at" field.
e. Here (in my mind), the tap was supposed to be able to detect that there are only 100 new records with "updated_at" > state and then only send those 100 records to the target.
f. The state is saved properly with a value of 2023-01-31 for this stream.
g. Repeat...
• Based on my experience and reading other threads, point e) is not what happens. In fact, all 1100 records are sent to the target. This means that for a cloud target like Snowflake, our merge statement would run against an ever-increasing number of records instead of only the newest 100 records, and in the end this would lead to higher cloud costs.
◦ I'm wondering why the tap cannot detect/filter the newest records "in memory"? In theory, it would have all the necessary information, that is, the full dataset of records, the replication_key and the last saved state, so is there a reason I'm missing why it can't do that natively?
• Lastly, I struggled for a few days before asking questions internally at my company and here on Slack about how to properly set up an incremental stream. I think a bit more documentation in the Meltano Singer SDK would be a great improvement to the user experience. I think either something in the SDK Code Samples with an example of an implementation of <https://sdk.meltano.com/en/latest/classes/singer_sdk.Stream.html#singer_sdk.Stream.get_starting_time…
edgar_ramirez_mondragon
01/31/2023, 12:17 AM
> On why the context is None
Maybe documentation could help here? Perhaps as part of https://sdk.meltano.com/en/latest/context_object.html
> On my tap, I had not configured the state capability in my meltano.yml and this is why the get_starting_timestamp() method was returning None. Don't forget to set it, don't be me.
There's https://github.com/meltano/meltano/issues/2986 that might be good to revisit
> Why does it matter that I list this capability in my meltano.yml? I mean, the capability is already in my tap's code and the base classes of the SDK, etc. Right? Why do I need to explicitly tell Meltano that it has the state capability even though it's already more or less baked into the tap's code?
We've discussed the general issue of translating singer tap/target capabilities and settings to Meltano, but the problem (not that big, imo) is that it would only benefit SDK-based packages: https://github.com/meltano/meltano/issues/7156
> Here (in my mind), the tap was supposed to be able to detect that there are only 100 new records with "updated_at" > state and then only send those 100 records to the target.
This is something we definitely want in the SDK: https://github.com/meltano/sdk/issues/226
@david_gauthier-dussureault thanks for the extremely detailed explanation of the problems you encountered.
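For readers following along, the "in memory" filtering described in step e can be sketched in plain Python. This is only an illustration of the idea, not SDK code: the helper name filter_new_records and the sample records are made up here, and bookmarks are assumed to be ISO-style date strings like the ones in the thread.

```python
from datetime import datetime
from typing import Any, Dict, Iterable, Iterator, Optional


def filter_new_records(
    records: Iterable[Dict[str, Any]],
    replication_key: str,
    bookmark: Optional[str],
) -> Iterator[Dict[str, Any]]:
    """Yield only records whose replication key is later than the bookmark.

    Hypothetical helper for illustration; it is not part of the Singer SDK.
    """
    if bookmark is None:
        # First run: no saved state yet, so every record is new.
        yield from records
        return
    threshold = datetime.fromisoformat(bookmark)
    for record in records:
        if datetime.fromisoformat(record[replication_key]) > threshold:
            yield record


# Made-up sample data mirroring the example in the thread.
records = [
    {"id": 1, "updated_at": "2023-01-01"},
    {"id": 2, "updated_at": "2023-01-30"},
    {"id": 3, "updated_at": "2023-01-31"},
]
new_records = list(filter_new_records(records, "updated_at", "2023-01-30"))
print([r["id"] for r in new_records])
```

Note that a filter like this only reduces what reaches the target; the tap still downloads the full dataset from the API on every run. The strict ">" comparison follows step e and would skip records that share the exact bookmark value.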
aaronsteers
01/31/2023, 1:05 AM
aaronsteers
01/31/2023, 1:12 AM
STATE capability: "Your tap doesn't support incremental sync. See https://melta.no#state_capability for more info." That would be better than silently not passing state to the tap.
2. A meltano compile --check=all or meltano check command or similar is probably warranted for us in the near future. What might not be reasonable to put in the codepath of normal operations like meltano run and meltano add could be completely okay to run in an out-of-cycle "check my stuff please" command. That would be a decent place to iterate through all plugins, run --about on them, and compare the output with what is declared in meltano.yml.
I'll log issues on both of the above unless anyone disagrees - or beats me to it. 😄
david_gauthier-dussureault
01/31/2023, 2:34 PM
aaronsteers
01/31/2023, 4:59 PM
> I think a bit more documentation in the Meltano Singer SDK would be a great improvement to the user experience. I think either something in the SDK Code Samples with an example of an implementation of get_starting_timestamp() or a dedicated section similar to Parent-Child Streams would be awesome.
Yeah, I agree both could be really helpful.
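As a rough idea of what such a sample might cover, here is a minimal sketch of the pattern described earlier in the thread: take the saved bookmark and turn it into a request filter. The helper build_url_params and the "since" and "page" parameter names are assumptions for illustration; the real parameter names depend on the source API. The helper is kept free of SDK imports so it runs on its own.

```python
from datetime import datetime
from typing import Any, Dict, Optional


def build_url_params(
    starting_timestamp: Optional[datetime],
    next_page_token: Optional[Any] = None,
) -> Dict[str, Any]:
    """Build query parameters that ask the API only for newer records.

    Hypothetical helper: "page" and "since" are placeholder parameter names.
    """
    params: Dict[str, Any] = {}
    if next_page_token is not None:
        params["page"] = next_page_token
    if starting_timestamp is not None:
        # Only request records at or after the saved bookmark.
        params["since"] = starting_timestamp.isoformat()
    return params


# In a stream class, the override would delegate to the helper, roughly:
#
#     def get_url_params(self, context, next_page_token):
#         start = self.get_starting_timestamp(context)  # None on the first run
#         return build_url_params(start, next_page_token)

print(build_url_params(datetime(2023, 1, 27, 0, 4)))
```

For a stream without partitions or a parent stream, context is None in that override, and get_starting_timestamp(context) still resolves the stream-level bookmark, as noted above.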