What’s the simplest way to add a new function to t...
# troubleshooting
m
What's the simplest way to add a new function to the `singer_sdk.mapper.CustomStreamMap#functions` options? https://github.com/meltano/sdk/blob/7f2df99148e28a65a5dd53f915041fe8f652b03f/singer_sdk/mapper.py#L297-L307 I would like to define a `decrypt` function that could be used in the same way the default `md5` or `datetime` functions can be. It seems straightforward enough to define a class that extends `CustomStreamMap` and adds that new function, but in order to use it, it seems my mapper would also need to define a class that extends `PluginMapper` and overrides the `register_raw_stream_schema` method (which is large), as `CustomStreamMap` is invoked by name here: https://github.com/meltano/sdk/blob/7f2df99148e28a65a5dd53f915041fe8f652b03f/singer_sdk/mapper.py#L731 Is that accurate?
v
Copy-pasted this from one of mine. Here's pretty much all the code in mine:
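import binascii
from typing import Callable, Dict

import pywintypes
from singer_sdk.mapper import PluginMapper


def uuid(input: str) -> str:
    """Convert a hex string to a UUID string (Active Directory specific).

    "X'31914b12ca48664990d2c31049c07d00'" == 124B9131-48CA-4966-90D2-C31049C07D00

    Args:
        input: Hex string literal to convert.

    Returns:
        The UUID built from the decoded bytes.
    """
    # Strip the leading "X'" and the trailing quote.
    hex_string = input[2:-1]
    n = len(hex_string)
    # Left-pad to an even number of hex digits before decoding.
    binary_data = binascii.unhexlify(hex_string.zfill(n + (n & 1)))
    # The second argument tells pywintypes.IID the input is raw bytes.
    return pywintypes.IID(binary_data, True)


class ActiveDirectoryMapper(PluginMapper):

    @property
    def functions(self) -> Dict[str, Callable]:
        # `functions` is a property, so read it without calling it.
        funcs = super().functions
        funcs["uuid"] = uuid
        return funcs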
I think if you do this in a tap, you'd just swap the `setup_mapper` function over to your new mapper and you're good.
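For anyone reading along, here is a minimal sketch of the property-override pattern discussed above. It does not import `singer_sdk`: `BaseMapper`, `DecryptingMapper`, and the toy `decrypt` are hypothetical stand-ins, so treat it as an illustration of the pattern rather than the SDK's actual API.

```python
import hashlib
from typing import Callable, Dict


class BaseMapper:
    """Stand-in for an SDK mapper class; NOT the real singer_sdk API."""

    @property
    def functions(self) -> Dict[str, Callable]:
        # Default helpers made available to mapping expressions.
        return {"md5": lambda s: hashlib.md5(s.encode("utf-8")).hexdigest()}


def decrypt(value: str) -> str:
    """Hypothetical placeholder: 'decrypts' by reversing the string."""
    return value[::-1]


class DecryptingMapper(BaseMapper):
    @property
    def functions(self) -> Dict[str, Callable]:
        # `functions` is a property, so read it without calling it,
        # and copy it so the parent's dict is never mutated.
        funcs = dict(super().functions)
        funcs["decrypt"] = decrypt
        return funcs


mapper = DecryptingMapper()
available = mapper.functions
```

In a real tap, the remaining step would be pointing the tap at the subclass, as suggested above with `setup_mapper`. That wiring is left out here because it depends on the SDK version in use.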