Skip to content

WATCHER: buffer producer dbt events and write _dbt_event once per node#2792

Draft
pankajastro wants to merge 6 commits into
mainfrom
fix-watcher-log-shipping
Draft

WATCHER: buffer producer dbt events and write _dbt_event once per node#2792
pankajastro wants to merge 6 commits into
mainfrom
fix-watcher-log-shipping

Conversation

@pankajastro

@pankajastro pankajastro commented Jun 9, 2026

Copy link
Copy Markdown
Contributor

Problem

In ExecutionMode.WATCHER, the producer pushed the structured _dbt_event to XCom on every allowlisted dbt event (~4–5 per node — NodeStart/NodeCompiling/NodeExecuting/NodeFinished), each through the global xcom_set_lock. This is largely write-amplification: the consumer reads _dbt_event only once at terminal (since #2805), so the intermediate pushes are wasted and add lock contention to the multi-threaded dbt run. The per-event overwrite also let a trailing NodeFinished clobber a captured error message.

Part of the #2736 watcher performance regression; delivers the producer-write reduction tracked in #2777.

Change (local watcher producer)

  • Buffer in memory, flush once at terminal. Accumulate each node's events in an in-process per-node buffer (node_event_buffer) and write _dbt_event to XCom only when the node reaches a terminal state — ~1 write per success node, ~2 per failure — instead of one per event.
  • Reliable error message. An error message wins and is never overwritten by a later (often empty) message; subprocess mode's RunResultError — which arrives after the terminal event — triggers a second flush; the literal "None" status is ignored; the authoritative terminal status is stamped on flush.
  • Thread-safe. A lock guards the buffer, since dbt-runner callbacks fire from multiple threads.
  • Backward compatible. Callers that don't pass a buffer (e.g. the Kubernetes watcher) keep the original per-event push via _process_dbt_log_event.

No consumer-side change: the consumer already logs _dbt_event once at terminal (execute_complete/poke).

Scope

Final of the three split PRs from this work:

  1. duplicate consumer logs — merged (Fix duplicate consumer logs in WATCHER mode #2805)
  2. dbt-runner error capture on the per-event path — merged (Capture dbt-runner error message in WATCHER consumer logs #2806)
  3. this PR — per-node event buffering (the architectural change + reduced XCom writes)

Testing

  • Unit tests for the buffered path: terminal flush, error-message-wins, subprocess RunResultError re-flush, "None"-status guard, and the allowlist / missing-unique_id guards.
  • Full tests/operators/_watcher/ suite passes (309); pre-commit (ruff/black/mypy) passes.

References

🤖 Generated with Claude Code

Copilot AI review requested due to automatic review settings June 9, 2026 11:36
@pankajastro pankajastro requested review from a team, corsettigyg, dwreeves and jbandoro as code owners June 9, 2026 11:36
@pankajastro pankajastro requested review from pankajkoti and tatiana June 9, 2026 11:36
@pankajastro pankajastro marked this pull request as draft June 9, 2026 11:36
@pankajastro pankajastro force-pushed the fix-watcher-log-shipping branch from f88d2bb to 8f6aa5f Compare June 9, 2026 11:37

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR improves WATCHER-mode producer→consumer dbt log shipping by buffering per-node structured events in memory and flushing to XCom only at terminal states, ensuring reliable error-message capture across dbt invocation modes and preventing duplicate consumer logs caused by repeated polling.

Changes:

  • Add an in-memory per-node dbt event buffer in the local watcher producer and flush structured events to XCom at terminal status (with an additional re-flush when late error text arrives).
  • Stop the trigger from logging the per-node structured event on every poll; log the structured event once in the consumer at completion.
  • Add unit tests covering runner vs subprocess error-message capture, non-overwrite behavior, and buffer-flush guards.

Reviewed changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 1 comment.

Show a summary per file
File Description
cosmos/operators/_watcher/base.py Implements buffered structured-event accumulation/flush, integrates it into log parsing, and adjusts consumer logging to be “once per node”.
cosmos/operators/_watcher/triggerer.py Removes per-poll structured-event XCom fetch/log to avoid duplicate logs from trigger polling.
cosmos/operators/watcher.py Adds and wires a per-node event buffer into the local watcher producer operator lifecycle.
tests/operators/_watcher/test_watcher_base.py Adds unit tests for buffered event accumulation/flush and error-message capture semantics.
tests/operators/_watcher/test_triggerer.py Updates trigger tests to reflect removal of _log_dbt_event polling behavior.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread cosmos/operators/_watcher/base.py
@codecov

codecov Bot commented Jun 9, 2026

Copy link
Copy Markdown

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 98.39%. Comparing base (0f4182a) to head (a6d326b).
⚠️ Report is 4 commits behind head on main.

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #2792      +/-   ##
==========================================
+ Coverage   98.37%   98.39%   +0.01%     
==========================================
  Files         107      107              
  Lines        8021     8080      +59     
==========================================
+ Hits         7891     7950      +59     
  Misses        130      130              

☔ View full report in Codecov by Harness.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@pankajastro pankajastro marked this pull request as ready for review June 9, 2026 12:15
Copilot AI review requested due to automatic review settings June 9, 2026 12:15

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 5 out of 5 changed files in this pull request and generated 2 comments.

Comment thread cosmos/operators/_watcher/base.py Outdated
Comment thread cosmos/operators/watcher.py
Copilot AI review requested due to automatic review settings June 9, 2026 12:20

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 5 out of 5 changed files in this pull request and generated 2 comments.

Comment thread cosmos/operators/watcher.py
Comment thread cosmos/operators/_watcher/base.py

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 6 out of 6 changed files in this pull request and generated no new comments.

@tatiana tatiana added priority:high High priority issues are blocking or critical issues without a workaround and large impact roadmap:P1 BOSS roadmap-committed work (priority P1) origin:internal Authored by an Astronomer engineer review:complex Substantial review: large, multi-module, or domain-heavy review:high-risk Architectural change or critical path — two reviewers required labels Jun 10, 2026

@tatiana tatiana left a comment

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pankajastro, thanks for the improvements! I feel we tried to do too much in a single PR:

  1. Removing duplicate consumer logs
  2. Improving tackling the error message across dbt Runner/subprocess
  3. Changing local watcher _dbt_event persistence from per-event XCom writes to a per-event to an in-memory buffer, flushed at terminal state

The first two seem lower-risk correctness files and can be merged independently - via individual smaller PRs.

The buffering piece is the bigger architectural change because it changes crash behaviour if the producer is OOM-killed, for instance, and overlaps with the #2777 proposal to reduce XCom _dbt_event writes.

Please, split this into smaller PRs.

Also, for each PR, illustrate via the Airflow UI/command line how it looked before/after the fix, detailing which DAG was used to reproduce that use case.

Some concerns about the biggest architectural change - which we could discuss in the follow-up PR:

  1. Evaluate and review options

This approach is an alternative to ticket #2777, which was also assigned to you during this cycle. It is an alternative to reduce _dbt_event writes. In a dedicated PR, we should review which approach makes the most sense.

  1. Accuracy during producer hard-killing events

(relates to 0 as well)

One concern with the local watcher buffering approach is that we now keep the structured dbt event in memory and only flush it to XCom at the terminal state, with a possible re-flush when a later error event carries the message. That reduces XCom writes, but it also changes the failure mode if the producer process is hard-killed, for example, by OOMKilled/SIGKILL. In that case, Python will not run the terminal flush, so any in-flight node event that only exists in node_event_buffer is lost.

That may be an acceptable tradeoff, especially if this PR is intentionally optimising local watcher XCom write volume, but I think we should call it out explicitly. Is the intended behaviour that, when a hard producer death occurs, in-flight structured events/error text are lost, and consumers fall back to the producer-failed path? If yes, can we document that in the documentation/PR/comments/tests? If not, we may need some form of periodic checkpointing instead of only terminal flushes.

  1. Support ExecutionMode.WATCHER_KUBERNETES

(relates to 0 as well)

Could you confirm if keeping ExecutionMode.WATCHER_KUBERNETES on the unbuffered path is intentional? With #2543 merged, the Kubernetes watcher now has a per-execution state path via callback_extra_kwargs, so it looks like it could also pass a node_event_buffer into store_dbt_resource_status_from_log.

If this PR is intentionally scoped to localExecutionMode.WATCHER only, I think we should make that explicit in the PR description and make sure we create a follow-up PR - to be included in the Cosmos 1.15.0 release - that makes the approach in ExecutionMode.WATCHER_KUBERNETES` consistent. Otherwise, the K8s implementation still keeps the original per-event XCom writes while local WATCHER gets the buffered path - we don't want to have these two patterns in the code base.

@pankajastro

Copy link
Copy Markdown
Contributor Author

Thanks @tatiana, agree it was too much in one PR. I've split it:

A few clarifications:

Why #2806 is dbt-runner only. The two modes put the error in different places:

  • dbt-runner: in NodeFinished.run_result.message — i.e. on the terminal event itself, so a single per-event push captures it. Clean and stateless.
  • subprocess: in a RunResultError that arrives after the terminal event, with node_status="None". On the per-event path the later event would overwrite the terminal status, and a trailing empty message could wipe the error.

So subprocess capture needs retained per-node state to merge "status from the terminal event" + "message from the later error" — which is the buffer. That's why robust subprocess capture rides with the buffering PR (the re-flush + "don't overwrite the error"/"ignore None status" guards), not #2806.

Kubernetes is intentionally scoped out. Local WATCHER gets the buffered path; WATCHER_KUBERNETES stays per-event. Bringing K8s in line is already tracked by #2457, so I'll do it there rather than open a new follow-up — agreed we shouldn't keep two patterns long-term.

On in-memory loss during a hard producer kill: not a regression. The buffer flushes per node at that node's terminal state, so completed nodes are already in XCom. Only in-flight nodes are lost — but those have no terminal status under the old per-event path either, and the consumer fails them via the producer-failure fallback. Same outcome; we just don't keep partial logs (still in the producer log). I'll call this out in the buffering PR, along with the #2777 comparison.

@pankajastro pankajastro marked this pull request as draft June 17, 2026 19:29
pankajastro added a commit that referenced this pull request Jun 27, 2026
## Problem

In `ExecutionMode.WATCHER` the producer ships each node's structured dbt
event to the consumer via XCom. The error text lives in different places
depending on invocation mode:

- **dbt-runner mode:** `NodeFinished.data.run_result.message`
- **subprocess mode:** `RunResultError.data.msg`

`_process_dbt_log_event` read only `data.msg` / `info.msg`, which are
**empty on `NodeFinished`**. So a failed **dbt-runner** node reached the
consumer with no error text — the consumer logged `[ERROR] Start..
Finish..` with no detail and pointed users at the producer log.

## Fix

Read `run_result.message` first when building the per-node `_dbt_event`,
so the dbt-runner error reaches the consumer.

## Scope — read before reviewing

This is split PR 2 of 3 carved out of #2792 (per review feedback):
1. duplicate consumer logs → #2805
2. **this PR** — dbt-runner error capture, on the existing **per-event**
write path
3. per-node event buffering (architectural change)

**Important:** this fixes the **dbt-runner** case cleanly and
statelessly. The **subprocess** case is *not* fully fixed here: there
the error arrives in a *later* `RunResultError` (after the terminal
event) and must not be overwritten by a trailing empty message — which
requires retained per-node state. That robustness lands with the
**buffering** PR (3), where the `RunResultError` re-flush and "don't
overwrite the error"/"ignore `\"None\"` status" guards live.

## Testing

- Unit test
`test_process_dbt_log_event_captures_error_from_run_result_message`: a
`NodeFinished` with `run_result.message` set and empty
`data.msg`/`info.msg` pushes the error to `_dbt_event`.
- Integration test
`test_process_dbt_log_event_surfaces_dbt_runner_error_from_real_event`
(added per review feedback — aligned to dbt's real event interface, not
a mock): runs real dbt-runner against the failing
`watcher_failing_tests` project and asserts `_process_dbt_log_event`
surfaces the error from the actual `NodeFinished.run_result.message`. It
first asserts the premise (`data.msg` empty) and **fails without the
fix** (the consumer would otherwise get the generic `Finished running
node ...`).
- Full `tests/operators/_watcher/` suite passes on the AF 3.0 hatch env;
`pre-commit` (ruff/black/mypy) passes.

Before/after consumer-log screenshots below.

AFTER

<img width="1700" height="981" alt="Screenshot 2026-06-27 at 7 15 04 PM"
src="https://github.com/user-attachments/assets/8099216d-7097-493e-82d0-3d8e26aa6ebe"
/>

BEFORE

<img width="1708" height="1016" alt="Screenshot 2026-06-27 at 7 17 04
PM"
src="https://github.com/user-attachments/assets/1793ce48-b04e-49b6-9e9e-79c6e243b38a"
/>

Reproduce: add a failing model and run a WATCHER DbtDag (dbt-runner
mode); the error now appears in the failing node's **consumer** task
log.
```sql
select this_column_does_not_exist_at_all
```
Relates to #2456.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
pankajastro added a commit that referenced this pull request Jun 28, 2026
## Problem

In `ExecutionMode.WATCHER`, the consumer task log showed the same dbt
event line repeated many times — the **#2456** symptom that looked like
the dbt command had run twice.

**Cause:** the consumer logged the producer's per-node `_dbt_event` on
**every poll** of its wait loop — in both the deferrable trigger
(`WatcherTrigger.run()`) and the non-deferrable `poke()`. The loop polls
until the node reaches a terminal state, so the same event was re-logged
on each iteration.

## Fix

Log the per-node `_dbt_event` only once, at terminal, in **both**
consumer paths:
- **Deferrable** (`WatcherTrigger.run()`): drop the per-poll fetch/log
from the trigger loop. The consumer logs the event once in
`execute_complete`.
- **Non-deferrable** (`poke()`): move `_log_dbt_event()` after the
`status is None` early-return so it fires once the node is terminal
instead of on every poke.

No information is lost — the event is still logged once at terminal.

## Scope

This is the first of three split PRs carved out of #2792 (per review
feedback):
1. **this PR** — duplicate consumer logs
2. error-message capture (dbt-runner mode) — merged in #2806
3. per-node event buffering (the architectural change)

## Testing

- `test_run_does_not_log_dbt_event_per_poll`: runs the deferrable
trigger over multiple polls and asserts the dbt event line is not logged
per poll.
- `test_poke_logs_dbt_event_only_at_terminal`: asserts the
non-deferrable `poke()` does not log while the node is running and logs
exactly once at terminal.
- Both fail on the pre-fix code (the duplicated line) and pass now.
- Full `tests/operators/_watcher/` suite passes on the AF 3.0 hatch env;
`pre-commit` (ruff/black/mypy) passes.

Before

<img width="1712" height="1025" alt="Screenshot 2026-06-29 at 1 24
57 AM"
src="https://github.com/user-attachments/assets/9e15a0f3-8c4b-410e-9168-0912c2c0e21a"
/>


After
<img width="1698" height="1005" alt="Screenshot 2026-06-29 at 1 22
20 AM"
src="https://github.com/user-attachments/assets/29aa50a7-0274-4dec-bcaf-2f10c2910bd5"
/>



Relates to #2456 #2777

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
pankajastro and others added 6 commits June 29, 2026 01:46
…o duplicates

The watcher producer pushed a structured dbt event to a single per-node XCom
key on every dbt log event, overwriting each time. This caused three problems:

- The detailed error never reached the consumer. The error text lives in
  ``NodeFinished.run_result.message`` (dbt-runner mode) and in
  ``RunResultError.data.msg`` (subprocess mode), but the code only read
  ``data.msg``/``info.msg`` (empty on NodeFinished), so the consumer logged
  ``[ERROR] Start.. Finish..`` with no message.
- Duplicate log lines: the consumer re-logged the event on every poke / trigger
  poll.
- A per-event XCom write per node held the global xcom_set_lock repeatedly,
  adding contention to the multi-threaded dbt run.

Fix (local watcher only; Kubernetes watcher keeps the original per-event push
via the no-buffer fallback):

- Accumulate each node's events in an in-memory buffer and flush once to XCom
  when the node is terminal, reading the error from run_result.message (runner)
  and re-flushing when a later RunResultError carries the message (subprocess).
  Stamp the authoritative terminal status and ignore the literal "None" status.
- Consumer logs the event once at terminal (execute_complete for deferrable;
  poke for non-deferrable); the trigger no longer logs it every poll.

Verified end-to-end against real dbt runs in both dbt-runner and subprocess
modes: the consumer's _dbt_event now carries {status: error, msg: <dbt error>}.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
_accumulate_dbt_log_event reads the message from run_result.message / data.msg
/ info.msg, but the re-flush guard only checked data.msg / run_result.message.
An error event carrying its text only in info.msg would be buffered but never
re-flushed, so the consumer could miss it (subprocess mode). Check info.msg too.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
The Copilot Autofix commit (df2c63e) reworded the buffer comment but also
deleted `self._node_event_buffer = {}` from DbtProducerWatcherOperator.__init__.
The attribute is still referenced in _make_parse_callable and execute (.clear()),
so the producer would raise AttributeError at execute time. Re-add the init.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
The refactor dropped the original `if extra_kwargs` guard, leaving several
unconditional `extra_kwargs.get(...)` calls that would raise AttributeError if
extra_kwargs were None/non-mapping. Normalise `extra_kwargs = extra_kwargs or {}`
at the top so all accesses are safe, restoring the original falsy-handling.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@pankajastro pankajastro force-pushed the fix-watcher-log-shipping branch from a6d326b to 707096f Compare June 28, 2026 20:18
@pankajastro pankajastro changed the title Fix watcher producer→consumer log shipping: reliable error capture, no duplicates WATCHER: buffer producer dbt events and write _dbt_event once per node Jun 28, 2026
@tatiana tatiana removed priority:high High priority issues are blocking or critical issues without a workaround and large impact roadmap:P1 BOSS roadmap-committed work (priority P1) labels Jun 30, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

origin:internal Authored by an Astronomer engineer review:complex Substantial review: large, multi-module, or domain-heavy review:high-risk Architectural change or critical path — two reviewers required

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants