# singer-tap-development

dan_ladd

04/21/2022, 10:22 AM
I'm running into some memory issues with a stream.
```python
import json
from typing import Optional

class UsersStream(IterableExportStream):
    name = "users"
    api_name = "user"
    replication_key = "profileUpdatedAt"

    def post_process(self, row: dict, context: Optional[dict]) -> dict:
        row = super().post_process(row, context)
        return self.process_user_fields(row)

    def process_user_fields(self, row: dict) -> dict:
        # Split the row: keys declared in the schema stay top-level,
        # everything else gets packed into a single JSON string field.
        user_fields_raw = row.get("user_fields_raw", {})
        normalized_row = {}
        for k, v in row.items():
            if k not in self.schema["properties"]:
                user_fields_raw[k] = v
            else:
                normalized_row[k] = v
        normalized_row["user_fields_raw"] = json.dumps(user_fields_raw)
        return normalized_row
```
Other IterableExportStreams have no issues. What I'm doing here is taking all the variable fields that someone can configure in Iterable and storing them in a JSON string field, to be transformed out later. The JSON string could easily be 100 KB. The problem is that memory spikes on this stream and the pod gets evicted. Any thoughts/recommendations? This is for the initial backfill, and I get about 20k rows in before it quickly fails.
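For context, the field-splitting step in `process_user_fields` can be reproduced standalone. This is just an illustrative sketch of that per-row transform: the `SCHEMA_PROPERTIES` set and the sample row below are made up, standing in for `self.schema["properties"]` and a real Iterable user export record.

```python
import json

# Hypothetical stand-in for self.schema["properties"]: keys declared
# in the stream's schema. Everything else is an ad-hoc user field.
SCHEMA_PROPERTIES = {"email", "profileUpdatedAt", "user_fields_raw"}

def process_user_fields(row: dict) -> dict:
    """Split declared schema fields from variable fields; pack the
    variable ones into a single JSON string under user_fields_raw."""
    user_fields_raw = row.get("user_fields_raw", {})
    normalized_row = {}
    for k, v in row.items():
        if k not in SCHEMA_PROPERTIES:
            user_fields_raw[k] = v
        else:
            normalized_row[k] = v
    normalized_row["user_fields_raw"] = json.dumps(user_fields_raw)
    return normalized_row

# Illustrative row: "favoriteColor" is not in the schema, so it is
# moved into the user_fields_raw JSON string.
row = {
    "email": "a@example.com",
    "profileUpdatedAt": "2022-04-21",
    "favoriteColor": "blue",
}
out = process_user_fields(row)
```

Note this transform operates on one row at a time, so the per-row work itself is bounded by the size of a single record.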