dan_ladd
04/21/2022, 10:22 AM
```python
import json
from typing import Optional

# IterableExportStream is the tap's own base stream class (defined elsewhere in the tap).
class UsersStream(IterableExportStream):
    name = "users"
    api_name = "user"
    replication_key = "profileUpdatedAt"

    def post_process(self, row: dict, context: Optional[dict]) -> dict:
        row = super().post_process(row, context)
        processed_row = self.process_user_fields(row)
        return processed_row

    def process_user_fields(self, row: dict) -> dict:
        # Fields declared in the schema stay as columns; everything else
        # (the user-configurable Iterable fields) is packed into one JSON string.
        user_fields_raw = row.get("user_fields_raw", {})
        normalized_row = {}
        for k, v in row.items():
            if k not in self.schema["properties"]:
                user_fields_raw[k] = v
            else:
                normalized_row[k] = v
        normalized_row["user_fields_raw"] = json.dumps(user_fields_raw)
        return normalized_row
```
Other IterableExportStreams have no issues. What I'm doing here is taking all the variable fields that someone can configure in Iterable and storing them in a JSON string field to be transformed out later. The JSON string could easily be 100 KB.
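A standalone sketch of the same split, with the schema's property names passed in explicitly so it can be tested outside the stream class. `split_user_fields` and its parameters are hypothetical names. Unlike `process_user_fields`, it copies any existing `user_fields_raw` dict instead of mutating the input row, and it skips that key while looping, so the dict cannot end up nested inside itself (which would make `json.dumps` raise a circular-reference `ValueError` if `user_fields_raw` is not a declared property).

```python
import json
from typing import Any, Dict, Iterable


def split_user_fields(
    row: Dict[str, Any], known_fields: Iterable[str], raw_key: str = "user_fields_raw"
) -> Dict[str, Any]:
    """Keep schema-declared fields as columns; pack everything else into one JSON string."""
    known = set(known_fields)
    # Start from raw fields already on the row (if it is a dict), copied so the input is not mutated.
    existing = row.get(raw_key)
    extra: Dict[str, Any] = dict(existing) if isinstance(existing, dict) else {}
    normalized: Dict[str, Any] = {}
    for key, value in row.items():
        if key == raw_key:
            continue  # already merged above; avoids nesting the dict inside itself
        if key in known:
            normalized[key] = value
        else:
            extra[key] = value
    normalized[raw_key] = json.dumps(extra)
    return normalized


row = {"email": "a@example.com", "profileUpdatedAt": "2022-04-21", "favoriteColor": "blue"}
print(split_user_fields(row, {"email", "profileUpdatedAt", "user_fields_raw"}))
```

Inside the stream, `self.schema["properties"]` can be passed as `known_fields`, since iterating a dict yields its keys.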
The problem is that memory spikes on this stream and the pod gets evicted. Any thoughts or recommendations? This is for the initial backfill, and it seems I get about 20k rows in before it fails quickly.
visch
04/21/2022, 11:27 AM
visch
04/21/2022, 11:28 AM
visch
04/21/2022, 11:29 AM
visch
04/21/2022, 11:30 AM
dan_ladd
04/21/2022, 11:34 AM
visch
04/21/2022, 12:15 PM
dan_ladd
04/21/2022, 12:17 PM
aaronsteers
04/21/2022, 3:13 PM
dan_ladd
04/21/2022, 4:28 PM
dan_ladd
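04/25/2022, 1:27 PM
```python
import requests

# Override on the stream class: reuse one session whose requests default to
# stream=True, so response bodies are not downloaded in full up front.
@property
def requests_session(self) -> requests.Session:
    if not self._requests_session or not self._requests_session.stream:
        self._requests_session = requests.Session()
        self._requests_session.stream = True
    return self._requests_session
```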
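Streaming only defers the download; the memory savings come from then consuming the body incrementally instead of through `response.json()` or `response.text`, which read everything at once. A minimal sketch, assuming the Iterable export endpoint returns newline-delimited JSON (one user per line). `parse_ndjson` is a hypothetical helper that accepts any iterable of lines, such as `response.iter_lines()` on a streamed `requests` response; in a Singer SDK stream, one place to call it would be an override of `parse_response`.

```python
import json
from typing import Any, Dict, Iterable, Iterator, Union


def parse_ndjson(lines: Iterable[Union[bytes, str]]) -> Iterator[Dict[str, Any]]:
    """Yield one record per non-empty line without holding the whole body in memory."""
    for line in lines:
        if isinstance(line, bytes):
            line = line.decode("utf-8")
        line = line.strip()
        if line:  # skip blank lines, e.g. keep-alive newlines or a trailing newline
            yield json.loads(line)


# Usage with a streamed response (url and session are hypothetical names):
#   response = session.get(url, stream=True)
#   for record in parse_ndjson(response.iter_lines()):
#       ...
body = [b'{"email": "a@example.com"}', b"", b'{"email": "b@example.com"}']
print([r["email"] for r in parse_ndjson(body)])
```

Because it is a generator, each record can be post-processed and emitted before the next line is read, so peak memory roughly tracks the largest single record (such as a user with a ~100 KB field blob) rather than the whole export.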
It's a little hard to search for "request" and "stream" to find any related discussions or examples 😄
visch
04/25/2022, 1:32 PM