Is there anyway to pass values from config into th...
# getting-started
a
Is there anyway to pass values from config into the
schema
defined in
streams.py
? For example if the config has a
name
that is used to hit an api-endppoint, is there a way to pass this name into the return schema so it can be persisted in the db?
e
Hi @annamarie! Do you need to use the value of
name
as a schema field name or as a value in a field with a fixed name (e.g.
name_from_config
)?
a
Yes I need the value of
name
but the field name itself is not particularly crucial, I just need it in the schema for a join I'll be performing later on
e
Ok. Then you can probably add the field to the schema in your stream and use post_process to append the field to every record:
Copy code
class MyStream:
  schema = th.PropertiesList(
    th.Property("name_from_config", th.String),
    ...
  ).to_dict()

  def post_process(self, row, context):
    row["name_from_config"] = self.config["name"]
    return row
a
Amazing! Thank you so much
@edgar_ramirez_mondragon Just circling back here, is there any way to modify the returned row to pass the context? I set up the config to be a csv of values to call the api for so
row["name_from_config"] = self.config["name"]
no longer solves my problem. But this does not work, context is empty
Copy code
def post_process(
        self,
        row: dict,
        context: dict | None,
    ) -> dict | None:
        print("CONTEXT:", context)
        
        row['input_domain'] = context['website_domain']
        return row
despite setting the context here, and successfully using this context to search against the correct domain:
Copy code
def request_records(self, context: dict | None) -> t.Iterable[dict]:
        """Request records from REST endpoint(s), returning response records.

        If pagination is detected, pages will be recursed automatically.

        Args:
            context: Stream partition or context dictionary.

        Yields:
            An item for every record in the response.
        """
        decorated_request = self.request_decorator(self._request)

        with metrics.http_request_counter(self.name, self.path) as request_counter:
            request_counter.context = context

            domains = self.config["domains"].split(",")

            if context is None:
                context = {}

            for domain in domains:
                context["website_domain"] = domain

                prepared_request = self.prepare_request(
                    context,
                    next_page_token=None,
                )
                resp = decorated_request(prepared_request, context)
                request_counter.increment()
                self.update_sync_costs(prepared_request, resp, context)
                yield from self.parse_response(resp)