taeef_najib
09/09/2023, 9:04 PM
Iceberg Datalake (with Hive Metastore as the catalog and S3 as the storage). I figured that I have to change meltano.yml and target.py to add custom parameters like the metastore thrift URI, S3 key, S3 secret, S3 endpoint URL, bucket name, filepath to metadata, etc. But I was wondering what changes I would need to make in sinks.py to actually connect to an Iceberg Datalake. Has anyone tried it yet?
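For reference, settings like those mentioned above would typically be declared in target.py with the Meltano SDK's config_jsonschema and then read in the sink via self.config. A minimal sketch, where all setting names (hive_metastore_uri, s3_bucket, and so on) are illustrative placeholders rather than anything confirmed in this thread:

# Rough sketch only: setting names here are placeholders.
from singer_sdk import typing as th
from singer_sdk.target_base import Target

# from .sinks import IcebergSink  # the sink class defined in sinks.py


class TargetIceberg(Target):
    """Sample Meltano SDK target for an Iceberg data lake."""

    name = "target-iceberg"

    config_jsonschema = th.PropertiesList(
        th.Property("hive_metastore_uri", th.StringType, required=True),
        th.Property("s3_endpoint_url", th.StringType),
        th.Property("s3_access_key_id", th.StringType, secret=True),
        th.Property("s3_secret_access_key", th.StringType, secret=True),
        th.Property("s3_bucket", th.StringType, required=True),
        th.Property("batch_size", th.IntegerType, default=1000),
    ).to_dict()

    # default_sink_class = IcebergSink

The matching keys would then be supplied under the plugin's config in meltano.yml.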
taeef_najib
09/10/2023, 1:10 AM
...
from datetime import datetime

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from pyarrow.fs import S3FileSystem
from singer_sdk.sinks import BatchSink


class IcebergSink(BatchSink):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.s3_bucket = self.config.get("s3_bucket")  # S3 bucket name
        self.batch_size = self.config.get("batch_size", 1000)
        self.records = []

    def start_batch(self, context: dict) -> None:
        self.records = []

    def process_record(self, record: dict, context: dict) -> None:
        self.records.append(record)

    def process_batch(self, context: dict) -> None:
        # Generate a unique directory name based on the current timestamp
        now = datetime.now()
        timestamp_str = now.strftime("%Y%m%d%H%M%S%f")
        iceberg_dir = f"{timestamp_str}"  # Use prefix if needed

        # Create a PyArrow table from the collected records
        table = pa.Table.from_pandas(pd.DataFrame(self.records))

        # Create an S3FileSystem instance (credentials come from the
        # environment, or can be passed via access_key/secret_key/endpoint_override)
        s3 = S3FileSystem()

        # Write the PyArrow table to S3 as a Parquet dataset
        # (pyarrow.fs paths take the form "bucket/prefix", without "s3://")
        pq.write_to_dataset(table, root_path=f"{self.s3_bucket}/{iceberg_dir}", filesystem=s3)

        # Clean up the buffered records
        self.records = []
Can we use PyArrow and pandas to create a table and write a Parquet file in Iceberg table format? Any help or suggestions would be very helpful!
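Worth noting: pq.write_to_dataset on its own produces plain Parquet files, not an Iceberg table, because no Iceberg metadata is written or registered in the Hive Metastore. One possible approach for process_batch is to hand the PyArrow table to pyiceberg, which writes the data files and commits the table metadata through the catalog. A minimal sketch, assuming pyiceberg >= 0.6 and an already-created Iceberg table; the catalog name, URIs, credentials, and table identifier below are placeholders:

# Rough sketch only: placeholder URIs, credentials, and table names.
import pandas as pd
import pyarrow as pa
from pyiceberg.catalog import load_catalog

# Connect to the Hive Metastore catalog; the s3.* properties configure
# pyiceberg's FileIO for the object store.
catalog = load_catalog(
    "hive",
    **{
        "type": "hive",
        "uri": "thrift://hive-metastore:9083",   # metastore thrift URI
        "s3.endpoint": "http://minio:9000",      # S3 endpoint URL
        "s3.access-key-id": "YOUR_ACCESS_KEY",
        "s3.secret-access-key": "YOUR_SECRET_KEY",
    },
)

# Load an Iceberg table that already exists in the catalog.
iceberg_table = catalog.load_table("my_db.my_table")

# Convert the buffered records to a PyArrow table and append them;
# pyiceberg writes the Parquet data files and commits Iceberg metadata.
records = [{"id": 1, "name": "example"}]
arrow_table = pa.Table.from_pandas(pd.DataFrame(records))
iceberg_table.append(arrow_table)

The Arrow schema of the batch has to line up with the Iceberg table's schema for the append to be accepted.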
visch
09/10/2023, 10:24 AM

taeef_najib
09/10/2023, 8:52 PM