Hey! I'm trying to develop a custom rest extracto...
# singer-tap-development
l
Hey! I'm trying to develop a custom REST extractor. But when I run it, it seems like the context that gets sent to `get_url_params` is always None. Does anyone have any idea why that might be?
r
No state possibly? What are you trying to do with it?
l
When I look in the system database I see a state, so I assume there is one:
{"singer_state": {"bookmarks": {"fields": {"replication_key": "id", "replication_key_value": "workratio"}, "issues": {"replication_key": "updated", "replication_key_value": "2024-04-19T125814.552361+00:00"}}}}
I'm trying to create a version of the jira-tap, and I want to use the state to build the url_params used in a stream. But when I log the context of the stream, all I see is None.
r
Are you running just the `issues`/`fields` streams? Might be an issue for other streams that don't have any state. What stream are you trying to configure `get_url_params` for?
l
Running just the issues and fields streams. I'm trying to configure it for the issues stream.
does it have to do something with me using target-jsonl?
r
Did you fork the existing `tap-jira`? If so, is the context `None` without your changes still?
> does it have to do something with me using target-jsonl?
No, you'll probably find you have the same issue just invoking the tap standalone: `meltano invoke tap-jira`
l
ATM I just took the content of the repo and put it in a folder, then I install the tap like so
- name: tap-jira
  namespace: tap-jira
  pip_url: -e extract/tap-jira
  executable: tap-jira
Yes, I have the same issue if I just invoke the tap standalone. I also tried creating a simple extractor using the cookiecutter and I have the same issue there. So it might be something with my local setup?
r
Maybe that's just the default behaviour of the SDK then... I'm not sure why it would be `None` if there is state. @Edgar Ramírez (Arch.dev) thoughts, if you have a sec?
(and thanks in advance)
e
The context will always be None unless:
1. The stream is a child stream
2. Or the stream overrides the `partitions` attribute
I'd rather use `get_starting_timestamp` or `get_starting_replication_key_value` to get the bookmark for the stream.
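A simplified model of the rule above, not the SDK's actual code. The function name `contexts_for_sync` and the example context keys are hypothetical; the sketch only assumes that a stream is synced once with its parent's context, once per partition, or once with None when it has neither.

```python
from __future__ import annotations


def contexts_for_sync(
    parent_context: dict | None,
    partitions: list[dict] | None,
) -> list[dict | None]:
    """Return the contexts a stream would be synced with (simplified model)."""
    if parent_context is not None:
        # Child stream: the parent record supplies the context.
        return [parent_context]
    if partitions:
        # Partitioned stream: one sync per partition dict.
        return list(partitions)
    # Plain stream such as `issues`: a single sync with no context.
    return [None]


# A plain stream only ever sees None.
print(contexts_for_sync(None, None))
# A partitioned stream sees each partition in turn.
print(contexts_for_sync(None, [{"project": "A"}, {"project": "B"}]))
# A child stream sees the context handed down by its parent.
print(contexts_for_sync({"issue_id": "10001"}, None))
```

Under this model, a None context for the `issues` stream is expected and says nothing about whether state was loaded.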
r
I just assumed those were convenience methods that wrap context. I think I'm conflating context and state though.
e
Yeah you have to pass them the context, which can be None. They use the context to get the right bookmark for the current partition/child.
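To make the distinction between context and state concrete, here is a minimal sketch of a bookmark lookup. It is an illustration under assumptions, not the SDK's implementation: `find_bookmark` is a hypothetical helper, the `comments` stream and all values are made up, and the state layout (a `partitions` list whose entries carry a `context` key) is assumed from the SDK's state format.

```python
from __future__ import annotations

from typing import Any


def find_bookmark(
    state: dict,
    stream_name: str,
    context: dict | None,
) -> dict[str, Any]:
    """Return the bookmark dict for a stream, or for one partition of it."""
    stream_state = state.get("bookmarks", {}).get(stream_name, {})
    if context is None:
        # No context: the bookmark lives directly on the stream entry.
        return stream_state
    # With a context: pick the partition whose stored context matches.
    for partition in stream_state.get("partitions", []):
        if partition.get("context") == context:
            return partition
    return {}


# Hypothetical state: one plain stream and one partitioned stream.
state = {
    "bookmarks": {
        "issues": {
            "replication_key": "updated",
            "replication_key_value": "2024-04-19",
        },
        "comments": {
            "partitions": [
                {
                    "context": {"issue_id": "10001"},
                    "replication_key_value": "2024-04-01",
                }
            ]
        },
    }
}

print(find_bookmark(state, "issues", None).get("replication_key_value"))
print(find_bookmark(state, "comments", {"issue_id": "10001"}).get("replication_key_value"))
```

The point of the sketch is that a None context still resolves to a bookmark, as long as the state actually reaches the tap.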
r
Makes sense, thank you. 😁
l
Thanks for the input! I tried using `get_starting_replication_key_value`; it gets the start_value I have set in config if I have one. But it still doesn't seem to pick up the replication key value from my state. Do I need to set a partitions attribute for it to work properly? This is the code for my stream:
class IssueStream(JiraStream):
    """
    <https://developer.atlassian.com/cloud/jira/platform/rest/v3/api-group-issue-search/#api-rest-api-3-search-get>
    """

    """
    name: stream name
    path: path which will be added to api url in client.py
    schema: instream schema
    primary_keys = primary keys for the table
    replication_key = datetime keys for replication
    records_jsonpath = json response body
    """

    name = "issues"
    path = "/search"
    primary_keys = ["id"]
    replication_key = "updated"
    records_jsonpath = "$[issues][*]"  # Or override `parse_response`.
    instance_name = "issues"
    TYPE_CONFORMANCE_LEVEL = TypeConformanceLevel.ROOT_ONLY

    schema = (
        PropertiesList(
            Property("expand", StringType),
            Property("created", DateTimeType),
            Property("updated", DateTimeType),
            Property("id", StringType),
            Property("self", StringType),
            Property("key", StringType),
            Property("fields", ObjectType())
        ).to_dict())

    def get_url_params(
            self,
            context: dict | None,
            next_page_token: Any | None,
    ) -> dict[str, Any]:

        replication_key_start = self.get_starting_replication_key_value(context)
        self.logger.info(f"replication key value used = {replication_key_start}")

        params = {"sort": "asc", "order_by": "fields.updated",
                  "maxResults": self.config.get("page_size", {}).get("issues", 10), "jql": []}

        if next_page_token:
            params["startAt"] = next_page_token

        params["sort"] = "asc"
        params["order_by"] = "fields.updated"

        if replication_key_start:
            params["jql"].append(f"(updated>={replication_key_start})")

        if params["jql"]:
            jql = " and ".join(params["jql"])
            params["jql"] = jql

        else:
            params.pop("jql")  # drop if there's no query
        self.logger.info(params)
        return params

    def post_process(self, row: dict, context: dict | None = None) -> dict | None:
        row["updated"] = row["fields"]["updated"]
        row["created"] = row["fields"]["created"]
        return row
And this is the code for the JiraStream:
class JiraStream(RESTStream):
    """tap-jira stream class."""

    next_page_token_jsonpath = (
        "$.paging.start"  # Or override `get_next_page_token`.  # noqa: S105
    )

    records_jsonpath = "$[*]"  # Or override `parse_response`.

    # Set this value or override `get_new_paginator`.
    next_page_token_jsonpath = "$.next_page"

    @property
    def url_base(self) -> str:
        """
        Returns base url
        """
        domain = self.config["domain"]
        base_url = "https://{}:443/rest/api/3".format(domain)
        return base_url

    @property
    def authenticator(self) -> _Auth:
        """Return a new authenticator object.

        Returns:
            An authenticator instance.
        """
        auth_type = self.config["auth"]["flow"]

        if auth_type == "oauth":
            return BearerTokenAuthenticator.create_for_stream(
                self,
                token=self.config["auth"]["access_token"],
            )
        else:
            logging.warn(self.config)
            password = os.environ.get("JIRA_CUSTOM_PASSWORD")
            return BasicAuthenticator.create_for_stream(
                self,
                password=password,
                username=self.config["auth"]["username"],
            )

    @property
    def http_headers(self) -> dict:
        """Return the http headers needed.

        Returns:
            A dictionary of HTTP headers.
        """
        headers = {}
        if "user_agent" in self.config:
            headers["User-Agent"] = self.config.get("user_agent")
        # If not using an authenticator, you may also provide inline auth headers:
        # headers["Private-Token"] = self.config.get("auth_token")  # noqa: ERA001
        return headers

    def get_url_params(
        self,
        context: dict | None,
        next_page_token: Any | None,
    ) -> dict[str, Any]:

        """Return a dictionary of values to be used in URL parameterization.

        Args:
            context: The stream context.
            next_page_token: The next page index or value.

        Returns:
            A dictionary of URL query parameters.
        """
        params: dict = {}
        if next_page_token:
            params["startAt"] = next_page_token
        if self.replication_key:
            params["sort"] = "asc"
            params["order_by"] = self.replication_key

        return params

    def get_next_page_token(
        self,
        response: requests.Response,
        previous_token: t.Any | None,
    ) -> t.Any | None:
        """Return a token for identifying next page or None if no more pages."""
        # If pagination is required, return a token which can be used to get the
        #       next page. If this is the final page, return "None" to end the
        #       pagination loop.
        return None
e
Did you define `capabilities` for your plugin in `meltano.yml`?
l
Yeah, it looks like this, and I assume it is working because it does write to a state:
version: 1
send_anonymous_usage_stats: true
project_id: "tap-jira"
default_environment: dev
plugins:
  extractors:
  - name: "tap-jira"
    namespace: "tap_jira"
    pip_url: -e .
    capabilities:
    - state
    - catalog
    - discover
    - about
    - stream-maps
    settings:
    - name: start_date
    - name: end_date
    - name: domain
    - name: auth_type
    - name: auth.flow
    - name: auth.access_token
      kind: password
    - name: auth.username
    - name: auth.password
      kind: password
environments:
- name: dev
- name: staging
- name: prod
Or do you mean in the meltano.yml file I have at the root of my project?
Looks like adding it to the project-level .yaml file fixed it, thank you so much!