Engineering Neura · Post 1 of 4 · Next: Preflight node and conditional safety routing →
[Figure: a declarative tool pipeline rendered as a LangGraph node with two phases and an aggregated bundle output. Caption: tool fan-out collapses into phases; adding a tool is a registry edit, not an orchestration edit.]

The Engineering Neura series documents the engineering decisions behind Neura, a regulated clinical AI chat product built on LangGraph. Neura connects patients with matched therapists and care coordinators through a conversational interface. Behind every turn, a multi-stage LangGraph workflow assembles structured context, runs a safety check, and routes the response through a navigator. Each post in this series is a working pattern pulled from that codebase (built, iterated on, and deployed), written with enough specificity to be useful if you are solving similar problems.

This first post is about the context assembly pipeline. Before the AI navigator can respond to a patient's message, five structured context bundles need to be assembled: client signal extracted from conversation history, provider attributes, patient profile data, retrieval results over therapeutic-fit candidates, and a session narrative. Some depend on each other. Some can run concurrently. All can fail or time out, and the turn still has to produce something coherent.

The pattern we landed on: declare the entire LangGraph topology in one typed spec, fan tool calls out inside the assembly node with a config-driven dispatcher, and use predicate-based skip semantics to avoid redundant work on repeat turns. The most useful feature turned out to be the one that decides not to run.

Where this sits in the LangGraph stack

Neura's whole-turn workflow is a compiled StateGraph: preflight → assembly_gate → (context_assembly || empathy) → context_format → navigator → finalize, with a conditional edge from preflight that routes hijacked turns to safety_intervention → finalize. The topology lives in turn_graph_spec.py as typed StageId/GraphEdge values; the executor (turn_graph.py) reads that spec and calls graph.add_node, graph.add_conditional_edges, and graph.add_edge from it. There is no hand-coded sequencing alongside the spec, so the manifest the frontend renders and the graph the runtime executes can never drift.

The problem

Every chat turn needs five pieces of context assembled before the navigator runs: client signal extracted from history, provider attributes, patient profile, retrieval over therapeutic-fit candidates, and a session narrative. Some depend on others. Some can run in parallel. All can fail or time out, and the chat turn still has to produce a useful response. The whole-turn graph hands this off to one node (context_assembly); inside that node we still need a small workflow to run the tools.

The first version was the obvious one: a function with five sequential awaits. It worked until it didn't. Adding a sixth tool meant editing the function body, the order of failures was implicit, parallelism was an afterthought, and skipping work that had already run for an unchanged session was awkward to bolt on. Promoting tool-level concurrency to the outer graph (one node per tool) was tempting but mismatched: the tool set is parameterized per agent at runtime and can change between deploys, while the outer graph is a stable workflow shape (preflight, assembly, empathy, format, navigator, finalize). Tool-level fan-out belongs inside the assembly node.

Topology as a typed spec, executor as a thin reader

The whole-turn graph topology is declared as typed values in one module. Stages are an enum, edges are frozen dataclasses with an explicit EdgeType, conditional edges carry an EdgeCondition, and a manifest function emits a JSON-serializable shape for the frontend. The executor module imports the spec and wires the StateGraph from it; nothing else codifies the order.

class StageId(StrValueEnum):
    START = "__start__"   # LangGraph's START sentinel value
    PREFLIGHT = "preflight"
    SAFETY_INTERVENTION = "safety_intervention"
    ASSEMBLY_GATE = "assembly_gate"
    CONTEXT_ASSEMBLY = "context_assembly"
    EMPATHY = "empathy"
    CONTEXT_FORMAT = "context_format"
    NAVIGATOR = "navigator"
    FINALIZE = "finalize"
    END = "__end__"       # LangGraph's END sentinel value

GRAPH_EDGES = (
    GraphEdge(StageId.START, StageId.PREFLIGHT, EdgeType.ENTRY),
    GraphEdge(StageId.PREFLIGHT, StageId.SAFETY_INTERVENTION,
              EdgeType.CONDITIONAL, condition=EdgeCondition.SAFETY_HIJACKED),
    GraphEdge(StageId.PREFLIGHT, StageId.ASSEMBLY_GATE,
              EdgeType.CONDITIONAL, condition=EdgeCondition.NOT_SAFETY_HIJACKED),
    GraphEdge(StageId.SAFETY_INTERVENTION, StageId.FINALIZE, EdgeType.TERMINAL_PATH),
    GraphEdge(StageId.ASSEMBLY_GATE, StageId.CONTEXT_ASSEMBLY, EdgeType.PARALLEL_BRANCH),
    GraphEdge(StageId.ASSEMBLY_GATE, StageId.EMPATHY,         EdgeType.PARALLEL_BRANCH),
    GraphEdge(StageId.CONTEXT_ASSEMBLY, StageId.CONTEXT_FORMAT, EdgeType.JOIN_INPUT),
    GraphEdge(StageId.EMPATHY,          StageId.CONTEXT_FORMAT, EdgeType.JOIN_INPUT),
    GraphEdge(StageId.CONTEXT_FORMAT, StageId.NAVIGATOR, EdgeType.SEQUENCE),
    GraphEdge(StageId.NAVIGATOR,      StageId.FINALIZE,  EdgeType.SEQUENCE),
    GraphEdge(StageId.FINALIZE,       StageId.END,       EdgeType.EXIT),
)

The executor is a thin reader. JOIN_INPUT edges collapse into the LangGraph multi-source add_edge([s1, s2], target) call, so context formatting waits on both context assembly and empathy without a custom join node. CONDITIONAL edges feed add_conditional_edges with a small router that maps state["safety_hijacked"] to a target. The same module exposes runtime_graph_manifest() for the frontend, which means the manifest the UI renders and the graph the runtime executes are derived from one source.
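The router itself can be as small as one mapping from preflight state to a stage name. A minimal sketch, assuming the returned strings match the StageId values and that preflight writes a safety_hijacked key into state (the function name is illustrative):

```python
def route_after_preflight(state: dict) -> str:
    """Router passed to add_conditional_edges: hijacked turns go straight
    to the safety intervention stage; everything else proceeds to assembly."""
    return "safety_intervention" if state.get("safety_hijacked") else "assembly_gate"
```

Keeping the router this thin matters: all routing *policy* stays in the spec's EdgeCondition values, and the router only translates one boolean into a target name.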

# turn_graph.py
def build_whole_turn_graph(checkpointer=None):
    graph = StateGraph(TurnGraphState)
    for stage, node in _NODE_FUNCTIONS.items():
        graph.add_node(stage_value(stage), node)
    _wire_graph_topology(graph)  # reads GRAPH_EDGES, calls add_edge / add_conditional_edges
    return graph.compile(checkpointer=checkpointer or get_langgraph_checkpointer())

The TurnGraphState type is a TypedDict with one reducer of note: completed_stages: Annotated[list[str], operator.add]. Each node returns a small state patch, and the reducer concatenates the per-stage entries across parallel branches without a custom merge. Mutable per-turn context (DB session, the runtime object, deps) flows through config["configurable"] instead of through state, so the checkpointed snapshot stays small and JSON-friendly.
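A minimal sketch of that state shape (fields beyond completed_stages are illustrative, not the full production state) and of what the operator.add reducer does when two parallel branches each return a patch:

```python
import operator
from typing import Annotated, TypedDict

class TurnGraphState(TypedDict, total=False):
    # Each node returns a patch like {"completed_stages": ["empathy"]};
    # LangGraph applies operator.add to concatenate patches arriving
    # from parallel branches instead of raising a merge conflict.
    completed_stages: Annotated[list[str], operator.add]
    safety_hijacked: bool

# What the reducer computes when the assembly and empathy branches both report:
merged = operator.add(["context_assembly"], ["empathy"])
```

Without the Annotated reducer, two parallel branches writing the same key would be an invalid concurrent update; with it, the merge is deterministic concatenation.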

One config object per tool

Every tool registered in the pipeline declares its dependencies, defaults, and retry shape in a single dataclass. The dispatcher reads this; nothing else does.

# tool-server/routes/context.py
@dataclass(frozen=True)
class PipelineToolConfig:
    name: str
    dependencies: tuple[str, ...] = ()
    inject_inputs: tuple[str, ...] = ()
    defaults: dict[str, Any] = field(default_factory=dict)
    max_retries: int = 1
    retry_backoff: float = 0.5
    timeout: float = 30.0
    optional: bool = False

PIPELINE_CONFIG = {
    "client_signal":   PipelineToolConfig("client_signal"),
    "provider_genome": PipelineToolConfig("provider_genome"),
    "patient_context": PipelineToolConfig("patient_context"),
    "therapeutic_fit": PipelineToolConfig(
        "therapeutic_fit",
        dependencies=("client_signal", "provider_genome"),
        inject_inputs=("signal_summary", "genome_summary"),
        max_retries=2,
    ),
}

The dataclass is frozen, so a config's fields cannot be reassigned after construction; a mutable field like defaults is still mutable in place, so the dispatcher copies it per call rather than sharing references. Dependency declaration uses tool names rather than Python references, so the registry can be loaded from YAML without imports.
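Loading such a registry from parsed YAML is then a mechanical mapping. A sketch, assuming a trimmed copy of the dataclass and a raw dict of the shape yaml.safe_load would produce (the loader name is illustrative):

```python
from dataclasses import dataclass
from typing import Any

@dataclass(frozen=True)
class PipelineToolConfig:  # trimmed copy of the registry dataclass for this sketch
    name: str
    dependencies: tuple[str, ...] = ()
    inject_inputs: tuple[str, ...] = ()
    max_retries: int = 1

def load_registry(raw: dict[str, dict[str, Any]]) -> dict[str, PipelineToolConfig]:
    """Build the registry from a plain dict (e.g. the result of yaml.safe_load).
    Dependencies stay strings, so no Python imports are needed in the YAML."""
    return {
        name: PipelineToolConfig(
            name=name,
            dependencies=tuple(entry.get("dependencies", ())),
            inject_inputs=tuple(entry.get("inject_inputs", ())),
            max_retries=int(entry.get("max_retries", 1)),
        )
        for name, entry in raw.items()
    }
```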

Topological sort into phases

The dispatcher groups tools into phases. Tools within a phase run in parallel via asyncio.gather. Each subsequent phase waits for the previous one. The grouping is Kahn's algorithm; the only output that matters is the phase list.

def _build_execution_phases(specs):
    by_name = {s.name: s for s in specs}
    # Dependencies on tools not present in this run are filtered out by
    # intersecting with by_name. The registry validator (run at startup)
    # rejects unknown dependency names; this filter only handles the
    # case where a tool is intentionally excluded from a given agent's
    # spec list.
    remaining = {s.name: set(PIPELINE_CONFIG[s.name].dependencies) & set(by_name)
                 for s in specs}
    phases = []
    while remaining:
        ready = [n for n, deps in remaining.items() if not deps]
        if not ready:
            raise ValueError(f"cycle in pipeline: {remaining}")
        phases.append([by_name[n] for n in ready])
        for n in ready:
            del remaining[n]
        for deps in remaining.values():
            deps.difference_update(ready)
    return phases
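Once the phase list exists, the execution loop is small. A sketch (function and parameter names assumed, not pulled from the codebase): tools within a phase run concurrently via asyncio.gather, and an aggregated dict carries earlier outputs forward so later phases can read them.

```python
import asyncio

async def run_phases(phases, call_tool):
    """Run each phase's tools concurrently; a phase completes before the
    next starts, so dependents always see their dependencies' outputs."""
    aggregated: dict[str, object] = {}
    for phase in phases:
        results = await asyncio.gather(
            *(call_tool(spec, aggregated) for spec in phase)
        )
        for spec, result in zip(phase, results):
            aggregated[spec.name] = result
    return aggregated
```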

A boot-time validator complements the per-request build: it walks each agent's spec list and warns when a tool depends on a tool the same list excludes. That is usually a config mistake rather than an intentional partial run, and surfacing it at boot keeps the dispatcher's runtime behavior unambiguous.
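A minimal version of that validator, assuming the registry maps tool names to config objects carrying a dependencies tuple (the function name is illustrative):

```python
def validate_agent_specs(agent_specs: dict[str, list[str]], registry) -> list[str]:
    """Boot-time check: flag any tool whose declared dependency is
    excluded from the same agent's spec list. Returns warning strings."""
    warnings = []
    for agent, tool_names in agent_specs.items():
        present = set(tool_names)
        for name in tool_names:
            for dep in registry[name].dependencies:
                if dep not in present:
                    warnings.append(
                        f"{agent}: '{name}' depends on '{dep}', "
                        f"which this spec list excludes"
                    )
    return warnings
```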

[Diagram: Phase 1 (parallel): client_signal, provider_genome, patient_context → Phase 2: therapeutic_fit → output: aggregated bundle.]
The four-tool pipeline collapses into two phases. Adding a tool is a registry edit, never an orchestration edit.

Argument injection from prior phases

Tools in phase N can read outputs of phases 1..N-1 by listing the keys they want in inject_inputs. The dispatcher snaps those values out of the aggregated state and merges them with per-call arguments and registry defaults.

def _prepare_arguments(spec, aggregated, defaults):
    args = dict(defaults)
    for key in PIPELINE_CONFIG[spec.name].inject_inputs:
        if key in aggregated:
            args[key] = aggregated[key]
    args.update(spec.arguments or {})  # caller overrides win
    return args

The merge order is intentional: defaults at the bottom, injected values in the middle, caller overrides on top. A test that mocks an injected field can shadow it without touching the dispatcher.
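A worked example of that precedence, as a standalone sketch rather than the dispatcher's actual code path (names and values are illustrative):

```python
def merge_args(defaults: dict, injected: dict, overrides: dict) -> dict:
    # Same precedence as _prepare_arguments: defaults < injected < overrides.
    args = dict(defaults)
    args.update(injected)
    args.update(overrides)
    return args

args = merge_args(
    {"limit": 10, "locale": "en"},         # registry defaults
    {"signal_summary": "stub-from-test"},  # injected (or mocked) prior-phase value
    {"limit": 3},                          # caller override wins
)
```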

Retries with classified errors

Every tool runs through the same retry wrapper. The wrapper distinguishes "definitely retryable" (network blip, 5xx, timeout) from "do not retry" (validation, 4xx, business rejection). Anything else propagates: an unclassified exception is treated as a real failure and surfaces to the dispatcher rather than getting silently retried.

RETRYABLE = (asyncio.TimeoutError, httpx.ConnectError, httpx.ReadTimeout)

async def _execute_with_retry(spec, fn, cfg):
    last = None
    for attempt in range(cfg.max_retries + 1):
        try:
            return await asyncio.wait_for(fn(), timeout=cfg.timeout)
        except RETRYABLE as exc:
            last = exc
        except httpx.HTTPStatusError as exc:
            if exc.response.status_code < 500 or attempt == cfg.max_retries:
                raise
            last = exc
        if attempt < cfg.max_retries:  # no pointless backoff after the final attempt
            await asyncio.sleep(cfg.retry_backoff * (2 ** attempt))
    raise last

Skip semantics: the load-bearing piece

The cheapest tool execution is the one we don't run. Two skip gates protect every assembly turn.

Completeness threshold. Each tool reports a 0..1 completeness score on its output. The session-level assembler tracks the running per-tool completeness. If a tool's completeness has already reached the threshold and the upstream signals haven't changed, the next turn skips that tool and reuses the prior output. The threshold defaults to 0.4 because that is roughly the point at which incremental work starts producing diminishing returns in our calibration set.

Content hash. The session assembler keeps a hash of the conversation slice that fed each tool. Same hash, no re-run. This catches the common case of a clarifying acknowledgement that does not change the upstream signal.
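One way to compute that content hash (a sketch; the real slicing logic is Neura-specific) is canonical JSON plus SHA-256, so dict key order cannot flip the digest:

```python
import hashlib
import json

def conversation_slice_hash(messages: list[dict]) -> str:
    """Stable hash of the conversation slice that feeds a tool.
    sort_keys canonicalizes dict ordering; compact separators keep
    the payload deterministic across serializer defaults."""
    payload = json.dumps(messages, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()
```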

async def enqueue(self, session_id, slice_hash):
    state = self._state.get(session_id, SessionAssemblerState())
    if state.last_slice_hash == slice_hash:
        return  # no-op, identical input
    state.last_slice_hash = slice_hash
    self._state[session_id] = state
    await self._queue.put(session_id)

# inside the worker loop:
for tool_name, comp in record.completeness.items():
    if comp >= self.threshold and tool_name in last_outputs:
        skipped.append(tool_name)
        continue
    to_run.append(tool_name)

In our calibration on a few hundred sessions, the completeness gate skipped a tool on roughly 60% of turns after the first three turns of a session. The numbers will vary by domain and by how aggressively upstream signals change; the win comes from skipping at all, not from the specific threshold.

State topology assumption

The skip cache shown here lives in process memory for clarity. Production stacks with multiple workers need shared, durable state (Redis, the session store, or a checkpointer) so a turn served by worker B sees the completeness scores written by worker A on the previous turn. The skip logic is identical; the storage is not.

asyncio.gather still does the work within a phase: a single phase is a set of independent tool calls with no inter-tool dependencies, and gathering coroutines is the right primitive at that scope. The whole-turn workflow shape (preflight, parallel assembly and empathy branches, format, navigator, finalize) is expressed as graph edges, not as ad-hoc create_task sequencing. The frontend consumes one stream channel for the whole turn: assembly stage events from custom, graph node lifecycle from updates and tasks, and checkpoint identifiers from checkpoints for replay.

Edge cases worth knowing about

  • Required vs optional tools. A required tool that exhausts its retries makes the assembly node return an error patch, and the conditional edge after preflight (or a finalize-side error path) ends the turn with a clear error event; the navigator never runs. An optional=True tool that fails logs a warning, is excluded from the bundle, and the agent receives a flag indicating which sections were degraded so the system prompt can ask follow-up questions on missing sections.
  • Cycle detection. The Kahn loop raises ValueError("cycle in pipeline") at startup, not at request time. A misconfigured registry never reaches production because the boot sanity test exercises the topological sort against the real config.
  • Idempotency for side-effectful tools. Every tool in the assembly node is read-only (extraction or retrieval), so retries are safe by construction. Booking and other side-effectful tools live downstream inside the navigator subgraph and accept an idempotency key on the request; the policy is no automatic retries on side effects.
  • Resume across worker restarts. The whole-turn graph is compiled with a checkpointer (MemorySaver in dev, AsyncPostgresSaver in production), keyed by thread_id = f"turn:{turn_id}". A worker restart between preflight and navigator resumes from the last checkpointed step rather than re-running preflight; the in-node skip cache is the right complement, not a substitute.
The thing to copy

Two things actually. First, declare your topology in one module (typed nodes and edges) and have the executor wire the StateGraph from it; the frontend manifest and the runtime read the same source. Second, inside any node that fans out to a parameterized tool set, keep the per-tool config object with explicit dependencies and inject_inputs. The two patterns compose: the outer graph is stable, the inner fan-out is configurable.

Takeaway checklist

  1. One topology spec for the whole-turn graph. The executor reads it; nothing else hand-codes the order. Manifest and execution share a source.
  2. Outer workflow shape (preflight, parallel branches, joins, conditional safety route) is expressed as graph edges. Parallel branches and the join run on LangGraph, not asyncio.create_task.
  3. One declarative config object per tool. Dependencies declared by name, not by import. Topological sort into phases inside the assembly node, with asyncio.gather at the within-phase scope only.
  4. Retry classification: timeouts and 5xx are retryable, validation and 4xx are not. Unclassified exceptions propagate.
  5. Skip gates earn the most. Completeness threshold plus content hash makes the second-turn-onward assembly disappear from latency dashboards. The graph checkpointer handles resume across restarts.
  6. Stream stage events through get_stream_writer() on stream_mode="custom" alongside updates, tasks, and checkpoints. One channel for the whole turn.
