WATCHER: buffer producer dbt events and write _dbt_event once per node #2792
pankajastro wants to merge 6 commits
Pull request overview
This PR improves WATCHER-mode producer→consumer dbt log shipping by buffering per-node structured events in memory and flushing to XCom only at terminal states, ensuring reliable error-message capture across dbt invocation modes and preventing duplicate consumer logs caused by repeated polling.
Changes:
- Add an in-memory per-node dbt event buffer in the local watcher producer and flush structured events to XCom at terminal status (with an additional re-flush when late error text arrives).
- Stop the trigger from logging the per-node structured event on every poll; log the structured event once in the consumer at completion.
- Add unit tests covering runner vs subprocess error-message capture, non-overwrite behavior, and buffer-flush guards.
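The buffering behaviour summarised in the list above can be sketched roughly as follows. This is a minimal illustration, not the code in `cosmos/operators/_watcher/base.py`: the class name `NodeEventBuffer`, the `push` callable (a stand-in for an XCom write), and the `TERMINAL_STATUSES` set are all assumptions made for the example, and the caller is assumed to pass `msg` only when an event carries error text.

```python
from __future__ import annotations

from typing import Any, Callable

# Assumed set of terminal statuses, for illustration only.
TERMINAL_STATUSES = {"success", "error", "fail", "skipped"}


class NodeEventBuffer:
    """Hypothetical per-node buffer that flushes once a node is terminal."""

    def __init__(self, push: Callable[[str, dict[str, Any]], None]) -> None:
        self._push = push  # stand-in for a single XCom write
        self._events: dict[str, dict[str, Any]] = {}
        self._flushed: set[str] = set()

    def add(self, unique_id: str, status: str | None, msg: str = "") -> None:
        event = self._events.setdefault(unique_id, {"status": None, "msg": ""})
        # Ignore missing statuses and the literal string "None".
        if status and status != "None":
            event["status"] = status
        # Never replace captured text with an empty message.
        if msg:
            event["msg"] = msg

        is_terminal = event["status"] in TERMINAL_STATUSES
        first_flush = is_terminal and unique_id not in self._flushed
        # Error text that arrives after the terminal flush triggers a re-flush.
        late_error = unique_id in self._flushed and bool(msg)
        if first_flush or late_error:
            self._push(unique_id, dict(event))
            self._flushed.add(unique_id)


writes: list[tuple[str, dict[str, Any]]] = []
buffer = NodeEventBuffer(lambda uid, event: writes.append((uid, event)))

# A successful node: several events, one write.
buffer.add("model.a", "None")
buffer.add("model.a", "started")
buffer.add("model.a", "success")

# A failing node whose error text arrives after the terminal event: two writes.
buffer.add("model.b", "error")
buffer.add("model.b", None, "Database Error")
```

In this sketch the successful node produces one write and the failing node two, which matches the "flush at terminal, re-flush on late error text" shape described in the overview.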
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| cosmos/operators/_watcher/base.py | Implements buffered structured-event accumulation/flush, integrates it into log parsing, and adjusts consumer logging to be “once per node”. |
| cosmos/operators/_watcher/triggerer.py | Removes per-poll structured-event XCom fetch/log to avoid duplicate logs from trigger polling. |
| cosmos/operators/watcher.py | Adds and wires a per-node event buffer into the local watcher producer operator lifecycle. |
| tests/operators/_watcher/test_watcher_base.py | Adds unit tests for buffered event accumulation/flush and error-message capture semantics. |
| tests/operators/_watcher/test_triggerer.py | Updates trigger tests to reflect removal of _log_dbt_event polling behavior. |
Codecov Report
✅ All modified and coverable lines are covered by tests.
@@ Coverage Diff @@
## main #2792 +/- ##
==========================================
+ Coverage 98.37% 98.39% +0.01%
==========================================
Files 107 107
Lines 8021 8080 +59
==========================================
+ Hits 7891 7950 +59
Misses 130 130
@pankajastro, thanks for the improvements! I feel we tried to do too much in a single PR:
- Removing duplicate consumer logs
- Improving error-message capture across dbt runner/subprocess
- Changing local watcher _dbt_event persistence from per-event XCom writes to an in-memory buffer, flushed at terminal state
The first two seem lower-risk correctness fixes and can be merged independently, via individual smaller PRs.
The buffering piece is the bigger architectural change because it changes crash behaviour if the producer is OOM-killed, for instance, and overlaps with the #2777 proposal to reduce XCom _dbt_event writes.
Please, split this into smaller PRs.
Also, for each PR, illustrate via the Airflow UI/command line how it looked before/after the fix, detailing which DAG was used to reproduce that use case.
Some concerns about the biggest architectural change, which we could discuss in the follow-up PR:

0. Evaluate and review options

This approach is an alternative to ticket #2777, which was also assigned to you during this cycle. It is an alternative way to reduce _dbt_event writes. In a dedicated PR, we should review which approach makes the most sense.

1. Accuracy during producer hard-killing events (relates to 0 as well)

One concern with the local watcher buffering approach is that we now keep the structured dbt event in memory and only flush it to XCom at the terminal state, with a possible re-flush when a later error event carries the message. That reduces XCom writes, but it also changes the failure mode if the producer process is hard-killed, for example, by OOMKilled/SIGKILL. In that case, Python will not run the terminal flush, so any in-flight node event that only exists in node_event_buffer is lost.

That may be an acceptable tradeoff, especially if this PR is intentionally optimising local watcher XCom write volume, but I think we should call it out explicitly. Is the intended behaviour that, when a hard producer death occurs, in-flight structured events/error text are lost, and consumers fall back to the producer-failed path? If yes, can we document that in the documentation/PR/comments/tests? If not, we may need some form of periodic checkpointing instead of only terminal flushes.

2. Support ExecutionMode.WATCHER_KUBERNETES (relates to 0 as well)

Could you confirm if keeping ExecutionMode.WATCHER_KUBERNETES on the unbuffered path is intentional? With #2543 merged, the Kubernetes watcher now has a per-execution state path via callback_extra_kwargs, so it looks like it could also pass a node_event_buffer into store_dbt_resource_status_from_log.

If this PR is intentionally scoped to local ExecutionMode.WATCHER only, I think we should make that explicit in the PR description and make sure we create a follow-up PR, to be included in the Cosmos 1.15.0 release, that makes the approach in ExecutionMode.WATCHER_KUBERNETES consistent. Otherwise, the K8s implementation still keeps the original per-event XCom writes while local WATCHER gets the buffered path; we don't want to have these two patterns in the code base.
Thanks @tatiana, agree it was too much in one PR. I've split it: […]

A few clarifications:

Why #2806 is dbt-runner only. The two modes put the error in different places: […]

So subprocess capture needs retained per-node state to merge "status from the terminal event" + "message from the later error", which is the buffer. That's why robust subprocess capture rides with the buffering PR (the re-flush + "don't overwrite the error"/"ignore […]

Kubernetes is intentionally scoped out. Local […]

On in-memory loss during a hard producer kill: not a regression. The buffer flushes per node at that node's terminal state, so completed nodes are already in XCom. Only in-flight nodes are lost, but those have no terminal status under the old per-event path either, and the consumer fails them via the producer-failure fallback. Same outcome; we just don't keep partial logs (still in the producer log). I'll call this out in the buffering PR, along with the #2777 comparison.
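The hard-kill argument above can be checked with a toy model. This is a simplified simulation under stated assumptions, not Cosmos code: the functions `per_event_path` and `buffered_path`, the `TERMINAL` set, and the plain dict standing in for XCom are all hypothetical.

```python
from __future__ import annotations

# Assumed terminal statuses, for illustration only.
TERMINAL = {"success", "error"}


def per_event_path(events: list[tuple[str, str]]) -> dict[str, str]:
    """Old behaviour as described: overwrite the per-node key on every event."""
    xcom: dict[str, str] = {}
    for unique_id, status in events:
        xcom[unique_id] = status
    return xcom


def buffered_path(events: list[tuple[str, str]]) -> dict[str, str]:
    """Buffered behaviour: write a node's event only at its terminal status."""
    xcom: dict[str, str] = {}
    buffer: dict[str, str] = {}
    for unique_id, status in events:
        buffer[unique_id] = status
        if status in TERMINAL:
            xcom[unique_id] = buffer.pop(unique_id)
    return xcom


def terminal_nodes(xcom: dict[str, str]) -> set[str]:
    """Nodes for which the consumer can see a terminal status."""
    return {uid for uid, status in xcom.items() if status in TERMINAL}


# Events observed before a simulated SIGKILL:
# model.a finished, model.b was still executing.
events_before_kill = [
    ("model.a", "started"),
    ("model.a", "success"),
    ("model.b", "started"),
    ("model.b", "executing"),
]

old = per_event_path(events_before_kill)
new = buffered_path(events_before_kill)
```

In this model both paths expose the same set of terminal nodes after the kill; the only difference is that the per-event path still holds the partial, non-terminal entry for `model.b`, which the buffered path never wrote.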
## Problem

In `ExecutionMode.WATCHER` the producer ships each node's structured dbt event to the consumer via XCom. The error text lives in different places depending on invocation mode:

- **dbt-runner mode:** `NodeFinished.data.run_result.message`
- **subprocess mode:** `RunResultError.data.msg`

`_process_dbt_log_event` read only `data.msg` / `info.msg`, which are **empty on `NodeFinished`**. So a failed **dbt-runner** node reached the consumer with no error text: the consumer logged `[ERROR] Start.. Finish..` with no detail and pointed users at the producer log.

## Fix

Read `run_result.message` first when building the per-node `_dbt_event`, so the dbt-runner error reaches the consumer.

## Scope (read before reviewing)

This is split PR 2 of 3 carved out of #2792 (per review feedback):

1. duplicate consumer logs → #2805
2. **this PR**: dbt-runner error capture, on the existing **per-event** write path
3. per-node event buffering (architectural change)

**Important:** this fixes the **dbt-runner** case cleanly and statelessly. The **subprocess** case is *not* fully fixed here: there the error arrives in a *later* `RunResultError` (after the terminal event) and must not be overwritten by a trailing empty message, which requires retained per-node state. That robustness lands with the **buffering** PR (3), where the `RunResultError` re-flush and "don't overwrite the error"/"ignore `\"None\"` status" guards live.

## Testing

- Unit test `test_process_dbt_log_event_captures_error_from_run_result_message`: a `NodeFinished` with `run_result.message` set and empty `data.msg`/`info.msg` pushes the error to `_dbt_event`.
- Integration test `test_process_dbt_log_event_surfaces_dbt_runner_error_from_real_event` (added per review feedback; aligned to dbt's real event interface, not a mock): runs real dbt-runner against the failing `watcher_failing_tests` project and asserts `_process_dbt_log_event` surfaces the error from the actual `NodeFinished.run_result.message`. It first asserts the premise (`data.msg` empty) and **fails without the fix** (the consumer would otherwise get the generic `Finished running node ...`).
- Full `tests/operators/_watcher/` suite passes on the AF 3.0 hatch env; `pre-commit` (ruff/black/mypy) passes.

Before/after consumer-log screenshots below.

AFTER

<img width="1700" height="981" alt="Screenshot 2026-06-27 at 7 15 04 PM" src="https://github.com/user-attachments/assets/8099216d-7097-493e-82d0-3d8e26aa6ebe" />

BEFORE

<img width="1708" height="1016" alt="Screenshot 2026-06-27 at 7 17 04 PM" src="https://github.com/user-attachments/assets/1793ce48-b04e-49b6-9e9e-79c6e243b38a" />

Reproduce: add a failing model and run a WATCHER DbtDag (dbt-runner mode); the error now appears in the failing node's **consumer** task log.

```sql
select this_column_does_not_exist_at_all
```

Relates to #2456.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
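The lookup order described in the Fix section above might look roughly like this. It is a sketch only: the helper name `extract_message` is hypothetical, and the nested-dict event shape is an assumption inferred from the field paths quoted in the description (`data.run_result.message`, `data.msg`, `info.msg`), not a copy of `_process_dbt_log_event`.

```python
from __future__ import annotations

from typing import Any


def extract_message(event: dict[str, Any]) -> str:
    """Return the first non-empty message, preferring run_result.message."""
    data = event.get("data") or {}
    info = event.get("info") or {}
    run_result = data.get("run_result") or {}
    return (
        run_result.get("message")
        or data.get("msg")
        or info.get("msg")
        or ""
    )


# dbt-runner mode: the error text sits on NodeFinished.data.run_result.message,
# while data.msg / info.msg are empty.
runner_event = {
    "info": {"name": "NodeFinished", "msg": ""},
    "data": {"run_result": {"status": "error", "message": "Database Error in model x"}},
}

# subprocess mode: the error text sits on RunResultError.data.msg.
subprocess_event = {
    "info": {"name": "RunResultError", "msg": ""},
    "data": {"msg": "Compilation Error in model y"},
}

runner_message = extract_message(runner_event)
subprocess_message = extract_message(subprocess_event)
```

Reading only `data.msg` / `info.msg` would return an empty string for `runner_event`, which is the symptom the description reports for failed dbt-runner nodes.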
## Problem

In `ExecutionMode.WATCHER`, the consumer task log showed the same dbt event line repeated many times: the **#2456** symptom that looked like the dbt command had run twice.

**Cause:** the consumer logged the producer's per-node `_dbt_event` on **every poll** of its wait loop, in both the deferrable trigger (`WatcherTrigger.run()`) and the non-deferrable `poke()`. The loop polls until the node reaches a terminal state, so the same event was re-logged on each iteration.

## Fix

Log the per-node `_dbt_event` only once, at terminal, in **both** consumer paths:

- **Deferrable** (`WatcherTrigger.run()`): drop the per-poll fetch/log from the trigger loop. The consumer logs the event once in `execute_complete`.
- **Non-deferrable** (`poke()`): move `_log_dbt_event()` after the `status is None` early-return so it fires once the node is terminal instead of on every poke.

No information is lost: the event is still logged once at terminal.

## Scope

This is the first of three split PRs carved out of #2792 (per review feedback):

1. **this PR**: duplicate consumer logs
2. error-message capture (dbt-runner mode), merged in #2806
3. per-node event buffering (the architectural change)

## Testing

- `test_run_does_not_log_dbt_event_per_poll`: runs the deferrable trigger over multiple polls and asserts the dbt event line is not logged per poll.
- `test_poke_logs_dbt_event_only_at_terminal`: asserts the non-deferrable `poke()` does not log while the node is running and logs exactly once at terminal.
- Both fail on the pre-fix code (the duplicated line) and pass now.
- Full `tests/operators/_watcher/` suite passes on the AF 3.0 hatch env; `pre-commit` (ruff/black/mypy) passes.

Before

<img width="1712" height="1025" alt="Screenshot 2026-06-29 at 1 24 57 AM" src="https://github.com/user-attachments/assets/9e15a0f3-8c4b-410e-9168-0912c2c0e21a" />

After

<img width="1698" height="1005" alt="Screenshot 2026-06-29 at 1 22 20 AM" src="https://github.com/user-attachments/assets/29aa50a7-0274-4dec-bcaf-2f10c2910bd5" />

Relates to #2456 #2777

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
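The "log once at terminal" pattern from the Fix section above can be illustrated with a small stand-alone loop. This is a sketch under assumptions: `poll_until_terminal`, the `TERMINAL` set, and the list of polled statuses are hypothetical stand-ins for the consumer's wait loop, not the actual `WatcherTrigger.run()` or `poke()` code.

```python
from __future__ import annotations

from typing import Callable, Iterable, Optional

# Assumed terminal statuses, for illustration only.
TERMINAL = {"success", "failed"}


def poll_until_terminal(
    polls: Iterable[Optional[str]],
    event_line: str,
    log: Callable[[str], None],
) -> Optional[str]:
    """Consume polled statuses and log the dbt event once, at terminal."""
    for status in polls:
        if status is None or status not in TERMINAL:
            # Node still running: return to polling without logging the event.
            continue
        log(event_line)  # reached only once, on the terminal poll
        return status
    return None


logged: list[str] = []
result = poll_until_terminal(
    polls=[None, None, None, "success"],
    event_line="[INFO] dbt event for model.a",
    log=logged.append,
)
```

With four polls the event line is logged a single time, whereas logging before the early return would have produced one line per poll.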
…o duplicates
The watcher producer pushed a structured dbt event to a single per-node XCom
key on every dbt log event, overwriting each time. This caused three problems:

- The detailed error never reached the consumer. The error text lives in
  ``NodeFinished.run_result.message`` (dbt-runner mode) and in
  ``RunResultError.data.msg`` (subprocess mode), but the code only read
  ``data.msg``/``info.msg`` (empty on NodeFinished), so the consumer logged
  ``[ERROR] Start.. Finish..`` with no message.
- Duplicate log lines: the consumer re-logged the event on every poke / trigger
  poll.
- A per-event XCom write per node held the global xcom_set_lock repeatedly,
  adding contention to the multi-threaded dbt run.

Fix (local watcher only; Kubernetes watcher keeps the original per-event push
via the no-buffer fallback):

- Accumulate each node's events in an in-memory buffer and flush once to XCom
  when the node is terminal, reading the error from run_result.message (runner)
  and re-flushing when a later RunResultError carries the message (subprocess).
  Stamp the authoritative terminal status and ignore the literal "None" status.
- Consumer logs the event once at terminal (execute_complete for deferrable;
  poke for non-deferrable); the trigger no longer logs it every poll.

Verified end-to-end against real dbt runs in both dbt-runner and subprocess
modes: the consumer's _dbt_event now carries {status: error, msg: <dbt error>}.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
_accumulate_dbt_log_event reads the message from run_result.message / data.msg / info.msg, but the re-flush guard only checked data.msg / run_result.message. An error event carrying its text only in info.msg would be buffered but never re-flushed, so the consumer could miss it (subprocess mode). Check info.msg too. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
The Copilot Autofix commit (df2c63e) reworded the buffer comment but also deleted `self._node_event_buffer = {}` from DbtProducerWatcherOperator.__init__. The attribute is still referenced in _make_parse_callable and execute (.clear()), so the producer would raise AttributeError at execute time. Re-add the init. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
The refactor dropped the original `if extra_kwargs` guard, leaving several
unconditional `extra_kwargs.get(...)` calls that would raise AttributeError if
extra_kwargs were None/non-mapping. Normalise `extra_kwargs = extra_kwargs or {}`
at the top so all accesses are safe, restoring the original falsy-handling.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
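The normalisation described in this commit message can be sketched as below. The function name `read_callback_options` and the `"context"` key are hypothetical; `node_event_buffer` is used as a key only because the name appears elsewhere in this PR. Note that `or {}` covers `None` and other falsy values; a truthy non-mapping argument would still fail on `.get`.

```python
from __future__ import annotations

from typing import Any, Mapping, Optional


def read_callback_options(
    extra_kwargs: Optional[Mapping[str, Any]] = None,
) -> tuple[Any, Any]:
    # Normalise once at the top so every later .get() call is safe
    # when the caller passes None (or an empty mapping).
    extra_kwargs = extra_kwargs or {}
    context = extra_kwargs.get("context")  # hypothetical key
    node_event_buffer = extra_kwargs.get("node_event_buffer")
    return context, node_event_buffer


without_kwargs = read_callback_options(None)
with_kwargs = read_callback_options({"node_event_buffer": {}})
```

Without the first line of the function body, `read_callback_options(None)` would raise `AttributeError` on the first `.get` call, which is the regression the commit message describes.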
Problem

In `ExecutionMode.WATCHER`, the producer pushed the structured `_dbt_event` to XCom on every allowlisted dbt event (~4–5 per node: `NodeStart`/`NodeCompiling`/`NodeExecuting`/`NodeFinished`), each through the global `xcom_set_lock`. This is largely write amplification: the consumer reads `_dbt_event` only once at terminal (since #2805), so the intermediate pushes are wasted and add lock contention to the multi-threaded dbt run. The per-event overwrite also let a trailing `NodeFinished` clobber a captured error message.

Part of the #2736 watcher performance regression; delivers the producer-write reduction tracked in #2777.

Change (local watcher producer)

- Buffer each node's events in memory (`node_event_buffer`) and write `_dbt_event` to XCom only when the node reaches a terminal state: ~1 write per success node, ~2 per failure, instead of one per event.
- A later `RunResultError`, which arrives after the terminal event, triggers a second flush; the literal `"None"` status is ignored; the authoritative terminal status is stamped on flush.
- […] `_process_dbt_log_event`.

No consumer-side change: the consumer already logs `_dbt_event` once at terminal (`execute_complete`/`poke`).

Scope

Final of the three split PRs from this work: […]

Testing

- […] `RunResultError` re-flush, `"None"`-status guard, and the allowlist / missing-`unique_id` guards.
- […] `tests/operators/_watcher/` suite passes (309); `pre-commit` (ruff/black/mypy) passes.

References

- […] `_dbt_event` only for a curated subset (allowlist) of dbt event types #2777: reduce producer `_dbt_event` writes (delivered here via buffering)

🤖 Generated with Claude Code