# singer-tap-development
s
Last question for the day. I am working on a tap that is an events stream. The source system is a Kafka instance and uses a GraphQL API to accept queries. The system has a marker parameter that is returned with every page of results, and this marker persists between replications. It marks a point in the Kafka topic, so it can be used to replicate data incrementally. However, I can’t figure out how to use it as a replication_key. I get the following error:
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/Users/stephenlloyd/Library/Caches/pypoetry/virtualenvs/tap-cato-o33NKEet-py3.7/lib/python3.7/site-packages/click/core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "/Users/stephenlloyd/Library/Caches/pypoetry/virtualenvs/tap-cato-o33NKEet-py3.7/lib/python3.7/site-packages/click/core.py", line 782, in main
    rv = self.invoke(ctx)
  File "/Users/stephenlloyd/Library/Caches/pypoetry/virtualenvs/tap-cato-o33NKEet-py3.7/lib/python3.7/site-packages/click/core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/Users/stephenlloyd/Library/Caches/pypoetry/virtualenvs/tap-cato-o33NKEet-py3.7/lib/python3.7/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/Users/stephenlloyd/Library/Caches/pypoetry/virtualenvs/tap-cato-o33NKEet-py3.7/lib/python3.7/site-packages/singer_sdk/tap_base.py", line 256, in cli
    tap.sync_all()
  File "/Users/stephenlloyd/Library/Caches/pypoetry/virtualenvs/tap-cato-o33NKEet-py3.7/lib/python3.7/site-packages/singer_sdk/tap_base.py", line 186, in sync_all
    stream.sync()
  File "/Users/stephenlloyd/Library/Caches/pypoetry/virtualenvs/tap-cato-o33NKEet-py3.7/lib/python3.7/site-packages/singer_sdk/streams/core.py", line 458, in sync
    self._sync_records()
  File "/Users/stephenlloyd/Library/Caches/pypoetry/virtualenvs/tap-cato-o33NKEet-py3.7/lib/python3.7/site-packages/singer_sdk/streams/core.py", line 433, in _sync_records
    self._increment_stream_state(row_dict, partition=partition)
  File "/Users/stephenlloyd/Library/Caches/pypoetry/virtualenvs/tap-cato-o33NKEet-py3.7/lib/python3.7/site-packages/singer_sdk/streams/core.py", line 363, in _increment_stream_state
    partition=partition
  File "/Users/stephenlloyd/Library/Caches/pypoetry/virtualenvs/tap-cato-o33NKEet-py3.7/lib/python3.7/site-packages/singer_sdk/streams/core.py", line 162, in get_replication_key_signpost
    if self.is_timestamp_replication_key:
  File "/Users/stephenlloyd/Library/Caches/pypoetry/virtualenvs/tap-cato-o33NKEet-py3.7/lib/python3.7/site-packages/singer_sdk/streams/core.py", line 111, in is_timestamp_replication_key
    return is_datetime_type(type_dict)
  File "/Users/stephenlloyd/Library/Caches/pypoetry/virtualenvs/tap-cato-o33NKEet-py3.7/lib/python3.7/site-packages/singer_sdk/helpers/_typing.py", line 53, in is_datetime_type
    raise ValueError("Could not detect type from empty type_dict param.")
ValueError: Could not detect type from empty type_dict param.
How could I get around this? I cannot pass a datetime parameter.
a
@stephen_lloyd - This error occurs when a column's type cannot be detected in the JSON Schema definition. Have you defined Stream.schema for the Stream, and does it include the marker column? Adding the marker column as a string type should resolve this error.
Looking more closely at your example, I think you may not have intended to store the marker with the record, which is perhaps the issue. Is it acceptable for your use case to also pass the marker as part of the stream data, even if it serves solely as a bookmark?
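For illustration, here is a minimal plain-Python sketch of the two pieces suggested above: declaring the marker as a string property in the stream's schema, and copying the page-level marker onto each record so it travels with the stream data. It does not use the SDK itself. The `id` field and the helper names `replication_key_type` and `attach_marker` are hypothetical; only the `marker` column and its string type come from this thread.

```python
from typing import Any, Dict, Iterable, Iterator

# Hypothetical JSON Schema for the events stream.
# Only the "marker" property (as a string) comes from the thread.
EVENTS_SCHEMA: Dict[str, Any] = {
    "type": "object",
    "properties": {
        "id": {"type": "string"},      # hypothetical event field
        "marker": {"type": "string"},  # bookmark column, declared as a string
    },
}

REPLICATION_KEY = "marker"


def replication_key_type(schema: Dict[str, Any], key: str) -> Dict[str, Any]:
    """Return the schema entry for `key`, failing early if it is missing.

    Hypothetical helper: it mirrors the kind of lookup that fails with
    "empty type_dict" when the replication key is absent from the schema.
    """
    type_dict = schema.get("properties", {}).get(key)
    if not type_dict:
        raise ValueError(f"Replication key '{key}' is not defined in the schema.")
    return type_dict


def attach_marker(
    records: Iterable[Dict[str, Any]], marker: str
) -> Iterator[Dict[str, Any]]:
    """Copy the page-level marker onto every record from that page."""
    for record in records:
        yield {**record, REPLICATION_KEY: marker}
```

In a tap, the same idea would mean including `marker` in `Stream.schema` and emitting it on each row, so the replication key can be resolved to a type and bookmarked.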
s
That worked perfectly! Thanks!
a
@stephen_lloyd - Thanks for this. I didn’t like the error message we were sending, so I’ve improved it to make it clearer. The updated message will soon tell you that the problem may be that the property is not defined in the schema.