Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions backend/packages/harness/deerflow/runtime/runs/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,17 @@ async def run_agent(
if journal is not None:
config.setdefault("callbacks", []).append(journal)

# Inject Langfuse per-run session/user metadata so traces are grouped by
# thread and user in the Langfuse dashboard. The LangfuseCallbackHandler
# reads langfuse_session_id and langfuse_user_id from the LangChain run
# metadata dict; no handler-level changes are needed.
config.setdefault("metadata", {}).update(
{
"langfuse_session_id": thread_id,
"langfuse_user_id": runtime_ctx.get("user_id", ""),
}
)

runnable_config = RunnableConfig(**config)
if ctx.app_config is not None and _agent_factory_supports_app_config(agent_factory):
agent = agent_factory(config=runnable_config, app_config=ctx.app_config)
Expand Down
36 changes: 35 additions & 1 deletion backend/tests/test_run_worker_rollback.py
Original file line number Diff line number Diff line change
Expand Up @@ -382,5 +382,39 @@ def __call__(self, **kwargs):
return kwargs

monkeypatch.setattr("deerflow.runtime.runs.worker.inspect.signature", lambda _obj: (_ for _ in ()).throw(ValueError("boom")))

assert _agent_factory_supports_app_config(BrokenCallable()) is False


@pytest.mark.anyio
async def test_run_agent_injects_langfuse_session_and_user_into_config_metadata():
"""run_agent must set langfuse_session_id=thread_id and langfuse_user_id in metadata."""
run_manager = RunManager()
record = await run_manager.create("thread-langfuse")
bridge = SimpleNamespace(
publish=AsyncMock(),
publish_end=AsyncMock(),
cleanup=AsyncMock(),
)
captured: dict = {}

class DummyAgent:
async def astream(self, graph_input, config=None, stream_mode=None, subgraphs=False):
captured["metadata"] = dict(config.get("metadata") or {})
yield {"messages": []}

def factory(config):
return DummyAgent()

await run_agent(
bridge,
run_manager,
record,
ctx=RunContext(checkpointer=None, app_config=None),
agent_factory=factory,
graph_input={},
config={"context": {"user_id": "user-99"}},
)
await asyncio.sleep(0)

assert captured["metadata"]["langfuse_session_id"] == "thread-langfuse"
assert captured["metadata"]["langfuse_user_id"] == "user-99"