Matt Menzenski
04/06/2023, 5:01 PM
import datetime
from singer_sdk.testing import get_tap_test_class
from tap_mongodb.tap import TapMongoDB
# Configuration passed to the tap under test. So far it only sets
# "start_date" to the current UTC date rendered as YYYY-MM-DD.
SAMPLE_CONFIG = {
    "start_date": datetime.datetime.now(tz=datetime.timezone.utc).strftime(
        "%Y-%m-%d"
    ),
    # TODO: Initialize minimal tap config
}
# Run the SDK's standard built-in tap tests against TapMongoDB,
# using SAMPLE_CONFIG as the tap configuration:
TestTapMongoDB = get_tap_test_class(
    tap_class=TapMongoDB,
    config=SAMPLE_CONFIG,
)
# TODO: Create additional tests as appropriate for your tap.
visch
04/06/2023, 5:09 PM
Matt Menzenski
04/06/2023, 5:29 PM
visch
04/06/2023, 5:30 PM
Matt Menzenski
04/06/2023, 6:21 PM
visch
04/06/2023, 6:22 PM
for stream in catalog1["streams"]:
    if stream.get("stream") and stream["stream"] not in ("task", "team"):
        for metadata in stream["metadata"]:
            metadata["metadata"]["replication-method"] = "FULL_TABLE"
Note: I'm hand-waving a bit here, but generally that's the idea.
Matt Menzenski
04/06/2023, 6:30 PM
Matt Menzenski
04/06/2023, 6:30 PM