matt_elgazar
10/10/2023, 12:01 AM
In the get_records method, the line of code yield record fails in core.py and many other source locations if there are inf or nan values in the record. Is there a way to override the default call of json.dumps in Meltano / Singer to set the parameter as json.dumps(<json>, allow_nan=True)?
It seems like bad practice to modify the existing library source code to set allow_nan=True. I'm not sure of a better way to do this, or whether the community wants this on the master branch?
def format_message(message: Message) -> str:
    """Format a message as a JSON string.

    Args:
        message: The message to format.

    Returns:
        The formatted message.
    """
    return json.dumps(message.to_dict(), use_decimal=True, default=str, allow_nan=True)
edgar_ramirez_mondragon
10/10/2023, 1:29 AM
The NaN and Infinity values produced by allow_nan=True are not valid JSON and seem to be specific to the Python JSON stdlib and derived libraries, so I'd worry that it'd violate some assumptions of interoperability in the Singer ecosystem, namely that taps and targets can be written in any language and communicate through valid JSON lines via pipes.
Perhaps you're able to use post_process to handle those values before they're serialized?
matt_elgazar
10/10/2023, 4:28 AM
I don't think post_process is the right idea, because then I'm manually handling the NaN values as part of a transformation process, when I only want to extract and load the raw data and leave any transformations for later. None / NaN / Inf values are standard in datasets, and I don't think setting allow_nan=True will affect interoperability. The only way I can think of is replacing those values with strings of that value… but I don't like that solution. If I could override the default Singer function here I'd be good.
messages.py
def format_message(message: Message) -> str:
    """Format a message as a JSON string.

    Args:
        message: The message to format.

    Returns:
        The formatted message.
    """
    return json.dumps(message.to_dict(), use_decimal=True, default=str)
visch
10/10/2023, 12:11 PM
matt_elgazar
10/10/2023, 2:10 PM
For the prices tables (e.g. stock_prices_1m), the state needs to keep track of the last timestamp per yahoo ticker. So if AAPL was last pulled at 2020-01-01 and META was last pulled at 2023-01-01, I want to start the extraction for AAPL at 2020-01-01 and for META at 2023-01-01. I'm not sure how to do this, but I would think the JSON file should look something like this:
◦ {'AAPL': {'last_date': '2020-01-01'}, 'NVDA': {'last_date': '2023-01-01'}}
edgar_ramirez_mondragon
10/10/2023, 4:37 PM
>>> import json, simplejson
>>> s = json.dumps({"x": float("-inf")})
>>> json.loads(s)
{'x': -inf}
>>> simplejson.loads(s)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/meltano/sdk/.venv/lib/python3.11/site-packages/simplejson/__init__.py", line 514, in loads
return _default_decoder.decode(s)
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/meltano/sdk/.venv/lib/python3.11/site-packages/simplejson/decoder.py", line 386, in decode
obj, end = self.raw_decode(s)
^^^^^^^^^^^^^^^^^^
File "/meltano/sdk/.venv/lib/python3.11/site-packages/simplejson/decoder.py", line 416, in raw_decode
return self.scan_once(s, idx=_w(s, idx).end())
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
simplejson.errors.JSONDecodeError: Expecting value: line 1 column 7 (char 6)
If tap-yfinance uses allow_nan=True, some targets may crash. Using strings would break schema validation, so at the moment I'm not sure what the right approach is.
matt_elgazar
10/10/2023, 4:38 PM
edgar_ramirez_mondragon
10/10/2023, 6:20 PM
null is the usual way missing data is represented in Singer
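Combining that with the earlier post_process suggestion, a minimal sketch, assuming the offending values arrive as Python floats (the stream class name is illustrative):

import math

from singer_sdk.streams import Stream


class StockPricesStream(Stream):  # hypothetical stream
    def post_process(self, row: dict, context: dict | None = None) -> dict | None:
        # Replace NaN / +-inf so each record serializes to valid JSON (null).
        for key, value in row.items():
            if isinstance(value, float) and not math.isfinite(value):
                row[key] = None
        return row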
matt_elgazar
10/10/2023, 11:03 PM
So I should convert them to null in post_process?
visch
10/11/2023, 12:22 AM
None
matt_elgazar
10/11/2023, 12:26 AM
If I use None I get jsonschema.exceptions.ValidationError: None is not of type 'number'. In that case I can just change the expected data type in the schema input.
edgar_ramirez_mondragon
10/11/2023, 12:28 AM
"type": ["number", "null"] should do it
matt_elgazar
10/11/2023, 12:34 AM
_schema = th.PropertiesList(
    th.Property("open", th.NumberType, required=True)
).to_dict()
Reuben (Matatika)
10/11/2023, 12:38 AM
Set required=False (or omit the required kwarg - same behaviour).
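For reference, the nullable variant of the snippet above - omitting required (or passing required=False) makes the SDK emit "type": ["number", "null"] for the property:

_schema = th.PropertiesList(
    th.Property("open", th.NumberType)  # required omitted, so null is allowed
).to_dict()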
matt_elgazar
10/11/2023, 12:39 AM
matt_elgazar
10/11/2023, 1:24 PM
Does anyone know what the state.json file should look like when debugging? I want to group the standard state information by ticker (e.g. store a different timestamp state for each ticker). Can something like this work, where stock_prices_1m is the table_name, the key is the ticker, and the value is the replication key dict? I think I need to override a Singer method that writes the state to fit this format. What's that method called?
{
"bookmark": {
"stock_prices_1m": {
"AAPL": {
"replication_key": "replication_key",
"replication_key_value": "AAPL|2023-10-11T02:33:11.053658Z"
},
"NVDA": {
"replication_key": "replication_key",
"replication_key_value": "NVDA|2023-10-12T02:35:11.053658Z"
}
},
"stock_prices_2m": {
"AAPL": {
"replication_key": "replication_key",
"replication_key_value": "AAPL|2023-11-11T02:33:11.053658Z"
},
"NVDA": {
"replication_key": "replication_key",
"replication_key_value": "NVDA|2023-11-12T02:35:11.053658Z"
}
}
}
}
user
10/12/2023, 1:54 AM
The partitions attribute and the context in parent/child stream relations are used to partition the state. This example has a single project_id partition, but there can be any number:
{
"bookmarks": {
"projects": {},
"branches": {
"partitions": [
{
"context": {
"project_id": "noisy-haze-492309"
}
}
]
},
"databases": {
"partitions": [
{
"context": {
"project_id": "noisy-haze-492309",
"branch_id": "br-noisy-haze-492309"
}
}
]
},
"roles": {
"partitions": [
{
"context": {
"project_id": "noisy-haze-492309",
"branch_id": "br-noisy-haze-492309"
}
}
]
},
"endpoints": {
"partitions": [
{
"context": {
"project_id": "noisy-haze-492309"
}
}
]
},
"operations": {
"partitions": [
{
"context": {
"project_id": "noisy-haze-492309"
}
}
]
}
}
}
matt_elgazar
10/12/2023, 6:11 PM
• How do I access the state from within the get_records method? state = self.get_context_state(context)['context']
• More confusingly, what should the output of partitions be / what does it look like specifically? I read in the docs that it should be a list of dictionaries, but how does it dynamically update the replication key? Should the output look something like this?
@property
def partitions(self) -> list[dict] | None:
    """
    Example output:

    list_of_dicts = [
        {
            "stock_prices_1m": {
                "AAPL": {
                    "replication_key": "replication_key",
                    "replication_key_value": "AAPL|2023-10-11T02:33:11.053658Z"
                },
                "NVDA": {
                    "replication_key": "replication_key",
                    "replication_key_value": "NVDA|2023-11-11T02:33:11.053658Z"
                }
            }
        },
        {
            "stock_prices_2m": {
                "AAPL": {
                    "replication_key": "replication_key",
                    "replication_key_value": "AAPL|2023-11-11T02:33:11.053658Z"
                },
                "NVDA": {
                    "replication_key": "replication_key",
                    "replication_key_value": "NVDA|2023-12-11T02:33:11.053658Z"
                }
            }
        }
    ]
    """
    state_data = {}
    for ac in self.config['asset_class']:
        for table_name in self.config['asset_class'][ac].keys():
            state_data[table_name] = {}
            for ticker in self.config['asset_class'][ac][table_name]['tickers']:
                state_data[table_name][ticker] = {
                    "replication_key": "replication_key",
                    "replication_key_value": f"{ticker}|{self.config['default_start_date']}"
                }

    list_of_dicts = []
    for table_name, table_data in state_data.items():
        for ticker, ticker_data in table_data.items():
            entry = {
                table_name: {
                    ticker: ticker_data
                }
            }
            list_of_dicts.append(entry)

    return list_of_dicts
^^ Where does it overwrite the replication_key_value? I have it as {self.config['default_start_date']} but this whole thing doesn't look right.
edgar_ramirez_mondragon
10/12/2023, 10:20 PM
You don't overwrite it yourself - the SDK updates the bookmark as it processes records and emits it in a STATE message.
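On the first bullet above: get_context_state(context) already returns the per-partition state dict, so indexing ['context'] would give back the context rather than the bookmark. A minimal sketch (the exact shape of the partition state is an assumption based on the JSON example above):

def get_records(self, context):
    # per-partition state, e.g. {"context": {...}, "replication_key": ..., "replication_key_value": ...}
    state = self.get_context_state(context)
    # for the bookmark itself, the dedicated accessor is usually simpler:
    start_value = self.get_starting_replication_key_value(context)
    ...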
matt_elgazar
10/12/2023, 10:22 PM
matt_elgazar
10/12/2023, 10:27 PM
I think I need to modify the partitions method in some way. Doing that should allow me to use the built-in Meltano incremental replication. I just need some direction on what the inputs and outputs of the partitions method should be, and how/where the replication_key gets overridden. I think what I'm trying to do is pretty simple, but I just don't know where the magic happens, so I'm constantly running things in debug mode.
user
10/12/2023, 10:52 PM
If AAPL and NVDA are expected to be passed by the user via config and the endpoint expects the ticker to be passed in the path or as a URL parameter, then each partition would be a dictionary that contains at least the ticker:
1. config: {"tickers": ["AAPL", "NVDA"]}
2. stream class
class MyStream:
    # Do this if the ticker is in the path
    path = "/api/v1/tickers/{ticker}"  # 'ticker' is a key in each partition/context dict

    @property
    def partitions(self):
        return [{"ticker": t} for t in self.config["tickers"]]

    # Do this if the ticker is a query param
    def get_url_params(self, context, next_page_token):
        ticker = context["ticker"]  # 'ticker' is a key in each partition/context dict
        ...
user
10/13/2023, 7:48 PM
The methods you'd use in get_url_params are either singer_sdk.Stream.get_starting_timestamp or Stream.get_starting_replication_key_value. And essentially, I think you want a partition for every combination of asset class and ticker, if every asset class has the same schema.
If you
1. set the replication_key attribute in your class to, say, replication_key = 'date'
2. and make sure date is included in the stream schema
3. and make sure date is included in each record, perhaps making use of post_process to get the right value from the record
Then the SDK should manage the state automatically for you.
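A minimal sketch of those three steps on a REST stream, assuming the ticker goes into a query parameter (the URL, parameter names, and config keys are illustrative, based on the config shape discussed above):

from singer_sdk import typing as th
from singer_sdk.streams import RESTStream


class StockPrices1mStream(RESTStream):  # hypothetical stream
    name = "stock_prices_1m"
    url_base = "https://example.com/api"  # placeholder
    path = "/prices"
    primary_keys = ["yahoo_ticker", "timestamp"]
    replication_key = "date"  # 1. declare the replication key
    schema = th.PropertiesList(
        th.Property("yahoo_ticker", th.StringType),
        th.Property("timestamp", th.DateTimeType),
        th.Property("date", th.DateTimeType),  # 2. include it in the schema
        th.Property("open", th.NumberType),
    ).to_dict()

    @property
    def partitions(self):
        # one partition per (asset class, ticker) combination
        return [
            {"asset_class": ac, "ticker": ticker}
            for ac, tables in self.config["asset_class"].items()
            for table_params in tables.values()
            for ticker in table_params["tickers"]
        ]

    def get_url_params(self, context, next_page_token):
        # each partition resumes from its own bookmark
        return {
            "symbol": context["ticker"],
            "start": self.get_starting_replication_key_value(context),
        }

    def post_process(self, row, context=None):
        # 3. make sure the replication key is present in each record
        row.setdefault("date", row.get("timestamp"))
        return row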
user
10/13/2023, 7:49 PM
matt_elgazar
10/13/2023, 7:50 PM
@property
def partitions(self):
    state_data = {}
    for asset_class, asset_params in self.config['asset_class'].items():
        for table_name, table_params in asset_params.items():
            state_data[table_name] = {}
            for ticker in table_params['tickers']:
                state_data[table_name][ticker] = {
                    "replication_key": "replication_key",
                    "replication_key_value": f"{ticker}|{datetime.now()}"
                }
    return [state_data]
The part I’m stuck on now is how can I send these to different table_names. Does this happen on the extractor side or the loader side? For example, stock_prices_1m should be its own table.user
user
10/13/2023, 7:54 PM
matt_elgazar
10/13/2023, 8:19 PM
matt_elgazar
10/13/2023, 8:19 PM
edgar_ramirez_mondragon
10/13/2023, 9:42 PM
matt_elgazar
10/13/2023, 9:43 PM
edgar_ramirez_mondragon
10/13/2023, 9:46 PM
matt_elgazar
10/18/2023, 5:35 AM
When I run meltano invoke tap-yfinance I can see the correct records printed to stdout, but when I run meltano el tap-yfinance target-jsonl --state-id test the only thing that shows up in the jsonl file is the replication_key and the replication_key_value. I also tried target-csv and got the same result.
In `get_records`:
self.logger.info(f'\n\n\n*** {record} ***\n\n\n')
yield record
In stdout:
*** {'timestamp': Timestamp('2023-10-18 15:22:00+0000', tz='UTC'), 'timestamp_tz_aware': Timestamp('2023-10-18 11:22:00-0400', tz='America/New_York'), 'timezone': 'America/New_York', 'yahoo_ticker': 'NVDA', 'open': 424.80999755859375, 'high': 424.80999755859375, 'low': 424.80999755859375, 'close': 424.80999755859375, 'volume': 0, 'dividends': 0.0, 'stock_splits': 0.0, 'repaired': False, 'batch_timestamp': '2023-10-18 15:23:00.838365', 'replication_key': 'NVDA|2023-10-18 15:23:00.838365'} ***
...
In output/stock_prices_1m.jsonl:
{"replication_key": "AAPL|2023-10-18 15:22:58.336357"}
{"replication_key": "AAPL|2023-10-18 15:22:58.336574"}
...
When I change the CatalogEntry to this, it deselects all streams:
CatalogEntry(
    tap_stream_id=asset_class + '|' + table_name,
    stream=asset_class + '|' + table_name,
    table=table_name,
    ...
matt_elgazar
10/19/2023, 2:10 AM
matt_elgazar
10/19/2023, 7:31 PM
Got it working by listing all the fields in key_properties. I'm not sure why the stream only outputs the fields listed in the metadata key_properties.
metadata=MetadataMapping.get_standard_metadata(
    schema=schema,
    replication_method=None,  # defined by user
    key_properties=[
        'timestamp', 'timestamp_tz_aware', 'timezone', 'yahoo_ticker', 'open', 'high', 'low', 'close',
        'volume', 'dividends', 'stock_splits', 'repaired'
    ],
    valid_replication_keys=None  # defined by user
)
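A likely explanation: in standard Singer metadata, key properties and replication keys get "inclusion": "automatic", so they are emitted even when nothing is selected; listing every column in key_properties works around a stream that was never marked selected. A sketch of the alternative - keep key_properties to the actual primary key and select the stream explicitly (assuming the stream-level entry lives at the empty breadcrumb in MetadataMapping, which may differ by SDK version):

metadata = MetadataMapping.get_standard_metadata(
    schema=schema,
    replication_method=None,  # defined by user
    key_properties=['timestamp', 'yahoo_ticker'],  # just the real primary key
    valid_replication_keys=None,  # defined by user
)
metadata[()].selected = True  # mark the whole stream as selected so all fields are emitted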