# singer-tap-development
d
I have a custom tap with a stream I want to run incrementally. My stream definition is this and the `replication_key` is set:
```python
class GetProjectMentionsStream(BrandMentionsStream):
    """Define custom stream."""
    name = "GetProjectMentions"
    primary_keys = ["project_id", "url"]
    replication_key = "published"
    schema = th.PropertiesList(
        th.Property("project_id", th.IntegerType),
        th.Property("published", th.DateTimeType),
        th.Property("url", th.StringType),
        th.Property("title", th.StringType),
        th.Property("image_url", th.StringType),
        th.Property("performance", th.IntegerType),
        th.Property("language", th.StringType),
        th.Property("country", th.StringType),
        th.Property("social_network", th.StringType),
        th.Property("text", th.StringType),
        th.Property("sentiment", th.StringType),
        th.Property("linked", th.IntegerType),
        th.Property("hashtags", th.ArrayType(th.StringType)),
        th.Property("author", th.ObjectType()),
        th.Property("social", th.ObjectType()),
    ).to_dict()
```
When I run `meltano state get dev:tap-brandmentions-to-target-jsonl`, I see that the state bookmark is saved properly:
```json
{
  "singer_state": {
    "bookmarks": {
      "GetProjectMentions": {
        "replication_key": "published",
        "replication_key_value": "2023-01-27 00:04:00"
      }
    }
  }
}
```
But when I do subsequent runs of `meltano run tap-brandmentions target-jsonl`, the full dataset is always sent to my target-jsonl. What am I missing for my stream to only send incremental data to my target?
s
Hey @david_gauthier-dussureault I think you have to pass in the state_id explicitly for it to find the replication key value. In this case it would be `--state-id dev:tap-brandmentions-to-target-jsonl`.
Never mind, my response is only relevant for `elt` 😅, thanks Pat!
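For completeness, the explicit `elt` form being referred to would look something like this (using the state ID from the question; not needed with `meltano run`):
```bash
# Only relevant for `meltano elt`; `meltano run` looks up state automatically.
meltano elt tap-brandmentions target-jsonl --state-id dev:tap-brandmentions-to-target-jsonl
```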
p
@stephen_sciortino I think that's only for `elt` 😄. Using `run` should automatically get the correct state from the system db without specifying a state ID. @david_gauthier-dussureault what base stream class are you inheriting from in `BrandMentionsStream`? Check out AJ's thread here where he explains how to use your replication key in your tap implementation; the SDK mostly just helps track and store it, but you'll need to use it for your own filtering in requests, depending on your source. Also make sure your `meltano.yml` has the `state` capability listed in its configuration so Meltano knows to track and use your state between runs, although it looks like that's probably working for you already based on your question.
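For reference, a minimal sketch of what that can look like for a custom extractor entry in `meltano.yml` (the `pip_url` and the extra capabilities here are placeholders; adjust them to what the tap actually supports):
```yaml
plugins:
  extractors:
    - name: tap-brandmentions
      namespace: tap_brandmentions
      pip_url: -e .  # placeholder; point at wherever the custom tap lives
      capabilities:
        - catalog
        - discover
        - state  # without this, Meltano won't pass saved state back to the tap
```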
a
Thanks, @pat_nadolny. That's probably the perfect thread to link back to. 🙏 Thank you @stephen_sciortino - let us know if that helps at all, and specifically the part regarding passing the starting key back to your API via `get_url_params()` or similar.
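A minimal sketch of that idea, assuming `BrandMentionsStream` is a REST stream and that the API accepts a hypothetical `start_date` query parameter (swap in whatever filter the BrandMentions API actually supports):
```python
from typing import Any, Optional


class GetProjectMentionsStream(BrandMentionsStream):
    # ... name, primary_keys, replication_key, and schema as in the original post ...

    def get_url_params(
        self, context: Optional[dict], next_page_token: Optional[Any]
    ) -> dict:
        params: dict = {}
        # get_starting_timestamp() returns the saved replication_key bookmark,
        # falling back to a configured start_date (or None) on the first run.
        starting_ts = self.get_starting_timestamp(context)
        if starting_ts:
            # Hypothetical parameter name; use whatever the API expects.
            params["start_date"] = starting_ts.isoformat()
        return params
```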
d
The issue here is that the value of `context` is always None when accessed in the `get_url_params` method. I was reading through that other thread and I'm sure the field is declared as a timestamp in the schema. Are there any troubleshooting steps you'd recommend when context isn't being properly captured?
e
@daniel_luftspring we do have a short guide on debugging with vscode: https://sdk.meltano.com/en/latest/dev_guide.html#debugging. Also, the logs should display the context at the start of each stream sync:
```
2023-01-27 17:43:35,081 | INFO     | tap-stackexchange    | Beginning incremental sync of 'answers' with context: {'question_id': 69025024}...
```
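If that log line isn't enough, a quick way to see what the stream is actually receiving is to log it yourself from inside the stream, e.g. (a throwaway debugging sketch, not required code):
```python
class GetProjectMentionsStream(BrandMentionsStream):
    # ... rest of the stream definition as above ...

    def get_url_params(self, context, next_page_token):
        # With no partitions and no parent stream, context is expected to be
        # None; the bookmark still lives in the stream-level state.
        self.logger.info("context=%r", context)
        self.logger.info("stream_state=%r", self.stream_state)
        self.logger.info("starting_ts=%r", self.get_starting_timestamp(context))
        return {}
```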
d
Yeah, it's interesting - when we were working on this, even using the logger, e.g. `self.logger.info(context)`, the context was always `None` even when the state was being properly set in the db 🤷. I'm just trying to get a sense of the possible reasons for context being empty.
d
Thank you all for your replies and your help. I have read the other thread where AJ goes a bit more in depth. Here are a few thoughts and items to discuss/consider:
• On my tap, I had not configured the `state` capability in my `meltano.yml`, and this is why the get_starting_timestamp() method was returning None. Don't forget to set it, don't be me.
  ◦ On that point, I have a follow-up question. Why does it matter that I need to list this capability in my `meltano.yml`? I mean, the capability is already in my tap's code and the base classes of the SDK, etc. Right? Why do I need to explicitly say that it has the `state` capability even though it's already somewhat baked into the tap's code?
• Now that I have the state from the get_starting_timestamp() method, I can effectively pass it in the `get_url_params()` method to filter my API calls to only return records that are greater than or equal to my saved state. This is great!!
• On why the `context` is None: it seems like it's because I'm not using Stream Partitioning or Parent-Child Streams, so there is no `context`. It was a bit confusing, as the get_starting_timestamp() method has the `context` as an argument, but if you go down the rabbit hole of all the embedded methods, you will find out that it does not require a `context` in order to provide the `self.stream_state`. Mystery solved.
• Getting back to the actual reason for my initial question: I was under the impression that the tap would somehow be able to filter the incremental records "in memory" based on the `replication_key`, independently of me overriding `get_url_params()`. As an example:
  a. Let's say I have my `replication_key` set up properly to "updated_at", and the first time I run my pipeline, I get 1000 records from the API with dates ranging from `2023-01-01` to `2023-01-30` in the "updated_at" field.
  b. 1000 records are sent to the target.
  c. The state is saved properly with a value of `2023-01-30` for this stream.
  d. The following day, I rerun the pipeline and get 1100 records from the API with dates ranging from `2023-01-01` to `2023-01-31` in the "updated_at" field.
  e. Here (in my mind), the tap was supposed to be able to detect that there are only 100 new records with `"updated_at" > state` and then only send those 100 records to the target.
  f. The state is saved properly with a value of `2023-01-31` for this stream.
  g. Repeat...
• Based on my experience and reading from other threads, point e) is not what is happening. In fact, the whole 1100 records are sent to the target. This means that for a cloud target like Snowflake, our `merge` statement would be done with an ever-increasing number of records instead of only on the newest 100 records, and in the end, this would lead to higher cloud cost.
  ◦ I'm wondering why the tap cannot detect/filter the newest records "in memory"? In theory, it would have all the information necessary, that is, the full dataset of records, the `replication_key`, and the last saved `state`, so is there a reason I'm missing why it can't do that natively? (A sketch of this kind of filter is shown after this message.)
• Lastly, I struggled for a few days before asking questions internally at my company and here on Slack on how to properly set up an incremental stream. I think a bit more documentation in the Meltano Singer SDK would be a great improvement to the user experience. I think either something in the SDK Code Samples with an example of an implementation of <https://sdk.meltano.com/en/latest/classes/singer_sdk.Stream.html#singer_sdk.Stream.get_starting_time…
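The SDK doesn't do that in-memory filtering natively today, but as a rough workaround sketch for point e) (assuming `published` comes back as an ISO-like string such as "2023-01-27 00:04:00"; adjust the parsing and timezone handling to the real payload), already-synced rows can be dropped in `post_process()`. SDK REST streams exclude a record when `post_process()` returns `None`:
```python
from datetime import datetime
from typing import Optional


class GetProjectMentionsStream(BrandMentionsStream):
    # ... name, primary_keys, replication_key, and schema as above ...

    def post_process(self, row: dict, context: Optional[dict] = None) -> Optional[dict]:
        """Drop rows at or before the saved bookmark (client-side incremental filter)."""
        bookmark = self.get_starting_timestamp(context)
        if bookmark is None:
            return row  # first run: no bookmark yet, keep everything
        published = datetime.fromisoformat(row["published"])
        if published.tzinfo is None:
            # Naive timestamps from the API vs. a timezone-aware bookmark:
            # compare them as naive values; adjust to match your data.
            bookmark = bookmark.replace(tzinfo=None)
        if published <= bookmark:
            return None  # returning None excludes the record from the stream
        return row
```
Note that this only trims what gets emitted to the target; the tap still downloads the full dataset from the API, so filtering in `get_url_params()` is still the bigger win where the API supports it.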
e
> On why the `context` is None
Maybe documentation could help here? Perhaps as part of https://sdk.meltano.com/en/latest/context_object.html

> On my tap, I had not configured the `state` capability in my `meltano.yml`, and this is why the get_starting_timestamp() method was returning None. Don't forget to set it, don't be me.
There's https://github.com/meltano/meltano/issues/2986 that might be good to revisit.

> Why does it matter that I need to list this capability in my `meltano.yml`? I mean, the capability is already in my tap's code and the base classes of the SDK, etc. Right? Why do I need to explicitly say that it has the `state` capability even though it's already somewhat baked into the tap's code?
We've discussed the general issue of translating Singer tap/target capabilities and settings to Meltano, but the problem (not that big, imo) is that it would only benefit SDK-based packages: https://github.com/meltano/meltano/issues/7156

> Here (in my mind), the tap was supposed to be able to detect that there are only 100 new records with `"updated_at" > state` and then only send those 100 records to the target.
This is something we definitely want in the SDK: https://github.com/meltano/sdk/issues/226

@david_gauthier-dussureault thanks for the extremely detailed explanation of the problems you encountered.
a
I think @edgar_ramirez_mondragon said everything I would have 😅 rockon
@david_gauthier-dussureault - Your detailed comments and feedback are exactly on point - and much appreciated. Two things we don't have logged as of yet and which I thought of as I was reading your message above:
1. A runtime warning is probably warranted whenever a tap runs that doesn't declare the `STATE` capability: "Your tap doesn't support incremental sync. See https://melta.no#state_capability for more info." That would be better than silently not passing state to the tap.
2. A `meltano compile --check=all` or `meltano check` command or similar is probably warranted for us in the near future. What might not be reasonable to put in the codepath of normal operations like `meltano run` and `meltano add` could be completely okay to run in an out-of-cycle "check my stuff please" command. That would be a decent place to iterate through all plugins, run `--about` on them, and compare the output with what is declared in `meltano.yml`.
I'll log issues on both of the above unless anyone disagrees - or beats me to it. 😄
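For reference, the manual version of that last check today is roughly the following (assuming an SDK-based tap, which supports `--about`):
```bash
# Ask the tap what it reports about itself, then compare with meltano.yml by hand.
meltano invoke tap-brandmentions --about --format=json
```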
d
Thank you @edgar_ramirez_mondragon and @aaronsteers for your responses here and for the thoughts and work you put into these items. Also, are you interested in community contributions for the two documentation suggestions I made? Should I open an issue in GitHub for discussion?
a
Thanks, @david_gauthier-dussureault. And absolutely - we always appreciate new docs contributions.
> I think a bit more documentation in the Meltano Singer SDK would be a great improvement to the user experience. I think either something in the SDK Code Samples with an example of an implementation of get_starting_timestamp() or a dedicated section similar to Parent-Child Streams would be awesome.
Yeah, I agree both could be really helpful.