The Engineering Neura series documents the engineering decisions behind Neura, a regulated clinical AI chat product built on LangGraph. Neura connects patients with matched therapists and care coordinators through a conversational interface. Behind every turn, a multi-stage LangGraph workflow assembles structured context, runs a safety check, and routes the response through a navigator. Each post in this series is a working pattern pulled from that codebase (built, iterated on, and deployed), written with enough specificity to be useful if you are solving similar problems.
This first post is about the context assembly pipeline. Before the AI navigator can respond to a patient's message, five structured context bundles need to be assembled: client signal extracted from conversation history, provider attributes, patient profile data, retrieval results over therapeutic-fit candidates, and a session narrative. Some depend on each other. Some can run concurrently. All can fail or time out, and the turn still has to produce something coherent. The obvious starting point is five sequential await calls in a function. That works until you need a sixth tool, or until you want to skip re-fetching data that has not changed since the last turn, or until a mid-sequence failure leaves you with no clear picture of what ran and what did not.
The pattern we landed on: declare the entire LangGraph topology in one typed spec, fan tool calls out inside the assembly node with a config-driven dispatcher, and use predicate-based skip semantics to avoid redundant work on repeat turns. The most useful feature turned out to be the one that decides not to run.
Neura's whole-turn workflow is a compiled StateGraph: preflight → assembly_gate → (context_assembly || empathy) → context_format → navigator → finalize, with a conditional edge from preflight that routes hijacked turns to safety_intervention → finalize. The topology lives in turn_graph_spec.py as typed StageId/GraphEdge values; the executor (turn_graph.py) reads that spec and calls graph.add_node, graph.add_conditional_edges, and graph.add_edge from it. There is no hand-coded sequencing alongside the spec, so the manifest the frontend renders and the graph the runtime executes can never drift.
The problem
Every chat turn needs five pieces of context assembled before the navigator runs: client signal extracted from history, provider attributes, patient profile, retrieval over therapeutic-fit candidates, and a session narrative. Some depend on others. Some can run in parallel. All can fail or time out, and the chat turn still has to produce a useful response. The whole-turn graph hands this off to one node (context_assembly); inside that node we still need a small workflow to run the tools.
The first version was the obvious one: a function with five sequential awaits. It worked until it didn't. Adding a sixth tool meant editing the function body, the order of failures was implicit, parallelism was an afterthought, and skipping work that had already run for an unchanged session was awkward to bolt on. Promoting tool-level concurrency to the outer graph (one node per tool) was tempting but mismatched: the tool set is parameterized per agent at runtime and can change between deploys, while the outer graph is a stable workflow shape (preflight, assembly, empathy, format, navigator, finalize). Tool-level fan-out belongs inside the assembly node.
Topology as a typed spec, executor as a thin reader
The whole-turn graph topology is declared as typed values in one module. Stages are an enum, edges are frozen dataclasses with an explicit EdgeType, conditional edges carry an EdgeCondition, and a manifest function emits a JSON-serializable shape for the frontend. The executor module imports the spec and wires the StateGraph from it; nothing else codifies the order.
```python
class StageId(StrValueEnum):
    # Sentinel stages; the values mirror LangGraph's START/END node names,
    # so stage_value() maps them straight onto the graph's entry and exit.
    START = "__start__"
    END = "__end__"
    PREFLIGHT = "preflight"
    SAFETY_INTERVENTION = "safety_intervention"
    ASSEMBLY_GATE = "assembly_gate"
    CONTEXT_ASSEMBLY = "context_assembly"
    EMPATHY = "empathy"
    CONTEXT_FORMAT = "context_format"
    NAVIGATOR = "navigator"
    FINALIZE = "finalize"


GRAPH_EDGES = (
    GraphEdge(StageId.START, StageId.PREFLIGHT, EdgeType.ENTRY),
    GraphEdge(StageId.PREFLIGHT, StageId.SAFETY_INTERVENTION,
              EdgeType.CONDITIONAL, condition=EdgeCondition.SAFETY_HIJACKED),
    GraphEdge(StageId.PREFLIGHT, StageId.ASSEMBLY_GATE,
              EdgeType.CONDITIONAL, condition=EdgeCondition.NOT_SAFETY_HIJACKED),
    GraphEdge(StageId.SAFETY_INTERVENTION, StageId.FINALIZE, EdgeType.TERMINAL_PATH),
    GraphEdge(StageId.ASSEMBLY_GATE, StageId.CONTEXT_ASSEMBLY, EdgeType.PARALLEL_BRANCH),
    GraphEdge(StageId.ASSEMBLY_GATE, StageId.EMPATHY, EdgeType.PARALLEL_BRANCH),
    GraphEdge(StageId.CONTEXT_ASSEMBLY, StageId.CONTEXT_FORMAT, EdgeType.JOIN_INPUT),
    GraphEdge(StageId.EMPATHY, StageId.CONTEXT_FORMAT, EdgeType.JOIN_INPUT),
    GraphEdge(StageId.CONTEXT_FORMAT, StageId.NAVIGATOR, EdgeType.SEQUENCE),
    GraphEdge(StageId.NAVIGATOR, StageId.FINALIZE, EdgeType.SEQUENCE),
    GraphEdge(StageId.FINALIZE, StageId.END, EdgeType.EXIT),
)
```
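The GraphEdge, EdgeType, and EdgeCondition definitions are not excerpted above. A minimal sketch consistent with how the tuple uses them; the field names are our assumption, not necessarily Neura's:

```python
from dataclasses import dataclass
from enum import Enum


class EdgeType(Enum):
    ENTRY = "entry"
    SEQUENCE = "sequence"
    CONDITIONAL = "conditional"
    PARALLEL_BRANCH = "parallel_branch"
    JOIN_INPUT = "join_input"
    TERMINAL_PATH = "terminal_path"
    EXIT = "exit"


class EdgeCondition(Enum):
    SAFETY_HIJACKED = "safety_hijacked"
    NOT_SAFETY_HIJACKED = "not_safety_hijacked"


@dataclass(frozen=True)
class GraphEdge:
    source: StageId
    target: StageId
    edge_type: EdgeType
    condition: EdgeCondition | None = None
```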
The executor is a thin reader. JOIN_INPUT edges collapse into the LangGraph multi-source add_edge([s1, s2], target) call, so context formatting waits on both context assembly and empathy without a custom join node. CONDITIONAL edges feed add_conditional_edges with a small router that maps state["safety_hijacked"] to a target. The same module exposes runtime_graph_manifest() for the frontend, which means the manifest the UI renders and the graph the runtime executes are derived from one source.
```python
# turn_graph.py
def build_whole_turn_graph(checkpointer=None):
    graph = StateGraph(TurnGraphState)
    for stage, node in _NODE_FUNCTIONS.items():
        graph.add_node(stage_value(stage), node)
    _wire_graph_topology(graph)  # reads GRAPH_EDGES, calls add_edge / add_conditional_edges
    return graph.compile(checkpointer=checkpointer or get_langgraph_checkpointer())
```
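The wiring itself is mechanical enough to sketch. This is our reconstruction against the GraphEdge fields assumed above, not the production code; stage_value is assumed to return the enum's string value:

```python
from collections import defaultdict


def _wire_graph_topology(graph):
    joins = defaultdict(list)   # join target -> list of source stages
    routes = defaultdict(dict)  # conditional source -> {condition: target}
    for edge in GRAPH_EDGES:
        if edge.edge_type == EdgeType.JOIN_INPUT:
            joins[edge.target].append(edge.source)
        elif edge.edge_type == EdgeType.CONDITIONAL:
            routes[edge.source][edge.condition] = edge.target
        else:
            graph.add_edge(stage_value(edge.source), stage_value(edge.target))
    for target, sources in joins.items():
        # Multi-source add_edge: the target runs once, after every source finishes.
        graph.add_edge([stage_value(s) for s in sources], stage_value(target))
    for source, table in routes.items():
        def router(state, table=table):  # default arg binds the loop variable
            cond = (EdgeCondition.SAFETY_HIJACKED if state.get("safety_hijacked")
                    else EdgeCondition.NOT_SAFETY_HIJACKED)
            return stage_value(table[cond])
        graph.add_conditional_edges(stage_value(source), router)
```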
The TurnGraphState type is a TypedDict with one reducer of note: completed_stages: Annotated[list[str], operator.add]. Each node returns a small state patch, and the reducer concatenates the per-stage entries across parallel branches without a custom merge. Mutable per-turn context (DB session, the runtime object, deps) flows through config["configurable"] instead of through state, so the checkpointed snapshot stays small and JSON-friendly.
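For concreteness, a sketch of that state shape; every field except completed_stages and safety_hijacked is illustrative:

```python
import operator
from typing import Annotated, Any, TypedDict


class TurnGraphState(TypedDict, total=False):
    user_message: str               # illustrative field
    safety_hijacked: bool           # read by the conditional router after preflight
    context_bundle: dict[str, Any]  # illustrative field
    # operator.add concatenates entries contributed by parallel branches,
    # so context_assembly and empathy can both append without a custom merge.
    completed_stages: Annotated[list[str], operator.add]
```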
One config object per tool
Every tool registered in the pipeline declares its dependencies, defaults, and retry shape in a single dataclass. The dispatcher reads this; nothing else does.
```python
# tool-server/routes/context.py
@dataclass(frozen=True)
class PipelineToolConfig:
    name: str
    dependencies: tuple[str, ...] = ()
    inject_inputs: tuple[str, ...] = ()
    defaults: dict[str, Any] = field(default_factory=dict)
    max_retries: int = 1
    retry_backoff: float = 0.5
    timeout: float = 30.0
    optional: bool = False


PIPELINE_CONFIG = {
    "client_signal": PipelineToolConfig("client_signal"),
    "provider_genome": PipelineToolConfig("provider_genome"),
    "patient_context": PipelineToolConfig("patient_context"),
    "therapeutic_fit": PipelineToolConfig(
        "therapeutic_fit",
        dependencies=("client_signal", "provider_genome"),
        inject_inputs=("signal_summary", "genome_summary"),
        max_retries=2,
    ),
}
```
The dataclass is frozen so a config's fields cannot be reassigned after construction; a mutable default like `defaults: dict` is still mutable in place, so the dispatcher copies it per call rather than sharing references. Dependencies are declared by tool name rather than by Python reference, so the registry can be loaded from YAML without imports.
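As an illustration of that last point, a hypothetical YAML form of the registry and a loader; the file layout and loader are ours, not Neura's:

```python
import yaml  # pip install pyyaml

# pipeline.yaml (hypothetical):
#   client_signal:
#   therapeutic_fit:
#     dependencies: [client_signal, provider_genome]
#     inject_inputs: [signal_summary, genome_summary]
#     max_retries: 2

def load_registry(path: str) -> dict[str, PipelineToolConfig]:
    with open(path) as f:
        raw = yaml.safe_load(f) or {}
    configs = {}
    for name, entry in raw.items():
        entry = entry or {}  # bare "tool_name:" entries have no body
        configs[name] = PipelineToolConfig(
            name=name,
            dependencies=tuple(entry.get("dependencies", ())),
            inject_inputs=tuple(entry.get("inject_inputs", ())),
            defaults=entry.get("defaults", {}),
            max_retries=entry.get("max_retries", 1),
            optional=entry.get("optional", False),
        )
    return configs
```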
Topological sort into phases
The dispatcher groups tools into phases. Tools within a phase run in parallel via asyncio.gather; each subsequent phase waits for the previous one. The grouping is Kahn's algorithm, and the only output that matters is the phase list. With the registry above, client_signal, provider_genome, and patient_context form phase one; therapeutic_fit runs alone in phase two.
```python
def _build_execution_phases(specs):
    by_name = {s.name: s for s in specs}
    # Dependencies on tools not present in this run are filtered out by
    # intersecting with by_name. The registry validator (run at startup)
    # rejects unknown dependency names; this filter only handles the
    # case where a tool is intentionally excluded from a given agent's
    # spec list.
    remaining = {s.name: set(PIPELINE_CONFIG[s.name].dependencies) & set(by_name)
                 for s in specs}
    phases = []
    while remaining:
        ready = [n for n, deps in remaining.items() if not deps]
        if not ready:
            raise ValueError(f"cycle in pipeline: {remaining}")
        phases.append([by_name[n] for n in ready])
        for n in ready:
            del remaining[n]
        for deps in remaining.values():
            deps.difference_update(ready)
    return phases
```
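The loop that consumes those phases is short. A sketch under the assumption of a hypothetical _run_tool helper that combines _prepare_arguments and _execute_with_retry (both shown below), plus a _handle_failure hook for the optional-vs-required policy:

```python
import asyncio
from typing import Any


async def _dispatch(specs, defaults) -> dict[str, Any]:
    aggregated: dict[str, Any] = {}
    for phase in _build_execution_phases(specs):
        results = await asyncio.gather(
            *(_run_tool(spec, aggregated, defaults) for spec in phase),
            return_exceptions=True,  # one failure must not cancel its siblings
        )
        for spec, result in zip(phase, results):
            if isinstance(result, BaseException):
                _handle_failure(spec, result)  # raise if required, warn if optional
            else:
                aggregated.update(result)      # outputs visible to later phases
    return aggregated
```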
A boot-time validator complements the per-request build: it walks each agent's spec list and warns when a tool depends on a tool the same list excludes. That is usually a config mistake rather than an intentional partial run, and surfacing it at boot keeps the dispatcher's runtime behavior unambiguous.
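A sketch of that validator; agent_tool_lists is a hypothetical mapping of agent name to the tool names enabled for that agent:

```python
import logging

logger = logging.getLogger(__name__)


def validate_agent_specs(agent_tool_lists: dict[str, list[str]]) -> None:
    for agent, names in agent_tool_lists.items():
        enabled = set(names)
        for name in names:
            missing = set(PIPELINE_CONFIG[name].dependencies) - enabled
            if missing:
                logger.warning(
                    "agent %r: tool %r depends on excluded tools %s",
                    agent, name, sorted(missing),
                )
```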
Argument injection from prior phases
Tools in phase N can read outputs of phases 1..N-1 by listing the keys they want in inject_inputs. The dispatcher pulls those values out of the aggregated state and merges them with per-call arguments and registry defaults.
```python
def _prepare_arguments(spec, aggregated, defaults):
    args = dict(defaults)
    for key in PIPELINE_CONFIG[spec.name].inject_inputs:
        if key in aggregated:
            args[key] = aggregated[key]
    args.update(spec.arguments or {})  # caller overrides win
    return args
```
The merge order is intentional: defaults at the bottom, injected values in the middle, caller overrides on top. A test that mocks an injected field can shadow it without touching the dispatcher.
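A quick illustration with the therapeutic_fit config from the registry above; SimpleNamespace stands in for the per-call spec object, since only .name and .arguments matter here:

```python
from types import SimpleNamespace

spec = SimpleNamespace(name="therapeutic_fit",
                       arguments={"signal_summary": "mocked in a test"})
aggregated = {"signal_summary": "from client_signal",
              "genome_summary": "from provider_genome"}

args = _prepare_arguments(spec, aggregated, defaults={"top_k": 5})
# {'top_k': 5,
#  'signal_summary': 'mocked in a test',   # caller override shadows the injection
#  'genome_summary': 'from provider_genome'}
```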
Retries with classified errors
Every tool runs through the same retry wrapper. The wrapper distinguishes "definitely retryable" (network blip, 5xx, timeout) from "do not retry" (validation, 4xx, business rejection). Anything else propagates: an unclassified exception is treated as a real failure and surfaces to the dispatcher rather than getting silently retried.
```python
import asyncio

import httpx

RETRYABLE = (asyncio.TimeoutError, httpx.ConnectError, httpx.ReadTimeout)


async def _execute_with_retry(spec, fn, cfg):
    for attempt in range(cfg.max_retries + 1):
        try:
            return await asyncio.wait_for(fn(), timeout=cfg.timeout)
        except RETRYABLE as exc:
            if attempt == cfg.max_retries:
                raise  # retries exhausted; surface without a pointless backoff
            last = exc
        except httpx.HTTPStatusError as exc:
            # 4xx is validation/business rejection: never retry.
            if exc.response.status_code < 500 or attempt == cfg.max_retries:
                raise
            last = exc
        await asyncio.sleep(cfg.retry_backoff * (2 ** attempt))
    raise last  # unreachable, but keeps the control flow explicit
```
Skip semantics: the load-bearing piece
The cheapest tool execution is the one we don't run. Two skip gates protect every assembly turn.
Completeness threshold. Each tool reports a 0..1 completeness score on its output. The session-level assembler tracks the running per-tool completeness. If completeness for a tool is already above 0.4 and the upstream signals haven't changed, the next turn skips that tool and reuses the prior output. The threshold defaults to 0.4 because that is roughly the point at which incremental work starts producing diminishing returns in our calibration set.
Content hash. The session assembler keeps a hash of the conversation slice that fed each tool. Same hash, no re-run. This catches the common case of a clarifying acknowledgement that does not change the upstream signal.
```python
async def enqueue(self, session_id, slice_hash):
    state = self._state.get(session_id, SessionAssemblerState())
    if state.last_slice_hash == slice_hash:
        return  # no-op, identical input
    state.last_slice_hash = slice_hash
    self._state[session_id] = state
    await self._queue.put(session_id)

# Inside the worker loop:
for tool_name, comp in record.completeness.items():
    if comp >= self.threshold and tool_name in last_outputs:
        skipped.append(tool_name)
        continue
    to_run.append(tool_name)
```
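The slice hash itself can be cheap. A sketch; which message fields feed the hash is our choice for illustration:

```python
import hashlib
import json


def slice_hash(messages: list[dict]) -> str:
    # Hash only the fields that feed extraction, so cosmetic metadata
    # changes (timestamps, read receipts) don't defeat the gate.
    canonical = json.dumps(
        [{"role": m["role"], "content": m["content"]} for m in messages],
        sort_keys=True,
        ensure_ascii=False,
    ).encode()
    return hashlib.sha256(canonical).hexdigest()
```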
In our calibration on a few hundred sessions, the completeness gate skipped a tool on roughly 60% of turns after the first three turns of a session. The numbers will vary by domain and by how aggressively upstream signals change; the win comes from skipping at all, not from the specific threshold.
The skip cache shown here lives in process memory for clarity. Production stacks with multiple workers need shared, durable state (Redis, the session store, or a checkpointer) so a turn served by worker B sees the completeness scores written by worker A on the previous turn. The skip logic is identical; the storage is not.
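A hypothetical shared variant of the slice-hash gate on redis-py's asyncio client; the key scheme and TTL are ours:

```python
import redis.asyncio as redis


class SharedSkipCache:
    def __init__(self, client: redis.Redis, ttl_s: int = 3600):
        self._r = client
        self._ttl = ttl_s

    async def should_run(self, session_id: str, slice_hash: str) -> bool:
        key = f"assembler:{session_id}:slice_hash"
        prev = await self._r.get(key)
        if prev is not None and prev.decode() == slice_hash:
            return False  # identical input, seen by any worker: skip
        await self._r.set(key, slice_hash, ex=self._ttl)
        return True
```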
asyncio.gather still does the work within a phase: a single phase is a set of independent tool calls with no inter-tool dependencies, and gathering coroutines is the right primitive at that scope. The whole-turn workflow shape (preflight, parallel assembly and empathy branches, format, navigator, finalize) is expressed as graph edges, not as ad-hoc create_task sequencing. The frontend consumes one stream channel for the whole turn: assembly stage events from `custom`, graph node lifecycle from `updates` and `tasks`, and checkpoint identifiers from `checkpoints` for replay.
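Consuming that stream looks roughly like this; the emit_* handlers are hypothetical, and the mode names follow the list above:

```python
# Inside the turn request handler coroutine (text and turn_id in scope).
async for mode, chunk in compiled_graph.astream(
    {"user_message": text},
    config={"configurable": {"thread_id": f"turn:{turn_id}"}},
    stream_mode=["custom", "updates", "tasks", "checkpoints"],
):
    if mode == "custom":
        emit_stage_event(chunk)            # assembly-stage progress events
    elif mode in ("updates", "tasks"):
        emit_node_lifecycle(mode, chunk)   # per-node state patches, task starts/ends
    elif mode == "checkpoints":
        record_checkpoint(chunk)           # checkpoint ids for replay
```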
Edge cases worth knowing about
- Required vs optional tools. A required tool that exhausts its retries makes the assembly node return an `error` patch, and the conditional edge after preflight (or a finalize-side error path) ends the turn with a clear error event; the navigator never runs. An `optional=True` tool that fails logs a warning, is excluded from the bundle, and the agent receives a flag indicating which sections were degraded, so the system prompt can ask follow-up questions on missing sections.
- Cycle detection. The Kahn loop raises `ValueError("cycle in pipeline")` at startup, not at request time. A misconfigured registry never reaches production because the boot sanity test exercises the topological sort against the real config.
- Idempotency for side-effectful tools. Every tool in the assembly node is read-only (extraction or retrieval), so retries are safe by construction. Booking and other side-effectful tools live downstream inside the navigator subgraph and accept an idempotency key on the request; the policy is no automatic retries on side effects.
- Resume across worker restarts. The whole-turn graph is compiled with a checkpointer (`MemorySaver` in dev, `AsyncPostgresSaver` in production), keyed by `thread_id = f"turn:{turn_id}"`. A worker restart between `preflight` and `navigator` resumes from the last checkpointed step rather than re-running preflight; the in-node skip cache is the right complement, not a substitute.
The short version is two things. First, declare your topology in one module (typed nodes and edges) and have the executor wire the StateGraph from it; the frontend manifest and the runtime read the same source. Second, inside any node that fans out to a parameterized tool set, keep one per-tool config object with explicit dependencies and `inject_inputs`. The two patterns compose: the outer graph is stable, the inner fan-out is configurable.
Takeaway checklist
- One topology spec for the whole-turn graph. The executor reads it; nothing else hand-codes the order. Manifest and execution share a source.
- Outer workflow shape (preflight, parallel branches, joins, conditional safety route) is expressed as graph edges. Parallel branches and the join run on LangGraph, not `asyncio.create_task`.
- One declarative config object per tool. Dependencies declared by name, not by import. Topological sort into phases inside the assembly node, with `asyncio.gather` at the within-phase scope only.
- Retry classification: timeouts and 5xx are retryable, validation and 4xx are not. Unclassified exceptions propagate.
- Skip gates earn the most. Completeness threshold plus content hash makes the second-turn-onward assembly disappear from latency dashboards. The graph checkpointer handles resume across restarts.
- Stream stage events through `get_stream_writer()` on `stream_mode="custom"` alongside `updates`, `tasks`, and `checkpoints`. One channel for the whole turn.