# singer-tap-development
v
Anyone have an example using partitions with a child stream? `_sync_records()` calls `context_list = [context] if context is not None else self.partitions`, so when my parent's `get_child_context()` returns data, the partitions for my child stream are ignored. My use case is combining the two streams listed here https://github.com/AutoIDM/tap-clickup/pull/88/files into one. Using partitions seems like a decent idea, i.e. define `partitions = [{"archived":"yep"},{"archived":"nope"}]` and change the parameters based on that, but because these streams are child streams, the partitions get clobbered.
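A stripped-down simulation of the quoted selection logic, using plain dicts instead of real SDK streams (`select_contexts` is a made-up name for illustration), shows why the child's partitions never get used once the parent hands it a context:

```python
from typing import List, Optional


def select_contexts(
    context: Optional[dict], partitions: Optional[List[dict]]
) -> Optional[List[Optional[dict]]]:
    """Standalone mirror of the quoted SDK line:
    context_list = [context] if context is not None else self.partitions
    """
    return [context] if context is not None else partitions


child_partitions = [{"archived": "yep"}, {"archived": "nope"}]

# Top-level stream: no context is passed, so the partitions drive the sync.
print(select_contexts(None, child_partitions))

# Child stream: the parent's context wins and the partitions are dropped.
print(select_contexts({"team_id": "123"}, child_partitions))
```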
e
Yup, the ignoring is taking place here: https://gitlab.com/meltano/sdk/-/blob/5b613404963a55610b59acdd1042e2021c219069/singer_sdk/streams/core.py#L899. I wonder if it makes sense to merge the context with each partition if it's not null.
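One way to read that suggestion, as a sketch only: a hypothetical helper that could stand in for the `context_list` line, merging the parent context into a copy of each partition instead of dropping the partitions. The helper name and the rule that partition keys win on collisions are assumptions, not SDK behavior.

```python
from typing import List, Optional


def merge_context_with_partitions(
    context: Optional[dict], partitions: Optional[List[dict]]
) -> Optional[List[dict]]:
    """Hypothetical replacement for the context_list line."""
    if context is None:
        # Top-level stream: unchanged behavior, partitions drive the sync.
        return partitions
    if not partitions:
        # Child stream without partitions: unchanged behavior.
        return [context]
    # Build new dicts so neither the context nor the partitions are mutated.
    # Partition keys are applied last, so they win on key collisions.
    return [{**context, **partition} for partition in partitions]


parts = [{"archived": True}, {"archived": False}]
print(merge_context_with_partitions({"team_id": "123"}, parts))
```

Building fresh dicts avoids mutating `self.partitions`. It does not answer what else in the SDK (state bookkeeping, for instance) might rely on the current behavior.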
v
Seems like we might break some things 😕. I'm not sure. I wonder about https://gitlab.com/meltano/sdk/blob/5b613404963a55610b59acdd1042e2021c219069/singer_sdk/streams/core.py#L986. Instead of calling `child_stream.sync(context=child_context)` directly, maybe it'd first do `child_context = child_stream.from_parent_context(context=child_context)`, so the `child_stream` would have the ability to make changes. That allows situations where you want different behavior for different child streams of the same parent: https://gitlab.com/meltano/sdk/blob/5b613404963a55610b59acdd1042e2021c219069/singer_sdk/streams/core.py#L989
This does the trick; I'm still thinking about the comments more. I'll push this up for my tap, and we could potentially add this to the SDK?
```python
from typing import Optional

from singer_sdk.streams import RESTStream


class ClickUpStream(RESTStream):
    # Assumed placement: the tap's shared base stream, so every parent and
    # child stream gets both methods.

    def from_parent_context(self, context: dict) -> Optional[dict]:
        """Default is to return the dict passed in."""
        if self.partitions is None:
            return context
        # Was going to copy the partitions, but the _sync call forces us
        # to use partitions instead of being able to provide a list of contexts.
        # Ideally we wouldn't mutate partitions here; we'd provide a copy of
        # the partitions with the context merged so there are no side effects.
        # Relies on partitions being a stored list (e.g. a class attribute)
        # so that the update persists.
        for partition in self.partitions:
            partition.update(context.copy())  # Add a copy of the context to each partition
        return None  # Context is now handled at the partition level

    def _sync_children(self, child_context: dict) -> None:
        for child_stream in self.child_streams:
            if child_stream.selected or child_stream.has_selected_descendents:
                child_stream.sync(child_stream.from_parent_context(context=child_context))
```
It feels yucky mutating partitions, but because each child has a single parent, we are probably good for now. Still feels really dirty.
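To see why mutating the shared list feels dirty, here is the same update loop reduced to a plain function (`from_parent_context_mutating` is a made-up name), run for two parent records whose contexts happen to have different keys:

```python
from typing import List


def from_parent_context_mutating(partitions: List[dict], context: dict) -> None:
    """The update loop from the override above, without the stream class."""
    for partition in partitions:
        partition.update(context.copy())


partitions = [{"archived": "yep"}, {"archived": "nope"}]

# First parent record, then a second one with fewer keys.
from_parent_context_mutating(partitions, {"team_id": "1", "space_id": "a"})
from_parent_context_mutating(partitions, {"team_id": "2"})

# The shared list now carries the second team_id plus a stale space_id
# left behind by the first parent record.
print(partitions)
```

When every parent context carries the same keys, each update simply overwrites the previous values, which is why this holds up for a single parent/child pair; leftovers only appear when the keys differ.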
b
Ahh. So I have done something similar to this. I created a base class like this to inherit from, since several endpoints shared the same set of URL params.
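```python
from typing import List

from singer_sdk.streams import RESTStream


class ArchiveableStream(RESTStream):
    @property
    def partitions(self) -> List[dict]:
        # One sync pass per value of the "archived" URL param.
        return [
            {"archived": True},
            {"archived": False},
        ]
```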
In my case these were RestStreams and the partition values were URL params, so I overrode `get_url_params` as required in the child streams.
Which may just be what you want to do. I notice `ClickUpStream` is a RestStream and you don't have an override of `get_url_params`. So you could set `path = "/team/{team_id}/task"` and then:
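```python
from typing import Any, Dict, Optional


def get_url_params(
    self, context: Optional[dict], next_page_token: Optional[Any]
) -> Dict[str, Any]:
    params: Dict[str, Any] = {}
    params["include_closed"] = "true"
    params["subtasks"] = "true"
    params["archived"] = context["archived"]  # value supplied by the partition
    return params
```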
Which should get picked up in `prepare_request()`: https://gitlab.com/meltano/sdk/-/blob/main/singer_sdk/streams/rest.py#L209
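Roughly, the SDK fills `{team_id}`-style placeholders in `path` from the context and sends whatever `get_url_params` returns as the query string. A standalone approximation using only the standard library; `build_request_url` and the base URL are placeholders for illustration, not SDK API:

```python
from typing import Any, Dict
from urllib.parse import urlencode

BASE_URL = "https://api.example.com"  # placeholder, not the real ClickUp base URL


def get_url_params(context: Dict[str, Any]) -> Dict[str, Any]:
    # Same params as the snippet above; "archived" comes from the partition.
    return {
        "include_closed": "true",
        "subtasks": "true",
        "archived": context["archived"],
    }


def build_request_url(path: str, context: Dict[str, Any]) -> str:
    """Substitute {key} placeholders from the context, then append the query string."""
    for key, value in context.items():
        path = path.replace("{" + key + "}", str(value))
    return f"{BASE_URL}{path}?{urlencode(get_url_params(context))}"


print(build_request_url("/team/{team_id}/task", {"team_id": "123", "archived": "true"}))
```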
Ohh, I see now. `ClickUpTaskStream` has a `get_url_params()`. Looks like all tasks can have `archived` whether they come from the /list or the /team URL. So I would set the URL params (even the ones you have with defaults) in that `ClickUpTaskStream.get_url_params` and then just add `params["archived"] = context["archived"]`. Then for the TeamTasksStream you can trim the path and set the parent stream to something like the `ArchiveableStream` above, and it should use the partitions.
Oh, uhh, nope, never mind, I was misremembering. I use the above method for streams where the partition rolls down to all child streams. I just double-checked my code, and for this sort of case I got around it by overriding `_sync_records()` in the parent stream (mostly copy-paste) and enriching the `context_list` there, which is a bit of a yikes... I just whipped up a minimal streams.py to illustrate.
streams.py
Some sort of ability to set stream-local additional partitions would be nice, though. Or even just a list of values you'd want to pass into a URL param, per partition?
Something weird was going on with the log output. Should be fixed here.
v
Wow, awesome @brandon_isom! Yeah, I should be using `get_url_params` instead of throwing the params in myself and setting them in two places 😄. Your method went with overriding `_sync_records`, which I was trying really hard not to do, as there's a bunch of code in there; of all the private functions we shouldn't be overriding, I imagine that's one to stay away from if possible. With https://meltano.slack.com/archives/C01PKLU5D1R/p1636646158227500?thread_ts=1636595472.225300&cid=C01PKLU5D1R everything works and I don't have to override `_sync`, but I'm doing terrible things with partitions. We could probably work to get `context_list = [context] if context is not None else self.partitions` updated in the SDK. My gut says we could harden the interfaces between context, partitions, and parent/child streams, but I don't have a solution sitting here today.
b
Thinking about this a bit more, I wonder if this is more a case of needing to loop on a query param within a particular stream. Given this scenario, we have:
`/lists`
`/teams`
`/list/{list_id}/tasks?archived=true|false`
`/team/{team_id}/tasks?archived=true|false`
Suppose we also had `/task/{task_id}/comments` (or something), which does not accept an `archived` query param. If the `ListStream` and `TeamStream` have no connection to that `archived` param, and the `CommentStream` has no such parameter, then it may not make sense to treat `archived` as a partitioning construct. Instead, it may make more sense to run the `TaskStream` API call twice per team or list ID and carry on without passing `archived` out at all. Heck, maybe even allow setting `TAP_CLICKUP_INCLUDE_ARCHIVED=false` to choose whether to pick up the `archived` records at all.
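A sketch of that alternative, assuming a hypothetical `include_archived` setting read from the `TAP_CLICKUP_INCLUDE_ARCHIVED` environment variable suggested above (defaulting to including archived tasks is also an assumption). The task stream issues one request per `archived` value for each team, and `archived` never shows up in a child context. Where this loop would live inside an SDK stream is not shown.

```python
import os
from typing import Dict, Iterator, List, Tuple


def include_archived() -> bool:
    # Hypothetical setting; anything other than "false" includes archived tasks.
    return os.environ.get("TAP_CLICKUP_INCLUDE_ARCHIVED", "true").lower() != "false"


def archived_values() -> List[str]:
    return ["false", "true"] if include_archived() else ["false"]


def task_requests(team_id: str) -> Iterator[Tuple[str, Dict[str, str]]]:
    """Yield one (path, params) pair per archived value for a single team."""
    path = f"/team/{team_id}/task"
    for archived in archived_values():
        yield path, {"archived": archived}


for path, params in task_requests("123"):
    print(path, params)
```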