Implement the following plan:
The fabric runtime (Steps 0–5b) runs inline morphisms on ZMQ
sockets, but agent subprocesses still communicate via stdin/stdout
with __port JSON envelopes. A bridge module (bridge.ml,
~445 lines) translates between the two.
Step 5c eliminates the bridge for agents by having the agent binary speak ZMQ directly — one socket per logical port, no envelopes.
Worktree: create from plumbing-1 branch
Writer-binds, reader-connects (architect + theory-critic). For agent output endpoints, the agent PUSH binds; the fabric's downstream PULL connects directly to it. For agent input endpoints, the upstream PUSH binds (as today); the agent PULL connects. Fabric skips creating any socket at agent-side endpoints. No intermediate sockets, no forwarding fibres, no close-and-rebind.
ZMQ-only — no transport abstraction. Loop.config is already
transport-agnostic via callbacks. The bridge IS the stdio compat
layer. A conditional branch in main() falls back to stdio if
PLUMB_ENDPOINT_INPUT is absent (for standalone/bridge use during
transition).
Thread→fibre simultaneous with ZMQ migration. No hybrid state.
Mutex/Condition → Eio.Mutex/Eio.Condition. Critical
invariant: never hold Eio.Mutex across ZMQ recv. One fibre per
socket, no sharing.
Startup handshake: agent sends {"status":"ready"} on ctrl_out
after connecting all sockets. Fabric waits before sending data.
Reaper fibre starts BEFORE ready-wait (theory-critic: crash before
ready → deadlock otherwise). Ready signal consumed by handshake,
not forwarded to ctrl_out consumer. 30s timeout.
Shutdown: (1) fabric sends EOF on data_in; (2) agent enters
drain phase; (3) agent handles pending control; (4) fabric sends
{"stop":true} then EOF on ctrl_in; (5) agent sends EOF on all
output sockets; (6) agent closes sockets, destroys context, exits;
(7) fabric detects EOF on PULLs, reaps child. Per-socket EOF
timeouts. Tool sockets remain active until ctrl_out EOF or agent
death.
Crash handling: concurrent reaper fibre (waitpid) races
normal shutdown via Eio.Fiber.first. On agent death, reaper
force-closes all fabric-side sockets for that agent. Hard backstop:
SIGTERM after N seconds, SIGKILL after N+5.
Back-pressure: ZMQ_SNDHWM = 0 (unlimited buffering) on all
agent sockets — deliberate simplification for LLM-speed workloads.
Not equivalent to pipe semantics (pipes block on full 64KB buffer;
HWM=0 never blocks, unbounded memory). Follow-on for telemetry
queue depth observability.
ZMQ handler in zmq_agent.ml — not in bridge.ml (architect:
bridge is a natural transformation; ZMQ handler is a direct
embedding).
PUSH/PULL for tool dispatch (not REQ/REP) — no strict
alternation; supports future parallel tool calls. Pipe adapters
to existing tool_dispatch_loop. Follow-on ticket for
correlation-ID routing.
PLUMB_ENDPOINT_* (matching implementation plan D8). Presence of
PLUMB_ENDPOINT_INPUT selects ZMQ mode.
Fabric Agent process
PULL ──(ZMQ)──→ bridge mux ──(pipe)──→ demux → pipes → loop
PUSH ←─(ZMQ)──← bridge demux ←─(pipe)──← mux_write ← loop
Fabric Agent process
PUSH ─bind──(ZMQ)──connect─ PULL → main fibre
PULL ────(ZMQ)──connect─bind─ PUSH ← main fibre
Writer binds, reader connects. No pipes, no envelopes, no demux thread.
lib/fabric/fabric.ml — modify socket creation loop (lines
100–150):
For routes involving Subprocess processes (checked via
proc_kind on process_entry), skip creating the agent-side
socket. Specifically:
src = subprocess: skip creating PUSH at
endpoint(src, src_port). Agent will bind its own PUSH there.dst = subprocess: skip creating PULL at
endpoint(dst, dst_port). Agent will connect its own PULL to
the upstream PUSH endpoint.The connect phase also skips these routes (no PULL to connect).
Extend process_handler callback:
?process_handler:(
sw:Eio.Switch.t ->
proc_entry:process_entry ->
push_sockets:(string * [`Push] T.socket) list ->
pull_sockets:(string * [`Pull] T.socket) list ->
endpoints:(string * string) list ->
unit)
push_sockets/pull_sockets: fabric-side sockets only (for
bridge path). endpoints: all port→URI mappings (for ZMQ path).
For bridge handlers, endpoints is ignored. For ZMQ handlers,
socket lists are [].
Update bridge's make_handler to accept ~endpoints:_.
lib/fabric/zmq_agent.ml (new)make_handler function:
Bridge.synthesise_spec or
extract shared helper)ZMQ_SNDHWM = 0.
Run pipe-adapter fibres to tool_dispatch_loop.PLUMB_ENDPOINT_* env vars + credentialswaitpid) — BEFORE ready-wait{"status":"ready"} on ctrl_out PULL (30s timeout;
if agent dies first, reaper cancels the wait)Use Eio.Fiber.first to race normal shutdown vs crash reaper.
On crash: force-close all fabric-side sockets for this agent.
Hard backstop: SIGTERM after 30s of no output EOF, SIGKILL +5s.
bin/agent/main.mlstdout_mutex, mux_write, start_stdin_demux (~90
lines)PLUMB_ENDPOINT_* env vars. If PLUMB_ENDPOINT_INPUT
absent, fall back to existing stdio path (preserves bridge
compatibility during transition).ZMQ_SNDHWM = 0 on all sockets{"status":"ready"} on ctrl_out PUSHT.recv data_in_sock instead of input_line;
raise End_of_file on empty payload (EOF sentinel)T.send data_out_sock, T.send telemetry_sock, etc.T.send tool_req_sock / T.recv tool_resp_sockT.recv ctrl_in_sockcontrol_state: Eio.Mutex.t + Eio.Condition.tlib/spawn/spawn_lib.ml:
- Add PLUMB_ENDPOINT_* to safe_env_keys
- Helper to compute env var list from port→endpoint mapping
bin/agent/dune: add transport to libraries.
test/test_fabric.ml — ZMQ agent tests:
1. test_agent_identity_zmq — identity response
2. test_agent_eof_zmq — EOF propagation
3. test_agent_eof_cascade_zmq — empty ; agent ; id terminates
(clean exit code + no zombies)
4. test_agent_telemetry_zmq — kind:config on telemetry socket,
NOT on data socket (channel separation)
5. test_agent_tool_dispatch_zmq — full chain through pipe adapter
6. test_agent_multi_message_zmq — 3 messages in, 3 responses in
order
7. test_agent_concurrent_data_ctrl_zmq — data + pause arrive
simultaneously; fibre synchronisation works
test/test_agent_control.ml — port key tests to ZMQ:
8. test_stop_zmq
9. test_stdin_eof_zmq — data EOF triggers drain
10. test_pause_resume_zmq — pause/get/set/resume flow
11. test_drain_zmq — drain completes protocol
12. test_set_memory_malformed_zmq
13. test_basic_response_zmq
Error/lifecycle tests:
14. test_missing_endpoint_env — clear error on missing env var
15. test_startup_ready_signal — white-box: assert
send_count_before_ready == 0
16. test_ipc_cleanup — no socket files remain after agent exit
Test infrastructure:
- New with_fabric_zmq_agent helper (NOT parameterised — separate
from with_fabric_agent)
- Extract transport-independent helpers to test_helpers.ml:
parse_and_compile, find_agent_binary, fresh_id, fixture,
JSON assertion helpers
- Set ZMQ_LINGER = 0 on all test sockets (prevent 30s hangs)
- Fun.protect wrapper for IPC cleanup on test failure
- Existing stdio tests serve as regression for fallback path
doc/agent-runtime.md — draft at step 2 (contracts before
code):
- New "ZMQ transport mode" subsection: socket summary table
(name, ZMQ type, bind/connect, message format), env var table
with detection logic, ready handshake contract (D5), shutdown
protocol contract (D6), back-pressure policy
- Update telemetry section (dedicated socket, no envelopes)
- Update streaming section (no stdin/stdout in ZMQ mode)
- Update control channel section (fibre, not thread; socket,
not fd)
doc/processes.md:
- Annotate "Multiplexed I/O" as stdio transport path
- Note Port names table applies to both paths (same logical
ports, different physical channels)
- Cross-reference agent-runtime.md ZMQ section
doc/architecture.md:
- Add lib/fabric/zmq_agent.ml to module table
- Update dependency diagram: bin/agent → transport
| File | Change |
|---|---|
lib/fabric/fabric.ml |
Socket bifurcation; extend callback |
lib/fabric/zmq_agent.ml |
New — ZMQ process handler |
bin/agent/main.ml |
ZMQ rewrite; threads→fibres |
bin/agent/dune |
Add transport dependency |
lib/spawn/spawn_lib.ml |
Endpoint env var helpers |
test/test_fabric.ml |
7 new ZMQ agent tests |
test/test_agent_control.ml |
6 control tests ported to ZMQ |
test/test_zmq_errors.ml |
3 error/lifecycle tests |
test/test_helpers.ml |
New — shared test infrastructure |
doc/agent-runtime.md |
ZMQ transport mode subsection |
doc/processes.md |
Annotate Multiplexed I/O |
doc/architecture.md |
Module table, dependency graph |
Documentation contracts — draft ZMQ transport mode subsection in agent-runtime.md: env vars, ready handshake, shutdown protocol, socket table. These are design contracts; writing them first reduces drift.
Fabric socket bifurcation — modify fabric.ml to skip
agent-side sockets for subprocess processes; extend callback
with endpoints. Update bridge's make_handler to accept new
arg. dune build && dune test — all existing tests pass.
ZMQ handler — zmq_agent.ml: spec synthesis, tool dispatch
wiring, endpoint env vars, agent spawn, reaper, ready handshake.
dune build.
Agent binary rewrite — main.ml: ZMQ sockets, daemon fibres,
Eio.Mutex/Condition, ready signal, shutdown protocol, stdio
fallback. dune build.
Integration + tests — wire ZMQ handler into fabric, extract
test helpers, run all 16 tests + existing suite.
dune build && dune test.
Documentation finalisation — update processes.md, architecture.md; review agent-runtime.md against implementation.
dune build && dune test — all existing bridge
tests pass with extended callbackdune build — agent binary compiles with ZMQempty ; agent ; id terminates cleanly (test 3)doc/executor.md scope note for bridge/stdio pathIf you need specific details from before exiting plan mode (like exact code snippets, error messages, or content you generated), read the full transcript at: /Users/ww/.claude/projects/-Users-ww-src-plumbing/e80e21f4-2105-4797-9e0e-4a90f6bb6fc8.jsonl
I'll start by reading the engineering state and key source files in parallel to understand the current codebase before setting up a worktree.
{
"query": "select:mcp__claude_ai_LDC__read,mcp__claude_ai_LDC__whoareyou",
"max_results": 2
}{
"type": "tool_reference",
"tool_name": "mcp__claude_ai_LDC__read"
}{
"type": "tool_reference",
"tool_name": "mcp__claude_ai_LDC__whoareyou"
}{"success":true,"content":"# Plumbing — Engineering State\n\nLast updated: 2026-03-14 by Research (opus) — PLMB-056 closed\n\n## Current Work\n\n**PLMB-037: ZeroMQ supervisor + router runtime** — replacing the\npipe-based runtime with ZMQ message fabric.\n\n**Branch:** `plmb-037-step5b` (pending merge to main)\n\n### Completed Steps\n\n**Step 0** (`6125c7a`): ZMQ transport layer\n- `lib/transport/` with `TRANSPORT` module type and ZMQ backend\n- 6 smoke tests (PUSH/PULL, PUB/SUB, PAIR, FIFO, concurrent, try_recv)\n- Custom nix derivation for zmq-eio at `nix/zmq-eio.nix`\n\n**Step 2** (`8bcab98`): Routing table compiler\n- `lib/runtime/routing.ml` — compiles plumb body spawn lists into\n routing tables with process entries and routes\n- Channel kind classification, proper type synthesis for structural\n morphisms\n- 5 tests\n\n**Step 3** (`f622b60`): Supervisor\n- `lib/supervisor/supervisor.ml` — spawn, kill (three-tier), ordered\n shutdown\n- 7 tests\n\n**Step 4 partial** (`6452f28`): Fabric and inline morphisms\n- `lib/fabric/fabric.ml` — wires ZMQ sockets from routing table,\n starts inline morphism fibres\n- `lib/fabric/inline.ml` — all morphisms from `forward.ml` adapted\n to ZMQ transport\n- **Identity pipeline works end-to-end through ZMQ**\n- Copy+merge test disabled — merge synchronisation needs investigation\n\n**Step 5a** (`ec37ed6`): Agent subprocess bridge\n- `lib/spawn/spawn_lib.ml` — extracted from `lib/plumb/spawn.ml`,\n dune-site-free (bin_dir and resource_find injected by callers)\n- `lib/plumb/spawn.ml` — thin wrapper injecting dune-site paths\n- `lib/fabric/bridge.ml` — stdio mux/demux adapter for agent\n subprocesses (natural transformation between ZMQ sockets and\n __port envelope protocol)\n- `fabric.ml` gains `?process_handler` callback, re-exports Bridge\n- **Agent subprocesses work end-to-end through the fabric**\n- 2 agent tests: identity response + EOF propagation (ELIZA mock)\n\n**Step 5b** (`3139c91`): Exec→Fabric switchover fixes\n- Three fixes: `List.nth` guard broadened (`\"nth\" || \"List.nth\"`),\n `send_eof` wrapped in transport error handler, test relocation\n for PLMB-053\n- 87 tests pass\n- Post-review follow-up: narrowed `with _ -> ()` to\n `Transport.Transport_error _` in fabric.ml, fixed ⊥ comment in\n routing.ml\n- Filed 6 follow-on tickets: PLMB-054..059\n\n**Step 5b follow-up** (`454f96f`): PLMB-054, 057, 058\n- bridge.ml: narrowed telemetry observation callback catch from\n `with _ ->` to `Yojson.Json_error` (PLMB-054)\n- routing.ml: named-arg validation now raises clear \"missing\n required port\" error instead of cryptic `List.nth` failure\n (PLMB-057)\n- test_routing.ml: 6 new validation tests — dead ports, discard\n output type, project non-product, wrong channel count, missing\n named port, ctrl_in on non-product agent (PLMB-058)\n- Routing tests now total 16 (6 compile + 6 validation + 4 resolution)\n\n**Step 5b follow-up** (`b88a546`): PLMB-055\n- routing.ml: extracted ~130-line type validation block from main\n spawn loop into named local function `check_spawn_types`\n- Replaced wildcard `| _ -> ()` with explicit match arms for all\n `process_impl` variants (exhaustiveness warning on new variants)\n- Pure refactor, no behavioural change; all 16 routing tests pass\n\n**PLMB-056** (`379f96b`): Empty morphism + EOF correctness\n- Added `Empty` variant to `process_impl` — dual of `Discard`\n- `forward_empty` in `inline.ml`: sends EOF via `Bridge.send_eof`\n- Fixed `recv_loop`: empty payload → `Stream_eof` (was silently\n skipping — contradicted Bridge's documented EOF protocol)\n- Added EOF propagation to all 12 output morphisms (id, copy, merge,\n merge_any, barrier, map, filter, project, tagger, format_json,\n parse_json, empty)\n- Desugarer: `insert_empty_for_writerless` auto-inserts empty for\n channels declared but never referenced in any spawn\n- Routing compiler: writer-only and reader-only channels promoted\n from warnings to compile-time errors with actionable messages\n- Removed `dead_ports` from `routing_table` type\n- Removed Phase 3b (dead port EOF seeding) from fabric\n- 3 new tests (empty auto-insert, writer-only error, explicit empty)\n- Routing tests now total 18 (6 compile + 8 validation + 4 resolution)\n- Documentation updated: processes.md, executor.md, architecture.md\n\n### Key Findings\n\n- **`List.nth` on OCaml 5.4.0**: raises `Failure \"nth\"`, NOT\n `Invalid_argument`. The broadened guard handles both for\n defensive compatibility.\n- **zmq-eio abandoned**: zmq-eio 5.3.0 has two fatal issues:\n (1) `send_all` pushes to receivers queue (deadlock), (2) event loop\n fibres are non-daemon and prevent Eio switch exit. Replaced with\n direct ZMQ + `Eio_unix.await_readable`/`await_writable` — simpler,\n properly cancellable, no persistent event loops.\n- **Socket cleanup**: with fd-based polling, sockets can be closed\n directly via `Zmq.Socket.close`. No event loop deadlock.\n `Context.terminate` should NOT be called while daemon fibres may\n still hold socket references — causes hangs.\n- **ipc:// directories**: all endpoint directories must be created\n before socket bind. The fabric creates dirs for all src/dst names\n in the routing table, not just process entries.\n- **Bridge Eio threading model**: ZMQ recv/send must run in Eio fibre\n context (uses `Eio_unix.await_readable/writable`). Pipe I/O must\n use `Eio_unix.run_in_systhread`. Mux fibres alternate: T.recv\n (Eio) → run_in_systhread(Mutex + pipe write). Demux:\n run_in_systhread(input_line) → T.send (Eio).\n- **Binary resolution in tests**: `Spawn_lib.resolve_binary` uses\n `Sys.executable_name` for build-tree sibling lookup, which fails\n from test binaries. Bridge accepts `?resolve_bin` override for\n tests to supply the agent binary path directly.\n- **EOF propagation**: `recv_loop` was silently skipping empty\n payloads — contradicting Bridge's documented protocol (bridge.ml:\n 20-21). Fixed: empty payload raises `Stream_eof`. All output\n morphisms now propagate EOF downstream via `Bridge.send_eof` after\n `recv_loop` returns.\n\n### Next Steps\n\n1. Merge step 5b branch to main\n2. **Step 5c**: Agent binary ZMQ rewrite (separate ticket)\n3. Step 5d: Cleanup — delete exec.ml, forward.ml\n4. Documentation (Step 6) — see PLMB-059\n\nPlan: [plmb-037-implementation-plan.md](research/plumbing/design/plmb-037-implementation-plan.md)\n\n## Build\n\n- Build: `nix develop --command dune build`\n- **CRITICAL**: Must run from **worktree directory**, not the repo\n root\n- Test: same prefix + `dune test`\n- **test_plumb.ml integration tests**: fail in worktrees because\n they hardcode `../bin/plumb/main.exe` relative path. Pre-existing\n issue, not related to PLMB-037.\n- **Dev binary**: `_build/default/bin/plumb/main.exe`\n- **Worktree build**: always use `--root=.`\n- **zmq-eio nix**: custom derivation at `nix/zmq-eio.nix` (still\n in deps for nix build, but transport lib no longer uses it —\n uses raw `zmq` + `eio.unix` instead)\n\nFor packaging, release, Docker, and Nix details see\n[build-reference.md](research/plumbing/build-reference.md).\n\n## Key Files\n\n- **AST**: `lib/language/ast.ml`\n- **Parser**: `lib/language/parser.mly` (Menhir)\n- **Session compiler**: `lib/language/compile_session.ml`\n- **Desugarer**: `lib/language/desugar.ml`\n- **Validator**: `lib/language/validate.ml`\n- **Spec loader**: `lib/runtime/spec.ml`\n- **Routing table**: `lib/runtime/routing.ml` (PLMB-037)\n- **Transport**: `lib/transport/transport_intf.ml`, `transport_zmq.ml`\n- **Supervisor**: `lib/supervisor/supervisor.ml`\n- **Fabric**: `lib/fabric/fabric.ml`, `inline.ml`\n- **Bridge**: `lib/fabric/bridge.ml` (Step 5a)\n- **Spawn lib**: `lib/spawn/spawn_lib.ml` (extracted Step 5a)\n- **Spawn wrapper**: `lib/plumb/spawn.ml` (thin dune-site wrapper)\n- **Tool dispatch**: `lib/plumb/tool_dispatch.ml` (~700 lines)\n- **Executor**: `lib/plumb/exec.ml` (~1540 lines) — being replaced\n- **Forwarding**: `lib/plumb/forward.ml` (~530 lines) — being replaced\n- **Tests**: transport (6), routing (18), supervisor (7), fabric (4),\n plus all existing tests\n\n## Active Tickets\n\n- **PLMB-037**: supervisor + router runtime (Step 5b done, 5c next)\n- **PLMB-041**: select+loop session protocol deadlock\n- **PLMB-043**: dead Copy for ST_end branches\n- **PLMB-059**: documentation for fabric switchover (low)\n"} 1→(** Fabric — wire ZMQ sockets from a routing table and run the pipeline.
2→
3→ The fabric is the runtime core. It:
4→ 1. Creates ZMQ sockets for all routes in the routing table
5→ 2. Binds and connects sockets per the wiring instructions
6→ 3. Seeds channels with initial tokens (D7)
7→ 4. Starts inline morphism fibres
8→ 5. Coordinates subprocess spawning via the supervisor
9→
10→ Startup sequence (D7, bind-before-connect for inproc://):
11→ 1. Create ZMQ context
12→ 2. Bind phase: create and bind all sockets, seed channels
13→ 3. Connect phase: connect all consumer sockets
14→ 4. Start phase: start inline fibres, supervisor spawns subprocesses
15→
16→ See PLMB-037 Step 4, design decisions D6–D9. *)
17→
18→open Language
19→
20→module T = Transport.Zmq
21→
22→(** A running pipeline handle. *)
23→type t = {
24→ ctx : T.context;
25→ supervisor : Supervisor.t;
26→ push_sockets : (string, [`Push] T.socket) Hashtbl.t;
27→ pull_sockets : (string, [`Pull] T.socket) Hashtbl.t;
28→ pipeline_id : string;
29→ cancel : unit -> unit;
30→ (** Cancel function: fails the inner switch, causing all socket
31→ fibres and morphism fibres to terminate. *)
32→}
33→
34→(** Endpoint naming for ipc:// transport.
35→ Format: ipc:///tmp/plumb-{pid}/{process}/{port} *)
36→let endpoint ~pipeline_id proc_name port_name =
37→ Printf.sprintf "ipc:///tmp/plumb-%s/%s/%s" pipeline_id proc_name port_name
38→
39→(** Get the PUSH socket for writing to a boundary input port.
40→ The caller sends data on this socket to feed the pipeline. *)
41→let boundary_input t port_name =
42→ let ep = endpoint ~pipeline_id:t.pipeline_id port_name "out" in
43→ Hashtbl.find t.push_sockets ep
44→
45→(** Get the PULL socket for reading from a boundary output port.
46→ The caller receives data from this socket. *)
47→let boundary_output t port_name =
48→ let ep = endpoint ~pipeline_id:t.pipeline_id port_name "in" in
49→ Hashtbl.find t.pull_sockets ep
50→
51→(** Create the fabric directory for ipc:// endpoints. *)
52→let ensure_fabric_dir pipeline_id =
53→ let dir = Printf.sprintf "/tmp/plumb-%s" pipeline_id in
54→ (try Unix.mkdir dir 0o700 with Unix.Unix_error (Unix.EEXIST, _, _) -> ());
55→ dir
56→
57→(** Clean up the fabric directory. *)
58→let cleanup_fabric_dir pipeline_id =
59→ let dir = Printf.sprintf "/tmp/plumb-%s" pipeline_id in
60→ let cleanup_dir d =
61→ try
62→ let entries = Sys.readdir d in
63→ Array.iter (fun f ->
64→ try Sys.remove (Filename.concat d f) with Sys_error _ -> ()
65→ ) entries;
66→ (try Unix.rmdir d with Unix.Unix_error _ -> ())
67→ with Sys_error _ -> ()
68→ in
69→ (* Remove per-process subdirectories *)
70→ (try
71→ let entries = Sys.readdir dir in
72→ Array.iter (fun sub ->
73→ let path = Filename.concat dir sub in
74→ if Sys.is_directory path then cleanup_dir path
75→ ) entries
76→ with Sys_error _ -> ());
77→ cleanup_dir dir
78→
79→(** Execute a pipeline from a routing table.
80→
81→ This is the main entry point. It replaces [Exec.execute]. *)
82→let execute ~sw ~routing_table ~type_env
83→ ?(pipeline_id = string_of_int (Unix.getpid ()))
84→ ?process_handler () =
85→ let ctx = T.create_context () in
86→ let _fabric_dir = ensure_fabric_dir pipeline_id in
87→ let sup = Supervisor.create () in
88→
89→ (* Create subdirectories for all endpoints (processes and channels) *)
90→ let all_names = Hashtbl.create 16 in
91→ List.iter (fun (r : Runtime.Routing.route) ->
92→ Hashtbl.replace all_names r.src ();
93→ Hashtbl.replace all_names r.dst ()
94→ ) routing_table.Runtime.Routing.routes;
95→ Hashtbl.iter (fun name () ->
96→ let dir = Printf.sprintf "/tmp/plumb-%s/%s" pipeline_id name in
97→ (try Unix.mkdir dir 0o700 with Unix.Unix_error (Unix.EEXIST, _, _) -> ())
98→ ) all_names;
99→
100→ (* === Phase 1: Create and bind sockets ===
101→
102→ For each route, create a socket pair (sender, receiver).
103→ The sender binds; the receiver connects.
104→ Routes are keyed by (src, src_port) → sender socket,
105→ and (dst, dst_port) → receiver socket. *)
106→
107→ (* Socket tables: endpoint → socket *)
108→ let push_sockets : (string, [`Push] T.socket) Hashtbl.t = Hashtbl.create 16 in
109→ let pull_sockets : (string, [`Pull] T.socket) Hashtbl.t = Hashtbl.create 16 in
110→
111→ (* For each route, ensure sender and receiver sockets exist *)
112→ List.iter (fun (r : Runtime.Routing.route) ->
113→ let ep = endpoint ~pipeline_id r.src r.src_port in
114→ if not (Hashtbl.mem push_sockets ep) then begin
115→ let sock = match r.kind with
116→ | Runtime.Routing.Fan_out ->
117→ (* PUB/SUB for copy — but we use PUSH/PULL per-destination
118→ since our single-frame encoding doesn't support ZMQ topic
119→ filtering. The routing table handles fan-out by creating
120→ multiple routes from the same source. *)
121→ let s = T.push_socket ctx ~sw in
122→ T.bind s ep;
123→ s
124→ | _ ->
125→ let s = T.push_socket ctx ~sw in
126→ T.bind s ep;
127→ s
128→ in
129→ Hashtbl.replace push_sockets ep sock
130→ end;
131→ let ep_dst = endpoint ~pipeline_id r.dst r.dst_port in
132→ if not (Hashtbl.mem pull_sockets ep_dst) then begin
133→ let sock = T.pull_socket ctx ~sw in
134→ Hashtbl.replace pull_sockets ep_dst sock
135→ end
136→ ) routing_table.Runtime.Routing.routes;
137→
138→ (* === Phase 2: Connect sockets === *)
139→
140→ List.iter (fun (r : Runtime.Routing.route) ->
141→ let ep = endpoint ~pipeline_id r.src r.src_port in
142→ let ep_dst = endpoint ~pipeline_id r.dst r.dst_port in
143→ match Hashtbl.find_opt pull_sockets ep_dst with
144→ | Some pull_sock -> T.connect pull_sock ep
145→ | None ->
146→ Runtime.Log.warn "fabric"
147→ ["event", `String "no_pull_socket";
148→ "dst", `String r.dst;
149→ "dst_port", `String r.dst_port]
150→ ) routing_table.Runtime.Routing.routes;
151→
152→ (* === Phase 3: Seed channels (D7) ===
153→ Send null token on seeded channels before starting fibres. *)
154→
155→ List.iter (fun ch_name ->
156→ (* Find the PUSH socket that writes to this channel *)
157→ let matching_routes = List.filter (fun (r : Runtime.Routing.route) ->
158→ r.topic = ch_name && r.src <> ch_name
159→ ) routing_table.Runtime.Routing.routes in
160→ match matching_routes with
161→ | r :: _ ->
162→ let ep = endpoint ~pipeline_id r.src r.src_port in
163→ (match Hashtbl.find_opt push_sockets ep with
164→ | Some sock -> T.send sock ~topic:ch_name "null"
165→ | None ->
166→ Runtime.Log.warn "fabric"
167→ ["event", `String "seed_no_socket";
168→ "channel", `String ch_name])
169→ | [] ->
170→ Runtime.Log.warn "fabric"
171→ ["event", `String "seed_no_route";
172→ "channel", `String ch_name]
173→ ) routing_table.Runtime.Routing.seeded_channels;
174→
175→ (* === Phase 4: Start inline morphism fibres ===
176→
177→ Each inline morphism reads from its input PULL socket(s) and
178→ writes to its output PUSH socket(s). Subprocess-boundary
179→ processes are handled by the supervisor (deferred — agents
180→ need the agent binary rewrite from Step 5). *)
181→
182→ let find_push proc_name port =
183→ let ep = endpoint ~pipeline_id proc_name port in
184→ match Hashtbl.find_opt push_sockets ep with
185→ | Some s -> s
186→ | None -> failwith (Printf.sprintf "fabric: no PUSH socket for %s:%s" proc_name port)
187→ in
188→ let find_pull proc_name port =
189→ let ep = endpoint ~pipeline_id proc_name port in
190→ match Hashtbl.find_opt pull_sockets ep with
191→ | Some s -> s
192→ | None -> failwith (Printf.sprintf "fabric: no PULL socket for %s:%s" proc_name port)
193→ in
194→
195→ List.iter (fun (pe : Runtime.Routing.process_entry) ->
196→ match pe.proc_kind with
197→ | Runtime.Routing.Subprocess ->
198→ (match process_handler with
199→ | Some handler ->
200→ (* Collect sockets for this subprocess.
201→ PUSH sockets: routes where src = proc_name (process writes)
202→ PULL sockets: routes where dst = proc_name (process reads) *)
203→ let proc_push = List.filter_map (fun (r : Runtime.Routing.route) ->
204→ if r.src = pe.proc_name then
205→ let ep = endpoint ~pipeline_id r.src r.src_port in
206→ match Hashtbl.find_opt push_sockets ep with
207→ | Some s -> Some (r.src_port, s)
208→ | None -> None
209→ else None
210→ ) routing_table.Runtime.Routing.routes in
211→ let proc_pull = List.filter_map (fun (r : Runtime.Routing.route) ->
212→ if r.dst = pe.proc_name then
213→ let ep = endpoint ~pipeline_id r.dst r.dst_port in
214→ match Hashtbl.find_opt pull_sockets ep with
215→ | Some s -> Some (r.dst_port, s)
216→ | None -> None
217→ else None
218→ ) routing_table.Runtime.Routing.routes in
219→ handler ~sw ~proc_entry:pe
220→ ~push_sockets:proc_push ~pull_sockets:proc_pull
221→ | None ->
222→ Runtime.Log.info "fabric"
223→ ["event", `String "subprocess_deferred";
224→ "name", `String pe.proc_name])
225→ | Runtime.Routing.Inline ->
226→ Eio.Fiber.fork_daemon ~sw (fun () ->
227→ (try match pe.proc_impl with
228→ | Id ->
229→ let msg_type = Validate.unwrap_stream pe.proc_binding.input in
230→ Inline.forward_id ~name:pe.proc_name type_env msg_type
231→ (find_pull pe.proc_name "in")
232→ (find_push pe.proc_name "out")
233→ | Copy ->
234→ let msg_type = Validate.unwrap_stream pe.proc_binding.input in
235→ Inline.forward_copy ~name:pe.proc_name type_env msg_type
236→ (find_pull pe.proc_name "in")
237→ (find_push pe.proc_name "out0")
238→ (find_push pe.proc_name "out1")
239→ | Merge ->
240→ let inp = Validate.unwrap_stream pe.proc_binding.input in
241→ let in0_t, in1_t = match Validate.resolve type_env inp with
242→ | Product [t0; t1] -> (t0, t1)
243→ | t -> (t, t)
244→ in
245→ Inline.forward_merge ~name:pe.proc_name type_env in0_t in1_t
246→ (find_pull pe.proc_name "in0")
247→ (find_pull pe.proc_name "in1")
248→ (find_push pe.proc_name "out")
249→ | MergeAny ->
250→ let inp = Validate.unwrap_stream pe.proc_binding.input in
251→ let in0_t, in1_t = match Validate.resolve type_env inp with
252→ | Product [t0; t1] -> (t0, t1)
253→ | t -> (t, t)
254→ in
255→ Inline.forward_merge_any ~name:pe.proc_name type_env in0_t in1_t
256→ (find_pull pe.proc_name "in0")
257→ (find_pull pe.proc_name "in1")
258→ (find_push pe.proc_name "out")
259→ | Barrier ->
260→ let inp = Validate.unwrap_stream pe.proc_binding.input in
261→ let in_types = match Validate.resolve type_env inp with
262→ | Product ts -> ts
263→ | _ -> failwith "barrier: expected product input"
264→ in
265→ let n = List.length in_types in
266→ let recv_socks = List.init n (fun i ->
267→ find_pull pe.proc_name (Printf.sprintf "in%d" i)) in
268→ Inline.forward_barrier ~name:pe.proc_name type_env
269→ in_types recv_socks
270→ (find_push pe.proc_name "out")
271→ | Discard ->
272→ let msg_type = Validate.unwrap_stream pe.proc_binding.input in
273→ Inline.drain_discard ~name:pe.proc_name type_env msg_type
274→ (find_pull pe.proc_name "in")
275→ | Empty ->
276→ Inline.forward_empty ~name:pe.proc_name
277→ (find_push pe.proc_name "out")
278→ | Log label ->
279→ Inline.forward_log label
280→ (find_pull pe.proc_name "in")
281→ | FormatJson ->
282→ Inline.forward_format_json ~name:pe.proc_name
283→ (find_pull pe.proc_name "in")
284→ (find_push pe.proc_name "out")
285→ | ParseJson ->
286→ Inline.forward_parse_json ~name:pe.proc_name
287→ (find_pull pe.proc_name "in")
288→ (find_push pe.proc_name "out")
289→ | Map expr ->
290→ let in_type = Validate.unwrap_stream pe.proc_binding.input in
291→ let out_type = Validate.unwrap_stream pe.proc_binding.output in
292→ Inline.forward_map ~name:pe.proc_name type_env
293→ in_type out_type expr
294→ (find_pull pe.proc_name "in")
295→ (find_push pe.proc_name "out")
296→ | Filter expr ->
297→ let msg_type = Validate.unwrap_stream pe.proc_binding.input in
298→ Inline.forward_filter ~name:pe.proc_name type_env msg_type expr
299→ (find_pull pe.proc_name "in")
300→ (find_push pe.proc_name "out")
301→ | Project n ->
302→ Inline.forward_project ~name:pe.proc_name n
303→ (find_pull pe.proc_name "in")
304→ (find_push pe.proc_name "out")
305→ | Tagger (indices, label) ->
306→ Inline.forward_tagger ~name:pe.proc_name indices label
307→ (find_pull pe.proc_name "in")
308→ (find_push pe.proc_name "out")
309→ | Agent _ | Plumb _ ->
310→ (* Should not reach here — filtered by proc_kind *)
311→ failwith "unreachable: subprocess in inline dispatch"
312→ | Tool _ | Lower _ | Builtin _ ->
313→ failwith "unreachable: tool binding in inline dispatch"
314→ with
315→ | Inline.Morphism_fatal (label, _extras) ->
316→ Runtime.Log.error "fabric"
317→ ["event", `String "morphism_fatal";
318→ "morphism", `String label]
319→ | Transport.Transport_error (_, msg) ->
320→ Runtime.Log.debug "fabric"
321→ ["event", `String "transport_closed";
322→ "morphism", `String pe.proc_name;
323→ "detail", `String msg]);
324→ `Stop_daemon)
325→ ) routing_table.Runtime.Routing.processes;
326→
327→ (* The cancel function: force-close all sockets. This closes the
328→ underlying ZMQ sockets directly, bypassing zmq-eio's close
329→ protocol. The zmq-eio event loops will fail on their next
330→ operation and terminate, allowing the Eio switch to exit. *)
331→ let cancel () =
332→ Hashtbl.iter (fun _ s ->
333→ (try T.force_close s with _ -> ())) push_sockets;
334→ Hashtbl.iter (fun _ s ->
335→ (try T.force_close s with _ -> ())) pull_sockets
336→ in
337→ { ctx; supervisor = sup; push_sockets; pull_sockets; pipeline_id; cancel }
338→
339→(** Re-export Bridge for external access. *)
340→module Bridge = Bridge
341→
342→(** Shut down the pipeline.
343→ Closes all sockets (triggering recv timeout/failure in system
344→ threads), then destroys the ZMQ context and cleans up. *)
345→let shutdown t =
346→ let _ = Supervisor.shutdown t.supervisor in
347→ t.cancel ();
348→ (* Wait for system threads blocked in recv to notice the socket
349→ closure (up to the 1s receive timeout) *)
350→ Unix.sleepf 1.5;
351→ T.destroy_context t.ctx;
352→ cleanup_fabric_dir t.pipeline_id
353→
354→(** Lightweight cleanup after orderly shutdown. Use this when
355→ [cancel()] was already called and all fibres have exited
356→ (e.g. after the Eio switch exits). Skips the 1.5s sleep
357→ since no system threads are outstanding. *)
358→let cleanup t =
359→ let _ = Supervisor.shutdown t.supervisor in
360→ T.destroy_context t.ctx;
361→ cleanup_fabric_dir t.pipeline_id
362→ 1→(** Bridge — stdio mux/demux adapter for agent subprocesses.
2→
3→ Natural transformation between ZMQ topic-framed sockets and the
4→ agent binary's port-multiplexed stdin/stdout protocol. The agent
5→ sees the same [{"__port":...,"msg":...}] envelopes as under exec.ml.
6→
7→ All pipe writes use [Mutex] + [Eio_unix.run_in_systhread] (R2).
8→
9→ The [__port] envelope protocol is confined to this module and the
10→ agent binary — it must never appear in [fabric.ml] or [inline.ml]
11→ (R7: the bridge is NOT a morphism, it is a natural transformation
12→ between two concrete stream representations). *)
13→
14→open Language
15→
16→module T = Transport.Zmq
17→
18→(** {1 EOF Sentinel}
19→
20→ An empty-payload message on a PUSH socket signals end-of-stream.
21→ The receiver exits its recv loop on seeing this sentinel. *)
22→
23→let send_eof sock ~topic =
24→ T.send sock ~topic ""
25→
26→let is_eof payload =
27→ String.length payload = 0
28→
29→(** {1 Process Handler Type}
30→
31→ The fabric calls a process handler for each subprocess entry.
32→ The handler receives the process's ZMQ socket endpoints and starts
33→ the subprocess. The fabric does not know how. *)
34→
35→type process_handler =
36→ sw:Eio.Switch.t ->
37→ proc_entry:Runtime.Routing.process_entry ->
38→ push_sockets:(string * [`Push] T.socket) list ->
39→ pull_sockets:(string * [`Pull] T.socket) list ->
40→ unit
41→
42→(** {1 Tool Dispatch Callback}
43→
44→ The bridge accepts an optional tool dispatch callback rather than
45→ depending on [Tool_dispatch] directly. The caller constructs
46→ this from [Tool_dispatch.tool_dispatch_loop]. *)
47→
48→type tool_dispatch =
49→ req_fd:Unix.file_descr ->
50→ resp_fd:Unix.file_descr ->
51→ agent_name:string ->
52→ unit
53→
54→(** {1 Observation Callback} *)
55→
56→type observation = {
57→ agent : string;
58→ payload : Yojson.Safe.t;
59→}
60→
61→(** {1 Pipe Write Helpers}
62→
63→ All writes to child stdin go through mux_envelope / mux_eof,
64→ protected by an OCaml [Mutex]. These MUST be called from within
65→ [Eio_unix.run_in_systhread] — [Mutex.lock] is not safe from
66→ Eio fibres. *)
67→
68→(** Write a port-tagged envelope to the child stdin.
69→ Caller must hold (or be inside run_in_systhread for) the mux mutex. *)
70→let write_envelope mux_m oc port json_str =
71→ match Yojson.Safe.from_string json_str with
72→ | _ ->
73→ let s = Printf.sprintf
74→ "{\"__port\":%S,\"msg\":%s}\n" port json_str in
75→ Mutex.lock mux_m;
76→ Fun.protect ~finally:(fun () -> Mutex.unlock mux_m) (fun () ->
77→ output_string oc s; flush oc)
78→ | exception Yojson.Json_error msg ->
79→ Runtime.Log.warn "bridge_mux"
80→ [("port", `String port); ("error", `String msg)]
81→
82→(** Write an EOF marker for a port. *)
83→let write_eof mux_m oc port =
84→ let s = Printf.sprintf
85→ "{\"__port\":%S,\"__eof\":true}\n" port in
86→ Mutex.lock mux_m;
87→ Fun.protect ~finally:(fun () -> Mutex.unlock mux_m) (fun () ->
88→ output_string oc s; flush oc)
89→
90→(** {1 Bridge Construction} *)
91→
92→(** Construct a process handler that spawns agent subprocesses with
93→ stdio bridge. *)
94→let make_handler
95→ ~(prog : program)
96→ ~type_env
97→ ~(bin_dir : string option)
98→ ~(resource_find : string -> string)
99→ ?(resolve_bin : (string -> string) option)
100→ ?(tool_dispatch_fn : tool_dispatch option)
101→ ?(on_observe : (observation -> unit) option)
102→ ()
103→ : process_handler =
104→ fun ~sw ~proc_entry ~push_sockets ~pull_sockets ->
105→ let pe = proc_entry in
106→ let binding = pe.Runtime.Routing.proc_binding in
107→ let config = match binding.impl with
108→ | Agent config -> config
109→ | _ -> failwith "bridge: expected Agent impl"
110→ in
111→
112→ (* Resolve prompt paths *)
113→ let config = Spawn_lib.resolve_prompts_config
114→ ~resource_find ~base_dir:(Sys.getcwd ()) config in
115→ let binding = { binding with impl = Agent config } in
116→
117→ (* Decompose input/output types *)
118→ let decomp_in = Spawn_lib.agent_input_decomposition type_env binding.input in
119→ let decomp_out = Spawn_lib.agent_output_decomposition type_env binding.output in
120→ let has_control = decomp_in <> None in
121→ let has_telemetry = decomp_out <> None in
122→
123→ (* Synthesize child spec with simple types (no products) *)
124→ let child_binding =
125→ let b = match decomp_in with
126→ | Some (data_t, _) -> { binding with input = Stream data_t }
127→ | None -> binding
128→ in
129→ match decomp_out with
130→ | Some (result_t, _) -> { b with output = Stream result_t }
131→ | None -> b
132→ in
133→
134→ (* Find tool bindings *)
135→ let tool_names = match List.assoc_opt "tools" config with
136→ | Some (Arr items) ->
137→ List.filter_map (fun item ->
138→ match item with Ident name -> Some name | _ -> None
139→ ) items
140→ | _ -> []
141→ in
142→ let tool_bindings = List.filter_map (fun name ->
143→ List.find_opt (fun (b : binding) ->
144→ b.bd_name = name && Validate.is_tool_binding b
145→ ) prog.prog_bindings
146→ ) tool_names in
147→ let has_tools = tool_bindings <> [] || tool_dispatch_fn <> None in
148→
149→ (* Synthesize spec *)
150→ let spec_source =
151→ if tool_bindings <> [] then
152→ Spawn_lib.synthesize_agent_spec prog child_binding tool_bindings
153→ else
154→ Spawn_lib.synthesize_spec prog child_binding
155→ in
156→ let spec_path = Spawn_lib.write_temp_spec spec_source in
157→
158→ (* Resolve binary *)
159→ let resolve = match resolve_bin with
160→ | Some f -> f
161→ | None -> Spawn_lib.resolve_binary ~bin_dir
162→ in
163→ let bin_path = resolve "plumb-agent" in
164→
165→ (* Create child stdin/stdout pipes *)
166→ let child_in_r, child_in_w = Unix.pipe ~cloexec:true () in
167→ let child_out_r, child_out_w = Unix.pipe ~cloexec:true () in
168→ let child_in_oc = Unix.out_channel_of_descr child_in_w in
169→
170→ (* MCP connections and env *)
171→ let mcp_connections = match List.assoc_opt "mcp" config with
172→ | Some (Arr items) ->
173→ List.filter_map (fun item ->
174→ match item with
175→ | Obj kvs ->
176→ (match Spawn_lib.connect_mcp_server kvs with
177→ | Ok conn -> Some conn
178→ | Error msg ->
179→ Runtime.Log.warn "mcp_connect_failed"
180→ [("detail", `String msg)];
181→ None)
182→ | _ -> None
183→ ) items
184→ | _ -> []
185→ in
186→ let mcp_env = if mcp_connections <> [] then begin
187→ let mcp_schemas = List.concat_map (fun conn ->
188→ List.map Mcp_client.tool_schema (Mcp_client.tools conn)
189→ ) mcp_connections in
190→ let mcp_path = Filename.temp_file "agentpi_mcp_" ".json" in
191→ let mcp_oc = open_out_gen
192→ [Open_wronly; Open_creat; Open_trunc] 0o600 mcp_path in
193→ Fun.protect ~finally:(fun () -> close_out mcp_oc) (fun () ->
194→ output_string mcp_oc
195→ (Yojson.Safe.to_string (`List mcp_schemas)));
196→ [("PLUMB_MCP_TOOLS", mcp_path)]
197→ end else [] in
198→
199→ (* Agent children need LLM credentials, docroot, and name *)
200→ let env = Spawn_lib.credential_env () @ Spawn_lib.provider_env ()
201→ @ [("PLUMB_AGENT_NAME", binding.bd_name)]
202→ @ mcp_env in
203→
204→ (* Spawn child — closes child_in_r and child_out_w in parent *)
205→ let pid = Spawn_lib.spawn_subprocess ~bin_path ~spec_path
206→ ~stdin_fd:child_in_r ~stdout_fd:child_out_w
207→ ~env () in
208→ Runtime.Log.info "bridge"
209→ ["event", `String "spawned";
210→ "agent", `String binding.bd_name;
211→ "pid", `Int pid;
212→ "has_control", `Bool has_control;
213→ "has_telemetry", `Bool has_telemetry;
214→ "has_tools", `Bool has_tools];
215→
216→ (* Mux: OCaml Mutex for system-thread safety (R2) *)
217→ let mux_m = Mutex.create () in
218→
219→ (* Track active mux sources; close child stdin when last exits *)
220→ let n_sources = 2 + (if has_tools && tool_dispatch_fn <> None then 1 else 0) in
221→ let mux_remaining = ref n_sources in
222→ let mux_rem_m = Mutex.create () in
223→ let mux_source_done () =
224→ Mutex.lock mux_rem_m;
225→ decr mux_remaining;
226→ let n = !mux_remaining in
227→ Mutex.unlock mux_rem_m;
228→ if n = 0 then close_out_noerr child_in_oc
229→ in
230→
231→ (* Find sockets by port name *)
232→ let find_pull port = List.assoc_opt port pull_sockets in
233→ let find_push port = List.assoc_opt port push_sockets in
234→
235→ (* === Tool dispatch setup === *)
236→ let tool_req_oc = ref None in
237→ let tool_resp_r = ref None in
238→ (match tool_dispatch_fn with
239→ | Some dispatch when has_tools ->
240→ let req_r, req_w = Unix.pipe ~cloexec:true () in
241→ let resp_r', resp_w = Unix.pipe ~cloexec:true () in
242→ tool_req_oc := Some (Unix.out_channel_of_descr req_w);
243→ tool_resp_r := Some resp_r';
244→ let _dispatch_thread = Thread.create (fun () ->
245→ dispatch ~req_fd:req_r ~resp_fd:resp_w
246→ ~agent_name:binding.bd_name
247→ ) () in
248→ ()
249→ | _ -> ());
250→
251→ (* === Mux: data input (ZMQ PULL → child stdin) ===
252→
253→ T.recv runs in Eio fibre context (uses Eio_unix.await_readable).
254→ Pipe writes use Eio_unix.run_in_systhread + Mutex. *)
255→ (match find_pull "data_in" with
256→ | Some data_in_sock ->
257→ Eio.Fiber.fork_daemon ~sw (fun () ->
258→ (try
259→ while true do
260→ let (_topic, payload) = T.recv data_in_sock in
261→ if is_eof payload then begin
262→ Eio_unix.run_in_systhread (fun () ->
263→ (try write_eof mux_m child_in_oc "input"
264→ with Sys_error _ -> ()));
265→ raise Exit
266→ end;
267→ Eio_unix.run_in_systhread (fun () ->
268→ try write_envelope mux_m child_in_oc "input" payload
269→ with Sys_error _ -> raise Exit)
270→ done
271→ with
272→ | Exit -> ()
273→ | Transport.Transport_error (_, _msg) -> ());
274→ (* After data EOF, send ctrl stop (R4) *)
275→ if has_control then
276→ Eio_unix.run_in_systhread (fun () ->
277→ try
278→ let s = "{\"__port\":\"ctrl_in\",\"msg\":{\"stop\":true}}\n" in
279→ Mutex.lock mux_m;
280→ Fun.protect ~finally:(fun () -> Mutex.unlock mux_m) (fun () ->
281→ output_string child_in_oc s; flush child_in_oc)
282→ with Sys_error _ -> ());
283→ Eio_unix.run_in_systhread (fun () -> mux_source_done ());
284→ `Stop_daemon)
285→ | None ->
286→ (* No data input socket — mux source done immediately *)
287→ mux_source_done ());
288→
289→ (* === Mux: control input (ZMQ PULL → child stdin) === *)
290→ (match find_pull "ctrl_in" with
291→ | Some ctrl_in_sock ->
292→ Eio.Fiber.fork_daemon ~sw (fun () ->
293→ (try
294→ while true do
295→ let (_topic, payload) = T.recv ctrl_in_sock in
296→ if is_eof payload then begin
297→ Eio_unix.run_in_systhread (fun () ->
298→ (try write_eof mux_m child_in_oc "ctrl_in"
299→ with Sys_error _ -> ()));
300→ raise Exit
301→ end;
302→ Eio_unix.run_in_systhread (fun () ->
303→ try write_envelope mux_m child_in_oc "ctrl_in" payload
304→ with Sys_error _ -> raise Exit)
305→ done
306→ with
307→ | Exit -> ()
308→ | Transport.Transport_error (_, _msg) -> ());
309→ Eio_unix.run_in_systhread (fun () -> mux_source_done ());
310→ `Stop_daemon)
311→ | None ->
312→ (* No control socket — mux source done immediately *)
313→ mux_source_done ());
314→
315→ (* === Mux: tool responses (pipe → child stdin) === *)
316→ (match !tool_resp_r with
317→ | Some resp_r' ->
318→ Eio.Fiber.fork_daemon ~sw (fun () ->
319→ Eio_unix.run_in_systhread (fun () ->
320→ let ic = Unix.in_channel_of_descr resp_r' in
321→ (try while true do
322→ let line = input_line ic in
323→ write_envelope mux_m child_in_oc "tool_resp" line
324→ done with End_of_file | Sys_error _ -> ());
325→ close_in_noerr ic;
326→ mux_source_done ());
327→ `Stop_daemon)
328→ | None -> ());
329→
330→ (* === Demux: child stdout → ZMQ PUSH sockets ===
331→
332→ Pipe reads use Eio_unix.run_in_systhread (blocking input_line).
333→ ZMQ sends run in Eio fibre context. *)
334→ let data_out_sock = find_push "data_out" in
335→ let ctrl_out_sock = find_push "ctrl_out" in
336→ let telemetry_sock = find_push "telemetry" in
337→
338→ Eio.Fiber.fork_daemon ~sw (fun () ->
339→ let ic = Unix.in_channel_of_descr child_out_r in
340→ (try while true do
341→ let line = Eio_unix.run_in_systhread (fun () ->
342→ input_line ic) in
343→ if String.length (String.trim line) > 0 then
344→ match Yojson.Safe.from_string line with
345→ | `Assoc fields ->
346→ let port = match List.assoc_opt "__port" fields with
347→ | Some (`String s) -> s | _ -> "output" in
348→ let msg = match List.assoc_opt "msg" fields with
349→ | Some m -> Yojson.Safe.to_string m
350→ | None -> line in
351→ (match port with
352→ | "output" ->
353→ Option.iter (fun sock ->
354→ T.send sock ~topic:binding.bd_name msg
355→ ) data_out_sock
356→ | "ctrl_out" ->
357→ Option.iter (fun sock ->
358→ T.send sock ~topic:binding.bd_name msg
359→ ) ctrl_out_sock
360→ | "telemetry" ->
361→ Option.iter (fun sock ->
362→ T.send sock ~topic:binding.bd_name msg
363→ ) telemetry_sock;
364→ (match on_observe with
365→ | Some cb ->
366→ (match Yojson.Safe.from_string msg with
367→ | json -> cb { agent = binding.bd_name; payload = json }
368→ | exception Yojson.Json_error _ -> ())
369→ | None -> ())
370→ | "tool_req" ->
371→ Eio_unix.run_in_systhread (fun () ->
372→ match !tool_req_oc with
373→ | Some oc ->
374→ output_string oc msg;
375→ output_char oc '\n';
376→ flush oc
377→ | None ->
378→ Runtime.Log.warn "bridge_demux"
379→ [("event", `String "tool_req_no_dispatch");
380→ ("agent", `String binding.bd_name)])
381→ | _ ->
382→ Runtime.Log.warn "bridge_demux"
383→ [("port", `String port);
384→ ("agent", `String binding.bd_name)])
385→ | _ ->
386→ (* Not an envelope — treat as raw output *)
387→ Option.iter (fun sock ->
388→ T.send sock ~topic:binding.bd_name line
389→ ) data_out_sock
390→ | exception exn ->
391→ Runtime.Log.debug "bridge_demux"
392→ ["agent", `String binding.bd_name;
393→ "event", `String "json_parse_failed";
394→ "error", `String (Printexc.to_string exn)];
395→ Option.iter (fun sock ->
396→ T.send sock ~topic:binding.bd_name line
397→ ) data_out_sock
398→ done with
399→ | End_of_file -> ()
400→ | Sys_error msg ->
401→ Runtime.Log.debug "bridge_demux"
402→ ["agent", `String binding.bd_name;
403→ "event", `String "pipe_closed";
404→ "detail", `String msg]);
405→ close_in_noerr ic;
406→ (* Send EOF sentinel on all output ZMQ sockets (R5) *)
407→ Option.iter (fun sock ->
408→ (try send_eof sock ~topic:binding.bd_name with _ -> ())
409→ ) data_out_sock;
410→ Option.iter (fun sock ->
411→ (try send_eof sock ~topic:binding.bd_name with _ -> ())
412→ ) ctrl_out_sock;
413→ Option.iter (fun sock ->
414→ (try send_eof sock ~topic:binding.bd_name with _ -> ())
415→ ) telemetry_sock;
416→ (* Close tool_req write-end to terminate dispatch loop *)
417→ Eio_unix.run_in_systhread (fun () ->
418→ (match !tool_req_oc with
419→ | Some oc -> close_out_noerr oc
420→ | None -> ());
421→ (* Reap child process (R6) *)
422→ let timeout_s = match Spawn_lib.timeout_of_binding binding with
423→ | Some t -> t | None -> 60.0 in
424→ let status, timed_out =
425→ Spawn_lib.waitpid_timed ~timeout_seconds:timeout_s pid in
426→ if timed_out then
427→ Runtime.Log.warn "bridge_timeout"
428→ ["agent", `String binding.bd_name;
429→ "timeout_seconds", `Float timeout_s];
430→ (match status with
431→ | Unix.WEXITED 0 -> ()
432→ | Unix.WEXITED code ->
433→ Runtime.Log.warn "bridge_exit"
434→ ["agent", `String binding.bd_name;
435→ "code", `Int code]
436→ | Unix.WSIGNALED sig_ ->
437→ Runtime.Log.warn "bridge_signal"
438→ ["agent", `String binding.bd_name;
439→ "signal", `Int sig_]
440→ | Unix.WSTOPPED _ -> ());
441→ (* Clean up temp files *)
442→ (try Sys.remove spec_path with Sys_error _ -> ()));
443→ `Stop_daemon);
444→
445→ ignore has_telemetry
446→ 1→(** Process spawning, spec synthesis, and subprocess infrastructure.
2→
3→ Extracted from [lib/plumb/spawn.ml] to break the dependency between
4→ the fabric and the plumb library. Dune-site paths (bin_dir,
5→ resource_find) are injected by callers — this library stays
6→ dune-site-free. *)
7→
8→open Language
9→
10→(** {1 Binary Resolution} *)
11→
12→let binary_of_impl = function
13→ | Agent _ -> "plumb-agent"
14→ | Plumb _ -> "plumb"
15→ | Tool _ | Lower _ | Builtin _ -> failwith "tools are not spawned as pipeline nodes"
16→ | Id | Copy | Merge | MergeAny | Discard | Empty | Barrier | Project _
17→ | Map _ | Filter _ | Tagger _ | Log _ | FormatJson | ParseJson ->
18→ failwith "structural morphisms do not need binaries"
19→
20→(** Resolve a sibling process binary. Tries two strategies:
21→ 1. Sibling in the same directory (works after [dune install])
22→ 2. Sibling in the dune _build tree (swaps the binary's subdirectory)
23→ Falls back to bare name for PATH lookup.
24→
25→ [~bin_dir] is the directory containing the current binary. Pass
26→ [Prefix_path.bin_dir] from the plumb library, or a test fixture. *)
27→let resolve_binary ~bin_dir name =
28→ let dir = match bin_dir with
29→ | Some d -> d
30→ | None -> Filename.dirname Sys.executable_name
31→ in
32→ let installed = Filename.concat dir name in
33→ if Sys.file_exists installed then installed
34→ else
35→ let parent = Filename.dirname dir in
36→ let base = Filename.basename Sys.executable_name in
37→ let build = Filename.concat (Filename.concat parent name) base in
38→ if Sys.file_exists build then build
39→ else
40→ (* In the dune build tree, directories use the short name (e.g. "agent")
41→ while installed binaries use the namespaced name ("plumb-agent").
42→ Strip the "plumb-" prefix for the build-tree lookup. *)
43→ let short =
44→ let prefix = "plumb-" in
45→ let plen = String.length prefix in
46→ if String.length name > plen && String.sub name 0 plen = prefix
47→ then String.sub name plen (String.length name - plen)
48→ else name
49→ in
50→ let build2 = Filename.concat (Filename.concat parent short) base in
51→ if Sys.file_exists build2 then build2
52→ else name
53→
54→(** Resolve a single prompt path entry.
55→
56→ - Contains [/] → filesystem path: absolute paths used as-is,
57→ relative paths resolved from [base_dir] (the pipeline file's directory).
58→ - No [/] → bare name: searched via [resource_find].
59→
60→ [~resource_find] is a function that resolves bare names to absolute
61→ paths. Pass [Resource_path.find] from the plumb library. *)
62→let resolve_prompt_path ~resource_find ~base_dir name =
63→ if String.contains name '/' then
64→ if not (Filename.is_relative name) then name
65→ else Filename.concat base_dir name
66→ else
67→ resource_find name
68→
69→(** Transform an agent config's [prompts] entries to absolute paths.
70→ Other config items pass through unchanged. *)
71→let resolve_prompts_config ~resource_find ~base_dir config =
72→ List.map (fun (key, value) ->
73→ match key, value with
74→ | "prompts", Arr items ->
75→ let resolved = List.map (fun item ->
76→ match item with
77→ | Str path ->
78→ Str (resolve_prompt_path ~resource_find ~base_dir path)
79→ | other -> other
80→ ) items in
81→ (key, Arr resolved)
82→ | _ -> (key, value)
83→ ) config
84→
85→(** {1 Environment Scrubbing}
86→
87→ Subprocess environment whitelist. Only these variables are inherited
88→ from the parent process. Everything else — including API keys — is
89→ stripped. Callers that need credentials (e.g. agent spawns) must pass
90→ them explicitly via the [~env] parameter.
91→
92→ This is a compile-time upper limit. Adjust as needed, but keep the
93→ list minimal: every entry is a potential information leak. *)
94→let safe_env_keys = [
95→ "PATH"; "HOME"; "LANG"; "LC_ALL"; "LC_CTYPE"; "LC_MESSAGES";
96→ "TERM"; "TMPDIR"; "USER"; "SHELL";
97→ "PLUMB_PATH"; "PLUMB_RESOURCES";
98→]
99→
100→(** Build a scrubbed environment array from the whitelist plus
101→ caller-supplied overrides. *)
102→let build_env env =
103→ let base = List.filter_map (fun k ->
104→ match Sys.getenv_opt k with
105→ | Some v -> Some (k ^ "=" ^ v)
106→ | None -> None
107→ ) safe_env_keys in
108→ let extra = List.map (fun (k, v) -> k ^ "=" ^ v) env in
109→ Array.of_list (base @ extra)
110→
111→(** Collect LLM provider API key env vars for agent subprocess spawns.
112→ Returns a list of [(key, value)] pairs for all set provider keys. *)
113→let credential_env () =
114→ List.filter_map (fun k ->
115→ match Sys.getenv_opt k with
116→ | Some v -> Some (k, v)
117→ | None -> None
118→ ) ["ANTHROPIC_API_KEY"; "OPENAI_API_KEY"]
119→
120→(** Collect provider/model override env vars for agent subprocess spawns.
121→ These allow CLI flags (--provider, --model) to propagate through pipelines. *)
122→let provider_env () =
123→ List.filter_map (fun k ->
124→ match Sys.getenv_opt k with
125→ | Some v -> Some (k, v)
126→ | None -> None
127→ ) ["PLUMB_PROVIDER"; "PLUMB_MODEL"]
128→
129→(** {1 Subprocess Spawning}
130→
131→ Unified subprocess spawn: fork, wire stdin/stdout/stderr, exec.
132→ Used by both agent and plumb process spawns.
133→
134→ Environment is scrubbed: only [safe_env_keys] plus caller-supplied
135→ [~env] pairs are visible to the child. API keys must be passed
136→ explicitly by callers that need them. *)
137→let spawn_subprocess ~bin_path ~spec_path
138→ ~stdin_fd ~stdout_fd ?stderr_fd
139→ ?(env=[]) ?cwd () =
140→ let env_array = build_env env in
141→ let pid = Unix.fork () in
142→ if pid = 0 then begin
143→ (* Child process: wire fds and exec. Use _exit (not exit) on all
144→ failure paths to avoid running at_exit handlers in the child. *)
145→ (match cwd with
146→ | Some d ->
147→ (try Unix.chdir d
148→ with Unix.Unix_error (err, _, _) ->
149→ Runtime.Log.error "spawn_subprocess"
150→ ["event", `String "chdir_failed";
151→ "path", `String d;
152→ "detail", `String (Unix.error_message err)];
153→ Unix._exit 2)
154→ | None -> ());
155→ Unix.dup2 stdin_fd Unix.stdin;
156→ Unix.close stdin_fd;
157→ Unix.dup2 stdout_fd Unix.stdout;
158→ Unix.close stdout_fd;
159→ (match stderr_fd with
160→ | Some fd -> Unix.dup2 fd Unix.stderr; Unix.close fd
161→ | None -> ());
162→ (try Unix.execve bin_path [| bin_path; spec_path |] env_array
163→ with exn ->
164→ Runtime.Log.error "spawn_subprocess"
165→ ["event", `String "execve_failed";
166→ "binary", `String bin_path;
167→ "detail", `String (Printexc.to_string exn)];
168→ Unix._exit 2)
169→ end else begin
170→ Unix.close stdin_fd;
171→ Unix.close stdout_fd;
172→ (match stderr_fd with Some fd -> Unix.close fd | None -> ());
173→ pid
174→ end
175→
176→(** {1 Type Dependency Collection} *)
177→
178→module StringSet = Set.Make(String)
179→
180→(** Collect all Named type references from a type expression. *)
181→let rec collect_named acc = function
182→ | Named name -> StringSet.add name acc
183→ | Record fields -> List.fold_left (fun a f -> collect_named a f.typ) acc fields
184→ | List t -> collect_named acc t
185→ | Stream t -> collect_named acc t
186→ | Sum variants -> List.fold_left collect_named acc variants
187→ | Product components -> List.fold_left collect_named acc components
188→ | Primitive _ | Unit -> acc
189→
190→(** Find the type declarations a binding depends on, transitively.
191→ Walks the binding's input/output types, then chases Named references
192→ through type declarations until reaching a fixed point. *)
193→let required_types (prog : program) (binding : binding) =
194→ let all_type_decls = prog.prog_types in
195→ let rec expand needed =
196→ let next = List.fold_left (fun acc td ->
197→ if StringSet.mem td.td_name acc
198→ then collect_named acc td.td_type
199→ else acc
200→ ) needed all_type_decls in
201→ if StringSet.equal next needed then needed
202→ else expand next
203→ in
204→ let seeds =
205→ collect_named (collect_named StringSet.empty binding.input) binding.output
206→ in
207→ let needed = expand seeds in
208→ List.filter (fun td -> StringSet.mem td.td_name needed) all_type_decls
209→
210→(** {1 Spec Synthesis} *)
211→
212→(** Synthesize a minimal .plumb spec for a child process.
213→ Contains only the binding and its transitive type dependencies. *)
214→let synthesize_spec (prog : program) (binding : binding) =
215→ let types = required_types prog binding in
216→ let mini = { prog_types = types; prog_protocols = [];
217→ prog_bindings = [binding] } in
218→ pp_program mini
219→
220→(** Collect required types from multiple bindings transitively. *)
221→let required_types_multi (prog : program) (bindings : binding list) =
222→ let all_type_decls = prog.prog_types in
223→ let rec expand needed =
224→ let next = List.fold_left (fun acc td ->
225→ if StringSet.mem td.td_name acc
226→ then collect_named acc td.td_type
227→ else acc
228→ ) needed all_type_decls in
229→ if StringSet.equal next needed then needed
230→ else expand next
231→ in
232→ let seeds = List.fold_left (fun acc b ->
233→ collect_named (collect_named acc b.input) b.output
234→ ) StringSet.empty bindings in
235→ let needed = expand seeds in
236→ List.filter (fun td -> StringSet.mem td.td_name needed) all_type_decls
237→
238→(** Synthesize a spec for an agent that has tool bindings.
239→ Includes the agent binding + tool bindings + shared type dependencies.
240→ Does NOT include backing agent bindings — the runtime dispatches those. *)
241→let synthesize_agent_spec (prog : program) (agent_binding : binding)
242→ (tool_bindings : binding list) =
243→ let all_bindings = agent_binding :: tool_bindings in
244→ let types = required_types_multi prog all_bindings in
245→ let mini = { prog_types = types; prog_protocols = [];
246→ prog_bindings = all_bindings } in
247→ pp_program mini
248→
249→(** Write a synthesized spec to a temp file. *)
250→let write_temp_spec source =
251→ let path = Filename.temp_file "agentpi_" ".plumb" in
252→ let oc = open_out_gen [Open_wronly; Open_creat; Open_trunc] 0o600 path in
253→ Fun.protect ~finally:(fun () -> close_out oc) (fun () ->
254→ output_string oc source);
255→ path
256→
257→(** {1 Agent Output Decomposition} *)
258→
259→(** If agent output is a product of streams [(!B, !T)], return [Some (B, T)].
260→ Non-product or non-stream-component output returns [None].
261→ Product with other than 2 stream components raises a config error —
262→ agent product output is always (result, telemetry).
263→
264→ Note: [!(A, B)] (stream of pairs) is NOT decomposed — that's a
265→ synchronized stream, not separate channels. Only [(!A, !B)] (pair
266→ of streams) maps to separate fds. *)
267→let agent_output_decomposition type_env output =
268→ match Validate.resolve type_env output with
269→ | Product [Stream result_t; Stream think_t] -> Some (result_t, think_t)
270→ | Product _ ->
271→ Runtime.Error.raise_ Config "config_error"
272→ "agent: product output must be a pair of streams, e.g. (!A, !B)"
273→ | _ -> None
274→
275→(** {1 Agent Input Decomposition} *)
276→
277→(** If agent input is a product of streams [(!A, !C)], return [Some (A, C)].
278→ Non-product or non-stream-component input returns [None].
279→ Product with other than 2 stream components raises a config error —
280→ agent product input is always (data, control).
281→
282→ Symmetric to [agent_output_decomposition]. *)
283→let agent_input_decomposition type_env input =
284→ match Validate.resolve type_env input with
285→ | Product [Stream data_t; Stream control_t] -> Some (data_t, control_t)
286→ | Product _ ->
287→ Runtime.Error.raise_ Config "config_error"
288→ "agent: product input must be a pair of streams, e.g. (!A, !C)"
289→ | _ -> None
290→
291→(** {1 Port Name Resolution} *)
292→
293→(** Port names for structural morphisms by keyword.
294→ Returns [None] if the keyword is not a structural morphism.
295→ [~n_args] is the spawn argument count, used by barrier to
296→ determine arity (N inputs = n_args - 1). *)
297→let port_names_of_structural ~n_args = function
298→ | "id" -> Some [| "input"; "output" |]
299→ | "copy" -> Some [| "input"; "out0"; "out1" |]
300→ | "merge" -> Some [| "in0"; "in1"; "output" |]
301→ | "discard" | "_format_json" | "_parse_json" -> Some [| "input"; "output" |]
302→ | "barrier" ->
303→ let n = n_args - 1 in
304→ Some (Array.init n_args (fun i ->
305→ if i < n then Printf.sprintf "in%d" i else "output"))
306→ | _ -> None
307→
308→(** Port names for a binding, determined by impl kind and input/output types.
309→ Structural morphisms use fixed ports; agents use up to 4 ports
310→ (input, control, output, telemetry); plumbs use declared ports. *)
311→let port_names_of_binding type_env (binding : binding) =
312→ match binding.impl with
313→ | Id -> [| "input"; "output" |]
314→ | Copy -> [| "input"; "out0"; "out1" |]
315→ | Merge | MergeAny -> [| "in0"; "in1"; "output" |]
316→ | Discard -> [| "input"; "output" |]
317→ | Empty -> [| "output" |]
318→ | Barrier ->
319→ let inp = match Validate.resolve type_env binding.input with
320→ | Stream t -> Validate.resolve type_env t
321→ | t -> t
322→ in
323→ let n = match inp with
324→ | Product ts -> List.length ts
325→ | _ -> 2 (* fallback for non-product; error caught later *)
326→ in
327→ Array.init (n + 1) (fun i ->
328→ if i < n then Printf.sprintf "in%d" i else "output")
329→ | Agent _ ->
330→ let has_control = agent_input_decomposition type_env binding.input <> None in
331→ let has_telemetry = agent_output_decomposition type_env binding.output <> None in
332→ (match has_control, has_telemetry with
333→ | false, false -> [| "input"; "output" |]
334→ | false, true -> [| "input"; "output"; "telemetry" |]
335→ | true, false -> [| "input"; "ctrl_in"; "output"; "ctrl_out" |]
336→ | true, true -> [| "input"; "ctrl_in"; "output"; "ctrl_out"; "telemetry" |])
337→ | Map _ | Filter _ | Project _ | Tagger _ | FormatJson | ParseJson -> [| "input"; "output" |]
338→ | Log _ -> [| "input"; "output" |]
339→ | Tool _ | Lower _ | Builtin _ -> failwith "tool bindings cannot be spawned as pipeline nodes"
340→ | Plumb (ports, _) -> Array.of_list ports
341→
342→(** Resolve spawn args to a positional channel list.
343→ Positional args pass through; named args are reordered to match
344→ the port_names array. Mixed positional/named is rejected. *)
345→let resolve_channels (sp : spawn_stmt) port_names =
346→ match sp.sp_args with
347→ | [] ->
348→ Runtime.Error.raise_ Config "config_error"
349→ (Printf.sprintf "spawn %s: no arguments" sp.sp_process)
350→ | (Positional _) :: _ ->
351→ List.map (function
352→ | Positional ch -> ch
353→ | Labeled _ ->
354→ Runtime.Error.raise_ Config "config_error"
355→ (Printf.sprintf "spawn %s: cannot mix positional and named arguments"
356→ sp.sp_process)
357→ ) sp.sp_args
358→ | (Labeled _) :: _ ->
359→ let named = List.map (function
360→ | Labeled (p, ch) -> (p, ch)
361→ | Positional _ ->
362→ Runtime.Error.raise_ Config "config_error"
363→ (Printf.sprintf "spawn %s: cannot mix positional and named arguments"
364→ sp.sp_process)
365→ ) sp.sp_args in
366→ (* Check for unknown port names *)
367→ List.iter (fun (p, _) ->
368→ if not (Array.exists (fun n -> n = p) port_names) then
369→ Runtime.Error.raise_ Config "config_error"
370→ (Printf.sprintf "spawn %s: unknown port '%s'" sp.sp_process p)
371→ ) named;
372→ (* Check for duplicate port names *)
373→ let seen = Hashtbl.create (Array.length port_names) in
374→ List.iter (fun (p, _) ->
375→ if Hashtbl.mem seen p then
376→ Runtime.Error.raise_ Config "config_error"
377→ (Printf.sprintf "spawn %s: duplicate port '%s'" sp.sp_process p);
378→ Hashtbl.add seen p ()
379→ ) named;
380→ (* Build channel list in positional order.
381→ All ports are required — no exemptions. *)
382→ Array.to_seq port_names |> Seq.filter_map (fun port_name ->
383→ match List.assoc_opt port_name named with
384→ | Some ch -> Some ch
385→ | None ->
386→ Runtime.Error.raise_ Config "config_error"
387→ (Printf.sprintf "spawn %s: missing required port '%s'"
388→ sp.sp_process port_name)
389→ ) |> List.of_seq
390→
391→(** Maximum output per stream for the [exec] builtin, in bytes.
392→ Compile-time upper limit — adjust as needed. 128 KB is enough for
393→ useful command output without overrunning LLM context windows. *)
394→let max_exec_output_bytes = 131_072
395→
396→(** Exec timeout in seconds. *)
397→let exec_timeout_seconds = 30.0
398→
399→(** Read from a file descriptor into a buffer, up to [max_bytes].
400→ Returns [(contents, truncated)]. Used by the [exec] builtin to
401→ capture stdout/stderr with a size cap. *)
402→let read_capped fd max_bytes =
403→ let buf = Buffer.create 4096 in
404→ let chunk = Bytes.create 4096 in
405→ let truncated = ref false in
406→ (try
407→ let continue = ref true in
408→ while !continue do
409→ let n = Unix.read fd chunk 0 4096 in
410→ if n = 0 then continue := false
411→ else begin
412→ let remaining = max_bytes - Buffer.length buf in
413→ if remaining <= 0 then begin
414→ truncated := true;
415→ continue := false
416→ end else begin
417→ let take = min n remaining in
418→ Buffer.add_subbytes buf chunk 0 take;
419→ if take < n then begin
420→ truncated := true;
421→ continue := false
422→ end
423→ end
424→ end
425→ done
426→ with Unix.Unix_error (err, _, _) ->
427→ (* Unexpected: EBADF or similar — child pipe should EOF cleanly. *)
428→ Runtime.Log.debug "read_capped"
429→ ["event", `String "read_error";
430→ "detail", `String (Unix.error_message err)]);
431→ (Buffer.contents buf, !truncated)
432→
433→(** {1 Timeout Helpers} *)
434→
435→(** Extract [@timeout N] annotation value from a binding's attributes.
436→ Returns [Some seconds] if the annotation exists, [None] otherwise. *)
437→let timeout_of_binding (b : binding) : float option =
438→ List.find_map (fun (key, value) ->
439→ if key = "timeout" then
440→ match value with
441→ | Num n -> Some n
442→ | _ -> None
443→ else None
444→ ) b.bd_attrs
445→
446→(** Timed wait for a child process. Polls with [Unix.waitpid WNOHANG]
447→ until the child exits or the deadline is reached.
448→
449→ On timeout: sends SIGTERM, waits [grace_seconds] (default 5s) with
450→ polling, then sends SIGKILL and reaps.
451→
452→ [timeout_seconds <= 0.0] means no timeout — blocks indefinitely.
453→ Returns [(status, timed_out)]. *)
454→let waitpid_timed ?(grace_seconds = 5.0) ~timeout_seconds pid =
455→ if timeout_seconds <= 0.0 then
456→ let _, status = Unix.waitpid [] pid in
457→ (status, false)
458→ else
459→ let deadline = Unix.gettimeofday () +. timeout_seconds in
460→ let rec poll () =
461→ match Unix.waitpid [Unix.WNOHANG] pid with
462→ | 0, _ ->
463→ if Unix.gettimeofday () >= deadline then begin
464→ (* Timeout: SIGTERM first *)
465→ (try Unix.kill pid Sys.sigterm with Unix.Unix_error _ -> ());
466→ let grace_deadline = Unix.gettimeofday () +. grace_seconds in
467→ let rec grace_poll () =
468→ match Unix.waitpid [Unix.WNOHANG] pid with
469→ | 0, _ ->
470→ if Unix.gettimeofday () >= grace_deadline then begin
471→ (* Grace period expired: SIGKILL *)
472→ (try Unix.kill pid Sys.sigkill with Unix.Unix_error _ -> ());
473→ let _, status = Unix.waitpid [] pid in
474→ (status, true)
475→ end else begin
476→ ignore (Unix.select [] [] [] 0.1);
477→ grace_poll ()
478→ end
479→ | _, status -> (status, true)
480→ in
481→ grace_poll ()
482→ end else begin
483→ ignore (Unix.select [] [] [] 0.1);
484→ poll ()
485→ end
486→ | _, status -> (status, false)
487→ in
488→ poll ()
489→
490→(** {1 MCP Connection} *)
491→
492→(** Parse MCP server config and connect. Returns [Ok conn] on success,
493→ [Error msg] on config or connection failure. *)
494→let connect_mcp_server (kvs : (string * config_value) list) =
495→ let get_str key =
496→ List.assoc_opt key kvs |> Option.map (function
497→ | Str s -> s | _ -> failwith (key ^ " must be a string"))
498→ in
499→ let get_str_list key =
500→ match List.assoc_opt key kvs with
501→ | Some (Arr items) ->
502→ Some (List.filter_map (function Str s -> Some s | _ -> None) items)
503→ | _ -> None
504→ in
505→ try
506→ let has_url = get_str "url" <> None in
507→ let has_command = get_str "command" <> None in
508→ if has_url && has_command then
509→ Error "mcp entry has both 'url' and 'command'"
510→ else if has_url then begin
511→ let url = Option.get (get_str "url") in
512→ let token = match get_str "token" with
513→ | Some t -> Some t
514→ | None ->
515→ match get_str "token_env" with
516→ | Some env_var ->
517→ (try Some (Sys.getenv env_var)
518→ with Not_found ->
519→ failwith (Printf.sprintf "env var '%s' not set" env_var))
520→ | None -> None
521→ in
522→ let tool_filter = get_str_list "tools" in
523→ let prefix = get_str "prefix" in
524→ Ok (Mcp_client.connect_http ~url ?token ?tool_filter ?prefix ())
525→ end else
526→ match get_str "command" with
527→ | None -> Error "mcp config requires 'url' or 'command'"
528→ | Some command ->
529→ let args = get_str_list "args" |> Option.value ~default:[] in
530→ let tool_filter = get_str_list "tools" in
531→ let env = match List.assoc_opt "env" kvs with
532→ | Some (Obj pairs) ->
533→ List.filter_map (fun (k, v) ->
534→ match v with Str s -> Some (k, s) | _ -> None
535→ ) pairs
536→ | _ -> []
537→ in
538→ let prefix = get_str "prefix" in
539→ Ok (Mcp_client.connect_stdio ~command ~args ~env ?tool_filter ?prefix ())
540→ with Failure msg -> Error msg
541→ 1→(** Agent process — LLM conversation with validated I/O.
2→
3→ Reads JSON Lines from stdin, runs one conversation loop per line
4→ with the configured LLM, validates the output, and writes
5→ JSON Lines to stdout. *)
6→
7→let get_config_str config key =
8→ List.assoc_opt key config |> Option.map (function
9→ | Language.Str s -> s
10→ | _ -> Runtime.Error.raise_ Config "config_error"
11→ (Printf.sprintf "%s must be a string" key))
12→
13→let get_config_num config key =
14→ List.assoc_opt key config |> Option.map (function
15→ | Language.Num n -> n
16→ | _ -> Runtime.Error.raise_ Config "config_error"
17→ (Printf.sprintf "%s must be a number" key))
18→
19→let get_config_bool config key =
20→ List.assoc_opt key config |> Option.map (function
21→ | Language.Bool b -> b
22→ | _ -> Runtime.Error.raise_ Config "config_error"
23→ (Printf.sprintf "%s must be a boolean" key))
24→
25→let get_config_str_list config key =
26→ List.assoc_opt key config |> Option.map (function
27→ | Language.Arr items ->
28→ List.map (function
29→ | Language.Str s -> s
30→ | _ -> Runtime.Error.raise_ Config "config_error"
31→ (Printf.sprintf "%s array must contain strings" key)
32→ ) items
33→ | _ -> Runtime.Error.raise_ Config "config_error"
34→ (Printf.sprintf "%s must be an array" key))
35→
36→(** {1 Port-multiplexed I/O}
37→
38→ All logical channels share stdin/stdout via tagged JSON envelopes:
39→ [{"__port": "name", "msg": <json>}].
40→
41→ Input: a single demux thread reads stdin, parses __port, and routes
42→ to internal pipes (one per logical input channel).
43→
44→ Output: a mutex-protected [mux_write] function wraps each message
45→ with its __port tag before writing to stdout. *)
46→
47→(** Mutex protecting stdout writes. Multiple threads (conversation loop,
48→ control handler, telemetry emitter) may write concurrently. *)
49→let stdout_mutex = Mutex.create ()
50→
51→(** Write a JSON line to stdout wrapped in a port envelope. *)
52→let mux_write port json_str =
53→ let envelope = Printf.sprintf "{\"__port\":%S,\"msg\":%s}\n" port json_str in
54→ Mutex.lock stdout_mutex;
55→ Fun.protect ~finally:(fun () -> Mutex.unlock stdout_mutex) (fun () ->
56→ output_string stdout envelope;
57→ flush stdout)
58→
59→(** Start the stdin demux thread. Reads JSON Lines from stdin,
60→ parses the [__port] field, and routes the [msg] payload to the
61→ appropriate internal pipe.
62→
63→ Port-specific EOF: when a message has [__eof: true], the demux
64→ thread closes that port's internal pipe without closing stdin.
65→ This allows the data input pipe to close (triggering the drain
66→ loop) while control messages continue flowing.
67→
68→ On stdin EOF, all remaining open pipes are closed. *)
69→let start_stdin_demux ~data_w ~ctrl_w ~tool_resp_w =
70→ let data_open = ref true in
71→ let ctrl_open = ref true in
72→ let tool_open = ref (tool_resp_w <> None) in
73→ let route_line line =
74→ if String.length (String.trim line) = 0 then ()
75→ else match Yojson.Safe.from_string line with
76→ | `Assoc fields ->
77→ let port = match List.assoc_opt "__port" fields with
78→ | Some (`String s) -> s | _ -> "input" in
79→ let is_eof = match List.assoc_opt "__eof" fields with
80→ | Some (`Bool true) -> true | _ -> false in
81→ if is_eof then begin
82→ (* Close the internal pipe for this port *)
83→ (match port with
84→ | "input" when !data_open ->
85→ close_out_noerr data_w; data_open := false
86→ | "ctrl_in" when !ctrl_open ->
87→ close_out_noerr ctrl_w; ctrl_open := false
88→ | "tool_resp" ->
89→ Option.iter close_out_noerr tool_resp_w; tool_open := false
90→ | _ -> ())
91→ end else begin
92→ let msg = match List.assoc_opt "msg" fields with
93→ | Some `Null -> "" (* null msg → blank line (skip/no-op) *)
94→ | Some m -> Yojson.Safe.to_string m
95→ | None -> line in
96→ let target = match port with
97→ | "input" when !data_open -> Some data_w
98→ | "ctrl_in" when !ctrl_open -> Some ctrl_w
99→ | "tool_resp" when !tool_open -> tool_resp_w
100→ | _ ->
101→ Runtime.Log.warn "mux_port_closed_or_unknown"
102→ [("port", `String port)];
103→ None
104→ in
105→ (match target with
106→ | Some oc ->
107→ output_string oc msg;
108→ output_char oc '\n';
109→ flush oc
110→ | None -> ())
111→ end
112→ | _ ->
113→ (* Not an envelope — treat as raw data input *)
114→ if !data_open then begin
115→ output_string data_w line;
116→ output_char data_w '\n';
117→ flush data_w
118→ end
119→ | exception Yojson.Json_error _ ->
120→ if !data_open then begin
121→ output_string data_w line;
122→ output_char data_w '\n';
123→ flush data_w
124→ end
125→ in
126→ Thread.create (fun () ->
127→ (try while true do
128→ let line = input_line stdin in
129→ route_line line
130→ done with End_of_file | Sys_error _ -> ());
131→ if !data_open then close_out_noerr data_w;
132→ if !ctrl_open then close_out_noerr ctrl_w;
133→ if !tool_open then Option.iter close_out_noerr tool_resp_w
134→ ) ()
135→
136→(** Memory request from the control channel, handled by the main loop. *)
137→type memory_request =
138→ | Get_memory
139→ | Set_memory of Client.message list
140→
141→(** Mutable state updated by the control channel reader thread.
142→ The conversation loop reads this before each API call. *)
143→type control_state = {
144→ mutable temperature : float option;
145→ mutable model_override : string option;
146→ mutable stop : bool;
147→ mutable paused : bool;
148→ mutable pause_acked : bool;
149→ mutable memory_request : memory_request option;
150→ memory_cond : Condition.t;
151→ mutex : Mutex.t;
152→}
153→
154→let make_control_state () = {
155→ temperature = None;
156→ model_override = None;
157→ stop = false;
158→ paused = false;
159→ pause_acked = false;
160→ memory_request = None;
161→ memory_cond = Condition.create ();
162→ mutex = Mutex.create ();
163→}
164→
165→(** Parse a JSON array into a message list. Returns None if any
166→ message is malformed (missing role or content). *)
167→let parse_messages json =
168→ match json with
169→ | `List items ->
170→ let msgs = List.filter_map (fun m ->
171→ match m with
172→ | `Assoc fields ->
173→ (match List.assoc_opt "role" fields,
174→ List.assoc_opt "content" fields with
175→ | Some (`String role), Some content ->
176→ Some { Client.role; content }
177→ | _ -> None)
178→ | _ -> None
179→ ) items in
180→ if List.length msgs <> List.length items then
181→ None (* reject if any message malformed *)
182→ else
183→ Some msgs
184→ | _ -> None
185→
186→(** Read control messages from the internal ctrl pipe. Each line is a JSON object;
187→ recognised fields update the control state. Unknown fields
188→ are silently ignored (forward compatibility). *)
189→let control_reader_thread (state : control_state) ic =
190→ (try while true do
191→ let line = input_line ic in
192→ if String.length (String.trim line) > 0 then begin
193→ let json = try Some (Yojson.Safe.from_string line) with _ -> None in
194→ match json with
195→ | Some (`Assoc fields) ->
196→ Mutex.lock state.mutex;
197→ Fun.protect ~finally:(fun () -> Mutex.unlock state.mutex) (fun () ->
198→ (match List.assoc_opt "set_temp" fields with
199→ | Some (`Float f) -> state.temperature <- Some f
200→ | Some (`Int n) -> state.temperature <- Some (float_of_int n)
201→ | Some `Null -> state.temperature <- None
202→ | _ -> ());
203→ (match List.assoc_opt "set_model" fields with
204→ | Some (`String s) -> state.model_override <- Some s
205→ | Some `Null -> state.model_override <- None
206→ | _ -> ());
207→ (match List.assoc_opt "stop" fields with
208→ | Some (`Bool true) -> state.stop <- true
209→ | _ -> ());
210→ (match List.assoc_opt "pause" fields with
211→ | Some (`Bool true) -> state.paused <- true
212→ | _ -> ());
213→ (match List.assoc_opt "resume" fields with
214→ | Some (`Bool true) -> state.paused <- false
215→ | _ -> ());
216→ (match List.assoc_opt "get_memory" fields with
217→ | Some (`Bool true) ->
218→ state.memory_request <- Some Get_memory
219→ | _ -> ());
220→ (match List.assoc_opt "set_memory" fields with
221→ | Some payload ->
222→ (match parse_messages payload with
223→ | Some msgs ->
224→ state.memory_request <- Some (Set_memory msgs)
225→ | None ->
226→ Runtime.Log.warn "set_memory_malformed"
227→ [("line", `String line)])
228→ | _ -> ());
229→ Condition.broadcast state.memory_cond);
230→ Runtime.Log.debug "control"
231→ [("message", Option.get json)]
232→ | _ ->
233→ Runtime.Log.warn "control_parse"
234→ [("line", `String line)]
235→ end
236→ done
237→ with
238→ | End_of_file ->
239→ Mutex.lock state.mutex;
240→ state.stop <- true;
241→ Condition.broadcast state.memory_cond;
242→ Mutex.unlock state.mutex
243→ | Unix.Unix_error (_err, _fn, _arg) ->
244→ Mutex.lock state.mutex;
245→ state.stop <- true;
246→ Condition.broadcast state.memory_cond;
247→ Mutex.unlock state.mutex)
248→
249→let () =
250→ Runtime.Entry.main
251→ ~usage:"Usage: plumb-agent <spec.plumb>"
252→ ~run:(fun spec_path ->
253→ (* Tag log lines with agent name so stderr is attributable *)
254→ (match Sys.getenv_opt "PLUMB_AGENT_NAME" with
255→ | Some name -> Runtime.Log.set_prefix [("agent", `String name)]
256→ | None -> ());
257→ (* Raise log level to Info so agent_config and tool_call are visible.
258→ Preserve Debug if PIPELINE_DEBUG=1 already set it higher. *)
259→ if Runtime.Log.level_rank (Runtime.Log.get_level ()) >
260→ Runtime.Log.level_rank Runtime.Log.Info then
261→ Runtime.Log.set_level Runtime.Log.Info;
262→ let source = Runtime.Entry.read_file spec_path in
263→ let loaded = Runtime.Spec.load_program source in
264→ (* Find the agent binding (there should be exactly one) *)
265→ let agent_bindings = List.filter (fun b ->
266→ match b.Language.impl with Language.Agent _ -> true | _ -> false
267→ ) loaded.program.prog_bindings in
268→ let binding = match agent_bindings with
269→ | [b] -> b
270→ | [] -> Runtime.Error.raise_ Config "config_error" "no agent binding in spec"
271→ | _ ->
272→ let names = List.map (fun b -> b.Language.bd_name) agent_bindings
273→ |> String.concat ", " in
274→ Runtime.Error.raise_ Config "config_error"
275→ (Printf.sprintf "multiple agent bindings: %s; specify one" names)
276→ in
277→ let type_env = loaded.type_env in
278→ let input_type = Language.Validate.unwrap_stream binding.input in
279→ let output_type = Language.Validate.unwrap_stream binding.output in
280→ let config = match binding.impl with
281→ | Language.Agent pairs -> pairs
282→ | _ -> Runtime.Error.raise_ Config "config_error" "not an agent binding"
283→ in
284→ (* Find tool bindings: Tool impl or @tool annotation on bare types *)
285→ let tool_bindings = List.filter
286→ Language.Validate.is_tool_binding
287→ loaded.program.prog_bindings in
288→ (* Provider: config > env > error *)
289→ let provider = match get_config_str config "provider" with
290→ | Some p -> p
291→ | None ->
292→ (match Sys.getenv_opt "PLUMB_PROVIDER" with
293→ | Some p -> p
294→ | None -> Runtime.Error.raise_ Config "config_error"
295→ "no provider: set provider in agent config or PLUMB_PROVIDER env var")
296→ in
297→ (* Model: config > env > error *)
298→ let model = match get_config_str config "model" with
299→ | Some m -> m
300→ | None ->
301→ (match Sys.getenv_opt "PLUMB_MODEL" with
302→ | Some m -> m
303→ | None -> Runtime.Error.raise_ Config "config_error"
304→ "no model: set model in agent config or PLUMB_MODEL env var")
305→ in
306→ (* System prompt *)
307→ let prompt_parts = ref [] in
308→ (match get_config_str config "prompt" with
309→ | Some s -> prompt_parts := s :: !prompt_parts
310→ | None -> ());
311→ (match get_config_str_list config "prompts" with
312→ | Some files ->
313→ List.iter (fun file ->
314→ let content =
315→ try Runtime.Entry.read_file file
316→ with Runtime.Error.Process_error _ ->
317→ Runtime.Error.raise_ Config "config_error"
318→ (Printf.sprintf "cannot read prompt file: %s" file)
319→ in
320→ let tagged = Printf.sprintf "<doc id=\"%s\">\n%s\n</doc>" file content in
321→ prompt_parts := tagged :: !prompt_parts
322→ ) files
323→ | None -> ());
324→ let output_instruction =
325→ let pp = Language.pp_type_expr type_env output_type in
326→ Printf.sprintf
327→ "Your output must be valid JSON matching this type:\n\n\
328→ \ %s\n\n\
329→ Respond with ONLY the JSON value, no surrounding text or explanation." pp
330→ in
331→ let system_prompt = List.rev_map (fun text ->
332→ { Client.text; cache_break = false }
333→ ) !prompt_parts
334→ @ [{ Client.text = output_instruction; cache_break = true }] in
335→ (* Amnesiac mode: fresh conversation per line (default: accumulate) *)
336→ let amnesiac = get_config_bool config "amnesiac"
337→ |> Option.value ~default:false in
338→ (* Max retries *)
339→ let max_retries = get_config_num config "max_retries"
340→ |> Option.map int_of_float |> Option.value ~default:3 in
341→ (* Max tokens *)
342→ let max_tokens = get_config_num config "max_tokens"
343→ |> Option.map int_of_float |> Option.value ~default:8192 in
344→ (* Extended thinking *)
345→ let thinking_budget = get_config_num config "thinking_budget"
346→ |> Option.map int_of_float in
347→ (* Max messages: cap on input messages before clean exit *)
348→ let max_messages = get_config_num config "max_messages"
349→ |> Option.map int_of_float in
350→ (* Runtime context: inject time, token counts, pin count into system prompt *)
351→ let runtime_context = get_config_bool config "runtime_context"
352→ |> Option.value ~default:false in
353→ (* API key — ELIZA needs none *)
354→ let api_key_var = match provider with
355→ | "anthropic" -> Some "ANTHROPIC_API_KEY"
356→ | "openai" -> Some "OPENAI_API_KEY"
357→ | "eliza" | "claude-code" -> None
358→ | p -> Runtime.Error.raise_ Config "config_error"
359→ (Printf.sprintf "unknown provider: %s" p)
360→ in
361→ (* Telemetry: emit via __port envelope on stdout *)
362→ let emit_telemetry_line = Some (mux_write "telemetry") in
363→ (* Announce resolved provider/model before API key check,
364→ so the user sees what was resolved even on config errors. *)
365→ let msg = `Assoc [
366→ ("kind", `String "config");
367→ ("provider", `String provider);
368→ ("model", `String model);
369→ ] in
370→ mux_write "telemetry" (Yojson.Safe.to_string msg);
371→ Runtime.Log.info "agent_config"
372→ [("provider", `String provider); ("model", `String model)];
373→ let api_key = match api_key_var with
374→ | Some var ->
375→ (match Sys.getenv_opt var with
376→ | Some k -> k
377→ | None -> Runtime.Error.raise_ Config "config_error"
378→ (Printf.sprintf "%s not set" var))
379→ | None -> ""
380→ in
381→ let endpoint = get_config_str config "endpoint" in
382→ (* Tool schemas: convert tool bindings to API tool definitions.
383→ For non-record inputs, wrap in an object for Anthropic API
384→ (which requires input_schema to be object-typed). *)
385→ let tool_schemas = List.map (fun (b : Language.binding) ->
386→ let base = Language.Validate.tool_schema type_env b in
387→ (* Anthropic API requires input_schema to be an object.
388→ For non-record tool inputs, wrap in { input: <schema> }. *)
389→ match Language.Validate.resolve type_env b.input with
390→ | Language.Record _ -> base
391→ | _ ->
392→ let open Yojson.Safe.Util in
393→ let name = base |> member "name" in
394→ let description = base |> member "description" in
395→ let inner_schema = base |> member "input_schema" in
396→ `Assoc [
397→ ("name", name);
398→ ("description", description);
399→ ("input_schema", `Assoc [
400→ ("type", `String "object");
401→ ("properties", `Assoc [
402→ ("input", inner_schema)
403→ ]);
404→ ("required", `List [`String "input"]);
405→ ("additionalProperties", `Bool false);
406→ ]);
407→ ]
408→ ) tool_bindings in
409→ (* MCP tool schemas: read from temp file when PLUMB_MCP_TOOLS is set.
410→ These are already in Anthropic format with input_schema (snake_case)
411→ and object-typed schemas — no wrapping needed. *)
412→ let mcp_schemas = match Sys.getenv_opt "PLUMB_MCP_TOOLS" with
413→ | Some path ->
414→ (try
415→ let content = In_channel.with_open_text path In_channel.input_all in
416→ (try Yojson.Safe.from_string content
417→ |> Yojson.Safe.Util.to_list
418→ with _ ->
419→ Runtime.Log.warn "mcp_schema_parse_error"
420→ [("path", `String path)];
421→ [])
422→ with Sys_error msg ->
423→ Runtime.Log.warn "mcp_schema_read_error"
424→ [("path", `String path); ("detail", `String msg)];
425→ [])
426→ | None -> []
427→ in
428→ let tool_schemas = tool_schemas @ mcp_schemas
429→ @ [Agent.Pinning.pin_schema; Agent.Pinning.unpin_schema] in
430→ (* Tool dispatch: request via __port envelope, response via
431→ internal pipe from the stdin demux thread. *)
432→ let has_tools = tool_schemas <> [] in
433→ let tool_resp_r, tool_resp_w =
434→ if has_tools then
435→ let r, w = Unix.pipe ~cloexec:true () in
436→ (Some (Unix.in_channel_of_descr r),
437→ Some (Unix.out_channel_of_descr w))
438→ else (None, None) in
439→ (* Control: input via internal pipe from stdin demux,
440→ output via __port envelope on stdout. *)
441→ let ctrl_r, ctrl_w = Unix.pipe ~cloexec:true () in
442→ let ctrl_ic = Unix.in_channel_of_descr ctrl_r in
443→ let ctrl_oc = Unix.out_channel_of_descr ctrl_w in
444→ let control_state = make_control_state () in
445→ ignore (Thread.create (control_reader_thread control_state) ctrl_ic);
446→ (* Data input: internal pipe from stdin demux *)
447→ let data_r, data_w = Unix.pipe ~cloexec:true () in
448→ let data_ic = Unix.in_channel_of_descr data_r in
449→ let data_oc = Unix.out_channel_of_descr data_w in
450→ (* Start stdin demux thread *)
451→ ignore (start_stdin_demux
452→ ~data_w:data_oc ~ctrl_w:ctrl_oc ~tool_resp_w);
453→ let get_model () =
454→ Mutex.lock control_state.mutex;
455→ let m = match control_state.model_override with
456→ | Some s -> s | None -> model
457→ in
458→ Mutex.unlock control_state.mutex;
459→ m
460→ in
461→ let get_temperature () =
462→ Mutex.lock control_state.mutex;
463→ let t = control_state.temperature in
464→ Mutex.unlock control_state.mutex;
465→ t
466→ in
467→ let is_stopped () =
468→ Mutex.lock control_state.mutex;
469→ let s = control_state.stop in
470→ Mutex.unlock control_state.mutex;
471→ s
472→ in
473→ (* Run with Eio *)
474→ Mirage_crypto_rng_unix.use_default ();
475→ Eio_main.run @@ fun env ->
476→ Eio.Switch.run @@ fun sw ->
477→ let net = Eio.Stdenv.net env in
478→ let client = match provider with
479→ | "anthropic" ->
480→ Client.Anthropic.make ~sw ~net ~api_key
481→ ?endpoint ()
482→ | "openai" ->
483→ Client.Openai.make ~sw ~net ~api_key
484→ ?endpoint ()
485→ | "eliza" ->
486→ Client.Eliza.make ()
487→ | "claude-code" ->
488→ Client.Claude_code.make ()
489→ | p -> Runtime.Error.raise_ Config "config_error"
490→ (Printf.sprintf "unknown provider: %s" p)
491→ in
492→ (* Write a JSON line to control out via mux envelope.
493→ Once the control pipe breaks, skip all further writes —
494→ the agent continues serving data. *)
495→ let control_out_live = ref true in
496→ let control_out json =
497→ if !control_out_live then
498→ (try mux_write "ctrl_out" (Yojson.Safe.to_string json)
499→ with Sys_error _ ->
500→ control_out_live := false;
501→ Runtime.Log.warn "control_out_broken" [])
502→ in
503→ (* JSON Lines: process input stream *)
504→ let line_num = ref 0 in
505→ let msg_count = ref 0 in
506→ let history = ref [] in
507→ let pin_set = ref Agent.Pinning.PinMap.empty in
508→ let pin_counter = ref 0 in
509→ (* Session-wide token counters for runtime context injection *)
510→ let session_input_tokens = ref 0 in
511→ let session_output_tokens = ref 0 in
512→ let api_calls = ref 0 in
513→ let compaction_cycles = ref 0 in
514→ (* Wrap telemetry emitter to also accumulate token counts *)
515→ let emit_telemetry_line = match emit_telemetry_line with
516→ | Some write_line ->
517→ Some (fun line ->
518→ (try
519→ match Yojson.Safe.from_string line with
520→ | `Assoc fields ->
521→ (match List.assoc_opt "kind" fields with
522→ | Some (`String "usage") ->
523→ incr api_calls;
524→ (match List.assoc_opt "input_tokens" fields with
525→ | Some (`Int n) ->
526→ session_input_tokens := !session_input_tokens + n
527→ | _ -> ());
528→ (match List.assoc_opt "output_tokens" fields with
529→ | Some (`Int n) ->
530→ session_output_tokens := !session_output_tokens + n
531→ | _ -> ())
532→ | _ -> ())
533→ | _ -> ()
534→ with _ -> ());
535→ write_line line)
536→ | None -> None
537→ in
538→ (* Emit absolute pin state to telemetry after every pin/unpin. *)
539→ let emit_pin_telemetry action =
540→ match emit_telemetry_line with
541→ | Some write_line ->
542→ let keys = Agent.Pinning.PinMap.bindings !pin_set
543→ |> List.map (fun (k, _) ->
544→ `String (Agent.Pinning.PinKey.to_string k)) in
545→ let ev = `Assoc [
546→ ("kind", `String "pin_state");
547→ ("action", `String action);
548→ ("count", `Int (Agent.Pinning.PinMap.cardinal !pin_set));
549→ ("keys", `List keys);
550→ ] in
551→ write_line (Yojson.Safe.to_string ev)
552→ | None -> ()
553→ in
554→ (* dispatch_tool: handles pin/unpin, cache hits, and mux dispatch *)
555→ let dispatch_tool =
556→ let emit_req = mux_write "tool_req" in
557→ let tool_ic = tool_resp_r in
558→ fun id name input ->
559→ let open Agent.Pinning in
560→ match name with
561→ | "pin" ->
562→ let open Yojson.Safe.Util in
563→ let tool = input |> member "tool" |> to_string in
564→ let args = input |> member "arguments" in
565→ let key = canonical_key tool args in
566→ if PinMap.mem key !pin_set then begin
567→ emit_pin_telemetry "pin";
568→ ({|{"ok":true,"already_pinned":true}|}, false)
569→ end else begin
570→ (* Evict oldest if at capacity *)
571→ if PinMap.cardinal !pin_set >= max_pins then begin
572→ let oldest_key = PinMap.bindings !pin_set
573→ |> List.sort (fun (_, a) (_, b) ->
574→ compare a.pinned_at b.pinned_at)
575→ |> List.hd |> fst in
576→ pin_set := PinMap.remove oldest_key !pin_set
577→ end;
578→ let content = find_tool_result !history tool args
579→ |> Option.value ~default:"" in
580→ incr pin_counter;
581→ pin_set := PinMap.add key
582→ { tool_name = tool; arguments = args;
583→ content; pinned_at = !pin_counter }
584→ !pin_set;
585→ emit_pin_telemetry "pin";
586→ (Printf.sprintf {|{"ok":true,"pin_count":%d}|}
587→ (PinMap.cardinal !pin_set), false)
588→ end
589→ | "unpin" ->
590→ let open Yojson.Safe.Util in
591→ let tool = input |> member "tool" |> to_string in
592→ let args = input |> member "arguments" in
593→ let key = canonical_key tool args in
594→ pin_set := PinMap.remove key !pin_set;
595→ emit_pin_telemetry "unpin";
596→ (Printf.sprintf {|{"ok":true,"pin_count":%d}|}
597→ (PinMap.cardinal !pin_set), false)
598→ | _ ->
599→ (* Check pin cache *)
600→ let key = canonical_key name input in
601→ (match PinMap.find_opt key !pin_set with
602→ | Some entry when entry.content <> "" ->
603→ Runtime.Log.info "pin_cache_hit" [("tool", `String name)];
604→ let key_str = PinKey.to_string key in
605→ (Printf.sprintf
606→ {|{"pinned":true,"note":"This tool result is already pinned in your context. Look for <pinned key=\"%s\"> in the pinned-documents block at the top of your conversation."}|}
607→ (String.escaped key_str), false)
608→ | _ ->
609→ (* Normal dispatch via mux *)
610→ (match tool_ic with
611→ | Some ic ->
612→ let req = `Assoc [
613→ ("id", `String id); ("name", `String name);
614→ ("input", input)] in
615→ emit_req (Yojson.Safe.to_string req);
616→ let resp_line = input_line ic in
617→ let resp_json = Yojson.Safe.from_string resp_line in
618→ let open Yojson.Safe.Util in
619→ let content = resp_json |> member "content" |> to_string in
620→ let is_error = resp_json |> member "is_error" |> to_bool in
621→ (content, is_error)
622→ | None ->
623→ ("{\"error\":\"no tools available\"}", true)))
624→ in
625→ let get_runtime_context =
626→ if runtime_context then
627→ Some (fun () ->
628→ let now = Unix.gettimeofday () in
629→ let tm = Unix.gmtime now in
630→ let time_str = Printf.sprintf "%04d-%02d-%02dT%02d:%02d:%02dZ"
631→ (1900 + tm.Unix.tm_year) (1 + tm.Unix.tm_mon) tm.Unix.tm_mday
632→ tm.Unix.tm_hour tm.Unix.tm_min tm.Unix.tm_sec in
633→ let pin_count = Agent.Pinning.PinMap.cardinal !pin_set in
634→ let history_len = List.length !history in
635→ Printf.sprintf
636→ "[Runtime context]\n\
637→ Time: %s\n\
638→ Session input tokens: %d\n\
639→ Session output tokens: %d\n\
640→ API calls: %d\n\
641→ Max output tokens per turn: %d\n\
642→ History messages: %d\n\
643→ Pinned documents: %d\n\
644→ Compaction cycles: %d"
645→ time_str !session_input_tokens !session_output_tokens
646→ !api_calls max_tokens history_len pin_count
647→ !compaction_cycles)
648→ else None
649→ in
650→ let loop_config = {
651→ Loop.client;
652→ system_prompt;
653→ max_retries;
654→ max_tokens;
655→ thinking_budget;
656→ emit_telemetry_line;
657→ type_env;
658→ input_type;
659→ output_type;
660→ tool_schemas;
661→ dispatch_tool = if has_tools then Some dispatch_tool else None;
662→ get_model;
663→ get_temperature;
664→ is_stopped;
665→ get_runtime_context;
666→ } in
667→ let handle_memory_request () =
668→ (* Called INSIDE mutex lock. Releases lock for I/O,
669→ re-acquires before returning. *)
670→ match control_state.memory_request with
671→ | Some Get_memory ->
672→ let msgs = List.map (fun (m : Client.message) ->
673→ `Assoc [("role", `String m.role); ("content", m.content)]
674→ ) !history in
675→ let pinned = Agent.Pinning.serialise_pins !pin_set in
676→ let dump = `Assoc [
677→ ("kind", `String "memory");
678→ ("messages", `List msgs);
679→ ("pinned", `List pinned);
680→ ] in
681→ control_state.memory_request <- None;
682→ Mutex.unlock control_state.mutex;
683→ control_out dump;
684→ Mutex.lock control_state.mutex
685→ | Some (Set_memory msgs) ->
686→ let old_len = List.length !history in
687→ let pin_msg = Agent.Pinning.reinject_pins_as_message !pin_set in
688→ let with_pins = match pin_msg with
689→ | Some m -> m :: msgs
690→ | None -> msgs
691→ in
692→ history := with_pins;
693→ incr compaction_cycles;
694→ control_state.memory_request <- None;
695→ Runtime.Log.info "memory_set"
696→ [("old_messages", `Int old_len);
697→ ("new_messages", `Int (List.length with_pins))];
698→ Mutex.unlock control_state.mutex;
699→ control_out (`Assoc [
700→ ("kind", `String "memory_set");
701→ ("old_messages", `Int old_len);
702→ ("new_messages", `Int (List.length with_pins));
703→ ]);
704→ Mutex.lock control_state.mutex
705→ | None -> ()
706→ in
707→ try while true do
708→ (* Pause loop: wait while paused, handle memory requests *)
709→ Mutex.lock control_state.mutex;
710→ while control_state.paused && not control_state.stop do
711→ (* Emit pause_ack on first entry into paused state *)
712→ if not control_state.pause_acked then begin
713→ control_state.pause_acked <- true;
714→ Mutex.unlock control_state.mutex;
715→ control_out (`Assoc [("kind", `String "pause_ack")]);
716→ Mutex.lock control_state.mutex
717→ end;
718→ handle_memory_request ();
719→ if control_state.paused && not control_state.stop then
720→ Condition.wait control_state.memory_cond control_state.mutex
721→ done;
722→ (* Emit resume_ack when leaving paused state *)
723→ if control_state.pause_acked then begin
724→ control_state.pause_acked <- false;
725→ Mutex.unlock control_state.mutex;
726→ control_out (`Assoc [("kind", `String "resume_ack");
727→ ("resumed", `Bool true)]);
728→ Mutex.lock control_state.mutex
729→ end;
730→ (* Handle any pending request when not paused *)
731→ handle_memory_request ();
732→ Mutex.unlock control_state.mutex;
733→ (* Do NOT check is_stopped() here. Data and ctrl share a muxed
734→ stdin; when the executor closes the ctrl write-fd after data
735→ EOF, the ctrl EOF and data EOF arrive in quick succession.
736→ The ctrl reader may set stop=true before the main loop reads
737→ the last data line, discarding valid input. The data loop
738→ terminates naturally on End_of_file from data_ic. Stop is
739→ checked in the post-data-EOF wait. *)
740→ let line = input_line data_ic in
741→ incr line_num;
742→ if String.length (String.trim line) = 0 then ()
743→ else begin try
744→ (match max_messages with
745→ | Some limit when !msg_count >= limit -> raise Exit
746→ | _ -> ());
747→ let json =
748→ try Yojson.Safe.from_string line
749→ with Yojson.Json_error msg ->
750→ Runtime.Error.raise_
751→ ~extras:[("line", `Int !line_num)]
752→ Invalid "parse_error" msg
753→ in
754→ match Language.Validate.check type_env input_type json with
755→ | Ok cleaned ->
756→ let result, new_history =
757→ Loop.run loop_config ~history:!history cleaned
758→ in
759→ if not amnesiac then history := new_history;
760→ incr msg_count;
761→ mux_write "output" (Yojson.Safe.to_string result)
762→ | Error errs ->
763→ Runtime.Error.raise_
764→ ~extras:[("line", `Int !line_num)]
765→ Invalid "validation_error"
766→ (Language.Validate.error_string errs)
767→ with Runtime.Error.Process_error err ->
768→ (* Log the error and emit it on the output channel so
769→ downstream sees the dropped message. Do not crash —
770→ the agent stays alive for subsequent valid input. *)
771→ let error_json = Runtime.Error.to_json err in
772→ (try mux_write "output" (Yojson.Safe.to_string error_json)
773→ with Sys_error _ -> ());
774→ Runtime.Log.error "input_error"
775→ [("error", `String err.code);
776→ ("detail", `String err.detail)];
777→ (* Emit on telemetry for TUI visibility *)
778→ let telem_json = `Assoc [
779→ ("kind", `String "error");
780→ ("error", `String err.code);
781→ ("detail", `String err.detail);
782→ ] in
783→ (try mux_write "telemetry"
784→ (Yojson.Safe.to_string telem_json)
785→ with Sys_error _ -> ())
786→ end
787→ done
788→ with
789→ | End_of_file ->
790→ (* Data input exhausted — remain available for protocol
791→ commands (pause/resume/memory) until the control pipe
792→ closes. In muxed mode, data and control share stdin;
793→ the demux thread closes the data pipe on data EOF but
794→ the ctrl pipe stays open while stdin carries control
795→ messages. See PLMB-033. *)
796→ Mutex.lock control_state.mutex;
797→ (try while not control_state.stop do
798→ while control_state.paused && not control_state.stop do
799→ if not control_state.pause_acked then begin
800→ control_state.pause_acked <- true;
801→ Mutex.unlock control_state.mutex;
802→ control_out (`Assoc [("kind", `String "pause_ack")]);
803→ Mutex.lock control_state.mutex
804→ end;
805→ handle_memory_request ();
806→ if control_state.paused && not control_state.stop then
807→ Condition.wait control_state.memory_cond control_state.mutex
808→ done;
809→ if control_state.pause_acked then begin
810→ control_state.pause_acked <- false;
811→ Mutex.unlock control_state.mutex;
812→ control_out (`Assoc [("kind", `String "resume_ack");
813→ ("resumed", `Bool true)]);
814→ Mutex.lock control_state.mutex
815→ end;
816→ handle_memory_request ();
817→ if not control_state.stop then
818→ Condition.wait control_state.memory_cond control_state.mutex
819→ done
820→ with Exit -> ());
821→ Mutex.unlock control_state.mutex
822→ | Exit -> ())
823→ 1→(executable
2→ (name main)
3→ (public_name plumb-agent)
4→ (libraries language runtime client agent yojson eio_main
5→ mirage-crypto-rng.unix))
6→ 1→(* End-to-end tests for the fabric with inline morphisms and
2→ agent subprocesses via the bridge.
3→ Uses the transport layer, routing table, fabric, and bridge
4→ to run complete pipelines. *)
5→
6→module T = Transport.Zmq
7→
8→let pipeline_counter = ref 0
9→let fresh_id () =
10→ incr pipeline_counter;
11→ Printf.sprintf "test-%d-%d" (Unix.getpid ()) !pipeline_counter
12→
13→let parse_and_compile src =
14→ let open Language in
15→ let prog = Language.parse src in
16→ let main = List.find (fun (b : binding) -> b.bd_name = "main") prog.prog_bindings in
17→ let ports, pb = match main.impl with
18→ | Plumb (ports, pb) -> (ports, pb)
19→ | _ -> failwith "main must be plumb"
20→ in
21→ let type_env =
22→ try Validate.build_type_env prog.prog_types
23→ with Validate.Schema_error msg -> failwith msg
24→ in
25→ let routing_table = Runtime.Routing.compile ~prog ~ports ~pb
26→ ~type_env ~main_binding:main ~seeded:pb.pb_seeded in
27→ (prog, routing_table, type_env)
28→
29→(** Run a fabric pipeline (inline only), execute [f] with the boundary sockets. *)
30→let with_fabric src f =
31→ let _prog, routing_table, type_env = parse_and_compile src in
32→ let pipeline_id = fresh_id () in
33→ Eio_main.run (fun _env ->
34→ Eio.Switch.run (fun sw ->
35→ let fabric = Fabric.execute ~sw ~routing_table ~type_env
36→ ~pipeline_id () in
37→ Unix.sleepf 0.1;
38→ let push_in = Fabric.boundary_input fabric "input" in
39→ let pull_out = Fabric.boundary_output fabric "output" in
40→ f push_in pull_out;
41→ fabric.cancel ()))
42→
43→(** Find the agent binary for tests. *)
44→let find_agent_binary _name =
45→ let candidates = [
46→ "../bin/agent/main.exe";
47→ "_build/default/bin/agent/main.exe";
48→ ] in
49→ match List.find_opt Sys.file_exists candidates with
50→ | Some p ->
51→ if Filename.is_relative p then Filename.concat (Sys.getcwd ()) p
52→ else p
53→ | None -> failwith "cannot find agent binary (plumb-agent)"
54→
55→(** Run a fabric pipeline with agent bridge, execute [f] with boundary sockets. *)
56→let with_fabric_agent src f =
57→ let prog, routing_table, type_env = parse_and_compile src in
58→ let pipeline_id = fresh_id () in
59→ Eio_main.run (fun _env ->
60→ Eio.Switch.run (fun sw ->
61→ let handler = Fabric.Bridge.make_handler
62→ ~prog ~type_env
63→ ~bin_dir:None
64→ ~resource_find:(fun name ->
65→ failwith ("resource_find not available in test: " ^ name))
66→ ~resolve_bin:find_agent_binary
67→ () in
68→ let fabric = Fabric.execute ~sw ~routing_table ~type_env
69→ ~pipeline_id ~process_handler:handler () in
70→ Unix.sleepf 0.2;
71→ let push_in = Fabric.boundary_input fabric "input" in
72→ let pull_out = Fabric.boundary_output fabric "output" in
73→ f push_in pull_out;
74→ fabric.cancel ()))
75→
76→(* --- Identity pipeline --- *)
77→
78→let test_identity () =
79→ with_fabric {|
80→ type M = { text: string }
81→ let main : !M -> !M = plumb(input, output) {
82→ spawn id(input, output)
83→ }
84→ |} (fun push_in pull_out ->
85→ Eio.Fiber.both
86→ (fun () ->
87→ T.send push_in ~topic:"input" {|{"text":"hello"}|})
88→ (fun () ->
89→ let (_topic, payload) = T.recv pull_out in
90→ let json = Yojson.Safe.from_string payload in
91→ match json with
92→ | `Assoc fields ->
93→ let text = match List.assoc_opt "text" fields with
94→ | Some (`String s) -> s | _ -> "MISSING" in
95→ Alcotest.(check string) "text preserved" "hello" text
96→ | _ -> Alcotest.fail "expected JSON object"))
97→
98→(* --- Copy + merge pipeline (each message appears twice) --- *)
99→
100→let test_copy_merge () =
101→ with_fabric {|
102→ type M = { text: string }
103→ let main : !M -> !M = plumb(input, output) {
104→ let ch_a : !M = channel
105→ let ch_b : !M = channel
106→ spawn copy(input, ch_a, ch_b)
107→ spawn merge(ch_a, ch_b, output)
108→ }
109→ |} (fun push_in pull_out ->
110→ let received = ref 0 in
111→ Eio.Fiber.both
112→ (fun () ->
113→ T.send push_in ~topic:"input" {|{"text":"hello"}|})
114→ (fun () ->
115→ for _ = 1 to 2 do
116→ let (_topic, payload) = T.recv pull_out in
117→ let json = Yojson.Safe.from_string payload in
118→ (match json with
119→ | `Assoc fields ->
120→ (match List.assoc_opt "text" fields with
121→ | Some (`String "hello") -> incr received
122→ | _ -> Alcotest.fail "wrong text")
123→ | _ -> Alcotest.fail "expected JSON object")
124→ done);
125→ Alcotest.(check int) "2 copies received" 2 !received)
126→
127→(* --- Agent subprocess via bridge --- *)
128→
129→(* Agent identity: send a message through an ELIZA agent, expect a response. *)
130→let test_agent_identity () =
131→ with_fabric_agent {|
132→ let main : !string -> !string = plumb(input, output) {
133→ spawn echo(input, output)
134→ }
135→ let echo : !string -> !string = agent {
136→ provider: "eliza",
137→ model: "doctor"
138→ }
139→ |} (fun push_in pull_out ->
140→ Eio.Fiber.both
141→ (fun () ->
142→ T.send push_in ~topic:"input" {|"hello"|};
143→ Unix.sleepf 2.0;
144→ Fabric.Bridge.send_eof push_in ~topic:"input")
145→ (fun () ->
146→ let (_topic, payload) = T.recv pull_out in
147→ let response = Yojson.Safe.from_string payload in
148→ match response with
149→ | `String s ->
150→ Alcotest.(check bool) "non-empty response" true (String.length s > 0)
151→ | _ -> Alcotest.fail "expected string response"))
152→
153→(** Agent EOF: data + EOF → agent processes, output + EOF propagates. *)
154→let test_agent_eof () =
155→ with_fabric_agent {|
156→ let main : !string -> !string = plumb(input, output) {
157→ spawn echo(input, output)
158→ }
159→ let echo : !string -> !string = agent {
160→ provider: "eliza",
161→ model: "doctor"
162→ }
163→ |} (fun push_in pull_out ->
164→ Eio.Fiber.both
165→ (fun () ->
166→ T.send push_in ~topic:"input" {|"hello"|};
167→ Unix.sleepf 1.0;
168→ Fabric.Bridge.send_eof push_in ~topic:"input")
169→ (fun () ->
170→ (* First recv: the agent's response *)
171→ let (_topic, payload) = T.recv pull_out in
172→ Alcotest.(check bool) "got response" true (String.length payload > 0);
173→ (* Second recv: should be the EOF sentinel *)
174→ let (_topic2, payload2) = T.recv pull_out in
175→ Alcotest.(check bool) "got EOF" true (Fabric.Bridge.is_eof payload2)))
176→
177→let () =
178→ Alcotest.run "fabric" [
179→ "inline", [
180→ Alcotest.test_case "identity" `Quick test_identity;
181→ Alcotest.test_case "copy + merge" `Quick test_copy_merge;
182→ ];
183→ "agent", [
184→ Alcotest.test_case "agent identity" `Slow test_agent_identity;
185→ Alcotest.test_case "agent EOF" `Slow test_agent_eof;
186→ ];
187→ ]
188→ 1→(** Integration tests for agent control protocol over muxed stdin/stdout.
2→
3→ Tests pause/resume, get_memory, set_memory, and their interaction.
4→ All communication uses port-multiplexed JSON envelopes on
5→ stdin/stdout: [{"__port": "ctrl_in", "msg": {...}}] for commands,
6→ [{"__port": "ctrl_out", "msg": {...}}] for responses.
7→
8→ Data EOF is signalled via [{"__port": "input", "__eof": true}],
9→ which closes the data input pipe while keeping control active. *)
10→
11→(** Resolve agent binary path — works from both dune test
12→ (cwd = _build/default/test) and dune exec (cwd = project root) *)
13→let agent_bin =
14→ let candidates = [
15→ "../bin/agent/main.exe";
16→ "_build/default/bin/agent/main.exe";
17→ ] in
18→ match List.find_opt Sys.file_exists candidates with
19→ | Some p ->
20→ if Filename.is_relative p then Filename.concat (Sys.getcwd ()) p
21→ else p
22→ | None -> failwith "cannot find agent binary"
23→
24→let fixture path =
25→ let candidates = [
26→ "fixtures/" ^ path;
27→ "test/fixtures/" ^ path;
28→ ] in
29→ match List.find_opt Sys.file_exists candidates with
30→ | Some p ->
31→ if Filename.is_relative p then Filename.concat (Sys.getcwd ()) p
32→ else p
33→ | None -> failwith ("cannot find fixture: " ^ path)
34→
35→(** Spawn the agent binary. Returns (stdin_w, stdout_r, stderr_r, pid).
36→ All control, data, and tool communication goes through stdin/stdout
37→ using the muxed envelope protocol. *)
38→let spawn_agent ?(spec = "agent_explicit_provider.plumb") env =
39→ Sys.set_signal Sys.sigpipe Sys.Signal_ignore;
40→ let spec_path = fixture spec in
41→ let stdin_r, stdin_w = Unix.pipe ~cloexec:true () in
42→ let stdout_r, stdout_w = Unix.pipe ~cloexec:true () in
43→ let stderr_r, stderr_w = Unix.pipe ~cloexec:true () in
44→ let base_env = List.filter_map (fun k ->
45→ match Sys.getenv_opt k with
46→ | Some v -> Some (k ^ "=" ^ v)
47→ | None -> None
48→ ) ["PATH"; "HOME"; "LANG"; "TERM"; "TMPDIR"] in
49→ let full_env = Array.of_list (base_env @ Array.to_list env) in
50→ let pid = Unix.fork () in
51→ if pid = 0 then begin
52→ Unix.dup2 stdin_r Unix.stdin;
53→ Unix.dup2 stdout_w Unix.stdout;
54→ Unix.dup2 stderr_w Unix.stderr;
55→ (* Close all pipe fds except 0, 1, 2 *)
56→ List.iter (fun fd ->
57→ if fd <> Unix.stdin && fd <> Unix.stdout && fd <> Unix.stderr then
58→ (try Unix.close fd with Unix.Unix_error _ -> ())
59→ ) [stdin_r; stdin_w; stdout_r; stdout_w; stderr_r; stderr_w];
60→ (try Unix.execve agent_bin [| agent_bin; spec_path |] full_env
61→ with _ -> Unix._exit 2)
62→ end else begin
63→ Unix.close stdin_r; Unix.close stdout_w; Unix.close stderr_w;
64→ (stdin_w, stdout_r, stderr_r, pid)
65→ end
66→
67→(** Read a line from a fd using Unix.select with timeout.
68→ Returns None on timeout or EOF. *)
69→let read_line_timeout ?(timeout=2.0) fd =
70→ let buf = Buffer.create 256 in
71→ let byte = Bytes.create 1 in
72→ let deadline = Unix.gettimeofday () +. timeout in
73→ let rec loop () =
74→ let remaining = deadline -. Unix.gettimeofday () in
75→ if remaining <= 0.0 then None
76→ else begin
77→ let readable, _, _ = Unix.select [fd] [] [] remaining in
78→ if readable = [] then None
79→ else begin
80→ let n = try Unix.read fd byte 0 1 with Unix.Unix_error _ -> 0 in
81→ if n = 0 then
82→ if Buffer.length buf > 0 then Some (Buffer.contents buf)
83→ else None
84→ else begin
85→ let c = Bytes.get byte 0 in
86→ if c = '\n' then Some (Buffer.contents buf)
87→ else begin Buffer.add_char buf c; loop () end
88→ end
89→ end
90→ end
91→ in
92→ loop ()
93→
94→(** Write a string to a fd, ignoring EPIPE *)
95→let write_fd fd s =
96→ let b = Bytes.of_string s in
97→ let len = Bytes.length b in
98→ let rec loop off =
99→ if off >= len then ()
100→ else begin
101→ let n = try Unix.write fd b off (len - off)
102→ with Unix.Unix_error (Unix.EPIPE, _, _) -> len - off in
103→ loop (off + n)
104→ end
105→ in
106→ loop 0
107→
108→(** Send a control message wrapped in a mux envelope *)
109→let send_control stdin_w msg =
110→ let envelope = Printf.sprintf "{\"__port\":\"ctrl_in\",\"msg\":%s}\n" msg in
111→ write_fd stdin_w envelope
112→
113→(** Send a null data message (triggers pause loop entry without data).
114→ The demux thread writes a blank line to the data pipe, which the
115→ conversation loop skips. *)
116→let send_blank_data stdin_w =
117→ write_fd stdin_w "{\"__port\":\"input\",\"msg\":null}\n"
118→
119→(** Signal data input EOF (closes data pipe, keeps control alive) *)
120→let send_data_eof stdin_w =
121→ write_fd stdin_w "{\"__port\":\"input\",\"__eof\":true}\n"
122→
123→(** Read a muxed envelope from stdout, filtering by port.
124→ Skips envelopes for other ports (e.g. telemetry).
125→ Returns the unwrapped msg JSON string, or None on timeout. *)
126→let read_port ?(timeout=3.0) stdout_r port =
127→ let deadline = Unix.gettimeofday () +. timeout in
128→ let rec loop () =
129→ let remaining = deadline -. Unix.gettimeofday () in
130→ if remaining <= 0.0 then None
131→ else match read_line_timeout ~timeout:remaining stdout_r with
132→ | None -> None
133→ | Some line ->
134→ (match Yojson.Safe.from_string line with
135→ | `Assoc fields ->
136→ let p = match List.assoc_opt "__port" fields with
137→ | Some (`String s) -> s | _ -> "" in
138→ if p = port then
139→ match List.assoc_opt "msg" fields with
140→ | Some m -> Some (Yojson.Safe.to_string m)
141→ | None -> loop ()
142→ else
143→ loop () (* skip other ports *)
144→ | _ -> loop ()
145→ | exception _ -> loop ())
146→ in
147→ loop ()
148→
149→(** Read a control response (ctrl_out port) *)
150→let read_ctrl_out ?(timeout=3.0) stdout_r =
151→ read_port ~timeout stdout_r "ctrl_out"
152→
153→(** Read all available stderr *)
154→let drain_stderr stderr_r =
155→ let buf = Buffer.create 1024 in
156→ let chunk = Bytes.create 4096 in
157→ let rec loop () =
158→ let readable, _, _ = Unix.select [stderr_r] [] [] 0.1 in
159→ if readable <> [] then begin
160→ let n = try Unix.read stderr_r chunk 0 4096 with Unix.Unix_error _ -> 0 in
161→ if n > 0 then begin
162→ Buffer.add_subbytes buf chunk 0 n; loop ()
163→ end
164→ end
165→ in
166→ loop (); Buffer.contents buf
167→
168→(** Check that a string contains a substring *)
169→let contains haystack needle =
170→ let re = Str.regexp_string needle in
171→ try ignore (Str.search_forward re haystack 0); true
172→ with Not_found -> false
173→
174→(** Wait for child with timeout. Sends SIGKILL on timeout. *)
175→let wait_child ?(timeout=5.0) pid =
176→ let deadline = Unix.gettimeofday () +. timeout in
177→ let rec poll () =
178→ let remaining = deadline -. Unix.gettimeofday () in
179→ if remaining <= 0.0 then begin
180→ (try Unix.kill pid Sys.sigkill with Unix.Unix_error _ -> ());
181→ let _, status = Unix.waitpid [] pid in
182→ match status with
183→ | Unix.WEXITED c -> c
184→ | Unix.WSIGNALED s -> 128 + s
185→ | Unix.WSTOPPED _ -> 128
186→ end else begin
187→ match Unix.waitpid [Unix.WNOHANG] pid with
188→ | 0, _ ->
189→ Unix.sleepf 0.05;
190→ poll ()
191→ | _, status ->
192→ (match status with
193→ | Unix.WEXITED c -> c
194→ | Unix.WSIGNALED s -> 128 + s
195→ | Unix.WSTOPPED _ -> 128)
196→ end
197→ in
198→ poll ()
199→
200→(** Clean up fds from a spawn *)
201→let cleanup stdin_w stdout_r stderr_r =
202→ (try Unix.close stdin_w with _ -> ());
203→ (try Unix.close stdout_r with _ -> ());
204→ (try Unix.close stderr_r with _ -> ())
205→
206→(** {1 Stop tests} *)
207→
208→let test_stop_via_control () =
209→ let stdin_w, stdout_r, stderr_r, pid =
210→ spawn_agent
211→ ~spec:"agent_explicit_provider.plumb"
212→ [| "ANTHROPIC_API_KEY=test-key-not-real" |]
213→ in
214→ Unix.sleepf 0.3;
215→ send_control stdin_w {|{"stop": true}|};
216→ Unix.sleepf 0.1;
217→ Unix.close stdin_w;
218→ let code = wait_child pid in
219→ let stderr = drain_stderr stderr_r in
220→ cleanup stdin_w stdout_r stderr_r;
221→ if code <> 0 then
222→ Alcotest.failf "agent exit %d; stderr: %s" code stderr
223→
224→let test_stdin_eof_sets_stop () =
225→ let stdin_w, stdout_r, stderr_r, pid =
226→ spawn_agent
227→ ~spec:"agent_explicit_provider.plumb"
228→ [| "ANTHROPIC_API_KEY=test-key-not-real" |]
229→ in
230→ Unix.sleepf 0.3;
231→ (* Close stdin — triggers EOF on all mux channels, agent exits *)
232→ Unix.close stdin_w;
233→ let code = wait_child pid in
234→ cleanup stdin_w stdout_r stderr_r;
235→ Alcotest.(check int) "agent exits cleanly" 0 code
236→
237→(** {1 Memory tests} *)
238→
239→let test_get_memory_empty () =
240→ let stdin_w, stdout_r, stderr_r, pid =
241→ spawn_agent
242→ ~spec:"agent_explicit_provider.plumb"
243→ [| "ANTHROPIC_API_KEY=test-key-not-real" |]
244→ in
245→ Unix.sleepf 0.3;
246→ (* Pause, send blank data to unblock input_line, enter pause loop *)
247→ send_control stdin_w {|{"pause": true}|};
248→ Unix.sleepf 0.1;
249→ send_blank_data stdin_w;
250→ Unix.sleepf 0.2;
251→ (* Consume pause_ack *)
252→ let pause_ack = read_ctrl_out ~timeout:3.0 stdout_r in
253→ (match pause_ack with
254→ | None -> Alcotest.fail "no pause_ack on ctrl_out"
255→ | Some line ->
256→ let json = Yojson.Safe.from_string line in
257→ let open Yojson.Safe.Util in
258→ Alcotest.(check string) "pause_ack kind"
259→ "pause_ack" (json |> member "kind" |> to_string));
260→ send_control stdin_w {|{"get_memory": true}|};
261→ let response = read_ctrl_out ~timeout:3.0 stdout_r in
262→ send_control stdin_w {|{"stop": true}|};
263→ Unix.close stdin_w;
264→ let _ = wait_child pid in
265→ cleanup stdin_w stdout_r stderr_r;
266→ match response with
267→ | None -> Alcotest.fail "no response for get_memory"
268→ | Some line ->
269→ let json = Yojson.Safe.from_string line in
270→ let open Yojson.Safe.Util in
271→ Alcotest.(check string) "kind is memory"
272→ "memory" (json |> member "kind" |> to_string);
273→ Alcotest.(check int) "empty history"
274→ 0 (json |> member "messages" |> to_list |> List.length)
275→
276→let test_set_then_get_memory () =
277→ let stdin_w, stdout_r, stderr_r, pid =
278→ spawn_agent
279→ ~spec:"agent_explicit_provider.plumb"
280→ [| "ANTHROPIC_API_KEY=test-key-not-real" |]
281→ in
282→ Unix.sleepf 0.3;
283→ send_control stdin_w {|{"pause": true}|};
284→ Unix.sleepf 0.1;
285→ send_blank_data stdin_w;
286→ Unix.sleepf 0.2;
287→ let _ = read_ctrl_out ~timeout:3.0 stdout_r in (* pause_ack *)
288→ send_control stdin_w
289→ {|{"set_memory": [{"role":"user","content":"\"hello\""},{"role":"assistant","content":"\"world\""}]}|};
290→ let ack = read_ctrl_out ~timeout:3.0 stdout_r in
291→ (match ack with
292→ | None -> Alcotest.fail "no ack for set_memory"
293→ | Some line ->
294→ let json = Yojson.Safe.from_string line in
295→ let open Yojson.Safe.Util in
296→ Alcotest.(check string) "kind is memory_set"
297→ "memory_set" (json |> member "kind" |> to_string);
298→ Alcotest.(check int) "old_messages is 0"
299→ 0 (json |> member "old_messages" |> to_int);
300→ Alcotest.(check int) "new_messages is 2"
301→ 2 (json |> member "new_messages" |> to_int));
302→ send_control stdin_w {|{"get_memory": true}|};
303→ let response = read_ctrl_out ~timeout:3.0 stdout_r in
304→ send_control stdin_w {|{"stop": true}|};
305→ Unix.close stdin_w;
306→ let _ = wait_child pid in
307→ cleanup stdin_w stdout_r stderr_r;
308→ match response with
309→ | None -> Alcotest.fail "no response for get_memory"
310→ | Some line ->
311→ let json = Yojson.Safe.from_string line in
312→ let open Yojson.Safe.Util in
313→ Alcotest.(check string) "kind is memory"
314→ "memory" (json |> member "kind" |> to_string);
315→ let msgs = json |> member "messages" |> to_list in
316→ Alcotest.(check int) "2 messages" 2 (List.length msgs);
317→ Alcotest.(check string) "first role"
318→ "user" (List.nth msgs 0 |> member "role" |> to_string);
319→ Alcotest.(check string) "second role"
320→ "assistant" (List.nth msgs 1 |> member "role" |> to_string)
321→
322→let test_set_memory_empty () =
323→ let stdin_w, stdout_r, stderr_r, pid =
324→ spawn_agent
325→ ~spec:"agent_explicit_provider.plumb"
326→ [| "ANTHROPIC_API_KEY=test-key-not-real" |]
327→ in
328→ Unix.sleepf 0.3;
329→ send_control stdin_w {|{"pause": true}|};
330→ Unix.sleepf 0.1;
331→ send_blank_data stdin_w;
332→ Unix.sleepf 0.2;
333→ let _ = read_ctrl_out ~timeout:3.0 stdout_r in (* pause_ack *)
334→ send_control stdin_w
335→ {|{"set_memory": [{"role":"user","content":"\"hello\""}]}|};
336→ let _ = read_ctrl_out ~timeout:3.0 stdout_r in (* memory_set ack *)
337→ send_control stdin_w {|{"set_memory": []}|};
338→ let ack = read_ctrl_out ~timeout:3.0 stdout_r in
339→ (match ack with
340→ | None -> Alcotest.fail "no ack for empty set_memory"
341→ | Some line ->
342→ let json = Yojson.Safe.from_string line in
343→ let open Yojson.Safe.Util in
344→ Alcotest.(check int) "old_messages is 1"
345→ 1 (json |> member "old_messages" |> to_int);
346→ Alcotest.(check int) "new_messages is 0"
347→ 0 (json |> member "new_messages" |> to_int));
348→ send_control stdin_w {|{"get_memory": true}|};
349→ let response = read_ctrl_out ~timeout:3.0 stdout_r in
350→ send_control stdin_w {|{"stop": true}|};
351→ Unix.close stdin_w;
352→ let _ = wait_child pid in
353→ cleanup stdin_w stdout_r stderr_r;
354→ match response with
355→ | None -> Alcotest.fail "no get_memory response"
356→ | Some line ->
357→ let json = Yojson.Safe.from_string line in
358→ let open Yojson.Safe.Util in
359→ Alcotest.(check int) "history cleared"
360→ 0 (json |> member "messages" |> to_list |> List.length)
361→
362→let test_set_memory_malformed_rejected () =
363→ let stdin_w, stdout_r, stderr_r, pid =
364→ spawn_agent
365→ ~spec:"agent_explicit_provider.plumb"
366→ [| "ANTHROPIC_API_KEY=test-key-not-real" |]
367→ in
368→ Unix.sleepf 0.3;
369→ send_control stdin_w {|{"pause": true}|};
370→ Unix.sleepf 0.1;
371→ send_blank_data stdin_w;
372→ Unix.sleepf 0.2;
373→ let _ = read_ctrl_out ~timeout:3.0 stdout_r in (* pause_ack *)
374→ send_control stdin_w {|{"set_memory": [{"role":"user"}]}|};
375→ let response = read_ctrl_out ~timeout:1.0 stdout_r in
376→ send_control stdin_w {|{"get_memory": true}|};
377→ let memory = read_ctrl_out ~timeout:3.0 stdout_r in
378→ send_control stdin_w {|{"stop": true}|};
379→ Unix.close stdin_w;
380→ let _ = wait_child pid in
381→ let stderr = drain_stderr stderr_r in
382→ cleanup stdin_w stdout_r stderr_r;
383→ Alcotest.(check bool) "no ack for malformed payload"
384→ true (response = None);
385→ Alcotest.(check bool) "stderr warns about malformed"
386→ true (contains stderr "set_memory_malformed");
387→ (match memory with
388→ | None -> Alcotest.fail "no get_memory response"
389→ | Some line ->
390→ let json = Yojson.Safe.from_string line in
391→ let open Yojson.Safe.Util in
392→ Alcotest.(check int) "history unchanged (empty)"
393→ 0 (json |> member "messages" |> to_list |> List.length))
394→
395→(** {1 Pause tests} *)
396→
397→let test_pause_get_set_resume () =
398→ let stdin_w, stdout_r, stderr_r, pid =
399→ spawn_agent
400→ ~spec:"agent_explicit_provider.plumb"
401→ [| "ANTHROPIC_API_KEY=test-key-not-real" |]
402→ in
403→ Unix.sleepf 0.3;
404→ send_control stdin_w {|{"pause": true}|};
405→ Unix.sleepf 0.1;
406→ send_blank_data stdin_w;
407→ Unix.sleepf 0.2;
408→ let _ = read_ctrl_out ~timeout:3.0 stdout_r in (* pause_ack *)
409→ send_control stdin_w {|{"get_memory": true}|};
410→ let response = read_ctrl_out ~timeout:3.0 stdout_r in
411→ (match response with
412→ | None -> Alcotest.fail "no get_memory response while paused"
413→ | Some line ->
414→ let json = Yojson.Safe.from_string line in
415→ let open Yojson.Safe.Util in
416→ Alcotest.(check string) "kind is memory"
417→ "memory" (json |> member "kind" |> to_string));
418→ send_control stdin_w
419→ {|{"set_memory": [{"role":"user","content":"\"compacted\""}]}|};
420→ let ack = read_ctrl_out ~timeout:3.0 stdout_r in
421→ (match ack with
422→ | None -> Alcotest.fail "no ack while paused"
423→ | Some line ->
424→ let json = Yojson.Safe.from_string line in
425→ let open Yojson.Safe.Util in
426→ Alcotest.(check string) "kind is memory_set"
427→ "memory_set" (json |> member "kind" |> to_string));
428→ send_control stdin_w {|{"resume": true}|};
429→ Unix.sleepf 0.1;
430→ let _ = read_ctrl_out ~timeout:3.0 stdout_r in (* resume_ack *)
431→ send_control stdin_w {|{"stop": true}|};
432→ Unix.close stdin_w;
433→ let code = wait_child pid in
434→ cleanup stdin_w stdout_r stderr_r;
435→ Alcotest.(check int) "agent exits cleanly" 0 code
436→
437→let test_resume_without_pause () =
438→ let stdin_w, stdout_r, stderr_r, pid =
439→ spawn_agent
440→ ~spec:"agent_explicit_provider.plumb"
441→ [| "ANTHROPIC_API_KEY=test-key-not-real" |]
442→ in
443→ Unix.sleepf 0.3;
444→ send_control stdin_w {|{"resume": true}|};
445→ Unix.sleepf 0.1;
446→ send_control stdin_w {|{"pause": true}|};
447→ Unix.sleepf 0.1;
448→ send_blank_data stdin_w;
449→ Unix.sleepf 0.2;
450→ let _ = read_ctrl_out ~timeout:3.0 stdout_r in (* pause_ack *)
451→ send_control stdin_w {|{"get_memory": true}|};
452→ let response = read_ctrl_out ~timeout:3.0 stdout_r in
453→ send_control stdin_w {|{"stop": true}|};
454→ Unix.close stdin_w;
455→ let code = wait_child pid in
456→ cleanup stdin_w stdout_r stderr_r;
457→ Alcotest.(check bool) "got memory response" true (response <> None);
458→ Alcotest.(check int) "agent exits cleanly" 0 code
459→
460→let test_stop_while_paused () =
461→ let stdin_w, stdout_r, stderr_r, pid =
462→ spawn_agent
463→ ~spec:"agent_explicit_provider.plumb"
464→ [| "ANTHROPIC_API_KEY=test-key-not-real" |]
465→ in
466→ Unix.sleepf 0.3;
467→ send_control stdin_w {|{"pause": true}|};
468→ Unix.sleepf 0.1;
469→ send_blank_data stdin_w;
470→ Unix.sleepf 0.2;
471→ send_control stdin_w {|{"stop": true}|};
472→ Unix.close stdin_w;
473→ let code = wait_child pid in
474→ cleanup stdin_w stdout_r stderr_r;
475→ Alcotest.(check int) "exits while paused" 0 code
476→
477→let test_stdin_eof_while_paused () =
478→ let stdin_w, stdout_r, stderr_r, pid =
479→ spawn_agent
480→ ~spec:"agent_explicit_provider.plumb"
481→ [| "ANTHROPIC_API_KEY=test-key-not-real" |]
482→ in
483→ Unix.sleepf 0.3;
484→ send_control stdin_w {|{"pause": true}|};
485→ Unix.sleepf 0.1;
486→ send_blank_data stdin_w;
487→ Unix.sleepf 0.2;
488→ (* Close stdin — closes all mux channels, agent exits *)
489→ Unix.close stdin_w;
490→ let code = wait_child pid in
491→ cleanup stdin_w stdout_r stderr_r;
492→ Alcotest.(check int) "exits on EOF while paused" 0 code
493→
494→(** {1 Drain tests — protocol handling after data EOF} *)
495→
496→let test_drain_completes_protocol () =
497→ let stdin_w, stdout_r, stderr_r, pid =
498→ spawn_agent
499→ ~spec:"agent_explicit_provider.plumb"
500→ [| "ANTHROPIC_API_KEY=test-key-not-real" |]
501→ in
502→ Unix.sleepf 0.3;
503→ (* Signal data EOF — agent enters drain loop, but ctrl stays open *)
504→ send_data_eof stdin_w;
505→ Unix.sleepf 0.2;
506→ (* Send pause — agent should handle it in drain loop *)
507→ send_control stdin_w {|{"pause": true}|};
508→ let ack = read_ctrl_out ~timeout:3.0 stdout_r in
509→ (match ack with
510→ | None -> Alcotest.fail "no pause_ack in drain mode"
511→ | Some line ->
512→ let json = Yojson.Safe.from_string line in
513→ let open Yojson.Safe.Util in
514→ Alcotest.(check string) "pause_ack kind"
515→ "pause_ack" (json |> member "kind" |> to_string));
516→ send_control stdin_w {|{"get_memory": true}|};
517→ let mem = read_ctrl_out ~timeout:3.0 stdout_r in
518→ (match mem with
519→ | None -> Alcotest.fail "no get_memory response in drain mode"
520→ | Some line ->
521→ let json = Yojson.Safe.from_string line in
522→ let open Yojson.Safe.Util in
523→ Alcotest.(check string) "memory kind"
524→ "memory" (json |> member "kind" |> to_string));
525→ send_control stdin_w {|{"resume": true}|};
526→ let resume = read_ctrl_out ~timeout:3.0 stdout_r in
527→ (match resume with
528→ | None -> Alcotest.fail "no resume_ack in drain mode"
529→ | Some line ->
530→ let json = Yojson.Safe.from_string line in
531→ let open Yojson.Safe.Util in
532→ Alcotest.(check string) "resume_ack kind"
533→ "resume_ack" (json |> member "kind" |> to_string));
534→ (* Close stdin — ctrl EOF triggers agent exit *)
535→ Unix.close stdin_w;
536→ let code = wait_child pid in
537→ cleanup stdin_w stdout_r stderr_r;
538→ Alcotest.(check int) "drain completes protocol" 0 code
539→
540→let test_drain_exits_on_stdin_eof () =
541→ let stdin_w, stdout_r, stderr_r, pid =
542→ spawn_agent
543→ ~spec:"agent_explicit_provider.plumb"
544→ [| "ANTHROPIC_API_KEY=test-key-not-real" |]
545→ in
546→ Unix.sleepf 0.3;
547→ (* Close stdin — EOF on all channels, agent exits *)
548→ Unix.close stdin_w;
549→ let code = wait_child pid in
550→ cleanup stdin_w stdout_r stderr_r;
551→ Alcotest.(check int) "drain exits on stdin EOF" 0 code
552→
553→let test_drain_handles_memory_without_pause () =
554→ let stdin_w, stdout_r, stderr_r, pid =
555→ spawn_agent
556→ ~spec:"agent_explicit_provider.plumb"
557→ [| "ANTHROPIC_API_KEY=test-key-not-real" |]
558→ in
559→ Unix.sleepf 0.3;
560→ (* Signal data EOF *)
561→ send_data_eof stdin_w;
562→ Unix.sleepf 0.2;
563→ (* Send get_memory without pause — exercises non-paused drain path *)
564→ send_control stdin_w {|{"get_memory": true}|};
565→ let mem = read_ctrl_out ~timeout:3.0 stdout_r in
566→ (match mem with
567→ | None -> Alcotest.fail "no get_memory response in drain (no pause)"
568→ | Some line ->
569→ let json = Yojson.Safe.from_string line in
570→ let open Yojson.Safe.Util in
571→ Alcotest.(check string) "memory kind"
572→ "memory" (json |> member "kind" |> to_string));
573→ Unix.close stdin_w;
574→ let code = wait_child pid in
575→ cleanup stdin_w stdout_r stderr_r;
576→ Alcotest.(check int) "drain handles memory without pause" 0 code
577→
578→let () =
579→ Alcotest.run "agent-control" [
580→ "stop", [
581→ Alcotest.test_case "stop via control" `Quick
582→ test_stop_via_control;
583→ Alcotest.test_case "stdin EOF sets stop" `Quick
584→ test_stdin_eof_sets_stop;
585→ ];
586→ "memory", [
587→ Alcotest.test_case "get_memory empty history" `Quick
588→ test_get_memory_empty;
589→ Alcotest.test_case "set_memory then get_memory" `Quick
590→ test_set_then_get_memory;
591→ Alcotest.test_case "set_memory empty clears history" `Quick
592→ test_set_memory_empty;
593→ Alcotest.test_case "set_memory malformed rejected" `Quick
594→ test_set_memory_malformed_rejected;
595→ ];
596→ "pause", [
597→ Alcotest.test_case "pause get set resume flow" `Quick
598→ test_pause_get_set_resume;
599→ Alcotest.test_case "resume without pause is no-op" `Quick
600→ test_resume_without_pause;
601→ Alcotest.test_case "stop while paused" `Quick
602→ test_stop_while_paused;
603→ Alcotest.test_case "stdin EOF while paused" `Quick
604→ test_stdin_eof_while_paused;
605→ ];
606→ "drain", [
607→ Alcotest.test_case "drain completes protocol" `Quick
608→ test_drain_completes_protocol;
609→ Alcotest.test_case "drain exits on stdin EOF" `Quick
610→ test_drain_exits_on_stdin_eof;
611→ Alcotest.test_case "drain handles memory without pause" `Quick
612→ test_drain_handles_memory_without_pause;
613→ ];
614→ ]
615→ 1→(library
2→ (name fabric)
3→ (public_name plumbing.fabric)
4→ (libraries transport supervisor runtime language spawn_lib yojson eio eio.unix unix threads.posix))
5→ 1→(* ZeroMQ implementation of the TRANSPORT signature.
2→ Uses Eio_unix.run_in_systhread for non-blocking recv/send,
3→ NOT zmq-eio (which has bugs and cleanup issues). See PLMB-037.
4→
5→ Each blocking ZMQ operation runs in a system thread via
6→ Eio_unix.run_in_systhread, yielding the Eio scheduler while
7→ waiting. No persistent event loop fibres — clean shutdown. *)
8→
9→open Transport_intf
10→
11→(* All sockets are wrapped in an existential. The phantom type
12→ parameter is enforced by the module type ascription. *)
13→
14→type zmq_socket =
15→ | Zmq_push : [`Push] Zmq.Socket.t -> zmq_socket
16→ | Zmq_pull : [`Pull] Zmq.Socket.t -> zmq_socket
17→ | Zmq_pub : [`Pub] Zmq.Socket.t -> zmq_socket
18→ | Zmq_sub : [`Sub] Zmq.Socket.t -> zmq_socket
19→ | Zmq_pair : [`Pair] Zmq.Socket.t -> zmq_socket
20→
21→type _ socket = zmq_socket
22→
23→type context = Zmq.Context.t
24→
25→let wrap_zmq_exn f =
26→ try f () with
27→ | Zmq.ZMQ_exception (_, msg) ->
28→ raise (Transport_error (Internal msg, msg))
29→ | Unix.Unix_error (code, fn, arg) ->
30→ let msg = Printf.sprintf "%s: %s(%s)" (Unix.error_message code) fn arg in
31→ raise (Transport_error (Internal msg, msg))
32→
33→let create_context () = wrap_zmq_exn Zmq.Context.create
34→
35→let destroy_context ctx =
36→ try Zmq.Context.terminate ctx with
37→ | Zmq.ZMQ_exception _ -> ()
38→
39→(* Socket creation.
40→ Sets linger=0 so close doesn't block waiting for pending messages.
41→ Sets receive timeout=1000ms so recv unblocks periodically, allowing
42→ clean shutdown (the recv loop retries until the socket is closed). *)
43→let make_socket ctx kind =
44→ let s = Zmq.Socket.create ctx kind in
45→ Zmq.Socket.set_linger_period s 0;
46→ Zmq.Socket.set_receive_timeout s 1000;
47→ s
48→
49→let push_socket ctx ~sw:_ = Zmq_push (make_socket ctx Zmq.Socket.push)
50→let pull_socket ctx ~sw:_ = Zmq_pull (make_socket ctx Zmq.Socket.pull)
51→let pub_socket ctx ~sw:_ = Zmq_pub (make_socket ctx Zmq.Socket.pub)
52→let sub_socket ctx ~sw:_ = Zmq_sub (make_socket ctx Zmq.Socket.sub)
53→let pair_socket ctx ~sw:_ = Zmq_pair (make_socket ctx Zmq.Socket.pair)
54→
55→(* Bind/connect/disconnect — non-blocking, run directly *)
56→let bind_raw : type z. z Zmq.Socket.t -> endpoint -> unit =
57→ fun s ep -> wrap_zmq_exn (fun () -> Zmq.Socket.bind s ep)
58→
59→let connect_raw : type z. z Zmq.Socket.t -> endpoint -> unit =
60→ fun s ep -> wrap_zmq_exn (fun () -> Zmq.Socket.connect s ep)
61→
62→let disconnect_raw : type z. z Zmq.Socket.t -> endpoint -> unit =
63→ fun s ep -> wrap_zmq_exn (fun () -> Zmq.Socket.disconnect s ep)
64→
65→let bind sock ep = match sock with
66→ | Zmq_push s -> bind_raw s ep | Zmq_pull s -> bind_raw s ep
67→ | Zmq_pub s -> bind_raw s ep | Zmq_sub s -> bind_raw s ep
68→ | Zmq_pair s -> bind_raw s ep
69→
70→let connect sock ep = match sock with
71→ | Zmq_push s -> connect_raw s ep | Zmq_pull s -> connect_raw s ep
72→ | Zmq_pub s -> connect_raw s ep | Zmq_sub s -> connect_raw s ep
73→ | Zmq_pair s -> connect_raw s ep
74→
75→let disconnect sock ep = match sock with
76→ | Zmq_push s -> disconnect_raw s ep | Zmq_pull s -> disconnect_raw s ep
77→ | Zmq_pub s -> disconnect_raw s ep | Zmq_sub s -> disconnect_raw s ep
78→ | Zmq_pair s -> disconnect_raw s ep
79→
80→let subscribe sock topic = match sock with
81→ | Zmq_sub s -> wrap_zmq_exn (fun () -> Zmq.Socket.subscribe s topic)
82→ | _ -> invalid_arg "subscribe: not a SUB socket"
83→
84→let unsubscribe sock topic = match sock with
85→ | Zmq_sub s -> wrap_zmq_exn (fun () -> Zmq.Socket.unsubscribe s topic)
86→ | _ -> invalid_arg "unsubscribe: not a SUB socket"
87→
88→(* Single-frame envelope encoding: <4-byte topic length><topic><payload> *)
89→
90→let encode_frame ~topic payload =
91→ let tlen = String.length topic in
92→ let buf = Buffer.create (4 + tlen + String.length payload) in
93→ Buffer.add_char buf (Char.chr ((tlen lsr 24) land 0xff));
94→ Buffer.add_char buf (Char.chr ((tlen lsr 16) land 0xff));
95→ Buffer.add_char buf (Char.chr ((tlen lsr 8) land 0xff));
96→ Buffer.add_char buf (Char.chr (tlen land 0xff));
97→ Buffer.add_string buf topic;
98→ Buffer.add_string buf payload;
99→ Buffer.contents buf
100→
101→let decode_frame frame =
102→ let len = String.length frame in
103→ if len < 4 then
104→ raise (Transport_error (Protocol_error, "frame too short for envelope header"));
105→ let tlen =
106→ (Char.code frame.[0] lsl 24)
107→ lor (Char.code frame.[1] lsl 16)
108→ lor (Char.code frame.[2] lsl 8)
109→ lor (Char.code frame.[3])
110→ in
111→ if 4 + tlen > len then
112→ raise (Transport_error (Protocol_error,
113→ Printf.sprintf "topic length %d exceeds frame size %d" tlen len));
114→ let topic = String.sub frame 4 tlen in
115→ let payload = String.sub frame (4 + tlen) (len - 4 - tlen) in
116→ (topic, payload)
117→
118→(* Send: non-blocking send with fd-based waiting for backpressure. *)
119→let send_raw : type z. z Zmq.Socket.t -> string -> unit =
120→ fun s frame ->
121→ let fd = Zmq.Socket.get_fd s in
122→ let rec loop () =
123→ try Zmq.Socket.send ~block:false s frame
124→ with Unix.Unix_error (Unix.EAGAIN, _, _) ->
125→ Eio_unix.await_writable fd;
126→ loop ()
127→ in
128→ loop ()
129→
130→let send sock ~topic payload =
131→ let frame = encode_frame ~topic payload in
132→ match sock with
133→ | Zmq_push s -> send_raw s frame
134→ | Zmq_pub s -> send_raw s frame
135→ | Zmq_pair s -> send_raw s frame
136→ | _ -> invalid_arg "send: not a sendable socket"
137→
138→(* Recv: uses ZMQ's fd + Eio_unix.await_readable for proper
139→ cancellation support. The ZMQ fd becomes readable when there
140→ are events to process; we then try a non-blocking recv. *)
141→let recv_raw : type z. z Zmq.Socket.t -> string =
142→ fun s ->
143→ let fd = Zmq.Socket.get_fd s in
144→ let rec loop () =
145→ (* Check if data is available without blocking *)
146→ match Zmq.Socket.events s with
147→ | Zmq.Socket.Poll_in | Zmq.Socket.Poll_in_out ->
148→ (try Zmq.Socket.recv ~block:false s
149→ with Unix.Unix_error (Unix.EAGAIN, _, _) ->
150→ (* Spurious — fd was ready but recv got EAGAIN.
151→ Wait for next notification. *)
152→ Eio_unix.await_readable fd;
153→ loop ())
154→ | _ ->
155→ (* No data ready — wait for the fd to become readable.
156→ This yields to Eio and is cancellable. *)
157→ Eio_unix.await_readable fd;
158→ loop ()
159→ in
160→ loop ()
161→
162→let recv sock =
163→ let frame = match sock with
164→ | Zmq_pull s -> recv_raw s
165→ | Zmq_sub s -> recv_raw s
166→ | Zmq_pair s -> recv_raw s
167→ | _ -> invalid_arg "recv: not a receivable socket"
168→ in
169→ decode_frame frame
170→
171→let try_recv sock =
172→ let try_on : type z. z Zmq.Socket.t -> (topic * string) option = fun s ->
173→ try
174→ let frame = Zmq.Socket.recv ~block:false s in
175→ Some (decode_frame frame)
176→ with Unix.Unix_error (Unix.EAGAIN, _, _) -> None
177→ in
178→ match sock with
179→ | Zmq_pull s -> try_on s
180→ | Zmq_sub s -> try_on s
181→ | Zmq_pair s -> try_on s
182→ | _ -> invalid_arg "try_recv: not a receivable socket"
183→
184→(* Multipart encoding as single frame *)
185→let send_parts sock parts =
186→ let buf = Buffer.create 256 in
187→ let n = List.length parts in
188→ Buffer.add_char buf (Char.chr ((n lsr 8) land 0xff));
189→ Buffer.add_char buf (Char.chr (n land 0xff));
190→ List.iter (fun part ->
191→ let plen = String.length part in
192→ Buffer.add_char buf (Char.chr ((plen lsr 24) land 0xff));
193→ Buffer.add_char buf (Char.chr ((plen lsr 16) land 0xff));
194→ Buffer.add_char buf (Char.chr ((plen lsr 8) land 0xff));
195→ Buffer.add_char buf (Char.chr (plen land 0xff));
196→ Buffer.add_string buf part
197→ ) parts;
198→ let frame = Buffer.contents buf in
199→ match sock with
200→ | Zmq_push s -> send_raw s frame
201→ | Zmq_pub s -> send_raw s frame
202→ | Zmq_pair s -> send_raw s frame
203→ | _ -> invalid_arg "send_parts: not a sendable socket"
204→
205→let recv_parts sock =
206→ let frame = match sock with
207→ | Zmq_pull s -> recv_raw s
208→ | Zmq_sub s -> recv_raw s
209→ | Zmq_pair s -> recv_raw s
210→ | _ -> invalid_arg "recv_parts: not a receivable socket"
211→ in
212→ let len = String.length frame in
213→ if len < 2 then raise (Transport_error (Protocol_error, "parts frame too short"));
214→ let n = (Char.code frame.[0] lsl 8) lor (Char.code frame.[1]) in
215→ let pos = ref 2 in
216→ let parts = ref [] in
217→ for _ = 1 to n do
218→ if !pos + 4 > len then
219→ raise (Transport_error (Protocol_error, "truncated parts frame"));
220→ let plen =
221→ (Char.code frame.[!pos] lsl 24)
222→ lor (Char.code frame.[!pos + 1] lsl 16)
223→ lor (Char.code frame.[!pos + 2] lsl 8)
224→ lor (Char.code frame.[!pos + 3])
225→ in
226→ pos := !pos + 4;
227→ if !pos + plen > len then
228→ raise (Transport_error (Protocol_error, "truncated part data"));
229→ parts := String.sub frame !pos plen :: !parts;
230→ pos := !pos + plen
231→ done;
232→ List.rev !parts
233→
234→(* Monitor — simplified, uses polling instead of zmq-eio *)
235→let monitor _ctx ~sw sock : event Eio.Stream.t =
236→ let stream = Eio.Stream.create 64 in
237→ let poll_raw : type z. z Zmq.Socket.t -> unit = fun raw ->
238→ let mon = Zmq.Monitor.create raw in
239→ let mon_sock = Zmq.Monitor.connect _ctx mon in
240→ Eio.Fiber.fork_daemon ~sw (fun () ->
241→ let rec loop () =
242→ let zmq_event =
243→ Eio_unix.run_in_systhread (fun () ->
244→ Zmq.Monitor.recv mon_sock)
245→ in
246→ let our_event = match zmq_event with
247→ | Zmq.Monitor.Connected (addr, _) -> Some (Peer_connected addr)
248→ | Zmq.Monitor.Disconnected (addr, _) -> Some (Peer_disconnected addr)
249→ | Zmq.Monitor.Connect_delayed addr ->
250→ Some (Connect_failed (addr, "connect delayed"))
251→ | Zmq.Monitor.Bind_failed (addr, errno, msg) ->
252→ Some (Bind_failed_event (addr,
253→ Printf.sprintf "errno=%d: %s" errno msg))
254→ | Zmq.Monitor.Monitor_stopped _ -> None
255→ | _ -> None
256→ in
257→ (match our_event with
258→ | Some e -> Eio.Stream.add stream e
259→ | None -> ());
260→ match zmq_event with
261→ | Zmq.Monitor.Monitor_stopped _ -> `Stop_daemon
262→ | _ -> loop ()
263→ in
264→ loop ())
265→ in
266→ (match sock with
267→ | Zmq_push s -> poll_raw s | Zmq_pull s -> poll_raw s
268→ | Zmq_pub s -> poll_raw s | Zmq_sub s -> poll_raw s
269→ | Zmq_pair s -> poll_raw s);
270→ stream
271→
272→let close sock =
273→ let close_raw : type z. z Zmq.Socket.t -> unit = fun s ->
274→ (try Zmq.Socket.close s with Zmq.ZMQ_exception _ -> ())
275→ in
276→ match sock with
277→ | Zmq_push s -> close_raw s | Zmq_pull s -> close_raw s
278→ | Zmq_pub s -> close_raw s | Zmq_sub s -> close_raw s
279→ | Zmq_pair s -> close_raw s
280→
281→let force_close = close (* Same as close — no event loop to bypass *)
282→
283→let set_hwm_raw : type z. z Zmq.Socket.t -> unit =
284→ fun s ->
285→ Zmq.Socket.set_send_high_water_mark s 0;
286→ Zmq.Socket.set_receive_high_water_mark s 0
287→
288→let set_unlimited_hwm sock = match sock with
289→ | Zmq_push s -> set_hwm_raw s | Zmq_pull s -> set_hwm_raw s
290→ | Zmq_pub s -> set_hwm_raw s | Zmq_sub s -> set_hwm_raw s
291→ | Zmq_pair s -> set_hwm_raw s
292→ 1→# Agent runtime
2→
3→Runtime behaviour of the agent process. For primitive semantics and
4→configuration, see [processes.md](processes.md). For the multiplexed I/O
5→envelope format, see [processes.md § Multiplexed I/O](processes.md#multiplexed-io).
6→
7→## Environment variables
8→
9→| Variable | Description |
10→|----------|-------------|
11→| `PLUMB_PROVIDER` | Default LLM provider for agents without explicit `provider` in config. Accepted values: `"anthropic"`, `"openai"`. Set via `--provider` flag on `chat`/`plumb`. |
12→| `PLUMB_MODEL` | Default LLM model for agents without explicit `model` in config. Set via `--model` flag on `chat`/`plumb`. |
13→| `ANTHROPIC_API_KEY` | Required when `provider = "anthropic"`. API key for the Anthropic Messages API. |
14→| `OPENAI_API_KEY` | Required when `provider = "openai"`. API key for the OpenAI Chat Completions API. |
15→| `PIPELINE_DEBUG` | Set to `1` to enable debug logging. Per-message conversation transcript and API metadata are written as JSON Lines to stderr. Default: off (only warnings and errors). |
16→| `PLUMB_MCP_TOOLS` | Path to temp JSON file containing MCP tool schemas. Set by plumb runtime when MCP servers are configured. Internal — not user-facing. |
17→
18→When `PIPELINE_DEBUG=1`, the agent logs each new message in the conversation (including retry messages that don't carry forward into history) and API call metadata:
19→
20→```sh
21→echo '"hello"' | PIPELINE_DEBUG=1 agent spec.plumb
22→```
23→
24→Each log line is a JSON object with `"log"` (level) and `"event"` (name) fields, distinct from both error output and thinking output. Example:
25→
26→```json
27→{"log":"debug","event":"message","role":"user","content":"\"hello\""}
28→{"log":"debug","event":"api_request","model":"claude-sonnet-4-5-20250514","max_tokens":8192,"thinking_budget":null,"message_count":1}
29→{"log":"debug","event":"message","role":"assistant","content":[{"type":"text","text":"{\"greeting\":\"hi\"}"}]}
30→{"log":"debug","event":"api_response","status":200,"content_blocks":1,"stop_reason":"end_turn","input_tokens":150,"output_tokens":42}
31→```
32→
33→Retry messages are tagged with `"retry":true` so they can be distinguished from messages that persist in history.
34→
35→The environment variable propagates through pipelines — all processes in a composed pipeline inherit it.
36→
37→## Telemetry
38→
39→The agent writes structured telemetry as `__port: "telemetry"` envelopes
40→on stdout (see [processes.md § Multiplexed I/O](processes.md#multiplexed-io)).
41→Every telemetry event has a `kind` field that identifies the event type.
42→The event kinds are:
43→
44→| Kind | Description | Key fields |
45→|------|-------------|------------|
46→| `config` | Agent configuration at startup | `model`, `provider`, `max_tokens`, `thinking_budget`, `tools` |
47→| `usage` | Token usage per API call | `input_tokens`, `output_tokens`, `cache_creation_input_tokens`, `cache_read_input_tokens` |
48→| `thinking` | Extended thinking block | `thinking`, `signature` |
49→| `tool_call` | Tool invocation summary | `name`, `input`, `summary`, `is_error` |
50→| `pin_state` | Absolute pin state after pin/unpin | `action`, `count`, `keys` |
51→| `output` | Validated output value | `content` |
52→
53→Example events:
54→
55→```json
56→{"kind":"config","model":"claude-haiku-4-5-20251001","provider":"anthropic","max_tokens":8192}
57→{"kind":"usage","input_tokens":150,"output_tokens":42}
58→{"kind":"thinking","thinking":"Let me consider...","signature":"abc123"}
59→{"kind":"tool_call","name":"doc_read","input":{"path":"notes.md"},"summary":"ok","is_error":false}
60→{"kind":"pin_state","action":"pin","count":2,"keys":["doc_read({\"path\":\"notes.md\"})","search({\"query\":\"cats\"})"]}
61→{"kind":"output","content":{"greeting":"hello"}}
62→```
63→
64→### The `type` → `kind` rewrite
65→
66→The LLM API returns thinking blocks with a `"type": "thinking"` field.
67→Since `type` is a reserved keyword in the plumbing language (used for
68→type declarations), it cannot appear in filter expressions like
69→`filter(type = "thinking")`. The agent binary rewrites `"type"` to
70→`"kind"` on all thinking blocks before emitting them as telemetry, so
71→filters and observers can reference the field:
72→
73→```
74→agent.telemetry ; filter(kind = "thinking") ; observer
75→```
76→
77→All other telemetry events (`usage`, `config`, `tool_call`, `output`)
78→are constructed by the agent binary and use `"kind"` natively — no
79→rewriting needed.
80→
81→### Wiring telemetry
82→
83→In the plumbing language, telemetry maps to the agent's `.telemetry`
84→port. By default this port is implicitly discarded (`; discard`). To
85→observe an agent's reasoning, wire the telemetry stream to an
86→observer:
87→
88→```
89→composer.telemetry ; angel ; mux(source: angel) ; composer
90→```
91→
92→Example config with thinking:
93→
94→```
95→summarizer : Article → Summary = agent {
96→ model: "claude-sonnet-4-5-20250514"
97→ prompt: "Summarize the article."
98→ max_tokens: 16384
99→ thinking_budget: 10000
100→}
101→```
102→
103→Thinking blocks are preserved in conversation history for multi-turn
104→consistency. Each thinking block includes a `signature` field required
105→by the API for multi-turn pass-through.
106→
107→## stderr
108→
109→Stderr (fd 2) carries debug logs and errors, distinguished by format:
110→
111→- **Debug logs**: `{"log": "debug", "event": "...", ...}` — conversation messages, API metadata. Only emitted when `PIPELINE_DEBUG=1`.
112→- **Errors**: `{"error": "...", "code": "...", ...}` — structured error output. Always emitted.
113→
114→Thinking blocks go to the telemetry port, not stderr. Debug logs and
115→errors go to stderr.
116→
117→## Control channel
118→
119→Agents with product input types receive a control stream on a second input channel:
120→
121→```
122→type Control = { set_temp: float } | { set_model: string } | { stop: bool }
123→
124→let composer : (!Draft, !Control) → !Verdict = agent {
125→ prompt: "Review the draft."
126→}
127→```
128→
129→The control stream arrives as `__port: "ctrl_in"` envelopes on stdin. A background thread reads from the internal ctrl pipe (fed by the stdin demux) and updates the agent's mutable state under a mutex. The agent writes responses (memory dumps, acknowledgements) as `__port: "ctrl_out"` envelopes on stdout.
130→
131→| Field | Type | Effect |
132→|-------|------|--------|
133→| `set_temp` | float \| null | Set or clear temperature override |
134→| `set_model` | string \| null | Set or clear model override |
135→| `stop` | bool | Signal clean exit after current message |
136→| `pause` | bool | Stop reading data from stdin; control and telemetry stay live |
137→| `resume` | bool | Resume reading data from stdin |
138→| `get_memory` | bool | Emit conversation history on ctrl_out (kind: `memory`) |
139→| `set_memory` | array | Replace conversation history; emit ack on ctrl_out (kind: `memory_set`) |
140→
141→Sending `null` clears an override, reverting to the spec default:
142→
143→```json
144→{"set_temp": 0.9}
145→{"set_temp": null}
146→{"set_model": "claude-opus-4-20250514"}
147→{"set_model": null}
148→{"stop": true}
149→{"pause": true}
150→{"get_memory": true}
151→{"set_memory": [{"role":"user","content":"\"hello\""},{"role":"assistant","content":"\"world\""}]}
152→{"resume": true}
153→```
154→
155→Unknown fields are silently ignored (forward compatibility).
156→
157→### Control output (fd 7)
158→
159→The agent writes JSON Lines to fd 7 in response to memory operations:
160→
161→**get_memory response:**
162→```json
163→{"kind":"memory","messages":[{"role":"user","content":"\"hello\""},{"role":"assistant","content":"{\"greeting\":\"hi\"}"}],"pinned":[]}
164→```
165→
166→The `pinned` array contains tool results the agent has marked for
167→preservation across compaction cycles. Each entry has `tool`,
168→`arguments`, and `content` fields. See [Document pinning](#document-pinning).
169→
170→**set_memory acknowledgement:**
171→```json
172→{"kind":"memory_set","old_messages":42,"new_messages":2}
173→```
174→
175→**pause acknowledgement:**
176→```json
177→{"kind":"pause_ack"}
178→```
179→
180→**resume acknowledgement:**
181→```json
182→{"kind":"resume_ack"}
183→```
184→
185→**Note**: `pause_ack` and `resume_ack` are emitted on fd 7 (ctrl_out), which flows through the session tagger if a `<->` wire is active. The tagger wraps all ctrl_out messages in `{__step: N, payload: msg}` envelopes. For linear protocols (terminated by `end`), the protocol structure provides one-shot trigger semantics — no additional gating is needed.
186→
187→### When changes take effect
188→
189→Model and temperature changes take effect on the **next API call** — not just the next input message. The conversation loop reads the current values via closures on every iteration, including retries and tool-use rounds within a single input. No HTTP connection restart is needed: `model` and `temperature` are per-request body parameters, not connection-level config.
190→
191→The `stop` signal is checked between input messages. It does not interrupt a running API call.
192→
193→### Pause and memory protocol
194→
195→When paused, the agent stops reading from stdin but continues handling control messages. Memory requests (`get_memory`, `set_memory`) are processed at message boundaries — between input lines when not paused, or immediately when paused.
196→
197→The `set_memory` operation replaces conversation history. The system prompt is never affected. If any message in the `set_memory` array lacks `role` or `content`, the entire payload is rejected with a warning.
198→
199→When pins are active, `set_memory` automatically prepends a user message containing all pinned tool results in `<pinned-documents>` blocks. This happens transparently — the compactor does not need to preserve pinned content in its output.
200→
201→**Protocol constraint**: `memory_request` is a single slot. The controller must wait for a response on fd 7 before sending the next memory request. If two requests arrive before the main loop handles either, the first is silently overwritten.
202→
203→## System prompt construction
204→
205→The system prompt is assembled from the agent's config:
206→
207→1. **Inline prompt** (`prompt: "..."`) — added first as a plain text block.
208→2. **Prompt files** (`prompts: ["system.md", "./role.md"]`) — each file is read at startup and wrapped in document tags:
209→
210→ ```
211→ <doc id="system.md">
212→ file contents here
213→ </doc>
214→ ```
215→
216→ File paths are resolved by the executor before the agent starts:
217→
218→ - **Bare name** (no `/`) — searched via the resource path
219→ (`PLUMB_RESOURCES` + installed resource directories).
220→ Use for shared files: `"system.md"`, `"grammar.peg"`.
221→ - **Path** (contains `/`) — resolved relative to the `.plumb`
222→ file that defines the agent. Use for local files:
223→ `"./composer.md"`, `"./heat.plumb"`.
224→
225→3. **Output instruction** — appended automatically. Describes the expected JSON output type and instructs the model to respond with only the JSON value.
226→
227→Each part becomes a separate system block. The output instruction has `cache_break: true` for prompt caching efficiency.
228→
229→## Streaming
230→
231→API responses are received via Server-Sent Events (SSE). This prevents read timeouts during extended thinking and avoids buffering large responses in memory. Streaming is an internal transport detail — the process interface remains synchronous JSON Lines on stdin/stdout.
232→
233→## Document pinning
234→
235→Agents can mark tool results as important so they survive context
236→compaction. The agent calls the synthetic `pin` tool with the name and
237→arguments of a previous tool call; the result is stored in a session-local
238→pin set. On the next compaction cycle, pinned content is excluded from
239→the compressible conversation, provided to the compactor as read-only
240→reference, and reinjected at the top of the post-compaction history.
241→
242→### Pin and unpin tools
243→
244→The agent binary injects two synthetic tools into every agent's tool
245→list:
246→
247→- **`pin`** `{tool: string, arguments: object}` — pin the result of a
248→ previous tool call. If the pin limit (20) is reached, the oldest pin
249→ is evicted (FIFO). Idempotent — pinning an already-pinned call is a
250→ no-op.
251→
252→- **`unpin`** `{tool: string, arguments: object}` — remove a pin,
253→ allowing the result to be compacted normally. Idempotent.
254→
255→### Canonical keys
256→
257→Pins are identified by a canonical form of `(tool_name,
258→sorted_arguments_json)`. Object keys are recursively sorted;
259→integer-valued floats are normalised to integers. This means
260→`{"b":1,"a":2}` and `{"a":2,"b":1}` produce the same pin key.
261→
262→### Cache-hit interception
263→
264→When the agent calls a tool whose canonical key matches an active pin,
265→the request is intercepted. Instead of dispatching to the tool, the
266→agent returns a note telling the LLM where to find the pinned content:
267→
268→```json
269→{"pinned":true,"note":"This tool result is already pinned in your context. Look for <pinned key=\"fetch({\\\"path\\\":\\\"foo.md\\\"})\"> in the pinned-documents block at the top of your conversation."}
270→```
271→
272→### Reinjection format
273→
274→After compaction, pinned content is prepended to the new history as a
275→single user message:
276→
277→```xml
278→<pinned-documents>
279→The following tool results are pinned in your context. They survive
280→compaction and are always available. Do not re-fetch them.
281→
282→<pinned key="fetch({"path":"foo.md"})">
283→file contents here
284→</pinned>
285→</pinned-documents>
286→```
287→
288→Each `<pinned>` element carries a `key` attribute — the canonical tool
289→call form — so the agent can cross-reference it when told a result is
290→already in context.
291→
292→### Persistence
293→
294→Pins are session-local. They survive compaction but not process
295→restart.
296→
297→## Runtime context
298→
299→When `runtime_context: true` is set in the agent config, the agent
300→appends a dynamic system block before each API call:
301→
302→```
303→[Runtime context]
304→Time: 2026-03-08T14:23:17Z
305→Session input tokens: 82341
306→Session output tokens: 12045
307→API calls: 23
308→Max output tokens per turn: 8192
309→History messages: 47
310→Pinned documents: 3
311→```
312→
313→This is not cached (no `cache_break`) — it changes on every call.
314→Token counts are cumulative for the session. The agent uses this to
315→plan: how much context budget remains, whether to be concise, when to
316→summarise.
317→
318→## Context window
319→
320→Unbounded accumulation will eventually exceed the model's context window. The [compaction protocol](protocols.md) provides automated context management via pause/get_memory/set_memory/resume. Document pinning ensures critical tool results survive compaction. Runtime context injection gives the agent visibility into its own resource consumption.
321→ 1→(** Routing table — compiler output for the ZMQ fabric.
2→
3→ Two-phase compilation:
4→ 1. The compiler walks the plumb body's spawn list and emits
5→ intermediate routes that reference channel names as nodes.
6→ 2. [resolve_channels] eliminates channel intermediaries by
7→ cross-joining writer and reader routes into direct
8→ process-to-process routes. This is the operational realisation
9→ of ν-restriction in composition (f ; g = (νc)(f(in,c) | g(c,out))).
10→
11→ The fabric reads the resolved routing table to create ZMQ sockets.
12→
13→ Design decisions D1 (types in lib/runtime) and D10 (routing_table
14→ terminology). See PLMB-037. *)
15→
16→open Language
17→
18→(** Channel transport pattern, determined statically by the compiler. *)
19→type channel_kind =
20→ | Point_to_point
21→ (** PUSH/PULL — reliable, blocking backpressure. Default for all
22→ algebraic channels except copy fan-out. *)
23→ | Fan_out
24→ (** PUB/SUB — copy morphism only. HWM set to unlimited. *)
25→ | Bidirectional
26→ (** PAIR — control channels between supervisor and inline morphisms. *)
27→
28→(** Whether a process runs inline (Eio fibre) or as a subprocess. *)
29→type process_kind =
30→ | Inline
31→ (** Structural morphisms: id, copy, merge, barrier, map, filter, etc.
32→ Run as Eio fibres within the fabric. *)
33→ | Subprocess
34→ (** Agents and nested plumb pipelines. Fork/exec'd by the supervisor. *)
35→
36→(** A route: one directional connection between two processes. *)
37→type route = {
38→ src : string; (** source process name *)
39→ src_port : string; (** source port name *)
40→ dst : string; (** destination process name *)
41→ dst_port : string; (** destination port name *)
42→ kind : channel_kind; (** transport pattern *)
43→ topic : string; (** provenance tag for PUB/SUB filtering *)
44→ via : string option; (** original channel name, if resolved *)
45→}
46→
47→(** A process entry in the routing table. *)
48→type process_entry = {
49→ proc_name : string;
50→ proc_kind : process_kind;
51→ proc_impl : process_impl;
52→ proc_binding : binding;
53→}
54→
55→(** The routing table: compiler output consumed by the fabric.
56→
57→ Invariant: all process names are unique within a table.
58→ The compiler guarantees this via per-binding instance numbering
59→ (e.g. [copy0], [copy1], [merge0]). *)
60→type routing_table = {
61→ processes : process_entry list;
62→ (** All processes, in dependency order (leaves first). *)
63→ routes : route list;
64→ (** All wiring instructions. *)
65→ seeded_channels : string list;
66→ (** Channels pre-loaded with a null token (Petri net initial markings). *)
67→}
68→
69→(** Classify a process implementation as inline or subprocess. *)
70→let process_kind_of_impl = function
71→ | Agent _ | Plumb _ -> Subprocess
72→ | _ -> Inline
73→
74→(** Determine the channel kind for a (src, src_port) pair.
75→ If the same source port appears in multiple routes, it's Fan_out
76→ (copy morphism). Control channels are Bidirectional. Otherwise
77→ Point_to_point. *)
78→let classify_channel_kinds routes =
79→ (* Count how many routes share each (src, src_port) pair *)
80→ let tbl = Hashtbl.create 16 in
81→ List.iter (fun r ->
82→ let key = (r.src, r.src_port) in
83→ let n = try Hashtbl.find tbl key with Not_found -> 0 in
84→ Hashtbl.replace tbl key (n + 1)
85→ ) routes;
86→ List.map (fun r ->
87→ if r.kind = Bidirectional then r
88→ else
89→ let key = (r.src, r.src_port) in
90→ let n = try Hashtbl.find tbl key with Not_found -> 1 in
91→ if n > 1 then { r with kind = Fan_out }
92→ else r
93→ ) routes
94→
95→(** Look up the per-message type for a channel by name.
96→ Checks boundary ports first, then declared channels. *)
97→let find_channel_type ~ports ~boundary_in_types ~boundary_out_types
98→ ~n_in ~pb ch_name =
99→ match List.find_index (fun p -> p = ch_name) ports with
100→ | Some i when i < n_in -> Some (List.nth boundary_in_types i)
101→ | Some i -> Some (List.nth boundary_out_types (i - n_in))
102→ | None ->
103→ match List.find_opt (fun (ch : channel_decl) ->
104→ ch.ch_name = ch_name) pb.pb_channels with
105→ | Some ch -> Some ch.ch_type
106→ | None -> None
107→
108→(** Resolve channel intermediaries into direct process-to-process routes.
109→
110→ Channels are bound names (ν-restriction). In the intermediate routing
111→ table, they appear as route endpoints — but they are not processes and
112→ have no state. This pass eliminates them by cross-joining writers and
113→ readers through each channel, producing direct routes.
114→
115→ After resolution, no declared channel name appears as a route src or dst.
116→ The original channel name is preserved in the [via] field for debugging. *)
117→let resolve_channels ~processes ~boundary_ports routes ~channels =
118→ let channel_names =
119→ List.map (fun (ch : Language.channel_decl) -> ch.ch_name) channels in
120→ let is_channel name =
121→ List.mem name channel_names in
122→ let is_process name =
123→ List.exists (fun (pe : process_entry) -> pe.proc_name = name) processes in
124→ let is_boundary name =
125→ List.mem name boundary_ports in
126→ let is_real_node name =
127→ is_process name || is_boundary name in
128→
129→ (* Partition routes *)
130→ let direct = ref [] in
131→ let writers = Hashtbl.create 8 in (* channel → routes writing to it *)
132→ let readers = Hashtbl.create 8 in (* channel → routes reading from it *)
133→
134→ List.iter (fun r ->
135→ let dst_is_ch = is_channel r.dst in
136→ let src_is_ch = is_channel r.src in
137→ match dst_is_ch, src_is_ch with
138→ | false, false ->
139→ (* Both endpoints are real nodes — keep as-is *)
140→ direct := r :: !direct
141→ | true, false ->
142→ (* Process writes to channel *)
143→ let ch = r.dst in
144→ let existing = try Hashtbl.find writers ch with Not_found -> [] in
145→ Hashtbl.replace writers ch (r :: existing)
146→ | false, true ->
147→ (* Process reads from channel *)
148→ let ch = r.src in
149→ let existing = try Hashtbl.find readers ch with Not_found -> [] in
150→ Hashtbl.replace readers ch (r :: existing)
151→ | true, true ->
152→ (* Channel-to-channel: should not happen *)
153→ failwith (Printf.sprintf
154→ "resolve_channels: channel-to-channel route %s → %s" r.src r.dst)
155→ ) routes;
156→
157→ (* Cross-join writers and readers for each channel *)
158→ let resolved = ref (List.rev !direct) in
159→ List.iter (fun ch_name ->
160→ let ch_writers = try Hashtbl.find writers ch_name with Not_found -> [] in
161→ let ch_readers = try Hashtbl.find readers ch_name with Not_found -> [] in
162→ let n_writers = List.length ch_writers in
163→ let n_readers = List.length ch_readers in
164→
165→ match n_writers, n_readers with
166→ | 0, 0 -> () (* unused channel *)
167→ | _, 0 ->
168→ failwith (Printf.sprintf
169→ "channel %s has %d writer(s) but no readers — \
170→ if the output is intentionally unused, add an explicit discard"
171→ ch_name n_writers)
172→ | 0, _ ->
173→ failwith (Printf.sprintf
174→ "channel %s has readers but no writer — \
175→ add 'spawn empty(%s)' or check your wiring"
176→ ch_name ch_name)
177→ | _ when n_writers > 1 && n_readers > 1 ->
178→ failwith (Printf.sprintf
179→ "resolve_channels: channel %s has %d writers and %d readers \
180→ — no categorical meaning" ch_name n_writers n_readers)
181→ | _ ->
182→ List.iter (fun w ->
183→ List.iter (fun rd ->
184→ resolved := {
185→ src = w.src;
186→ src_port = w.src_port;
187→ dst = rd.dst;
188→ dst_port = rd.dst_port;
189→ kind = Point_to_point;
190→ topic = rd.topic;
191→ via = Some ch_name;
192→ } :: !resolved
193→ ) ch_readers
194→ ) ch_writers
195→ ) channel_names;
196→
197→ (* Validate: no channel names remain as src or dst *)
198→ List.iter (fun r ->
199→ if is_channel r.src && is_real_node r.dst then
200→ failwith (Printf.sprintf
201→ "resolve_channels: channel %s still appears as route src" r.src);
202→ if is_channel r.dst && is_real_node r.src then
203→ failwith (Printf.sprintf
204→ "resolve_channels: channel %s still appears as route dst" r.dst)
205→ ) !resolved;
206→
207→ List.rev !resolved
208→
209→let compile ~prog ~ports ~pb ~type_env ~main_binding ~seeded =
210→ let boundary_in_types = Validate.port_element_types type_env main_binding.input in
211→ let boundary_out_types = Validate.port_element_types type_env main_binding.output in
212→ let n_in = List.length boundary_in_types in
213→
214→ let ch_type name =
215→ match find_channel_type ~ports ~boundary_in_types ~boundary_out_types
216→ ~n_in ~pb name with
217→ | Some t -> t
218→ | None -> Unit
219→ in
220→
221→ (* Type-check a spawn statement's channel types against binding
222→ expectations. Extracted from the main iteration loop for clarity:
223→ this is validation, not routing. Raises [Failure] on mismatch —
224→ caught by main.ml as exit 2. *)
225→ let check_spawn_types ~sp ~binding ~channels =
226→ let check_ch idx expected label =
227→ let ch_t = ch_type (List.nth channels idx) in
228→ if not (Validate.type_eq type_env ch_t expected) then
229→ failwith (Printf.sprintf "spawn %s: %s type is %s, expected %s"
230→ sp.sp_process label
231→ (Pp.pp_type_expr type_env ch_t)
232→ (Pp.pp_type_expr type_env expected))
233→ in
234→ let n_ch = List.length channels in
235→ (try match binding.impl with
236→ | Id ->
237→ let inp = ch_type (List.nth channels 0) in
238→ let outp = ch_type (List.nth channels 1) in
239→ if not (Validate.type_eq type_env inp outp) then
240→ failwith (Printf.sprintf "spawn id: input type %s != output type %s"
241→ (Pp.pp_type_expr type_env inp)
242→ (Pp.pp_type_expr type_env outp))
243→ | Copy ->
244→ (* Validate binding output type is a pair *)
245→ let out_t = Validate.resolve type_env
246→ (Validate.unwrap_stream binding.output) in
247→ (match out_t with
248→ | Product [_; _] -> ()
249→ | Product ts ->
250→ failwith (Printf.sprintf
251→ "copy: output product must have exactly 2 elements, got %d"
252→ (List.length ts))
253→ | _ -> ());
254→ let inp = ch_type (List.nth channels 0) in
255→ check_ch 1 inp "output0";
256→ check_ch 2 inp "output1"
257→ | Merge ->
258→ let in0_t = ch_type (List.nth channels 0) in
259→ let in1_t = ch_type (List.nth channels 1) in
260→ let out_t = ch_type (List.nth channels 2) in
261→ let expected_out =
262→ if Validate.type_eq type_env in0_t in1_t then in0_t
263→ else Sum [in0_t; in1_t]
264→ in
265→ if not (Validate.type_eq type_env out_t expected_out) then
266→ failwith (Printf.sprintf
267→ "spawn merge: output type %s, expected %s"
268→ (Pp.pp_type_expr type_env out_t)
269→ (Pp.pp_type_expr type_env expected_out))
270→ | Barrier ->
271→ let n = n_ch - 1 in
272→ let in_types = List.init n (fun i -> ch_type (List.nth channels i)) in
273→ let expected_out = Product in_types in
274→ check_ch n expected_out "output"
275→ | Discard ->
276→ if n_ch >= 2 then check_ch 1 Unit "output"
277→ | Project n_proj ->
278→ let inp = ch_type (List.nth channels 0) in
279→ (match Validate.resolve type_env inp with
280→ | Product ts ->
281→ if n_proj >= List.length ts then
282→ failwith (Printf.sprintf
283→ "spawn project(%d): index out of bounds (product has %d elements)"
284→ n_proj (List.length ts));
285→ check_ch 1 (List.nth ts n_proj) "output"
286→ | _ ->
287→ failwith (Printf.sprintf
288→ "spawn project(%d): input type %s is not a product"
289→ n_proj (Pp.pp_type_expr type_env inp)))
290→ | FormatJson ->
291→ check_ch 0 (Primitive Json) "input";
292→ check_ch 1 (Primitive String) "output"
293→ | ParseJson ->
294→ check_ch 0 (Primitive String) "input";
295→ check_ch 1 (Primitive Json) "output"
296→ | Agent _ ->
297→ (* Agent type validation: check channel types against binding ports.
298→ Named args determine which ports are present. *)
299→ let has_labeled lbl = List.exists (function
300→ | Labeled (l, _) -> l = lbl | _ -> false) sp.sp_args in
301→ let binding_in_t = Validate.resolve type_env binding.input in
302→ let binding_out_t = Validate.resolve type_env binding.output in
303→ (* Check non-product agent doesn't get extra ports *)
304→ let has_control = match binding_in_t with
305→ | Product [Stream _; Stream _] -> true | _ -> false in
306→ let has_telemetry = match binding_out_t with
307→ | Product [Stream _; Stream _] -> true | _ -> false in
308→ if has_labeled "telemetry" && not has_telemetry then
309→ failwith (Printf.sprintf
310→ "spawn %s: agent output is not a product, cannot have telemetry port"
311→ sp.sp_process);
312→ if has_labeled "ctrl_in" && not has_control then
313→ failwith (Printf.sprintf
314→ "spawn %s: agent input is not a product, cannot have ctrl_in port"
315→ sp.sp_process);
316→ (* Check ctrl_in type *)
317→ if has_labeled "ctrl_in" && has_control then begin
318→ let ctrl_ch = List.find_map (function
319→ | Labeled ("ctrl_in", ch) -> Some ch | _ -> None) sp.sp_args in
320→ match ctrl_ch with
321→ | Some ch ->
322→ let ctrl_t = match binding_in_t with
323→ | Product [_; Stream t] -> t
324→ | _ -> Unit in
325→ let ch_t = ch_type ch in
326→ if not (Validate.type_eq type_env ch_t ctrl_t) then
327→ failwith (Printf.sprintf
328→ "spawn %s: ctrl_in channel type %s, expected %s"
329→ sp.sp_process
330→ (Pp.pp_type_expr type_env ch_t)
331→ (Pp.pp_type_expr type_env ctrl_t))
332→ | None -> ()
333→ end;
334→ (* Check telemetry type *)
335→ if has_labeled "telemetry" && has_telemetry then begin
336→ let telem_ch = List.find_map (function
337→ | Labeled ("telemetry", ch) -> Some ch | _ -> None) sp.sp_args in
338→ match telem_ch with
339→ | Some ch ->
340→ let telem_t = match binding_out_t with
341→ | Product [_; Stream t] -> t
342→ | _ -> Unit in
343→ let ch_t = ch_type ch in
344→ if not (Validate.type_eq type_env ch_t telem_t) then
345→ failwith (Printf.sprintf
346→ "spawn %s: telemetry channel type %s, expected %s"
347→ sp.sp_process
348→ (Pp.pp_type_expr type_env ch_t)
349→ (Pp.pp_type_expr type_env telem_t))
350→ | None -> ()
351→ end
352→ | Empty ->
353→ (* Single channel (output); type inferred from channel decl *)
354→ ()
355→ | MergeAny | Tagger _ | Log _ ->
356→ (* Compiler-generated morphisms — well-typed by construction *)
357→ ()
358→ | Map _ | Filter _ ->
359→ (* Type consistency checked by validator at bind time *)
360→ ()
361→ | Tool _ | Lower _ | Builtin _ ->
362→ (* Configured externally; types from binding signature *)
363→ ()
364→ | Plumb _ ->
365→ (* Nested plumb: channel types checked when sub-pipeline compiles *)
366→ ()
367→ with Failure msg when msg = "nth" || msg = "List.nth" ->
368→ failwith (Printf.sprintf "spawn %s: wrong number of channels \
369→ (got %d)" sp.sp_process n_ch))
370→ in
371→
372→ let find_binding name =
373→ match List.find_opt (fun (b : binding) -> b.bd_name = name) pb.pb_bindings with
374→ | Some b -> Some b
375→ | None ->
376→ List.find_opt (fun (b : binding) -> b.bd_name = name) prog.prog_bindings
377→ in
378→
379→ (* Synthesise a binding for a direct structural morphism spawn
380→ with proper types inferred from channel declarations. *)
381→ let synthesise_structural sp_process channels =
382→ let impl = match sp_process with
383→ | "id" -> Id | "copy" -> Copy | "merge" -> Merge
384→ | "discard" -> Discard | "empty" -> Empty | "barrier" -> Barrier
385→ | "_format_json" -> FormatJson | "_parse_json" -> ParseJson
386→ | name -> failwith (Printf.sprintf "unknown structural morphism: %s" name)
387→ in
388→ match impl with
389→ | Id ->
390→ let t = ch_type (List.nth channels 0) in
391→ { bd_name = sp_process; bd_attrs = [];
392→ input = Stream t; output = Stream t; impl }
393→ | Copy ->
394→ let t = ch_type (List.nth channels 0) in
395→ { bd_name = sp_process; bd_attrs = [];
396→ input = Stream t;
397→ output = Stream (Product [t; t]); impl }
398→ | Merge ->
399→ let t0 = ch_type (List.nth channels 0) in
400→ let t1 = ch_type (List.nth channels 1) in
401→ let out_t = if Validate.type_eq type_env t0 t1 then t0 else Sum [t0; t1] in
402→ { bd_name = sp_process; bd_attrs = [];
403→ input = Stream (Product [t0; t1]);
404→ output = Stream out_t; impl }
405→ | Barrier ->
406→ let n = List.length channels - 1 in
407→ let in_types = List.init n (fun i -> ch_type (List.nth channels i)) in
408→ { bd_name = sp_process; bd_attrs = [];
409→ input = Stream (Product in_types);
410→ output = Stream (Product in_types); impl }
411→ | Discard ->
412→ let t = ch_type (List.nth channels 0) in
413→ { bd_name = sp_process; bd_attrs = [];
414→ input = Stream t; output = Stream Unit; impl }
415→ | Empty ->
416→ let t = ch_type (List.nth channels 0) in
417→ { bd_name = sp_process; bd_attrs = [];
418→ input = Stream Unit; output = Stream t; impl }
419→ | FormatJson ->
420→ { bd_name = sp_process; bd_attrs = [];
421→ input = Stream (Primitive Json);
422→ output = Stream (Primitive String); impl }
423→ | ParseJson ->
424→ { bd_name = sp_process; bd_attrs = [];
425→ input = Stream (Primitive String);
426→ output = Stream (Primitive Json); impl }
427→ | _ ->
428→ { bd_name = sp_process; bd_attrs = [];
429→ input = Unit; output = Unit; impl }
430→ in
431→
432→ let processes = ref [] in
433→ let routes = ref [] in
434→
435→ (* Per-binding-name counter for unique instance names.
436→ Every spawn gets a numbered name: copy0, copy1, merge0, etc. *)
437→ let name_counters = Hashtbl.create 8 in
438→ let fresh_proc_name base =
439→ let n = try Hashtbl.find name_counters base with Not_found -> 0 in
440→ Hashtbl.replace name_counters base (n + 1);
441→ Printf.sprintf "%s%d" base n
442→ in
443→
444→ List.iter (fun (sp : spawn_stmt) ->
445→ (* For named args, validate port names and resolve to positional
446→ order before extracting channels. For positional args,
447→ pass through. *)
448→ let has_named = List.exists (function Labeled _ -> true | _ -> false)
449→ sp.sp_args in
450→ let raw_channels = List.map (function
451→ | Positional ch -> ch
452→ | Labeled (_, ch) -> ch
453→ ) sp.sp_args in
454→
455→ let binding = match find_binding sp.sp_process with
456→ | Some b -> b
457→ | None -> synthesise_structural sp.sp_process raw_channels
458→ in
459→
460→ (* Validate named arg port names and resolve to positional order. *)
461→ let channels =
462→ if has_named then begin
463→ let valid_ports = match binding.impl with
464→ | Empty -> [| "output" |]
465→ | Id | Map _ | Filter _ | Tagger _ | FormatJson | ParseJson
466→ | Log _ | Project _ ->
467→ [| "input"; "output" |]
468→ | Copy -> [| "input"; "out0"; "out1" |]
469→ | Merge | MergeAny -> [| "in0"; "in1"; "output" |]
470→ | Discard -> [| "input"; "output" |]
471→ | Barrier ->
472→ let n = List.length raw_channels - 1 in
473→ Array.init (n + 1) (fun i ->
474→ if i < n then Printf.sprintf "in%d" i else "output")
475→ | Agent _ ->
476→ let has_control = match Validate.resolve type_env binding.input with
477→ | Product [Stream _; Stream _] -> true | _ -> false in
478→ let has_telemetry = match Validate.resolve type_env binding.output with
479→ | Product [Stream _; Stream _] -> true | _ -> false in
480→ (match has_control, has_telemetry with
481→ | false, false -> [| "input"; "output" |]
482→ | false, true -> [| "input"; "output"; "telemetry" |]
483→ | true, false -> [| "input"; "ctrl_in"; "output"; "ctrl_out" |]
484→ | true, true -> [| "input"; "ctrl_in"; "output"; "ctrl_out"; "telemetry" |])
485→ | Plumb (ports, _) -> Array.of_list ports
486→ | Tool _ | Lower _ | Builtin _ -> [||]
487→ in
488→ let named = List.filter_map (function
489→ | Labeled (p, ch) -> Some (p, ch) | _ -> None) sp.sp_args in
490→ List.iter (fun (p, _) ->
491→ if not (Array.exists (fun n -> n = p) valid_ports) then
492→ failwith (Printf.sprintf "spawn %s: unknown port '%s'"
493→ sp.sp_process p)
494→ ) named;
495→ (* Reorder to positional based on port names *)
496→ Array.to_list valid_ports |> List.map (fun port_name ->
497→ match List.assoc_opt port_name named with
498→ | Some ch -> ch
499→ | None ->
500→ failwith (Printf.sprintf
501→ "spawn %s: missing required port '%s'"
502→ sp.sp_process port_name))
503→ end else
504→ raw_channels
505→ in
506→
507→ check_spawn_types ~sp ~binding ~channels;
508→
509→ let proc_name = fresh_proc_name sp.sp_process in
510→ let binding = { binding with bd_name = proc_name } in
511→ let kind = process_kind_of_impl binding.impl in
512→
513→ processes := {
514→ proc_name;
515→ proc_kind = kind;
516→ proc_impl = binding.impl;
517→ proc_binding = binding;
518→ } :: !processes;
519→
520→ (* Determine port layout from the implementation kind.
521→ Each channel argument maps to a port. Convention:
522→ - Input ports: read from the named channel
523→ - Output ports: write to the named channel
524→
525→ A route is created for each channel connecting it to the
526→ process. The src/dst naming follows the data flow direction:
527→ the channel's writer is the src, the reader is the dst. *)
528→ let add_route ~src ~src_port ~dst ~dst_port ~channel_name =
529→ routes := {
530→ src; src_port; dst; dst_port;
531→ kind = Point_to_point;
532→ topic = channel_name;
533→ via = None;
534→ } :: !routes
535→ in
536→
537→ (* For inline morphisms, the process reads from input channels and
538→ writes to output channels. We model this as:
539→ - Input channel: route from channel → process (process reads)
540→ - Output channel: route from process → channel (process writes)
541→
542→ Since channels are named entities shared between processes, the
543→ route connects the producing process's output to the consuming
544→ process's input via the channel name. *)
545→ match binding.impl with
546→ | Id | FormatJson | ParseJson | Map _ | Filter _ | Tagger _ ->
547→ (* 2 channels: [input, output] *)
548→ let ch_in = List.nth channels 0 in
549→ let ch_out = List.nth channels 1 in
550→ add_route ~src:ch_in ~src_port:"out" ~dst:proc_name ~dst_port:"in"
551→ ~channel_name:ch_in;
552→ add_route ~src:proc_name ~src_port:"out" ~dst:ch_out ~dst_port:"in"
553→ ~channel_name:ch_out
554→
555→ | Discard ->
556→ (* 2 channels: [input, output(unit)] *)
557→ let ch_in = List.nth channels 0 in
558→ add_route ~src:ch_in ~src_port:"out" ~dst:proc_name ~dst_port:"in"
559→ ~channel_name:ch_in
560→
561→ | Empty ->
562→ (* 1 channel: [output] *)
563→ let ch_out = List.nth channels 0 in
564→ add_route ~src:proc_name ~src_port:"out" ~dst:ch_out ~dst_port:"in"
565→ ~channel_name:ch_out
566→
567→ | Log _ ->
568→ (* 1 channel: [input] *)
569→ let ch_in = List.nth channels 0 in
570→ add_route ~src:ch_in ~src_port:"out" ~dst:proc_name ~dst_port:"in"
571→ ~channel_name:ch_in
572→
573→ | Copy ->
574→ (* 3 channels: [input, output0, output1] *)
575→ let ch_in = List.nth channels 0 in
576→ let ch_out0 = List.nth channels 1 in
577→ let ch_out1 = List.nth channels 2 in
578→ add_route ~src:ch_in ~src_port:"out" ~dst:proc_name ~dst_port:"in"
579→ ~channel_name:ch_in;
580→ add_route ~src:proc_name ~src_port:"out0" ~dst:ch_out0 ~dst_port:"in"
581→ ~channel_name:ch_out0;
582→ add_route ~src:proc_name ~src_port:"out1" ~dst:ch_out1 ~dst_port:"in"
583→ ~channel_name:ch_out1
584→
585→ | Merge | MergeAny ->
586→ (* 3 channels: [input0, input1, output] *)
587→ let ch_in0 = List.nth channels 0 in
588→ let ch_in1 = List.nth channels 1 in
589→ let ch_out = List.nth channels 2 in
590→ add_route ~src:ch_in0 ~src_port:"out" ~dst:proc_name ~dst_port:"in0"
591→ ~channel_name:ch_in0;
592→ add_route ~src:ch_in1 ~src_port:"out" ~dst:proc_name ~dst_port:"in1"
593→ ~channel_name:ch_in1;
594→ add_route ~src:proc_name ~src_port:"out" ~dst:ch_out ~dst_port:"in"
595→ ~channel_name:ch_out
596→
597→ | Barrier ->
598→ (* N+1 channels: [input0..inputN-1, output] *)
599→ let n = List.length channels - 1 in
600→ for i = 0 to n - 1 do
601→ let ch = List.nth channels i in
602→ add_route ~src:ch ~src_port:"out" ~dst:proc_name
603→ ~dst_port:(Printf.sprintf "in%d" i)
604→ ~channel_name:ch
605→ done;
606→ let ch_out = List.nth channels n in
607→ add_route ~src:proc_name ~src_port:"out" ~dst:ch_out ~dst_port:"in"
608→ ~channel_name:ch_out
609→
610→ | Project n_proj ->
611→ (* 2 channels: [input, output] *)
612→ ignore n_proj;
613→ let ch_in = List.nth channels 0 in
614→ let ch_out = List.nth channels 1 in
615→ add_route ~src:ch_in ~src_port:"out" ~dst:proc_name ~dst_port:"in"
616→ ~channel_name:ch_in;
617→ add_route ~src:proc_name ~src_port:"out" ~dst:ch_out ~dst_port:"in"
618→ ~channel_name:ch_out
619→
620→ | Agent _ ->
621→ (* Variable channels: data_in, [ctrl_in], data_out, [ctrl_out], [telemetry] *)
622→ let n = List.length channels in
623→ (* Minimum: input, output (2 channels)
624→ With control: input, ctrl_in, output, ctrl_out (4)
625→ With telemetry: +1 *)
626→ if n >= 2 then begin
627→ let ch_in = List.nth channels 0 in
628→ add_route ~src:ch_in ~src_port:"out" ~dst:proc_name ~dst_port:"data_in"
629→ ~channel_name:ch_in;
630→ (* The output channel is at index 1 (no ctrl) or 2 (with ctrl) *)
631→ let out_idx = if n >= 4 then 2 else 1 in
632→ let ch_out = List.nth channels out_idx in
633→ add_route ~src:proc_name ~src_port:"data_out" ~dst:ch_out ~dst_port:"in"
634→ ~channel_name:ch_out;
635→ (* Control channels if present *)
636→ if n >= 4 then begin
637→ let ch_ctrl_in = List.nth channels 1 in
638→ let ch_ctrl_out = List.nth channels 3 in
639→ add_route ~src:ch_ctrl_in ~src_port:"out" ~dst:proc_name ~dst_port:"ctrl_in"
640→ ~channel_name:ch_ctrl_in;
641→ add_route ~src:proc_name ~src_port:"ctrl_out" ~dst:ch_ctrl_out ~dst_port:"in"
642→ ~channel_name:ch_ctrl_out
643→ end;
644→ (* Telemetry if present *)
645→ let telem_idx = if n >= 4 then 4 else 2 in
646→ if telem_idx < n then begin
647→ let ch_telem = List.nth channels telem_idx in
648→ add_route ~src:proc_name ~src_port:"telemetry" ~dst:ch_telem ~dst_port:"in"
649→ ~channel_name:ch_telem
650→ end
651→ end
652→
653→ | Plumb (plumb_ports, _) ->
654→ (* N port channels, matching the plumb body's port list *)
655→ List.iteri (fun i ch ->
656→ let port = List.nth plumb_ports i in
657→ (* Determine direction from port index relative to input/output *)
658→ add_route ~src:ch ~src_port:"out" ~dst:proc_name ~dst_port:port
659→ ~channel_name:ch
660→ ) channels
661→
662→ | Tool _ | Lower _ | Builtin _ ->
663→ (* These are not spawned directly — they're invoked via tool dispatch *)
664→ ()
665→ ) pb.pb_spawns;
666→
667→ (* Channel-node routes are intermediate — resolve them into direct
668→ process-to-process routes before the fabric sees them. *)
669→ let intermediate_routes = List.rev !routes in
670→ let resolved = resolve_channels
671→ ~processes:(List.rev !processes)
672→ ~boundary_ports:ports
673→ ~channels:pb.pb_channels
674→ intermediate_routes in
675→ let routes = classify_channel_kinds resolved in
676→ {
677→ processes = List.rev !processes;
678→ routes;
679→ seeded_channels = seeded;
680→ }
681→
682→(** Pretty-print a resolved routing table for debugging.
683→ Expects channel intermediaries to be resolved; prints the [via]
684→ field when present to show channel provenance. *)
685→let pp_routing_table rt =
686→ let buf = Buffer.create 256 in
687→ Buffer.add_string buf "Processes:\n";
688→ List.iter (fun pe ->
689→ let impl_str = match pe.proc_impl with
690→ | Id -> "id" | Copy -> "copy" | Merge -> "merge"
691→ | MergeAny -> "merge_any" | Discard -> "discard" | Empty -> "empty"
692→ | Barrier -> "barrier" | Map _ -> "map" | Filter _ -> "filter"
693→ | Tagger _ -> "tagger" | Log _ -> "log" | FormatJson -> "format_json"
694→ | ParseJson -> "parse_json" | Project n -> Printf.sprintf "project(%d)" n
695→ | Agent _ -> "agent" | Plumb _ -> "plumb" | Tool _ -> "tool"
696→ | Lower _ -> "lower" | Builtin _ -> "builtin"
697→ in
698→ Buffer.add_string buf (Printf.sprintf " %s (%s, %s)\n" pe.proc_name
699→ (match pe.proc_kind with Inline -> "inline" | Subprocess -> "subprocess")
700→ impl_str)
701→ ) rt.processes;
702→ Buffer.add_string buf "Routes:\n";
703→ List.iter (fun r ->
704→ let via_str = match r.via with
705→ | Some ch -> Printf.sprintf " via=%s" ch
706→ | None -> ""
707→ in
708→ Buffer.add_string buf (Printf.sprintf " %s:%s -> %s:%s [%s] topic=%s%s\n"
709→ r.src r.src_port r.dst r.dst_port
710→ (match r.kind with
711→ | Point_to_point -> "push/pull"
712→ | Fan_out -> "pub/sub"
713→ | Bidirectional -> "pair")
714→ r.topic via_str)
715→ ) rt.routes;
716→ if rt.seeded_channels <> [] then begin
717→ Buffer.add_string buf "Seeded: ";
718→ Buffer.add_string buf (String.concat ", " rt.seeded_channels);
719→ Buffer.add_char buf '\n'
720→ end;
721→ Buffer.contents buf
722→ 1→(tests
2→ (names test_parser test_validate test_plumb test_expr
3→ test_session_compile)
4→ (libraries language runtime alcotest yojson unix)
5→ (deps
6→ (glob_files fixtures/*.plumb)
7→ ../bin/plumb/main.exe))
8→
9→(test
10→ (name test_session_exec)
11→ (libraries language runtime plumb alcotest yojson unix threads.posix)
12→ (deps
13→ ../bin/plumb/main.exe))
14→
15→(test
16→ (name test_tool)
17→ (libraries language runtime plumb alcotest yojson unix str threads.posix)
18→ (deps
19→ (glob_files fixtures/*.plumb)
20→ ../bin/agent/main.exe))
21→
22→(test
23→ (name test_agent_config)
24→ (libraries alcotest unix str)
25→ (deps
26→ (glob_files fixtures/*.plumb)
27→ ../bin/agent/main.exe))
28→
29→(test
30→ (name test_agent_control)
31→ (libraries alcotest unix str yojson)
32→ (deps
33→ (glob_files fixtures/*.plumb)
34→ ../bin/agent/main.exe))
35→
36→(test
37→ (name test_agent_eliza)
38→ (libraries alcotest unix str yojson)
39→ (deps
40→ (glob_files fixtures/*.plumb)
41→ ../bin/agent/main.exe))
42→
43→(test
44→ (name test_pinning)
45→ (libraries agent client alcotest yojson str))
46→
47→(test
48→ (name test_docstore)
49→ (libraries docstore alcotest unix))
50→
51→(test
52→ (name test_render_group)
53→ (libraries render alcotest))
54→
55→(test
56→ (name test_error)
57→ (libraries runtime alcotest yojson))
58→
59→(test
60→ (name test_transport)
61→ (libraries transport alcotest eio_main unix))
62→
63→(test
64→ (name test_routing)
65→ (libraries language runtime alcotest))
66→
67→(test
68→ (name test_supervisor)
69→ (libraries supervisor alcotest unix))
70→
71→(test
72→ (name test_fabric)
73→ (libraries fabric transport language runtime spawn_lib alcotest eio_main eio.unix unix yojson threads.posix)
74→ (deps
75→ ../bin/agent/main.exe))
76→ 1→# Processes
2→
3→Processes are the executable units of the plumbing language. Each process is a native binary that reads a `.plumb` spec file, validates I/O against the declared types, and communicates via JSON Lines on stdin/stdout.
4→
5→See [tools.md](tools.md) for tool semantics and the dispatch protocol. See [agent-runtime.md](agent-runtime.md) for environment variables, control channel, and telemetry.
6→
7→## Stream transducers
8→
9→A process with type `A → B` consumes a stream of `A`-typed messages and produces a stream of `B`-typed messages. The type signature governs the per-message schema, not the inter-message dependency.
10→
11→An agent is a *stateful* stream transducer: the n-th output may depend on all prior inputs, not just the n-th. Conversation context accumulates across the input stream by default. This is essential for pipelines with review loops — a composer receiving a revision request must remember the original task.
12→
13→The structural processes (`id`, `copy`, `merge`, `discard`, `empty`) are stateless.
14→
15→## Algebraic structure
16→
17→The category is a **copy-discard (CD) category**, not a Markov category. Every object is equipped with a commutative comonoid structure (copy and discard) compatible with tensor products. Discard is not natural over our morphisms — agents are nondeterministic and filters are not total — so the stricter Markov structure does not hold.
18→
19→Merge is additional structure: a monoid (many to one) interacting with copy's comonoid (one to many). Whether this interaction forms a bialgebra or something weaker remains open.
20→
21→**Session types.** Protocol declarations (see [protocols.md](protocols.md)) add a specification layer for channel ordering constraints. The session type primitives correspond to linear logic connectives: `send T` to ⊗ (tensor), `recv T` to ⅋ (par), `select` to ⊕ (sum), `offer` to & (with), `end` to the monoidal unit. The duality of session types (send ↔ recv, select ↔ offer) maps to the duality between copy/filter (selection) and merge (being-selected-for) in the existing algebra. This situates the plumbing category within *-autonomous categories.
22→
23→**`!(A, B)` vs `(!A, !B)`**: `!(A, B)` is a stream of synchronised pairs (one channel, barrier output). `(!A, !B)` is a pair of independent streams on separate file descriptors (agent telemetry, control channels). The distinction is load-bearing for understanding barrier, project, and agent port decomposition.
24→
25→## Composition
26→
27→### Sequential composition (`;`)
28→
29→`f ; g` pipes the output stream of `f` into the input stream of `g`. Each process maintains independent internal state. Type-checked: the output type of `f` must match the input type of `g`. Associative: `(f ; g) ; h = f ; (g ; h)`. Unit: `f ; id = id ; f = f`.
30→
31→### Tensor product (`⊗`)
32→
33→`f ⊗ g` runs `f` and `g` on independent channels with independent state. No shared state, no interference. `(A → B) ⊗ (C → D) = A ⊗ C → B ⊗ D`. Associative, with unit `I` (the monoidal unit).
34→
35→## Primitives
36→
37→### id : A → A
38→
39→The identity morphism. Validates each input line against type `A` and writes it to stdout. Stateless — each line is independent. Total.
40→
41→### agent : A → (T, B)
42→
43→An LLM conversation loop. Each input line becomes a user message; the model's response is validated against type `B`. If validation fails, the error is fed back to the model for retry (up to `max_retries`). The agent also produces a telemetry stream `T` — reasoning traces from extended thinking, timing data, etc. The `.telemetry` port is silently discarded unless explicitly wired to an observer.
44→
45→By default, conversation context accumulates: each input builds on the prior exchange. The model sees the full history of user inputs and its own successful responses.
46→
47→Retry messages (validation failures and re-prompts) are internal reductions — they are visible to the model during the current exchange but do not carry forward into the history. The clean history contains only: user input, successful assistant response.
48→
49→Config options:
50→ 1→# Architecture
2→
3→Internal architecture for developers working on the plumbing language and runtime. For user-facing documentation, see the [README](../README.md).
4→
5→See [extending.md](extending.md) for how to add language features. See [executor.md](executor.md) for executor internals. See [mcp-server.md](mcp-server.md) for the MCP server. See [code-map.md](code-map.md) for a function-level reference.
6→
7→## Pipeline: parse → load → execute
8→
9→Every `.plumb` file follows a three-stage pipeline:
10→
11→```
12→source text → parse → program AST
13→ ↓
14→ spec load (validate types, find binding, desugar)
15→ ↓
16→ execute (create channels, spawn processes)
17→```
18→
19→For multi-file programs with `use` declarations, the pipeline includes module resolution:
20→
21→```
22→source file → load_file → parse_raw → raw_program
23→ ↓
24→ resolve use decls (recursive, with cycle detection)
25→ ↓
26→ prefix names (rewrite_program: Mod.name)
27→ ↓
28→ merge imports → flat program AST
29→ ↓
30→ spec load (validate, find binding, desugar)
31→```
32→
33→1. **Parse** (`lib/language/language.ml:parse`). Menhir parser produces a `program` AST (type declarations + protocol declarations + bindings). Value references (e.g., `agent my_config`) are resolved during parsing — the parser's `collect_program` resolves all `PI_value_ref` names before returning. Protocol well-formedness checking (guardedness, message type validity, label uniqueness, name uniqueness) also runs in `collect_program` (Pass 2b).
34→
35→ For multi-file programs, `parse_raw` returns a `raw_program` with `use` declarations and `module` name. The loader (`lib/language/loader.ml`) resolves imports recursively, prefixes all names with the module name, and merges into a flat `program`. Module-qualified names (e.g., `Fs.read`) are flat dotted strings — the dot has no structural significance outside the loader.
36→
37→2. **Spec load** (`lib/runtime/spec.ml`). Entry points:
38→ - `load_binding` — finds a binding by impl kind (id, agent, discard). Used by standalone process binaries.
39→ - `load_main` — finds the `main` binding from source text. Used for synthesised specs.
40→ - `load_main_from_file` — loads from a file path, resolving `use` declarations. Used by the plumb and chat binaries.
41→ - `load_main_from_string` — loads from source text (not a file), resolving `use` declarations against the search path. Used by the MCP server's `call` tool to execute pipelines submitted as source text.
42→ - `load_program` — returns the full parsed program + type_env without selecting a binding. Used by the agent binary to access all bindings (its own Agent binding + Tool bindings in the spec).
43→
44→ All entry points: parse, build `type_env`, validate sum disjointness, check reserved names, enforce bare types on tool bindings. `load_main` additionally desugars wiring statements (see [extending.md § Desugaring](extending.md#desugaring-static--dynamic-form)).
45→
46→3. **Execute** (`lib/plumb/exec.ml:execute`). Takes a program and a main binding (with `Plumb` impl). Creates Unix pipes for all channels. Two-pass execution:
47→ - Pass 1 (validate): resolve each spawn to a binding, type-check channels, track claimed fds.
48→ - Pass 2 (execute): create threads (structural morphisms) and fork subprocesses (agents, nested plumbs).
49→
50→## Module structure
51→
52→```
53→lib/language/ (library: language)
54→ ast.ml AST types — the single source of truth for the IR
55→ lexer.mll ocamllex tokenizer → tokens
56→ parser.mly Menhir grammar → program AST
57→ pp.ml Pretty-printer (AST → source text)
58→ expr.ml Expression evaluator for map/filter
59→ validate.ml JSON validation, type_env, type equality, disjointness
60→ desugar_utils.ml Shared helpers — fresh names, debug taps, agent port info
61→ desugar_session.ml Session wire desugaring — protocols → barrier chains + named channels
62→ desugar.ml Wiring statement desugaring (static → dynamic form)
63→ loader.ml Module loader — resolve use decls, prefix names, merge
64→ compile_session.ml Session type compiler — linear protocols → barrier chains
65→ session_names.ml Channel naming conventions for session protocols
66→ language.ml Public API — re-exports AST, exposes parse/pp/modules
67→
68→lib/runtime/ (library: runtime, depends on: language)
69→ error.ml ProcessError type + JSON serialization
70→ log.ml Structured JSON logging to stderr
71→ entry.ml CLI envelope — read spec file, run main, exit codes
72→ spec.ml Spec loading — parse, validate, desugar, find binding
73→
74→lib/plumb/ (library: plumb, depends on: language, runtime, dune-site)
75→ exec.ml Pipeline executor — channels, spawns, inline morphisms
76→ stdlib_path.ml Search path assembly — PLUMB_PATH env + dune sites
77→ resource_path.ml Resource file search — PLUMB_RESOURCES env + dune sites
78→ sites.ml (generated) Dune sites module — stdlib + resource discovery
79→
80→lib/mcp_client/ (library: mcp_client, depends on: yojson, unix, threads, runtime) 1→(** Inline morphism fibres — structural stream transformations over ZMQ.
2→
3→ Adapted from [Forward] (lib/plumb/forward.ml) to use the transport
4→ layer instead of Unix file descriptors. Each morphism runs as an
5→ Eio fibre, reading from PULL sockets and writing to PUSH sockets.
6→
7→ Fatal errors raise [Morphism_fatal] instead of calling [exit 1].
8→ The supervisor catches this at the fibre boundary. *)
9→
10→open Language
11→
12→exception Morphism_fatal of string * (string * Yojson.Safe.t) list
13→exception Stream_eof
14→
15→(** Truncate a JSON value for logging. *)
16→let truncate_json ?(max_str=200) ?(max_items=5) json =
17→ let rec trunc = function
18→ | `String s when String.length s > max_str ->
19→ `String (String.sub s 0 max_str ^ "...")
20→ | `List items when List.length items > max_items ->
21→ let kept = List.filteri (fun i _ -> i < max_items) items in
22→ `List (List.map trunc kept @
23→ [`String (Printf.sprintf "...(%d more)" (List.length items - max_items))])
24→ | `List items -> `List (List.map trunc items)
25→ | `Assoc fields -> `Assoc (List.map (fun (k, v) -> (k, trunc v)) fields)
26→ | other -> other
27→ in
28→ trunc json
29→
30→let morphism_fatal label extras =Now I have a thorough understanding of the codebase. Let me set up a worktree and start implementing.
git worktree add /tmp/plmb-037-step5c plumbing-1 2>&1 && echo "---OK---"