# singer-tap-development
d
I'm running into some memory issues with a stream.
```python
import json
from typing import Optional


class UsersStream(IterableExportStream):
    name = "users"
    api_name = "user"
    replication_key = "profileUpdatedAt"

    def post_process(self, row: dict, context: Optional[dict]) -> dict:
        row = super().post_process(row, context)
        return self.process_user_fields(row)

    def process_user_fields(self, row: dict) -> dict:
        # Split the row into schema fields and everything else; the extras
        # are serialized into a single JSON string column.
        user_fields_raw = row.get("user_fields_raw", {})
        normalized_row = {}
        for k, v in row.items():
            if k not in self.schema["properties"]:
                user_fields_raw[k] = v
            else:
                normalized_row[k] = v
        normalized_row["user_fields_raw"] = json.dumps(user_fields_raw)
        return normalized_row
```
Other IterableExportStreams have no issues. What I'm doing here is taking all the variable fields that someone can configure in Iterable and storing them in a JSON string field to be transformed out later. The JSON string could easily be 100 KB. The problem is that memory spikes on this stream and the pod gets evicted. Any thoughts/recommendations? This is for the initial backfill, and I get about 20k rows in before it quickly fails.
v
Could you post the whole tap by any chance? `IterableExportStream` at a minimum is needed as well.
General answer: I'd say use a profiler; there are good steps on the SDK website! I'm not sure how good that profiler is at telling you memory usage, though.
https://github.com/bloomberg/memray stumbled on this yesterday 🤷
Please share back with what you find! I'm interested and who knows who may hit this later
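(Not from the thread, but a stdlib option worth noting: if memray is awkward to run on macOS, `tracemalloc` can give a rough per-line breakdown of where memory is being allocated. A minimal sketch; the list comprehension here is just a stand-in for the tap's hot path.)

```python
import tracemalloc

tracemalloc.start()

# Stand-in for the suspect code path (e.g. running the stream's
# post_process over a batch of rows); replace with the real workload.
rows = [{"field": "x" * 1000} for _ in range(1000)]

snapshot = tracemalloc.take_snapshot()
top = snapshot.statistics("lineno")
for stat in top[:3]:
    print(stat)  # file:line, total size, allocation count
tracemalloc.stop()
```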
d
I'll polish up some stuff and publish the repo, but I don't think it's anything in IterableExportStream as there are many other Streams that inherit from it and run without issue. I looked at viztracer, but yea, it's more focused on runtime than memory. I saw the memray buzz, I'm going to try and take a closer look there!
d
wrestling with getting it running in Docker, since macOS isn't supported 😅
a
@dan_ladd - In past experiences, I had a "soft memory limit" and "hard memory limit". Since garbage collection in Python (and other software) is often lazy, you might experience an issue where memory doesn't get cleaned up unless there's some memory pressure, from the soft memory limit or other tasks competing for RAM. So, there may be two paths here:
1. Make sure your pointers are released for anything you are done with. (No lists of all the strings, for instance, forcing Python to keep stuff in memory.)
2. See if you can tweak tolerances so that Python knows to clean up memory before the pod gets grumpy.
I am by no means an expert in this topic, but posting just in case it is helpful.
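(A minimal sketch of those two paths, not from the thread; the row shapes and threshold values are made up for illustration.)

```python
import gc

# Path 1: release references as you go. Yielding transformed rows one at
# a time (instead of accumulating them all in a list) lets each source
# row become garbage as soon as the consumer moves on.
def process_rows(rows):
    for row in rows:
        yield {"id": row["id"]}

# Path 2: make collection more eager. Lower thresholds trigger the
# generational GC after fewer allocations (CPython's defaults are
# roughly 700, 10, 10).
gc.set_threshold(200, 5, 5)

processed = list(process_rows([{"id": 1}, {"id": 2}]))
gc.collect()  # force a full collection at a known safe point
```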
d
Thanks for the advice AJ, I'll dive in a little deeper. Didn't have much luck with memray. It sure looked cool in live mode, but nothing from the results was really sticking out to me.
ok, I think I found the problem. These export streams are a bulk export: you request all records between two timestamps, and it responds with everything at once (no response limit/pagination). The Singer version overcomes this by setting the request to streaming. I'm thinking of overriding requests_session as follows.
```python
@property
def requests_session(self) -> requests.Session:
    # Reuse the session, but make sure it streams responses instead of
    # buffering the whole export body into memory.
    if not self._requests_session or not self._requests_session.stream:
        self._requests_session = requests.Session()
        self._requests_session.stream = True
    return self._requests_session
```
It's a little hard to search for "request" and "stream" to find if there are any related discussions or examples 😄
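(For illustration, not from the thread: with a streamed response you can consume the export body incrementally, e.g. one newline-delimited JSON record at a time, so memory stays flat regardless of export size. A sketch against a fake in-memory stream; the record shape and `parse_ndjson` helper are made up.)

```python
import io
import json

def parse_ndjson(stream, chunk_size=8192):
    """Yield one record at a time from a newline-delimited JSON stream,
    so only one line (plus a small read buffer) is ever held in memory."""
    buffer = b""
    while True:
        chunk = stream.read(chunk_size)
        if not chunk:
            break
        buffer += chunk
        while b"\n" in buffer:
            line, buffer = buffer.split(b"\n", 1)
            if line.strip():
                yield json.loads(line)
    if buffer.strip():
        yield json.loads(buffer)  # handle a final record with no newline

# Stand-in for a streamed HTTP response body.
fake_response = io.BytesIO(b'{"email": "a@x.com"}\n{"email": "b@x.com"}\n')
records = list(parse_ndjson(fake_response))
```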
v
oh wow your API response streams all the data in, makes a ton of sense how this would happen now 😄