Matt Menzenski
08/08/2023, 5:44 PM
singer_sdk.mapper.CustomStreamMap#functions options? https://github.com/meltano/sdk/blob/7f2df99148e28a65a5dd53f915041fe8f652b03f/singer_sdk/mapper.py#L297-L307
I would like to define a decrypt function that could be used in the same way the default md5 or datetime functions can be.
It seems straightforward enough to define a class that extends CustomStreamMap and adds that new function, but in order to use it, it seems like my mapper also needs to define a class that extends PluginMapper and overrides the register_raw_stream_schema method (which is large), as CustomStreamMap is invoked by name here: https://github.com/meltano/sdk/blob/7f2df99148e28a65a5dd53f915041fe8f652b03f/singer_sdk/mapper.py#L731
Is that accurate?
visch
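To make the question concrete, here is a minimal sketch of the pattern being described. It uses stand-in classes rather than the real singer_sdk ones: `BaseStreamMap` and `BaseMapper` are hypothetical stand-ins for CustomStreamMap and PluginMapper, `build_stream_map` is an invented method name, and `decrypt` is a placeholder that only base64-decodes (not real decryption). It illustrates why subclassing the stream map alone is not enough when the mapper instantiates it by name.

```python
import base64
import hashlib
from typing import Callable, Dict


def md5(value: str) -> str:
    """Hex MD5 digest of a string (stands in for the built-in helper)."""
    return hashlib.md5(value.encode("utf-8")).hexdigest()


def decrypt(value: str) -> str:
    """Placeholder 'decrypt': base64-decodes only. Swap in a real cipher."""
    return base64.b64decode(value).decode("utf-8")


class BaseStreamMap:
    """Hypothetical stand-in for CustomStreamMap."""

    @property
    def functions(self) -> Dict[str, Callable]:
        return {"md5": md5}


class DecryptingStreamMap(BaseStreamMap):
    """Adds `decrypt` on top of the inherited functions."""

    @property
    def functions(self) -> Dict[str, Callable]:
        funcs = dict(super().functions)  # property access, not a call
        funcs["decrypt"] = decrypt
        return funcs


class BaseMapper:
    """Hypothetical stand-in for PluginMapper: hard-codes the map class."""

    def build_stream_map(self) -> BaseStreamMap:
        return BaseStreamMap()  # referenced by name, so subclasses are ignored


class DecryptingMapper(BaseMapper):
    """Overrides the factory method to pick up the new map class."""

    def build_stream_map(self) -> BaseStreamMap:
        return DecryptingStreamMap()


default_funcs = BaseMapper().build_stream_map().functions
custom_funcs = DecryptingMapper().build_stream_map().functions
```

In this toy version the override is one line; the concern in the message above is that the real method that would need overriding is much larger.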
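08/08/2023, 7:58 PM
import binascii
from typing import Callable, Dict

import pywintypes  # provided by the pywin32 package
from singer_sdk.mapper import PluginMapper


def uuid(input: str) -> str:
    """Convert a hex string to a UUID string (Active Directory specific).

    "X'31914b12ca48664990d2c31049c07d00'" == 124B9131-48CA-4966-90D2-C31049C07D00

    Args:
        input: Hex string literal to convert.

    Returns:
        The UUID for the given hex string.
    """
    hex_string = input[2:-1]  # strip the leading X' and the trailing quote
    n = len(hex_string)
    # Left-pad to an even number of hex digits before decoding.
    binary_data = binascii.unhexlify(hex_string.zfill(n + (n & 1)))
    return pywintypes.IID(binary_data, True)


class ActiveDirectoryMapper(PluginMapper):
    @property
    def functions(self) -> Dict[str, Callable]:
        funcs = super().functions  # a property, so no call parentheses
        funcs["uuid"] = uuid
        return funcs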
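For readers without pywin32, the same byte reordering can be sketched with the standard library's `uuid.UUID(bytes_le=...)`, which reads the first three GUID fields as little-endian. This is an alternative to the `pywintypes.IID` call, not what the snippet above uses. The helper name `hex_literal_to_uuid` is made up for illustration, and the sketch assumes the input has the form `X'<32 hex digits>'`.

```python
import uuid


def hex_literal_to_uuid(value: str) -> str:
    """Convert a literal like X'3191...00' to an upper-case GUID string."""
    hex_string = value[2:-1]  # drop the leading X' and the trailing '
    # bytes_le treats the first three GUID fields as little-endian,
    # which is the layout the docstring example implies.
    return str(uuid.UUID(bytes_le=bytes.fromhex(hex_string))).upper()


result = hex_literal_to_uuid("X'31914b12ca48664990d2c31049c07d00'")
```

Unlike the original, this version returns a plain `str`, which matches the declared return type.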
I think if you do this in a tap, you'd just swap the setup_mapper function to use your new mapper and you're good.
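A rough sketch of that swap, using stand-in classes so it runs without the SDK installed. `FakeTap` and `FakePluginMapper` are hypothetical placeholders for a tap base class and PluginMapper, and `TapActiveDirectory` is an invented tap name. The assumption that `setup_mapper` works by assigning `self.mapper` is worth checking against the SDK version in use.

```python
from typing import Any, Dict


class FakePluginMapper:
    """Hypothetical stand-in for singer_sdk's PluginMapper."""

    def __init__(self, plugin_config: Dict[str, Any]) -> None:
        self.plugin_config = plugin_config


class ActiveDirectoryMapper(FakePluginMapper):
    """Custom mapper; extra functions would be registered here."""


class FakeTap:
    """Hypothetical stand-in for a tap base class."""

    def __init__(self, config: Dict[str, Any]) -> None:
        self.config = config
        self.setup_mapper()

    def setup_mapper(self) -> None:
        # Default behaviour: build the stock mapper.
        self.mapper = FakePluginMapper(plugin_config=dict(self.config))


class TapActiveDirectory(FakeTap):
    def setup_mapper(self) -> None:
        # Override: build the custom mapper instead of the stock one.
        self.mapper = ActiveDirectoryMapper(plugin_config=dict(self.config))


tap = TapActiveDirectory({"stream_maps": {}})
```

The only change on the tap side is the one overridden method; everything else about the tap stays as it was.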