# singer-tap-development
e
Hi Tap-toberfest! I wanted to maybe do an S3 tap to pull down blob files. Does that make sense for Meltano, and is it not already implemented? I didn't see a tap for S3 before, but it seems quite useful. I would use it to pull image and audio datasets down onto a machine's file system, but this seems very different from, say, pushing images to PostgreSQL. I wonder how I would write this, or if it makes sense at all.
v
I don't know where to put this, but I keep plugging it in different places. The "file" idea of pulling things down has at least two abstraction layers that should be thought about (I added a third as well):
1. Transport layer (I think that's the right term, maybe it's something else): how do we get the file? FTP, SFTP, S3, etc.
2. File types: CSV, Parquet, etc.
3. Compression: gzip, zip, 7zip, tar, etc.
---
There are multiple ways to approach this. To me, if blobs fit into Meltano, they'd go under the file type idea above, but it would be pretty specific and potentially another abstraction layer?
I keep saying this in different ways, so I'll make an issue so we have one place to go back to. Maybe this is the wrong idea and we should just have a bunch of taps that pull from a common library.
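A minimal sketch of how the three layers could be kept separate and composed, assuming each layer is just a function: a transport returns raw bytes, a decompressor picks a codec from the file extension, and a parser turns bytes into records. The in-memory transport, the `.gz`-only decompressor, and the CSV-only parser are hypothetical stand-ins for illustration. Note that at read time the data flows transport, then compression, then file type.

```python
import csv
import gzip
import io
from typing import Callable, Dict, Iterator, List

# Layer 1 (transport): anything that turns a path into raw bytes.
# A real tap would plug in S3, SFTP, FTP, etc.; this dict-backed
# version is a hypothetical stand-in so the sketch runs offline.
Transport = Callable[[str], bytes]


def make_memory_transport(files: Dict[str, bytes]) -> Transport:
    def fetch(path: str) -> bytes:
        return files[path]
    return fetch


# Layer 3 (compression): choose a decompressor from the file extension.
def decompress(path: str, data: bytes) -> bytes:
    if path.endswith(".gz"):
        return gzip.decompress(data)
    return data  # treat anything else as uncompressed


# Layer 2 (file type): parse decompressed bytes into records.
def parse_csv(data: bytes) -> Iterator[dict]:
    yield from csv.DictReader(io.StringIO(data.decode("utf-8")))


def read_records(path: str, fetch: Transport) -> List[dict]:
    raw = fetch(path)              # transport
    plain = decompress(path, raw)  # compression
    return list(parse_csv(plain))  # file type


if __name__ == "__main__":
    files = {"data/users.csv.gz": gzip.compress(b"id,name\n1,ada\n2,bob\n")}
    print(read_records("data/users.csv.gz", make_memory_transport(files)))
```

Swapping S3 for SFTP would then only mean swapping the `fetch` function, which is the main appeal of keeping the layers apart.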
e
so I didn't miss it.. nothing exists yet then?
v
Not that I know of!
I've done blob types from an Oracle DB; that's the closest I've been 😄
e
Maybe I can start something "dumb" to solve my problem: I need to move images and their JSON metadata to and from S3.
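If the images and their JSON metadata sit side by side in the bucket, one way to move them as a unit is to group keys by their stem before fetching anything. This sketch assumes a hypothetical sidecar naming convention (`cats/001.jpg` next to `cats/001.json`) and a hypothetical set of image extensions; it only looks at key names, so it could sit in front of any S3 listing.

```python
import posixpath
from collections import defaultdict
from typing import Dict, Iterable, List

# Hypothetical: which extensions count as images.
IMAGE_EXTS = {".jpg", ".jpeg", ".png"}


def pair_images_with_metadata(keys: Iterable[str]) -> List[Dict[str, str]]:
    """Group keys like 'cats/001.jpg' and 'cats/001.json' by their shared stem."""
    groups: Dict[str, Dict[str, str]] = defaultdict(dict)
    for key in keys:
        # S3 keys always use '/', so posixpath is the right splitter on any OS.
        stem, ext = posixpath.splitext(key)
        ext = ext.lower()
        if ext in IMAGE_EXTS:
            groups[stem]["image_key"] = key
        elif ext == ".json":
            groups[stem]["metadata_key"] = key
    # Keep only stems that actually have an image; metadata is optional.
    return [
        {"stem": stem, **parts}
        for stem, parts in sorted(groups.items())
        if "image_key" in parts
    ]
```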
I currently have a boto3 service using Thrift that likely wouldn't be that nice to open source, so I wondered if I should toss the Thrift piece and write something.
but I had seen next to nothing in terms of file handling
nor S3 integration
This is the extent of my tap-writing experience, so I worry I'd need to "yield" the file contents, but then it wouldn't retain its file-ness (??)
```python
from datetime import datetime
from typing import Optional, Iterable

from tap_ibkr.tws_core.queries import execute_symbol_search
from tap_ibkr.tws_core.ibrokers_thrift.client.ibrokers_client import TWSAPIThriftClient
from singer_sdk.streams import Stream

from tap_ibkr.tws_core.ibrokers_thrift.gen_py.ibrokers import ttypes


class ibkrStream(Stream):

    def get_records(self, context: Optional[dict]) -> Iterable[dict]:
        self.logger.info("TESTING LOGGING STATEMENT??!!")
        ibrokers_client = TWSAPIThriftClient().connect_to_ibrokers(ib_host_ip=self.config["host_tws_thrift"])
        ibrokers_client.connect()
        count = 0
        # One UTC timestamp shared by every record emitted in this run
        query_starttime = datetime.utcnow().isoformat() + 'Z'
        try:
            valid_stocks = execute_symbol_search(ibrokers_client, context['alphabet_partition'])
            if hasattr(valid_stocks, '__iter__'):
                for valid_stock in valid_stocks:
                    count += 1
                    # Progress logging every 10,000 results
                    if count % 10000 == 0:
                        self.logger.info("Query Counter == " + str(count))
                        self.logger.info("Current Stock == " + str(valid_stock))
                    ticker_record = {}
                    if hasattr(valid_stock, "contract"):
                        # Flatten the contract into one plain-dict Singer record
                        ticker_record = {
                            "symbol": valid_stock.contract.symbol.lower(),
                            "sec_type": valid_stock.contract.secType.lower(),
                            "primary_exchange": valid_stock.contract.primaryExchange.lower(),
                            "exchange": valid_stock.contract.exchange.lower(),
                            "currency": valid_stock.contract.currency.lower(),
                            "contract_id": valid_stock.contract.conId,
                            "query_start_time": query_starttime
                        }
                    else:
                        self.logger.warning("Unexpected situation occurred...? ")
                        continue
                    yield ticker_record
        except ttypes.IBSAPIException as ib_excp:
            self.logger.error("IBSAPIException occurred, stopping.. " + str(ib_excp))
        except Exception as excp:
            self.logger.error("Exception occurred, stopping.. " + str(excp))
```
Could I "yield" the binary image data and then, in a matching target, re-drop the binary content into the given target, be it a file system, a JSON file, a JPEG, etc.?
It seems I'm doing something the spec doesn't really talk about... or I'm still just new.
v
Could add the blob to ticker_record, encode it with base64?
That's what I did with oracle 🤷
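A minimal sketch of the base64 idea as it might look inside a tap's `get_records`: Singer messages are JSON, so raw bytes have to become text, and base64 does that at the cost of roughly a third more size. The field names (`key`, `size_bytes`, `sha256`, `content_base64`, `metadata`) are hypothetical, not part of the Singer spec or the SDK; the checksum is an optional extra so a target can verify what it decodes.

```python
import base64
import hashlib
from typing import Any, Dict


def blob_to_record(key: str, data: bytes, metadata: Dict[str, Any]) -> Dict[str, Any]:
    """Pack raw file bytes into a JSON-serialisable record (hypothetical shape)."""
    return {
        "key": key,  # original S3 key, so a target can rebuild the path
        "size_bytes": len(data),
        "sha256": hashlib.sha256(data).hexdigest(),  # optional integrity check
        # b64encode returns bytes; decode to str so the record is JSON-safe
        "content_base64": base64.b64encode(data).decode("ascii"),
        "metadata": metadata,  # e.g. the image's JSON sidecar, already parsed
    }


# Inside a Stream.get_records() this could be used as, e.g.:
#     yield blob_to_record(key, image_bytes, metadata_dict)
```

The matching stream schema would declare `content_base64` as a plain string, since JSON Schema has no bytes type.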
e
Here's what my boto3/Thrift microservice does:
```python
import os


def download_files_by_bucket(s3_url_string, target_file_path, s3_client):
    """Download every object under s3://bucket/prefix into target_file_path.

    s3_client must be a boto3 S3 *client* (boto3.client("s3")): get_paginator()
    and download_file(Bucket, Key, Filename) are client methods.
    """
    # "s3://bucket/some/prefix" -> ("bucket", "some/prefix")
    bucket_name, prefix = s3_url_string.split('/', 2)[-1].split('/', 1)

    paginator = s3_client.get_paginator('list_objects_v2')
    for result in paginator.paginate(Bucket=bucket_name, Prefix=prefix):
        # Pages with no matching objects have no 'Contents' key at all
        for key in result.get('Contents', []):
            # Skip "folder" placeholder keys ending in /
            if key['Key'].endswith('/'):
                continue
            # Mirror the full S3 key under the local target directory
            local_file_path = os.path.join(target_file_path, key['Key'])
            # Make sure parent directories exist before downloading
            os.makedirs(os.path.dirname(local_file_path) or '.', exist_ok=True)
            s3_client.download_file(bucket_name, key['Key'], local_file_path)
```
It simply downloads to the file system. I guess here I can try to add some base64 encoding, and IF the target understands this stuff, it can take the record and lay it into a target file system. I can do the tap quickly, but will then need to look at how targets work; I've never written one.
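On the target side, "understanding this stuff" could be as small as decoding the field back to bytes and writing it under a configured directory. A sketch, assuming records shaped like a hypothetical `{key, content_base64, sha256?}` dict; the path check is there because keys arrive as untrusted input, and a key like `../x` should not be able to escape the output directory.

```python
import base64
import hashlib
import os
from typing import Any, Dict, Tuple


def decode_record(record: Dict[str, Any]) -> Tuple[str, bytes]:
    """Turn a blob record back into (safe relative path, raw bytes)."""
    data = base64.b64decode(record["content_base64"])
    expected = record.get("sha256")
    if expected and hashlib.sha256(data).hexdigest() != expected:
        raise ValueError(f"checksum mismatch for {record['key']}")
    # Normalise the key and refuse anything that climbs out of the output dir.
    rel_path = os.path.normpath(record["key"].lstrip("/"))
    if rel_path == ".." or rel_path.startswith(".." + os.sep):
        raise ValueError(f"unsafe key: {record['key']}")
    return rel_path, data


def write_record_to_disk(record: Dict[str, Any], root_dir: str) -> str:
    """Write the decoded bytes under root_dir, creating folders as needed."""
    rel_path, data = decode_record(record)
    out_path = os.path.join(root_dir, rel_path)
    os.makedirs(os.path.dirname(out_path) or ".", exist_ok=True)
    with open(out_path, "wb") as fh:
        fh.write(data)
    return out_path
```

Keeping decoding separate from writing means the same `decode_record` could feed a different sink later (another bucket, a database column) without touching the disk logic.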
The downside is that it appears `download_file` will still need to lay files down in the file system of the tap's executor.
If that machine isn't where the target runs, that won't work... or maybe that's enough for my use case: I just configure the file resting place and run Meltano on the AI system that is moving images around. Hmmm, great thoughts @visch, thanks!
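On the `download_file` downside: boto3's `get_object` returns the object body as a stream, so a tap could read each object into memory and yield it as a record without writing to the executor's disk, leaving file placement entirely to the target. A sketch, assuming `s3_client` is a boto3 S3 client and reusing a hypothetical base64 record shape; it holds one whole object in memory at a time, which suits images but not very large files.

```python
import base64
from typing import Any, Dict, Iterator


def iter_s3_blob_records(s3_client: Any, bucket: str, prefix: str) -> Iterator[Dict[str, Any]]:
    """Yield one JSON-safe record per object under bucket/prefix, in memory.

    s3_client is assumed to be a boto3 S3 client (boto3.client("s3")).
    """
    paginator = s3_client.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        # "Contents" is absent on pages with no matching objects
        for obj in page.get("Contents", []):
            key = obj["Key"]
            if key.endswith("/"):  # skip "folder" placeholder keys
                continue
            # get_object()["Body"] is a stream; read() pulls the whole object
            body = s3_client.get_object(Bucket=bucket, Key=key)["Body"].read()
            yield {
                "key": key,
                "size_bytes": len(body),
                "content_base64": base64.b64encode(body).decode("ascii"),
            }
```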