Hi, I'm trying to find a way to achieve a stream's...
# troubleshooting
p
Hi, I'm trying to find a way to achieve a stream's self recursion or two streams recursing with parent-child on eachother (planning to write base conditions overriding request_records). For apis where you get composite data, like a {folder_id}/items api that gives both folders, and files but for folders you would want to do another recursion. Can anyone help me if there's present capability to do that? @Edgar Ramírez (Arch.dev)
e
I haven't attempted anything like that before. What have you tried so far?
p
Hi, I tried instantiating parent_stream_type with self overriding constructor and making loop with two classes with parent_stream_type in constructor both didn't work.
e
didn't work.
how did it fail?
p
adding parent_stream_type in constructor to self and get_child_context didn't produce any child stream, making a loop with two streams didn't produce any output. @Edgar Ramírez (Arch.dev)
the self loop stream class
Copy code
class RootStream(boxStream):
    """Define custom stream."""
    name = "root_files"
    path = f"/folders/0/items?fields=id,name,type,created_at,modified_at,webViewLink,extension,size"
    primary_keys: t.ClassVar[list[str]] = ["id"]
    replication_key = "modified_at"
    records_jsonpath = "entries.[*]"
    # Optionally, you may also use `schema_filepath` in place of `schema`:
    # schema_filepath = SCHEMAS_DIR / "users.json"  # noqa: ERA001

    def __init__(self, *args, **kwargs) -> None:
        parent_stream_type = RootStream 
        super().__init__(*args, **kwargs)

    schema = th.PropertiesList(
        th.Property("type", th.StringType),
        th.Property(
            "id",
            th.StringType,
            description="The File's system ID",
        ),
        th.Property(
            "extension",
            th.StringType,
            description="mime type of file/folder",
        ),
        th.Property(
            "name",
            th.StringType,
            description="name of file/folder",
        ),
        # th.Property(
        #     "shared_link.url",
        #     th.ArrayType(th.StringType),
        #     description="Shared link url",
        # ),
        th.Property(
            "modified_at",
            th.StringType,
            description="modified time of file/folder",
        ),
        th.Property(
            "created_at",
            th.StringType,
            description="created time of file/folder",
        ),
        th.Property(
            "size",
            th.IntegerType,
            description="size of file",
        ),
        additional_properties=True
        ).to_dict()
    # <https://www.googleapis.com/drive/v3/files/1Thv4cb7nLdUyBX882jeiLlmVa2QuP1m_?alt=media>


    def get_url_params(self, context, next_page_token):
        """Augment default to implement incremental syncing."""
        params = super().get_url_params(context, next_page_token)
        if context:
            self.path = f"/folders/{context['folder_id']}/items?fields=id,name,type,created_at,modified_at,webViewLink,extension,size"
        <http://self.logger.info|self.logger.info>(f"URL params: {context}, {self.path}")
        return params
 
    def get_child_context(self, record, context):
        <http://self.logger.info|self.logger.info>(f"get_child_context: {record}, {context}")
        if record["type"] == "file":
            return None
        else:
            return {"folder_id": record["id"], "folder_name": record["name"]}
    
    def post_process(self, row, context=None):
        row = super().post_process(row, context)
        if row['type'] == 'folder':
            <http://self.logger.info|self.logger.info>(f"Root folder processing: {row['name']}")
        else:
            <http://self.logger.info|self.logger.info>(f"Root file processing: {row['name']}")
        return row
Hi @Edgar Ramírez (Arch.dev), also trying to know if there's a way to run meltano multi-threaded (by invoking directly the singer sdk routine). I'm able to invoke cli via
from meltano.cli import main
. but since it's cli function I think I'm receiving the following the error.
Copy code
❯ python src/main.py meltano
{"event": "Environment 'dev' is active", "level": "info", "timestamp": "2024-07-22T20:45:12.499493Z"}
{"event": "Environment 'dev' is active", "level": "info", "timestamp": "2024-07-22T20:45:12.500550Z"}
{"event": "signal only works in main thread of the main interpreter", "exc_info": ["<class 'ValueError'>", "ValueError('signal only works in main thread of the main interpreter')", "<traceback object at 0x10897be00>"], "level": "error", "timestamp": "2024-07-22T20:45:12.594961Z"}
{"event": "signal only works in main thread of the main interpreter", "exc_info": ["<class 'ValueError'>", "ValueError('signal only works in main thread of the main interpreter')", "<traceback object at 0x108926880>"], "level": "error", "timestamp": "2024-07-22T20:45:12.595597Z"}
Need help fixing this problem? Visit <http://melta.no/> for troubleshooting steps, or to
join our friendly Slack community.

signal only works in main thread of the main interpreter
Need help fixing this problem? Visit <http://melta.no/> for troubleshooting steps, or to
join our friendly Slack community.

signal only works in main thread of the main interpreter
r
If you want to invoke the CLI programmatically, I would just use
subprocess
. Importing and calling CLI methods is definitely not supported, or desirable probably. https://github.com/meltano/meltano/issues/2414