emcp
10/18/2021, 2:36 PMvisch
10/18/2021, 2:49 PMvisch
10/18/2021, 2:49 PMemcp
10/18/2021, 2:54 PMvisch
10/18/2021, 2:54 PMvisch
10/18/2021, 2:55 PMemcp
10/18/2021, 2:55 PMemcp
10/18/2021, 2:55 PMemcp
10/18/2021, 2:56 PMemcp
10/18/2021, 2:56 PMemcp
10/18/2021, 2:58 PM
from datetime import datetime
from typing import Optional, Iterable
from tap_ibkr.tws_core.queries import execute_symbol_search
from tap_ibkr.tws_core.ibrokers_thrift.client.ibrokers_client import TWSAPIThriftClient
from singer_sdk.streams import Stream
from tap_ibkr.tws_core.ibrokers_thrift.gen_py.ibrokers import ttypes
class ibkrStream(Stream):
    """Stream that emits one record per contract returned by a symbol search."""

    def get_records(self, context: Optional[dict]) -> Iterable[dict]:
        """Yield a flat ticker record for every search result that has a contract.

        A client is obtained from ``TWSAPIThriftClient`` using the host stored
        under ``self.config["host_tws_thrift"]``; the search itself is delegated
        to ``execute_symbol_search`` with ``context['alphabet_partition']``.

        Args:
            context: Partition context; ``context['alphabet_partition']`` is
                read, so ``None`` or a dict without that key ends up in the
                generic exception handler below.

        Yields:
            Dicts with the keys ``symbol``, ``sec_type``, ``primary_exchange``,
            ``exchange``, ``currency`` (all lower-cased), ``contract_id`` and
            ``query_start_time``.

        Any exception raised while searching or iterating is logged at error
        level and swallowed, so the generator simply ends early.
        """
        self.logger.info("TESTING LOGGING STATEMENT??!!")
        ibrokers_client = TWSAPIThriftClient().connect_to_ibrokers(
            ib_host_ip=self.config["host_tws_thrift"]
        )
        ibrokers_client.connect()
        # NOTE(review): the client is never closed in this method; confirm
        # whether it exposes a disconnect/close that belongs in a `finally`.

        # One timestamp is shared by every record produced by this call.
        query_starttime = datetime.utcnow().isoformat() + 'Z'
        try:
            valid_stocks = execute_symbol_search(
                ibrokers_client, context['alphabet_partition']
            )
            if not hasattr(valid_stocks, '__iter__'):
                # Nothing iterable came back: emit no records.
                return

            for count, valid_stock in enumerate(valid_stocks, start=1):
                # Progress output every 10000 results.
                # NOTE(review): nesting was reconstructed from a flattened
                # paste; confirm both log lines belong under this condition.
                if count % 10000 == 0:
                    self.logger.info("Query Counter == %s", count)
                    self.logger.info("Current Stock == %s", valid_stock)

                if not hasattr(valid_stock, "contract"):
                    self.logger.warning("Unexpected situation occurred...? ")
                    continue

                contract = valid_stock.contract
                yield {
                    "symbol": contract.symbol.lower(),
                    "sec_type": contract.secType.lower(),
                    "primary_exchange": contract.primaryExchange.lower(),
                    "exchange": contract.exchange.lower(),
                    "currency": contract.currency.lower(),
                    "contract_id": contract.conId,
                    "query_start_time": query_starttime,
                }
        except ttypes.IBSAPIException as ib_excp:
            self.logger.error("IBSAPIException occurred, stopping.. %s", ib_excp)
        except Exception as excp:
            self.logger.error("Exception occurred, stopping.. %s", excp)
emcp
10/18/2021, 3:04 PMemcp
10/18/2021, 3:05 PMvisch
10/18/2021, 3:06 PMvisch
10/18/2021, 3:06 PMemcp
# (chat timestamp preserved from the paste: 10/18/2021, 3:09 PM)
def download_files_by_bucket(s3_url_string, target_file_path, s3_resource):
    """Download every object listed under an S3 URL into a local directory.

    Args:
        s3_url_string: URL of the form ``scheme://bucket/prefix``. The prefix
            may be omitted (``scheme://bucket``), in which case the listing is
            requested with an empty prefix.
        target_file_path: Local directory to download into; a trailing ``/``
            is appended when missing.
        s3_resource: Object providing ``get_paginator('list_objects_v2')`` and
            ``download_file(bucket, key, filename)``. Presumably a boto3 S3
            client despite the parameter name; confirm against the caller.

    Each object is written to ``target_file_path`` joined with its full key,
    so the key's directory structure is reproduced locally. Keys ending in
    ``/`` are skipped.
    """
    # Drop the "scheme://" part, then split "bucket/prefix" at the first
    # slash. partition() leaves the prefix empty for a bucket-only URL
    # instead of failing on tuple unpacking.
    bucket_and_prefix = s3_url_string.split('/', 2)[-1]
    bucket_name, _, prefix = bucket_and_prefix.partition('/')

    # Make sure the local target directory ends with a slash.
    if not target_file_path.endswith('/'):
        target_file_path += '/'

    paginator = s3_resource.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix):
        # A page with no matching objects has no 'Contents' entry, so
        # default to an empty list rather than raising KeyError.
        for obj in page.get('Contents', []):
            object_key = obj['Key']
            if object_key.endswith('/'):
                continue

            local_file_path = os.path.join(target_file_path, object_key)
            # assert_dir_exists is defined outside this snippet; presumably
            # it creates the directory when missing (confirm).
            assert_dir_exists(os.path.dirname(local_file_path))
            s3_resource.download_file(bucket_name, object_key, local_file_path)
It simply downloads to the file system. I guess here I can try to add some base64 encoding, and if the target understands this stuff, it can take the record and lay it into a target file system.
I can do the tap quickly, but will then need to look at how targets work; I've never written one.
emcp
10/18/2021, 3:10 PMemcp
10/18/2021, 3:11 PM