dan_ladd
04/21/2022, 10:22 AM

class UsersStream(IterableExportStream):
    name = "users"
    api_name = "user"
    replication_key = "profileUpdatedAt"

    def post_process(self, row: dict, context: Optional[dict]) -> dict:
        row = super().post_process(row, context)
        processed_row = self.process_user_fields(row)
        return processed_row

    def process_user_fields(self, row: dict) -> dict:
        user_fields_raw = row.get("user_fields_raw", {})
        normalized_row = {}
        for k, v in row.items():
            if k not in self.schema["properties"]:
                user_fields_raw[k] = v
            else:
                normalized_row[k] = v
        normalized_row["user_fields_raw"] = json.dumps(user_fields_raw)
        return normalized_row
Other IterableExportStreams have no issues. What I'm doing here is taking all the variable fields that someone can configure in Iterable and storing them in a JSON string field to be transformed out later. That JSON string could easily be 100kb per row.
The problem is that memory spikes on this stream and the pod gets evicted. Any thoughts/recommendations? This is for the initial backfill, and it seems I get about 20k rows in before it quickly fails.
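For reference, the field-partitioning logic described above can be reduced to a standalone sketch (the function name, sample row, and schema here are hypothetical stand-ins, not part of the actual tap):

```python
import json

def partition_fields(row: dict, schema_properties: set) -> dict:
    """Split a row into schema-known columns plus a JSON blob of everything else."""
    # Start from any raw fields already present on the row.
    extra = dict(row.get("user_fields_raw", {}))
    normalized = {}
    for key, value in row.items():
        if key == "user_fields_raw":
            continue  # already folded into `extra`
        if key in schema_properties:
            normalized[key] = value  # field declared in the stream schema
        else:
            extra[key] = value  # user-configurable Iterable field
    normalized["user_fields_raw"] = json.dumps(extra)
    return normalized

row = {"email": "a@b.com", "customScore": 7}
out = partition_fields(row, {"email", "user_fields_raw"})
```

Note the per-row work here is bounded, so if memory grows with row count, the accumulation is likely happening upstream of `post_process` (e.g. the whole export being buffered) rather than in this transform.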