Claude Code transcript - page 1/1

[Request interrupted by user for tool use]

Implement the following plan:

PLMB-037 Step 5c: Agent binary ZMQ rewrite

Context

The fabric runtime (Steps 0–5b) runs inline morphisms on ZMQ sockets, but agent subprocesses still communicate via stdin/stdout with __port JSON envelopes. A bridge module (bridge.ml, ~445 lines) translates between the two.

Step 5c eliminates the bridge for agents by having the agent binary speak ZMQ directly — one socket per logical port, no envelopes.

Worktree: create from plumbing-1 branch

Review consensus (5 reviewers, 2 rounds)

Topology

Writer-binds, reader-connects (architect + theory-critic). For agent output endpoints, the agent PUSH binds; the fabric's downstream PULL connects directly to it. For agent input endpoints, the upstream PUSH binds (as today); the agent PULL connects. Fabric skips creating any socket at agent-side endpoints. No intermediate sockets, no forwarding fibres, no close-and-rebind.

Agent binary

ZMQ-only — no transport abstraction. Loop.config is already transport-agnostic via callbacks. The bridge IS the stdio compat layer. A conditional branch in main() falls back to stdio if PLUMB_ENDPOINT_INPUT is absent (for standalone/bridge use during transition).

Thread→fibre simultaneous with ZMQ migration. No hybrid state. Mutex/ConditionEio.Mutex/Eio.Condition. Critical invariant: never hold Eio.Mutex across ZMQ recv. One fibre per socket, no sharing.

Protocols

Startup handshake: agent sends {"status":"ready"} on ctrl_out after connecting all sockets. Fabric waits before sending data. Reaper fibre starts BEFORE ready-wait (theory-critic: crash before ready → deadlock otherwise). Ready signal consumed by handshake, not forwarded to ctrl_out consumer. 30s timeout.

Shutdown: (1) fabric sends EOF on data_in; (2) agent enters drain phase; (3) agent handles pending control; (4) fabric sends {"stop":true} then EOF on ctrl_in; (5) agent sends EOF on all output sockets; (6) agent closes sockets, destroys context, exits; (7) fabric detects EOF on PULLs, reaps child. Per-socket EOF timeouts. Tool sockets remain active until ctrl_out EOF or agent death.

Crash handling: concurrent reaper fibre (waitpid) races normal shutdown via Eio.Fiber.first. On agent death, reaper force-closes all fabric-side sockets for that agent. Hard backstop: SIGTERM after N seconds, SIGKILL after N+5.

Back-pressure: ZMQ_SNDHWM = 0 (unlimited buffering) on all agent sockets — deliberate simplification for LLM-speed workloads. Not equivalent to pipe semantics (pipes block on full 64KB buffer; HWM=0 never blocks, unbounded memory). Follow-on for telemetry queue depth observability.

Module boundaries

ZMQ handler in zmq_agent.ml — not in bridge.ml (architect: bridge is a natural transformation; ZMQ handler is a direct embedding).

PUSH/PULL for tool dispatch (not REQ/REP) — no strict alternation; supports future parallel tool calls. Pipe adapters to existing tool_dispatch_loop. Follow-on ticket for correlation-ID routing.

Naming

PLUMB_ENDPOINT_* (matching implementation plan D8). Presence of PLUMB_ENDPOINT_INPUT selects ZMQ mode.

Architecture

Current (bridge)

Fabric                                    Agent process
  PULL ──(ZMQ)──→ bridge mux ──(pipe)──→ demux → pipes → loop
  PUSH ←─(ZMQ)──← bridge demux ←─(pipe)──← mux_write ← loop

After (ZMQ direct)

Fabric                     Agent process
  PUSH ─bind──(ZMQ)──connect─ PULL → main fibre
  PULL ────(ZMQ)──connect─bind─ PUSH ← main fibre

Writer binds, reader connects. No pipes, no envelopes, no demux thread.

Changes

1. Fabric socket bifurcation

lib/fabric/fabric.ml — modify socket creation loop (lines 100–150):

For routes involving Subprocess processes (checked via proc_kind on process_entry), skip creating the agent-side socket. Specifically:

  • Route where src = subprocess: skip creating PUSH at endpoint(src, src_port). Agent will bind its own PUSH there.
  • Route where dst = subprocess: skip creating PULL at endpoint(dst, dst_port). Agent will connect its own PULL to the upstream PUSH endpoint.

The connect phase also skips these routes (no PULL to connect).

Extend process_handler callback:

?process_handler:(
  sw:Eio.Switch.t ->
  proc_entry:process_entry ->
  push_sockets:(string * [`Push] T.socket) list ->
  pull_sockets:(string * [`Pull] T.socket) list ->
  endpoints:(string * string) list ->
  unit)

push_sockets/pull_sockets: fabric-side sockets only (for bridge path). endpoints: all port→URI mappings (for ZMQ path). For bridge handlers, endpoints is ignored. For ZMQ handlers, socket lists are [].

Update bridge's make_handler to accept ~endpoints:_.

2. ZMQ process handler — lib/fabric/zmq_agent.ml (new)

make_handler function:

  1. Synthesise agent spec (reuse Bridge.synthesise_spec or extract shared helper)
  2. Create tool dispatch sockets: bind PUSH at tool_resp endpoint, bind PULL at tool_req endpoint. Set ZMQ_SNDHWM = 0. Run pipe-adapter fibres to tool_dispatch_loop.
  3. Create ctrl_out PULL socket: bind at ctrl_out endpoint. This socket is created by zmq_agent.ml, not by the general fabric loop (ctrl_out may not appear as a route).
  4. Spawn agent with PLUMB_ENDPOINT_* env vars + credentials
  5. Fork reaper fibre (waitpid) — BEFORE ready-wait
  6. Wait for {"status":"ready"} on ctrl_out PULL (30s timeout; if agent dies first, reaper cancels the wait)
  7. Consume the ready message (do not forward to downstream)
  8. Fork per-socket EOF timeout fibres

Use Eio.Fiber.first to race normal shutdown vs crash reaper. On crash: force-close all fabric-side sockets for this agent. Hard backstop: SIGTERM after 30s of no output EOF, SIGKILL +5s.

3. Rewrite bin/agent/main.ml

  • Remove stdout_mutex, mux_write, start_stdin_demux (~90 lines)
  • Read PLUMB_ENDPOINT_* env vars. If PLUMB_ENDPOINT_INPUT absent, fall back to existing stdio path (preserves bridge compatibility during transition).
  • Create ZMQ context. For each endpoint env var:
  • Input sockets (data_in, ctrl_in, tool_resp): create PULL, connect to endpoint
  • Output sockets (data_out, ctrl_out, telemetry, tool_req): create PUSH, bind at endpoint
  • Set ZMQ_SNDHWM = 0 on all sockets
  • Send {"status":"ready"} on ctrl_out PUSH
  • Main loop: T.recv data_in_sock instead of input_line; raise End_of_file on empty payload (EOF sentinel)
  • Output: T.send data_out_sock, T.send telemetry_sock, etc.
  • Tool dispatch: T.send tool_req_sock / T.recv tool_resp_sock
  • Control reader: daemon fibre doing T.recv ctrl_in_sock
  • control_state: Eio.Mutex.t + Eio.Condition.t
  • Shutdown: send EOF on all output sockets, close context

4. Spawn lib — env var support

lib/spawn/spawn_lib.ml: - Add PLUMB_ENDPOINT_* to safe_env_keys - Helper to compute env var list from port→endpoint mapping

5. Dune dependency

bin/agent/dune: add transport to libraries.

6. Tests (16)

test/test_fabric.ml — ZMQ agent tests: 1. test_agent_identity_zmq — identity response 2. test_agent_eof_zmq — EOF propagation 3. test_agent_eof_cascade_zmqempty ; agent ; id terminates (clean exit code + no zombies) 4. test_agent_telemetry_zmqkind:config on telemetry socket, NOT on data socket (channel separation) 5. test_agent_tool_dispatch_zmq — full chain through pipe adapter 6. test_agent_multi_message_zmq — 3 messages in, 3 responses in order 7. test_agent_concurrent_data_ctrl_zmq — data + pause arrive simultaneously; fibre synchronisation works

test/test_agent_control.ml — port key tests to ZMQ: 8. test_stop_zmq 9. test_stdin_eof_zmq — data EOF triggers drain 10. test_pause_resume_zmq — pause/get/set/resume flow 11. test_drain_zmq — drain completes protocol 12. test_set_memory_malformed_zmq 13. test_basic_response_zmq

Error/lifecycle tests: 14. test_missing_endpoint_env — clear error on missing env var 15. test_startup_ready_signal — white-box: assert send_count_before_ready == 0 16. test_ipc_cleanup — no socket files remain after agent exit

Test infrastructure: - New with_fabric_zmq_agent helper (NOT parameterised — separate from with_fabric_agent) - Extract transport-independent helpers to test_helpers.ml: parse_and_compile, find_agent_binary, fresh_id, fixture, JSON assertion helpers - Set ZMQ_LINGER = 0 on all test sockets (prevent 30s hangs) - Fun.protect wrapper for IPC cleanup on test failure - Existing stdio tests serve as regression for fallback path

7. Documentation

doc/agent-runtime.md — draft at step 2 (contracts before code): - New "ZMQ transport mode" subsection: socket summary table (name, ZMQ type, bind/connect, message format), env var table with detection logic, ready handshake contract (D5), shutdown protocol contract (D6), back-pressure policy - Update telemetry section (dedicated socket, no envelopes) - Update streaming section (no stdin/stdout in ZMQ mode) - Update control channel section (fibre, not thread; socket, not fd)

doc/processes.md: - Annotate "Multiplexed I/O" as stdio transport path - Note Port names table applies to both paths (same logical ports, different physical channels) - Cross-reference agent-runtime.md ZMQ section

doc/architecture.md: - Add lib/fabric/zmq_agent.ml to module table - Update dependency diagram: bin/agenttransport

Files to modify

File Change
lib/fabric/fabric.ml Socket bifurcation; extend callback
lib/fabric/zmq_agent.ml New — ZMQ process handler
bin/agent/main.ml ZMQ rewrite; threads→fibres
bin/agent/dune Add transport dependency
lib/spawn/spawn_lib.ml Endpoint env var helpers
test/test_fabric.ml 7 new ZMQ agent tests
test/test_agent_control.ml 6 control tests ported to ZMQ
test/test_zmq_errors.ml 3 error/lifecycle tests
test/test_helpers.ml New — shared test infrastructure
doc/agent-runtime.md ZMQ transport mode subsection
doc/processes.md Annotate Multiplexed I/O
doc/architecture.md Module table, dependency graph

Implementation order

  1. Documentation contracts — draft ZMQ transport mode subsection in agent-runtime.md: env vars, ready handshake, shutdown protocol, socket table. These are design contracts; writing them first reduces drift.

  2. Fabric socket bifurcation — modify fabric.ml to skip agent-side sockets for subprocess processes; extend callback with endpoints. Update bridge's make_handler to accept new arg. dune build && dune test — all existing tests pass.

  3. ZMQ handlerzmq_agent.ml: spec synthesis, tool dispatch wiring, endpoint env vars, agent spawn, reaper, ready handshake. dune build.

  4. Agent binary rewrite — main.ml: ZMQ sockets, daemon fibres, Eio.Mutex/Condition, ready signal, shutdown protocol, stdio fallback. dune build.

  5. Integration + tests — wire ZMQ handler into fabric, extract test helpers, run all 16 tests + existing suite. dune build && dune test.

  6. Documentation finalisation — update processes.md, architecture.md; review agent-runtime.md against implementation.

Verification

  1. After step 2: dune build && dune test — all existing bridge tests pass with extended callback
  2. After step 4: dune build — agent binary compiles with ZMQ
  3. After step 5: full test suite; all 16 new tests + existing
  4. EOF cascade: empty ; agent ; id terminates cleanly (test 3)
  5. Control protocol: pause/resume/memory over ZMQ (tests 10-12)
  6. Tool dispatch: round-trip through pipe adapter (test 5)
  7. Channel separation: telemetry on telemetry socket only (test 4)
  8. Crash handling: agent death → fabric cleans up (reaper fibre)
  9. IPC cleanup: no socket files after exit (test 16)
  10. Update PLMB ticket and engineering state in LDC

Follow-on tickets

  • Remove stdio fallback from agent binary (after bridge retirement)
  • Correlation-ID routing for parallel tool calls
  • Telemetry queue depth observability (HWM=0 monitoring)
  • doc/executor.md scope note for bridge/stdio path

If you need specific details from before exiting plan mode (like exact code snippets, error messages, or content you generated), read the full transcript at: /Users/ww/.claude/projects/-Users-ww-src-plumbing/e80e21f4-2105-4797-9e0e-4a90f6bb6fc8.jsonl

Thinking

I'll start by reading the engineering state and key source files in parallel to understand the current codebase before setting up a worktree.

ToolSearch
{
  "query": "select:mcp__claude_ai_LDC__read,mcp__claude_ai_LDC__whoareyou",
  "max_results": 2
}
{
  "type": "tool_reference",
  "tool_name": "mcp__claude_ai_LDC__read"
}
{
  "type": "tool_reference",
  "tool_name": "mcp__claude_ai_LDC__whoareyou"
}
mcp__claude_ai_LDC__whoareyou
{}
mcp__claude_ai_LDC__read
{
  "document_id": "research/plumbing/engineering-state.md"
}
{"success":true,"content":{"email":"wwaites@gmail.com","role":"owner","name":null}}
{"success":true,"content":"# Plumbing — Engineering State\n\nLast updated: 2026-03-14 by Research (opus) — PLMB-056 closed\n\n## Current Work\n\n**PLMB-037: ZeroMQ supervisor + router runtime** — replacing the\npipe-based runtime with ZMQ message fabric.\n\n**Branch:** `plmb-037-step5b` (pending merge to main)\n\n### Completed Steps\n\n**Step 0** (`6125c7a`): ZMQ transport layer\n- `lib/transport/` with `TRANSPORT` module type and ZMQ backend\n- 6 smoke tests (PUSH/PULL, PUB/SUB, PAIR, FIFO, concurrent, try_recv)\n- Custom nix derivation for zmq-eio at `nix/zmq-eio.nix`\n\n**Step 2** (`8bcab98`): Routing table compiler\n- `lib/runtime/routing.ml` — compiles plumb body spawn lists into\n  routing tables with process entries and routes\n- Channel kind classification, proper type synthesis for structural\n  morphisms\n- 5 tests\n\n**Step 3** (`f622b60`): Supervisor\n- `lib/supervisor/supervisor.ml` — spawn, kill (three-tier), ordered\n  shutdown\n- 7 tests\n\n**Step 4 partial** (`6452f28`): Fabric and inline morphisms\n- `lib/fabric/fabric.ml` — wires ZMQ sockets from routing table,\n  starts inline morphism fibres\n- `lib/fabric/inline.ml` — all morphisms from `forward.ml` adapted\n  to ZMQ transport\n- **Identity pipeline works end-to-end through ZMQ**\n- Copy+merge test disabled — merge synchronisation needs investigation\n\n**Step 5a** (`ec37ed6`): Agent subprocess bridge\n- `lib/spawn/spawn_lib.ml` — extracted from `lib/plumb/spawn.ml`,\n  dune-site-free (bin_dir and resource_find injected by callers)\n- `lib/plumb/spawn.ml` — thin wrapper injecting dune-site paths\n- `lib/fabric/bridge.ml` — stdio mux/demux adapter for agent\n  subprocesses (natural transformation between ZMQ sockets and\n  __port envelope protocol)\n- `fabric.ml` gains `?process_handler` callback, re-exports Bridge\n- **Agent subprocesses work end-to-end through the fabric**\n- 2 agent tests: identity response + EOF propagation (ELIZA mock)\n\n**Step 5b** (`3139c91`): Exec→Fabric switchover fixes\n- Three fixes: `List.nth` guard broadened (`\"nth\" || \"List.nth\"`),\n  `send_eof` wrapped in transport error handler, test relocation\n  for PLMB-053\n- 87 tests pass\n- Post-review follow-up: narrowed `with _ -> ()` to\n  `Transport.Transport_error _` in fabric.ml, fixed ⊥ comment in\n  routing.ml\n- Filed 6 follow-on tickets: PLMB-054..059\n\n**Step 5b follow-up** (`454f96f`): PLMB-054, 057, 058\n- bridge.ml: narrowed telemetry observation callback catch from\n  `with _ ->` to `Yojson.Json_error` (PLMB-054)\n- routing.ml: named-arg validation now raises clear \"missing\n  required port\" error instead of cryptic `List.nth` failure\n  (PLMB-057)\n- test_routing.ml: 6 new validation tests — dead ports, discard\n  output type, project non-product, wrong channel count, missing\n  named port, ctrl_in on non-product agent (PLMB-058)\n- Routing tests now total 16 (6 compile + 6 validation + 4 resolution)\n\n**Step 5b follow-up** (`b88a546`): PLMB-055\n- routing.ml: extracted ~130-line type validation block from main\n  spawn loop into named local function `check_spawn_types`\n- Replaced wildcard `| _ -> ()` with explicit match arms for all\n  `process_impl` variants (exhaustiveness warning on new variants)\n- Pure refactor, no behavioural change; all 16 routing tests pass\n\n**PLMB-056** (`379f96b`): Empty morphism + EOF correctness\n- Added `Empty` variant to `process_impl` — dual of `Discard`\n- `forward_empty` in `inline.ml`: sends EOF via `Bridge.send_eof`\n- Fixed `recv_loop`: empty payload → `Stream_eof` (was silently\n  skipping — contradicted Bridge's documented EOF protocol)\n- Added EOF propagation to all 12 output morphisms (id, copy, merge,\n  merge_any, barrier, map, filter, project, tagger, format_json,\n  parse_json, empty)\n- Desugarer: `insert_empty_for_writerless` auto-inserts empty for\n  channels declared but never referenced in any spawn\n- Routing compiler: writer-only and reader-only channels promoted\n  from warnings to compile-time errors with actionable messages\n- Removed `dead_ports` from `routing_table` type\n- Removed Phase 3b (dead port EOF seeding) from fabric\n- 3 new tests (empty auto-insert, writer-only error, explicit empty)\n- Routing tests now total 18 (6 compile + 8 validation + 4 resolution)\n- Documentation updated: processes.md, executor.md, architecture.md\n\n### Key Findings\n\n- **`List.nth` on OCaml 5.4.0**: raises `Failure \"nth\"`, NOT\n  `Invalid_argument`. The broadened guard handles both for\n  defensive compatibility.\n- **zmq-eio abandoned**: zmq-eio 5.3.0 has two fatal issues:\n  (1) `send_all` pushes to receivers queue (deadlock), (2) event loop\n  fibres are non-daemon and prevent Eio switch exit. Replaced with\n  direct ZMQ + `Eio_unix.await_readable`/`await_writable` — simpler,\n  properly cancellable, no persistent event loops.\n- **Socket cleanup**: with fd-based polling, sockets can be closed\n  directly via `Zmq.Socket.close`. No event loop deadlock.\n  `Context.terminate` should NOT be called while daemon fibres may\n  still hold socket references — causes hangs.\n- **ipc:// directories**: all endpoint directories must be created\n  before socket bind. The fabric creates dirs for all src/dst names\n  in the routing table, not just process entries.\n- **Bridge Eio threading model**: ZMQ recv/send must run in Eio fibre\n  context (uses `Eio_unix.await_readable/writable`). Pipe I/O must\n  use `Eio_unix.run_in_systhread`. Mux fibres alternate: T.recv\n  (Eio) → run_in_systhread(Mutex + pipe write). Demux:\n  run_in_systhread(input_line) → T.send (Eio).\n- **Binary resolution in tests**: `Spawn_lib.resolve_binary` uses\n  `Sys.executable_name` for build-tree sibling lookup, which fails\n  from test binaries. Bridge accepts `?resolve_bin` override for\n  tests to supply the agent binary path directly.\n- **EOF propagation**: `recv_loop` was silently skipping empty\n  payloads — contradicting Bridge's documented protocol (bridge.ml:\n  20-21). Fixed: empty payload raises `Stream_eof`. All output\n  morphisms now propagate EOF downstream via `Bridge.send_eof` after\n  `recv_loop` returns.\n\n### Next Steps\n\n1. Merge step 5b branch to main\n2. **Step 5c**: Agent binary ZMQ rewrite (separate ticket)\n3. Step 5d: Cleanup — delete exec.ml, forward.ml\n4. Documentation (Step 6) — see PLMB-059\n\nPlan: [plmb-037-implementation-plan.md](research/plumbing/design/plmb-037-implementation-plan.md)\n\n## Build\n\n- Build: `nix develop --command dune build`\n- **CRITICAL**: Must run from **worktree directory**, not the repo\n  root\n- Test: same prefix + `dune test`\n- **test_plumb.ml integration tests**: fail in worktrees because\n  they hardcode `../bin/plumb/main.exe` relative path. Pre-existing\n  issue, not related to PLMB-037.\n- **Dev binary**: `_build/default/bin/plumb/main.exe`\n- **Worktree build**: always use `--root=.`\n- **zmq-eio nix**: custom derivation at `nix/zmq-eio.nix` (still\n  in deps for nix build, but transport lib no longer uses it —\n  uses raw `zmq` + `eio.unix` instead)\n\nFor packaging, release, Docker, and Nix details see\n[build-reference.md](research/plumbing/build-reference.md).\n\n## Key Files\n\n- **AST**: `lib/language/ast.ml`\n- **Parser**: `lib/language/parser.mly` (Menhir)\n- **Session compiler**: `lib/language/compile_session.ml`\n- **Desugarer**: `lib/language/desugar.ml`\n- **Validator**: `lib/language/validate.ml`\n- **Spec loader**: `lib/runtime/spec.ml`\n- **Routing table**: `lib/runtime/routing.ml` (PLMB-037)\n- **Transport**: `lib/transport/transport_intf.ml`, `transport_zmq.ml`\n- **Supervisor**: `lib/supervisor/supervisor.ml`\n- **Fabric**: `lib/fabric/fabric.ml`, `inline.ml`\n- **Bridge**: `lib/fabric/bridge.ml` (Step 5a)\n- **Spawn lib**: `lib/spawn/spawn_lib.ml` (extracted Step 5a)\n- **Spawn wrapper**: `lib/plumb/spawn.ml` (thin dune-site wrapper)\n- **Tool dispatch**: `lib/plumb/tool_dispatch.ml` (~700 lines)\n- **Executor**: `lib/plumb/exec.ml` (~1540 lines) — being replaced\n- **Forwarding**: `lib/plumb/forward.ml` (~530 lines) — being replaced\n- **Tests**: transport (6), routing (18), supervisor (7), fabric (4),\n  plus all existing tests\n\n## Active Tickets\n\n- **PLMB-037**: supervisor + router runtime (Step 5b done, 5c next)\n- **PLMB-041**: select+loop session protocol deadlock\n- **PLMB-043**: dead Copy for ST_end branches\n- **PLMB-059**: documentation for fabric switchover (low)\n"}
Thinking

Now let me read the key source files in parallel.

Read
{
  "file_path": "/Users/ww/src/plumbing/lib/fabric/fabric.ml"
}
     1→(** Fabric — wire ZMQ sockets from a routing table and run the pipeline.
     2→
     3→    The fabric is the runtime core. It:
     4→    1. Creates ZMQ sockets for all routes in the routing table
     5→    2. Binds and connects sockets per the wiring instructions
     6→    3. Seeds channels with initial tokens (D7)
     7→    4. Starts inline morphism fibres
     8→    5. Coordinates subprocess spawning via the supervisor
     9→
    10→    Startup sequence (D7, bind-before-connect for inproc://):
    11→    1. Create ZMQ context
    12→    2. Bind phase: create and bind all sockets, seed channels
    13→    3. Connect phase: connect all consumer sockets
    14→    4. Start phase: start inline fibres, supervisor spawns subprocesses
    15→
    16→    See PLMB-037 Step 4, design decisions D6–D9. *)
    17→
    18→open Language
    19→
    20→module T = Transport.Zmq
    21→
    22→(** A running pipeline handle. *)
    23→type t = {
    24→  ctx : T.context;
    25→  supervisor : Supervisor.t;
    26→  push_sockets : (string, [`Push] T.socket) Hashtbl.t;
    27→  pull_sockets : (string, [`Pull] T.socket) Hashtbl.t;
    28→  pipeline_id : string;
    29→  cancel : unit -> unit;
    30→  (** Cancel function: fails the inner switch, causing all socket
    31→      fibres and morphism fibres to terminate. *)
    32→}
    33→
    34→(** Endpoint naming for ipc:// transport.
    35→    Format: ipc:///tmp/plumb-{pid}/{process}/{port} *)
    36→let endpoint ~pipeline_id proc_name port_name =
    37→  Printf.sprintf "ipc:///tmp/plumb-%s/%s/%s" pipeline_id proc_name port_name
    38→
    39→(** Get the PUSH socket for writing to a boundary input port.
    40→    The caller sends data on this socket to feed the pipeline. *)
    41→let boundary_input t port_name =
    42→  let ep = endpoint ~pipeline_id:t.pipeline_id port_name "out" in
    43→  Hashtbl.find t.push_sockets ep
    44→
    45→(** Get the PULL socket for reading from a boundary output port.
    46→    The caller receives data from this socket. *)
    47→let boundary_output t port_name =
    48→  let ep = endpoint ~pipeline_id:t.pipeline_id port_name "in" in
    49→  Hashtbl.find t.pull_sockets ep
    50→
    51→(** Create the fabric directory for ipc:// endpoints. *)
    52→let ensure_fabric_dir pipeline_id =
    53→  let dir = Printf.sprintf "/tmp/plumb-%s" pipeline_id in
    54→  (try Unix.mkdir dir 0o700 with Unix.Unix_error (Unix.EEXIST, _, _) -> ());
    55→  dir
    56→
    57→(** Clean up the fabric directory. *)
    58→let cleanup_fabric_dir pipeline_id =
    59→  let dir = Printf.sprintf "/tmp/plumb-%s" pipeline_id in
    60→  let cleanup_dir d =
    61→    try
    62→      let entries = Sys.readdir d in
    63→      Array.iter (fun f ->
    64→        try Sys.remove (Filename.concat d f) with Sys_error _ -> ()
    65→      ) entries;
    66→      (try Unix.rmdir d with Unix.Unix_error _ -> ())
    67→    with Sys_error _ -> ()
    68→  in
    69→  (* Remove per-process subdirectories *)
    70→  (try
    71→    let entries = Sys.readdir dir in
    72→    Array.iter (fun sub ->
    73→      let path = Filename.concat dir sub in
    74→      if Sys.is_directory path then cleanup_dir path
    75→    ) entries
    76→  with Sys_error _ -> ());
    77→  cleanup_dir dir
    78→
    79→(** Execute a pipeline from a routing table.
    80→
    81→    This is the main entry point. It replaces [Exec.execute]. *)
    82→let execute ~sw ~routing_table ~type_env
    83→    ?(pipeline_id = string_of_int (Unix.getpid ()))
    84→    ?process_handler () =
    85→  let ctx = T.create_context () in
    86→  let _fabric_dir = ensure_fabric_dir pipeline_id in
    87→  let sup = Supervisor.create () in
    88→
    89→  (* Create subdirectories for all endpoints (processes and channels) *)
    90→  let all_names = Hashtbl.create 16 in
    91→  List.iter (fun (r : Runtime.Routing.route) ->
    92→    Hashtbl.replace all_names r.src ();
    93→    Hashtbl.replace all_names r.dst ()
    94→  ) routing_table.Runtime.Routing.routes;
    95→  Hashtbl.iter (fun name () ->
    96→    let dir = Printf.sprintf "/tmp/plumb-%s/%s" pipeline_id name in
    97→    (try Unix.mkdir dir 0o700 with Unix.Unix_error (Unix.EEXIST, _, _) -> ())
    98→  ) all_names;
    99→
   100→  (* === Phase 1: Create and bind sockets ===
   101→
   102→     For each route, create a socket pair (sender, receiver).
   103→     The sender binds; the receiver connects.
   104→     Routes are keyed by (src, src_port) → sender socket,
   105→     and (dst, dst_port) → receiver socket. *)
   106→
   107→  (* Socket tables: endpoint → socket *)
   108→  let push_sockets : (string, [`Push] T.socket) Hashtbl.t = Hashtbl.create 16 in
   109→  let pull_sockets : (string, [`Pull] T.socket) Hashtbl.t = Hashtbl.create 16 in
   110→
   111→  (* For each route, ensure sender and receiver sockets exist *)
   112→  List.iter (fun (r : Runtime.Routing.route) ->
   113→    let ep = endpoint ~pipeline_id r.src r.src_port in
   114→    if not (Hashtbl.mem push_sockets ep) then begin
   115→      let sock = match r.kind with
   116→        | Runtime.Routing.Fan_out ->
   117→          (* PUB/SUB for copy — but we use PUSH/PULL per-destination
   118→             since our single-frame encoding doesn't support ZMQ topic
   119→             filtering. The routing table handles fan-out by creating
   120→             multiple routes from the same source. *)
   121→          let s = T.push_socket ctx ~sw in
   122→          T.bind s ep;
   123→          s
   124→        | _ ->
   125→          let s = T.push_socket ctx ~sw in
   126→          T.bind s ep;
   127→          s
   128→      in
   129→      Hashtbl.replace push_sockets ep sock
   130→    end;
   131→    let ep_dst = endpoint ~pipeline_id r.dst r.dst_port in
   132→    if not (Hashtbl.mem pull_sockets ep_dst) then begin
   133→      let sock = T.pull_socket ctx ~sw in
   134→      Hashtbl.replace pull_sockets ep_dst sock
   135→    end
   136→  ) routing_table.Runtime.Routing.routes;
   137→
   138→  (* === Phase 2: Connect sockets === *)
   139→
   140→  List.iter (fun (r : Runtime.Routing.route) ->
   141→    let ep = endpoint ~pipeline_id r.src r.src_port in
   142→    let ep_dst = endpoint ~pipeline_id r.dst r.dst_port in
   143→    match Hashtbl.find_opt pull_sockets ep_dst with
   144→    | Some pull_sock -> T.connect pull_sock ep
   145→    | None ->
   146→      Runtime.Log.warn "fabric"
   147→        ["event", `String "no_pull_socket";
   148→         "dst", `String r.dst;
   149→         "dst_port", `String r.dst_port]
   150→  ) routing_table.Runtime.Routing.routes;
   151→
   152→  (* === Phase 3: Seed channels (D7) ===
   153→     Send null token on seeded channels before starting fibres. *)
   154→
   155→  List.iter (fun ch_name ->
   156→    (* Find the PUSH socket that writes to this channel *)
   157→    let matching_routes = List.filter (fun (r : Runtime.Routing.route) ->
   158→      r.topic = ch_name && r.src <> ch_name
   159→    ) routing_table.Runtime.Routing.routes in
   160→    match matching_routes with
   161→    | r :: _ ->
   162→      let ep = endpoint ~pipeline_id r.src r.src_port in
   163→      (match Hashtbl.find_opt push_sockets ep with
   164→       | Some sock -> T.send sock ~topic:ch_name "null"
   165→       | None ->
   166→         Runtime.Log.warn "fabric"
   167→           ["event", `String "seed_no_socket";
   168→            "channel", `String ch_name])
   169→    | [] ->
   170→      Runtime.Log.warn "fabric"
   171→        ["event", `String "seed_no_route";
   172→         "channel", `String ch_name]
   173→  ) routing_table.Runtime.Routing.seeded_channels;
   174→
   175→  (* === Phase 4: Start inline morphism fibres ===
   176→
   177→     Each inline morphism reads from its input PULL socket(s) and
   178→     writes to its output PUSH socket(s). Subprocess-boundary
   179→     processes are handled by the supervisor (deferred — agents
   180→     need the agent binary rewrite from Step 5). *)
   181→
   182→  let find_push proc_name port =
   183→    let ep = endpoint ~pipeline_id proc_name port in
   184→    match Hashtbl.find_opt push_sockets ep with
   185→    | Some s -> s
   186→    | None -> failwith (Printf.sprintf "fabric: no PUSH socket for %s:%s" proc_name port)
   187→  in
   188→  let find_pull proc_name port =
   189→    let ep = endpoint ~pipeline_id proc_name port in
   190→    match Hashtbl.find_opt pull_sockets ep with
   191→    | Some s -> s
   192→    | None -> failwith (Printf.sprintf "fabric: no PULL socket for %s:%s" proc_name port)
   193→  in
   194→
   195→  List.iter (fun (pe : Runtime.Routing.process_entry) ->
   196→    match pe.proc_kind with
   197→    | Runtime.Routing.Subprocess ->
   198→      (match process_handler with
   199→       | Some handler ->
   200→         (* Collect sockets for this subprocess.
   201→            PUSH sockets: routes where src = proc_name (process writes)
   202→            PULL sockets: routes where dst = proc_name (process reads) *)
   203→         let proc_push = List.filter_map (fun (r : Runtime.Routing.route) ->
   204→           if r.src = pe.proc_name then
   205→             let ep = endpoint ~pipeline_id r.src r.src_port in
   206→             match Hashtbl.find_opt push_sockets ep with
   207→             | Some s -> Some (r.src_port, s)
   208→             | None -> None
   209→           else None
   210→         ) routing_table.Runtime.Routing.routes in
   211→         let proc_pull = List.filter_map (fun (r : Runtime.Routing.route) ->
   212→           if r.dst = pe.proc_name then
   213→             let ep = endpoint ~pipeline_id r.dst r.dst_port in
   214→             match Hashtbl.find_opt pull_sockets ep with
   215→             | Some s -> Some (r.dst_port, s)
   216→             | None -> None
   217→           else None
   218→         ) routing_table.Runtime.Routing.routes in
   219→         handler ~sw ~proc_entry:pe
   220→           ~push_sockets:proc_push ~pull_sockets:proc_pull
   221→       | None ->
   222→         Runtime.Log.info "fabric"
   223→           ["event", `String "subprocess_deferred";
   224→            "name", `String pe.proc_name])
   225→    | Runtime.Routing.Inline ->
   226→      Eio.Fiber.fork_daemon ~sw (fun () ->
   227→        (try match pe.proc_impl with
   228→          | Id ->
   229→            let msg_type = Validate.unwrap_stream pe.proc_binding.input in
   230→            Inline.forward_id ~name:pe.proc_name type_env msg_type
   231→              (find_pull pe.proc_name "in")
   232→              (find_push pe.proc_name "out")
   233→          | Copy ->
   234→            let msg_type = Validate.unwrap_stream pe.proc_binding.input in
   235→            Inline.forward_copy ~name:pe.proc_name type_env msg_type
   236→              (find_pull pe.proc_name "in")
   237→              (find_push pe.proc_name "out0")
   238→              (find_push pe.proc_name "out1")
   239→          | Merge ->
   240→            let inp = Validate.unwrap_stream pe.proc_binding.input in
   241→            let in0_t, in1_t = match Validate.resolve type_env inp with
   242→              | Product [t0; t1] -> (t0, t1)
   243→              | t -> (t, t)
   244→            in
   245→            Inline.forward_merge ~name:pe.proc_name type_env in0_t in1_t
   246→              (find_pull pe.proc_name "in0")
   247→              (find_pull pe.proc_name "in1")
   248→              (find_push pe.proc_name "out")
   249→          | MergeAny ->
   250→            let inp = Validate.unwrap_stream pe.proc_binding.input in
   251→            let in0_t, in1_t = match Validate.resolve type_env inp with
   252→              | Product [t0; t1] -> (t0, t1)
   253→              | t -> (t, t)
   254→            in
   255→            Inline.forward_merge_any ~name:pe.proc_name type_env in0_t in1_t
   256→              (find_pull pe.proc_name "in0")
   257→              (find_pull pe.proc_name "in1")
   258→              (find_push pe.proc_name "out")
   259→          | Barrier ->
   260→            let inp = Validate.unwrap_stream pe.proc_binding.input in
   261→            let in_types = match Validate.resolve type_env inp with
   262→              | Product ts -> ts
   263→              | _ -> failwith "barrier: expected product input"
   264→            in
   265→            let n = List.length in_types in
   266→            let recv_socks = List.init n (fun i ->
   267→              find_pull pe.proc_name (Printf.sprintf "in%d" i)) in
   268→            Inline.forward_barrier ~name:pe.proc_name type_env
   269→              in_types recv_socks
   270→              (find_push pe.proc_name "out")
   271→          | Discard ->
   272→            let msg_type = Validate.unwrap_stream pe.proc_binding.input in
   273→            Inline.drain_discard ~name:pe.proc_name type_env msg_type
   274→              (find_pull pe.proc_name "in")
   275→          | Empty ->
   276→            Inline.forward_empty ~name:pe.proc_name
   277→              (find_push pe.proc_name "out")
   278→          | Log label ->
   279→            Inline.forward_log label
   280→              (find_pull pe.proc_name "in")
   281→          | FormatJson ->
   282→            Inline.forward_format_json ~name:pe.proc_name
   283→              (find_pull pe.proc_name "in")
   284→              (find_push pe.proc_name "out")
   285→          | ParseJson ->
   286→            Inline.forward_parse_json ~name:pe.proc_name
   287→              (find_pull pe.proc_name "in")
   288→              (find_push pe.proc_name "out")
   289→          | Map expr ->
   290→            let in_type = Validate.unwrap_stream pe.proc_binding.input in
   291→            let out_type = Validate.unwrap_stream pe.proc_binding.output in
   292→            Inline.forward_map ~name:pe.proc_name type_env
   293→              in_type out_type expr
   294→              (find_pull pe.proc_name "in")
   295→              (find_push pe.proc_name "out")
   296→          | Filter expr ->
   297→            let msg_type = Validate.unwrap_stream pe.proc_binding.input in
   298→            Inline.forward_filter ~name:pe.proc_name type_env msg_type expr
   299→              (find_pull pe.proc_name "in")
   300→              (find_push pe.proc_name "out")
   301→          | Project n ->
   302→            Inline.forward_project ~name:pe.proc_name n
   303→              (find_pull pe.proc_name "in")
   304→              (find_push pe.proc_name "out")
   305→          | Tagger (indices, label) ->
   306→            Inline.forward_tagger ~name:pe.proc_name indices label
   307→              (find_pull pe.proc_name "in")
   308→              (find_push pe.proc_name "out")
   309→          | Agent _ | Plumb _ ->
   310→            (* Should not reach here — filtered by proc_kind *)
   311→            failwith "unreachable: subprocess in inline dispatch"
   312→          | Tool _ | Lower _ | Builtin _ ->
   313→            failwith "unreachable: tool binding in inline dispatch"
   314→        with
   315→        | Inline.Morphism_fatal (label, _extras) ->
   316→          Runtime.Log.error "fabric"
   317→            ["event", `String "morphism_fatal";
   318→             "morphism", `String label]
   319→        | Transport.Transport_error (_, msg) ->
   320→          Runtime.Log.debug "fabric"
   321→            ["event", `String "transport_closed";
   322→             "morphism", `String pe.proc_name;
   323→             "detail", `String msg]);
   324→        `Stop_daemon)
   325→  ) routing_table.Runtime.Routing.processes;
   326→
   327→  (* The cancel function: force-close all sockets. This closes the
   328→     underlying ZMQ sockets directly, bypassing zmq-eio's close
   329→     protocol. The zmq-eio event loops will fail on their next
   330→     operation and terminate, allowing the Eio switch to exit. *)
   331→  let cancel () =
   332→    Hashtbl.iter (fun _ s ->
   333→      (try T.force_close s with _ -> ())) push_sockets;
   334→    Hashtbl.iter (fun _ s ->
   335→      (try T.force_close s with _ -> ())) pull_sockets
   336→  in
   337→  { ctx; supervisor = sup; push_sockets; pull_sockets; pipeline_id; cancel }
   338→
   339→(** Re-export Bridge for external access. *)
   340→module Bridge = Bridge
   341→
   342→(** Shut down the pipeline.
   343→    Closes all sockets (triggering recv timeout/failure in system
   344→    threads), then destroys the ZMQ context and cleans up. *)
   345→let shutdown t =
   346→  let _ = Supervisor.shutdown t.supervisor in
   347→  t.cancel ();
   348→  (* Wait for system threads blocked in recv to notice the socket
   349→     closure (up to the 1s receive timeout) *)
   350→  Unix.sleepf 1.5;
   351→  T.destroy_context t.ctx;
   352→  cleanup_fabric_dir t.pipeline_id
   353→
   354→(** Lightweight cleanup after orderly shutdown.  Use this when
   355→    [cancel()] was already called and all fibres have exited
   356→    (e.g. after the Eio switch exits).  Skips the 1.5s sleep
   357→    since no system threads are outstanding. *)
   358→let cleanup t =
   359→  let _ = Supervisor.shutdown t.supervisor in
   360→  T.destroy_context t.ctx;
   361→  cleanup_fabric_dir t.pipeline_id
   362→
Read
{
  "file_path": "/Users/ww/src/plumbing/lib/fabric/bridge.ml"
}
     1→(** Bridge — stdio mux/demux adapter for agent subprocesses.
     2→
     3→    Natural transformation between ZMQ topic-framed sockets and the
     4→    agent binary's port-multiplexed stdin/stdout protocol.  The agent
     5→    sees the same [{"__port":...,"msg":...}] envelopes as under exec.ml.
     6→
     7→    All pipe writes use [Mutex] + [Eio_unix.run_in_systhread] (R2).
     8→
     9→    The [__port] envelope protocol is confined to this module and the
    10→    agent binary — it must never appear in [fabric.ml] or [inline.ml]
    11→    (R7: the bridge is NOT a morphism, it is a natural transformation
    12→    between two concrete stream representations). *)
    13→
    14→open Language
    15→
    16→module T = Transport.Zmq
    17→
    18→(** {1 EOF Sentinel}
    19→
    20→    An empty-payload message on a PUSH socket signals end-of-stream.
    21→    The receiver exits its recv loop on seeing this sentinel. *)
    22→
    23→let send_eof sock ~topic =
    24→  T.send sock ~topic ""
    25→
    26→let is_eof payload =
    27→  String.length payload = 0
    28→
    29→(** {1 Process Handler Type}
    30→
    31→    The fabric calls a process handler for each subprocess entry.
    32→    The handler receives the process's ZMQ socket endpoints and starts
    33→    the subprocess.  The fabric does not know how. *)
    34→
    35→type process_handler =
    36→  sw:Eio.Switch.t ->
    37→  proc_entry:Runtime.Routing.process_entry ->
    38→  push_sockets:(string * [`Push] T.socket) list ->
    39→  pull_sockets:(string * [`Pull] T.socket) list ->
    40→  unit
    41→
    42→(** {1 Tool Dispatch Callback}
    43→
    44→    The bridge accepts an optional tool dispatch callback rather than
    45→    depending on [Tool_dispatch] directly.  The caller constructs
    46→    this from [Tool_dispatch.tool_dispatch_loop]. *)
    47→
    48→type tool_dispatch =
    49→  req_fd:Unix.file_descr ->
    50→  resp_fd:Unix.file_descr ->
    51→  agent_name:string ->
    52→  unit
    53→
    54→(** {1 Observation Callback} *)
    55→
    56→type observation = {
    57→  agent : string;
    58→  payload : Yojson.Safe.t;
    59→}
    60→
    61→(** {1 Pipe Write Helpers}
    62→
    63→    All writes to child stdin go through mux_envelope / mux_eof,
    64→    protected by an OCaml [Mutex].  These MUST be called from within
    65→    [Eio_unix.run_in_systhread] — [Mutex.lock] is not safe from
    66→    Eio fibres. *)
    67→
    68→(** Write a port-tagged envelope to the child stdin.
    69→    Caller must hold (or be inside run_in_systhread for) the mux mutex. *)
    70→let write_envelope mux_m oc port json_str =
    71→  match Yojson.Safe.from_string json_str with
    72→  | _ ->
    73→    let s = Printf.sprintf
    74→      "{\"__port\":%S,\"msg\":%s}\n" port json_str in
    75→    Mutex.lock mux_m;
    76→    Fun.protect ~finally:(fun () -> Mutex.unlock mux_m) (fun () ->
    77→      output_string oc s; flush oc)
    78→  | exception Yojson.Json_error msg ->
    79→    Runtime.Log.warn "bridge_mux"
    80→      [("port", `String port); ("error", `String msg)]
    81→
    82→(** Write an EOF marker for a port. *)
    83→let write_eof mux_m oc port =
    84→  let s = Printf.sprintf
    85→    "{\"__port\":%S,\"__eof\":true}\n" port in
    86→  Mutex.lock mux_m;
    87→  Fun.protect ~finally:(fun () -> Mutex.unlock mux_m) (fun () ->
    88→    output_string oc s; flush oc)
    89→
    90→(** {1 Bridge Construction} *)
    91→
    92→(** Construct a process handler that spawns agent subprocesses with
    93→    stdio bridge. *)
    94→let make_handler
    95→    ~(prog : program)
    96→    ~type_env
    97→    ~(bin_dir : string option)
    98→    ~(resource_find : string -> string)
    99→    ?(resolve_bin : (string -> string) option)
   100→    ?(tool_dispatch_fn : tool_dispatch option)
   101→    ?(on_observe : (observation -> unit) option)
   102→    ()
   103→  : process_handler =
   104→  fun ~sw ~proc_entry ~push_sockets ~pull_sockets ->
   105→    let pe = proc_entry in
   106→    let binding = pe.Runtime.Routing.proc_binding in
   107→    let config = match binding.impl with
   108→      | Agent config -> config
   109→      | _ -> failwith "bridge: expected Agent impl"
   110→    in
   111→
   112→    (* Resolve prompt paths *)
   113→    let config = Spawn_lib.resolve_prompts_config
   114→      ~resource_find ~base_dir:(Sys.getcwd ()) config in
   115→    let binding = { binding with impl = Agent config } in
   116→
   117→    (* Decompose input/output types *)
   118→    let decomp_in = Spawn_lib.agent_input_decomposition type_env binding.input in
   119→    let decomp_out = Spawn_lib.agent_output_decomposition type_env binding.output in
   120→    let has_control = decomp_in <> None in
   121→    let has_telemetry = decomp_out <> None in
   122→
   123→    (* Synthesize child spec with simple types (no products) *)
   124→    let child_binding =
   125→      let b = match decomp_in with
   126→        | Some (data_t, _) -> { binding with input = Stream data_t }
   127→        | None -> binding
   128→      in
   129→      match decomp_out with
   130→      | Some (result_t, _) -> { b with output = Stream result_t }
   131→      | None -> b
   132→    in
   133→
   134→    (* Find tool bindings *)
   135→    let tool_names = match List.assoc_opt "tools" config with
   136→      | Some (Arr items) ->
   137→        List.filter_map (fun item ->
   138→          match item with Ident name -> Some name | _ -> None
   139→        ) items
   140→      | _ -> []
   141→    in
   142→    let tool_bindings = List.filter_map (fun name ->
   143→      List.find_opt (fun (b : binding) ->
   144→        b.bd_name = name && Validate.is_tool_binding b
   145→      ) prog.prog_bindings
   146→    ) tool_names in
   147→    let has_tools = tool_bindings <> [] || tool_dispatch_fn <> None in
   148→
   149→    (* Synthesize spec *)
   150→    let spec_source =
   151→      if tool_bindings <> [] then
   152→        Spawn_lib.synthesize_agent_spec prog child_binding tool_bindings
   153→      else
   154→        Spawn_lib.synthesize_spec prog child_binding
   155→    in
   156→    let spec_path = Spawn_lib.write_temp_spec spec_source in
   157→
   158→    (* Resolve binary *)
   159→    let resolve = match resolve_bin with
   160→      | Some f -> f
   161→      | None -> Spawn_lib.resolve_binary ~bin_dir
   162→    in
   163→    let bin_path = resolve "plumb-agent" in
   164→
   165→    (* Create child stdin/stdout pipes *)
   166→    let child_in_r, child_in_w = Unix.pipe ~cloexec:true () in
   167→    let child_out_r, child_out_w = Unix.pipe ~cloexec:true () in
   168→    let child_in_oc = Unix.out_channel_of_descr child_in_w in
   169→
   170→    (* MCP connections and env *)
   171→    let mcp_connections = match List.assoc_opt "mcp" config with
   172→      | Some (Arr items) ->
   173→        List.filter_map (fun item ->
   174→          match item with
   175→          | Obj kvs ->
   176→            (match Spawn_lib.connect_mcp_server kvs with
   177→             | Ok conn -> Some conn
   178→             | Error msg ->
   179→               Runtime.Log.warn "mcp_connect_failed"
   180→                 [("detail", `String msg)];
   181→               None)
   182→          | _ -> None
   183→        ) items
   184→      | _ -> []
   185→    in
   186→    let mcp_env = if mcp_connections <> [] then begin
   187→      let mcp_schemas = List.concat_map (fun conn ->
   188→        List.map Mcp_client.tool_schema (Mcp_client.tools conn)
   189→      ) mcp_connections in
   190→      let mcp_path = Filename.temp_file "agentpi_mcp_" ".json" in
   191→      let mcp_oc = open_out_gen
   192→        [Open_wronly; Open_creat; Open_trunc] 0o600 mcp_path in
   193→      Fun.protect ~finally:(fun () -> close_out mcp_oc) (fun () ->
   194→        output_string mcp_oc
   195→          (Yojson.Safe.to_string (`List mcp_schemas)));
   196→      [("PLUMB_MCP_TOOLS", mcp_path)]
   197→    end else [] in
   198→
   199→    (* Agent children need LLM credentials, docroot, and name *)
   200→    let env = Spawn_lib.credential_env () @ Spawn_lib.provider_env ()
   201→              @ [("PLUMB_AGENT_NAME", binding.bd_name)]
   202→              @ mcp_env in
   203→
   204→    (* Spawn child — closes child_in_r and child_out_w in parent *)
   205→    let pid = Spawn_lib.spawn_subprocess ~bin_path ~spec_path
   206→      ~stdin_fd:child_in_r ~stdout_fd:child_out_w
   207→      ~env () in
   208→    Runtime.Log.info "bridge"
   209→      ["event", `String "spawned";
   210→       "agent", `String binding.bd_name;
   211→       "pid", `Int pid;
   212→       "has_control", `Bool has_control;
   213→       "has_telemetry", `Bool has_telemetry;
   214→       "has_tools", `Bool has_tools];
   215→
   216→    (* Mux: OCaml Mutex for system-thread safety (R2) *)
   217→    let mux_m = Mutex.create () in
   218→
   219→    (* Track active mux sources; close child stdin when last exits *)
   220→    let n_sources = 2 + (if has_tools && tool_dispatch_fn <> None then 1 else 0) in
   221→    let mux_remaining = ref n_sources in
   222→    let mux_rem_m = Mutex.create () in
   223→    let mux_source_done () =
   224→      Mutex.lock mux_rem_m;
   225→      decr mux_remaining;
   226→      let n = !mux_remaining in
   227→      Mutex.unlock mux_rem_m;
   228→      if n = 0 then close_out_noerr child_in_oc
   229→    in
   230→
   231→    (* Find sockets by port name *)
   232→    let find_pull port = List.assoc_opt port pull_sockets in
   233→    let find_push port = List.assoc_opt port push_sockets in
   234→
   235→    (* === Tool dispatch setup === *)
   236→    let tool_req_oc = ref None in
   237→    let tool_resp_r = ref None in
   238→    (match tool_dispatch_fn with
   239→     | Some dispatch when has_tools ->
   240→       let req_r, req_w = Unix.pipe ~cloexec:true () in
   241→       let resp_r', resp_w = Unix.pipe ~cloexec:true () in
   242→       tool_req_oc := Some (Unix.out_channel_of_descr req_w);
   243→       tool_resp_r := Some resp_r';
   244→       let _dispatch_thread = Thread.create (fun () ->
   245→         dispatch ~req_fd:req_r ~resp_fd:resp_w
   246→           ~agent_name:binding.bd_name
   247→       ) () in
   248→       ()
   249→     | _ -> ());
   250→
   251→    (* === Mux: data input (ZMQ PULL → child stdin) ===
   252→
   253→       T.recv runs in Eio fibre context (uses Eio_unix.await_readable).
   254→       Pipe writes use Eio_unix.run_in_systhread + Mutex. *)
   255→    (match find_pull "data_in" with
   256→     | Some data_in_sock ->
   257→       Eio.Fiber.fork_daemon ~sw (fun () ->
   258→         (try
   259→           while true do
   260→             let (_topic, payload) = T.recv data_in_sock in
   261→             if is_eof payload then begin
   262→               Eio_unix.run_in_systhread (fun () ->
   263→                 (try write_eof mux_m child_in_oc "input"
   264→                  with Sys_error _ -> ()));
   265→               raise Exit
   266→             end;
   267→             Eio_unix.run_in_systhread (fun () ->
   268→               try write_envelope mux_m child_in_oc "input" payload
   269→               with Sys_error _ -> raise Exit)
   270→           done
   271→         with
   272→         | Exit -> ()
   273→         | Transport.Transport_error (_, _msg) -> ());
   274→        (* After data EOF, send ctrl stop (R4) *)
   275→        if has_control then
   276→          Eio_unix.run_in_systhread (fun () ->
   277→            try
   278→              let s = "{\"__port\":\"ctrl_in\",\"msg\":{\"stop\":true}}\n" in
   279→              Mutex.lock mux_m;
   280→              Fun.protect ~finally:(fun () -> Mutex.unlock mux_m) (fun () ->
   281→                output_string child_in_oc s; flush child_in_oc)
   282→            with Sys_error _ -> ());
   283→        Eio_unix.run_in_systhread (fun () -> mux_source_done ());
   284→        `Stop_daemon)
   285→     | None ->
   286→       (* No data input socket — mux source done immediately *)
   287→       mux_source_done ());
   288→
   289→    (* === Mux: control input (ZMQ PULL → child stdin) === *)
   290→    (match find_pull "ctrl_in" with
   291→     | Some ctrl_in_sock ->
   292→       Eio.Fiber.fork_daemon ~sw (fun () ->
   293→         (try
   294→           while true do
   295→             let (_topic, payload) = T.recv ctrl_in_sock in
   296→             if is_eof payload then begin
   297→               Eio_unix.run_in_systhread (fun () ->
   298→                 (try write_eof mux_m child_in_oc "ctrl_in"
   299→                  with Sys_error _ -> ()));
   300→               raise Exit
   301→             end;
   302→             Eio_unix.run_in_systhread (fun () ->
   303→               try write_envelope mux_m child_in_oc "ctrl_in" payload
   304→               with Sys_error _ -> raise Exit)
   305→           done
   306→         with
   307→         | Exit -> ()
   308→         | Transport.Transport_error (_, _msg) -> ());
   309→        Eio_unix.run_in_systhread (fun () -> mux_source_done ());
   310→        `Stop_daemon)
   311→     | None ->
   312→       (* No control socket — mux source done immediately *)
   313→       mux_source_done ());
   314→
   315→    (* === Mux: tool responses (pipe → child stdin) === *)
   316→    (match !tool_resp_r with
   317→     | Some resp_r' ->
   318→       Eio.Fiber.fork_daemon ~sw (fun () ->
   319→         Eio_unix.run_in_systhread (fun () ->
   320→           let ic = Unix.in_channel_of_descr resp_r' in
   321→           (try while true do
   322→             let line = input_line ic in
   323→             write_envelope mux_m child_in_oc "tool_resp" line
   324→           done with End_of_file | Sys_error _ -> ());
   325→           close_in_noerr ic;
   326→           mux_source_done ());
   327→         `Stop_daemon)
   328→     | None -> ());
   329→
   330→    (* === Demux: child stdout → ZMQ PUSH sockets ===
   331→
   332→       Pipe reads use Eio_unix.run_in_systhread (blocking input_line).
   333→       ZMQ sends run in Eio fibre context. *)
   334→    let data_out_sock = find_push "data_out" in
   335→    let ctrl_out_sock = find_push "ctrl_out" in
   336→    let telemetry_sock = find_push "telemetry" in
   337→
   338→    Eio.Fiber.fork_daemon ~sw (fun () ->
   339→      let ic = Unix.in_channel_of_descr child_out_r in
   340→      (try while true do
   341→        let line = Eio_unix.run_in_systhread (fun () ->
   342→          input_line ic) in
   343→        if String.length (String.trim line) > 0 then
   344→          match Yojson.Safe.from_string line with
   345→          | `Assoc fields ->
   346→            let port = match List.assoc_opt "__port" fields with
   347→              | Some (`String s) -> s | _ -> "output" in
   348→            let msg = match List.assoc_opt "msg" fields with
   349→              | Some m -> Yojson.Safe.to_string m
   350→              | None -> line in
   351→            (match port with
   352→             | "output" ->
   353→               Option.iter (fun sock ->
   354→                 T.send sock ~topic:binding.bd_name msg
   355→               ) data_out_sock
   356→             | "ctrl_out" ->
   357→               Option.iter (fun sock ->
   358→                 T.send sock ~topic:binding.bd_name msg
   359→               ) ctrl_out_sock
   360→             | "telemetry" ->
   361→               Option.iter (fun sock ->
   362→                 T.send sock ~topic:binding.bd_name msg
   363→               ) telemetry_sock;
   364→               (match on_observe with
   365→                | Some cb ->
   366→                  (match Yojson.Safe.from_string msg with
   367→                   | json -> cb { agent = binding.bd_name; payload = json }
   368→                   | exception Yojson.Json_error _ -> ())
   369→                | None -> ())
   370→             | "tool_req" ->
   371→               Eio_unix.run_in_systhread (fun () ->
   372→                 match !tool_req_oc with
   373→                 | Some oc ->
   374→                   output_string oc msg;
   375→                   output_char oc '\n';
   376→                   flush oc
   377→                 | None ->
   378→                   Runtime.Log.warn "bridge_demux"
   379→                     [("event", `String "tool_req_no_dispatch");
   380→                      ("agent", `String binding.bd_name)])
   381→             | _ ->
   382→               Runtime.Log.warn "bridge_demux"
   383→                 [("port", `String port);
   384→                  ("agent", `String binding.bd_name)])
   385→          | _ ->
   386→            (* Not an envelope — treat as raw output *)
   387→            Option.iter (fun sock ->
   388→              T.send sock ~topic:binding.bd_name line
   389→            ) data_out_sock
   390→          | exception exn ->
   391→            Runtime.Log.debug "bridge_demux"
   392→              ["agent", `String binding.bd_name;
   393→               "event", `String "json_parse_failed";
   394→               "error", `String (Printexc.to_string exn)];
   395→            Option.iter (fun sock ->
   396→              T.send sock ~topic:binding.bd_name line
   397→            ) data_out_sock
   398→      done with
   399→      | End_of_file -> ()
   400→      | Sys_error msg ->
   401→        Runtime.Log.debug "bridge_demux"
   402→          ["agent", `String binding.bd_name;
   403→           "event", `String "pipe_closed";
   404→           "detail", `String msg]);
   405→      close_in_noerr ic;
   406→      (* Send EOF sentinel on all output ZMQ sockets (R5) *)
   407→      Option.iter (fun sock ->
   408→        (try send_eof sock ~topic:binding.bd_name with _ -> ())
   409→      ) data_out_sock;
   410→      Option.iter (fun sock ->
   411→        (try send_eof sock ~topic:binding.bd_name with _ -> ())
   412→      ) ctrl_out_sock;
   413→      Option.iter (fun sock ->
   414→        (try send_eof sock ~topic:binding.bd_name with _ -> ())
   415→      ) telemetry_sock;
   416→      (* Close tool_req write-end to terminate dispatch loop *)
   417→      Eio_unix.run_in_systhread (fun () ->
   418→        (match !tool_req_oc with
   419→         | Some oc -> close_out_noerr oc
   420→         | None -> ());
   421→        (* Reap child process (R6) *)
   422→        let timeout_s = match Spawn_lib.timeout_of_binding binding with
   423→          | Some t -> t | None -> 60.0 in
   424→        let status, timed_out =
   425→          Spawn_lib.waitpid_timed ~timeout_seconds:timeout_s pid in
   426→        if timed_out then
   427→          Runtime.Log.warn "bridge_timeout"
   428→            ["agent", `String binding.bd_name;
   429→             "timeout_seconds", `Float timeout_s];
   430→        (match status with
   431→         | Unix.WEXITED 0 -> ()
   432→         | Unix.WEXITED code ->
   433→           Runtime.Log.warn "bridge_exit"
   434→             ["agent", `String binding.bd_name;
   435→              "code", `Int code]
   436→         | Unix.WSIGNALED sig_ ->
   437→           Runtime.Log.warn "bridge_signal"
   438→             ["agent", `String binding.bd_name;
   439→              "signal", `Int sig_]
   440→         | Unix.WSTOPPED _ -> ());
   441→        (* Clean up temp files *)
   442→        (try Sys.remove spec_path with Sys_error _ -> ()));
   443→      `Stop_daemon);
   444→
   445→    ignore has_telemetry
   446→
Read
{
  "file_path": "/Users/ww/src/plumbing/bin/agent/main.ml"
}
Read
{
  "file_path": "/Users/ww/src/plumbing/lib/spawn/spawn_lib.ml"
}
     1→(** Process spawning, spec synthesis, and subprocess infrastructure.
     2→
     3→    Extracted from [lib/plumb/spawn.ml] to break the dependency between
     4→    the fabric and the plumb library.  Dune-site paths (bin_dir,
     5→    resource_find) are injected by callers — this library stays
     6→    dune-site-free. *)
     7→
     8→open Language
     9→
    10→(** {1 Binary Resolution} *)
    11→
    12→let binary_of_impl = function
    13→  | Agent _ -> "plumb-agent"
    14→  | Plumb _ -> "plumb"
    15→  | Tool _ | Lower _ | Builtin _ -> failwith "tools are not spawned as pipeline nodes"
    16→  | Id | Copy | Merge | MergeAny | Discard | Empty | Barrier | Project _
    17→  | Map _ | Filter _ | Tagger _ | Log _ | FormatJson | ParseJson ->
    18→    failwith "structural morphisms do not need binaries"
    19→
    20→(** Resolve a sibling process binary.  Tries two strategies:
    21→    1. Sibling in the same directory (works after [dune install])
    22→    2. Sibling in the dune _build tree (swaps the binary's subdirectory)
    23→    Falls back to bare name for PATH lookup.
    24→
    25→    [~bin_dir] is the directory containing the current binary.  Pass
    26→    [Prefix_path.bin_dir] from the plumb library, or a test fixture. *)
    27→let resolve_binary ~bin_dir name =
    28→  let dir = match bin_dir with
    29→    | Some d -> d
    30→    | None -> Filename.dirname Sys.executable_name
    31→  in
    32→  let installed = Filename.concat dir name in
    33→  if Sys.file_exists installed then installed
    34→  else
    35→    let parent = Filename.dirname dir in
    36→    let base = Filename.basename Sys.executable_name in
    37→    let build = Filename.concat (Filename.concat parent name) base in
    38→    if Sys.file_exists build then build
    39→    else
    40→      (* In the dune build tree, directories use the short name (e.g. "agent")
    41→         while installed binaries use the namespaced name ("plumb-agent").
    42→         Strip the "plumb-" prefix for the build-tree lookup. *)
    43→      let short =
    44→        let prefix = "plumb-" in
    45→        let plen = String.length prefix in
    46→        if String.length name > plen && String.sub name 0 plen = prefix
    47→        then String.sub name plen (String.length name - plen)
    48→        else name
    49→      in
    50→      let build2 = Filename.concat (Filename.concat parent short) base in
    51→      if Sys.file_exists build2 then build2
    52→      else name
    53→
    54→(** Resolve a single prompt path entry.
    55→
    56→    - Contains [/] → filesystem path: absolute paths used as-is,
    57→      relative paths resolved from [base_dir] (the pipeline file's directory).
    58→    - No [/] → bare name: searched via [resource_find].
    59→
    60→    [~resource_find] is a function that resolves bare names to absolute
    61→    paths.  Pass [Resource_path.find] from the plumb library. *)
    62→let resolve_prompt_path ~resource_find ~base_dir name =
    63→  if String.contains name '/' then
    64→    if not (Filename.is_relative name) then name
    65→    else Filename.concat base_dir name
    66→  else
    67→    resource_find name
    68→
    69→(** Transform an agent config's [prompts] entries to absolute paths.
    70→    Other config items pass through unchanged. *)
    71→let resolve_prompts_config ~resource_find ~base_dir config =
    72→  List.map (fun (key, value) ->
    73→    match key, value with
    74→    | "prompts", Arr items ->
    75→      let resolved = List.map (fun item ->
    76→        match item with
    77→        | Str path ->
    78→          Str (resolve_prompt_path ~resource_find ~base_dir path)
    79→        | other -> other
    80→      ) items in
    81→      (key, Arr resolved)
    82→    | _ -> (key, value)
    83→  ) config
    84→
    85→(** {1 Environment Scrubbing}
    86→
    87→    Subprocess environment whitelist.  Only these variables are inherited
    88→    from the parent process.  Everything else — including API keys — is
    89→    stripped.  Callers that need credentials (e.g. agent spawns) must pass
    90→    them explicitly via the [~env] parameter.
    91→
    92→    This is a compile-time upper limit.  Adjust as needed, but keep the
    93→    list minimal: every entry is a potential information leak. *)
    94→let safe_env_keys = [
    95→  "PATH"; "HOME"; "LANG"; "LC_ALL"; "LC_CTYPE"; "LC_MESSAGES";
    96→  "TERM"; "TMPDIR"; "USER"; "SHELL";
    97→  "PLUMB_PATH"; "PLUMB_RESOURCES";
    98→]
    99→
   100→(** Build a scrubbed environment array from the whitelist plus
   101→    caller-supplied overrides. *)
   102→let build_env env =
   103→  let base = List.filter_map (fun k ->
   104→    match Sys.getenv_opt k with
   105→    | Some v -> Some (k ^ "=" ^ v)
   106→    | None -> None
   107→  ) safe_env_keys in
   108→  let extra = List.map (fun (k, v) -> k ^ "=" ^ v) env in
   109→  Array.of_list (base @ extra)
   110→
   111→(** Collect LLM provider API key env vars for agent subprocess spawns.
   112→    Returns a list of [(key, value)] pairs for all set provider keys. *)
   113→let credential_env () =
   114→  List.filter_map (fun k ->
   115→    match Sys.getenv_opt k with
   116→    | Some v -> Some (k, v)
   117→    | None -> None
   118→  ) ["ANTHROPIC_API_KEY"; "OPENAI_API_KEY"]
   119→
   120→(** Collect provider/model override env vars for agent subprocess spawns.
   121→    These allow CLI flags (--provider, --model) to propagate through pipelines. *)
   122→let provider_env () =
   123→  List.filter_map (fun k ->
   124→    match Sys.getenv_opt k with
   125→    | Some v -> Some (k, v)
   126→    | None -> None
   127→  ) ["PLUMB_PROVIDER"; "PLUMB_MODEL"]
   128→
   129→(** {1 Subprocess Spawning}
   130→
   131→    Unified subprocess spawn: fork, wire stdin/stdout/stderr, exec.
   132→    Used by both agent and plumb process spawns.
   133→
   134→    Environment is scrubbed: only [safe_env_keys] plus caller-supplied
   135→    [~env] pairs are visible to the child.  API keys must be passed
   136→    explicitly by callers that need them. *)
   137→let spawn_subprocess ~bin_path ~spec_path
   138→    ~stdin_fd ~stdout_fd ?stderr_fd
   139→    ?(env=[]) ?cwd () =
   140→  let env_array = build_env env in
   141→  let pid = Unix.fork () in
   142→  if pid = 0 then begin
   143→    (* Child process: wire fds and exec.  Use _exit (not exit) on all
   144→       failure paths to avoid running at_exit handlers in the child. *)
   145→    (match cwd with
   146→     | Some d ->
   147→       (try Unix.chdir d
   148→        with Unix.Unix_error (err, _, _) ->
   149→          Runtime.Log.error "spawn_subprocess"
   150→            ["event", `String "chdir_failed";
   151→             "path", `String d;
   152→             "detail", `String (Unix.error_message err)];
   153→          Unix._exit 2)
   154→     | None -> ());
   155→    Unix.dup2 stdin_fd Unix.stdin;
   156→    Unix.close stdin_fd;
   157→    Unix.dup2 stdout_fd Unix.stdout;
   158→    Unix.close stdout_fd;
   159→    (match stderr_fd with
   160→     | Some fd -> Unix.dup2 fd Unix.stderr; Unix.close fd
   161→     | None -> ());
   162→    (try Unix.execve bin_path [| bin_path; spec_path |] env_array
   163→     with exn ->
   164→       Runtime.Log.error "spawn_subprocess"
   165→         ["event", `String "execve_failed";
   166→          "binary", `String bin_path;
   167→          "detail", `String (Printexc.to_string exn)];
   168→       Unix._exit 2)
   169→  end else begin
   170→    Unix.close stdin_fd;
   171→    Unix.close stdout_fd;
   172→    (match stderr_fd with Some fd -> Unix.close fd | None -> ());
   173→    pid
   174→  end
   175→
   176→(** {1 Type Dependency Collection} *)
   177→
   178→module StringSet = Set.Make(String)
   179→
   180→(** Collect all Named type references from a type expression. *)
   181→let rec collect_named acc = function
   182→  | Named name -> StringSet.add name acc
   183→  | Record fields -> List.fold_left (fun a f -> collect_named a f.typ) acc fields
   184→  | List t -> collect_named acc t
   185→  | Stream t -> collect_named acc t
   186→  | Sum variants -> List.fold_left collect_named acc variants
   187→  | Product components -> List.fold_left collect_named acc components
   188→  | Primitive _ | Unit -> acc
   189→
   190→(** Find the type declarations a binding depends on, transitively.
   191→    Walks the binding's input/output types, then chases Named references
   192→    through type declarations until reaching a fixed point. *)
   193→let required_types (prog : program) (binding : binding) =
   194→  let all_type_decls = prog.prog_types in
   195→  let rec expand needed =
   196→    let next = List.fold_left (fun acc td ->
   197→      if StringSet.mem td.td_name acc
   198→      then collect_named acc td.td_type
   199→      else acc
   200→    ) needed all_type_decls in
   201→    if StringSet.equal next needed then needed
   202→    else expand next
   203→  in
   204→  let seeds =
   205→    collect_named (collect_named StringSet.empty binding.input) binding.output
   206→  in
   207→  let needed = expand seeds in
   208→  List.filter (fun td -> StringSet.mem td.td_name needed) all_type_decls
   209→
   210→(** {1 Spec Synthesis} *)
   211→
   212→(** Synthesize a minimal .plumb spec for a child process.
   213→    Contains only the binding and its transitive type dependencies. *)
   214→let synthesize_spec (prog : program) (binding : binding) =
   215→  let types = required_types prog binding in
   216→  let mini = { prog_types = types; prog_protocols = [];
   217→               prog_bindings = [binding] } in
   218→  pp_program mini
   219→
   220→(** Collect required types from multiple bindings transitively. *)
   221→let required_types_multi (prog : program) (bindings : binding list) =
   222→  let all_type_decls = prog.prog_types in
   223→  let rec expand needed =
   224→    let next = List.fold_left (fun acc td ->
   225→      if StringSet.mem td.td_name acc
   226→      then collect_named acc td.td_type
   227→      else acc
   228→    ) needed all_type_decls in
   229→    if StringSet.equal next needed then needed
   230→    else expand next
   231→  in
   232→  let seeds = List.fold_left (fun acc b ->
   233→    collect_named (collect_named acc b.input) b.output
   234→  ) StringSet.empty bindings in
   235→  let needed = expand seeds in
   236→  List.filter (fun td -> StringSet.mem td.td_name needed) all_type_decls
   237→
   238→(** Synthesize a spec for an agent that has tool bindings.
   239→    Includes the agent binding + tool bindings + shared type dependencies.
   240→    Does NOT include backing agent bindings — the runtime dispatches those. *)
   241→let synthesize_agent_spec (prog : program) (agent_binding : binding)
   242→    (tool_bindings : binding list) =
   243→  let all_bindings = agent_binding :: tool_bindings in
   244→  let types = required_types_multi prog all_bindings in
   245→  let mini = { prog_types = types; prog_protocols = [];
   246→               prog_bindings = all_bindings } in
   247→  pp_program mini
   248→
   249→(** Write a synthesized spec to a temp file. *)
   250→let write_temp_spec source =
   251→  let path = Filename.temp_file "agentpi_" ".plumb" in
   252→  let oc = open_out_gen [Open_wronly; Open_creat; Open_trunc] 0o600 path in
   253→  Fun.protect ~finally:(fun () -> close_out oc) (fun () ->
   254→    output_string oc source);
   255→  path
   256→
   257→(** {1 Agent Output Decomposition} *)
   258→
   259→(** If agent output is a product of streams [(!B, !T)], return [Some (B, T)].
   260→    Non-product or non-stream-component output returns [None].
   261→    Product with other than 2 stream components raises a config error —
   262→    agent product output is always (result, telemetry).
   263→
   264→    Note: [!(A, B)] (stream of pairs) is NOT decomposed — that's a
   265→    synchronized stream, not separate channels.  Only [(!A, !B)] (pair
   266→    of streams) maps to separate fds. *)
   267→let agent_output_decomposition type_env output =
   268→  match Validate.resolve type_env output with
   269→  | Product [Stream result_t; Stream think_t] -> Some (result_t, think_t)
   270→  | Product _ ->
   271→    Runtime.Error.raise_ Config "config_error"
   272→      "agent: product output must be a pair of streams, e.g. (!A, !B)"
   273→  | _ -> None
   274→
   275→(** {1 Agent Input Decomposition} *)
   276→
   277→(** If agent input is a product of streams [(!A, !C)], return [Some (A, C)].
   278→    Non-product or non-stream-component input returns [None].
   279→    Product with other than 2 stream components raises a config error —
   280→    agent product input is always (data, control).
   281→
   282→    Symmetric to [agent_output_decomposition]. *)
   283→let agent_input_decomposition type_env input =
   284→  match Validate.resolve type_env input with
   285→  | Product [Stream data_t; Stream control_t] -> Some (data_t, control_t)
   286→  | Product _ ->
   287→    Runtime.Error.raise_ Config "config_error"
   288→      "agent: product input must be a pair of streams, e.g. (!A, !C)"
   289→  | _ -> None
   290→
   291→(** {1 Port Name Resolution} *)
   292→
   293→(** Port names for structural morphisms by keyword.
   294→    Returns [None] if the keyword is not a structural morphism.
   295→    [~n_args] is the spawn argument count, used by barrier to
   296→    determine arity (N inputs = n_args - 1). *)
   297→let port_names_of_structural ~n_args = function
   298→  | "id"      -> Some [| "input"; "output" |]
   299→  | "copy"    -> Some [| "input"; "out0"; "out1" |]
   300→  | "merge"   -> Some [| "in0"; "in1"; "output" |]
   301→  | "discard" | "_format_json" | "_parse_json" -> Some [| "input"; "output" |]
   302→  | "barrier" ->
   303→    let n = n_args - 1 in
   304→    Some (Array.init n_args (fun i ->
   305→      if i < n then Printf.sprintf "in%d" i else "output"))
   306→  | _ -> None
   307→
   308→(** Port names for a binding, determined by impl kind and input/output types.
   309→    Structural morphisms use fixed ports; agents use up to 4 ports
   310→    (input, control, output, telemetry); plumbs use declared ports. *)
   311→let port_names_of_binding type_env (binding : binding) =
   312→  match binding.impl with
   313→  | Id      -> [| "input"; "output" |]
   314→  | Copy    -> [| "input"; "out0"; "out1" |]
   315→  | Merge | MergeAny -> [| "in0"; "in1"; "output" |]
   316→  | Discard -> [| "input"; "output" |]
   317→  | Empty -> [| "output" |]
   318→  | Barrier ->
   319→    let inp = match Validate.resolve type_env binding.input with
   320→      | Stream t -> Validate.resolve type_env t
   321→      | t -> t
   322→    in
   323→    let n = match inp with
   324→      | Product ts -> List.length ts
   325→      | _ -> 2  (* fallback for non-product; error caught later *)
   326→    in
   327→    Array.init (n + 1) (fun i ->
   328→      if i < n then Printf.sprintf "in%d" i else "output")
   329→  | Agent _ ->
   330→    let has_control = agent_input_decomposition type_env binding.input <> None in
   331→    let has_telemetry = agent_output_decomposition type_env binding.output <> None in
   332→    (match has_control, has_telemetry with
   333→     | false, false -> [| "input"; "output" |]
   334→     | false, true  -> [| "input"; "output"; "telemetry" |]
   335→     | true,  false -> [| "input"; "ctrl_in"; "output"; "ctrl_out" |]
   336→     | true,  true  -> [| "input"; "ctrl_in"; "output"; "ctrl_out"; "telemetry" |])
   337→  | Map _ | Filter _ | Project _ | Tagger _ | FormatJson | ParseJson -> [| "input"; "output" |]
   338→  | Log _ -> [| "input"; "output" |]
   339→  | Tool _ | Lower _ | Builtin _ -> failwith "tool bindings cannot be spawned as pipeline nodes"
   340→  | Plumb (ports, _) -> Array.of_list ports
   341→
   342→(** Resolve spawn args to a positional channel list.
   343→    Positional args pass through; named args are reordered to match
   344→    the port_names array.  Mixed positional/named is rejected. *)
   345→let resolve_channels (sp : spawn_stmt) port_names =
   346→  match sp.sp_args with
   347→  | [] ->
   348→    Runtime.Error.raise_ Config "config_error"
   349→      (Printf.sprintf "spawn %s: no arguments" sp.sp_process)
   350→  | (Positional _) :: _ ->
   351→    List.map (function
   352→      | Positional ch -> ch
   353→      | Labeled _ ->
   354→        Runtime.Error.raise_ Config "config_error"
   355→          (Printf.sprintf "spawn %s: cannot mix positional and named arguments"
   356→            sp.sp_process)
   357→    ) sp.sp_args
   358→  | (Labeled _) :: _ ->
   359→    let named = List.map (function
   360→      | Labeled (p, ch) -> (p, ch)
   361→      | Positional _ ->
   362→        Runtime.Error.raise_ Config "config_error"
   363→          (Printf.sprintf "spawn %s: cannot mix positional and named arguments"
   364→            sp.sp_process)
   365→    ) sp.sp_args in
   366→    (* Check for unknown port names *)
   367→    List.iter (fun (p, _) ->
   368→      if not (Array.exists (fun n -> n = p) port_names) then
   369→        Runtime.Error.raise_ Config "config_error"
   370→          (Printf.sprintf "spawn %s: unknown port '%s'" sp.sp_process p)
   371→    ) named;
   372→    (* Check for duplicate port names *)
   373→    let seen = Hashtbl.create (Array.length port_names) in
   374→    List.iter (fun (p, _) ->
   375→      if Hashtbl.mem seen p then
   376→        Runtime.Error.raise_ Config "config_error"
   377→          (Printf.sprintf "spawn %s: duplicate port '%s'" sp.sp_process p);
   378→      Hashtbl.add seen p ()
   379→    ) named;
   380→    (* Build channel list in positional order.
   381→       All ports are required — no exemptions. *)
   382→    Array.to_seq port_names |> Seq.filter_map (fun port_name ->
   383→      match List.assoc_opt port_name named with
   384→      | Some ch -> Some ch
   385→      | None ->
   386→        Runtime.Error.raise_ Config "config_error"
   387→          (Printf.sprintf "spawn %s: missing required port '%s'"
   388→            sp.sp_process port_name)
   389→    ) |> List.of_seq
   390→
   391→(** Maximum output per stream for the [exec] builtin, in bytes.
   392→    Compile-time upper limit — adjust as needed.  128 KB is enough for
   393→    useful command output without overrunning LLM context windows. *)
   394→let max_exec_output_bytes = 131_072
   395→
   396→(** Exec timeout in seconds. *)
   397→let exec_timeout_seconds = 30.0
   398→
   399→(** Read from a file descriptor into a buffer, up to [max_bytes].
   400→    Returns [(contents, truncated)].  Used by the [exec] builtin to
   401→    capture stdout/stderr with a size cap. *)
   402→let read_capped fd max_bytes =
   403→  let buf = Buffer.create 4096 in
   404→  let chunk = Bytes.create 4096 in
   405→  let truncated = ref false in
   406→  (try
   407→    let continue = ref true in
   408→    while !continue do
   409→      let n = Unix.read fd chunk 0 4096 in
   410→      if n = 0 then continue := false
   411→      else begin
   412→        let remaining = max_bytes - Buffer.length buf in
   413→        if remaining <= 0 then begin
   414→          truncated := true;
   415→          continue := false
   416→        end else begin
   417→          let take = min n remaining in
   418→          Buffer.add_subbytes buf chunk 0 take;
   419→          if take < n then begin
   420→            truncated := true;
   421→            continue := false
   422→          end
   423→        end
   424→      end
   425→    done
   426→  with Unix.Unix_error (err, _, _) ->
   427→    (* Unexpected: EBADF or similar — child pipe should EOF cleanly. *)
   428→    Runtime.Log.debug "read_capped"
   429→      ["event", `String "read_error";
   430→       "detail", `String (Unix.error_message err)]);
   431→  (Buffer.contents buf, !truncated)
   432→
   433→(** {1 Timeout Helpers} *)
   434→
   435→(** Extract [@timeout N] annotation value from a binding's attributes.
   436→    Returns [Some seconds] if the annotation exists, [None] otherwise. *)
   437→let timeout_of_binding (b : binding) : float option =
   438→  List.find_map (fun (key, value) ->
   439→    if key = "timeout" then
   440→      match value with
   441→      | Num n -> Some n
   442→      | _ -> None
   443→    else None
   444→  ) b.bd_attrs
   445→
   446→(** Timed wait for a child process.  Polls with [Unix.waitpid WNOHANG]
   447→    until the child exits or the deadline is reached.
   448→
   449→    On timeout: sends SIGTERM, waits [grace_seconds] (default 5s) with
   450→    polling, then sends SIGKILL and reaps.
   451→
   452→    [timeout_seconds <= 0.0] means no timeout — blocks indefinitely.
   453→    Returns [(status, timed_out)]. *)
   454→let waitpid_timed ?(grace_seconds = 5.0) ~timeout_seconds pid =
   455→  if timeout_seconds <= 0.0 then
   456→    let _, status = Unix.waitpid [] pid in
   457→    (status, false)
   458→  else
   459→    let deadline = Unix.gettimeofday () +. timeout_seconds in
   460→    let rec poll () =
   461→      match Unix.waitpid [Unix.WNOHANG] pid with
   462→      | 0, _ ->
   463→        if Unix.gettimeofday () >= deadline then begin
   464→          (* Timeout: SIGTERM first *)
   465→          (try Unix.kill pid Sys.sigterm with Unix.Unix_error _ -> ());
   466→          let grace_deadline = Unix.gettimeofday () +. grace_seconds in
   467→          let rec grace_poll () =
   468→            match Unix.waitpid [Unix.WNOHANG] pid with
   469→            | 0, _ ->
   470→              if Unix.gettimeofday () >= grace_deadline then begin
   471→                (* Grace period expired: SIGKILL *)
   472→                (try Unix.kill pid Sys.sigkill with Unix.Unix_error _ -> ());
   473→                let _, status = Unix.waitpid [] pid in
   474→                (status, true)
   475→              end else begin
   476→                ignore (Unix.select [] [] [] 0.1);
   477→                grace_poll ()
   478→              end
   479→            | _, status -> (status, true)
   480→          in
   481→          grace_poll ()
   482→        end else begin
   483→          ignore (Unix.select [] [] [] 0.1);
   484→          poll ()
   485→        end
   486→      | _, status -> (status, false)
   487→    in
   488→    poll ()
   489→
   490→(** {1 MCP Connection} *)
   491→
   492→(** Parse MCP server config and connect.  Returns [Ok conn] on success,
   493→    [Error msg] on config or connection failure. *)
   494→let connect_mcp_server (kvs : (string * config_value) list) =
   495→  let get_str key =
   496→    List.assoc_opt key kvs |> Option.map (function
   497→      | Str s -> s | _ -> failwith (key ^ " must be a string"))
   498→  in
   499→  let get_str_list key =
   500→    match List.assoc_opt key kvs with
   501→    | Some (Arr items) ->
   502→      Some (List.filter_map (function Str s -> Some s | _ -> None) items)
   503→    | _ -> None
   504→  in
   505→  try
   506→    let has_url = get_str "url" <> None in
   507→    let has_command = get_str "command" <> None in
   508→    if has_url && has_command then
   509→      Error "mcp entry has both 'url' and 'command'"
   510→    else if has_url then begin
   511→      let url = Option.get (get_str "url") in
   512→      let token = match get_str "token" with
   513→        | Some t -> Some t
   514→        | None ->
   515→          match get_str "token_env" with
   516→          | Some env_var ->
   517→            (try Some (Sys.getenv env_var)
   518→             with Not_found ->
   519→               failwith (Printf.sprintf "env var '%s' not set" env_var))
   520→          | None -> None
   521→      in
   522→      let tool_filter = get_str_list "tools" in
   523→      let prefix = get_str "prefix" in
   524→      Ok (Mcp_client.connect_http ~url ?token ?tool_filter ?prefix ())
   525→    end else
   526→      match get_str "command" with
   527→      | None -> Error "mcp config requires 'url' or 'command'"
   528→      | Some command ->
   529→        let args = get_str_list "args" |> Option.value ~default:[] in
   530→        let tool_filter = get_str_list "tools" in
   531→        let env = match List.assoc_opt "env" kvs with
   532→          | Some (Obj pairs) ->
   533→            List.filter_map (fun (k, v) ->
   534→              match v with Str s -> Some (k, s) | _ -> None
   535→            ) pairs
   536→          | _ -> []
   537→        in
   538→        let prefix = get_str "prefix" in
   539→        Ok (Mcp_client.connect_stdio ~command ~args ~env ?tool_filter ?prefix ())
   540→  with Failure msg -> Error msg
   541→
     1→(** Agent process — LLM conversation with validated I/O.
     2→
     3→    Reads JSON Lines from stdin, runs one conversation loop per line
     4→    with the configured LLM, validates the output, and writes
     5→    JSON Lines to stdout. *)
     6→
     7→let get_config_str config key =
     8→  List.assoc_opt key config |> Option.map (function
     9→    | Language.Str s -> s
    10→    | _ -> Runtime.Error.raise_ Config "config_error"
    11→             (Printf.sprintf "%s must be a string" key))
    12→
    13→let get_config_num config key =
    14→  List.assoc_opt key config |> Option.map (function
    15→    | Language.Num n -> n
    16→    | _ -> Runtime.Error.raise_ Config "config_error"
    17→             (Printf.sprintf "%s must be a number" key))
    18→
    19→let get_config_bool config key =
    20→  List.assoc_opt key config |> Option.map (function
    21→    | Language.Bool b -> b
    22→    | _ -> Runtime.Error.raise_ Config "config_error"
    23→             (Printf.sprintf "%s must be a boolean" key))
    24→
    25→let get_config_str_list config key =
    26→  List.assoc_opt key config |> Option.map (function
    27→    | Language.Arr items ->
    28→      List.map (function
    29→        | Language.Str s -> s
    30→        | _ -> Runtime.Error.raise_ Config "config_error"
    31→                 (Printf.sprintf "%s array must contain strings" key)
    32→      ) items
    33→    | _ -> Runtime.Error.raise_ Config "config_error"
    34→             (Printf.sprintf "%s must be an array" key))
    35→
    36→(** {1 Port-multiplexed I/O}
    37→
    38→    All logical channels share stdin/stdout via tagged JSON envelopes:
    39→    [{"__port": "name", "msg": <json>}].
    40→
    41→    Input: a single demux thread reads stdin, parses __port, and routes
    42→    to internal pipes (one per logical input channel).
    43→
    44→    Output: a mutex-protected [mux_write] function wraps each message
    45→    with its __port tag before writing to stdout. *)
    46→
    47→(** Mutex protecting stdout writes.  Multiple threads (conversation loop,
    48→    control handler, telemetry emitter) may write concurrently. *)
    49→let stdout_mutex = Mutex.create ()
    50→
    51→(** Write a JSON line to stdout wrapped in a port envelope. *)
    52→let mux_write port json_str =
    53→  let envelope = Printf.sprintf "{\"__port\":%S,\"msg\":%s}\n" port json_str in
    54→  Mutex.lock stdout_mutex;
    55→  Fun.protect ~finally:(fun () -> Mutex.unlock stdout_mutex) (fun () ->
    56→    output_string stdout envelope;
    57→    flush stdout)
    58→
    59→(** Start the stdin demux thread.  Reads JSON Lines from stdin,
    60→    parses the [__port] field, and routes the [msg] payload to the
    61→    appropriate internal pipe.
    62→
    63→    Port-specific EOF: when a message has [__eof: true], the demux
    64→    thread closes that port's internal pipe without closing stdin.
    65→    This allows the data input pipe to close (triggering the drain
    66→    loop) while control messages continue flowing.
    67→
    68→    On stdin EOF, all remaining open pipes are closed. *)
    69→let start_stdin_demux ~data_w ~ctrl_w ~tool_resp_w =
    70→  let data_open = ref true in
    71→  let ctrl_open = ref true in
    72→  let tool_open = ref (tool_resp_w <> None) in
    73→  let route_line line =
    74→    if String.length (String.trim line) = 0 then ()
    75→    else match Yojson.Safe.from_string line with
    76→    | `Assoc fields ->
    77→      let port = match List.assoc_opt "__port" fields with
    78→        | Some (`String s) -> s | _ -> "input" in
    79→      let is_eof = match List.assoc_opt "__eof" fields with
    80→        | Some (`Bool true) -> true | _ -> false in
    81→      if is_eof then begin
    82→        (* Close the internal pipe for this port *)
    83→        (match port with
    84→         | "input" when !data_open ->
    85→           close_out_noerr data_w; data_open := false
    86→         | "ctrl_in" when !ctrl_open ->
    87→           close_out_noerr ctrl_w; ctrl_open := false
    88→         | "tool_resp" ->
    89→           Option.iter close_out_noerr tool_resp_w; tool_open := false
    90→         | _ -> ())
    91→      end else begin
    92→        let msg = match List.assoc_opt "msg" fields with
    93→          | Some `Null -> ""  (* null msg → blank line (skip/no-op) *)
    94→          | Some m -> Yojson.Safe.to_string m
    95→          | None -> line in
    96→        let target = match port with
    97→          | "input" when !data_open -> Some data_w
    98→          | "ctrl_in" when !ctrl_open -> Some ctrl_w
    99→          | "tool_resp" when !tool_open -> tool_resp_w
   100→          | _ ->
   101→            Runtime.Log.warn "mux_port_closed_or_unknown"
   102→              [("port", `String port)];
   103→            None
   104→        in
   105→        (match target with
   106→         | Some oc ->
   107→           output_string oc msg;
   108→           output_char oc '\n';
   109→           flush oc
   110→         | None -> ())
   111→      end
   112→    | _ ->
   113→      (* Not an envelope — treat as raw data input *)
   114→      if !data_open then begin
   115→        output_string data_w line;
   116→        output_char data_w '\n';
   117→        flush data_w
   118→      end
   119→    | exception Yojson.Json_error _ ->
   120→      if !data_open then begin
   121→        output_string data_w line;
   122→        output_char data_w '\n';
   123→        flush data_w
   124→      end
   125→  in
   126→  Thread.create (fun () ->
   127→    (try while true do
   128→      let line = input_line stdin in
   129→      route_line line
   130→    done with End_of_file | Sys_error _ -> ());
   131→    if !data_open then close_out_noerr data_w;
   132→    if !ctrl_open then close_out_noerr ctrl_w;
   133→    if !tool_open then Option.iter close_out_noerr tool_resp_w
   134→  ) ()
   135→
   136→(** Memory request from the control channel, handled by the main loop. *)
   137→type memory_request =
   138→  | Get_memory
   139→  | Set_memory of Client.message list
   140→
   141→(** Mutable state updated by the control channel reader thread.
   142→    The conversation loop reads this before each API call. *)
   143→type control_state = {
   144→  mutable temperature : float option;
   145→  mutable model_override : string option;
   146→  mutable stop : bool;
   147→  mutable paused : bool;
   148→  mutable pause_acked : bool;
   149→  mutable memory_request : memory_request option;
   150→  memory_cond : Condition.t;
   151→  mutex : Mutex.t;
   152→}
   153→
   154→let make_control_state () = {
   155→  temperature = None;
   156→  model_override = None;
   157→  stop = false;
   158→  paused = false;
   159→  pause_acked = false;
   160→  memory_request = None;
   161→  memory_cond = Condition.create ();
   162→  mutex = Mutex.create ();
   163→}
   164→
   165→(** Parse a JSON array into a message list. Returns None if any
   166→    message is malformed (missing role or content). *)
   167→let parse_messages json =
   168→  match json with
   169→  | `List items ->
   170→    let msgs = List.filter_map (fun m ->
   171→      match m with
   172→      | `Assoc fields ->
   173→        (match List.assoc_opt "role" fields,
   174→               List.assoc_opt "content" fields with
   175→         | Some (`String role), Some content ->
   176→           Some { Client.role; content }
   177→         | _ -> None)
   178→      | _ -> None
   179→    ) items in
   180→    if List.length msgs <> List.length items then
   181→      None  (* reject if any message malformed *)
   182→    else
   183→      Some msgs
   184→  | _ -> None
   185→
   186→(** Read control messages from the internal ctrl pipe. Each line is a JSON object;
   187→    recognised fields update the control state. Unknown fields
   188→    are silently ignored (forward compatibility). *)
   189→let control_reader_thread (state : control_state) ic =
   190→  (try while true do
   191→    let line = input_line ic in
   192→    if String.length (String.trim line) > 0 then begin
   193→      let json = try Some (Yojson.Safe.from_string line) with _ -> None in
   194→      match json with
   195→      | Some (`Assoc fields) ->
   196→        Mutex.lock state.mutex;
   197→        Fun.protect ~finally:(fun () -> Mutex.unlock state.mutex) (fun () ->
   198→          (match List.assoc_opt "set_temp" fields with
   199→           | Some (`Float f) -> state.temperature <- Some f
   200→           | Some (`Int n) -> state.temperature <- Some (float_of_int n)
   201→           | Some `Null -> state.temperature <- None
   202→           | _ -> ());
   203→          (match List.assoc_opt "set_model" fields with
   204→           | Some (`String s) -> state.model_override <- Some s
   205→           | Some `Null -> state.model_override <- None
   206→           | _ -> ());
   207→          (match List.assoc_opt "stop" fields with
   208→           | Some (`Bool true) -> state.stop <- true
   209→           | _ -> ());
   210→          (match List.assoc_opt "pause" fields with
   211→           | Some (`Bool true) -> state.paused <- true
   212→           | _ -> ());
   213→          (match List.assoc_opt "resume" fields with
   214→           | Some (`Bool true) -> state.paused <- false
   215→           | _ -> ());
   216→          (match List.assoc_opt "get_memory" fields with
   217→           | Some (`Bool true) ->
   218→             state.memory_request <- Some Get_memory
   219→           | _ -> ());
   220→          (match List.assoc_opt "set_memory" fields with
   221→           | Some payload ->
   222→             (match parse_messages payload with
   223→              | Some msgs ->
   224→                state.memory_request <- Some (Set_memory msgs)
   225→              | None ->
   226→                Runtime.Log.warn "set_memory_malformed"
   227→                  [("line", `String line)])
   228→           | _ -> ());
   229→          Condition.broadcast state.memory_cond);
   230→        Runtime.Log.debug "control"
   231→          [("message", Option.get json)]
   232→      | _ ->
   233→        Runtime.Log.warn "control_parse"
   234→          [("line", `String line)]
   235→    end
   236→  done
   237→  with
   238→  | End_of_file ->
   239→    Mutex.lock state.mutex;
   240→    state.stop <- true;
   241→    Condition.broadcast state.memory_cond;
   242→    Mutex.unlock state.mutex
   243→  | Unix.Unix_error (_err, _fn, _arg) ->
   244→    Mutex.lock state.mutex;
   245→    state.stop <- true;
   246→    Condition.broadcast state.memory_cond;
   247→    Mutex.unlock state.mutex)
   248→
   249→let () =
   250→  Runtime.Entry.main
   251→    ~usage:"Usage: plumb-agent <spec.plumb>"
   252→    ~run:(fun spec_path ->
   253→      (* Tag log lines with agent name so stderr is attributable *)
   254→      (match Sys.getenv_opt "PLUMB_AGENT_NAME" with
   255→       | Some name -> Runtime.Log.set_prefix [("agent", `String name)]
   256→       | None -> ());
   257→      (* Raise log level to Info so agent_config and tool_call are visible.
   258→         Preserve Debug if PIPELINE_DEBUG=1 already set it higher. *)
   259→      if Runtime.Log.level_rank (Runtime.Log.get_level ()) >
   260→         Runtime.Log.level_rank Runtime.Log.Info then
   261→        Runtime.Log.set_level Runtime.Log.Info;
   262→      let source = Runtime.Entry.read_file spec_path in
   263→      let loaded = Runtime.Spec.load_program source in
   264→      (* Find the agent binding (there should be exactly one) *)
   265→      let agent_bindings = List.filter (fun b ->
   266→        match b.Language.impl with Language.Agent _ -> true | _ -> false
   267→      ) loaded.program.prog_bindings in
   268→      let binding = match agent_bindings with
   269→        | [b] -> b
   270→        | [] -> Runtime.Error.raise_ Config "config_error" "no agent binding in spec"
   271→        | _ ->
   272→          let names = List.map (fun b -> b.Language.bd_name) agent_bindings
   273→            |> String.concat ", " in
   274→          Runtime.Error.raise_ Config "config_error"
   275→            (Printf.sprintf "multiple agent bindings: %s; specify one" names)
   276→      in
   277→      let type_env = loaded.type_env in
   278→      let input_type = Language.Validate.unwrap_stream binding.input in
   279→      let output_type = Language.Validate.unwrap_stream binding.output in
   280→      let config = match binding.impl with
   281→        | Language.Agent pairs -> pairs
   282→        | _ -> Runtime.Error.raise_ Config "config_error" "not an agent binding"
   283→      in
   284→      (* Find tool bindings: Tool impl or @tool annotation on bare types *)
   285→      let tool_bindings = List.filter
   286→        Language.Validate.is_tool_binding
   287→        loaded.program.prog_bindings in
   288→      (* Provider: config > env > error *)
   289→      let provider = match get_config_str config "provider" with
   290→        | Some p -> p
   291→        | None ->
   292→          (match Sys.getenv_opt "PLUMB_PROVIDER" with
   293→           | Some p -> p
   294→           | None -> Runtime.Error.raise_ Config "config_error"
   295→                       "no provider: set provider in agent config or PLUMB_PROVIDER env var")
   296→      in
   297→      (* Model: config > env > error *)
   298→      let model = match get_config_str config "model" with
   299→        | Some m -> m
   300→        | None ->
   301→          (match Sys.getenv_opt "PLUMB_MODEL" with
   302→           | Some m -> m
   303→           | None -> Runtime.Error.raise_ Config "config_error"
   304→                       "no model: set model in agent config or PLUMB_MODEL env var")
   305→      in
   306→      (* System prompt *)
   307→      let prompt_parts = ref [] in
   308→      (match get_config_str config "prompt" with
   309→       | Some s -> prompt_parts := s :: !prompt_parts
   310→       | None -> ());
   311→      (match get_config_str_list config "prompts" with
   312→       | Some files ->
   313→         List.iter (fun file ->
   314→           let content =
   315→             try Runtime.Entry.read_file file
   316→             with Runtime.Error.Process_error _ ->
   317→               Runtime.Error.raise_ Config "config_error"
   318→                 (Printf.sprintf "cannot read prompt file: %s" file)
   319→           in
   320→           let tagged = Printf.sprintf "<doc id=\"%s\">\n%s\n</doc>" file content in
   321→           prompt_parts := tagged :: !prompt_parts
   322→         ) files
   323→       | None -> ());
   324→      let output_instruction =
   325→        let pp = Language.pp_type_expr type_env output_type in
   326→        Printf.sprintf
   327→          "Your output must be valid JSON matching this type:\n\n\
   328→          \  %s\n\n\
   329→          Respond with ONLY the JSON value, no surrounding text or explanation." pp
   330→      in
   331→      let system_prompt = List.rev_map (fun text ->
   332→        { Client.text; cache_break = false }
   333→      ) !prompt_parts
   334→        @ [{ Client.text = output_instruction; cache_break = true }] in
   335→      (* Amnesiac mode: fresh conversation per line (default: accumulate) *)
   336→      let amnesiac = get_config_bool config "amnesiac"
   337→        |> Option.value ~default:false in
   338→      (* Max retries *)
   339→      let max_retries = get_config_num config "max_retries"
   340→        |> Option.map int_of_float |> Option.value ~default:3 in
   341→      (* Max tokens *)
   342→      let max_tokens = get_config_num config "max_tokens"
   343→        |> Option.map int_of_float |> Option.value ~default:8192 in
   344→      (* Extended thinking *)
   345→      let thinking_budget = get_config_num config "thinking_budget"
   346→        |> Option.map int_of_float in
   347→      (* Max messages: cap on input messages before clean exit *)
   348→      let max_messages = get_config_num config "max_messages"
   349→        |> Option.map int_of_float in
   350→      (* Runtime context: inject time, token counts, pin count into system prompt *)
   351→      let runtime_context = get_config_bool config "runtime_context"
   352→        |> Option.value ~default:false in
   353→      (* API key — ELIZA needs none *)
   354→      let api_key_var = match provider with
   355→        | "anthropic" -> Some "ANTHROPIC_API_KEY"
   356→        | "openai" -> Some "OPENAI_API_KEY"
   357→        | "eliza" | "claude-code" -> None
   358→        | p -> Runtime.Error.raise_ Config "config_error"
   359→                 (Printf.sprintf "unknown provider: %s" p)
   360→      in
   361→      (* Telemetry: emit via __port envelope on stdout *)
   362→      let emit_telemetry_line = Some (mux_write "telemetry") in
   363→      (* Announce resolved provider/model before API key check,
   364→         so the user sees what was resolved even on config errors. *)
   365→      let msg = `Assoc [
   366→        ("kind", `String "config");
   367→        ("provider", `String provider);
   368→        ("model", `String model);
   369→      ] in
   370→      mux_write "telemetry" (Yojson.Safe.to_string msg);
   371→      Runtime.Log.info "agent_config"
   372→        [("provider", `String provider); ("model", `String model)];
   373→      let api_key = match api_key_var with
   374→        | Some var ->
   375→          (match Sys.getenv_opt var with
   376→           | Some k -> k
   377→           | None -> Runtime.Error.raise_ Config "config_error"
   378→                       (Printf.sprintf "%s not set" var))
   379→        | None -> ""
   380→      in
   381→      let endpoint = get_config_str config "endpoint" in
   382→      (* Tool schemas: convert tool bindings to API tool definitions.
   383→         For non-record inputs, wrap in an object for Anthropic API
   384→         (which requires input_schema to be object-typed). *)
   385→      let tool_schemas = List.map (fun (b : Language.binding) ->
   386→        let base = Language.Validate.tool_schema type_env b in
   387→        (* Anthropic API requires input_schema to be an object.
   388→           For non-record tool inputs, wrap in { input: <schema> }. *)
   389→        match Language.Validate.resolve type_env b.input with
   390→        | Language.Record _ -> base
   391→        | _ ->
   392→          let open Yojson.Safe.Util in
   393→          let name = base |> member "name" in
   394→          let description = base |> member "description" in
   395→          let inner_schema = base |> member "input_schema" in
   396→          `Assoc [
   397→            ("name", name);
   398→            ("description", description);
   399→            ("input_schema", `Assoc [
   400→              ("type", `String "object");
   401→              ("properties", `Assoc [
   402→                ("input", inner_schema)
   403→              ]);
   404→              ("required", `List [`String "input"]);
   405→              ("additionalProperties", `Bool false);
   406→            ]);
   407→          ]
   408→      ) tool_bindings in
   409→      (* MCP tool schemas: read from temp file when PLUMB_MCP_TOOLS is set.
   410→         These are already in Anthropic format with input_schema (snake_case)
   411→         and object-typed schemas — no wrapping needed. *)
   412→      let mcp_schemas = match Sys.getenv_opt "PLUMB_MCP_TOOLS" with
   413→        | Some path ->
   414→          (try
   415→            let content = In_channel.with_open_text path In_channel.input_all in
   416→            (try Yojson.Safe.from_string content
   417→              |> Yojson.Safe.Util.to_list
   418→            with _ ->
   419→              Runtime.Log.warn "mcp_schema_parse_error"
   420→                [("path", `String path)];
   421→              [])
   422→          with Sys_error msg ->
   423→            Runtime.Log.warn "mcp_schema_read_error"
   424→              [("path", `String path); ("detail", `String msg)];
   425→            [])
   426→        | None -> []
   427→      in
   428→      let tool_schemas = tool_schemas @ mcp_schemas
   429→        @ [Agent.Pinning.pin_schema; Agent.Pinning.unpin_schema] in
   430→      (* Tool dispatch: request via __port envelope, response via
   431→         internal pipe from the stdin demux thread. *)
   432→      let has_tools = tool_schemas <> [] in
   433→      let tool_resp_r, tool_resp_w =
   434→        if has_tools then
   435→          let r, w = Unix.pipe ~cloexec:true () in
   436→          (Some (Unix.in_channel_of_descr r),
   437→           Some (Unix.out_channel_of_descr w))
   438→        else (None, None) in
   439→      (* Control: input via internal pipe from stdin demux,
   440→         output via __port envelope on stdout. *)
   441→      let ctrl_r, ctrl_w = Unix.pipe ~cloexec:true () in
   442→      let ctrl_ic = Unix.in_channel_of_descr ctrl_r in
   443→      let ctrl_oc = Unix.out_channel_of_descr ctrl_w in
   444→      let control_state = make_control_state () in
   445→      ignore (Thread.create (control_reader_thread control_state) ctrl_ic);
   446→      (* Data input: internal pipe from stdin demux *)
   447→      let data_r, data_w = Unix.pipe ~cloexec:true () in
   448→      let data_ic = Unix.in_channel_of_descr data_r in
   449→      let data_oc = Unix.out_channel_of_descr data_w in
   450→      (* Start stdin demux thread *)
   451→      ignore (start_stdin_demux
   452→        ~data_w:data_oc ~ctrl_w:ctrl_oc ~tool_resp_w);
   453→      let get_model () =
   454→        Mutex.lock control_state.mutex;
   455→        let m = match control_state.model_override with
   456→          | Some s -> s | None -> model
   457→        in
   458→        Mutex.unlock control_state.mutex;
   459→        m
   460→      in
   461→      let get_temperature () =
   462→        Mutex.lock control_state.mutex;
   463→        let t = control_state.temperature in
   464→        Mutex.unlock control_state.mutex;
   465→        t
   466→      in
   467→      let is_stopped () =
   468→        Mutex.lock control_state.mutex;
   469→        let s = control_state.stop in
   470→        Mutex.unlock control_state.mutex;
   471→        s
   472→      in
   473→      (* Run with Eio *)
   474→      Mirage_crypto_rng_unix.use_default ();
   475→      Eio_main.run @@ fun env ->
   476→      Eio.Switch.run @@ fun sw ->
   477→        let net = Eio.Stdenv.net env in
   478→        let client = match provider with
   479→          | "anthropic" ->
   480→            Client.Anthropic.make ~sw ~net ~api_key
   481→              ?endpoint ()
   482→          | "openai" ->
   483→            Client.Openai.make ~sw ~net ~api_key
   484→              ?endpoint ()
   485→          | "eliza" ->
   486→            Client.Eliza.make ()
   487→          | "claude-code" ->
   488→            Client.Claude_code.make ()
   489→          | p -> Runtime.Error.raise_ Config "config_error"
   490→                   (Printf.sprintf "unknown provider: %s" p)
   491→        in
   492→        (* Write a JSON line to control out via mux envelope.
   493→           Once the control pipe breaks, skip all further writes —
   494→           the agent continues serving data. *)
   495→        let control_out_live = ref true in
   496→        let control_out json =
   497→          if !control_out_live then
   498→            (try mux_write "ctrl_out" (Yojson.Safe.to_string json)
   499→             with Sys_error _ ->
   500→               control_out_live := false;
   501→               Runtime.Log.warn "control_out_broken" [])
   502→        in
   503→        (* JSON Lines: process input stream *)
   504→        let line_num = ref 0 in
   505→        let msg_count = ref 0 in
   506→        let history = ref [] in
   507→        let pin_set = ref Agent.Pinning.PinMap.empty in
   508→        let pin_counter = ref 0 in
   509→        (* Session-wide token counters for runtime context injection *)
   510→        let session_input_tokens = ref 0 in
   511→        let session_output_tokens = ref 0 in
   512→        let api_calls = ref 0 in
   513→        let compaction_cycles = ref 0 in
   514→        (* Wrap telemetry emitter to also accumulate token counts *)
   515→        let emit_telemetry_line = match emit_telemetry_line with
   516→          | Some write_line ->
   517→            Some (fun line ->
   518→              (try
   519→                match Yojson.Safe.from_string line with
   520→                | `Assoc fields ->
   521→                  (match List.assoc_opt "kind" fields with
   522→                   | Some (`String "usage") ->
   523→                     incr api_calls;
   524→                     (match List.assoc_opt "input_tokens" fields with
   525→                      | Some (`Int n) ->
   526→                        session_input_tokens := !session_input_tokens + n
   527→                      | _ -> ());
   528→                     (match List.assoc_opt "output_tokens" fields with
   529→                      | Some (`Int n) ->
   530→                        session_output_tokens := !session_output_tokens + n
   531→                      | _ -> ())
   532→                   | _ -> ())
   533→                | _ -> ()
   534→              with _ -> ());
   535→              write_line line)
   536→          | None -> None
   537→        in
   538→        (* Emit absolute pin state to telemetry after every pin/unpin. *)
   539→        let emit_pin_telemetry action =
   540→          match emit_telemetry_line with
   541→          | Some write_line ->
   542→            let keys = Agent.Pinning.PinMap.bindings !pin_set
   543→              |> List.map (fun (k, _) ->
   544→                   `String (Agent.Pinning.PinKey.to_string k)) in
   545→            let ev = `Assoc [
   546→              ("kind", `String "pin_state");
   547→              ("action", `String action);
   548→              ("count", `Int (Agent.Pinning.PinMap.cardinal !pin_set));
   549→              ("keys", `List keys);
   550→            ] in
   551→            write_line (Yojson.Safe.to_string ev)
   552→          | None -> ()
   553→        in
   554→        (* dispatch_tool: handles pin/unpin, cache hits, and mux dispatch *)
   555→        let dispatch_tool =
   556→          let emit_req = mux_write "tool_req" in
   557→          let tool_ic = tool_resp_r in
   558→          fun id name input ->
   559→            let open Agent.Pinning in
   560→            match name with
   561→            | "pin" ->
   562→              let open Yojson.Safe.Util in
   563→              let tool = input |> member "tool" |> to_string in
   564→              let args = input |> member "arguments" in
   565→              let key = canonical_key tool args in
   566→              if PinMap.mem key !pin_set then begin
   567→                emit_pin_telemetry "pin";
   568→                ({|{"ok":true,"already_pinned":true}|}, false)
   569→              end else begin
   570→                (* Evict oldest if at capacity *)
   571→                if PinMap.cardinal !pin_set >= max_pins then begin
   572→                  let oldest_key = PinMap.bindings !pin_set
   573→                    |> List.sort (fun (_, a) (_, b) ->
   574→                         compare a.pinned_at b.pinned_at)
   575→                    |> List.hd |> fst in
   576→                  pin_set := PinMap.remove oldest_key !pin_set
   577→                end;
   578→                let content = find_tool_result !history tool args
   579→                  |> Option.value ~default:"" in
   580→                incr pin_counter;
   581→                pin_set := PinMap.add key
   582→                  { tool_name = tool; arguments = args;
   583→                    content; pinned_at = !pin_counter }
   584→                  !pin_set;
   585→                emit_pin_telemetry "pin";
   586→                (Printf.sprintf {|{"ok":true,"pin_count":%d}|}
   587→                  (PinMap.cardinal !pin_set), false)
   588→              end
   589→            | "unpin" ->
   590→              let open Yojson.Safe.Util in
   591→              let tool = input |> member "tool" |> to_string in
   592→              let args = input |> member "arguments" in
   593→              let key = canonical_key tool args in
   594→              pin_set := PinMap.remove key !pin_set;
   595→              emit_pin_telemetry "unpin";
   596→              (Printf.sprintf {|{"ok":true,"pin_count":%d}|}
   597→                (PinMap.cardinal !pin_set), false)
   598→            | _ ->
   599→              (* Check pin cache *)
   600→              let key = canonical_key name input in
   601→              (match PinMap.find_opt key !pin_set with
   602→               | Some entry when entry.content <> "" ->
   603→                 Runtime.Log.info "pin_cache_hit" [("tool", `String name)];
   604→                 let key_str = PinKey.to_string key in
   605→                 (Printf.sprintf
   606→                   {|{"pinned":true,"note":"This tool result is already pinned in your context. Look for <pinned key=\"%s\"> in the pinned-documents block at the top of your conversation."}|}
   607→                   (String.escaped key_str), false)
   608→               | _ ->
   609→                 (* Normal dispatch via mux *)
   610→                 (match tool_ic with
   611→                  | Some ic ->
   612→                    let req = `Assoc [
   613→                      ("id", `String id); ("name", `String name);
   614→                      ("input", input)] in
   615→                    emit_req (Yojson.Safe.to_string req);
   616→                    let resp_line = input_line ic in
   617→                    let resp_json = Yojson.Safe.from_string resp_line in
   618→                    let open Yojson.Safe.Util in
   619→                    let content = resp_json |> member "content" |> to_string in
   620→                    let is_error = resp_json |> member "is_error" |> to_bool in
   621→                    (content, is_error)
   622→                  | None ->
   623→                    ("{\"error\":\"no tools available\"}", true)))
   624→        in
   625→        let get_runtime_context =
   626→          if runtime_context then
   627→            Some (fun () ->
   628→              let now = Unix.gettimeofday () in
   629→              let tm = Unix.gmtime now in
   630→              let time_str = Printf.sprintf "%04d-%02d-%02dT%02d:%02d:%02dZ"
   631→                (1900 + tm.Unix.tm_year) (1 + tm.Unix.tm_mon) tm.Unix.tm_mday
   632→                tm.Unix.tm_hour tm.Unix.tm_min tm.Unix.tm_sec in
   633→              let pin_count = Agent.Pinning.PinMap.cardinal !pin_set in
   634→              let history_len = List.length !history in
   635→              Printf.sprintf
   636→                "[Runtime context]\n\
   637→                 Time: %s\n\
   638→                 Session input tokens: %d\n\
   639→                 Session output tokens: %d\n\
   640→                 API calls: %d\n\
   641→                 Max output tokens per turn: %d\n\
   642→                 History messages: %d\n\
   643→                 Pinned documents: %d\n\
   644→                 Compaction cycles: %d"
   645→                time_str !session_input_tokens !session_output_tokens
   646→                !api_calls max_tokens history_len pin_count
   647→                !compaction_cycles)
   648→          else None
   649→        in
   650→        let loop_config = {
   651→          Loop.client;
   652→          system_prompt;
   653→          max_retries;
   654→          max_tokens;
   655→          thinking_budget;
   656→          emit_telemetry_line;
   657→          type_env;
   658→          input_type;
   659→          output_type;
   660→          tool_schemas;
   661→          dispatch_tool = if has_tools then Some dispatch_tool else None;
   662→          get_model;
   663→          get_temperature;
   664→          is_stopped;
   665→          get_runtime_context;
   666→        } in
   667→        let handle_memory_request () =
   668→          (* Called INSIDE mutex lock. Releases lock for I/O,
   669→             re-acquires before returning. *)
   670→          match control_state.memory_request with
   671→          | Some Get_memory ->
   672→            let msgs = List.map (fun (m : Client.message) ->
   673→              `Assoc [("role", `String m.role); ("content", m.content)]
   674→            ) !history in
   675→            let pinned = Agent.Pinning.serialise_pins !pin_set in
   676→            let dump = `Assoc [
   677→              ("kind", `String "memory");
   678→              ("messages", `List msgs);
   679→              ("pinned", `List pinned);
   680→            ] in
   681→            control_state.memory_request <- None;
   682→            Mutex.unlock control_state.mutex;
   683→            control_out dump;
   684→            Mutex.lock control_state.mutex
   685→          | Some (Set_memory msgs) ->
   686→            let old_len = List.length !history in
   687→            let pin_msg = Agent.Pinning.reinject_pins_as_message !pin_set in
   688→            let with_pins = match pin_msg with
   689→              | Some m -> m :: msgs
   690→              | None -> msgs
   691→            in
   692→            history := with_pins;
   693→            incr compaction_cycles;
   694→            control_state.memory_request <- None;
   695→            Runtime.Log.info "memory_set"
   696→              [("old_messages", `Int old_len);
   697→               ("new_messages", `Int (List.length with_pins))];
   698→            Mutex.unlock control_state.mutex;
   699→            control_out (`Assoc [
   700→              ("kind", `String "memory_set");
   701→              ("old_messages", `Int old_len);
   702→              ("new_messages", `Int (List.length with_pins));
   703→            ]);
   704→            Mutex.lock control_state.mutex
   705→          | None -> ()
   706→        in
   707→        try while true do
   708→          (* Pause loop: wait while paused, handle memory requests *)
   709→          Mutex.lock control_state.mutex;
   710→          while control_state.paused && not control_state.stop do
   711→            (* Emit pause_ack on first entry into paused state *)
   712→            if not control_state.pause_acked then begin
   713→              control_state.pause_acked <- true;
   714→              Mutex.unlock control_state.mutex;
   715→              control_out (`Assoc [("kind", `String "pause_ack")]);
   716→              Mutex.lock control_state.mutex
   717→            end;
   718→            handle_memory_request ();
   719→            if control_state.paused && not control_state.stop then
   720→              Condition.wait control_state.memory_cond control_state.mutex
   721→          done;
   722→          (* Emit resume_ack when leaving paused state *)
   723→          if control_state.pause_acked then begin
   724→            control_state.pause_acked <- false;
   725→            Mutex.unlock control_state.mutex;
   726→            control_out (`Assoc [("kind", `String "resume_ack");
   727→                                    ("resumed", `Bool true)]);
   728→            Mutex.lock control_state.mutex
   729→          end;
   730→          (* Handle any pending request when not paused *)
   731→          handle_memory_request ();
   732→          Mutex.unlock control_state.mutex;
   733→          (* Do NOT check is_stopped() here.  Data and ctrl share a muxed
   734→             stdin; when the executor closes the ctrl write-fd after data
   735→             EOF, the ctrl EOF and data EOF arrive in quick succession.
   736→             The ctrl reader may set stop=true before the main loop reads
   737→             the last data line, discarding valid input.  The data loop
   738→             terminates naturally on End_of_file from data_ic.  Stop is
   739→             checked in the post-data-EOF wait. *)
   740→          let line = input_line data_ic in
   741→          incr line_num;
   742→          if String.length (String.trim line) = 0 then ()
   743→          else begin try
   744→            (match max_messages with
   745→             | Some limit when !msg_count >= limit -> raise Exit
   746→             | _ -> ());
   747→            let json =
   748→              try Yojson.Safe.from_string line
   749→              with Yojson.Json_error msg ->
   750→                Runtime.Error.raise_
   751→                  ~extras:[("line", `Int !line_num)]
   752→                  Invalid "parse_error" msg
   753→            in
   754→            match Language.Validate.check type_env input_type json with
   755→            | Ok cleaned ->
   756→              let result, new_history =
   757→                Loop.run loop_config ~history:!history cleaned
   758→              in
   759→              if not amnesiac then history := new_history;
   760→              incr msg_count;
   761→              mux_write "output" (Yojson.Safe.to_string result)
   762→            | Error errs ->
   763→              Runtime.Error.raise_
   764→                ~extras:[("line", `Int !line_num)]
   765→                Invalid "validation_error"
   766→                (Language.Validate.error_string errs)
   767→          with Runtime.Error.Process_error err ->
   768→            (* Log the error and emit it on the output channel so
   769→               downstream sees the dropped message.  Do not crash —
   770→               the agent stays alive for subsequent valid input. *)
   771→            let error_json = Runtime.Error.to_json err in
   772→            (try mux_write "output" (Yojson.Safe.to_string error_json)
   773→             with Sys_error _ -> ());
   774→            Runtime.Log.error "input_error"
   775→              [("error", `String err.code);
   776→               ("detail", `String err.detail)];
   777→            (* Emit on telemetry for TUI visibility *)
   778→            let telem_json = `Assoc [
   779→              ("kind", `String "error");
   780→              ("error", `String err.code);
   781→              ("detail", `String err.detail);
   782→            ] in
   783→            (try mux_write "telemetry"
   784→                   (Yojson.Safe.to_string telem_json)
   785→             with Sys_error _ -> ())
   786→          end
   787→        done
   788→        with
   789→        | End_of_file ->
   790→          (* Data input exhausted — remain available for protocol
   791→             commands (pause/resume/memory) until the control pipe
   792→             closes.  In muxed mode, data and control share stdin;
   793→             the demux thread closes the data pipe on data EOF but
   794→             the ctrl pipe stays open while stdin carries control
   795→             messages.  See PLMB-033. *)
   796→          Mutex.lock control_state.mutex;
   797→          (try while not control_state.stop do
   798→            while control_state.paused && not control_state.stop do
   799→              if not control_state.pause_acked then begin
   800→                control_state.pause_acked <- true;
   801→                Mutex.unlock control_state.mutex;
   802→                control_out (`Assoc [("kind", `String "pause_ack")]);
   803→                Mutex.lock control_state.mutex
   804→              end;
   805→              handle_memory_request ();
   806→              if control_state.paused && not control_state.stop then
   807→                Condition.wait control_state.memory_cond control_state.mutex
   808→            done;
   809→            if control_state.pause_acked then begin
   810→              control_state.pause_acked <- false;
   811→              Mutex.unlock control_state.mutex;
   812→              control_out (`Assoc [("kind", `String "resume_ack");
   813→                                    ("resumed", `Bool true)]);
   814→              Mutex.lock control_state.mutex
   815→            end;
   816→            handle_memory_request ();
   817→            if not control_state.stop then
   818→              Condition.wait control_state.memory_cond control_state.mutex
   819→          done
   820→          with Exit -> ());
   821→          Mutex.unlock control_state.mutex
   822→        | Exit -> ())
   823→
Read
{
  "file_path": "/Users/ww/src/plumbing/bin/agent/dune"
}
     1→(executable
     2→ (name main)
     3→ (public_name plumb-agent)
     4→ (libraries language runtime client agent yojson eio_main
     5→            mirage-crypto-rng.unix))
     6→
mcp__claude_ai_LDC__read
{
  "document_id": "agents/engineering/users/wwaites@gmail.com"
}
{"success":false,"error":"document not found"}
Thinking

Let me read more files I'll need.

Read
{
  "file_path": "/Users/ww/src/plumbing/test/test_fabric.ml"
}
     1→(* End-to-end tests for the fabric with inline morphisms and
     2→   agent subprocesses via the bridge.
     3→   Uses the transport layer, routing table, fabric, and bridge
     4→   to run complete pipelines. *)
     5→
     6→module T = Transport.Zmq
     7→
     8→let pipeline_counter = ref 0
     9→let fresh_id () =
    10→  incr pipeline_counter;
    11→  Printf.sprintf "test-%d-%d" (Unix.getpid ()) !pipeline_counter
    12→
    13→let parse_and_compile src =
    14→  let open Language in
    15→  let prog = Language.parse src in
    16→  let main = List.find (fun (b : binding) -> b.bd_name = "main") prog.prog_bindings in
    17→  let ports, pb = match main.impl with
    18→    | Plumb (ports, pb) -> (ports, pb)
    19→    | _ -> failwith "main must be plumb"
    20→  in
    21→  let type_env =
    22→    try Validate.build_type_env prog.prog_types
    23→    with Validate.Schema_error msg -> failwith msg
    24→  in
    25→  let routing_table = Runtime.Routing.compile ~prog ~ports ~pb
    26→    ~type_env ~main_binding:main ~seeded:pb.pb_seeded in
    27→  (prog, routing_table, type_env)
    28→
    29→(** Run a fabric pipeline (inline only), execute [f] with the boundary sockets. *)
    30→let with_fabric src f =
    31→  let _prog, routing_table, type_env = parse_and_compile src in
    32→  let pipeline_id = fresh_id () in
    33→  Eio_main.run (fun _env ->
    34→    Eio.Switch.run (fun sw ->
    35→      let fabric = Fabric.execute ~sw ~routing_table ~type_env
    36→        ~pipeline_id () in
    37→      Unix.sleepf 0.1;
    38→      let push_in = Fabric.boundary_input fabric "input" in
    39→      let pull_out = Fabric.boundary_output fabric "output" in
    40→      f push_in pull_out;
    41→      fabric.cancel ()))
    42→
    43→(** Find the agent binary for tests. *)
    44→let find_agent_binary _name =
    45→  let candidates = [
    46→    "../bin/agent/main.exe";
    47→    "_build/default/bin/agent/main.exe";
    48→  ] in
    49→  match List.find_opt Sys.file_exists candidates with
    50→  | Some p ->
    51→    if Filename.is_relative p then Filename.concat (Sys.getcwd ()) p
    52→    else p
    53→  | None -> failwith "cannot find agent binary (plumb-agent)"
    54→
    55→(** Run a fabric pipeline with agent bridge, execute [f] with boundary sockets. *)
    56→let with_fabric_agent src f =
    57→  let prog, routing_table, type_env = parse_and_compile src in
    58→  let pipeline_id = fresh_id () in
    59→  Eio_main.run (fun _env ->
    60→    Eio.Switch.run (fun sw ->
    61→      let handler = Fabric.Bridge.make_handler
    62→        ~prog ~type_env
    63→        ~bin_dir:None
    64→        ~resource_find:(fun name ->
    65→          failwith ("resource_find not available in test: " ^ name))
    66→        ~resolve_bin:find_agent_binary
    67→        () in
    68→      let fabric = Fabric.execute ~sw ~routing_table ~type_env
    69→        ~pipeline_id ~process_handler:handler () in
    70→      Unix.sleepf 0.2;
    71→      let push_in = Fabric.boundary_input fabric "input" in
    72→      let pull_out = Fabric.boundary_output fabric "output" in
    73→      f push_in pull_out;
    74→      fabric.cancel ()))
    75→
    76→(* --- Identity pipeline --- *)
    77→
    78→let test_identity () =
    79→  with_fabric {|
    80→    type M = { text: string }
    81→    let main : !M -> !M = plumb(input, output) {
    82→      spawn id(input, output)
    83→    }
    84→  |} (fun push_in pull_out ->
    85→    Eio.Fiber.both
    86→      (fun () ->
    87→        T.send push_in ~topic:"input" {|{"text":"hello"}|})
    88→      (fun () ->
    89→        let (_topic, payload) = T.recv pull_out in
    90→        let json = Yojson.Safe.from_string payload in
    91→        match json with
    92→        | `Assoc fields ->
    93→          let text = match List.assoc_opt "text" fields with
    94→            | Some (`String s) -> s | _ -> "MISSING" in
    95→          Alcotest.(check string) "text preserved" "hello" text
    96→        | _ -> Alcotest.fail "expected JSON object"))
    97→
    98→(* --- Copy + merge pipeline (each message appears twice) --- *)
    99→
   100→let test_copy_merge () =
   101→  with_fabric {|
   102→    type M = { text: string }
   103→    let main : !M -> !M = plumb(input, output) {
   104→      let ch_a : !M = channel
   105→      let ch_b : !M = channel
   106→      spawn copy(input, ch_a, ch_b)
   107→      spawn merge(ch_a, ch_b, output)
   108→    }
   109→  |} (fun push_in pull_out ->
   110→    let received = ref 0 in
   111→    Eio.Fiber.both
   112→      (fun () ->
   113→        T.send push_in ~topic:"input" {|{"text":"hello"}|})
   114→      (fun () ->
   115→        for _ = 1 to 2 do
   116→          let (_topic, payload) = T.recv pull_out in
   117→          let json = Yojson.Safe.from_string payload in
   118→          (match json with
   119→           | `Assoc fields ->
   120→             (match List.assoc_opt "text" fields with
   121→              | Some (`String "hello") -> incr received
   122→              | _ -> Alcotest.fail "wrong text")
   123→           | _ -> Alcotest.fail "expected JSON object")
   124→        done);
   125→    Alcotest.(check int) "2 copies received" 2 !received)
   126→
   127→(* --- Agent subprocess via bridge --- *)
   128→
   129→(* Agent identity: send a message through an ELIZA agent, expect a response. *)
   130→let test_agent_identity () =
   131→  with_fabric_agent {|
   132→    let main : !string -> !string = plumb(input, output) {
   133→      spawn echo(input, output)
   134→    }
   135→    let echo : !string -> !string = agent {
   136→      provider: "eliza",
   137→      model: "doctor"
   138→    }
   139→  |} (fun push_in pull_out ->
   140→    Eio.Fiber.both
   141→      (fun () ->
   142→        T.send push_in ~topic:"input" {|"hello"|};
   143→        Unix.sleepf 2.0;
   144→        Fabric.Bridge.send_eof push_in ~topic:"input")
   145→      (fun () ->
   146→        let (_topic, payload) = T.recv pull_out in
   147→        let response = Yojson.Safe.from_string payload in
   148→        match response with
   149→        | `String s ->
   150→          Alcotest.(check bool) "non-empty response" true (String.length s > 0)
   151→        | _ -> Alcotest.fail "expected string response"))
   152→
   153→(** Agent EOF: data + EOF → agent processes, output + EOF propagates. *)
   154→let test_agent_eof () =
   155→  with_fabric_agent {|
   156→    let main : !string -> !string = plumb(input, output) {
   157→      spawn echo(input, output)
   158→    }
   159→    let echo : !string -> !string = agent {
   160→      provider: "eliza",
   161→      model: "doctor"
   162→    }
   163→  |} (fun push_in pull_out ->
   164→    Eio.Fiber.both
   165→      (fun () ->
   166→        T.send push_in ~topic:"input" {|"hello"|};
   167→        Unix.sleepf 1.0;
   168→        Fabric.Bridge.send_eof push_in ~topic:"input")
   169→      (fun () ->
   170→        (* First recv: the agent's response *)
   171→        let (_topic, payload) = T.recv pull_out in
   172→        Alcotest.(check bool) "got response" true (String.length payload > 0);
   173→        (* Second recv: should be the EOF sentinel *)
   174→        let (_topic2, payload2) = T.recv pull_out in
   175→        Alcotest.(check bool) "got EOF" true (Fabric.Bridge.is_eof payload2)))
   176→
   177→let () =
   178→  Alcotest.run "fabric" [
   179→    "inline", [
   180→      Alcotest.test_case "identity" `Quick test_identity;
   181→      Alcotest.test_case "copy + merge" `Quick test_copy_merge;
   182→    ];
   183→    "agent", [
   184→      Alcotest.test_case "agent identity" `Slow test_agent_identity;
   185→      Alcotest.test_case "agent EOF" `Slow test_agent_eof;
   186→    ];
   187→  ]
   188→
Read
{
  "file_path": "/Users/ww/src/plumbing/test/test_agent_control.ml"
}
     1→(** Integration tests for agent control protocol over muxed stdin/stdout.
     2→
     3→    Tests pause/resume, get_memory, set_memory, and their interaction.
     4→    All communication uses port-multiplexed JSON envelopes on
     5→    stdin/stdout: [{"__port": "ctrl_in", "msg": {...}}] for commands,
     6→    [{"__port": "ctrl_out", "msg": {...}}] for responses.
     7→
     8→    Data EOF is signalled via [{"__port": "input", "__eof": true}],
     9→    which closes the data input pipe while keeping control active. *)
    10→
    11→(** Resolve agent binary path — works from both dune test
    12→    (cwd = _build/default/test) and dune exec (cwd = project root) *)
    13→let agent_bin =
    14→  let candidates = [
    15→    "../bin/agent/main.exe";
    16→    "_build/default/bin/agent/main.exe";
    17→  ] in
    18→  match List.find_opt Sys.file_exists candidates with
    19→  | Some p ->
    20→    if Filename.is_relative p then Filename.concat (Sys.getcwd ()) p
    21→    else p
    22→  | None -> failwith "cannot find agent binary"
    23→
    24→let fixture path =
    25→  let candidates = [
    26→    "fixtures/" ^ path;
    27→    "test/fixtures/" ^ path;
    28→  ] in
    29→  match List.find_opt Sys.file_exists candidates with
    30→  | Some p ->
    31→    if Filename.is_relative p then Filename.concat (Sys.getcwd ()) p
    32→    else p
    33→  | None -> failwith ("cannot find fixture: " ^ path)
    34→
    35→(** Spawn the agent binary.  Returns (stdin_w, stdout_r, stderr_r, pid).
    36→    All control, data, and tool communication goes through stdin/stdout
    37→    using the muxed envelope protocol. *)
    38→let spawn_agent ?(spec = "agent_explicit_provider.plumb") env =
    39→  Sys.set_signal Sys.sigpipe Sys.Signal_ignore;
    40→  let spec_path = fixture spec in
    41→  let stdin_r, stdin_w = Unix.pipe ~cloexec:true () in
    42→  let stdout_r, stdout_w = Unix.pipe ~cloexec:true () in
    43→  let stderr_r, stderr_w = Unix.pipe ~cloexec:true () in
    44→  let base_env = List.filter_map (fun k ->
    45→    match Sys.getenv_opt k with
    46→    | Some v -> Some (k ^ "=" ^ v)
    47→    | None -> None
    48→  ) ["PATH"; "HOME"; "LANG"; "TERM"; "TMPDIR"] in
    49→  let full_env = Array.of_list (base_env @ Array.to_list env) in
    50→  let pid = Unix.fork () in
    51→  if pid = 0 then begin
    52→    Unix.dup2 stdin_r Unix.stdin;
    53→    Unix.dup2 stdout_w Unix.stdout;
    54→    Unix.dup2 stderr_w Unix.stderr;
    55→    (* Close all pipe fds except 0, 1, 2 *)
    56→    List.iter (fun fd ->
    57→      if fd <> Unix.stdin && fd <> Unix.stdout && fd <> Unix.stderr then
    58→        (try Unix.close fd with Unix.Unix_error _ -> ())
    59→    ) [stdin_r; stdin_w; stdout_r; stdout_w; stderr_r; stderr_w];
    60→    (try Unix.execve agent_bin [| agent_bin; spec_path |] full_env
    61→     with _ -> Unix._exit 2)
    62→  end else begin
    63→    Unix.close stdin_r; Unix.close stdout_w; Unix.close stderr_w;
    64→    (stdin_w, stdout_r, stderr_r, pid)
    65→  end
    66→
    67→(** Read a line from a fd using Unix.select with timeout.
    68→    Returns None on timeout or EOF. *)
    69→let read_line_timeout ?(timeout=2.0) fd =
    70→  let buf = Buffer.create 256 in
    71→  let byte = Bytes.create 1 in
    72→  let deadline = Unix.gettimeofday () +. timeout in
    73→  let rec loop () =
    74→    let remaining = deadline -. Unix.gettimeofday () in
    75→    if remaining <= 0.0 then None
    76→    else begin
    77→      let readable, _, _ = Unix.select [fd] [] [] remaining in
    78→      if readable = [] then None
    79→      else begin
    80→        let n = try Unix.read fd byte 0 1 with Unix.Unix_error _ -> 0 in
    81→        if n = 0 then
    82→          if Buffer.length buf > 0 then Some (Buffer.contents buf)
    83→          else None
    84→        else begin
    85→          let c = Bytes.get byte 0 in
    86→          if c = '\n' then Some (Buffer.contents buf)
    87→          else begin Buffer.add_char buf c; loop () end
    88→        end
    89→      end
    90→    end
    91→  in
    92→  loop ()
    93→
    94→(** Write a string to a fd, ignoring EPIPE *)
    95→let write_fd fd s =
    96→  let b = Bytes.of_string s in
    97→  let len = Bytes.length b in
    98→  let rec loop off =
    99→    if off >= len then ()
   100→    else begin
   101→      let n = try Unix.write fd b off (len - off)
   102→        with Unix.Unix_error (Unix.EPIPE, _, _) -> len - off in
   103→      loop (off + n)
   104→    end
   105→  in
   106→  loop 0
   107→
   108→(** Send a control message wrapped in a mux envelope *)
   109→let send_control stdin_w msg =
   110→  let envelope = Printf.sprintf "{\"__port\":\"ctrl_in\",\"msg\":%s}\n" msg in
   111→  write_fd stdin_w envelope
   112→
   113→(** Send a null data message (triggers pause loop entry without data).
   114→    The demux thread writes a blank line to the data pipe, which the
   115→    conversation loop skips. *)
   116→let send_blank_data stdin_w =
   117→  write_fd stdin_w "{\"__port\":\"input\",\"msg\":null}\n"
   118→
   119→(** Signal data input EOF (closes data pipe, keeps control alive) *)
   120→let send_data_eof stdin_w =
   121→  write_fd stdin_w "{\"__port\":\"input\",\"__eof\":true}\n"
   122→
   123→(** Read a muxed envelope from stdout, filtering by port.
   124→    Skips envelopes for other ports (e.g. telemetry).
   125→    Returns the unwrapped msg JSON string, or None on timeout. *)
   126→let read_port ?(timeout=3.0) stdout_r port =
   127→  let deadline = Unix.gettimeofday () +. timeout in
   128→  let rec loop () =
   129→    let remaining = deadline -. Unix.gettimeofday () in
   130→    if remaining <= 0.0 then None
   131→    else match read_line_timeout ~timeout:remaining stdout_r with
   132→    | None -> None
   133→    | Some line ->
   134→      (match Yojson.Safe.from_string line with
   135→       | `Assoc fields ->
   136→         let p = match List.assoc_opt "__port" fields with
   137→           | Some (`String s) -> s | _ -> "" in
   138→         if p = port then
   139→           match List.assoc_opt "msg" fields with
   140→           | Some m -> Some (Yojson.Safe.to_string m)
   141→           | None -> loop ()
   142→         else
   143→           loop ()  (* skip other ports *)
   144→       | _ -> loop ()
   145→       | exception _ -> loop ())
   146→  in
   147→  loop ()
   148→
   149→(** Read a control response (ctrl_out port) *)
   150→let read_ctrl_out ?(timeout=3.0) stdout_r =
   151→  read_port ~timeout stdout_r "ctrl_out"
   152→
   153→(** Read all available stderr *)
   154→let drain_stderr stderr_r =
   155→  let buf = Buffer.create 1024 in
   156→  let chunk = Bytes.create 4096 in
   157→  let rec loop () =
   158→    let readable, _, _ = Unix.select [stderr_r] [] [] 0.1 in
   159→    if readable <> [] then begin
   160→      let n = try Unix.read stderr_r chunk 0 4096 with Unix.Unix_error _ -> 0 in
   161→      if n > 0 then begin
   162→        Buffer.add_subbytes buf chunk 0 n; loop ()
   163→      end
   164→    end
   165→  in
   166→  loop (); Buffer.contents buf
   167→
   168→(** Check that a string contains a substring *)
   169→let contains haystack needle =
   170→  let re = Str.regexp_string needle in
   171→  try ignore (Str.search_forward re haystack 0); true
   172→  with Not_found -> false
   173→
   174→(** Wait for child with timeout. Sends SIGKILL on timeout. *)
   175→let wait_child ?(timeout=5.0) pid =
   176→  let deadline = Unix.gettimeofday () +. timeout in
   177→  let rec poll () =
   178→    let remaining = deadline -. Unix.gettimeofday () in
   179→    if remaining <= 0.0 then begin
   180→      (try Unix.kill pid Sys.sigkill with Unix.Unix_error _ -> ());
   181→      let _, status = Unix.waitpid [] pid in
   182→      match status with
   183→      | Unix.WEXITED c -> c
   184→      | Unix.WSIGNALED s -> 128 + s
   185→      | Unix.WSTOPPED _ -> 128
   186→    end else begin
   187→      match Unix.waitpid [Unix.WNOHANG] pid with
   188→      | 0, _ ->
   189→        Unix.sleepf 0.05;
   190→        poll ()
   191→      | _, status ->
   192→        (match status with
   193→         | Unix.WEXITED c -> c
   194→         | Unix.WSIGNALED s -> 128 + s
   195→         | Unix.WSTOPPED _ -> 128)
   196→    end
   197→  in
   198→  poll ()
   199→
   200→(** Clean up fds from a spawn *)
   201→let cleanup stdin_w stdout_r stderr_r =
   202→  (try Unix.close stdin_w with _ -> ());
   203→  (try Unix.close stdout_r with _ -> ());
   204→  (try Unix.close stderr_r with _ -> ())
   205→
   206→(** {1 Stop tests} *)
   207→
   208→let test_stop_via_control () =
   209→  let stdin_w, stdout_r, stderr_r, pid =
   210→    spawn_agent
   211→      ~spec:"agent_explicit_provider.plumb"
   212→      [| "ANTHROPIC_API_KEY=test-key-not-real" |]
   213→  in
   214→  Unix.sleepf 0.3;
   215→  send_control stdin_w {|{"stop": true}|};
   216→  Unix.sleepf 0.1;
   217→  Unix.close stdin_w;
   218→  let code = wait_child pid in
   219→  let stderr = drain_stderr stderr_r in
   220→  cleanup stdin_w stdout_r stderr_r;
   221→  if code <> 0 then
   222→    Alcotest.failf "agent exit %d; stderr: %s" code stderr
   223→
   224→let test_stdin_eof_sets_stop () =
   225→  let stdin_w, stdout_r, stderr_r, pid =
   226→    spawn_agent
   227→      ~spec:"agent_explicit_provider.plumb"
   228→      [| "ANTHROPIC_API_KEY=test-key-not-real" |]
   229→  in
   230→  Unix.sleepf 0.3;
   231→  (* Close stdin — triggers EOF on all mux channels, agent exits *)
   232→  Unix.close stdin_w;
   233→  let code = wait_child pid in
   234→  cleanup stdin_w stdout_r stderr_r;
   235→  Alcotest.(check int) "agent exits cleanly" 0 code
   236→
   237→(** {1 Memory tests} *)
   238→
   239→let test_get_memory_empty () =
   240→  let stdin_w, stdout_r, stderr_r, pid =
   241→    spawn_agent
   242→      ~spec:"agent_explicit_provider.plumb"
   243→      [| "ANTHROPIC_API_KEY=test-key-not-real" |]
   244→  in
   245→  Unix.sleepf 0.3;
   246→  (* Pause, send blank data to unblock input_line, enter pause loop *)
   247→  send_control stdin_w {|{"pause": true}|};
   248→  Unix.sleepf 0.1;
   249→  send_blank_data stdin_w;
   250→  Unix.sleepf 0.2;
   251→  (* Consume pause_ack *)
   252→  let pause_ack = read_ctrl_out ~timeout:3.0 stdout_r in
   253→  (match pause_ack with
   254→   | None -> Alcotest.fail "no pause_ack on ctrl_out"
   255→   | Some line ->
   256→     let json = Yojson.Safe.from_string line in
   257→     let open Yojson.Safe.Util in
   258→     Alcotest.(check string) "pause_ack kind"
   259→       "pause_ack" (json |> member "kind" |> to_string));
   260→  send_control stdin_w {|{"get_memory": true}|};
   261→  let response = read_ctrl_out ~timeout:3.0 stdout_r in
   262→  send_control stdin_w {|{"stop": true}|};
   263→  Unix.close stdin_w;
   264→  let _ = wait_child pid in
   265→  cleanup stdin_w stdout_r stderr_r;
   266→  match response with
   267→  | None -> Alcotest.fail "no response for get_memory"
   268→  | Some line ->
   269→    let json = Yojson.Safe.from_string line in
   270→    let open Yojson.Safe.Util in
   271→    Alcotest.(check string) "kind is memory"
   272→      "memory" (json |> member "kind" |> to_string);
   273→    Alcotest.(check int) "empty history"
   274→      0 (json |> member "messages" |> to_list |> List.length)
   275→
   276→let test_set_then_get_memory () =
   277→  let stdin_w, stdout_r, stderr_r, pid =
   278→    spawn_agent
   279→      ~spec:"agent_explicit_provider.plumb"
   280→      [| "ANTHROPIC_API_KEY=test-key-not-real" |]
   281→  in
   282→  Unix.sleepf 0.3;
   283→  send_control stdin_w {|{"pause": true}|};
   284→  Unix.sleepf 0.1;
   285→  send_blank_data stdin_w;
   286→  Unix.sleepf 0.2;
   287→  let _ = read_ctrl_out ~timeout:3.0 stdout_r in  (* pause_ack *)
   288→  send_control stdin_w
   289→    {|{"set_memory": [{"role":"user","content":"\"hello\""},{"role":"assistant","content":"\"world\""}]}|};
   290→  let ack = read_ctrl_out ~timeout:3.0 stdout_r in
   291→  (match ack with
   292→   | None -> Alcotest.fail "no ack for set_memory"
   293→   | Some line ->
   294→     let json = Yojson.Safe.from_string line in
   295→     let open Yojson.Safe.Util in
   296→     Alcotest.(check string) "kind is memory_set"
   297→       "memory_set" (json |> member "kind" |> to_string);
   298→     Alcotest.(check int) "old_messages is 0"
   299→       0 (json |> member "old_messages" |> to_int);
   300→     Alcotest.(check int) "new_messages is 2"
   301→       2 (json |> member "new_messages" |> to_int));
   302→  send_control stdin_w {|{"get_memory": true}|};
   303→  let response = read_ctrl_out ~timeout:3.0 stdout_r in
   304→  send_control stdin_w {|{"stop": true}|};
   305→  Unix.close stdin_w;
   306→  let _ = wait_child pid in
   307→  cleanup stdin_w stdout_r stderr_r;
   308→  match response with
   309→  | None -> Alcotest.fail "no response for get_memory"
   310→  | Some line ->
   311→    let json = Yojson.Safe.from_string line in
   312→    let open Yojson.Safe.Util in
   313→    Alcotest.(check string) "kind is memory"
   314→      "memory" (json |> member "kind" |> to_string);
   315→    let msgs = json |> member "messages" |> to_list in
   316→    Alcotest.(check int) "2 messages" 2 (List.length msgs);
   317→    Alcotest.(check string) "first role"
   318→      "user" (List.nth msgs 0 |> member "role" |> to_string);
   319→    Alcotest.(check string) "second role"
   320→      "assistant" (List.nth msgs 1 |> member "role" |> to_string)
   321→
   322→let test_set_memory_empty () =
   323→  let stdin_w, stdout_r, stderr_r, pid =
   324→    spawn_agent
   325→      ~spec:"agent_explicit_provider.plumb"
   326→      [| "ANTHROPIC_API_KEY=test-key-not-real" |]
   327→  in
   328→  Unix.sleepf 0.3;
   329→  send_control stdin_w {|{"pause": true}|};
   330→  Unix.sleepf 0.1;
   331→  send_blank_data stdin_w;
   332→  Unix.sleepf 0.2;
   333→  let _ = read_ctrl_out ~timeout:3.0 stdout_r in  (* pause_ack *)
   334→  send_control stdin_w
   335→    {|{"set_memory": [{"role":"user","content":"\"hello\""}]}|};
   336→  let _ = read_ctrl_out ~timeout:3.0 stdout_r in  (* memory_set ack *)
   337→  send_control stdin_w {|{"set_memory": []}|};
   338→  let ack = read_ctrl_out ~timeout:3.0 stdout_r in
   339→  (match ack with
   340→   | None -> Alcotest.fail "no ack for empty set_memory"
   341→   | Some line ->
   342→     let json = Yojson.Safe.from_string line in
   343→     let open Yojson.Safe.Util in
   344→     Alcotest.(check int) "old_messages is 1"
   345→       1 (json |> member "old_messages" |> to_int);
   346→     Alcotest.(check int) "new_messages is 0"
   347→       0 (json |> member "new_messages" |> to_int));
   348→  send_control stdin_w {|{"get_memory": true}|};
   349→  let response = read_ctrl_out ~timeout:3.0 stdout_r in
   350→  send_control stdin_w {|{"stop": true}|};
   351→  Unix.close stdin_w;
   352→  let _ = wait_child pid in
   353→  cleanup stdin_w stdout_r stderr_r;
   354→  match response with
   355→  | None -> Alcotest.fail "no get_memory response"
   356→  | Some line ->
   357→    let json = Yojson.Safe.from_string line in
   358→    let open Yojson.Safe.Util in
   359→    Alcotest.(check int) "history cleared"
   360→      0 (json |> member "messages" |> to_list |> List.length)
   361→
   362→let test_set_memory_malformed_rejected () =
   363→  let stdin_w, stdout_r, stderr_r, pid =
   364→    spawn_agent
   365→      ~spec:"agent_explicit_provider.plumb"
   366→      [| "ANTHROPIC_API_KEY=test-key-not-real" |]
   367→  in
   368→  Unix.sleepf 0.3;
   369→  send_control stdin_w {|{"pause": true}|};
   370→  Unix.sleepf 0.1;
   371→  send_blank_data stdin_w;
   372→  Unix.sleepf 0.2;
   373→  let _ = read_ctrl_out ~timeout:3.0 stdout_r in  (* pause_ack *)
   374→  send_control stdin_w {|{"set_memory": [{"role":"user"}]}|};
   375→  let response = read_ctrl_out ~timeout:1.0 stdout_r in
   376→  send_control stdin_w {|{"get_memory": true}|};
   377→  let memory = read_ctrl_out ~timeout:3.0 stdout_r in
   378→  send_control stdin_w {|{"stop": true}|};
   379→  Unix.close stdin_w;
   380→  let _ = wait_child pid in
   381→  let stderr = drain_stderr stderr_r in
   382→  cleanup stdin_w stdout_r stderr_r;
   383→  Alcotest.(check bool) "no ack for malformed payload"
   384→    true (response = None);
   385→  Alcotest.(check bool) "stderr warns about malformed"
   386→    true (contains stderr "set_memory_malformed");
   387→  (match memory with
   388→   | None -> Alcotest.fail "no get_memory response"
   389→   | Some line ->
   390→     let json = Yojson.Safe.from_string line in
   391→     let open Yojson.Safe.Util in
   392→     Alcotest.(check int) "history unchanged (empty)"
   393→       0 (json |> member "messages" |> to_list |> List.length))
   394→
   395→(** {1 Pause tests} *)
   396→
   397→let test_pause_get_set_resume () =
   398→  let stdin_w, stdout_r, stderr_r, pid =
   399→    spawn_agent
   400→      ~spec:"agent_explicit_provider.plumb"
   401→      [| "ANTHROPIC_API_KEY=test-key-not-real" |]
   402→  in
   403→  Unix.sleepf 0.3;
   404→  send_control stdin_w {|{"pause": true}|};
   405→  Unix.sleepf 0.1;
   406→  send_blank_data stdin_w;
   407→  Unix.sleepf 0.2;
   408→  let _ = read_ctrl_out ~timeout:3.0 stdout_r in  (* pause_ack *)
   409→  send_control stdin_w {|{"get_memory": true}|};
   410→  let response = read_ctrl_out ~timeout:3.0 stdout_r in
   411→  (match response with
   412→   | None -> Alcotest.fail "no get_memory response while paused"
   413→   | Some line ->
   414→     let json = Yojson.Safe.from_string line in
   415→     let open Yojson.Safe.Util in
   416→     Alcotest.(check string) "kind is memory"
   417→       "memory" (json |> member "kind" |> to_string));
   418→  send_control stdin_w
   419→    {|{"set_memory": [{"role":"user","content":"\"compacted\""}]}|};
   420→  let ack = read_ctrl_out ~timeout:3.0 stdout_r in
   421→  (match ack with
   422→   | None -> Alcotest.fail "no ack while paused"
   423→   | Some line ->
   424→     let json = Yojson.Safe.from_string line in
   425→     let open Yojson.Safe.Util in
   426→     Alcotest.(check string) "kind is memory_set"
   427→       "memory_set" (json |> member "kind" |> to_string));
   428→  send_control stdin_w {|{"resume": true}|};
   429→  Unix.sleepf 0.1;
   430→  let _ = read_ctrl_out ~timeout:3.0 stdout_r in  (* resume_ack *)
   431→  send_control stdin_w {|{"stop": true}|};
   432→  Unix.close stdin_w;
   433→  let code = wait_child pid in
   434→  cleanup stdin_w stdout_r stderr_r;
   435→  Alcotest.(check int) "agent exits cleanly" 0 code
   436→
   437→let test_resume_without_pause () =
   438→  let stdin_w, stdout_r, stderr_r, pid =
   439→    spawn_agent
   440→      ~spec:"agent_explicit_provider.plumb"
   441→      [| "ANTHROPIC_API_KEY=test-key-not-real" |]
   442→  in
   443→  Unix.sleepf 0.3;
   444→  send_control stdin_w {|{"resume": true}|};
   445→  Unix.sleepf 0.1;
   446→  send_control stdin_w {|{"pause": true}|};
   447→  Unix.sleepf 0.1;
   448→  send_blank_data stdin_w;
   449→  Unix.sleepf 0.2;
   450→  let _ = read_ctrl_out ~timeout:3.0 stdout_r in  (* pause_ack *)
   451→  send_control stdin_w {|{"get_memory": true}|};
   452→  let response = read_ctrl_out ~timeout:3.0 stdout_r in
   453→  send_control stdin_w {|{"stop": true}|};
   454→  Unix.close stdin_w;
   455→  let code = wait_child pid in
   456→  cleanup stdin_w stdout_r stderr_r;
   457→  Alcotest.(check bool) "got memory response" true (response <> None);
   458→  Alcotest.(check int) "agent exits cleanly" 0 code
   459→
   460→let test_stop_while_paused () =
   461→  let stdin_w, stdout_r, stderr_r, pid =
   462→    spawn_agent
   463→      ~spec:"agent_explicit_provider.plumb"
   464→      [| "ANTHROPIC_API_KEY=test-key-not-real" |]
   465→  in
   466→  Unix.sleepf 0.3;
   467→  send_control stdin_w {|{"pause": true}|};
   468→  Unix.sleepf 0.1;
   469→  send_blank_data stdin_w;
   470→  Unix.sleepf 0.2;
   471→  send_control stdin_w {|{"stop": true}|};
   472→  Unix.close stdin_w;
   473→  let code = wait_child pid in
   474→  cleanup stdin_w stdout_r stderr_r;
   475→  Alcotest.(check int) "exits while paused" 0 code
   476→
   477→let test_stdin_eof_while_paused () =
   478→  let stdin_w, stdout_r, stderr_r, pid =
   479→    spawn_agent
   480→      ~spec:"agent_explicit_provider.plumb"
   481→      [| "ANTHROPIC_API_KEY=test-key-not-real" |]
   482→  in
   483→  Unix.sleepf 0.3;
   484→  send_control stdin_w {|{"pause": true}|};
   485→  Unix.sleepf 0.1;
   486→  send_blank_data stdin_w;
   487→  Unix.sleepf 0.2;
   488→  (* Close stdin — closes all mux channels, agent exits *)
   489→  Unix.close stdin_w;
   490→  let code = wait_child pid in
   491→  cleanup stdin_w stdout_r stderr_r;
   492→  Alcotest.(check int) "exits on EOF while paused" 0 code
   493→
   494→(** {1 Drain tests — protocol handling after data EOF} *)
   495→
   496→let test_drain_completes_protocol () =
   497→  let stdin_w, stdout_r, stderr_r, pid =
   498→    spawn_agent
   499→      ~spec:"agent_explicit_provider.plumb"
   500→      [| "ANTHROPIC_API_KEY=test-key-not-real" |]
   501→  in
   502→  Unix.sleepf 0.3;
   503→  (* Signal data EOF — agent enters drain loop, but ctrl stays open *)
   504→  send_data_eof stdin_w;
   505→  Unix.sleepf 0.2;
   506→  (* Send pause — agent should handle it in drain loop *)
   507→  send_control stdin_w {|{"pause": true}|};
   508→  let ack = read_ctrl_out ~timeout:3.0 stdout_r in
   509→  (match ack with
   510→   | None -> Alcotest.fail "no pause_ack in drain mode"
   511→   | Some line ->
   512→     let json = Yojson.Safe.from_string line in
   513→     let open Yojson.Safe.Util in
   514→     Alcotest.(check string) "pause_ack kind"
   515→       "pause_ack" (json |> member "kind" |> to_string));
   516→  send_control stdin_w {|{"get_memory": true}|};
   517→  let mem = read_ctrl_out ~timeout:3.0 stdout_r in
   518→  (match mem with
   519→   | None -> Alcotest.fail "no get_memory response in drain mode"
   520→   | Some line ->
   521→     let json = Yojson.Safe.from_string line in
   522→     let open Yojson.Safe.Util in
   523→     Alcotest.(check string) "memory kind"
   524→       "memory" (json |> member "kind" |> to_string));
   525→  send_control stdin_w {|{"resume": true}|};
   526→  let resume = read_ctrl_out ~timeout:3.0 stdout_r in
   527→  (match resume with
   528→   | None -> Alcotest.fail "no resume_ack in drain mode"
   529→   | Some line ->
   530→     let json = Yojson.Safe.from_string line in
   531→     let open Yojson.Safe.Util in
   532→     Alcotest.(check string) "resume_ack kind"
   533→       "resume_ack" (json |> member "kind" |> to_string));
   534→  (* Close stdin — ctrl EOF triggers agent exit *)
   535→  Unix.close stdin_w;
   536→  let code = wait_child pid in
   537→  cleanup stdin_w stdout_r stderr_r;
   538→  Alcotest.(check int) "drain completes protocol" 0 code
   539→
   540→let test_drain_exits_on_stdin_eof () =
   541→  let stdin_w, stdout_r, stderr_r, pid =
   542→    spawn_agent
   543→      ~spec:"agent_explicit_provider.plumb"
   544→      [| "ANTHROPIC_API_KEY=test-key-not-real" |]
   545→  in
   546→  Unix.sleepf 0.3;
   547→  (* Close stdin — EOF on all channels, agent exits *)
   548→  Unix.close stdin_w;
   549→  let code = wait_child pid in
   550→  cleanup stdin_w stdout_r stderr_r;
   551→  Alcotest.(check int) "drain exits on stdin EOF" 0 code
   552→
   553→let test_drain_handles_memory_without_pause () =
   554→  let stdin_w, stdout_r, stderr_r, pid =
   555→    spawn_agent
   556→      ~spec:"agent_explicit_provider.plumb"
   557→      [| "ANTHROPIC_API_KEY=test-key-not-real" |]
   558→  in
   559→  Unix.sleepf 0.3;
   560→  (* Signal data EOF *)
   561→  send_data_eof stdin_w;
   562→  Unix.sleepf 0.2;
   563→  (* Send get_memory without pause — exercises non-paused drain path *)
   564→  send_control stdin_w {|{"get_memory": true}|};
   565→  let mem = read_ctrl_out ~timeout:3.0 stdout_r in
   566→  (match mem with
   567→   | None -> Alcotest.fail "no get_memory response in drain (no pause)"
   568→   | Some line ->
   569→     let json = Yojson.Safe.from_string line in
   570→     let open Yojson.Safe.Util in
   571→     Alcotest.(check string) "memory kind"
   572→       "memory" (json |> member "kind" |> to_string));
   573→  Unix.close stdin_w;
   574→  let code = wait_child pid in
   575→  cleanup stdin_w stdout_r stderr_r;
   576→  Alcotest.(check int) "drain handles memory without pause" 0 code
   577→
   578→let () =
   579→  Alcotest.run "agent-control" [
   580→    "stop", [
   581→      Alcotest.test_case "stop via control" `Quick
   582→        test_stop_via_control;
   583→      Alcotest.test_case "stdin EOF sets stop" `Quick
   584→        test_stdin_eof_sets_stop;
   585→    ];
   586→    "memory", [
   587→      Alcotest.test_case "get_memory empty history" `Quick
   588→        test_get_memory_empty;
   589→      Alcotest.test_case "set_memory then get_memory" `Quick
   590→        test_set_then_get_memory;
   591→      Alcotest.test_case "set_memory empty clears history" `Quick
   592→        test_set_memory_empty;
   593→      Alcotest.test_case "set_memory malformed rejected" `Quick
   594→        test_set_memory_malformed_rejected;
   595→    ];
   596→    "pause", [
   597→      Alcotest.test_case "pause get set resume flow" `Quick
   598→        test_pause_get_set_resume;
   599→      Alcotest.test_case "resume without pause is no-op" `Quick
   600→        test_resume_without_pause;
   601→      Alcotest.test_case "stop while paused" `Quick
   602→        test_stop_while_paused;
   603→      Alcotest.test_case "stdin EOF while paused" `Quick
   604→        test_stdin_eof_while_paused;
   605→    ];
   606→    "drain", [
   607→      Alcotest.test_case "drain completes protocol" `Quick
   608→        test_drain_completes_protocol;
   609→      Alcotest.test_case "drain exits on stdin EOF" `Quick
   610→        test_drain_exits_on_stdin_eof;
   611→      Alcotest.test_case "drain handles memory without pause" `Quick
   612→        test_drain_handles_memory_without_pause;
   613→    ];
   614→  ]
   615→
Read
{
  "file_path": "/Users/ww/src/plumbing/lib/fabric/dune"
}
     1→(library
     2→ (name fabric)
     3→ (public_name plumbing.fabric)
     4→ (libraries transport supervisor runtime language spawn_lib yojson eio eio.unix unix threads.posix))
     5→
Read
{
  "file_path": "/Users/ww/src/plumbing/lib/transport/transport_zmq.ml"
}
     1→(* ZeroMQ implementation of the TRANSPORT signature.
     2→   Uses Eio_unix.run_in_systhread for non-blocking recv/send,
     3→   NOT zmq-eio (which has bugs and cleanup issues). See PLMB-037.
     4→
     5→   Each blocking ZMQ operation runs in a system thread via
     6→   Eio_unix.run_in_systhread, yielding the Eio scheduler while
     7→   waiting. No persistent event loop fibres — clean shutdown. *)
     8→
     9→open Transport_intf
    10→
    11→(* All sockets are wrapped in an existential. The phantom type
    12→   parameter is enforced by the module type ascription. *)
    13→
    14→type zmq_socket =
    15→  | Zmq_push : [`Push] Zmq.Socket.t -> zmq_socket
    16→  | Zmq_pull : [`Pull] Zmq.Socket.t -> zmq_socket
    17→  | Zmq_pub  : [`Pub]  Zmq.Socket.t -> zmq_socket
    18→  | Zmq_sub  : [`Sub]  Zmq.Socket.t -> zmq_socket
    19→  | Zmq_pair : [`Pair] Zmq.Socket.t -> zmq_socket
    20→
    21→type _ socket = zmq_socket
    22→
    23→type context = Zmq.Context.t
    24→
    25→let wrap_zmq_exn f =
    26→  try f () with
    27→  | Zmq.ZMQ_exception (_, msg) ->
    28→    raise (Transport_error (Internal msg, msg))
    29→  | Unix.Unix_error (code, fn, arg) ->
    30→    let msg = Printf.sprintf "%s: %s(%s)" (Unix.error_message code) fn arg in
    31→    raise (Transport_error (Internal msg, msg))
    32→
    33→let create_context () = wrap_zmq_exn Zmq.Context.create
    34→
    35→let destroy_context ctx =
    36→  try Zmq.Context.terminate ctx with
    37→  | Zmq.ZMQ_exception _ -> ()
    38→
    39→(* Socket creation.
    40→   Sets linger=0 so close doesn't block waiting for pending messages.
    41→   Sets receive timeout=1000ms so recv unblocks periodically, allowing
    42→   clean shutdown (the recv loop retries until the socket is closed). *)
    43→let make_socket ctx kind =
    44→  let s = Zmq.Socket.create ctx kind in
    45→  Zmq.Socket.set_linger_period s 0;
    46→  Zmq.Socket.set_receive_timeout s 1000;
    47→  s
    48→
    49→let push_socket ctx ~sw:_ = Zmq_push (make_socket ctx Zmq.Socket.push)
    50→let pull_socket ctx ~sw:_ = Zmq_pull (make_socket ctx Zmq.Socket.pull)
    51→let pub_socket  ctx ~sw:_ = Zmq_pub  (make_socket ctx Zmq.Socket.pub)
    52→let sub_socket  ctx ~sw:_ = Zmq_sub  (make_socket ctx Zmq.Socket.sub)
    53→let pair_socket ctx ~sw:_ = Zmq_pair (make_socket ctx Zmq.Socket.pair)
    54→
    55→(* Bind/connect/disconnect — non-blocking, run directly *)
    56→let bind_raw : type z. z Zmq.Socket.t -> endpoint -> unit =
    57→  fun s ep -> wrap_zmq_exn (fun () -> Zmq.Socket.bind s ep)
    58→
    59→let connect_raw : type z. z Zmq.Socket.t -> endpoint -> unit =
    60→  fun s ep -> wrap_zmq_exn (fun () -> Zmq.Socket.connect s ep)
    61→
    62→let disconnect_raw : type z. z Zmq.Socket.t -> endpoint -> unit =
    63→  fun s ep -> wrap_zmq_exn (fun () -> Zmq.Socket.disconnect s ep)
    64→
    65→let bind sock ep = match sock with
    66→  | Zmq_push s -> bind_raw s ep | Zmq_pull s -> bind_raw s ep
    67→  | Zmq_pub s  -> bind_raw s ep | Zmq_sub s  -> bind_raw s ep
    68→  | Zmq_pair s -> bind_raw s ep
    69→
    70→let connect sock ep = match sock with
    71→  | Zmq_push s -> connect_raw s ep | Zmq_pull s -> connect_raw s ep
    72→  | Zmq_pub s  -> connect_raw s ep | Zmq_sub s  -> connect_raw s ep
    73→  | Zmq_pair s -> connect_raw s ep
    74→
    75→let disconnect sock ep = match sock with
    76→  | Zmq_push s -> disconnect_raw s ep | Zmq_pull s -> disconnect_raw s ep
    77→  | Zmq_pub s  -> disconnect_raw s ep | Zmq_sub s  -> disconnect_raw s ep
    78→  | Zmq_pair s -> disconnect_raw s ep
    79→
    80→let subscribe sock topic = match sock with
    81→  | Zmq_sub s -> wrap_zmq_exn (fun () -> Zmq.Socket.subscribe s topic)
    82→  | _ -> invalid_arg "subscribe: not a SUB socket"
    83→
    84→let unsubscribe sock topic = match sock with
    85→  | Zmq_sub s -> wrap_zmq_exn (fun () -> Zmq.Socket.unsubscribe s topic)
    86→  | _ -> invalid_arg "unsubscribe: not a SUB socket"
    87→
    88→(* Single-frame envelope encoding: <4-byte topic length><topic><payload> *)
    89→
    90→let encode_frame ~topic payload =
    91→  let tlen = String.length topic in
    92→  let buf = Buffer.create (4 + tlen + String.length payload) in
    93→  Buffer.add_char buf (Char.chr ((tlen lsr 24) land 0xff));
    94→  Buffer.add_char buf (Char.chr ((tlen lsr 16) land 0xff));
    95→  Buffer.add_char buf (Char.chr ((tlen lsr 8) land 0xff));
    96→  Buffer.add_char buf (Char.chr (tlen land 0xff));
    97→  Buffer.add_string buf topic;
    98→  Buffer.add_string buf payload;
    99→  Buffer.contents buf
   100→
   101→let decode_frame frame =
   102→  let len = String.length frame in
   103→  if len < 4 then
   104→    raise (Transport_error (Protocol_error, "frame too short for envelope header"));
   105→  let tlen =
   106→    (Char.code frame.[0] lsl 24)
   107→    lor (Char.code frame.[1] lsl 16)
   108→    lor (Char.code frame.[2] lsl 8)
   109→    lor (Char.code frame.[3])
   110→  in
   111→  if 4 + tlen > len then
   112→    raise (Transport_error (Protocol_error,
   113→      Printf.sprintf "topic length %d exceeds frame size %d" tlen len));
   114→  let topic = String.sub frame 4 tlen in
   115→  let payload = String.sub frame (4 + tlen) (len - 4 - tlen) in
   116→  (topic, payload)
   117→
   118→(* Send: non-blocking send with fd-based waiting for backpressure. *)
   119→let send_raw : type z. z Zmq.Socket.t -> string -> unit =
   120→  fun s frame ->
   121→    let fd = Zmq.Socket.get_fd s in
   122→    let rec loop () =
   123→      try Zmq.Socket.send ~block:false s frame
   124→      with Unix.Unix_error (Unix.EAGAIN, _, _) ->
   125→        Eio_unix.await_writable fd;
   126→        loop ()
   127→    in
   128→    loop ()
   129→
   130→let send sock ~topic payload =
   131→  let frame = encode_frame ~topic payload in
   132→  match sock with
   133→  | Zmq_push s -> send_raw s frame
   134→  | Zmq_pub s  -> send_raw s frame
   135→  | Zmq_pair s -> send_raw s frame
   136→  | _ -> invalid_arg "send: not a sendable socket"
   137→
   138→(* Recv: uses ZMQ's fd + Eio_unix.await_readable for proper
   139→   cancellation support. The ZMQ fd becomes readable when there
   140→   are events to process; we then try a non-blocking recv. *)
   141→let recv_raw : type z. z Zmq.Socket.t -> string =
   142→  fun s ->
   143→    let fd = Zmq.Socket.get_fd s in
   144→    let rec loop () =
   145→      (* Check if data is available without blocking *)
   146→      match Zmq.Socket.events s with
   147→      | Zmq.Socket.Poll_in | Zmq.Socket.Poll_in_out ->
   148→        (try Zmq.Socket.recv ~block:false s
   149→         with Unix.Unix_error (Unix.EAGAIN, _, _) ->
   150→           (* Spurious — fd was ready but recv got EAGAIN.
   151→              Wait for next notification. *)
   152→           Eio_unix.await_readable fd;
   153→           loop ())
   154→      | _ ->
   155→        (* No data ready — wait for the fd to become readable.
   156→           This yields to Eio and is cancellable. *)
   157→        Eio_unix.await_readable fd;
   158→        loop ()
   159→    in
   160→    loop ()
   161→
   162→let recv sock =
   163→  let frame = match sock with
   164→    | Zmq_pull s -> recv_raw s
   165→    | Zmq_sub s  -> recv_raw s
   166→    | Zmq_pair s -> recv_raw s
   167→    | _ -> invalid_arg "recv: not a receivable socket"
   168→  in
   169→  decode_frame frame
   170→
   171→let try_recv sock =
   172→  let try_on : type z. z Zmq.Socket.t -> (topic * string) option = fun s ->
   173→    try
   174→      let frame = Zmq.Socket.recv ~block:false s in
   175→      Some (decode_frame frame)
   176→    with Unix.Unix_error (Unix.EAGAIN, _, _) -> None
   177→  in
   178→  match sock with
   179→  | Zmq_pull s -> try_on s
   180→  | Zmq_sub s  -> try_on s
   181→  | Zmq_pair s -> try_on s
   182→  | _ -> invalid_arg "try_recv: not a receivable socket"
   183→
   184→(* Multipart encoding as single frame *)
   185→let send_parts sock parts =
   186→  let buf = Buffer.create 256 in
   187→  let n = List.length parts in
   188→  Buffer.add_char buf (Char.chr ((n lsr 8) land 0xff));
   189→  Buffer.add_char buf (Char.chr (n land 0xff));
   190→  List.iter (fun part ->
   191→    let plen = String.length part in
   192→    Buffer.add_char buf (Char.chr ((plen lsr 24) land 0xff));
   193→    Buffer.add_char buf (Char.chr ((plen lsr 16) land 0xff));
   194→    Buffer.add_char buf (Char.chr ((plen lsr 8) land 0xff));
   195→    Buffer.add_char buf (Char.chr (plen land 0xff));
   196→    Buffer.add_string buf part
   197→  ) parts;
   198→  let frame = Buffer.contents buf in
   199→  match sock with
   200→  | Zmq_push s -> send_raw s frame
   201→  | Zmq_pub s  -> send_raw s frame
   202→  | Zmq_pair s -> send_raw s frame
   203→  | _ -> invalid_arg "send_parts: not a sendable socket"
   204→
   205→let recv_parts sock =
   206→  let frame = match sock with
   207→    | Zmq_pull s -> recv_raw s
   208→    | Zmq_sub s  -> recv_raw s
   209→    | Zmq_pair s -> recv_raw s
   210→    | _ -> invalid_arg "recv_parts: not a receivable socket"
   211→  in
   212→  let len = String.length frame in
   213→  if len < 2 then raise (Transport_error (Protocol_error, "parts frame too short"));
   214→  let n = (Char.code frame.[0] lsl 8) lor (Char.code frame.[1]) in
   215→  let pos = ref 2 in
   216→  let parts = ref [] in
   217→  for _ = 1 to n do
   218→    if !pos + 4 > len then
   219→      raise (Transport_error (Protocol_error, "truncated parts frame"));
   220→    let plen =
   221→      (Char.code frame.[!pos] lsl 24)
   222→      lor (Char.code frame.[!pos + 1] lsl 16)
   223→      lor (Char.code frame.[!pos + 2] lsl 8)
   224→      lor (Char.code frame.[!pos + 3])
   225→    in
   226→    pos := !pos + 4;
   227→    if !pos + plen > len then
   228→      raise (Transport_error (Protocol_error, "truncated part data"));
   229→    parts := String.sub frame !pos plen :: !parts;
   230→    pos := !pos + plen
   231→  done;
   232→  List.rev !parts
   233→
   234→(* Monitor — simplified, uses polling instead of zmq-eio *)
   235→let monitor _ctx ~sw sock : event Eio.Stream.t =
   236→  let stream = Eio.Stream.create 64 in
   237→  let poll_raw : type z. z Zmq.Socket.t -> unit = fun raw ->
   238→    let mon = Zmq.Monitor.create raw in
   239→    let mon_sock = Zmq.Monitor.connect _ctx mon in
   240→    Eio.Fiber.fork_daemon ~sw (fun () ->
   241→      let rec loop () =
   242→        let zmq_event =
   243→          Eio_unix.run_in_systhread (fun () ->
   244→            Zmq.Monitor.recv mon_sock)
   245→        in
   246→        let our_event = match zmq_event with
   247→          | Zmq.Monitor.Connected (addr, _) -> Some (Peer_connected addr)
   248→          | Zmq.Monitor.Disconnected (addr, _) -> Some (Peer_disconnected addr)
   249→          | Zmq.Monitor.Connect_delayed addr ->
   250→            Some (Connect_failed (addr, "connect delayed"))
   251→          | Zmq.Monitor.Bind_failed (addr, errno, msg) ->
   252→            Some (Bind_failed_event (addr,
   253→              Printf.sprintf "errno=%d: %s" errno msg))
   254→          | Zmq.Monitor.Monitor_stopped _ -> None
   255→          | _ -> None
   256→        in
   257→        (match our_event with
   258→         | Some e -> Eio.Stream.add stream e
   259→         | None -> ());
   260→        match zmq_event with
   261→        | Zmq.Monitor.Monitor_stopped _ -> `Stop_daemon
   262→        | _ -> loop ()
   263→      in
   264→      loop ())
   265→  in
   266→  (match sock with
   267→   | Zmq_push s -> poll_raw s | Zmq_pull s -> poll_raw s
   268→   | Zmq_pub s  -> poll_raw s | Zmq_sub s  -> poll_raw s
   269→   | Zmq_pair s -> poll_raw s);
   270→  stream
   271→
   272→let close sock =
   273→  let close_raw : type z. z Zmq.Socket.t -> unit = fun s ->
   274→    (try Zmq.Socket.close s with Zmq.ZMQ_exception _ -> ())
   275→  in
   276→  match sock with
   277→  | Zmq_push s -> close_raw s | Zmq_pull s -> close_raw s
   278→  | Zmq_pub s  -> close_raw s | Zmq_sub s  -> close_raw s
   279→  | Zmq_pair s -> close_raw s
   280→
   281→let force_close = close  (* Same as close — no event loop to bypass *)
   282→
   283→let set_hwm_raw : type z. z Zmq.Socket.t -> unit =
   284→  fun s ->
   285→    Zmq.Socket.set_send_high_water_mark s 0;
   286→    Zmq.Socket.set_receive_high_water_mark s 0
   287→
   288→let set_unlimited_hwm sock = match sock with
   289→  | Zmq_push s -> set_hwm_raw s | Zmq_pull s -> set_hwm_raw s
   290→  | Zmq_pub s  -> set_hwm_raw s | Zmq_sub s  -> set_hwm_raw s
   291→  | Zmq_pair s -> set_hwm_raw s
   292→
Read
{
  "file_path": "/Users/ww/src/plumbing/doc/agent-runtime.md"
}
     1→# Agent runtime
     2→
     3→Runtime behaviour of the agent process. For primitive semantics and
     4→configuration, see [processes.md](processes.md). For the multiplexed I/O
     5→envelope format, see [processes.md § Multiplexed I/O](processes.md#multiplexed-io).
     6→
     7→## Environment variables
     8→
     9→| Variable | Description |
    10→|----------|-------------|
    11→| `PLUMB_PROVIDER` | Default LLM provider for agents without explicit `provider` in config. Accepted values: `"anthropic"`, `"openai"`. Set via `--provider` flag on `chat`/`plumb`. |
    12→| `PLUMB_MODEL` | Default LLM model for agents without explicit `model` in config. Set via `--model` flag on `chat`/`plumb`. |
    13→| `ANTHROPIC_API_KEY` | Required when `provider = "anthropic"`. API key for the Anthropic Messages API. |
    14→| `OPENAI_API_KEY` | Required when `provider = "openai"`. API key for the OpenAI Chat Completions API. |
    15→| `PIPELINE_DEBUG` | Set to `1` to enable debug logging. Per-message conversation transcript and API metadata are written as JSON Lines to stderr. Default: off (only warnings and errors). |
    16→| `PLUMB_MCP_TOOLS` | Path to temp JSON file containing MCP tool schemas. Set by plumb runtime when MCP servers are configured. Internal — not user-facing. |
    17→
    18→When `PIPELINE_DEBUG=1`, the agent logs each new message in the conversation (including retry messages that don't carry forward into history) and API call metadata:
    19→
    20→```sh
    21→echo '"hello"' | PIPELINE_DEBUG=1 agent spec.plumb
    22→```
    23→
    24→Each log line is a JSON object with `"log"` (level) and `"event"` (name) fields, distinct from both error output and thinking output. Example:
    25→
    26→```json
    27→{"log":"debug","event":"message","role":"user","content":"\"hello\""}
    28→{"log":"debug","event":"api_request","model":"claude-sonnet-4-5-20250514","max_tokens":8192,"thinking_budget":null,"message_count":1}
    29→{"log":"debug","event":"message","role":"assistant","content":[{"type":"text","text":"{\"greeting\":\"hi\"}"}]}
    30→{"log":"debug","event":"api_response","status":200,"content_blocks":1,"stop_reason":"end_turn","input_tokens":150,"output_tokens":42}
    31→```
    32→
    33→Retry messages are tagged with `"retry":true` so they can be distinguished from messages that persist in history.
    34→
    35→The environment variable propagates through pipelines — all processes in a composed pipeline inherit it.
    36→
    37→## Telemetry
    38→
    39→The agent writes structured telemetry as `__port: "telemetry"` envelopes
    40→on stdout (see [processes.md § Multiplexed I/O](processes.md#multiplexed-io)).
    41→Every telemetry event has a `kind` field that identifies the event type.
    42→The event kinds are:
    43→
    44→| Kind | Description | Key fields |
    45→|------|-------------|------------|
    46→| `config` | Agent configuration at startup | `model`, `provider`, `max_tokens`, `thinking_budget`, `tools` |
    47→| `usage` | Token usage per API call | `input_tokens`, `output_tokens`, `cache_creation_input_tokens`, `cache_read_input_tokens` |
    48→| `thinking` | Extended thinking block | `thinking`, `signature` |
    49→| `tool_call` | Tool invocation summary | `name`, `input`, `summary`, `is_error` |
    50→| `pin_state` | Absolute pin state after pin/unpin | `action`, `count`, `keys` |
    51→| `output` | Validated output value | `content` |
    52→
    53→Example events:
    54→
    55→```json
    56→{"kind":"config","model":"claude-haiku-4-5-20251001","provider":"anthropic","max_tokens":8192}
    57→{"kind":"usage","input_tokens":150,"output_tokens":42}
    58→{"kind":"thinking","thinking":"Let me consider...","signature":"abc123"}
    59→{"kind":"tool_call","name":"doc_read","input":{"path":"notes.md"},"summary":"ok","is_error":false}
    60→{"kind":"pin_state","action":"pin","count":2,"keys":["doc_read({\"path\":\"notes.md\"})","search({\"query\":\"cats\"})"]}
    61→{"kind":"output","content":{"greeting":"hello"}}
    62→```
    63→
    64→### The `type` → `kind` rewrite
    65→
    66→The LLM API returns thinking blocks with a `"type": "thinking"` field.
    67→Since `type` is a reserved keyword in the plumbing language (used for
    68→type declarations), it cannot appear in filter expressions like
    69→`filter(type = "thinking")`. The agent binary rewrites `"type"` to
    70→`"kind"` on all thinking blocks before emitting them as telemetry, so
    71→filters and observers can reference the field:
    72→
    73→```
    74→agent.telemetry ; filter(kind = "thinking") ; observer
    75→```
    76→
    77→All other telemetry events (`usage`, `config`, `tool_call`, `output`)
    78→are constructed by the agent binary and use `"kind"` natively — no
    79→rewriting needed.
    80→
    81→### Wiring telemetry
    82→
    83→In the plumbing language, telemetry maps to the agent's `.telemetry`
    84→port. By default this port is implicitly discarded (`; discard`). To
    85→observe an agent's reasoning, wire the telemetry stream to an
    86→observer:
    87→
    88→```
    89→composer.telemetry ; angel ; mux(source: angel) ; composer
    90→```
    91→
    92→Example config with thinking:
    93→
    94→```
    95→summarizer : Article → Summary = agent {
    96→  model: "claude-sonnet-4-5-20250514"
    97→  prompt: "Summarize the article."
    98→  max_tokens: 16384
    99→  thinking_budget: 10000
   100→}
   101→```
   102→
   103→Thinking blocks are preserved in conversation history for multi-turn
   104→consistency. Each thinking block includes a `signature` field required
   105→by the API for multi-turn pass-through.
   106→
   107→## stderr
   108→
   109→Stderr (fd 2) carries debug logs and errors, distinguished by format:
   110→
   111→- **Debug logs**: `{"log": "debug", "event": "...", ...}` — conversation messages, API metadata. Only emitted when `PIPELINE_DEBUG=1`.
   112→- **Errors**: `{"error": "...", "code": "...", ...}` — structured error output. Always emitted.
   113→
   114→Thinking blocks go to the telemetry port, not stderr. Debug logs and
   115→errors go to stderr.
   116→
   117→## Control channel
   118→
   119→Agents with product input types receive a control stream on a second input channel:
   120→
   121→```
   122→type Control = { set_temp: float } | { set_model: string } | { stop: bool }
   123→
   124→let composer : (!Draft, !Control) → !Verdict = agent {
   125→  prompt: "Review the draft."
   126→}
   127→```
   128→
   129→The control stream arrives as `__port: "ctrl_in"` envelopes on stdin. A background thread reads from the internal ctrl pipe (fed by the stdin demux) and updates the agent's mutable state under a mutex. The agent writes responses (memory dumps, acknowledgements) as `__port: "ctrl_out"` envelopes on stdout.
   130→
   131→| Field | Type | Effect |
   132→|-------|------|--------|
   133→| `set_temp` | float \| null | Set or clear temperature override |
   134→| `set_model` | string \| null | Set or clear model override |
   135→| `stop` | bool | Signal clean exit after current message |
   136→| `pause` | bool | Stop reading data from stdin; control and telemetry stay live |
   137→| `resume` | bool | Resume reading data from stdin |
   138→| `get_memory` | bool | Emit conversation history on ctrl_out (kind: `memory`) |
   139→| `set_memory` | array | Replace conversation history; emit ack on ctrl_out (kind: `memory_set`) |
   140→
   141→Sending `null` clears an override, reverting to the spec default:
   142→
   143→```json
   144→{"set_temp": 0.9}
   145→{"set_temp": null}
   146→{"set_model": "claude-opus-4-20250514"}
   147→{"set_model": null}
   148→{"stop": true}
   149→{"pause": true}
   150→{"get_memory": true}
   151→{"set_memory": [{"role":"user","content":"\"hello\""},{"role":"assistant","content":"\"world\""}]}
   152→{"resume": true}
   153→```
   154→
   155→Unknown fields are silently ignored (forward compatibility).
   156→
   157→### Control output (fd 7)
   158→
   159→The agent writes JSON Lines to fd 7 in response to memory operations:
   160→
   161→**get_memory response:**
   162→```json
   163→{"kind":"memory","messages":[{"role":"user","content":"\"hello\""},{"role":"assistant","content":"{\"greeting\":\"hi\"}"}],"pinned":[]}
   164→```
   165→
   166→The `pinned` array contains tool results the agent has marked for
   167→preservation across compaction cycles. Each entry has `tool`,
   168→`arguments`, and `content` fields. See [Document pinning](#document-pinning).
   169→
   170→**set_memory acknowledgement:**
   171→```json
   172→{"kind":"memory_set","old_messages":42,"new_messages":2}
   173→```
   174→
   175→**pause acknowledgement:**
   176→```json
   177→{"kind":"pause_ack"}
   178→```
   179→
   180→**resume acknowledgement:**
   181→```json
   182→{"kind":"resume_ack"}
   183→```
   184→
   185→**Note**: `pause_ack` and `resume_ack` are emitted on fd 7 (ctrl_out), which flows through the session tagger if a `<->` wire is active. The tagger wraps all ctrl_out messages in `{__step: N, payload: msg}` envelopes. For linear protocols (terminated by `end`), the protocol structure provides one-shot trigger semantics — no additional gating is needed.
   186→
   187→### When changes take effect
   188→
   189→Model and temperature changes take effect on the **next API call** — not just the next input message. The conversation loop reads the current values via closures on every iteration, including retries and tool-use rounds within a single input. No HTTP connection restart is needed: `model` and `temperature` are per-request body parameters, not connection-level config.
   190→
   191→The `stop` signal is checked between input messages. It does not interrupt a running API call.
   192→
   193→### Pause and memory protocol
   194→
   195→When paused, the agent stops reading from stdin but continues handling control messages. Memory requests (`get_memory`, `set_memory`) are processed at message boundaries — between input lines when not paused, or immediately when paused.
   196→
   197→The `set_memory` operation replaces conversation history. The system prompt is never affected. If any message in the `set_memory` array lacks `role` or `content`, the entire payload is rejected with a warning.
   198→
   199→When pins are active, `set_memory` automatically prepends a user message containing all pinned tool results in `<pinned-documents>` blocks. This happens transparently — the compactor does not need to preserve pinned content in its output.
   200→
   201→**Protocol constraint**: `memory_request` is a single slot. The controller must wait for a response on fd 7 before sending the next memory request. If two requests arrive before the main loop handles either, the first is silently overwritten.
   202→
   203→## System prompt construction
   204→
   205→The system prompt is assembled from the agent's config:
   206→
   207→1. **Inline prompt** (`prompt: "..."`) — added first as a plain text block.
   208→2. **Prompt files** (`prompts: ["system.md", "./role.md"]`) — each file is read at startup and wrapped in document tags:
   209→
   210→   ```
   211→   <doc id="system.md">
   212→   file contents here
   213→   </doc>
   214→   ```
   215→
   216→   File paths are resolved by the executor before the agent starts:
   217→
   218→   - **Bare name** (no `/`) — searched via the resource path
   219→     (`PLUMB_RESOURCES` + installed resource directories).
   220→     Use for shared files: `"system.md"`, `"grammar.peg"`.
   221→   - **Path** (contains `/`) — resolved relative to the `.plumb`
   222→     file that defines the agent. Use for local files:
   223→     `"./composer.md"`, `"./heat.plumb"`.
   224→
   225→3. **Output instruction** — appended automatically. Describes the expected JSON output type and instructs the model to respond with only the JSON value.
   226→
   227→Each part becomes a separate system block. The output instruction has `cache_break: true` for prompt caching efficiency.
   228→
   229→## Streaming
   230→
   231→API responses are received via Server-Sent Events (SSE). This prevents read timeouts during extended thinking and avoids buffering large responses in memory. Streaming is an internal transport detail — the process interface remains synchronous JSON Lines on stdin/stdout.
   232→
   233→## Document pinning
   234→
   235→Agents can mark tool results as important so they survive context
   236→compaction. The agent calls the synthetic `pin` tool with the name and
   237→arguments of a previous tool call; the result is stored in a session-local
   238→pin set. On the next compaction cycle, pinned content is excluded from
   239→the compressible conversation, provided to the compactor as read-only
   240→reference, and reinjected at the top of the post-compaction history.
   241→
   242→### Pin and unpin tools
   243→
   244→The agent binary injects two synthetic tools into every agent's tool
   245→list:
   246→
   247→- **`pin`** `{tool: string, arguments: object}` — pin the result of a
   248→  previous tool call. If the pin limit (20) is reached, the oldest pin
   249→  is evicted (FIFO). Idempotent — pinning an already-pinned call is a
   250→  no-op.
   251→
   252→- **`unpin`** `{tool: string, arguments: object}` — remove a pin,
   253→  allowing the result to be compacted normally. Idempotent.
   254→
   255→### Canonical keys
   256→
   257→Pins are identified by a canonical form of `(tool_name,
   258→sorted_arguments_json)`. Object keys are recursively sorted;
   259→integer-valued floats are normalised to integers. This means
   260→`{"b":1,"a":2}` and `{"a":2,"b":1}` produce the same pin key.
   261→
   262→### Cache-hit interception
   263→
   264→When the agent calls a tool whose canonical key matches an active pin,
   265→the request is intercepted. Instead of dispatching to the tool, the
   266→agent returns a note telling the LLM where to find the pinned content:
   267→
   268→```json
   269→{"pinned":true,"note":"This tool result is already pinned in your context. Look for <pinned key=\"fetch({\\\"path\\\":\\\"foo.md\\\"})\"> in the pinned-documents block at the top of your conversation."}
   270→```
   271→
   272→### Reinjection format
   273→
   274→After compaction, pinned content is prepended to the new history as a
   275→single user message:
   276→
   277→```xml
   278→<pinned-documents>
   279→The following tool results are pinned in your context. They survive
   280→compaction and are always available. Do not re-fetch them.
   281→
   282→<pinned key="fetch({&quot;path&quot;:&quot;foo.md&quot;})">
   283→file contents here
   284→</pinned>
   285→</pinned-documents>
   286→```
   287→
   288→Each `<pinned>` element carries a `key` attribute — the canonical tool
   289→call form — so the agent can cross-reference it when told a result is
   290→already in context.
   291→
   292→### Persistence
   293→
   294→Pins are session-local. They survive compaction but not process
   295→restart.
   296→
   297→## Runtime context
   298→
   299→When `runtime_context: true` is set in the agent config, the agent
   300→appends a dynamic system block before each API call:
   301→
   302→```
   303→[Runtime context]
   304→Time: 2026-03-08T14:23:17Z
   305→Session input tokens: 82341
   306→Session output tokens: 12045
   307→API calls: 23
   308→Max output tokens per turn: 8192
   309→History messages: 47
   310→Pinned documents: 3
   311→```
   312→
   313→This is not cached (no `cache_break`) — it changes on every call.
   314→Token counts are cumulative for the session. The agent uses this to
   315→plan: how much context budget remains, whether to be concise, when to
   316→summarise.
   317→
   318→## Context window
   319→
   320→Unbounded accumulation will eventually exceed the model's context window. The [compaction protocol](protocols.md) provides automated context management via pause/get_memory/set_memory/resume. Document pinning ensures critical tool results survive compaction. Runtime context injection gives the agent visibility into its own resource consumption.
   321→
Read
{
  "file_path": "/Users/ww/src/plumbing/lib/runtime/routing.ml"
}
     1→(** Routing table — compiler output for the ZMQ fabric.
     2→
     3→    Two-phase compilation:
     4→    1. The compiler walks the plumb body's spawn list and emits
     5→       intermediate routes that reference channel names as nodes.
     6→    2. [resolve_channels] eliminates channel intermediaries by
     7→       cross-joining writer and reader routes into direct
     8→       process-to-process routes. This is the operational realisation
     9→       of ν-restriction in composition (f ; g = (νc)(f(in,c) | g(c,out))).
    10→
    11→    The fabric reads the resolved routing table to create ZMQ sockets.
    12→
    13→    Design decisions D1 (types in lib/runtime) and D10 (routing_table
    14→    terminology). See PLMB-037. *)
    15→
    16→open Language
    17→
    18→(** Channel transport pattern, determined statically by the compiler. *)
    19→type channel_kind =
    20→  | Point_to_point
    21→  (** PUSH/PULL — reliable, blocking backpressure. Default for all
    22→      algebraic channels except copy fan-out. *)
    23→  | Fan_out
    24→  (** PUB/SUB — copy morphism only. HWM set to unlimited. *)
    25→  | Bidirectional
    26→  (** PAIR — control channels between supervisor and inline morphisms. *)
    27→
    28→(** Whether a process runs inline (Eio fibre) or as a subprocess. *)
    29→type process_kind =
    30→  | Inline
    31→  (** Structural morphisms: id, copy, merge, barrier, map, filter, etc.
    32→      Run as Eio fibres within the fabric. *)
    33→  | Subprocess
    34→  (** Agents and nested plumb pipelines. Fork/exec'd by the supervisor. *)
    35→
    36→(** A route: one directional connection between two processes. *)
    37→type route = {
    38→  src : string;           (** source process name *)
    39→  src_port : string;      (** source port name *)
    40→  dst : string;           (** destination process name *)
    41→  dst_port : string;      (** destination port name *)
    42→  kind : channel_kind;    (** transport pattern *)
    43→  topic : string;         (** provenance tag for PUB/SUB filtering *)
    44→  via : string option;    (** original channel name, if resolved *)
    45→}
    46→
    47→(** A process entry in the routing table. *)
    48→type process_entry = {
    49→  proc_name : string;
    50→  proc_kind : process_kind;
    51→  proc_impl : process_impl;
    52→  proc_binding : binding;
    53→}
    54→
    55→(** The routing table: compiler output consumed by the fabric.
    56→
    57→    Invariant: all process names are unique within a table.
    58→    The compiler guarantees this via per-binding instance numbering
    59→    (e.g. [copy0], [copy1], [merge0]). *)
    60→type routing_table = {
    61→  processes : process_entry list;
    62→  (** All processes, in dependency order (leaves first). *)
    63→  routes : route list;
    64→  (** All wiring instructions. *)
    65→  seeded_channels : string list;
    66→  (** Channels pre-loaded with a null token (Petri net initial markings). *)
    67→}
    68→
    69→(** Classify a process implementation as inline or subprocess. *)
    70→let process_kind_of_impl = function
    71→  | Agent _ | Plumb _ -> Subprocess
    72→  | _ -> Inline
    73→
    74→(** Determine the channel kind for a (src, src_port) pair.
    75→    If the same source port appears in multiple routes, it's Fan_out
    76→    (copy morphism). Control channels are Bidirectional. Otherwise
    77→    Point_to_point. *)
    78→let classify_channel_kinds routes =
    79→  (* Count how many routes share each (src, src_port) pair *)
    80→  let tbl = Hashtbl.create 16 in
    81→  List.iter (fun r ->
    82→    let key = (r.src, r.src_port) in
    83→    let n = try Hashtbl.find tbl key with Not_found -> 0 in
    84→    Hashtbl.replace tbl key (n + 1)
    85→  ) routes;
    86→  List.map (fun r ->
    87→    if r.kind = Bidirectional then r
    88→    else
    89→      let key = (r.src, r.src_port) in
    90→      let n = try Hashtbl.find tbl key with Not_found -> 1 in
    91→      if n > 1 then { r with kind = Fan_out }
    92→      else r
    93→  ) routes
    94→
    95→(** Look up the per-message type for a channel by name.
    96→    Checks boundary ports first, then declared channels. *)
    97→let find_channel_type ~ports ~boundary_in_types ~boundary_out_types
    98→    ~n_in ~pb ch_name =
    99→  match List.find_index (fun p -> p = ch_name) ports with
   100→  | Some i when i < n_in -> Some (List.nth boundary_in_types i)
   101→  | Some i -> Some (List.nth boundary_out_types (i - n_in))
   102→  | None ->
   103→    match List.find_opt (fun (ch : channel_decl) ->
   104→      ch.ch_name = ch_name) pb.pb_channels with
   105→    | Some ch -> Some ch.ch_type
   106→    | None -> None
   107→
   108→(** Resolve channel intermediaries into direct process-to-process routes.
   109→
   110→    Channels are bound names (ν-restriction). In the intermediate routing
   111→    table, they appear as route endpoints — but they are not processes and
   112→    have no state. This pass eliminates them by cross-joining writers and
   113→    readers through each channel, producing direct routes.
   114→
   115→    After resolution, no declared channel name appears as a route src or dst.
   116→    The original channel name is preserved in the [via] field for debugging. *)
   117→let resolve_channels ~processes ~boundary_ports routes ~channels =
   118→  let channel_names =
   119→    List.map (fun (ch : Language.channel_decl) -> ch.ch_name) channels in
   120→  let is_channel name =
   121→    List.mem name channel_names in
   122→  let is_process name =
   123→    List.exists (fun (pe : process_entry) -> pe.proc_name = name) processes in
   124→  let is_boundary name =
   125→    List.mem name boundary_ports in
   126→  let is_real_node name =
   127→    is_process name || is_boundary name in
   128→
   129→  (* Partition routes *)
   130→  let direct = ref [] in
   131→  let writers = Hashtbl.create 8 in   (* channel → routes writing to it *)
   132→  let readers = Hashtbl.create 8 in   (* channel → routes reading from it *)
   133→
   134→  List.iter (fun r ->
   135→    let dst_is_ch = is_channel r.dst in
   136→    let src_is_ch = is_channel r.src in
   137→    match dst_is_ch, src_is_ch with
   138→    | false, false ->
   139→      (* Both endpoints are real nodes — keep as-is *)
   140→      direct := r :: !direct
   141→    | true, false ->
   142→      (* Process writes to channel *)
   143→      let ch = r.dst in
   144→      let existing = try Hashtbl.find writers ch with Not_found -> [] in
   145→      Hashtbl.replace writers ch (r :: existing)
   146→    | false, true ->
   147→      (* Process reads from channel *)
   148→      let ch = r.src in
   149→      let existing = try Hashtbl.find readers ch with Not_found -> [] in
   150→      Hashtbl.replace readers ch (r :: existing)
   151→    | true, true ->
   152→      (* Channel-to-channel: should not happen *)
   153→      failwith (Printf.sprintf
   154→        "resolve_channels: channel-to-channel route %s → %s" r.src r.dst)
   155→  ) routes;
   156→
   157→  (* Cross-join writers and readers for each channel *)
   158→  let resolved = ref (List.rev !direct) in
   159→  List.iter (fun ch_name ->
   160→    let ch_writers = try Hashtbl.find writers ch_name with Not_found -> [] in
   161→    let ch_readers = try Hashtbl.find readers ch_name with Not_found -> [] in
   162→    let n_writers = List.length ch_writers in
   163→    let n_readers = List.length ch_readers in
   164→
   165→    match n_writers, n_readers with
   166→    | 0, 0 -> ()  (* unused channel *)
   167→    | _, 0 ->
   168→      failwith (Printf.sprintf
   169→        "channel %s has %d writer(s) but no readers — \
   170→         if the output is intentionally unused, add an explicit discard"
   171→        ch_name n_writers)
   172→    | 0, _ ->
   173→      failwith (Printf.sprintf
   174→        "channel %s has readers but no writer — \
   175→         add 'spawn empty(%s)' or check your wiring"
   176→        ch_name ch_name)
   177→    | _ when n_writers > 1 && n_readers > 1 ->
   178→      failwith (Printf.sprintf
   179→        "resolve_channels: channel %s has %d writers and %d readers \
   180→         — no categorical meaning" ch_name n_writers n_readers)
   181→    | _ ->
   182→      List.iter (fun w ->
   183→        List.iter (fun rd ->
   184→          resolved := {
   185→            src = w.src;
   186→            src_port = w.src_port;
   187→            dst = rd.dst;
   188→            dst_port = rd.dst_port;
   189→            kind = Point_to_point;
   190→            topic = rd.topic;
   191→            via = Some ch_name;
   192→          } :: !resolved
   193→        ) ch_readers
   194→      ) ch_writers
   195→  ) channel_names;
   196→
   197→  (* Validate: no channel names remain as src or dst *)
   198→  List.iter (fun r ->
   199→    if is_channel r.src && is_real_node r.dst then
   200→      failwith (Printf.sprintf
   201→        "resolve_channels: channel %s still appears as route src" r.src);
   202→    if is_channel r.dst && is_real_node r.src then
   203→      failwith (Printf.sprintf
   204→        "resolve_channels: channel %s still appears as route dst" r.dst)
   205→  ) !resolved;
   206→
   207→  List.rev !resolved
   208→
   209→let compile ~prog ~ports ~pb ~type_env ~main_binding ~seeded =
   210→  let boundary_in_types = Validate.port_element_types type_env main_binding.input in
   211→  let boundary_out_types = Validate.port_element_types type_env main_binding.output in
   212→  let n_in = List.length boundary_in_types in
   213→
   214→  let ch_type name =
   215→    match find_channel_type ~ports ~boundary_in_types ~boundary_out_types
   216→            ~n_in ~pb name with
   217→    | Some t -> t
   218→    | None -> Unit
   219→  in
   220→
   221→  (* Type-check a spawn statement's channel types against binding
   222→     expectations.  Extracted from the main iteration loop for clarity:
   223→     this is validation, not routing.  Raises [Failure] on mismatch —
   224→     caught by main.ml as exit 2. *)
   225→  let check_spawn_types ~sp ~binding ~channels =
   226→    let check_ch idx expected label =
   227→      let ch_t = ch_type (List.nth channels idx) in
   228→      if not (Validate.type_eq type_env ch_t expected) then
   229→        failwith (Printf.sprintf "spawn %s: %s type is %s, expected %s"
   230→          sp.sp_process label
   231→          (Pp.pp_type_expr type_env ch_t)
   232→          (Pp.pp_type_expr type_env expected))
   233→    in
   234→    let n_ch = List.length channels in
   235→    (try match binding.impl with
   236→     | Id ->
   237→       let inp = ch_type (List.nth channels 0) in
   238→       let outp = ch_type (List.nth channels 1) in
   239→       if not (Validate.type_eq type_env inp outp) then
   240→         failwith (Printf.sprintf "spawn id: input type %s != output type %s"
   241→           (Pp.pp_type_expr type_env inp)
   242→           (Pp.pp_type_expr type_env outp))
   243→     | Copy ->
   244→       (* Validate binding output type is a pair *)
   245→       let out_t = Validate.resolve type_env
   246→         (Validate.unwrap_stream binding.output) in
   247→       (match out_t with
   248→        | Product [_; _] -> ()
   249→        | Product ts ->
   250→          failwith (Printf.sprintf
   251→            "copy: output product must have exactly 2 elements, got %d"
   252→            (List.length ts))
   253→        | _ -> ());
   254→       let inp = ch_type (List.nth channels 0) in
   255→       check_ch 1 inp "output0";
   256→       check_ch 2 inp "output1"
   257→     | Merge ->
   258→       let in0_t = ch_type (List.nth channels 0) in
   259→       let in1_t = ch_type (List.nth channels 1) in
   260→       let out_t = ch_type (List.nth channels 2) in
   261→       let expected_out =
   262→         if Validate.type_eq type_env in0_t in1_t then in0_t
   263→         else Sum [in0_t; in1_t]
   264→       in
   265→       if not (Validate.type_eq type_env out_t expected_out) then
   266→         failwith (Printf.sprintf
   267→           "spawn merge: output type %s, expected %s"
   268→           (Pp.pp_type_expr type_env out_t)
   269→           (Pp.pp_type_expr type_env expected_out))
   270→     | Barrier ->
   271→       let n = n_ch - 1 in
   272→       let in_types = List.init n (fun i -> ch_type (List.nth channels i)) in
   273→       let expected_out = Product in_types in
   274→       check_ch n expected_out "output"
   275→     | Discard ->
   276→       if n_ch >= 2 then check_ch 1 Unit "output"
   277→     | Project n_proj ->
   278→       let inp = ch_type (List.nth channels 0) in
   279→       (match Validate.resolve type_env inp with
   280→        | Product ts ->
   281→          if n_proj >= List.length ts then
   282→            failwith (Printf.sprintf
   283→              "spawn project(%d): index out of bounds (product has %d elements)"
   284→              n_proj (List.length ts));
   285→          check_ch 1 (List.nth ts n_proj) "output"
   286→        | _ ->
   287→          failwith (Printf.sprintf
   288→            "spawn project(%d): input type %s is not a product"
   289→            n_proj (Pp.pp_type_expr type_env inp)))
   290→     | FormatJson ->
   291→       check_ch 0 (Primitive Json) "input";
   292→       check_ch 1 (Primitive String) "output"
   293→     | ParseJson ->
   294→       check_ch 0 (Primitive String) "input";
   295→       check_ch 1 (Primitive Json) "output"
   296→     | Agent _ ->
   297→       (* Agent type validation: check channel types against binding ports.
   298→          Named args determine which ports are present. *)
   299→       let has_labeled lbl = List.exists (function
   300→         | Labeled (l, _) -> l = lbl | _ -> false) sp.sp_args in
   301→       let binding_in_t = Validate.resolve type_env binding.input in
   302→       let binding_out_t = Validate.resolve type_env binding.output in
   303→       (* Check non-product agent doesn't get extra ports *)
   304→       let has_control = match binding_in_t with
   305→         | Product [Stream _; Stream _] -> true | _ -> false in
   306→       let has_telemetry = match binding_out_t with
   307→         | Product [Stream _; Stream _] -> true | _ -> false in
   308→       if has_labeled "telemetry" && not has_telemetry then
   309→         failwith (Printf.sprintf
   310→           "spawn %s: agent output is not a product, cannot have telemetry port"
   311→           sp.sp_process);
   312→       if has_labeled "ctrl_in" && not has_control then
   313→         failwith (Printf.sprintf
   314→           "spawn %s: agent input is not a product, cannot have ctrl_in port"
   315→           sp.sp_process);
   316→       (* Check ctrl_in type *)
   317→       if has_labeled "ctrl_in" && has_control then begin
   318→         let ctrl_ch = List.find_map (function
   319→           | Labeled ("ctrl_in", ch) -> Some ch | _ -> None) sp.sp_args in
   320→         match ctrl_ch with
   321→         | Some ch ->
   322→           let ctrl_t = match binding_in_t with
   323→             | Product [_; Stream t] -> t
   324→             | _ -> Unit in
   325→           let ch_t = ch_type ch in
   326→           if not (Validate.type_eq type_env ch_t ctrl_t) then
   327→             failwith (Printf.sprintf
   328→               "spawn %s: ctrl_in channel type %s, expected %s"
   329→               sp.sp_process
   330→               (Pp.pp_type_expr type_env ch_t)
   331→               (Pp.pp_type_expr type_env ctrl_t))
   332→         | None -> ()
   333→       end;
   334→       (* Check telemetry type *)
   335→       if has_labeled "telemetry" && has_telemetry then begin
   336→         let telem_ch = List.find_map (function
   337→           | Labeled ("telemetry", ch) -> Some ch | _ -> None) sp.sp_args in
   338→         match telem_ch with
   339→         | Some ch ->
   340→           let telem_t = match binding_out_t with
   341→             | Product [_; Stream t] -> t
   342→             | _ -> Unit in
   343→           let ch_t = ch_type ch in
   344→           if not (Validate.type_eq type_env ch_t telem_t) then
   345→             failwith (Printf.sprintf
   346→               "spawn %s: telemetry channel type %s, expected %s"
   347→               sp.sp_process
   348→               (Pp.pp_type_expr type_env ch_t)
   349→               (Pp.pp_type_expr type_env telem_t))
   350→         | None -> ()
   351→       end
   352→     | Empty ->
   353→       (* Single channel (output); type inferred from channel decl *)
   354→       ()
   355→     | MergeAny | Tagger _ | Log _ ->
   356→       (* Compiler-generated morphisms — well-typed by construction *)
   357→       ()
   358→     | Map _ | Filter _ ->
   359→       (* Type consistency checked by validator at bind time *)
   360→       ()
   361→     | Tool _ | Lower _ | Builtin _ ->
   362→       (* Configured externally; types from binding signature *)
   363→       ()
   364→     | Plumb _ ->
   365→       (* Nested plumb: channel types checked when sub-pipeline compiles *)
   366→       ()
   367→     with Failure msg when msg = "nth" || msg = "List.nth" ->
   368→       failwith (Printf.sprintf "spawn %s: wrong number of channels \
   369→         (got %d)" sp.sp_process n_ch))
   370→  in
   371→
   372→  let find_binding name =
   373→    match List.find_opt (fun (b : binding) -> b.bd_name = name) pb.pb_bindings with
   374→    | Some b -> Some b
   375→    | None ->
   376→      List.find_opt (fun (b : binding) -> b.bd_name = name) prog.prog_bindings
   377→  in
   378→
   379→  (* Synthesise a binding for a direct structural morphism spawn
   380→     with proper types inferred from channel declarations. *)
   381→  let synthesise_structural sp_process channels =
   382→    let impl = match sp_process with
   383→      | "id" -> Id | "copy" -> Copy | "merge" -> Merge
   384→      | "discard" -> Discard | "empty" -> Empty | "barrier" -> Barrier
   385→      | "_format_json" -> FormatJson | "_parse_json" -> ParseJson
   386→      | name -> failwith (Printf.sprintf "unknown structural morphism: %s" name)
   387→    in
   388→    match impl with
   389→    | Id ->
   390→      let t = ch_type (List.nth channels 0) in
   391→      { bd_name = sp_process; bd_attrs = [];
   392→        input = Stream t; output = Stream t; impl }
   393→    | Copy ->
   394→      let t = ch_type (List.nth channels 0) in
   395→      { bd_name = sp_process; bd_attrs = [];
   396→        input = Stream t;
   397→        output = Stream (Product [t; t]); impl }
   398→    | Merge ->
   399→      let t0 = ch_type (List.nth channels 0) in
   400→      let t1 = ch_type (List.nth channels 1) in
   401→      let out_t = if Validate.type_eq type_env t0 t1 then t0 else Sum [t0; t1] in
   402→      { bd_name = sp_process; bd_attrs = [];
   403→        input = Stream (Product [t0; t1]);
   404→        output = Stream out_t; impl }
   405→    | Barrier ->
   406→      let n = List.length channels - 1 in
   407→      let in_types = List.init n (fun i -> ch_type (List.nth channels i)) in
   408→      { bd_name = sp_process; bd_attrs = [];
   409→        input = Stream (Product in_types);
   410→        output = Stream (Product in_types); impl }
   411→    | Discard ->
   412→      let t = ch_type (List.nth channels 0) in
   413→      { bd_name = sp_process; bd_attrs = [];
   414→        input = Stream t; output = Stream Unit; impl }
   415→    | Empty ->
   416→      let t = ch_type (List.nth channels 0) in
   417→      { bd_name = sp_process; bd_attrs = [];
   418→        input = Stream Unit; output = Stream t; impl }
   419→    | FormatJson ->
   420→      { bd_name = sp_process; bd_attrs = [];
   421→        input = Stream (Primitive Json);
   422→        output = Stream (Primitive String); impl }
   423→    | ParseJson ->
   424→      { bd_name = sp_process; bd_attrs = [];
   425→        input = Stream (Primitive String);
   426→        output = Stream (Primitive Json); impl }
   427→    | _ ->
   428→      { bd_name = sp_process; bd_attrs = [];
   429→        input = Unit; output = Unit; impl }
   430→  in
   431→
   432→  let processes = ref [] in
   433→  let routes = ref [] in
   434→
   435→  (* Per-binding-name counter for unique instance names.
   436→     Every spawn gets a numbered name: copy0, copy1, merge0, etc. *)
   437→  let name_counters = Hashtbl.create 8 in
   438→  let fresh_proc_name base =
   439→    let n = try Hashtbl.find name_counters base with Not_found -> 0 in
   440→    Hashtbl.replace name_counters base (n + 1);
   441→    Printf.sprintf "%s%d" base n
   442→  in
   443→
   444→  List.iter (fun (sp : spawn_stmt) ->
   445→    (* For named args, validate port names and resolve to positional
   446→       order before extracting channels.  For positional args,
   447→       pass through. *)
   448→    let has_named = List.exists (function Labeled _ -> true | _ -> false)
   449→      sp.sp_args in
   450→    let raw_channels = List.map (function
   451→      | Positional ch -> ch
   452→      | Labeled (_, ch) -> ch
   453→    ) sp.sp_args in
   454→
   455→    let binding = match find_binding sp.sp_process with
   456→      | Some b -> b
   457→      | None -> synthesise_structural sp.sp_process raw_channels
   458→    in
   459→
   460→    (* Validate named arg port names and resolve to positional order. *)
   461→    let channels =
   462→      if has_named then begin
   463→        let valid_ports = match binding.impl with
   464→          | Empty -> [| "output" |]
   465→          | Id | Map _ | Filter _ | Tagger _ | FormatJson | ParseJson
   466→          | Log _ | Project _ ->
   467→            [| "input"; "output" |]
   468→          | Copy -> [| "input"; "out0"; "out1" |]
   469→          | Merge | MergeAny -> [| "in0"; "in1"; "output" |]
   470→          | Discard -> [| "input"; "output" |]
   471→          | Barrier ->
   472→            let n = List.length raw_channels - 1 in
   473→            Array.init (n + 1) (fun i ->
   474→              if i < n then Printf.sprintf "in%d" i else "output")
   475→          | Agent _ ->
   476→            let has_control = match Validate.resolve type_env binding.input with
   477→              | Product [Stream _; Stream _] -> true | _ -> false in
   478→            let has_telemetry = match Validate.resolve type_env binding.output with
   479→              | Product [Stream _; Stream _] -> true | _ -> false in
   480→            (match has_control, has_telemetry with
   481→             | false, false -> [| "input"; "output" |]
   482→             | false, true  -> [| "input"; "output"; "telemetry" |]
   483→             | true,  false -> [| "input"; "ctrl_in"; "output"; "ctrl_out" |]
   484→             | true,  true  -> [| "input"; "ctrl_in"; "output"; "ctrl_out"; "telemetry" |])
   485→          | Plumb (ports, _) -> Array.of_list ports
   486→          | Tool _ | Lower _ | Builtin _ -> [||]
   487→        in
   488→        let named = List.filter_map (function
   489→          | Labeled (p, ch) -> Some (p, ch) | _ -> None) sp.sp_args in
   490→        List.iter (fun (p, _) ->
   491→          if not (Array.exists (fun n -> n = p) valid_ports) then
   492→            failwith (Printf.sprintf "spawn %s: unknown port '%s'"
   493→              sp.sp_process p)
   494→        ) named;
   495→        (* Reorder to positional based on port names *)
   496→        Array.to_list valid_ports |> List.map (fun port_name ->
   497→          match List.assoc_opt port_name named with
   498→          | Some ch -> ch
   499→          | None ->
   500→            failwith (Printf.sprintf
   501→              "spawn %s: missing required port '%s'"
   502→              sp.sp_process port_name))
   503→      end else
   504→        raw_channels
   505→    in
   506→
   507→    check_spawn_types ~sp ~binding ~channels;
   508→
   509→    let proc_name = fresh_proc_name sp.sp_process in
   510→    let binding = { binding with bd_name = proc_name } in
   511→    let kind = process_kind_of_impl binding.impl in
   512→
   513→    processes := {
   514→      proc_name;
   515→      proc_kind = kind;
   516→      proc_impl = binding.impl;
   517→      proc_binding = binding;
   518→    } :: !processes;
   519→
   520→    (* Determine port layout from the implementation kind.
   521→       Each channel argument maps to a port. Convention:
   522→       - Input ports: read from the named channel
   523→       - Output ports: write to the named channel
   524→
   525→       A route is created for each channel connecting it to the
   526→       process. The src/dst naming follows the data flow direction:
   527→       the channel's writer is the src, the reader is the dst. *)
   528→    let add_route ~src ~src_port ~dst ~dst_port ~channel_name =
   529→      routes := {
   530→        src; src_port; dst; dst_port;
   531→        kind = Point_to_point;
   532→        topic = channel_name;
   533→        via = None;
   534→      } :: !routes
   535→    in
   536→
   537→    (* For inline morphisms, the process reads from input channels and
   538→       writes to output channels. We model this as:
   539→       - Input channel: route from channel → process (process reads)
   540→       - Output channel: route from process → channel (process writes)
   541→
   542→       Since channels are named entities shared between processes, the
   543→       route connects the producing process's output to the consuming
   544→       process's input via the channel name. *)
   545→    match binding.impl with
   546→    | Id | FormatJson | ParseJson | Map _ | Filter _ | Tagger _ ->
   547→      (* 2 channels: [input, output] *)
   548→      let ch_in = List.nth channels 0 in
   549→      let ch_out = List.nth channels 1 in
   550→      add_route ~src:ch_in ~src_port:"out" ~dst:proc_name ~dst_port:"in"
   551→        ~channel_name:ch_in;
   552→      add_route ~src:proc_name ~src_port:"out" ~dst:ch_out ~dst_port:"in"
   553→        ~channel_name:ch_out
   554→
   555→    | Discard ->
   556→      (* 2 channels: [input, output(unit)] *)
   557→      let ch_in = List.nth channels 0 in
   558→      add_route ~src:ch_in ~src_port:"out" ~dst:proc_name ~dst_port:"in"
   559→        ~channel_name:ch_in
   560→
   561→    | Empty ->
   562→      (* 1 channel: [output] *)
   563→      let ch_out = List.nth channels 0 in
   564→      add_route ~src:proc_name ~src_port:"out" ~dst:ch_out ~dst_port:"in"
   565→        ~channel_name:ch_out
   566→
   567→    | Log _ ->
   568→      (* 1 channel: [input] *)
   569→      let ch_in = List.nth channels 0 in
   570→      add_route ~src:ch_in ~src_port:"out" ~dst:proc_name ~dst_port:"in"
   571→        ~channel_name:ch_in
   572→
   573→    | Copy ->
   574→      (* 3 channels: [input, output0, output1] *)
   575→      let ch_in = List.nth channels 0 in
   576→      let ch_out0 = List.nth channels 1 in
   577→      let ch_out1 = List.nth channels 2 in
   578→      add_route ~src:ch_in ~src_port:"out" ~dst:proc_name ~dst_port:"in"
   579→        ~channel_name:ch_in;
   580→      add_route ~src:proc_name ~src_port:"out0" ~dst:ch_out0 ~dst_port:"in"
   581→        ~channel_name:ch_out0;
   582→      add_route ~src:proc_name ~src_port:"out1" ~dst:ch_out1 ~dst_port:"in"
   583→        ~channel_name:ch_out1
   584→
   585→    | Merge | MergeAny ->
   586→      (* 3 channels: [input0, input1, output] *)
   587→      let ch_in0 = List.nth channels 0 in
   588→      let ch_in1 = List.nth channels 1 in
   589→      let ch_out = List.nth channels 2 in
   590→      add_route ~src:ch_in0 ~src_port:"out" ~dst:proc_name ~dst_port:"in0"
   591→        ~channel_name:ch_in0;
   592→      add_route ~src:ch_in1 ~src_port:"out" ~dst:proc_name ~dst_port:"in1"
   593→        ~channel_name:ch_in1;
   594→      add_route ~src:proc_name ~src_port:"out" ~dst:ch_out ~dst_port:"in"
   595→        ~channel_name:ch_out
   596→
   597→    | Barrier ->
   598→      (* N+1 channels: [input0..inputN-1, output] *)
   599→      let n = List.length channels - 1 in
   600→      for i = 0 to n - 1 do
   601→        let ch = List.nth channels i in
   602→        add_route ~src:ch ~src_port:"out" ~dst:proc_name
   603→          ~dst_port:(Printf.sprintf "in%d" i)
   604→          ~channel_name:ch
   605→      done;
   606→      let ch_out = List.nth channels n in
   607→      add_route ~src:proc_name ~src_port:"out" ~dst:ch_out ~dst_port:"in"
   608→        ~channel_name:ch_out
   609→
   610→    | Project n_proj ->
   611→      (* 2 channels: [input, output] *)
   612→      ignore n_proj;
   613→      let ch_in = List.nth channels 0 in
   614→      let ch_out = List.nth channels 1 in
   615→      add_route ~src:ch_in ~src_port:"out" ~dst:proc_name ~dst_port:"in"
   616→        ~channel_name:ch_in;
   617→      add_route ~src:proc_name ~src_port:"out" ~dst:ch_out ~dst_port:"in"
   618→        ~channel_name:ch_out
   619→
   620→    | Agent _ ->
   621→      (* Variable channels: data_in, [ctrl_in], data_out, [ctrl_out], [telemetry] *)
   622→      let n = List.length channels in
   623→      (* Minimum: input, output (2 channels)
   624→         With control: input, ctrl_in, output, ctrl_out (4)
   625→         With telemetry: +1 *)
   626→      if n >= 2 then begin
   627→        let ch_in = List.nth channels 0 in
   628→        add_route ~src:ch_in ~src_port:"out" ~dst:proc_name ~dst_port:"data_in"
   629→          ~channel_name:ch_in;
   630→        (* The output channel is at index 1 (no ctrl) or 2 (with ctrl) *)
   631→        let out_idx = if n >= 4 then 2 else 1 in
   632→        let ch_out = List.nth channels out_idx in
   633→        add_route ~src:proc_name ~src_port:"data_out" ~dst:ch_out ~dst_port:"in"
   634→          ~channel_name:ch_out;
   635→        (* Control channels if present *)
   636→        if n >= 4 then begin
   637→          let ch_ctrl_in = List.nth channels 1 in
   638→          let ch_ctrl_out = List.nth channels 3 in
   639→          add_route ~src:ch_ctrl_in ~src_port:"out" ~dst:proc_name ~dst_port:"ctrl_in"
   640→            ~channel_name:ch_ctrl_in;
   641→          add_route ~src:proc_name ~src_port:"ctrl_out" ~dst:ch_ctrl_out ~dst_port:"in"
   642→            ~channel_name:ch_ctrl_out
   643→        end;
   644→        (* Telemetry if present *)
   645→        let telem_idx = if n >= 4 then 4 else 2 in
   646→        if telem_idx < n then begin
   647→          let ch_telem = List.nth channels telem_idx in
   648→          add_route ~src:proc_name ~src_port:"telemetry" ~dst:ch_telem ~dst_port:"in"
   649→            ~channel_name:ch_telem
   650→        end
   651→      end
   652→
   653→    | Plumb (plumb_ports, _) ->
   654→      (* N port channels, matching the plumb body's port list *)
   655→      List.iteri (fun i ch ->
   656→        let port = List.nth plumb_ports i in
   657→        (* Determine direction from port index relative to input/output *)
   658→        add_route ~src:ch ~src_port:"out" ~dst:proc_name ~dst_port:port
   659→          ~channel_name:ch
   660→      ) channels
   661→
   662→    | Tool _ | Lower _ | Builtin _ ->
   663→      (* These are not spawned directly — they're invoked via tool dispatch *)
   664→      ()
   665→  ) pb.pb_spawns;
   666→
   667→  (* Channel-node routes are intermediate — resolve them into direct
   668→     process-to-process routes before the fabric sees them. *)
   669→  let intermediate_routes = List.rev !routes in
   670→  let resolved = resolve_channels
   671→    ~processes:(List.rev !processes)
   672→    ~boundary_ports:ports
   673→    ~channels:pb.pb_channels
   674→    intermediate_routes in
   675→  let routes = classify_channel_kinds resolved in
   676→  {
   677→    processes = List.rev !processes;
   678→    routes;
   679→    seeded_channels = seeded;
   680→  }
   681→
   682→(** Pretty-print a resolved routing table for debugging.
   683→    Expects channel intermediaries to be resolved; prints the [via]
   684→    field when present to show channel provenance. *)
   685→let pp_routing_table rt =
   686→  let buf = Buffer.create 256 in
   687→  Buffer.add_string buf "Processes:\n";
   688→  List.iter (fun pe ->
   689→    let impl_str = match pe.proc_impl with
   690→      | Id -> "id" | Copy -> "copy" | Merge -> "merge"
   691→      | MergeAny -> "merge_any" | Discard -> "discard" | Empty -> "empty"
   692→      | Barrier -> "barrier" | Map _ -> "map" | Filter _ -> "filter"
   693→      | Tagger _ -> "tagger" | Log _ -> "log" | FormatJson -> "format_json"
   694→      | ParseJson -> "parse_json" | Project n -> Printf.sprintf "project(%d)" n
   695→      | Agent _ -> "agent" | Plumb _ -> "plumb" | Tool _ -> "tool"
   696→      | Lower _ -> "lower" | Builtin _ -> "builtin"
   697→    in
   698→    Buffer.add_string buf (Printf.sprintf "  %s (%s, %s)\n" pe.proc_name
   699→      (match pe.proc_kind with Inline -> "inline" | Subprocess -> "subprocess")
   700→      impl_str)
   701→  ) rt.processes;
   702→  Buffer.add_string buf "Routes:\n";
   703→  List.iter (fun r ->
   704→    let via_str = match r.via with
   705→      | Some ch -> Printf.sprintf " via=%s" ch
   706→      | None -> ""
   707→    in
   708→    Buffer.add_string buf (Printf.sprintf "  %s:%s -> %s:%s [%s] topic=%s%s\n"
   709→      r.src r.src_port r.dst r.dst_port
   710→      (match r.kind with
   711→       | Point_to_point -> "push/pull"
   712→       | Fan_out -> "pub/sub"
   713→       | Bidirectional -> "pair")
   714→      r.topic via_str)
   715→  ) rt.routes;
   716→  if rt.seeded_channels <> [] then begin
   717→    Buffer.add_string buf "Seeded: ";
   718→    Buffer.add_string buf (String.concat ", " rt.seeded_channels);
   719→    Buffer.add_char buf '\n'
   720→  end;
   721→  Buffer.contents buf
   722→
Thinking

Let me check the test dune files and a few more references.

Read
{
  "file_path": "/Users/ww/src/plumbing/test/dune"
}
     1→(tests
     2→ (names test_parser test_validate test_plumb test_expr
     3→        test_session_compile)
     4→ (libraries language runtime alcotest yojson unix)
     5→ (deps
     6→  (glob_files fixtures/*.plumb)
     7→  ../bin/plumb/main.exe))
     8→
     9→(test
    10→ (name test_session_exec)
    11→ (libraries language runtime plumb alcotest yojson unix threads.posix)
    12→ (deps
    13→  ../bin/plumb/main.exe))
    14→
    15→(test
    16→ (name test_tool)
    17→ (libraries language runtime plumb alcotest yojson unix str threads.posix)
    18→ (deps
    19→  (glob_files fixtures/*.plumb)
    20→  ../bin/agent/main.exe))
    21→
    22→(test
    23→ (name test_agent_config)
    24→ (libraries alcotest unix str)
    25→ (deps
    26→  (glob_files fixtures/*.plumb)
    27→  ../bin/agent/main.exe))
    28→
    29→(test
    30→ (name test_agent_control)
    31→ (libraries alcotest unix str yojson)
    32→ (deps
    33→  (glob_files fixtures/*.plumb)
    34→  ../bin/agent/main.exe))
    35→
    36→(test
    37→ (name test_agent_eliza)
    38→ (libraries alcotest unix str yojson)
    39→ (deps
    40→  (glob_files fixtures/*.plumb)
    41→  ../bin/agent/main.exe))
    42→
    43→(test
    44→ (name test_pinning)
    45→ (libraries agent client alcotest yojson str))
    46→
    47→(test
    48→ (name test_docstore)
    49→ (libraries docstore alcotest unix))
    50→
    51→(test
    52→ (name test_render_group)
    53→ (libraries render alcotest))
    54→
    55→(test
    56→ (name test_error)
    57→ (libraries runtime alcotest yojson))
    58→
    59→(test
    60→ (name test_transport)
    61→ (libraries transport alcotest eio_main unix))
    62→
    63→(test
    64→ (name test_routing)
    65→ (libraries language runtime alcotest))
    66→
    67→(test
    68→ (name test_supervisor)
    69→ (libraries supervisor alcotest unix))
    70→
    71→(test
    72→ (name test_fabric)
    73→ (libraries fabric transport language runtime spawn_lib alcotest eio_main eio.unix unix yojson threads.posix)
    74→ (deps
    75→  ../bin/agent/main.exe))
    76→
Read
{
  "file_path": "/Users/ww/src/plumbing/doc/processes.md",
  "limit": 50
}
     1→# Processes
     2→
     3→Processes are the executable units of the plumbing language. Each process is a native binary that reads a `.plumb` spec file, validates I/O against the declared types, and communicates via JSON Lines on stdin/stdout.
     4→
     5→See [tools.md](tools.md) for tool semantics and the dispatch protocol. See [agent-runtime.md](agent-runtime.md) for environment variables, control channel, and telemetry.
     6→
     7→## Stream transducers
     8→
     9→A process with type `A → B` consumes a stream of `A`-typed messages and produces a stream of `B`-typed messages. The type signature governs the per-message schema, not the inter-message dependency.
    10→
    11→An agent is a *stateful* stream transducer: the n-th output may depend on all prior inputs, not just the n-th. Conversation context accumulates across the input stream by default. This is essential for pipelines with review loops — a composer receiving a revision request must remember the original task.
    12→
    13→The structural processes (`id`, `copy`, `merge`, `discard`, `empty`) are stateless.
    14→
    15→## Algebraic structure
    16→
    17→The category is a **copy-discard (CD) category**, not a Markov category. Every object is equipped with a commutative comonoid structure (copy and discard) compatible with tensor products. Discard is not natural over our morphisms — agents are nondeterministic and filters are not total — so the stricter Markov structure does not hold.
    18→
    19→Merge is additional structure: a monoid (many to one) interacting with copy's comonoid (one to many). Whether this interaction forms a bialgebra or something weaker remains open.
    20→
    21→**Session types.** Protocol declarations (see [protocols.md](protocols.md)) add a specification layer for channel ordering constraints. The session type primitives correspond to linear logic connectives: `send T` to ⊗ (tensor), `recv T` to ⅋ (par), `select` to ⊕ (sum), `offer` to & (with), `end` to the monoidal unit. The duality of session types (send ↔ recv, select ↔ offer) maps to the duality between copy/filter (selection) and merge (being-selected-for) in the existing algebra. This situates the plumbing category within *-autonomous categories.
    22→
    23→**`!(A, B)` vs `(!A, !B)`**: `!(A, B)` is a stream of synchronised pairs (one channel, barrier output). `(!A, !B)` is a pair of independent streams on separate file descriptors (agent telemetry, control channels). The distinction is load-bearing for understanding barrier, project, and agent port decomposition.
    24→
    25→## Composition
    26→
    27→### Sequential composition (`;`)
    28→
    29→`f ; g` pipes the output stream of `f` into the input stream of `g`. Each process maintains independent internal state. Type-checked: the output type of `f` must match the input type of `g`. Associative: `(f ; g) ; h = f ; (g ; h)`. Unit: `f ; id = id ; f = f`.
    30→
    31→### Tensor product (`⊗`)
    32→
    33→`f ⊗ g` runs `f` and `g` on independent channels with independent state. No shared state, no interference. `(A → B) ⊗ (C → D) = A ⊗ C → B ⊗ D`. Associative, with unit `I` (the monoidal unit).
    34→
    35→## Primitives
    36→
    37→### id : A → A
    38→
    39→The identity morphism. Validates each input line against type `A` and writes it to stdout. Stateless — each line is independent. Total.
    40→
    41→### agent : A → (T, B)
    42→
    43→An LLM conversation loop. Each input line becomes a user message; the model's response is validated against type `B`. If validation fails, the error is fed back to the model for retry (up to `max_retries`). The agent also produces a telemetry stream `T` — reasoning traces from extended thinking, timing data, etc. The `.telemetry` port is silently discarded unless explicitly wired to an observer.
    44→
    45→By default, conversation context accumulates: each input builds on the prior exchange. The model sees the full history of user inputs and its own successful responses.
    46→
    47→Retry messages (validation failures and re-prompts) are internal reductions — they are visible to the model during the current exchange but do not carry forward into the history. The clean history contains only: user input, successful assistant response.
    48→
    49→Config options:
    50→
Read
{
  "file_path": "/Users/ww/src/plumbing/doc/architecture.md",
  "limit": 80
}
     1→# Architecture
     2→
     3→Internal architecture for developers working on the plumbing language and runtime. For user-facing documentation, see the [README](../README.md).
     4→
     5→See [extending.md](extending.md) for how to add language features. See [executor.md](executor.md) for executor internals. See [mcp-server.md](mcp-server.md) for the MCP server. See [code-map.md](code-map.md) for a function-level reference.
     6→
     7→## Pipeline: parse → load → execute
     8→
     9→Every `.plumb` file follows a three-stage pipeline:
    10→
    11→```
    12→source text → parse → program AST
    13→                         ↓
    14→                    spec load    (validate types, find binding, desugar)
    15→                         ↓
    16→                      execute    (create channels, spawn processes)
    17→```
    18→
    19→For multi-file programs with `use` declarations, the pipeline includes module resolution:
    20→
    21→```
    22→source file → load_file → parse_raw → raw_program
    23→                             ↓
    24→                     resolve use decls   (recursive, with cycle detection)
    25→                             ↓
    26→                     prefix names        (rewrite_program: Mod.name)
    27→                             ↓
    28→                     merge imports       → flat program AST
    29→                             ↓
    30→                        spec load        (validate, find binding, desugar)
    31→```
    32→
    33→1. **Parse** (`lib/language/language.ml:parse`). Menhir parser produces a `program` AST (type declarations + protocol declarations + bindings). Value references (e.g., `agent my_config`) are resolved during parsing — the parser's `collect_program` resolves all `PI_value_ref` names before returning. Protocol well-formedness checking (guardedness, message type validity, label uniqueness, name uniqueness) also runs in `collect_program` (Pass 2b).
    34→
    35→   For multi-file programs, `parse_raw` returns a `raw_program` with `use` declarations and `module` name. The loader (`lib/language/loader.ml`) resolves imports recursively, prefixes all names with the module name, and merges into a flat `program`. Module-qualified names (e.g., `Fs.read`) are flat dotted strings — the dot has no structural significance outside the loader.
    36→
    37→2. **Spec load** (`lib/runtime/spec.ml`). Entry points:
    38→   - `load_binding` — finds a binding by impl kind (id, agent, discard). Used by standalone process binaries.
    39→   - `load_main` — finds the `main` binding from source text. Used for synthesised specs.
    40→   - `load_main_from_file` — loads from a file path, resolving `use` declarations. Used by the plumb and chat binaries.
    41→   - `load_main_from_string` — loads from source text (not a file), resolving `use` declarations against the search path. Used by the MCP server's `call` tool to execute pipelines submitted as source text.
    42→   - `load_program` — returns the full parsed program + type_env without selecting a binding. Used by the agent binary to access all bindings (its own Agent binding + Tool bindings in the spec).
    43→
    44→   All entry points: parse, build `type_env`, validate sum disjointness, check reserved names, enforce bare types on tool bindings. `load_main` additionally desugars wiring statements (see [extending.md § Desugaring](extending.md#desugaring-static--dynamic-form)).
    45→
    46→3. **Execute** (`lib/plumb/exec.ml:execute`). Takes a program and a main binding (with `Plumb` impl). Creates Unix pipes for all channels. Two-pass execution:
    47→   - Pass 1 (validate): resolve each spawn to a binding, type-check channels, track claimed fds.
    48→   - Pass 2 (execute): create threads (structural morphisms) and fork subprocesses (agents, nested plumbs).
    49→
    50→## Module structure
    51→
    52→```
    53→lib/language/          (library: language)
    54→  ast.ml                 AST types — the single source of truth for the IR
    55→  lexer.mll              ocamllex tokenizer → tokens
    56→  parser.mly             Menhir grammar → program AST
    57→  pp.ml                  Pretty-printer (AST → source text)
    58→  expr.ml                Expression evaluator for map/filter
    59→  validate.ml            JSON validation, type_env, type equality, disjointness
    60→  desugar_utils.ml       Shared helpers — fresh names, debug taps, agent port info
    61→  desugar_session.ml     Session wire desugaring — protocols → barrier chains + named channels
    62→  desugar.ml             Wiring statement desugaring (static → dynamic form)
    63→  loader.ml              Module loader — resolve use decls, prefix names, merge
    64→  compile_session.ml     Session type compiler — linear protocols → barrier chains
    65→  session_names.ml       Channel naming conventions for session protocols
    66→  language.ml            Public API — re-exports AST, exposes parse/pp/modules
    67→
    68→lib/runtime/           (library: runtime, depends on: language)
    69→  error.ml               ProcessError type + JSON serialization
    70→  log.ml                 Structured JSON logging to stderr
    71→  entry.ml               CLI envelope — read spec file, run main, exit codes
    72→  spec.ml                Spec loading — parse, validate, desugar, find binding
    73→
    74→lib/plumb/             (library: plumb, depends on: language, runtime, dune-site)
    75→  exec.ml                Pipeline executor — channels, spawns, inline morphisms
    76→  stdlib_path.ml         Search path assembly — PLUMB_PATH env + dune sites
    77→  resource_path.ml       Resource file search — PLUMB_RESOURCES env + dune sites
    78→  sites.ml               (generated) Dune sites module — stdlib + resource discovery
    79→
    80→lib/mcp_client/        (library: mcp_client, depends on: yojson, unix, threads, runtime)
Read
{
  "file_path": "/Users/ww/src/plumbing/lib/fabric/inline.ml",
  "limit": 30
}
     1→(** Inline morphism fibres — structural stream transformations over ZMQ.
     2→
     3→    Adapted from [Forward] (lib/plumb/forward.ml) to use the transport
     4→    layer instead of Unix file descriptors. Each morphism runs as an
     5→    Eio fibre, reading from PULL sockets and writing to PUSH sockets.
     6→
     7→    Fatal errors raise [Morphism_fatal] instead of calling [exit 1].
     8→    The supervisor catches this at the fibre boundary. *)
     9→
    10→open Language
    11→
    12→exception Morphism_fatal of string * (string * Yojson.Safe.t) list
    13→exception Stream_eof
    14→
    15→(** Truncate a JSON value for logging. *)
    16→let truncate_json ?(max_str=200) ?(max_items=5) json =
    17→  let rec trunc = function
    18→    | `String s when String.length s > max_str ->
    19→      `String (String.sub s 0 max_str ^ "...")
    20→    | `List items when List.length items > max_items ->
    21→      let kept = List.filteri (fun i _ -> i < max_items) items in
    22→      `List (List.map trunc kept @
    23→        [`String (Printf.sprintf "...(%d more)" (List.length items - max_items))])
    24→    | `List items -> `List (List.map trunc items)
    25→    | `Assoc fields -> `Assoc (List.map (fun (k, v) -> (k, trunc v)) fields)
    26→    | other -> other
    27→  in
    28→  trunc json
    29→
    30→let morphism_fatal label extras =
Thinking

Now I have a thorough understanding of the codebase. Let me set up a worktree and start implementing.

$ Bash
Create worktree for step 5c
git worktree add /tmp/plmb-037-step5c plumbing-1 2>&1 && echo "---OK---"