David Peterson
10/29/2024, 8:40 PMDavid Peterson
10/29/2024, 8:40 PM- name: target-redis
namespace: target_redis
pip_url: -e ./target_redis
executable: target-redis
commands:
run: target-redis
config:
host: localhost
port: 6379
db: 0
pip_dependencies:
- redis
David Peterson
10/29/2024, 8:41 PMfrom setuptools import setup, find_packages
setup(
name='target-redis',
version='0.1.0',
description='A Singer target for writing data to Redis.',
author='David Peterson',
classifiers=[
'Programming Language :: Python :: 3',
'License :: OSI Approved :: MIT License',
'Operating System :: OS Independent',
],
packages=find_packages(),
install_requires=[
'redis',
'singer-python'
],
entry_points='''
[console_scripts]
target-redis=target_redis:process_batch
''',
)
David Peterson
10/29/2024, 8:43 PM"""Redis target sink class, which handles writing streams."""
from __future__ import annotations
from singer_sdk.sinks import BatchSink
import redis
class RedisSink(BatchSink):
"""Redis target sink class."""
max_size = 10000 # Max records to write in one batch
@property
def config(self) -> dict:
"""Return the target configuration properties."""
return self._config
#@property
#def filepath(self) -> str:
#if not self._filepath:
#self._filepath = self.config.get("filepath")
#return "Redis"
@property
def host(self) -> str:
if not self._host:
self._host = self.config.get("host")
return self._host
@property
def port(self) -> int:
if not self._port:
self._port = self.config.get("port")
return self._port
@property
def db(self) -> int:
if not self._db:
self._db = self.config.get("db")
return self._db
def start_batch(self, context: dict) -> None:
"""Start a batch.
Developers may optionally add additional markers to the `context` dict,
which is unique to this batch.
Args:
context: Stream partition or context dictionary.
"""
with redis.Redis(host=self._host, port=self._port, db=self._db) as redis_client:
# Read the Singer messages from stdin
redis_client.hset('user:1003', mapping={'name': 'Talia Shire', 'age': 78, 'birthplace': 'Woodhaven, NY'})
# Sample:
# ------
# batch_key = context["batch_id"]
# context["file_path"] = f"{batch_key}.csv"
def process_record(self, record: dict, context: dict) -> None:
"""Process the record.
Developers may optionally read or write additional markers within the
passed `context` dict from the current batch.
Args:
record: Individual record in the stream.
context: Stream partition or context dictionary.
"""
# Sample:
# ------
# with open(context["file_path"], "a") as csvfile:
# csvfile.write(record)
with redis.Redis(host=self._host, port=self._port, db=self._db) as redis_client:
# Read the Singer messages from stdin
redis_client.hset('user:1004', mapping={'name': 'Burt Young', 'age': 83, 'birthplace': 'New York, NY'})
def process_batch(self, context: dict) -> None:
"""Write out any prepped records and return once fully written.
Args:
context: Stream partition or context dictionary.
"""
schema = None
key_property = None
with redis.Redis(host=self._host, port=self._port, db=self._db) as redis_client:
# Read the Singer messages from stdin
redis_client.hset('user:1005', mapping={'name': 'Mr. T', 'age': 72, 'birthplace': 'Chicago, IL'})
David Peterson
10/29/2024, 8:44 PM"""Redis target class."""
from __future__ import annotations
from singer_sdk import typing as th
from singer_sdk.target_base import Target
#import redis
from target_redis.sinks import (
RedisSink,
)
class TargetRedis(Target):
"""Sample target for Redis."""
name = "target-redis"
config_jsonschema = th.PropertiesList(
th.Property(
"filepath",
th.StringType,
#title="Output File Path",
#description="The path to the target output file",
),
th.Property(
"file_naming_scheme",
th.StringType,
#title="File Naming Scheme",
#description="The scheme with which output files will be named",
),
th.Property(
"host",
th.StringType,
default = "localhost",
),
th.Property(
"port",
th.IntegerType,
default = 6379,
),
th.Property(
"db",
th.IntegerType,
default = 0,
)
).to_dict()
default_sink_class = RedisSink
if __name__ == "__main__":
TargetRedis.cli()
David Peterson
10/29/2024, 8:46 PMhaleemur_ali
10/30/2024, 12:19 AMhaleemur_ali
10/30/2024, 12:21 AMEdgar Ramírez (Arch.dev)
10/30/2024, 12:21 AMDavid Peterson
10/30/2024, 1:07 PMDavid Peterson
10/30/2024, 4:59 PMhaleemur_ali
10/30/2024, 5:06 PMDavid Peterson
10/30/2024, 5:09 PMEdgar Ramírez (Arch.dev)
10/30/2024, 5:24 PMI was able to get it to invoke the commands and insert the records into redis!That's awesome @David Peterson!