# getting-started
m
Hey, in the `get_records` method, the line of code `yield record` fails from `core.py` and many other source code locations if there are `inf` or `nan` values in the record. Is there a way to override the default call of `json.dumps` in meltano / singer to set the parameter as `json.dumps(<json>, allow_nan=True)`? Seems like bad practice to modify the existing library source code setting `allow_nan=True`. Not sure of a better way to do this, or if the community wants this on the master branch?
Copy code
def format_message(message: Message) -> str:
    """Format a message as a JSON string.

    Args:
        message: The message to format.

    Returns:
        The formatted message.
    """
    return json.dumps(message.to_dict(), use_decimal=True, default=str, allow_nan=True)
e
Hi @matt_elgazar 👋🏼 I don't think the values produced by `allow_nan=True` are valid JSON and they seem to be specific to the Python JSON stdlib and derived libraries, so I'd worry that it'd violate some assumptions of interoperability of the Singer ecosystem, namely that taps and targets can be written in any language and they communicate through valid JSON lines via pipes. Perhaps you're able to use `post_process` to handle those values before they're serialized?
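A minimal sketch of that `post_process` idea, assuming records arrive as plain dicts (`post_process` is the singer-sdk `Stream` hook; the body below is illustrative, not the actual tap's code):
Copy code
import math


def post_process(self, row: dict, context: dict | None = None) -> dict | None:
    """Replace non-finite floats (inf / -inf / nan) with None before serialization."""
    for key, value in row.items():
        if isinstance(value, float) and not math.isfinite(value):
            row[key] = None
    return row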
m
Hmm… I don’t think `post_process` is the right idea because then I’m manually handling the NaN values as part of a transformation process, when I only want to extract and load the raw data and leave any transformations for later. None / NaN / Inf values are standard in datasets, and I don’t think setting `allow_nan=True` will affect interoperability. The only way I can think of is replacing those values with strings of that value… but I don’t like that solution. If I could override the default singer function here I’d be good.
messages.py
Copy code
def format_message(message: Message) -> str:
    """Format a message as a JSON string.

    Args:
        message: The message to format.

    Returns:
        The formatted message.
    """
    return json.dumps(message.to_dict(), use_decimal=True, default=str)
v
What is the data source @matt_elgazar? (Mostly curious, but there are a few things I've seen that may help depending on what it is)
m
I’m trying to build a tap that hits Yahoo Finance using the yfinance python library. I have done this manually in another repo but random things keep breaking with the loaders. It’s quite a pain to write the loaders 😅 Here’s the github url: https://github.com/melgazar9/tap-yfinance Definitely open to suggestions that you all have since I’m new to meltano and singer! I think many people would benefit from this tap. Before addressing my ideas below I need to address the NaN issue. I also had this issue when extracting data from mongodb. My solution was to convert Inf / NaN / None to strings (I really don’t like that solution but I needed to get something done). Some ideas are:
• Configurable: Create a new table defined in meltano.yml for each type of data that’s pulled (e.g. tables for stock_prices_1m, 2m, forex_prices_1m, etc…, financials, etc.)
• For the `prices` tables (e.g. stock_prices_1m) the `state` needs to keep track of the last timestamp per yahoo-ticker pair. So if AAPL was last pulled at 2020-01-01 and META was last pulled at 2023-01-01, I want to start the extraction for AAPL at 2020-01-01 and META at 2023-01-01. Not sure how to do this but I would think the json file should look something like this:
◦ `{'AAPL': {'last_date': '2020-01-01'}, 'NVDA': {'last_date': '2023-01-01'}}`
e
Gotcha. So, there's no way to override the serializer function, so do log a feature request. It should probably be implemented by adding abstract classes for SerDe that taps and targets can override. I do think that `Infinity`/`NaN` violates interoperability because the default behavior changes from one JSON library to another:
Copy code
>>> import json, simplejson
>>> s = json.dumps({"x": float("-inf")})
>>> json.loads(s)
{'x': -inf}
>>> simplejson.loads(s)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/meltano/sdk/.venv/lib/python3.11/site-packages/simplejson/__init__.py", line 514, in loads
    return _default_decoder.decode(s)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/meltano/sdk/.venv/lib/python3.11/site-packages/simplejson/decoder.py", line 386, in decode
    obj, end = self.raw_decode(s)
               ^^^^^^^^^^^^^^^^^^
  File "/meltano/sdk/.venv/lib/python3.11/site-packages/simplejson/decoder.py", line 416, in raw_decode
    return self.scan_once(s, idx=_w(s, idx).end())
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
simplejson.errors.JSONDecodeError: Expecting value: line 1 column 7 (char 6)
If `tap-yfinance` uses `allow_nan=True`, some targets may crash. Using strings would break schema validation, so at the moment I'm not sure what's the right approach.
m
Interesting… How do you typically handle missing data then?
e
JSON `null` is the usual way missing data is represented in Singer
m
I’m not aware of a python datatype for json null, so not sure how to convert it. Would you add this replacement for +-inf / None / NaN in `post_process`?
v
@matt_elgazar it's `None`
m
Ahh good to know. When doing that I get a new error: `jsonschema.exceptions.ValidationError: None is not of type 'number'`.
In that case I can just change the expected datatype in the schema input
e
Yeah, `"type": ["number", "null"]` should do it
m
Where would you put that?
Copy code
_schema = th.PropertiesList(
    th.Property("open", th.NumberType, required=True)
).to_dict()
r
Make it `required=False` (or omit the `required` kwarg - same behaviour).
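A minimal sketch of what that gives you, assuming the singer-sdk typing helpers shown above:
Copy code
from singer_sdk import typing as th

# Omitting required=True marks the property optional, so the SDK appends
# "null" to its JSON schema type and None values pass schema validation.
_schema = th.PropertiesList(
    th.Property("open", th.NumberType),
).to_dict()
# _schema["properties"]["open"]["type"] is roughly ["number", "null"]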
m
Hm ok! I’ll take it
Hey @edgar_ramirez_mondragon / @Reuben (Matatika) / @visch do you know what the `state.json` file should look like when debugging? I want to group the standard state information by ticker (e.g. store a different timestamp state for each ticker). Can something like this work, where `stock_prices_1m` is the table_name and the key is the ticker, value is the replication key dict? I think I need to override a singer method that writes the state to fit this format. What’s that method called?
Copy code
{
  "bookmark": {
    "stock_prices_1m": {
      "AAPL": {
        "replication_key": "replication_key",
        "replication_key_value": "AAPL|2023-10-11T02:33:11.053658Z"
      },
      "NVDA": {
        "replication_key": "replication_key",
        "replication_key_value": "NVDA|2023-10-12T02:35:11.053658Z"
      }
    },
    "stock_prices_2m": {
      "AAPL": {
        "replication_key": "replication_key",
        "replication_key_value": "AAPL|2023-11-11T02:33:11.053658Z"
      },
      "NVDA": {
        "replication_key": "replication_key",
        "replication_key_value": "NVDA|2023-11-12T02:35:11.053658Z"
      }
    }
  }
}
u
@matt_elgazar The `partitions` attribute and the context in parent/child stream relations are used to partition the state. This example has a single `project_id` partition but there can be any number:
Copy code
{
  "bookmarks": {
    "projects": {},
    "branches": {
      "partitions": [
        {
          "context": {
            "project_id": "noisy-haze-492309"
          }
        }
      ]
    },
    "databases": {
      "partitions": [
        {
          "context": {
            "project_id": "noisy-haze-492309",
            "branch_id": "br-noisy-haze-492309"
          }
        }
      ]
    },
    "roles": {
      "partitions": [
        {
          "context": {
            "project_id": "noisy-haze-492309",
            "branch_id": "br-noisy-haze-492309"
          }
        }
      ]
    },
    "endpoints": {
      "partitions": [
        {
          "context": {
            "project_id": "noisy-haze-492309"
          }
        }
      ]
    },
    "operations": {
      "partitions": [
        {
          "context": {
            "project_id": "noisy-haze-492309"
          }
        }
      ]
    }
  }
}
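A minimal sketch of the kind of `partitions` property that would produce the `project_id` contexts above (the class name, config key, and `_fetch` helper are illustrative, not from a real tap):
Copy code
from singer_sdk import typing as th
from singer_sdk.streams import Stream


class BranchesStream(Stream):
    name = "branches"
    schema = th.PropertiesList(th.Property("id", th.StringType)).to_dict()

    @property
    def partitions(self):
        # One context dict per project; the SDK keeps a separate bookmark
        # in the state's "partitions" list for each context returned here.
        return [{"project_id": p} for p in self.config["project_ids"]]

    def get_records(self, context):
        yield from self._fetch(context["project_id"])  # hypothetical helper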
m
Hey @edgar_ramirez_mondragon I’m a bit confused. I don’t understand 2 things still:
• Where can I read the state? I think something like this within the `get_records` method? `state = self.get_context_state(context)['context']`
• More confusingly, what should the output of partitions be / what does it look like specifically? I read in the docs it should be a list of dictionaries, but how does it dynamically update the replication key? Should the output look something like this?
Copy code
    @property
    def partitions(self) -> list[dict] | None:
        """
        Example output:
        list_of_dicts = [
            {
                "stock_prices_1m": {
                    "AAPL": {
                        "replication_key": "replication_key",
                        "replication_key_value": "AAPL|2023-10-11T02:33:11.053658Z"

                    },
                    "NVDA": {
                        "replication_key": "replication_key",
                        "replication_key_value": "NVDA|2023-11-11T02:33:11.053658Z"

                    }
                }
            },

            {
                "stock_prices_2m": {
                    "AAPL": {
                        "replication_key": "replication_key",
                        "replication_key_value": "AAPL|2023-11-11T02:33:11.053658Z"

                    },
                    "NVDA": {
                        "replication_key": "replication_key",
                        "replication_key_value": "NVDA|2023-12-11T02:33:11.053658Z"

                    }
                }
            }
        ]
        """

        state_data = {}
        for ac in self.config['asset_class']:
            for table_name in self.config['asset_class'][ac].keys():
                state_data[table_name] = {}
                for ticker in self.config['asset_class'][ac][table_name]['tickers']:
                    state_data[table_name][ticker] = {
                        "replication_key": "replication_key",
                        "replication_key_value": f"{ticker}|{self.config['default_start_date']}"
                    }

        list_of_dicts = []
        for table_name, table_data in state_data.items():
            for ticker, ticker_data in table_data.items():
                entry = {
                    table_name: {
                        ticker: ticker_data
                    }
                }
                list_of_dicts.append(entry)
        return list_of_dicts
^^ Where does it overwrite the `replication_key_value`? I have it as `{self.config['default_start_date']}` but this whole thing doesn’t look right.
e
https://sdk.meltano.com/en/latest/incremental_replication.html#incremental-replication is worth reading. The easiest way to know the state format for a given tap is to run it and inspect the last emitted `STATE` message.
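A minimal sketch of that inspection, assuming the tap is installed in a Meltano project (Singer messages are JSON lines on stdout):
Copy code
import json
import subprocess

# Run the tap and capture its Singer message stream from stdout.
proc = subprocess.run(
    ["meltano", "invoke", "tap-yfinance"],
    capture_output=True,
    text=True,
    check=True,
)

last_state = None
for line in proc.stdout.splitlines():
    try:
        message = json.loads(line)
    except json.JSONDecodeError:
        continue  # skip any non-message output
    if message.get("type") == "STATE":
        last_state = message["value"]

print(json.dumps(last_state, indent=2))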
m
hey yes I have read this many times but I don’t see a clear explanation of what’s expected as inputs / outputs and which methods to override. I’m completely new to meltano, so from that documentation it’s not clear what I need to do in order to save and load the state and which methods to override.
Regarding managing my own state file, I don’t know if it’s necessary to do this, is it? Going through the docs, it seems like I need to override the `partitions` method in some way. Doing that should allow me to use the built-in meltano incremental replication. I think I just need some direction on what the inputs and outputs of the `partitions` method should be and how/where the replication_key is getting overridden. I think what I’m trying to do is pretty simple but I just don’t know where the magic happens so I’m constantly running things in debug mode.
u
It's hard to tell without knowing the structure of the tap or the API, but if the ticker IDs AAPL and NVDA are expected to be passed by the user via config, and the endpoint expects the ticker to be passed in the path or as a URL parameter, then each partition would be a dictionary that contains at least the ticker:
1. config: `{"tickers": ["AAPL", "NVDA"]}`
2. stream class
Copy code
class MyStream:
  # Do this if the ticker is in the path
  path = "/api/v1/tickers/{ticker}"  # 'ticker' is a key in each partition/context dict

  @property
  def partitions(self):
    return [{"ticker": t} for t in self.config["tickers"]]

  # Do this if the ticker is a query param
  def get_url_params(self, context, next_page_token):
    ticker = context["ticker"]  # 'ticker' is a key in each partition/context dict
    ...
u
The methods you're looking for to call from `get_url_params` are either `singer_sdk.Stream.get_starting_timestamp` or `Stream.get_starting_replication_key_value`. And essentially, I think you want a partition for every combination of asset class and ticker, if every asset class has the same schema. If you:
1. set the `replication_key` attribute in your class to, say, `replication_key = 'date'`
2. and make sure `date` is included in the stream schema
3. and make sure `date` is included in each record, perhaps making use of `post_process` to get the right value from the record (see the sketch below)
then the SDK should manage the state automatically for you.
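A minimal sketch of those three steps on a REST-style stream (the class name, endpoint, and fallback field are illustrative assumptions, not tap-yfinance code):
Copy code
from singer_sdk import typing as th
from singer_sdk.streams import RESTStream


class StockPrices1mStream(RESTStream):
    name = "stock_prices_1m"
    url_base = "https://example.com"           # hypothetical API
    path = "/api/v1/prices/{ticker}"           # 'ticker' comes from the partition context
    replication_key = "date"                   # 1. set the replication key

    schema = th.PropertiesList(
        th.Property("date", th.DateTimeType),  # 2. include it in the stream schema
        th.Property("open", th.NumberType),
    ).to_dict()

    def post_process(self, row, context=None):
        # 3. make sure every record carries the replication key value
        row["date"] = row.get("date") or row.get("timestamp")
        return row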
u
I'm sorry, it's probably harder to get into the nitty-gritty in a Slack thread. Feel free to start a discussion in the SDK repo and ping me if you prefer.
m
Ah yea I realized that, yes thanks! I had to send the replication key to each partition separately. Here’s my approach:
Copy code
    @property
    def partitions(self):
        state_data = {}

        for asset_class, asset_params in self.config['asset_class'].items():
            for table_name, table_params in asset_params.items():
                state_data[table_name] = {}
                for ticker in table_params['tickers']:
                    state_data[table_name][ticker] = {
                        "replication_key": "replication_key",
                        "replication_key_value": f"{ticker}|{datetime.now()}"
                    }

        return [state_data]
The part I’m stuck on now is how I can send these to different table_names. Does this happen on the extractor side or the loader side? For example, `stock_prices_1m` should be its own table.
u
Oh, then that requires a different approach. For example in my own tap-dbf I:
• get a filename pattern from config and instantiate a stream for each match: https://github.com/edgarrmondragon/tap-dbf/blob/2025269e5e2825d5c6352e8a23472e04d9bf2342/tap_dbf/tap.py#L217-L227. In your case, you already have a mapping/list so you just need to iterate and instantiate a stream for each element/combination.
• get the stream from the config or the stream's init parameters and pass that to the parent class's initializer: https://github.com/edgarrmondragon/tap-dbf/blob/2025269e5e2825d5c6352e8a23472e04d9bf2342/tap_dbf/tap.py#L137-L158
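A minimal sketch of that iterate-and-instantiate pattern, assuming the `asset_class` config mapping used earlier in the thread; `PriceStream` and its schema are illustrative, not the actual tap-yfinance classes:
Copy code
from singer_sdk import Tap
from singer_sdk import typing as th
from singer_sdk.streams import Stream


class PriceStream(Stream):
    schema = th.PropertiesList(
        th.Property("timestamp", th.DateTimeType),
        th.Property("yahoo_ticker", th.StringType),
        th.Property("open", th.NumberType),
    ).to_dict()

    def __init__(self, tap, name, tickers):
        # The stream name becomes the table name on the target side.
        super().__init__(tap=tap, name=name)
        self._tickers = tickers

    def get_records(self, context):
        ...  # fetch records for self._tickers here


class TapYFinance(Tap):
    name = "tap-yfinance"

    def discover_streams(self):
        # One stream instance per (asset_class, table_name) combination.
        return [
            PriceStream(tap=self, name=table_name, tickers=params["tickers"])
            for tables in self.config["asset_class"].values()
            for table_name, params in tables.items()
        ]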
m
Hmm ok, I’ll go through this. Does something also need to be done on the loader side?
I was thinking I’d have to define the output tables in the loader section of meltano.yml
e
No, the target will automatically create the right tables based on the stream names and schemas
m
Ok cool. Going through it now. A bit complicated!
e
I can see it's probably more complicated than it should be and I'm wondering what's best to prioritize documenting for these types of more complex use cases: how-tos, tutorials, etc. I don't know 😅
m
@pat_nadolny / @edgar_ramirez_mondragon do you guys know why this is happening? Been stuck on this for a few days 😭 thank you. When running `meltano invoke tap-yfinance` I can see the correct records printed to stdout, but when I run `meltano el tap-yfinance target-jsonl --state-id test` the only fields that show up in the jsonl file are the `replication_key` and the `replication_key_value`. I also tried target-csv and got the same result. In `get_records`:
Copy code
self.logger.info(f'\n\n\n*** {record} ***\n\n\n')
yield record
In Stdout:
Copy code
*** {'timestamp': Timestamp('2023-10-18 15:22:00+0000', tz='UTC'), 'timestamp_tz_aware': Timestamp('2023-10-18 11:22:00-0400', tz='America/New_York'), 'timezone': 'America/New_York', 'yahoo_ticker': 'NVDA', 'open': 424.80999755859375, 'high': 424.80999755859375, 'low': 424.80999755859375, 'close': 424.80999755859375, 'volume': 0, 'dividends': 0.0, 'stock_splits': 0.0, 'repaired': False, 'batch_timestamp': '2023-10-18 15:23:00.838365', 'replication_key': 'NVDA|2023-10-18 15:23:00.838365'} ***

...
In output/stock_prices_1m.jsonl:
Copy code
{"replication_key": "AAPL|2023-10-18 15:22:58.336357"}
{"replication_key": "AAPL|2023-10-18 15:22:58.336574"}
...
When I change the `CatalogEntry` to this, it deselects all streams:
Copy code
CatalogEntry(
             tap_stream_id=asset_class + '|' + table_name,
             stream=asset_class + '|' + table_name,
             table=table_name,
...
@edgar_ramirez_mondragon saw your fork. Thank you for the help! ty
@edgar_ramirez_mondragon I managed to get a basic tap working. All I needed to add was one line in `key_properties`. Not sure why the stream only outputs the fields listed in the metadata `key_properties`.
Copy code
metadata=MetadataMapping.get_standard_metadata(
                    schema=schema,
                    replication_method=None,  # defined by user
                    key_properties=[
                        'timestamp', 'timestamp_tz_aware', 'timezone', 'yahoo_ticker', 'open', 'high', 'low', 'close',
                        'volume', 'dividends', 'stock_splits', 'repaired'
                    ],
                    valid_replication_keys=None  # defined by user
                )