The first and second posts in the Engineering Neura series covered the structure of Neura's whole-turn LangGraph runtime: a declarative typed topology, phase-based tool fan-out with skip semantics, and a preflight safety gate wired as a conditional edge. Those pieces define what the system is designed to do. This third post is about proving what actually ran, and being able to reconstruct that proof days after the conversation ended.
When you are operating a regulated clinical chat product, "traces exist" is not enough. You need to open a specific user's turn from three days ago and answer: which assembled context did the model receive, did the safety classifier fire, which tools were skipped versus executed, and why did this particular response come out the way it did? That is not a monitoring problem. It is a tracing design problem. The pattern below bridges the gap between default LangSmith auto-instrumentation and that level of observability, with three layers each doing one job.
Neura's whole-turn workflow is a compiled LangGraph StateGraph: preflight, safety_intervention, assembly_gate, parallel context_assembly and empathy, context_format, navigator, finalize. Layer 1 picks up every node automatically; the LangGraph nodes show up as child runs of the graph invocation without any decorator. Layer 2 attaches correlation tags to every LLM call inside those nodes. Layer 3 is a small set of decorator spans for outcomes that have no native LLM call (the safety verdict, attribution result) and for the cross-service tool boundary.
The problem
LangSmith works out of the box. Set LANGSMITH_TRACING=true (formerly LANGCHAIN_TRACING_V2), install the SDK, and any ChatOpenAI.ainvoke() shows up as a run. That gets you "traces exist." It does not get you "I can find the trace for this specific user's specific turn three days later, follow the call tree across our four services, and confirm which version of the assembled context the model actually saw."
The pattern we landed on is three layers, each with one job. Layer 1 is the on-off switch. Layer 2 funnels every LLM through a single factory that attaches correlation metadata. Layer 3 is custom spans for the orchestration around the LLM calls.
The three layers, at a glance
| Layer | What it covers | What you get |
|---|---|---|
| 1. Auto-instrumentation | Every LangChain/LangGraph LLM, retriever, tool runnable | Runs with default names |
| 2. Correlated callbacks | Per-call run name, tags, metadata | Filterable by session_id, turn_id, phase |
| 3. Custom spans | Orchestration boundaries: assembly, safety, attribution | Call tree that mirrors the product |
Layer 1: the on-off switch
Layer 1 is one module per service. It reads the environment, exposes a tracing-enabled flag, and provides a forward-compat seam if we ever need to override the default tracer (per-service projects, alternate endpoint). Nothing else in the system touches LangSmith directly.
agent-platform/core/tracing.py

"""LangSmith tracing gate. The actual span recording is handled by
LangChain's built-in tracer when LANGSMITH_TRACING=true; this module
exists so other code never reads env vars directly."""
import os

def is_tracing_enabled() -> bool:
    return os.getenv("LANGSMITH_TRACING", "false").lower() == "true"

def tracing_project() -> str:
    return os.getenv("LANGSMITH_PROJECT", "default")
The same module gets duplicated in tool-server/core/tracing.py. A 10-line module that almost never changes does not justify a shared package across services.
Layer 1 evidence: the health endpoint
Tracing has the unfortunate property that "I see no spans" looks the same whether tracing is off or the LangSmith API is down. Surfacing the gate state on each service's health endpoint makes the failure mode obvious before anyone opens the dashboard.
@app.get("/health")
async def health():
    return {
        "status": "ok",
        "tracing": "enabled" if is_tracing_enabled() else "disabled",
        "tracing_project": tracing_project(),
    }
The CI suite asserts both services report tracing enabled when the test env turns it on. A misconfigured docker-compose env passthrough is the most common reason new contributors see no spans, and this catches it during boot rather than after the first failed debug session.
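The CI assertion described above can be quite small. The following is a minimal, framework-free sketch rather than the actual test: health_payload and check_tracing_gate are hypothetical helpers (the first stands in for the body of the health route so the block runs without FastAPI), the gate functions are repeated so the block is self-contained, and neura-ci is a made-up project name.

```python
import os


def is_tracing_enabled() -> bool:
    return os.getenv("LANGSMITH_TRACING", "false").lower() == "true"


def tracing_project() -> str:
    return os.getenv("LANGSMITH_PROJECT", "default")


def health_payload() -> dict:
    # Hypothetical helper: the same dict the /health route returns.
    return {
        "status": "ok",
        "tracing": "enabled" if is_tracing_enabled() else "disabled",
        "tracing_project": tracing_project(),
    }


def check_tracing_gate(env: dict) -> dict:
    """Apply env overrides, read the health payload, then restore the env."""
    saved = {key: os.environ.get(key) for key in env}
    os.environ.update(env)
    try:
        return health_payload()
    finally:
        # Restore whatever was set before so other tests are unaffected.
        for key, old in saved.items():
            if old is None:
                os.environ.pop(key, None)
            else:
                os.environ[key] = old


if __name__ == "__main__":
    print(check_tracing_gate({"LANGSMITH_TRACING": "true",
                              "LANGSMITH_PROJECT": "neura-ci"}))
```

In a real suite the same assertion would go through the HTTP client against each service, so a broken docker-compose env passthrough fails the build instead of a debugging session.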
Layer 2: correlated callbacks via an LLM factory
Auto-instrumentation gives every LLM call a generic name and no business identifiers. To find the run for "user X's turn 4 in session Y," you have to scroll. Layer 2 funnels every LLM construction through a single factory and attaches correlation metadata to the RunnableConfig passed to ainvoke().
from langchain_core.runnables import RunnableConfig
from langchain_openai import ChatOpenAI

def create_chat_llm(*, model, temperature=0.2, streaming=False) -> ChatOpenAI:
    return ChatOpenAI(model=model, temperature=temperature, streaming=streaming)

def with_tracing_metadata(*, session_id, turn_id, agent_id=None,
                          phase=None, tool_name=None) -> RunnableConfig:
    # Tags are bounded-cardinality filters; metadata carries the same IDs as fields.
    tags = [f"session:{session_id}", f"turn:{turn_id}"]
    if agent_id: tags.append(f"agent:{agent_id}")
    if phase: tags.append(f"phase:{phase}")
    if tool_name: tags.append(f"tool:{tool_name}")
    return {
        "tags": tags,
        "metadata": {"session_id": session_id, "turn_id": turn_id,
                     "agent_id": agent_id, "phase": phase},
        "run_name": tool_name or phase or "llm_call",
    }
Used at a callsite, the result is unobtrusive:
llm = create_chat_llm(model=model_name, streaming=True)
config = with_tracing_metadata(session_id=session_id, turn_id=run_id,
                               agent_id=agent_id, phase="agent")
async for chunk in llm.astream(messages, config=config):
    ...
The tool server exposes its own copy with the same surface, so a context-extraction call inside the tool server produces a span carrying the same correlation tags as a navigator call inside the agent platform. Cross-service spans are siblings (each service produces its own root run), correlated by shared session:<uuid> and turn:<uuid> tags rather than by a single parent run tree. Tag-based correlation gives a unified dashboard query, not a stitched parent/child tree.
The tag taxonomy
| Tag prefix | Cardinality | Use |
|---|---|---|
| session: | per session | Filter all spans for a conversation |
| turn: | per turn | Filter spans for one user message |
| phase: | small | Filter by orchestration stage |
| risk: | 4 values | Filter safety spans by outcome |
Cardinality matters because LangSmith filtering is exact-match per tag. Anything unbounded (free-text user content, exception messages, response strings) goes in metadata, where it remains searchable without bloating the tag index.
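One way to keep the taxonomy bounded is to validate tags before they are attached. The sketch below is an assumption about how such a guard could look, not part of Neura's factory: validate_tag, ALLOWED_PREFIXES, RISK_LEVELS, and MAX_TAG_LENGTH are hypothetical names, and the risk values other than none, high, and crisis are guesses, since the post does not list all four.

```python
from typing import Iterable, List

# Prefixes from the taxonomy table plus the agent:/tool: tags the factory emits.
ALLOWED_PREFIXES = {"session", "turn", "agent", "phase", "tool", "risk"}
# Assumed value set: "none", "high" and "crisis" appear in the post; "low" is a guess.
RISK_LEVELS = {"none", "low", "high", "crisis"}
MAX_TAG_LENGTH = 64  # hypothetical limit to keep free text out of tags


def validate_tag(tag: str) -> str:
    """Return the tag unchanged if it fits the taxonomy, else raise ValueError."""
    prefix, sep, value = tag.partition(":")
    if not sep or not value:
        raise ValueError(f"tag must look like 'prefix:value': {tag!r}")
    if prefix not in ALLOWED_PREFIXES:
        raise ValueError(f"unknown tag prefix: {prefix!r}")
    if prefix == "risk" and value not in RISK_LEVELS:
        raise ValueError(f"unknown risk level: {value!r}")
    if len(tag) > MAX_TAG_LENGTH or any(ch.isspace() for ch in value):
        # Long or whitespace-containing values are almost always free text.
        raise ValueError(f"tag value looks like free text: {tag!r}")
    return tag


def validate_tags(tags: Iterable[str]) -> List[str]:
    return [validate_tag(tag) for tag in tags]


if __name__ == "__main__":
    print(validate_tags(["session:abc", "turn:123", "phase:agent", "risk:high"]))
```

Calling a guard like this inside the metadata factory would turn an accidental free-text tag into a test failure rather than a slowly growing tag index.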
Layer 3: custom spans for outcomes the graph does not capture
Layer 1 plus Layer 2 already gives you the LangGraph node tree with tagged LLM runs underneath. The remaining holes are outcomes the graph does not directly express: a safety verdict (a state-flag computation, not a model call) and product-level results that span more than one node. Layer 3 fills those with a small set of @traceable decorators.
from langsmith import traceable
@traceable(name="safety_decision", run_type="chain")
async def _record_safety_decision(verdict): ...
@traceable(name="context_attribution", run_type="chain")
async def compute_attribution(context, response): ...
@traceable(name="tool_server.call", run_type="tool")
async def call_tool_server(tool, payload, headers): ...
Three is a useful bar. The graph exposes node lifecycle automatically; Layer 3 names things that are product facts, not framework lifecycle. Inner helpers (formatters, parsers, override builders) stay untraced; those would bloat the tree without telling us anything that the parent node span does not already cover.
Propagating tags through Layer 3 spans
The decorator alone does not know about session and turn IDs. To make the call tree filterable by the same correlation tags as the leaf LLM calls, the route handler that invokes the graph attaches them on the root run, and they inherit downward through LangChain's run tree to every node and LLM call.
from langsmith import get_current_run_tree

@traceable(name="chat_turn", run_type="chain")
async def stream_chat_turn(*, session, message, agent_id):
    # Excerpt: the code that builds turn_id and runtime is omitted here.
    rt = get_current_run_tree()
    if rt is not None:
        rt.add_tags([f"session:{session.id}", f"turn:{turn_id}",
                     f"agent:{agent_id}"])
        rt.metadata.update({"session_id": str(session.id),
                            "turn_id": turn_id, "agent_id": agent_id})
    async for event in run_turn_graph(runtime):
        yield event
Shared tags give you query correlation: a single dashboard filter returns all spans for a session across services, in order of timestamp. They do not give you a single distributed trace tree where the tool-server runs are children of the agent-platform run. For a true parent/child distributed tree, propagate the LangSmith run-tree ID in HTTP headers between services and parent the downstream run to it. The factory in this layer is the place to do that. We use tag correlation here because every dashboard query we run is a session/turn filter, but the patterns are not interchangeable.
One non-obvious Layer 3 span: the safety hijack outcome
The preflight node classifies safety alongside intent and writes safety_hijacked into TurnGraphState. The conditional edge after preflight then routes to safety_intervention if the flag is set. From a tracing standpoint, "this turn was hijacked" is the most product-relevant fact about the run, but auto-instrumentation will not surface it because the routing decision is a state-flag computation, not an LLM call.
@traceable(name="safety_decision", run_type="chain")
async def _record_safety_decision(verdict):
    rt = get_current_run_tree()
    if rt is not None:
        rt.add_tags([f"risk:{verdict.get('risk_level', 'none')}"])
        rt.metadata.update({
            "hijacked": bool(verdict.get("intervention_payload", {}).get("hijacked")),
            "intervention_mode": verdict.get("intervention_mode"),
            "flags": verdict.get("flags", []),
        })
    return verdict
This is one of the smallest custom spans we ship and one of the most useful. Filtering by risk:crisis OR risk:high surfaces exactly the conversations where the conditional edge routed to safety_intervention, sorted by time, with the full chat-turn span tree underneath. Auditing that population becomes a query.
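The audit query can be built as a filter string and passed to Client.list_runs through its filter argument. This is a sketch under the assumption that the or(...) and has(tags, ...) forms of the LangSmith run filter language are available in your SDK version; has_tag, any_of, and risk_filter are hypothetical helper names, and only the string construction is shown.

```python
def has_tag(tag: str) -> str:
    """Build a has(tags, "...") clause; reject quotes so the string stays well-formed."""
    if '"' in tag:
        raise ValueError(f"tag must not contain double quotes: {tag!r}")
    return f'has(tags, "{tag}")'


def any_of(*clauses: str) -> str:
    """Combine clauses with or(...); a single clause is returned as-is."""
    if not clauses:
        raise ValueError("at least one clause is required")
    if len(clauses) == 1:
        return clauses[0]
    return f"or({', '.join(clauses)})"


def risk_filter(*levels: str) -> str:
    """Filter string matching spans tagged with any of the given risk levels."""
    return any_of(*(has_tag(f"risk:{level}") for level in levels))


if __name__ == "__main__":
    print(risk_filter("crisis", "high"))
```

Keeping the filter construction in one helper means a change in filter syntax between SDK versions is a one-line fix rather than a search across dashboards and tests.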
Streaming modes and checkpoint metadata
The whole-turn graph runs under graph.astream(state, config, stream_mode=["custom", "updates", "tasks", "checkpoints"]). That single call drives four observable surfaces at once. custom carries the product event stream the frontend consumes. updates emits per-node state patches; pairing those with LangSmith node spans makes "which nodes ran for this turn" queryable from either side. tasks emits per-task lifecycle events, which catch parallel-branch failures (context_assembly raising while empathy is mid-stream) without waiting for the join. checkpoints emits one event per checkpointed step, including the checkpoint_id, step, and next nodes. The runtime stores the latest checkpoint id on its handle and writes it as metadata on the chat-turn span:
async for item in graph.astream(initial_state, config,
                                stream_mode=["custom", "updates", "tasks", "checkpoints"]):
    mode, payload = _stream_mode_payload(item)
    if mode == "custom" and isinstance(payload, TurnEvent):
        yield payload
    elif mode == "checkpoints" and isinstance(payload, dict):
        checkpoint_id = ((payload.get("config") or {}).get("configurable") or {}).get("checkpoint_id")
        runtime.langgraph_latest_checkpoint = {
            "checkpoint_id": checkpoint_id,
            "step": (payload.get("metadata") or {}).get("step"),
            "next": payload.get("next") or [],
        }
With a checkpointer (MemorySaver in dev, AsyncPostgresSaver in production) keyed by thread_id = f"turn:{turn_id}", the same turn: tag that filters LangSmith spans also indexes the LangGraph checkpoints. "Resume this turn from its last checkpoint" and "show me every span for this turn" share an identifier.
PII handling in tracing
For a clinical product, every span input and output passing through LangSmith is a compliance surface. Treat the layers below as a baseline pattern to combine with payload minimization, an explicit retention policy, and validation that redaction is doing what you think it is. They are not a complete PHI/PII compliance solution, and regex-based redaction in particular is best-effort: it catches structured patterns (SSN, email, phone) and misses free-text identifiers (names, addresses, clinical narratives), which the structured-output and allowlist patterns below address.
Allowlist what you send, do not blocklist what you redact. The strongest pattern is to upload only fields you know are safe: structured verdicts, IDs, scores, latencies. The @traceable decorator accepts process_inputs and process_outputs callbacks; use them to project the payload down to an allowlisted shape, then redact what remains.
import re

# Best-effort structured patterns; free-text identifiers are not covered.
_PII_PATTERNS = [
    (re.compile(r"\b\d{3}-\d{2}-\d{4}\b"), "[SSN]"),
    (re.compile(r"\b[\w.+-]+@[\w-]+\.[\w.-]+\b"), "[EMAIL]"),
    (re.compile(r"\b\+?1?\d{3}[-.\s]?\d{3}[-.\s]?\d{4}\b"), "[PHONE]"),
]

def redact_pii(payload: dict) -> dict:
    def scrub(v):
        if isinstance(v, str):
            for pat, repl in _PII_PATTERNS:
                v = pat.sub(repl, v)
            return v
        if isinstance(v, dict): return {k: scrub(x) for k, x in v.items()}
        if isinstance(v, list): return [scrub(x) for x in v]
        return v
    return scrub(payload)

def project_chat_turn(payload: dict) -> dict:
    # Allowlist: only IDs and structured signals reach the trace payload.
    # Free-text user content is suppressed at the source.
    return redact_pii({
        "session_id": payload.get("session_id"),
        "turn_id": payload.get("turn_id"),
        "intent": payload.get("intent"),
        "risk_level": payload.get("risk_level"),
    })

@traceable(name="chat_turn", run_type="chain",
           process_inputs=project_chat_turn,
           process_outputs=project_chat_turn)
async def stream_chat_turn(...): ...
For spans where allowlisting is too restrictive, LANGSMITH_HIDE_INPUTS=true and LANGSMITH_HIDE_OUTPUTS=true drop payloads entirely while preserving span shape, latency, and tags. We use this on the safety classifier span: the verdict structure goes in metadata, and the message text is suppressed at the SDK level. Validation matters: a unit test feeds a fixture containing each PII pattern through the actual decorator and asserts that the uploaded payload contains only the redacted form. Without that test, a regex regression is silent.
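The core of such a validation test can be sketched at the function level. This version exercises redact_pii directly rather than going through the decorator, which the real test should do; the patterns and redact_pii are repeated from the block above so the sketch is self-contained, and FIXTURES and check_redaction are hypothetical names with made-up sample values.

```python
import re

_PII_PATTERNS = [
    (re.compile(r"\b\d{3}-\d{2}-\d{4}\b"), "[SSN]"),
    (re.compile(r"\b[\w.+-]+@[\w-]+\.[\w.-]+\b"), "[EMAIL]"),
    (re.compile(r"\b\+?1?\d{3}[-.\s]?\d{3}[-.\s]?\d{4}\b"), "[PHONE]"),
]


def redact_pii(payload):
    def scrub(v):
        if isinstance(v, str):
            for pat, repl in _PII_PATTERNS:
                v = pat.sub(repl, v)
            return v
        if isinstance(v, dict):
            return {k: scrub(x) for k, x in v.items()}
        if isinstance(v, list):
            return [scrub(x) for x in v]
        return v
    return scrub(payload)


# name -> (input text, expected placeholder, raw value that must not survive)
FIXTURES = {
    "ssn": ("SSN 123-45-6789 on file", "[SSN]", "123-45-6789"),
    "email": ("reach me at jane.doe@example.com", "[EMAIL]", "jane.doe@example.com"),
    "phone": ("call 555-123-4567 today", "[PHONE]", "555-123-4567"),
}


def check_redaction() -> list:
    """Return the fixture names whose raw value leaked or whose placeholder is missing."""
    failures = []
    for name, (text, placeholder, raw) in FIXTURES.items():
        # Nest the text so the recursive dict/list handling is exercised too.
        out = repr(redact_pii({"note": text, "nested": [{"note": text}]}))
        if raw in out or placeholder not in out:
            failures.append(name)
    return failures
```

The same fixtures also make the limitation concrete: a payload such as {"note": "Jane Doe lives on Elm Street"} passes through unchanged, which is why the allowlist projection has to come first.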
Pair this with a retention policy on the LangSmith project and an allowlist of who can read it. Trace data is a copy of production payloads; it inherits the same compliance posture as the primary store, not a relaxed one.
Sampling, not on/off
For high-throughput services, the temptation is to flip LANGSMITH_TRACING=false in production to control LangSmith volume. The cost saving is real, but the visibility loss is total. The right primitive is sampling: LANGSMITH_TRACING_SAMPLING_RATE=0.1 records roughly 10% of traces. The keep-or-drop decision is made once per trace, at the root run, so a sampled trace includes every span beneath it.
Production canary instances run at 1.0 (every trace), the rest at 0.05. Anomaly investigation flips a single instance back to 1.0 via env var. The factory pattern ensures there is one switch in one place.
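The property that makes sampling safe for debugging is that every span in a trace shares one keep-or-drop decision. The sketch below illustrates that idea only; it is not the LangSmith SDK's implementation. TraceSampler and sampling_rate_for_instance are hypothetical names, and the random source is injectable so the behavior can be tested.

```python
import random
from typing import Callable, Dict


class TraceSampler:
    """Illustrative trace-level sampler: one decision per trace, reused by all its spans."""

    def __init__(self, rate: float, rng: Callable[[], float] = random.random):
        if not 0.0 <= rate <= 1.0:
            raise ValueError("rate must be between 0 and 1")
        self.rate = rate
        self._rng = rng
        # Grows without bound here; a real implementation would evict finished traces.
        self._decisions: Dict[str, bool] = {}

    def keep(self, trace_id: str) -> bool:
        # The first span seen for a trace fixes the decision; later spans reuse it.
        if trace_id not in self._decisions:
            self._decisions[trace_id] = self._rng() < self.rate
        return self._decisions[trace_id]


def sampling_rate_for_instance(is_canary: bool) -> float:
    """Rates from the text above: canaries trace everything, the rest 5%."""
    return 1.0 if is_canary else 0.05
```

A per-span coin flip at the same rate would record the same volume but leave most traces with holes in them, which defeats the purpose of opening a turn days later.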
The end-to-end verification test
Setting up tracing without a verification test almost guarantees a quiet regression six weeks later. The check is short: send a message, wait for the LangSmith batch flush, then assert at least one run exists for the test session.
import asyncio
import os

import pytest

@pytest.mark.asyncio
async def test_chat_produces_langsmith_traces(authed_client, ls_client):
    session_id = await _create_session(authed_client)
    await _send_chat(authed_client, session_id, "Hi, I have been anxious")
    await asyncio.sleep(5.0)  # batch flush
    runs = list(ls_client.list_runs(
        project_name=os.environ["LANGSMITH_PROJECT"],
        filter=f'eq(metadata.session_id, "{session_id}")',
        limit=20,
    ))
    assert runs and {"chat_turn", "safety_gate_check"} & {r.name for r in runs}
The list_runs filter syntax has shifted across LangSmith SDK versions. The tag-based filter (has(tags, "session:<uuid>")) is the alternative when metadata filtering is unavailable.
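A fixed sleep before querying is the part of this test most likely to become flaky. One alternative, sketched here as an assumption rather than what the suite does, is to poll until runs appear or a deadline passes. poll_until is a hypothetical helper; it takes a synchronous fetch callable because list_runs is called synchronously in the test above, and it uses asyncio.to_thread, which requires Python 3.9 or later.

```python
import asyncio
import time
from typing import Callable, List, TypeVar

T = TypeVar("T")


async def poll_until(fetch: Callable[[], List[T]], *,
                     timeout: float = 15.0, interval: float = 0.5) -> List[T]:
    """Call fetch() until it returns a non-empty list or the timeout elapses.

    Returns the last result, which is empty if nothing arrived in time.
    """
    deadline = time.monotonic() + timeout
    while True:
        # Run the blocking fetch off the event loop so other tasks keep running.
        result = await asyncio.to_thread(fetch)
        if result or time.monotonic() >= deadline:
            return result
        await asyncio.sleep(interval)
```

In the test, the sleep and the list_runs call would collapse into something like runs = await poll_until(lambda: list(ls_client.list_runs(...))), which returns as soon as the batch flush lands and still fails clearly when it never does.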
Edge cases worth knowing about
- Streaming and span lifetime. A graph astream call's span closes when the iterator is fully consumed. If a FastAPI endpoint never iterates because the client disconnected, wrap the iteration in try/finally to ensure the span closes. The stream-mode multiplex closes node spans incrementally rather than only at the end.
- Sensitive content in tags. Never put user message content or model output on a tag; they land in the global tag index. Metadata fields are searchable but not indexed for exact-match filtering.
- Project isolation. CI, staging, and production each get a separate LANGSMITH_PROJECT. The health endpoint reports the project name so a mistargeted env var is visible in the boot sanity check.
- Checkpoint id as a debugging primitive. Logging the latest checkpoint_id on the chat-turn span (and on application logs) lets a support engineer answer "replay this turn from where it failed" without reconstructing state by hand.
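The try/finally advice for streaming spans can be shown without any tracing library. In this sketch a list of log entries stands in for the span, fake_graph_stream stands in for graph.astream, and all names are hypothetical; the point is only that the cleanup runs when the consumer stops early and the generator is closed.

```python
import asyncio
from typing import AsyncIterator, List


async def fake_graph_stream(n: int) -> AsyncIterator[int]:
    # Stand-in for graph.astream(...): yields n events.
    for i in range(n):
        await asyncio.sleep(0)
        yield i


async def stream_turn(events: AsyncIterator[int], log: List[str]) -> AsyncIterator[int]:
    """Relay events; the finally block stands in for closing the span."""
    log.append("span_open")
    try:
        async for event in events:
            yield event
    finally:
        # Runs on normal completion, on error, and when aclose() is called.
        log.append("span_closed")


async def consume_with_disconnect(limit: int) -> List[str]:
    """Read `limit` events, then stop, as a disconnected client would."""
    log: List[str] = []
    stream = stream_turn(fake_graph_stream(10), log)
    try:
        seen = 0
        async for _ in stream:
            seen += 1
            if seen >= limit:
                break  # simulate the client going away mid-stream
    finally:
        # Closing the generator explicitly triggers its finally block now,
        # instead of whenever garbage collection gets to it.
        await stream.aclose()
    return log


if __name__ == "__main__":
    print(asyncio.run(consume_with_disconnect(3)))
```

Without the explicit aclose, the cleanup still happens eventually, but at a time the endpoint does not control, which is what leaves spans looking open in the dashboard.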
If you only add one custom tag, make it turn:<uuid>. It scopes a noisy production dashboard to exactly the spans for the user complaint you are debugging, regardless of which service produced the run.
What comes after observability
Two LangSmith primitives extend this layout naturally. Client.create_feedback(run_id, key="clinician_approved", score=1) closes the loop on user reactions; for a clinical product, "did the reviewer approve this turn?" is a metric you want bound to the trace. And LangSmith datasets plus evaluators turn the same correlation tags into regression suites: a curated set of (input, expected) pairs, evaluated against a target prompt or model, with the verdict shape coming from the same Pydantic schemas the production code uses. The tracing layout is the foundation; eval is the next floor.
Takeaway checklist
- Three layers, each with one job. Layer 1 is the switch, Layer 2 is correlation, Layer 3 is span shape for outcomes the graph does not capture.
- Funnel every LLM construction through a single factory module per service. The only durable place to attach correlation tags.
- Pick a tag taxonomy with bounded cardinality. session:, turn:, phase:, risk: covers most product needs. Reuse the same turn:<uuid> as the LangGraph thread_id so spans and checkpoints share an identifier.
- Distinguish tag correlation from a true distributed parent/child trace. Both are useful, only the latter gives you a single nested tree across services.
- Add Layer 3 spans for outcomes the graph cannot express on its own (safety decision, attribution, cross-service tool calls). Three or four is plenty.
- Use stream_mode=["custom", "updates", "tasks", "checkpoints"] in one place. Persist the latest checkpoint_id on span metadata for resume and audit.
- Allowlist trace payloads with process_inputs/process_outputs; treat regex redaction as a baseline. Pair with a retention policy and a redaction-validation test.
- Use LANGSMITH_TRACING_SAMPLING_RATE for cost control, not the master switch.
- Surface tracing state on a health endpoint and assert on it in CI.