fix(runs): expose active progress counters #3148
|
A few trade-off notes: This PR uses settled usage snapshots, not token/chunk-level telemetry. With the current
I also left interrupted-run accounting unchanged here, since that feels like a separate product/accounting semantic decision from active progress visibility. |
|
@kibabsquirrel Thanks for working on this. Overall, I think the scope is reasonable. Focusing this PR on active progress counters, while leaving durable trace / raw stream persistence out of scope, keeps the change bounded. The settled-usage snapshot approach also makes sense to me. Keeping
A few review notes:
Your trade-off note mentions leading + final snapshots. My main concern is the final snapshot part.
    await self._progress_reporter(self.get_completion_data())
Then
    completion = journal.get_completion_data()
    await run_manager.update_run_completion(run_id, status=record.status.value, **completion)
Given the recent SQLite/write-pressure issues, I think we should avoid unnecessary duplicate writes on run finalization. Is the final progress write in
The PR describes this as active/running progress visibility, but I think we should either:
Otherwise this API may be easy to misuse later and could become semantic maintenance debt.
    progress_flush_interval = 0.01
    await asyncio.sleep(0.03)
It passed locally, but this kind of timing-based async test can become flaky under CI load. It may be more robust to drive it with an
The token/message counter fields are now repeated through
No blocker from tests/lint on my side, but I would prefer discussing points 1 and 2 with you before merge. |
Pull request overview
Adds near-real-time visibility into active run progress (tokens, LLM calls, message counts) by periodically persisting settled usage snapshots during execution, and optionally including running runs in thread-level token aggregation.
Changes:
- Implement throttled progress snapshot reporting in `RunJournal`, wired from the worker to `RunManager.update_run_progress()`.
- Extend run stores/repository to persist progress snapshots and optionally include `running` runs in `/token-usage` aggregation (`include_active=true`).
- Expose token/message counters on run list/detail responses and add/extend tests for the new behavior.
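The opt-in aggregation described above can be illustrated with a minimal sketch. It is not the PR's store or SQL code: `RunRow`, `aggregate_token_usage`, the returned keys, and the status values `"success"` and `"running"` are hypothetical names chosen for the example, and which terminal statuses count as "completed" is an assumption.

```python
from dataclasses import dataclass
from typing import Dict, Iterable

# Assumption: the set of statuses treated as "completed" is illustrative only.
COMPLETED_STATUSES = frozenset({"success"})
ACTIVE_STATUSES = frozenset({"running"})


@dataclass
class RunRow:
    """Hypothetical stand-in for a persisted run row."""

    run_id: str
    status: str
    total_tokens: int = 0
    llm_call_count: int = 0


def aggregate_token_usage(rows: Iterable[RunRow], include_active: bool = False) -> Dict[str, int]:
    """Sum counters over completed runs, optionally adding running snapshots."""
    counted = (COMPLETED_STATUSES | ACTIVE_STATUSES) if include_active else COMPLETED_STATUSES
    selected = [row for row in rows if row.status in counted]
    return {
        "total_tokens": sum(row.total_tokens for row in selected),
        "llm_call_count": sum(row.llm_call_count for row in selected),
        "total_runs": len(selected),
    }
```

With the default `include_active=False`, callers keep seeing completed-only totals; passing `include_active=True` folds in whatever snapshot the running rows currently hold.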
Reviewed changes
Copilot reviewed 10 out of 10 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| backend/packages/harness/deerflow/runtime/journal.py | Adds throttled progress snapshot scheduling + final flush behavior. |
| backend/packages/harness/deerflow/runtime/runs/worker.py | Wires RunJournal.progress_reporter to persist active progress via RunManager. |
| backend/packages/harness/deerflow/runtime/runs/manager.py | Adds in-memory + store-backed update_run_progress() and hydrates counters from store rows. |
| backend/packages/harness/deerflow/runtime/runs/store/base.py | Extends the RunStore interface with update_run_progress() and include_active aggregation flag. |
| backend/packages/harness/deerflow/runtime/runs/store/memory.py | Implements progress snapshot persistence and include_active aggregation for the in-memory store. |
| backend/packages/harness/deerflow/persistence/run/sql.py | Implements SQL-backed update_run_progress() and include_active token aggregation. |
| backend/app/gateway/routers/thread_runs.py | Exposes counters on RunResponse and adds include_active query param to /token-usage. |
| backend/tests/test_run_journal.py | Adds tests covering snapshot emission + throttling/trailing flush behavior. |
| backend/tests/test_run_repository.py | Adds tests for update_run_progress() + include_active aggregation. |
| backend/tests/test_thread_token_usage.py | Adds API test for include_active=true behavior. |
|
@ShenAC-SAC Thanks, these points make sense.
For (1), I agree the final progress write in
For (2), agreed as well. The intended semantics are active/running progress, not a general post-hoc usage update. I tightened the path so progress updates only apply to
For (3), I made the trailing-flush test less timing-sensitive by waiting on an
For (4), agreed as a follow-up. I kept the fields explicit in this first-layer PR to avoid expanding the scope, but a small
I also followed up on the Copilot comments around |
|
@kibabsquirrel In |
|
@ShenAC-SAC Good catch, I've cleaned it up in this PR |
|
@kibabsquirrel please fix the lint error with the below command |
483b19f to 702ac1d
|
@WillemJiang solved it. I will make sure it is checked by configuring git hooks😂 |
Summary
Related to #3116
Fixes the active-run visibility part of BUG-005 by exposing near-real-time progress counters for running runs.
During long runs, `RunJournal` already accumulates token usage and message counts, but those values were only written to `runs` at completion. This made active runs appear as `total_tokens=0`, `llm_call_count=0`, and `message_count=0` until the worker finalized the run.
This PR adds a throttled active progress snapshot path:
- `RunJournal` reports accumulated usage snapshots after LLM/subagent usage is settled.
- `RunManager` / `RunStore` persist progress without changing run status.
- `RunResponse` exposes token/message counters for run detail and list APIs.
- `/api/threads/{thread_id}/token-usage?include_active=true` can include running run snapshots while preserving the completed-only default.
Progress flushing uses leading + trailing throttling: the first settled usage update is reported promptly, and updates suppressed by the throttle window are followed by a trailing flush with the latest snapshot. This avoids stale active counters during long tool/file/network phases after a burst of LLM calls.
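For readers unfamiliar with leading + trailing throttling, here is a minimal sketch of the idea. It is not the `RunJournal` implementation: `ThrottledReporter`, `submit`, `flush`, and the snapshot shape are hypothetical names used only for illustration, and it assumes the reporter is an async callable that receives a dict of counters.

```python
import asyncio
import time
from typing import Awaitable, Callable, Dict, Optional

Snapshot = Dict[str, int]
Reporter = Callable[[Snapshot], Awaitable[None]]


class ThrottledReporter:
    """Hypothetical leading + trailing throttle around an async reporter."""

    def __init__(self, report: Reporter, interval: float) -> None:
        self._report = report
        self._interval = interval
        self._last_sent: Optional[float] = None
        self._pending: Optional[Snapshot] = None
        self._timer: Optional[asyncio.Task] = None

    async def submit(self, snapshot: Snapshot) -> None:
        now = time.monotonic()
        window_open = self._last_sent is None or now - self._last_sent >= self._interval
        if window_open and self._timer is None:
            # Leading edge: nothing was sent recently, so report right away.
            self._last_sent = now
            await self._report(snapshot)
            return
        # Inside the throttle window: keep only the newest snapshot.
        self._pending = snapshot
        if self._timer is None:
            delay = self._interval - (now - self._last_sent)
            self._timer = asyncio.create_task(self._flush_after(delay))

    async def _flush_after(self, delay: float) -> None:
        # Trailing edge: wait out the rest of the window, then send the latest.
        await asyncio.sleep(delay)
        self._timer = None
        await self._emit_pending()

    async def _emit_pending(self) -> None:
        snapshot, self._pending = self._pending, None
        if snapshot is not None:
            self._last_sent = time.monotonic()
            await self._report(snapshot)

    async def flush(self) -> None:
        """Send the pending snapshot now instead of waiting for the timer."""
        if self._timer is not None:
            self._timer.cancel()
            self._timer = None
        await self._emit_pending()
```

In this sketch a burst of three updates produces one immediate write and one trailing write carrying the third snapshot; the intermediate snapshot is dropped, which is safe because each snapshot holds accumulated totals rather than deltas.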
Scope
This implements layer 1 of BUG-005: active progress counters.
It intentionally does not implement layer 2 durable structured trace or layer 3 raw stream frame persistence.
Key Decisions
- `/token-usage` completed-only by default, with `include_active=true` as an explicit opt-in for callers that want running snapshots included.
- `0`, so partial progress updates remain safe and consistent across stores.
Why Not Durable Trace Here
Durable structured trace is a separate design concern. DeerFlow already has `run_events` and external tracing integrations, but making local trace durable by default requires decisions around backend defaults, payload size limits, sensitive content, retention, event schema, and query semantics.
Keeping this PR focused avoids mixing active cost visibility with trace persistence and keeps the database write path lightweight.
Semantics
Progress counters are snapshots of settled usage, not token-by-token telemetry. They update after LLM/subagent usage is available and are throttled to avoid high-frequency writes.
The final completion path writes the authoritative final totals.
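A small sketch of how these semantics could be enforced at the store level, under stated assumptions: `SketchRunStore`, `create_run`, and `get` are hypothetical, the method signatures are guesses modelled on the `update_run_progress` / `update_run_completion` names used in this PR, and rejecting progress writes for runs that are not `running` reflects the "active/running progress" intent discussed in review rather than confirmed store behavior. How the real stores treat omitted counter fields is not shown here; this sketch simply leaves them unchanged.

```python
import asyncio
from typing import Dict

COUNTER_FIELDS = ("total_tokens", "llm_call_count", "message_count")


class SketchRunStore:
    """Hypothetical in-memory store illustrating progress vs. completion writes."""

    def __init__(self) -> None:
        self._runs: Dict[str, Dict[str, object]] = {}

    async def create_run(self, run_id: str) -> None:
        # New runs start as running with all counters at zero.
        self._runs[run_id] = {"status": "running", **{f: 0 for f in COUNTER_FIELDS}}

    async def update_run_progress(self, run_id: str, **counters: int) -> bool:
        """Apply a progress snapshot; returns False if the run is not running."""
        run = self._runs.get(run_id)
        if run is None or run["status"] != "running":
            # Assumption: late or stray snapshots must not touch finished runs.
            return False
        for field in COUNTER_FIELDS:
            if field in counters:
                run[field] = counters[field]
        return True

    async def update_run_completion(self, run_id: str, status: str, **counters: int) -> None:
        """Write the final status and the authoritative final totals."""
        run = self._runs[run_id]
        run["status"] = status
        for field in COUNTER_FIELDS:
            if field in counters:
                run[field] = counters[field]

    def get(self, run_id: str) -> Dict[str, object]:
        return dict(self._runs[run_id])
```

Progress writes never change `status`, and once `update_run_completion` has run, a delayed trailing snapshot is ignored instead of overwriting the final totals.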
Test Plan
PYTHONPATH=. uv run pytest tests/test_run_repository.py tests/test_run_journal.py tests/test_thread_token_usage.py tests/test_worker_langfuse_metadata.py tests/test_persistence_scaffold.py -q
uv run ruff check packages/harness/deerflow/runtime/journal.py packages/harness/deerflow/runtime/runs/manager.py packages/harness/deerflow/runtime/runs/store/memory.py packages/harness/deerflow/runtime/runs/store/base.py packages/harness/deerflow/persistence/run/sql.py tests/test_run_journal.py tests/test_run_repository.py