# getting-started
j
Hello, I am working at GitLab, and there is an already implemented custom-tap (maybe not accessible) from a data source called Adaptive, which captures financial data. I need to modify this tap by adding a stream, but it is my first time using Meltano, and I'm not sure where to start as the new API endpoint is quite different from the others. Hoping someone can advise! Details in thread.
There are currently 15 existing streams, including one called `exportVersions`, which calls an API that returns a list of report `version_id`s, i.e.
```python
["Forecast Report1", "Forecast Report2"]
```
The new stream will need to use the `version_id`s to request an exportData endpoint, something like:
```python
for version_id in version_ids:
    requests.get(exportData_url, params={'version_id': version_id})
```
Given these two requirements:
1. Need to get the `version_id`s from the existing stream and pass them to the new stream.
2. The new stream needs to call the exportData endpoint 'n' times, where 'n' is the number of `version_id`s passed in. All other streams only call their respective endpoint one time.

What would be the best design pattern? Thank you in advance. cc @jstark
a
@justin_wong - If I correctly understand the use case, this sounds like a case where you'd likely want to create the new stream with the existing `exportVersions` stream as its parent...
The child stream would then receive the parent's data as its `context` - which it can then use to make additional calls to get those data elements which are specific to each parent item (to each version in this use case).
In your custom tap, it looks like another stream already does use `VersionsStream` as a parent, so you might be able to model after this one. Does this help at all?
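For anyone following along, here is a minimal sketch of the parent/child wiring described above, using the Singer SDK's `parent_stream_type` and `get_child_context` hooks. Except for the `VersionsStream` name taken from the thread, all names, URLs, paths, and field names below are placeholders, not the actual tap-adaptive code:
```python
from typing import Optional

from singer_sdk import RESTStream
from singer_sdk import typing as th


class VersionsStream(RESTStream):
    """Existing parent stream (heavily simplified for illustration)."""

    name = "exportVersions"
    url_base = "https://api.example.com"   # placeholder, not the real base URL
    path = "/exportVersions"               # placeholder
    primary_keys = ["id"]
    schema = th.PropertiesList(th.Property("id", th.StringType)).to_dict()

    def get_child_context(self, record: dict, context: Optional[dict]) -> dict:
        # Whatever is returned here is handed to every child stream as its
        # `context`, once per parent record.
        return {"version_id": record["id"]}  # assumes each record has an id


class ExportDataStream(RESTStream):
    """New child stream: the SDK syncs it once for each parent record."""

    name = "exportData"
    url_base = "https://api.example.com"   # placeholder
    path = "/exportData"                   # placeholder
    parent_stream_type = VersionsStream    # <-- this is the parent/child link
    primary_keys = ["version_id"]
    schema = th.PropertiesList(th.Property("version_id", th.StringType)).to_dict()
```
The key point is that the SDK drives the child once per parent record, so the "how many times does the child run" question is really a question about how many records (or contexts) the parent emits.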
j
Yes, that's super helpful, thanks AJ! Regarding the second requirement of making the request multiple times, would the [prepare_request](https://gitlab.com/gitlab-data/meltano_taps/-/blob/main/tap-adaptive/tap_adaptive/client.py#L65) function need to be overridden in the new stream?
One challenge for me has been following the flow of what happens after you run `meltano invoke`; I'm not sure which functions (in this case within client.py) are called when.
a
Regarding the second requirement of making the request multiple times...
Can you give a specific example of the requests you'd be making? The specific endpoints, for instance, or the differences between those extra calls?
One challenge for me has been following the flow of what happens after you run `meltano invoke`; I'm not sure which functions (in this case within client.py) are called when.
Internally, this is managed by two layers: Meltano itself calls the tap's CLI (`meltano invoke tap-mysql --help` is approximately equivalent to `tap-mysql --help`). And the tap's CLI is mostly handled by the SDK - so you only have to write the unique handling logic without worrying about the Singer Spec and CLI arg passing.
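To make "which functions are called when" a bit more concrete: for a `RESTStream`-based tap, each stream's sync loops roughly through building the request, making the HTTP call, parsing the response, and post-processing each record. The sketch below lists the usual override points with their real SDK signatures; the class itself and its URLs are placeholders, and the ordering comments are a simplification of what the SDK does internally:
```python
from typing import Any, Iterable, Optional

import requests
from singer_sdk import RESTStream


class ExampleStream(RESTStream):
    """Illustrates the main RESTStream hooks, in roughly the order they run."""

    name = "example"
    url_base = "https://api.example.com"   # placeholder
    path = "/example"                      # placeholder
    schema = {"type": "object", "properties": {}}

    # 1. Called while building each request; returns the query-string params.
    def get_url_params(self, context: Optional[dict], next_page_token: Any) -> dict:
        return super().get_url_params(context, next_page_token)

    # 2. Builds the requests.PreparedRequest (URL, headers, params, body).
    def prepare_request(self, context: Optional[dict], next_page_token: Any) -> requests.PreparedRequest:
        return super().prepare_request(context, next_page_token)

    # 3. After the HTTP response comes back, extracts raw records from it.
    def parse_response(self, response: requests.Response) -> Iterable[dict]:
        yield from super().parse_response(response)

    # 4. Runs once per record, e.g. for renaming or dropping fields.
    def post_process(self, row: dict, context: Optional[dict] = None) -> Optional[dict]:
        return super().post_process(row, context)
```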
j
Does this example help?
```python
import requests

# returned from the Versions endpoint
version_ids = ["Forecast Report1", "Forecast Report2"]

final_payload = []

for version_id in version_ids:
    response = requests.get(some_new_endpoint, params={'version_id': version_id})
    payload = response.json()

    # if it's possible to upload each payload directly to snowflake
    upload_to_snowflake(payload)

    # else save each payload in memory for a final upload via tap-snowflake
    final_payload.append(payload)
```
The new endpoint only takes in one `version_id`; I need to request the endpoint with each `version_id` that is returned from the versionStream. That's the only difference between the calls.
e
Is the challenge that `versionStream` returns multiple `version_id` values in a list?
j
The challenge is that the new stream needs to call its endpoint multiple times, once for every `version_id` returned by the `versionStream`. Every other stream only needs to call its respective endpoint once.
e
Gotcha, I don't think this is easily supported in the SDK today. There are a few ways we could support this:
1. Adding a method in between, e.g. `Stream.get_context_from_parent(parent_context: dict) -> Iterable[dict]`, that by default just yields the parent context, and `Stream._sync_children` iterates over the generated contexts.
2. Supporting passing the context to `RESTStream.get_new_paginator`. This would allow use cases where the dev needs to paginate over a set of fixed values.

cc @aaronsteers wdyt?
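For reference, option 2 is essentially the "paginate over a fixed set of values" idea. A paginator like that can already be expressed with `singer_sdk.pagination.BaseAPIPaginator`; what is missing, per the point above, is a supported way to hand the parent context to `get_new_paginator`, so in a sketch like the one below the values would have to come from config or some other source. Hypothetical class, not an SDK feature:
```python
from typing import Iterable, Optional

import requests
from singer_sdk.pagination import BaseAPIPaginator


class FixedValuesPaginator(BaseAPIPaginator):
    """Emit one 'page token' per predetermined value (e.g. one per version_id)."""

    def __init__(self, values: Iterable[str]) -> None:
        values = list(values)
        # The first value becomes the starting token; the rest are queued.
        super().__init__(start_value=values[0] if values else None)
        self._remaining = values[1:]

    def get_next(self, response: requests.Response) -> Optional[str]:
        # Ignore the response; hand out the next fixed value, or None to stop.
        return self._remaining.pop(0) if self._remaining else None
```
The stream would then see each value as `next_page_token` in `get_url_params`, so each version id would drive exactly one request.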
j
Darn, I appreciate you checking!
a
The challenge is that the new stream needs to call its endpoint multiple times, once for every version_id returned by the versionStream.
Just to make sure I understand: VersionStream returns one record per version_id, correct? And you just need to make one additional call per version record of the parent stream... is that right?
I wonder if it would work to override `get_url_params()` in the child, and let the child stream run once per parent record.
It looks like the version stream is passing a context with a specific version "name" to its child streams here: https://gitlab.com/gitlab-data/meltano_taps/-/blob/main/tap-adaptive/tap_adaptive/streams.py#L271
So, from the context of the child, there should be only a single version to fetch info about - if I understand correctly.
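A short sketch of what that override might look like in the child stream, assuming the parent's `get_child_context` puts the version id into the context under a key like `version_id` (the key name, class name, and URLs are guesses, not the actual tap-adaptive code):
```python
from typing import Any, Optional

from singer_sdk import RESTStream


class ExportDataStream(RESTStream):
    """Hypothetical child stream: one request per parent version record."""

    name = "exportData"
    url_base = "https://api.example.com"     # placeholder
    path = "/exportData"                     # placeholder
    schema = {"type": "object", "properties": {}}
    # parent_stream_type = VersionsStream    # wire up to the real parent class

    def get_url_params(self, context: Optional[dict], next_page_token: Any) -> dict:
        # `context` is what the parent's get_child_context() returned, so from
        # the child's point of view there is exactly one version to query.
        return {"version_id": context["version_id"]}
```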
j
Just to make sure I understand: VersionStream returns one record per version_id, correct?
Correct. Here's a sample payload returned from VersionStream:
```json
{
  "version": [
    {
      "@id": "144",
      "@name": "Forecast1"
    },
    {
      "@id": "1683",
      "@name": "Forecast2"
    },
    {
      "@id": "1543",
      "@name": "Forecast3"
    },
    {
      "@id": "1563",
      "@name": "Forecast4"
    }
  ]
}
```
And you just need to make one additional call per version record of the parent stream... is that right?
Correct - one additional call per version record, so 4 version_ids = 4 additional calls to the new stream. One caveat (probably insignificant) is that some version_ids would be dropped based on a regex pattern, so in reality 4 version_ids might yield only 2 additional calls.
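Side note: given that sample payload, one way to make the parent emit one record per version (so the SDK then runs the child once per version automatically) is a `records_jsonpath` over the `version` array. A hedged sketch, assuming the "@id"/"@name" keys from the payload above; the URLs and schema are simplified placeholders:
```python
from typing import Optional

from singer_sdk import RESTStream
from singer_sdk import typing as th


class VersionsStream(RESTStream):
    """Parent stream emitting one record per item in the payload's "version" list."""

    name = "exportVersions"
    url_base = "https://api.example.com"   # placeholder
    path = "/exportVersions"               # placeholder
    primary_keys = ["@id"]
    records_jsonpath = "$.version[*]"      # one record per entry in "version"
    schema = th.PropertiesList(
        th.Property("@id", th.StringType),
        th.Property("@name", th.StringType),
    ).to_dict()

    def get_child_context(self, record: dict, context: Optional[dict]) -> dict:
        # One child context (and therefore one child sync) per version record.
        return {"version_id": record["@id"], "version_name": record["@name"]}
```
This is the same wiring described earlier in the thread, just reshaped so the per-version fan-out happens on the parent side rather than inside the child.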
Hi @aaronsteers - I wanted to follow up on this, as our team is going to have to make a choice whether to implement this in Meltano or to switch to a custom solution. I took a look at your comments with fresh eyes. You said:
It looks like the version stream is passing a context with a specific version "name" to its child streams here: https://gitlab.com/gitlab-data/meltano_taps/-/blob/main/tap-adaptive/tap_adaptive/streams.py#L271
You are correct, currently, the version stream is only passing a specific version to its child streams. You also said:
So, from the context of the child, there should be only a single version to fetch info about - if I understand correctly
Not quite. You're right that that is how the code was implemented (it passes a single version), BUT there are actually dozens of versions that need to be passed down to the child stream, and the child stream needs to call the endpoint for each version. My understanding is that, as it stands, Meltano does not support this type of workflow. It looks like this type of logic was implemented in a Zendesk Singer tap, but I'm not sure if I can implement something like this using the current Meltano abstractions.
a
@justin_wong - re:
You are correct, currently, the version stream is only passing a specific version to its child streams.
If you have visibility to all versions, are you able to pass all the versions in the child context? And then the child stream can iterate through each.
j
Yes, I think I can get all the versions into the context obj. That brings it back to the fundamental issue: is it possible to call the final endpoint multiple times, once for each version in the context obj? As currently implemented, each endpoint is only called once. If so, would you be able to provide an example/doc that I could refer to? I've been trying to find an example of this pattern, but haven't had luck.
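In case it helps the discussion: if all the versions really do arrive in a single context dict (e.g. under a hypothetical `version_ids` key), one workaround that seems consistent with the SDK's public hooks is to override the child's `get_records()` and fan out from there. This is a sketch under those assumptions, not a pattern confirmed by the Meltano docs; names and URLs are placeholders:
```python
from typing import Any, Iterable, Optional

from singer_sdk import RESTStream


class ExportDataStream(RESTStream):
    """Hypothetical child stream that calls its endpoint once per version_id."""

    name = "exportData"
    url_base = "https://api.example.com"   # placeholder
    path = "/exportData"                   # placeholder
    schema = {"type": "object", "properties": {}}

    def get_url_params(self, context: Optional[dict], next_page_token: Any) -> dict:
        # Each inner sync below passes a context holding a single version_id.
        return {"version_id": context["version_id"]}

    def get_records(self, context: Optional[dict]) -> Iterable[dict]:
        # Assumes the parent put the full list under context["version_ids"];
        # fan out into one normal REST sync per id.
        for version_id in (context or {}).get("version_ids", []):
            per_version_context = {**(context or {}), "version_id": version_id}
            yield from super().get_records(per_version_context)
```
The alternative discussed earlier (parent emits one record per version, and `get_child_context` passes a single id down) keeps the stock `get_records` and is probably simpler if the parent stream can be reshaped that way.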