# best-practices
h
Hi, I wish to bring up another use case for data freshness checks. We plan to set up a Reverse EL pipeline to replicate a user metrics mart view from the data warehouse to one of our operational Postgres DBs every N minutes. Our internal dev team, which will consume the replicated user metrics in the operational DB, raised a question about how they can check whether the metrics are up-to-date; in particular, they do not want to consume the data while the tap-target replication is in progress, to avoid reading half-updated, half-outdated data. Is this a real risk? I'm curious how you usually solve this problem, as I imagine it is a common one that already has a well-known solution?
h
Sounds like the dev team might want to do incremental ingest too? I have a somewhat different but perhaps related situation: I do a map transform to add a randomly generated ID to each batch as a new column, then hit an API endpoint with that batch ID when the load is done, so the consumption team knows which rows to load. I run this via Prefect so that the API call happens seamlessly as part of the run.
a
That sounds interesting, are you running meltano through prefect?
h
Interesting approach. We're toying with another idea: maintain a separate "freshness logs" table and insert a log entry row when the Reverse EL job completes. The dev team can then automate the freshness check by querying this table in the operational Postgres DB. But I'm curious, is this a common approach? Do you see any downsides to it? I'm also curious whether Meltano is interested in eventually providing a built-in reusable solution for freshness-check use cases? cc @aaronsteers
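For illustration, here is a minimal sketch of the "freshness logs" idea. It uses sqlite3 as a stand-in for the operational Postgres DB, and the table and column names (`freshness_log`, `mart_name`, `completed_at`) are made up for the example:

```python
import sqlite3

conn = sqlite3.connect(":memory:")  # stand-in for the operational Postgres DB

# One row per completed Reverse EL run for a given mart.
conn.execute("""
    CREATE TABLE freshness_log (
        mart_name    TEXT NOT NULL,
        completed_at TEXT NOT NULL  -- UTC timestamp of the successful run
    )
""")

# The Reverse EL job appends a row as its *final* step, after all tables
# have been fully loaded, so a visible row implies a consistent snapshot.
conn.execute(
    "INSERT INTO freshness_log VALUES (?, ?)",
    ("user_metrics", "2023-03-21T10:00:00+00:00"),
)

# Consumers check the latest completion time before querying the mart itself.
row = conn.execute(
    "SELECT MAX(completed_at) FROM freshness_log WHERE mart_name = ?",
    ("user_metrics",),
).fetchone()
print(row[0])  # → 2023-03-21T10:00:00+00:00
```

One caveat of this pattern: the log row and the replicated data are written in separate statements, so unless both happen in one transaction, a reader can still observe a window where the log says "done" but the data is mid-write (or vice versa).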
a
> I'm also curious if Meltano is interested to eventually provide a built-in reusable solution to address the freshness check use cases?
Yes. 100%.
I'm not sure of the right paradigm as of yet, but in principle, I think this makes sense.
The most direct (and perhaps least satisfying?) solution is to simply introduce a failure condition, where the invocation fails if the freshness does not meet the standard, or if a refresh is still in progress. Would that be sufficient as a solution here?
h
For our use cases, what really matters to the consumers of the data marts is that they can first self-check that the data freshness meets their expectations before consuming the data. This self-check step needs to be automated. A more concrete example: we imagine the app service will be able to self-check a data mart against a specific freshness requirement, and otherwise wait for the next data pipeline run and retry automatically.
a
Okay. Then if I understand correctly, that's different from what I was proposing. Rather than failing the pipeline if freshness tests fail, it sounds like your use case is better served by a metric or API that can be queried, leaving it to the caller what to do with the result. Did I understand the use case correctly?
h
Yes, correct.
Storing this metric together with the replicated data in the operational db (for reverse EL use case mentioned above) and within the warehouse (normal warehouse usage) will be better than requiring another API connection.
I imagine the caller (data consumer) can just send a simple SQL query to self-check the freshness, followed by the actual data query to consume the data.
But again, I'm also curious if there are other common practices that can solve this problem in a more elegant way?
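To make the "self-check, then consume" loop concrete, here is a hedged sketch of the caller side, again using sqlite3 in place of Postgres. The `freshness_log` table, the `user_metrics` mart, and the retry parameters are all illustrative:

```python
import sqlite3
import time
from datetime import datetime, timedelta, timezone

def consume_if_fresh(conn, mart, max_age, retries=3, wait_seconds=1):
    """Run the data query only if the mart's last publish is recent enough;
    otherwise wait for the next pipeline run and retry."""
    for _ in range(retries):
        row = conn.execute(
            "SELECT MAX(completed_at) FROM freshness_log WHERE mart_name = ?",
            (mart,),
        ).fetchone()
        if row[0] is not None:
            completed = datetime.fromisoformat(row[0])
            if datetime.now(timezone.utc) - completed <= max_age:
                # Freshness check passed: issue the actual data query.
                return conn.execute(f"SELECT * FROM {mart}").fetchall()
        time.sleep(wait_seconds)  # wait and retry on the next cycle
    raise TimeoutError(f"{mart} did not meet the freshness requirement")

# Demo setup with illustrative names.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE freshness_log (mart_name TEXT, completed_at TEXT)")
conn.execute("CREATE TABLE user_metrics (user_id INTEGER, score REAL)")
conn.execute("INSERT INTO user_metrics VALUES (1, 0.9)")
conn.execute(
    "INSERT INTO freshness_log VALUES (?, ?)",
    ("user_metrics", datetime.now(timezone.utc).isoformat()),
)
rows = consume_if_fresh(conn, "user_metrics", max_age=timedelta(minutes=5))
```

The key design point is that the freshness policy (`max_age`) lives with the caller, not the pipeline, matching the "metric the caller interprets" framing above.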
a
Makes sense. I think there are two general approaches. The first option is to expose the freshness of each table, prioritizing freedom and simplicity. The second option is to publish to versioned namespaces, so that an omitted or incomplete dataset is opaque and obvious. In the second approach, rather than a simple replace/update strategy per table, you take a namespace- or domain-level approach to the publish operation.
The simplest version of the second approach would be something like a date in the target schema name. But there are variations on this approach that prioritize usability, such as renaming the prior schema before publish, and then publishing into a fresh schema at the start of each cycle.
Another variation is to have three publish domains: `best/latest`, `current`, and `dated/versioned`. In that approach, `current` might have omissions if some datasets have not completed.
h
Interesting, I'm new to the "publish domain" concept. In our Reverse EL use case, we will publish the user segmentation mart to a Postgres operational database. How would we set up a `current` "domain" in Postgres? Will it be a new Postgres schema? It's not clear to me how we would copy the user segmentation mart between different domains.
btw, given there are several options, if Meltano were to design a built-in solution, which approach is more likely to be considered?
a
Sure. So, in this case, you could put an environment variable into the schema names or database names, so that each time you publish, the output table names land in a distinct "domain" or "namespace" scoped by that variable. If you use a timestamp as the seed for that string, for example, you might end up with table names like `finance_20230321.users` and `finance_20230321.orders`. Then, if the `users` table doesn't exist, it's obvious that some part of the Reverse EL has not completed yet. I believe Postgres also supports renaming schemas, so at the very end of your process, you could optionally do something like this to make sure users still have a single place to query the "latest" datasets:
```sql
drop schema if exists finance_yesterday_bak;
alter schema finance_latest rename to finance_yesterday_bak;
alter schema finance_20230321 rename to finance_latest;
```
Since these rename operations are logical operations and don't require any data processing, the "switch" process can complete very quickly.
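For a runnable illustration of this "publish then swap" pattern: sqlite3 has no schemas, so the sketch below demonstrates the same blue/green rename idea at the table level (in Postgres you would use `ALTER SCHEMA ... RENAME` as shown above; all object names here are made up):

```python
import sqlite3

conn = sqlite3.connect(":memory:")

# "finance_latest" is what consumers query; "finance_20230321" is a freshly
# published snapshot that is invisible to consumers until the swap.
conn.execute("CREATE TABLE finance_latest_users (id INTEGER)")
conn.execute("INSERT INTO finance_latest_users VALUES (1)")
conn.execute("CREATE TABLE finance_20230321_users (id INTEGER)")
conn.execute("INSERT INTO finance_20230321_users VALUES (1), (2)")

# The swap is purely a metadata (catalog) operation: no rows are copied,
# so it completes almost instantly regardless of table size.
conn.execute("DROP TABLE IF EXISTS finance_yesterday_bak_users")
conn.execute("ALTER TABLE finance_latest_users RENAME TO finance_yesterday_bak_users")
conn.execute("ALTER TABLE finance_20230321_users RENAME TO finance_latest_users")

count = conn.execute("SELECT COUNT(*) FROM finance_latest_users").fetchone()[0]
print(count)  # → 2
```

Consumers always query the `latest` name and either see the complete old snapshot or the complete new one, never a half-published mix.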
> btw, given there are several options, if Meltano were to design a built-in solution, which approach is more likely to be considered?
This is hard to say. Something like what I describe above is pretty close to being doable today, with a pre-script to initialize env vars and a post-command to do the swaps if needed. But there are other ways to solve this problem, such as connecting with dbt's "freshness" feature, or Meltano maintaining its own end-to-end DAG as a superset of the DAG that dbt maintains. We've discussed these internally, but there's a lot of complexity in finding a good solution, likely including the need to integrate with, and expand upon, the related core dbt features. (And we'd want to make sure the task isn't already achievable with dbt-level features, since these are a lot of effort and time to implement.)
I'm curious, based on your use case, what your ideal solution would look like. Is it sufficient to swap in tables or schemas in an explicit way, so that users know whether the data is updated? Or are you thinking of adding an "updated_as_of" column to the published tables, or something different altogether?
h
We have 2 different use cases so far:
1. `multiple reporting services ---(consume)--> Warehouse (Redshift)` - the mart tables in the warehouse are usually huge, and we will use an incremental materialization strategy. If we use the "schema swap" approach, does it mean we need to re-materialize full tables into a new schema on every pipeline run? I assume this approach will not work for huge incremental mart tables, correct?
2. `Meltano job --(publish marts)---> Operational DB (Postgres)` (Reverse EL) - the mart tables to publish are relatively small. Therefore I think the "create a new schema and replicate the marts as full tables" approach will still work, but ideally, we want a general solution that can solve both use cases.
> Or are you thinking of adding an "updated_as_of" column to the published tables
This seems like a general solution that solves both use cases, wdyt?
a
> Or are you thinking of adding an "updated_as_of" column to the published tables... this seems like a general solution that solves both use cases, wdyt?
Yes, this is a good solution. It's hard to implement robustly, though, and the only solutions that readily come to mind are ones implemented (primarily) in the dbt layer (as macros or SQL transforms). As noted, it is easier to make freshness a blocking condition than to merge statuses across different tables, which might have different levels of adherence to a freshness policy.
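As a sketch of the "updated_as_of" column idea (sqlite3 standing in for either warehouse; the `user_metrics` table and column names are illustrative, and in practice the stamping would likely live in a dbt macro as discussed):

```python
import sqlite3
from datetime import datetime, timezone

conn = sqlite3.connect(":memory:")

# Each published table carries an updated_as_of column stamped by the
# publish step. Incremental runs only restamp the rows they touch.
conn.execute(
    "CREATE TABLE user_metrics (user_id INTEGER, score REAL, updated_as_of TEXT)"
)
stamp = datetime.now(timezone.utc).isoformat()
conn.execute(
    "INSERT INTO user_metrics VALUES (1, 0.9, ?), (2, 0.4, ?)",
    (stamp, stamp),
)

# Consumers derive per-table freshness directly from the data itself:
# the MIN is the age of the *stalest* row, which is what a policy cares about.
oldest = conn.execute("SELECT MIN(updated_as_of) FROM user_metrics").fetchone()[0]
# Compare `oldest` against the freshness policy before consuming.
```

This works for both the warehouse and the Reverse EL target, since the metric travels with the data; the difficulty noted above is that under incremental loads, different rows (and different tables) can carry different ages, so "is the mart fresh?" becomes a policy question rather than a single flag.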
> If we use the "schema swap" approach, does it mean we need to re-materialize full tables to a new schema in every pipeline run?
The options for this depend on the backend database. For Snowflake you have zero-copy clones that can mitigate the duplication, but for Redshift/Postgres, I don't know if that's an option.
@huiming - Apologies if some of the ideas are red herrings and not a good fit for your particular case. If by any chance you have time to join today for office hours in 30 minutes, this would make a great discussion topic. (Zoom link in #CFG3C3C66 channel.)
h
ah, I'm in the Asia/Singapore (+0800) time zone; office hours fall late at night for me, so I usually watch the playback the next day, haha.
> and the only solutions that readily come to mind are ones that are implemented (primarily) in the dbt layer (as macros or sql transforms)
Update: @adrian_soltesz from our team built a simple working PoC yesterday using dbt macros. We can share with the community once we have a production-ready version.