# singer-tap-development
n
I was wondering: if I create a new field in post_process, is it not possible to use it as a replication key? I am working with a data source where one endpoint returns a list of objects of different types. The date field that should be used as the replication key is named loginDate, logoutDate, or date, depending on the object type. This is the post_process code:
from datetime import datetime, timedelta, timezone

def post_process(
    self,
    row: dict,
    context: Context | None = None,  # noqa: ARG002
) -> dict | None:
    # Drop loginDate/logoutDate when the value is 0
    for key in ["loginDate", "logoutDate"]:
        if row.get(key) == 0:
            row.pop(key, None)

    # Convert whichever date field is present (epoch milliseconds)
    # into a single ISO 8601 "eventDate" field
    for key in ["loginDate", "logoutDate", "date"]:
        if key in row:
            dt = datetime(1970, 1, 1, tzinfo=timezone.utc) + timedelta(
                milliseconds=row[key]
            )
            row["eventDate"] = dt.isoformat().replace("+00:00", "Z")
            del row[key]  # Remove original key
            break

    logger.info(row)
    return super().post_process(row, context)
I get this error:
Extraction failed singer_sdk.exceptions.InvalidReplicationKeyException: Field 'eventDate' is not in schema for stream 'events'
even though I have added eventDate to the schema.
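To test the date normalization on its own, outside the stream class, the logic can be pulled into a plain function. This is a minimal sketch: the name `normalize_event_date` is hypothetical, and it assumes the date fields hold epoch milliseconds, as the post_process snippet implies.

```python
from datetime import datetime, timedelta, timezone

# Hypothetical helper: candidate date fields, checked in this order
DATE_KEYS = ("loginDate", "logoutDate", "date")
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def normalize_event_date(row: dict) -> dict:
    """Move the first date field found (epoch ms) into an ISO 8601 'eventDate'."""
    # Drop a 0 loginDate/logoutDate, mirroring the original snippet
    for key in ("loginDate", "logoutDate"):
        if row.get(key) == 0:
            row.pop(key)
    for key in DATE_KEYS:
        if key in row:
            # pop() removes the original key and returns its value
            dt = EPOCH + timedelta(milliseconds=row.pop(key))
            row["eventDate"] = dt.isoformat().replace("+00:00", "Z")
            break
    return row


print(normalize_event_date({"loginDate": 0, "date": 1700000000000, "actor": "a"}))
# {'actor': 'a', 'eventDate': '2023-11-14T22:13:20Z'}
```

Rows with none of the date fields come back without an eventDate, which matters if eventDate is the replication key.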
r
Can you show the schema?
n
{
  "oneOf": [
    {
      "type": "object",
      "properties": {
        "eventDate": { "type": "string" },
        "userName": { "type": "string" },
        "loginEvent": { "type": "string" },
        "ipAddress": { "type": "string" },
        "accessSource": { "type": "string" },
        "auditSource": { "type": "string" }
      },
      "required": ["eventDate", "userName", "auditSource"]
    },
    {
      "type": "object",
      "properties": {
        "eventDate": { "type": "string" },
        "sourcePath": { "type": "string" },
        "targetPath": { "type": ["string", "null"] },
        "user": { "type": "string" },
        "userId": { "type": "string" },
        "action": { "type": "string" },
        "access": { "type": "string" },
        "ipAddress": { "type": "string" },
        "actionInfo": { "type": "string" },
        "checksum": { "type": "string" },
        "groupId": { "type": "string" },
        "auditSource": { "type": "string" }
      },
      "required": ["eventDate", "sourcePath", "user", "action", "ipAddress", "auditSource"]
    },
    {
      "type": "object",
      "properties": {
        "eventDate": { "type": "string" },
        "actor": { "type": "string" },
        "subject": { "type": "string" },
        "action": { "type": "string" },
        "actionInfo": { "type": "string" },
        "source": { "type": "string" },
        "auditSource": { "type": "string" }
      },
      "required": ["eventDate", "actor", "subject", "action", "auditSource"]
    }
  ]
}
r
I think the SDK expects `properties` to be at the top level of a schema, not `oneOf` (see https://github.com/meltano/sdk/blob/6708cb995c68ab6f74d4874dfc8f978c3b054ceb/singer_sdk/streams/core.py#L228-L231). I would make all properties that are not common to every object type nullable, like this:
{
  "type": "object",
  "properties": {
    "eventDate": {
      "type": "string"
    },
    "userName": {
      "type": [
        "string",
        "null"
      ]
    },
    "loginEvent": {
      "type": [
        "string",
        "null"
      ]
    },
    "ipAddress": {
      "type": [
        "string",
        "null"
      ]
    },
    "accessSource": {
      "type": [
        "string",
        "null"
      ]
    },
    "sourcePath": {
      "type": [
        "string",
        "null"
      ]
    },
    "targetPath": {
      "type": [
        "string",
        "null"
      ]
    },
    "user": {
      "type": [
        "string",
        "null"
      ]
    },
    "userId": {
      "type": [
        "string",
        "null"
      ]
    },
    "action": {
      "type": [
        "string",
        "null"
      ]
    },
    "access": {
      "type": [
        "string",
        "null"
      ]
    },
    "actionInfo": {
      "type": [
        "string",
        "null"
      ]
    },
    "checksum": {
      "type": [
        "string",
        "null"
      ]
    },
    "groupId": {
      "type": [
        "string",
        "null"
      ]
    },
    "actor": {
      "type": [
        "string",
        "null"
      ]
    },
    "subject": {
      "type": [
        "string",
        "null"
      ]
    },
    "source": {
      "type": [
        "string",
        "null"
      ]
    },
    "auditSource": {
      "type": "string"
    }
  },
  "required": [
    "eventDate",
    "auditSource"
  ]
}
What were you trying to do with `oneOf`?
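The difference between the two schemas can be illustrated with a simplified lookup. This sketch assumes, as the linked core.py lines suggest, that the replication-key check only reads the schema's top-level `properties`; `replication_key_in_schema` is a hypothetical stand-in, not the SDK's actual code.

```python
# Hypothetical, simplified stand-in for the SDK's replication-key lookup:
# it only consults the schema's top-level "properties" mapping.
def replication_key_in_schema(schema: dict, key: str) -> bool:
    return key in schema.get("properties", {})


# eventDate exists only inside the oneOf branches, not at the top level
one_of_schema = {
    "oneOf": [
        {"type": "object", "properties": {"eventDate": {"type": "string"}}},
        {"type": "object", "properties": {"eventDate": {"type": "string"}}},
    ]
}

# Merged schema: eventDate is a top-level property
merged_schema = {
    "type": "object",
    "properties": {
        "eventDate": {"type": "string"},
        "user": {"type": ["string", "null"]},
    },
    "required": ["eventDate"],
}

print(replication_key_in_schema(one_of_schema, "eventDate"))  # False
print(replication_key_in_schema(merged_schema, "eventDate"))  # True
```

Under that assumption, making the other fields nullable is beside the point for the exception; what matters is that eventDate sits under a top-level `properties`.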
n
Even if that makes the other fields nullable, I'm not sure how it affects whether eventDate is considered present in the schema, since the error I get is InvalidReplicationKeyException.
r
Did you try it? You need `properties` at the top level of the schema regardless.
n
Yes, it worked, thanks. I didn't know `properties` had to be at the top level; I missed the core.py snippet you shared.
r
Gotcha 👍 The schema I posted above is just a merge of all your `oneOf` sub-schemas, where properties that are not common to all of them are made nullable, since there is no guarantee they will be present (guessing from your initial attempt). Maybe I should have been clearer.
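The merge described above can be automated. This is a minimal sketch, not an SDK feature: `merge_one_of` is a hypothetical helper, and it assumes every sub-schema is an object schema whose properties each carry a `type` and whose shared properties have compatible types (the first definition wins). Properties present in every sub-schema keep their type, all others get `"null"` added, and only fields required by every sub-schema stay required.

```python
import json


def merge_one_of(schema: dict) -> dict:
    """Flatten a oneOf of object sub-schemas into a single object schema."""
    subs = schema["oneOf"]
    # Properties that appear in every sub-schema
    common = set.intersection(*(set(s["properties"]) for s in subs))
    # Fields required by every sub-schema
    required = set.intersection(*(set(s.get("required", [])) for s in subs))

    properties: dict = {}
    for sub in subs:
        for name, prop in sub["properties"].items():
            if name in properties:
                continue  # first definition wins (assumes compatible types)
            prop = dict(prop)  # shallow copy so the input is not mutated
            types = prop["type"] if isinstance(prop["type"], list) else [prop["type"]]
            if name not in common and "null" not in types:
                types = [*types, "null"]  # not guaranteed present: nullable
            prop["type"] = types if len(types) > 1 else types[0]
            properties[name] = prop

    return {
        "type": "object",
        "properties": properties,
        "required": [name for name in properties if name in required],
    }


example = {"oneOf": [
    {"type": "object",
     "properties": {"eventDate": {"type": "string"}, "userName": {"type": "string"},
                    "auditSource": {"type": "string"}},
     "required": ["eventDate", "userName", "auditSource"]},
    {"type": "object",
     "properties": {"eventDate": {"type": "string"}, "actor": {"type": "string"},
                    "auditSource": {"type": "string"}},
     "required": ["eventDate", "actor", "auditSource"]},
]}
print(json.dumps(merge_one_of(example), indent=2))
```

Applied to the full three-branch schema from the question, this rule should give the same properties, types, and required list (eventDate and auditSource) as the hand-merged schema, though possibly in a different key order.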