# singer-target-development
d
Hi all, I’m new to Meltano. I’m developing a tap for the ETL infrastructure my company has deployed, and I’m trying to set up a target for Redis using this guide: https://jove.medium.com/build-a-meltano-target-within-1-hour-7134df6244fb For now I just want to run a simple hard-coded hset command against my database to confirm that my settings are right, and I’m using a version of tap-jsonplaceholder as my tap. I have tried a bunch of different settings but keep getting a BrokenPipeError, so I’m sure I am missing something in my code. I’ll put my code in the thread; any help would be appreciated.
this is in my meltano.yaml file:
  - name: target-redis
    namespace: target_redis
    pip_url: -e ./target_redis
    executable: target-redis
    commands:
      run: target-redis
    config:
      host: localhost
      port: 6379
      db: 0
    pip_dependencies:
    - redis
This is in setup.py:
from setuptools import setup, find_packages

setup(
    name='target-redis',
    version='0.1.0',
    description='A Singer target for writing data to Redis.',
    author='David Peterson',
    classifiers=[
        'Programming Language :: Python :: 3',
        'License :: OSI Approved :: MIT License',
        'Operating System :: OS Independent',
    ],
    packages=find_packages(),
    install_requires=[
        'redis',
        'singer-python'
    ],
    entry_points='''
        [console_scripts]
        target-redis=target_redis:process_batch
    ''',
)
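A note on the `entry_points` line above: a `console_scripts` entry has the form `name=module:attr`, and the installed script imports `module` and calls `attr`. As written, `target_redis:process_batch` needs a top-level `process_batch` in the `target_redis` package, while the code in this thread only defines `process_batch` as a method on `RedisSink`. Assuming the package layout shown here (`target_redis/target.py` containing `TargetRedis`), an entry such as `target-redis=target_redis.target:TargetRedis.cli` may be closer to what is intended. The sketch below is only an approximation of that lookup, using stdlib modules as stand-ins; `resolve_entry_point` is a hypothetical helper, not part of setuptools.

```python
from importlib import import_module


def resolve_entry_point(spec: str):
    """Resolve a 'module:attr.path' string roughly the way a console script does."""
    module_name, _, attr_path = spec.partition(":")
    obj = import_module(module_name)
    for attr in attr_path.split("."):
        # Raises AttributeError if the module (or class) has no such name
        obj = getattr(obj, attr)
    return obj


# Stdlib stand-ins so the sketch runs without target_redis installed
print(resolve_entry_point("json:dumps"))
print(resolve_entry_point("collections:OrderedDict.fromkeys"))
```

Calling `resolve_entry_point("json:process_batch")` raises `AttributeError`, which is the same kind of failure a script would hit if its entry point names something the module does not define.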
This is my sinks.py:
"""Redis target sink class, which handles writing streams."""

from __future__ import annotations

from singer_sdk.sinks import BatchSink
import redis


class RedisSink(BatchSink):
    """Redis target sink class."""

    max_size = 10000  # Max records to write in one batch

    @property
    def config(self) -> dict:
        """Return the target configuration properties."""
        return self._config
    
    #@property
    #def filepath(self) -> str:
        #if not self._filepath:
            #self._filepath = self.config.get("filepath")
        #return "Redis"

    @property
    def host(self) -> str:
        if not self._host:
            self._host = self.config.get("host")
        return self._host
    
    @property
    def port(self) -> int:
        if not self._port:
            self._port = self.config.get("port")
        return self._port
    
    @property
    def db(self) -> int:
        if not self._db:
            self._db = self.config.get("db")
        return self._db


    def start_batch(self, context: dict) -> None:
        """Start a batch.

        Developers may optionally add additional markers to the `context` dict,
        which is unique to this batch.

        Args:
            context: Stream partition or context dictionary.
        """
        with redis.Redis(host=self._host, port=self._port, db=self._db) as redis_client:
            # Read the Singer messages from stdin
            redis_client.hset('user:1003', mapping={'name': 'Talia Shire', 'age': 78, 'birthplace': 'Woodhaven, NY'})

        # Sample:
        # ------
        # batch_key = context["batch_id"]
        # context["file_path"] = f"{batch_key}.csv"

    def process_record(self, record: dict, context: dict) -> None:
        """Process the record.

        Developers may optionally read or write additional markers within the
        passed `context` dict from the current batch.

        Args:
            record: Individual record in the stream.
            context: Stream partition or context dictionary.
        """
        # Sample:
        # ------
        # with open(context["file_path"], "a") as csvfile:
        #     csvfile.write(record)
        with redis.Redis(host=self._host, port=self._port, db=self._db) as redis_client:
            # Read the Singer messages from stdin
            redis_client.hset('user:1004', mapping={'name': 'Burt Young', 'age': 83, 'birthplace': 'New York, NY'})



    def process_batch(self, context: dict) -> None:
        """Write out any prepped records and return once fully written.

        Args:
            context: Stream partition or context dictionary.
        """
        schema = None
        key_property = None
        with redis.Redis(host=self._host, port=self._port, db=self._db) as redis_client:
            # Read the Singer messages from stdin
            redis_client.hset('user:1005', mapping={'name': 'Mr. T', 'age': 72, 'birthplace': 'Chicago, IL'})
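One thing that stands out in the sink above: `self._host`, `self._port`, and `self._db` are never assigned before they are read, so both the properties and the `redis.Redis(...)` calls could fail with `AttributeError`. A minimal sketch of one way around that is to read the connection settings straight from the config each time and to keep the record-writing logic in a small function that accepts any client with a redis-py style `hset(name, mapping=...)` method. The helper names (`connection_kwargs`, `to_redis_value`, `write_records`) and the `id` key field are assumptions for illustration, not part of the SDK or of the original code.

```python
import json
from typing import Any, Dict, Iterable


def connection_kwargs(config: Dict[str, Any]) -> Dict[str, Any]:
    """Read connection settings from the config, using the defaults from target.py."""
    return {
        "host": config.get("host", "localhost"),
        "port": int(config.get("port", 6379)),
        "db": int(config.get("db", 0)),
    }


def to_redis_value(value: Any) -> Any:
    """Pass through plain scalars; JSON-encode bools, None, lists, and dicts."""
    if isinstance(value, bool) or value is None:
        return json.dumps(value)
    if isinstance(value, (str, bytes, int, float)):
        return value
    return json.dumps(value)


def write_records(
    client: Any,
    stream_name: str,
    records: Iterable[Dict[str, Any]],
    key_field: str = "id",  # assumed key property, adjust per stream
) -> int:
    """Write each record as a Redis hash named '<stream>:<key>'; return the count."""
    written = 0
    for record in records:
        key = f"{stream_name}:{record[key_field]}"
        mapping = {field: to_redis_value(v) for field, v in record.items()}
        client.hset(key, mapping=mapping)
        written += 1
    return written
```

Inside `process_batch`, this could be wired up as `redis.Redis(**connection_kwargs(self.config))` plus `write_records(client, self.stream_name, context["records"])`, assuming the default `BatchSink.process_record` is left in place so that it collects records into `context["records"]`.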
This is my target.py
"""Redis target class."""

from __future__ import annotations

from singer_sdk import typing as th
from singer_sdk.target_base import Target
#import redis

from target_redis.sinks import (
    RedisSink,
)


class TargetRedis(Target):
    """Sample target for Redis."""

    name = "target-redis"

    config_jsonschema = th.PropertiesList(
        th.Property(
            "filepath",
            th.StringType,
            #title="Output File Path",
            #description="The path to the target output file",
        ),
        th.Property(
            "file_naming_scheme",
            th.StringType,
            #title="File Naming Scheme",
            #description="The scheme with which output files will be named",
        ),
        th.Property(
            "host",
            th.StringType,
            default = "localhost",
        ),
        th.Property(
            "port",
            th.IntegerType,
            default = 6379,
        ),
        th.Property(
            "db",
            th.IntegerType,
            default = 0,
        )
    ).to_dict()

    default_sink_class = RedisSink


if __name__ == "__main__":
    TargetRedis.cli()
This is the file structure
h
Are you testing locally, using Docker, or running it in the cloud?
Also, would it be possible to share the code in a git repo so we can collaborate easily?
e
Also, have you tried running the built-in integration tests? https://sdk.meltano.com/en/latest/testing.html#testing-targets
d
I’m running in Docker. Would it be better to test it in a fresh install instead? I’ll try out those target tests.
Rather than trying to add Redis to my Docker setup, I tried running Meltano outside of Docker, and I was able to get it to invoke the commands and insert the records into Redis! Thanks for taking the time to talk with me about it during office hours! I can debug it this way now.
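For anyone debugging a similar BrokenPipeError: in a pipeline the tap's stdout is piped into the target's stdin, so if the target process exits early, the tap's next write fails with a broken pipe. One way to take the tap out of the picture is to feed the target a handful of hand-made Singer messages. The sketch below generates such lines using only the standard library; the stream name, the schema, and the sample records are made up for illustration.

```python
import json
from typing import Any, Dict, List


def sample_singer_messages(stream: str, records: List[Dict[str, Any]]) -> List[str]:
    """Build SCHEMA, RECORD, and STATE messages as JSON lines for one stream."""
    schema = {
        "type": "object",
        "properties": {
            "id": {"type": "integer"},
            "name": {"type": "string"},
        },
    }
    lines = [
        json.dumps(
            {
                "type": "SCHEMA",
                "stream": stream,
                "schema": schema,
                "key_properties": ["id"],
            }
        )
    ]
    for record in records:
        lines.append(json.dumps({"type": "RECORD", "stream": stream, "record": record}))
    # An empty state is enough for a smoke test
    lines.append(json.dumps({"type": "STATE", "value": {}}))
    return lines


# Hypothetical sample data, printed one message per line
for line in sample_singer_messages("users", [{"id": 1, "name": "Ada"}, {"id": 2, "name": "Lin"}]):
    print(line)
```

Saved to a file (say, a hypothetical `make_messages.py`), the output can be piped into the target directly, for example `python make_messages.py | target-redis --config config.json`, where `config.json` is an assumed file holding the host, port, and db settings. If the target crashes, its own traceback is then visible instead of the tap's BrokenPipeError.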
h
Well done @David Peterson. Would you (and your employer) consider open-sourcing the plugin? I don't think any similar loader plugin exists, so it would be a net-new benefit for the community. I'm also curious to understand the use cases you have for transferring data to Redis. I'm sure a blog post on this topic would be very welcome by the folks on this Slack and the data engineering community at large.
d
Yeah, I will talk to them. It would be a benefit to users of Redis and Meltano, and some nice exposure for our organization.
e
I was able to get it to invoke the commands and insert the records into redis!
That's awesome @David Peterson!