Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 35 additions & 1 deletion backend/app/gateway/routers/threads.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
from pydantic import BaseModel, Field, field_validator

from app.gateway.authz import require_permission
from app.gateway.deps import get_checkpointer
from app.gateway.deps import get_checkpointer, get_store
from app.gateway.transcripts import delete_thread_transcript, get_thread_transcript
from app.gateway.utils import sanitize_log_param
from deerflow.config.paths import Paths, get_paths
from deerflow.runtime import serialize_channel_values
Expand Down Expand Up @@ -161,6 +162,12 @@ class ThreadHistoryRequest(BaseModel):
before: str | None = Field(default=None, description="Cursor for pagination")


class ThreadMessagesResponse(BaseModel):
"""Canonical UI transcript for a thread."""

messages: list[dict[str, Any]] = Field(default_factory=list)


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -223,6 +230,14 @@ async def delete_thread_data(thread_id: str, request: Request) -> ThreadDeleteRe
# Clean local filesystem
response = _delete_thread_data(thread_id, user_id=get_effective_user_id())

# Remove canonical UI transcript (best-effort)
store = get_store(request)
if store is not None:
try:
await delete_thread_transcript(store, thread_id)
except Exception:
logger.debug("Could not delete transcript for thread %s (not critical)", sanitize_log_param(thread_id))

# Remove checkpoints (best-effort)
checkpointer = getattr(request.app.state, "checkpointer", None)
if checkpointer is not None:
Expand All @@ -243,6 +258,25 @@ async def delete_thread_data(thread_id: str, request: Request) -> ThreadDeleteRe
return response


@router.get("/{thread_id}/messages", response_model=ThreadMessagesResponse)
@require_permission("threads", "read", owner_check=True)
async def get_thread_messages(thread_id: str, request: Request) -> ThreadMessagesResponse:
"""Return the canonical UI transcript for a thread.

Unlike checkpoint state, this transcript is not rewritten by model-context
summarization and is therefore the preferred source for chat rendering.
"""
store = get_store(request)
if store is None:
return ThreadMessagesResponse(messages=[])

try:
return ThreadMessagesResponse(messages=await get_thread_transcript(store, thread_id))
except Exception:
logger.exception("Failed to get transcript for thread %s", thread_id)
raise HTTPException(status_code=500, detail="Failed to get thread messages")


@router.post("", response_model=ThreadResponse)
async def create_thread(body: ThreadCreateRequest, request: Request) -> ThreadResponse:
"""Create a new thread.
Expand Down
35 changes: 35 additions & 0 deletions backend/app/gateway/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from langchain_core.messages import HumanMessage

from app.gateway.deps import get_run_context, get_run_manager, get_stream_bridge
from app.gateway.transcripts import append_thread_transcript_messages
from app.gateway.utils import sanitize_log_param
from deerflow.config.app_config import get_app_config
from deerflow.runtime import (
Expand Down Expand Up @@ -245,6 +246,29 @@ def build_run_config(
# ---------------------------------------------------------------------------


async def _sync_thread_transcript_after_run(
run_task: asyncio.Task,
thread_id: str,
checkpointer: Any,
store: Any,
) -> None:
"""Wait for *run_task* to finish, then append final visible messages."""
await asyncio.wait({run_task})

try:
ckpt_config = {"configurable": {"thread_id": thread_id, "checkpoint_ns": ""}}
ckpt_tuple = await checkpointer.aget_tuple(ckpt_config)
if ckpt_tuple is None:
return

channel_values = ckpt_tuple.checkpoint.get("channel_values", {})
messages = channel_values.get("messages")
if isinstance(messages, list):
await append_thread_transcript_messages(store, thread_id, messages)
except Exception:
logger.debug("Failed to sync transcript for thread %s (non-fatal)", sanitize_log_param(thread_id), exc_info=True)


async def start_run(
body: Any,
thread_id: str,
Expand Down Expand Up @@ -320,6 +344,14 @@ async def start_run(
graph_input = normalize_input(body.input)
config = build_run_config(thread_id, body.config, body.metadata, assistant_id=body.assistant_id)

if run_ctx.store is not None:
try:
input_messages = graph_input.get("messages")
if isinstance(input_messages, list):
await append_thread_transcript_messages(run_ctx.store, thread_id, input_messages)
except Exception:
logger.debug("Failed to append submitted messages for thread %s (non-fatal)", sanitize_log_param(thread_id), exc_info=True)

# Merge DeerFlow-specific context overrides into both ``configurable`` and ``context``.
# The ``context`` field is a custom extension for the langgraph-compat layer
# that carries agent configuration (model_name, thinking_enabled, etc.).
Expand All @@ -346,6 +378,9 @@ async def start_run(
)
record.task = task

if run_ctx.store is not None and run_ctx.checkpointer is not None:
asyncio.create_task(_sync_thread_transcript_after_run(task, thread_id, run_ctx.checkpointer, run_ctx.store))

# Title sync is handled by worker.py's finally block which reads the
# title from the checkpoint and calls thread_store.update_display_name
# after the run completes.
Expand Down
142 changes: 142 additions & 0 deletions backend/app/gateway/transcripts.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
"""Canonical chat transcript storage.

The LangGraph checkpoint state is model context. It can be summarized,
trimmed, or otherwise rewritten by middlewares. The UI transcript needs a
separate durable record so conversation history survives context compression.
"""

from __future__ import annotations

import json
import time
from typing import Any

from deerflow.runtime import serialize_lc_object

TRANSCRIPTS_NS: tuple[str, ...] = ("thread_transcripts",)

_SUMMARY_MARKER_KEY = "deerflow_conversation_summary"
_LEGACY_SUMMARY_PREFIX = "Here is a summary of the conversation to date:"


def _message_fingerprint(message: dict[str, Any]) -> str:
"""Return a stable identity for messages that do not have ids yet."""

identity_payload = {
"type": message.get("type"),
"name": message.get("name"),
"tool_call_id": message.get("tool_call_id"),
"content": message.get("content"),
}
return json.dumps(identity_payload, sort_keys=True, default=str, ensure_ascii=False)


def _message_text(message: dict[str, Any]) -> str:
content = message.get("content")
if isinstance(content, str):
return content.strip()
if isinstance(content, list):
parts: list[str] = []
for item in content:
if isinstance(item, dict) and item.get("type") == "text":
text = item.get("text")
if isinstance(text, str):
parts.append(text)
return "\n".join(parts).strip()
return ""


def _is_visible_transcript_message(message: dict[str, Any]) -> bool:
additional_kwargs = message.get("additional_kwargs")
if not isinstance(additional_kwargs, dict):
additional_kwargs = {}

if additional_kwargs.get("hide_from_ui") is True:
return False
if additional_kwargs.get(_SUMMARY_MARKER_KEY) is True:
return False

# Backward compatibility for summary messages created before they were
# explicitly tagged by DeerFlowSummarizationMiddleware.
if message.get("type") == "human" and _message_text(message).startswith(_LEGACY_SUMMARY_PREFIX):
return False

return message.get("type") in {"human", "ai", "tool"}


def normalize_transcript_messages(messages: list[Any] | tuple[Any, ...] | None) -> list[dict[str, Any]]:
"""Serialize and filter messages before writing them to the transcript."""
normalized: list[dict[str, Any]] = []
for raw_message in messages or []:
message = serialize_lc_object(raw_message)
if isinstance(message, dict) and _is_visible_transcript_message(message):
normalized.append(message)
return normalized


async def get_thread_transcript(store: Any, thread_id: str) -> list[dict[str, Any]]:
"""Read the canonical transcript for *thread_id* from the Store."""
item = await store.aget(TRANSCRIPTS_NS, thread_id)
if item is None:
return []
value = item.value if isinstance(item.value, dict) else {}
messages = value.get("messages", [])
return messages if isinstance(messages, list) else []


async def append_thread_transcript_messages(
store: Any,
thread_id: str,
messages: list[Any] | tuple[Any, ...] | None,
) -> list[dict[str, Any]]:
"""Append visible messages to the canonical transcript, deduplicating by identity."""
incoming = normalize_transcript_messages(messages)
if not incoming:
return await get_thread_transcript(store, thread_id)

existing = await get_thread_transcript(store, thread_id)
seen_ids = {str(message["id"]) for message in existing if isinstance(message, dict) and message.get("id")}
unidentified_by_fingerprint = {_message_fingerprint(message): index for index, message in enumerate(existing) if isinstance(message, dict) and not message.get("id")}
Comment thread
LittleChenLiya marked this conversation as resolved.
Outdated
changed = False

for message in incoming:
message_id = message.get("id")
fingerprint = _message_fingerprint(message)
if message_id and str(message_id) in seen_ids:
continue

unidentified_index = unidentified_by_fingerprint.get(fingerprint)
if message_id and unidentified_index is not None:
existing[unidentified_index] = message
seen_ids.add(str(message_id))
del unidentified_by_fingerprint[fingerprint]
changed = True
continue

if message_id:
seen_ids.add(str(message_id))
else:
unidentified_by_fingerprint.setdefault(fingerprint, len(existing))
existing.append(message)
changed = True

if changed:
await store.aput(
TRANSCRIPTS_NS,
thread_id,
{
"thread_id": thread_id,
"messages": existing,
"updated_at": time.time(),
},
)

return existing


async def delete_thread_transcript(store: Any, thread_id: str) -> None:
"""Delete a thread transcript if the active Store supports deletion."""
delete = getattr(store, "adelete", None)
if delete is None:
return
await delete(TRANSCRIPTS_NS, thread_id)
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

logger = logging.getLogger(__name__)

SUMMARY_MESSAGE_KWARG = "deerflow_conversation_summary"


@dataclass(frozen=True)
class SummarizationEvent:
Expand Down Expand Up @@ -123,6 +125,15 @@ def before_model(self, state: AgentState, runtime: Runtime) -> dict | None:
async def abefore_model(self, state: AgentState, runtime: Runtime) -> dict | None:
return await self._amaybe_summarize(state, runtime)

def _build_new_messages(self, summary) -> list[AnyMessage]:
messages = super()._build_new_messages(summary)
for message in messages:
additional_kwargs = dict(getattr(message, "additional_kwargs", {}) or {})
additional_kwargs["hide_from_ui"] = True
additional_kwargs[SUMMARY_MESSAGE_KWARG] = True
message.additional_kwargs = additional_kwargs
return messages
Comment thread
LittleChenLiya marked this conversation as resolved.
Outdated

def _maybe_summarize(self, state: AgentState, runtime: Runtime) -> dict | None:
messages = state["messages"]
self._ensure_message_ids(messages)
Expand Down Expand Up @@ -180,7 +191,16 @@ def _build_new_messages(self, summary: str) -> list[HumanMessage]:
"""Override the base implementation to let the human message with the special name 'summary'.
And this message will be ignored to display in the frontend, but still can be used as context for the model.
"""
return [HumanMessage(content=f"Here is a summary of the conversation to date:\n\n{summary}", name="summary")]
return [
HumanMessage(
content=f"Here is a summary of the conversation to date:\n\n{summary}",
name="summary",
additional_kwargs={
"hide_from_ui": True,
SUMMARY_MESSAGE_KWARG: True,
},
)
]
Comment thread
LittleChenLiya marked this conversation as resolved.

def _preserve_dynamic_context_reminders(
self,
Expand Down
2 changes: 2 additions & 0 deletions backend/tests/test_summarization_middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ def test_before_summarization_hook_receives_messages_before_compression() -> Non
assert captured[0].agent_name is None
assert isinstance(result["messages"][0], RemoveMessage)
assert result["messages"][1].content.startswith("Here is a summary")
assert result["messages"][1].additional_kwargs["hide_from_ui"] is True
assert result["messages"][1].additional_kwargs["deerflow_conversation_summary"] is True


def test_dynamic_context_reminder_is_preserved_across_summarization() -> None:
Expand Down
Loading