The direction is correct — the bridge is 445 lines of impedance matching that should not exist for OCaml agents. But the plan has problems on several axes.
The most important recommendation: drop the transport abstraction (Q6). Do not build a two-backend (stdio/ZMQ) abstraction inside the agent binary. The agent binary goes ZMQ-only. The bridge IS the stdio compatibility layer, and it stays in the fabric for non-OCaml agents. Adding a transport abstraction record type inside the agent buys you nothing — Loop.config is already transport-agnostic via callbacks.
The second most important concern: the fabric should not create sockets it immediately closes (Q1). For subprocess entries, compute endpoint strings but skip socket creation. Pass endpoint strings to the handler. Socket ownership must be unambiguous: whoever creates it, closes it.
The highest-risk item is the thread-to-fibre migration (Q5). This MUST happen simultaneously with the ZMQ migration, not independently. The current code has three OS threads synchronising via Mutex.t + Condition.t. If you replace stdio with ZMQ, all I/O becomes T.recv/T.send (fibre-safe via Eio_unix.await_readable), and the threads naturally become fibres. But a hybrid state (some threads, some fibres, mixed mutex types) is a deadlock factory. Do them together or not at all.
On the specific questions:
pipeline_id is fine but connect_endpoints is unnecessary if endpoint strings are passed directly.

The full review with rationale for each point is at /Users/ww/.claude/plans/abundant-cuddling-oasis-agent-aa913465427ce4ea0.md.
Architect review in. Strong pushback: (1) drop the transport abstraction — agent goes ZMQ-only, bridge IS the stdio compat layer; (2) fabric should skip socket creation for subprocesses, not create-then-close; (3) thread→fibre migration must be simultaneous. Waiting for theory-critic, tech-writer, and test-engineer.
The plan is categorically sound. Moving from multiplexed stdio to per-port ZMQ sockets is a faithful implementation of the typed port decomposition the algebra already describes. The bridge was always characterised as a natural transformation between two stream representations; eliminating one representation is simpler than maintaining the transformation. One socket per port makes the channel structure isomorphic to the categorical decomposition — !(A, C) as independent streams gets independent sockets, which is what the tensor product promises.
No coherence concern. This is a strict improvement.
The __port envelope protocol is a manual encoding of what the type system already knows — the port decomposition of the agent's input/output types. Per-port sockets make the transport structurally isomorphic to the categorical decomposition. The key invariant — no cross-port interference — is better satisfied with independent sockets than with mutex-serialised writes to a single pipe.
The "close fabric sockets, agent creates fresh ones" sequence introduces a race condition. When the fabric closes a PUSH socket at an ipc:// endpoint and the agent later binds a new PUSH at the same path, downstream PULLs that were connected to the old socket may or may not reconnect. This depends on ZMQ internal state and timing — ipc:// socket file lifecycle, reconnection intervals, whether the file was unlinked.
Do not close and rebind at the same endpoint. The correct design:
Agents connect, the fabric binds. In ZMQ topology, the stable side binds and the transient side connects. The fabric is stable; agents are transient. For agent inputs: fabric binds PUSH, agent connects PULL. For agent outputs: fabric binds PULL, agent connects PUSH. No socket is ever closed and rebound. No gap. No race.
This requires the routing table compiler to know that subprocess output routes need reversed bind/connect polarity: the fabric creates a PULL (not PUSH) at the agent's output endpoint. For inline morphisms the current topology is fine since the fabric owns both sides. Looking at fabric.ml lines 112-136, the current code always binds PUSH — this needs to change for subprocess outputs.
This is a routing-table-level concern, not an algebraic one. The algebra is indifferent to bind/connect polarity.
PUSH/PULL is correct. REQ/REP is wrong here.
REQ/REP imposes strict send-recv alternation at the socket level. If either side violates this (bug, retry, race), the socket enters a permanent error state. PUSH/PULL has no such coupling — synchronisation is in the application protocol (the agent blocks on PULL after PUSHing), not the socket pattern.
Additionally, PUSH/PULL does not foreclose concurrent tool dispatch. The protocol already includes an id field for request-response correlation. If you later want parallel tool calls, PUSH/PULL supports it; REQ/REP does not.
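The id-based correlation that makes concurrent dispatch possible can be sketched as a pending-request table. This is an illustrative model only — `pending`, `send_req`, and `on_resp` are hypothetical names, not code from the plan:

```ocaml
(* Pending tool requests keyed by id; a response with a matching id
   resumes the waiting continuation. Because PUSH/PULL imposes no
   send/recv alternation, several ids can be in flight at once. *)
let pending : (string, string -> unit) Hashtbl.t = Hashtbl.create 8

let send_req id k = Hashtbl.replace pending id k

let on_resp id payload =
  match Hashtbl.find_opt pending id with
  | Some k -> Hashtbl.remove pending id; k payload
  | None -> ()  (* unknown id: ignore rather than poison the socket *)
```

Under REQ/REP the second in-flight `send_req` would already be a protocol violation at the socket level; here it is just another table entry.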
Sound if and only if all I/O is Eio-cooperative.
The critical distinction: OCaml Mutex.lock blocks the OS thread; Eio.Mutex.lock suspends the fibre. If the conversation loop blocks in a non-Eio-aware input_line while a control reader fibre is waiting on Eio.Condition.await on the same OS thread, the condition is never signalled — deadlock.
With per-port ZMQ sockets, the control reader becomes an Eio fibre doing T.recv on the control PULL socket, which yields via Eio_unix.await_readable. The data reader does the same on the data PULL socket. The LLM client uses ~sw ~net parameters, indicating Eio-native HTTP. All paths are Eio-cooperative, so Eio.Mutex/Condition is correct.
The danger is accidental introduction of blocking I/O that starves other fibres. Document this invariant explicitly: all I/O in the agent event loop must go through Eio-aware primitives or Eio_unix.run_in_systhread.
Transport abstraction: A record type (not a functor) with per-port recv/send/close is the right design. The agent binary spawns one reader fibre per input port. The abstraction should not try to unify the concurrency model across backends.
EOF protocol: Preserving the empty-payload sentinel is correct. Per-port sockets make EOF per-port by construction, which is cleaner than {"__port":"...","__eof":true} envelopes.
Information partitions: Preserved. Endpoint discovery via PLUMB_EP_* env vars is capability-based — the agent can only connect to endpoints the fabric tells it about, which are determined by the type-checked routing table.
Bridge retention: Correct. The bridge remains as the natural transformation for stdio-based processes (nested plumbs, third-party agents). ZMQ-native agents bypass it.
The full review is at /Users/ww/.claude/plans/abundant-cuddling-oasis-agent-a8c28f3edf4ac6941.md.
1. Does the plan account for the __port envelope section in doc/processes.md?
No — and that is correct for Step 5c. The "Multiplexed I/O" section should not be removed in this step because the bridge (the stdio backend) is explicitly kept for backward compatibility. The implementation plan already defers that removal to the Step 6 documentation pass, after exec.ml is deleted. What Step 5c should do is annotate that section to identify it as the stdio transport path, and note that the ZMQ path uses direct socket connections via the PLUMB_EP_* env vars instead.
2. Do the doc/executor.md fd-ownership sections need updating?
No, not in Step 5c. Those sections document the exec.ml code path, which Step 5c does not touch. The PLMB-059 ticket (already open) requests a status note flagging that the fabric is now the primary path. That note is appropriate to add now as PLMB-059 backlog, but it is not a Step 5c obligation.
3. Does doc/architecture.md need updating for agent_transport.ml?
Yes. The module table and dependency diagram are already stale by PLMB-059's reckoning (it's missing lib/fabric/, lib/supervisor/, lib/transport/, lib/spawn/). Step 5c adds another new module. The plan should co-locate all of these updates rather than leave a second partial pass for later.
The dependency diagram in doc/architecture.md currently shows bin/agent depending only on client. After Step 5c, bin/agent will also depend directly on lib/transport/. This needs to be reflected.
4. Is the omission of doc files from "Files to modify" an oversight?
Yes. The following files need updating in Step 5c:
| File | Change |
|---|---|
| doc/agent-runtime.md | Add new env vars to the table; update "Control channel" section |
| doc/processes.md | Annotate "Multiplexed I/O" as the stdio transport path |
| doc/architecture.md | Add new transport module(s) to module table and dependency graph |
5. Are there LDC design notes that should be updated?
No standalone bridge architecture design note exists in LDC. The bridge is documented in research/plumbing/engineering-state.md (Step 5a description) and the implementation plan. After Step 5c, the engineering-state.md should be updated to record the relationship between the new transport abstraction and the bridge (bridge = stdio backend of the transport).
Naming conflict — PLUMB_EP_* vs PLUMB_ENDPOINT_*
The Step 5c plan uses PLUMB_EP_*. The implementation plan D8 specifies the longer form:
```
PLUMB_ENDPOINT_INPUT=ipc:///tmp/plumb-12345/agent-drafter/input
PLUMB_ENDPOINT_OUTPUT=...
PLUMB_ENDPOINT_CTRL=...
PLUMB_ENDPOINT_TELEMETRY=...
PLUMB_ENDPOINT_TOOL_REQ=...
PLUMB_ENDPOINT_TOOL_RESP=...
```
These names will appear in doc/agent-runtime.md, in bridge.ml, and in the agent binary. They must be consistent. The plan should state which form is chosen and it should match D8 or explicitly supersede it.
PLUMB_TRANSPORT selector semantics
The plan introduces PLUMB_TRANSPORT=zmq as a transport selector. D8 does not mention this variable — it assumes the ZMQ path is used when the endpoint vars are present and absent otherwise. There are two distinct approaches:
1. Presence-based: ZMQ path if PLUMB_ENDPOINT_INPUT is set; stdio path otherwise. No separate selector needed.
2. Explicit selector: PLUMB_TRANSPORT=zmq|stdio. Cleaner to document, but adds a variable.

This is a design decision, not just a naming choice. doc/agent-runtime.md needs to document whichever approach is chosen.
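The presence-based option can be sketched as a one-line decision over the environment. This is a hypothetical helper (`transport_of_env` is not a name from the plan), written against an injected lookup so it is testable without touching the real environment:

```ocaml
type transport = Zmq | Stdio

(* Presence-based detection: ZMQ mode iff the input endpoint variable
   is set; no separate PLUMB_TRANSPORT selector. In the agent binary
   the lookup would be Sys.getenv_opt. *)
let transport_of_env getenv =
  match getenv "PLUMB_ENDPOINT_INPUT" with
  | Some _ -> Zmq
  | None -> Stdio
```

The explicit-selector alternative would instead branch on the value of a `PLUMB_TRANSPORT` variable, at the cost of one more variable to document and keep consistent.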
What doc/agent-runtime.md needs
Beyond the env var table, the "Control channel" section currently describes the control path in terms of file descriptors:
A background thread reads from the internal ctrl pipe (fed by the stdin demux) and updates the agent's mutable state under a mutex.
Control output (fd 7)
Step 5c migrates the control reader from a thread to an Eio fibre, and under ZMQ the control path uses a PAIR socket, not fd 7. Both the "background thread" and "fd 7" references need updating for the ZMQ path. The stdio path description can remain (it covers the bridge/backward-compat case) but the two paths need to be distinguished clearly.
The following belong to Step 6 (the full documentation pass, PLMB-059):
- doc/processes.md — only valid after the stdio path is fully retired (Step 5d deletion of exec.ml)
- doc/executor.md — still accurate for exec.ml, rewrite waits for deletion
- doc/extending.md Step 7 (exec.ml references) — same reason
Theory critic and tech writer in. Key consensus forming:
- naming must be settled (PLUMB_EP_* vs PLUMB_ENDPOINT_*), and the PLUMB_TRANSPORT selector needs a decision.

Still waiting for test-engineer.
The proposed 2-test strategy is insufficient. It covers roughly 15% of the semantic surface affected by Step 5c. Here are the key findings:
The critical gap is the control protocol. You have 13 tests in test_agent_control.ml exercising pause, resume, get_memory, set_memory, stop, drain mode, and EOF handling — all over muxed stdio. Step 5c replaces that entire I/O layer. Every one of those 13 tests verifies a session-type state transition that must still hold under ZMQ. The plan adds zero control protocol tests for the new transport.
Tool dispatch is untested at any level. The plan introduces PLUMB_ENDPOINT_TOOL_REQ / PLUMB_ENDPOINT_TOOL_RESP ZMQ sockets, but no existing test exercises tool dispatch through the fabric at all (ELIZA has no tools). This is new wiring with no coverage.
Six other gaps identified, in priority order:
- the make_stdio / make_zmq abstraction has its own contracts worth testing independently
- the EOF cascade (empty ; agent ; id) — the plan mentions verifying this but does not make it a formal test

Revised must-have list: 13 tests (the proposed 2 plus 11 more). Full breakdown with rationale is in the plan file at /Users/ww/.claude/plans/abundant-cuddling-oasis-agent-a3bdd1dce5298a136.md.
Practical recommendation for the control protocol tests: extract the test helpers in test_agent_control.ml (spawn, read_port, send_control, etc.) into a shared module parameterised by transport backend, then run both stdio and ZMQ variants. If that is too costly, port the 6 most semantically rich tests: stop, stdin-EOF, pause-get-set-resume, drain-completes-protocol, set-memory-malformed-rejected, and basic-response.
Answer to Q5 specifically: yes, empty ; agent ; id should be a formal test. It is the canonical termination cascade — three components, two transport boundaries, and it would catch four distinct bug classes (EOF not propagated in either direction, agent hanging in drain, fabric not shutting down).
All five reviews are in. Let me synthesise the consensus and update the plan.
Review consensus — critical changes needed:
- Loop.config is already transport-agnostic.

# PLMB-037 Step 5c: Agent binary ZMQ rewrite
## Context
The fabric runtime (Steps 0–5b) runs inline morphisms on ZMQ
sockets, but agent subprocesses still communicate via stdin/stdout
with `__port` JSON envelopes. A **bridge** module (`bridge.ml`,
~445 lines) translates between the two.
Step 5c eliminates the bridge for agents by having the agent binary
speak ZMQ directly — one socket per logical port, no envelopes.
**Worktree:** create from `plumbing-1` branch
## Review consensus (5 reviewers)
### Agreed changes from original plan
- **No transport abstraction**: agent goes ZMQ-only. The bridge IS
the stdio compatibility layer for non-OCaml agents. `Loop.config`
is already transport-agnostic via callbacks — no new abstraction
needed in the agent binary.
- **Fabric binds both sides; agent always connects**: no
close-and-rebind. For agent output routes, fabric binds PULL at
the agent's endpoint; agent connects PUSH to it. For agent input
routes, fabric binds PUSH (as today); agent connects PULL. Fabric
skips creating agent-side sockets entirely for ZMQ-native
subprocesses.
- **Thread→fibre migration is simultaneous**: no hybrid thread/fibre
state. All agent I/O becomes Eio-cooperative with ZMQ; OCaml
threads become daemon fibres. `Mutex`/`Condition` become
`Eio.Mutex`/`Eio.Condition`.
- **Explicit startup and shutdown protocols**: state machines, not
assumptions about ZMQ auto-reconnect timing.
- **ZMQ handler in a separate module**: not in `bridge.ml` (which is
a natural transformation between representations; the ZMQ handler
is not a bridge).
- **PUSH/PULL for tool dispatch** (not REQ/REP): no strict
alternation constraint, supports future parallel tool calls.
- **Naming**: `PLUMB_ENDPOINT_*` (matching implementation plan D8).
Presence-based detection (ZMQ mode if `PLUMB_ENDPOINT_INPUT` is
set; no separate `PLUMB_TRANSPORT` selector).
## Architecture
### Current (bridge)
```
Fabric Agent process
PULL ──(ZMQ)──→ bridge mux ──(pipe)──→ demux → pipes → loop
PUSH ←─(ZMQ)──← bridge demux ←─(pipe)──← mux_write ← loop
```
### After (ZMQ direct)
```
Fabric Agent process
PUSH ─bind──(ZMQ)──connect─ PULL → loop (Eio fibre)
PULL ─bind──(ZMQ)──connect─ PUSH ← loop (Eio fibre)
```
Fabric binds all sockets. Agent connects. No pipes, no envelopes,
no demux thread.
## Design decisions
### D1: Agent goes ZMQ-only
The agent binary (`bin/agent/main.ml`) is rewritten to use ZMQ
sockets directly. No transport abstraction record — the bridge
remains as the stdio path for non-OCaml agents and nested plumbs.
`Loop.config` callbacks (`dispatch_tool`, `emit_telemetry_line`,
`get_model`, etc.) are already transport-agnostic; the main.ml
changes are confined to I/O setup and the main loop's read/write
calls.
### D2: Endpoint discovery
The fabric sets per-port env vars before spawning:
```
PLUMB_ENDPOINT_INPUT=ipc:///tmp/plumb-{pid}/{proc}/data_in
PLUMB_ENDPOINT_OUTPUT=ipc:///tmp/plumb-{pid}/{proc}/data_out
PLUMB_ENDPOINT_CTRL_IN=ipc:///tmp/plumb-{pid}/{proc}/ctrl_in
PLUMB_ENDPOINT_CTRL_OUT=ipc:///tmp/plumb-{pid}/{proc}/ctrl_out
PLUMB_ENDPOINT_TELEMETRY=ipc:///tmp/plumb-{pid}/{proc}/telemetry
PLUMB_ENDPOINT_TOOL_REQ=ipc:///tmp/plumb-{pid}/{proc}/tool_req
PLUMB_ENDPOINT_TOOL_RESP=ipc:///tmp/plumb-{pid}/{proc}/tool_resp
```
Absent var = port not wired. ZMQ mode detected by presence of
`PLUMB_ENDPOINT_INPUT`. If absent, the existing stdio path runs
(for backward compatibility during transition).
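The "absent var = port not wired" rule can be sketched as a filter over the known port variables. The port names and the `wired_endpoints` helper are illustrative, assuming the D2 variable set above:

```ocaml
(* Each logical port and the env var that carries its endpoint. *)
let port_vars =
  [ "input",     "PLUMB_ENDPOINT_INPUT";
    "output",    "PLUMB_ENDPOINT_OUTPUT";
    "ctrl_in",   "PLUMB_ENDPOINT_CTRL_IN";
    "ctrl_out",  "PLUMB_ENDPOINT_CTRL_OUT";
    "telemetry", "PLUMB_ENDPOINT_TELEMETRY";
    "tool_req",  "PLUMB_ENDPOINT_TOOL_REQ";
    "tool_resp", "PLUMB_ENDPOINT_TOOL_RESP" ]

(* Absent var = port not wired: filter rather than fail.
   In the agent the lookup would be Sys.getenv_opt. *)
let wired_endpoints getenv =
  List.filter_map
    (fun (port, var) -> Option.map (fun ep -> (port, ep)) (getenv var))
    port_vars
```

Only `PLUMB_ENDPOINT_INPUT` is load-bearing for mode detection; every other port is optional by construction.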
### D3: Socket ownership — fabric binds, agent connects
The fabric creates sockets differently for subprocess vs inline:
**Inline morphisms** (as today): fabric creates both PUSH (bind)
and PULL (connect) per route.
**ZMQ-native subprocesses**: fabric creates only the non-agent-side
socket. For routes where data flows TO the agent, fabric binds
PUSH at the upstream endpoint. For routes where data flows FROM
the agent, fabric binds PULL at the agent's endpoint. Agent
connects complementary sockets to these endpoints.
This requires the fabric to distinguish subprocess from inline
processes before socket creation (available from `proc_kind` on
`process_entry`).
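The polarity rule of D3 is small enough to state as a total function. This is a model of the rule, not the fabric's actual API — the type and function names are illustrative:

```ocaml
type pattern = Push | Pull
type direction = To_agent | From_agent

(* The stable side (fabric) always binds; the transient side (agent)
   always connects. Socket pattern follows data direction. *)
let socket_for route side =
  match route, side with
  | To_agent,   `Fabric -> (Push, `Bind)
  | To_agent,   `Agent  -> (Pull, `Connect)
  | From_agent, `Fabric -> (Pull, `Bind)
  | From_agent, `Agent  -> (Push, `Connect)
```

Note the two sides of any route always come out complementary in both pattern and polarity, which is what rules out the close-and-rebind race.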
### D4: Threads → Eio fibres
| Current | After |
|---------|-------|
| `start_stdin_demux` (Thread) | Eliminated — one socket per port |
| `control_reader_thread` (Thread) | Daemon fibre, `T.recv` on ctrl PULL |
| `Mutex.t` on control_state | `Eio.Mutex.t` |
| `Condition.t` for pause/memory | `Eio.Condition.t` |
| `stdout_mutex` for mux_write | Eliminated — one socket per port |
**Critical invariant** (theory-critic): all I/O in the agent must
go through Eio-aware primitives or `Eio_unix.run_in_systhread`.
Do not hold `Eio.Mutex` across ZMQ recv calls (engineer deadlock
finding).
### D5: Startup synchronisation
After binding/connecting all sockets, the agent sends a `ready`
message on its ctrl_out PUSH socket. The fabric's ZMQ handler
waits for this before sending data. This eliminates the
slow-joiner race.
### D6: Shutdown protocol
1. Fabric sends EOF on data_in (empty payload sentinel)
2. Agent `recv_data` raises `End_of_file`, enters drain phase
3. Agent processes any pending control messages (pause/memory)
4. Fabric sends EOF on ctrl_in (or `{"stop":true}` + EOF)
5. Agent sends EOF on all output sockets (data_out, ctrl_out,
telemetry, tool_req)
6. Agent closes sockets, destroys context, exits
7. Fabric detects EOF on its PULL sockets and reaps child
Per-socket EOF timeouts in the fabric — do not assume all EOFs
arrive atomically.
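The agent-side half of this sequence (steps 2–6) can be modelled as a small state machine, which is also the shape the drain tests need to assert. State and event names are illustrative, not existing code:

```ocaml
type phase = Serving | Draining | Stopped

(* Agent-side shutdown transitions: data EOF starts the drain phase,
   pending control messages are absorbed, ctrl EOF ends it (after which
   the agent emits EOFs on its outputs, closes sockets, and exits). *)
let step phase event =
  match phase, event with
  | Serving,  `Data_eof -> Draining
  | Draining, `Ctrl_msg -> Draining
  | Draining, `Ctrl_eof -> Stopped
  | p, _ -> p
```

The catch-all clause encodes that out-of-order events (e.g. a stray ctrl EOF before data EOF) leave the phase unchanged rather than crashing the agent; whether that is the desired policy is a design question the implementation must settle.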
### D7: Tool dispatch stays in fabric
Agent PUSH → fabric PULL → dispatch thread → fabric PUSH → agent
PULL. Pipe adapters (ZMQ↔pipe) bridge to the existing
`tool_dispatch_loop`. Follow-on ticket to unify the interface.
### D8: Bridge retained
The bridge remains for:
- Nested plumb subprocesses (which speak stdio)
- Non-OCaml agent binaries
- Transition period (fallback if ZMQ path has issues)
### D9: Back-pressure
Set `ZMQ_SNDHWM = 0` (unlimited) on all agent sockets — matching
the existing pipe semantics (pipes block on full buffer, never
drop). Document this choice. A future step can add explicit
back-pressure policy.
## Changes
### 1. Fabric socket creation for subprocesses
**`lib/fabric/fabric.ml`** — modify socket creation loop:
For `Subprocess` processes, don't create agent-side sockets.
Instead, compute endpoint strings. For routes where agent is DST:
agent needs to connect its PULL to the upstream PUSH endpoint. For
routes where agent is SRC: fabric binds PULL at the agent's
endpoint; agent connects PUSH.
Extend `process_handler` callback to pass endpoint strings:
```ocaml
?process_handler:(
sw:Eio.Switch.t ->
proc_entry:process_entry ->
push_sockets:(string * [`Push] T.socket) list ->
pull_sockets:(string * [`Pull] T.socket) list ->
endpoints:(string * string) list -> (* port_name, endpoint_uri *)
unit)
```
`push_sockets`/`pull_sockets` contain only the fabric-side sockets
(for the bridge path). `endpoints` contains all port→URI mappings
(for the ZMQ-native path).
### 2. ZMQ process handler — `lib/fabric/zmq_agent.ml` (new)
Separate module (not in bridge.ml). `make_handler` function:
1. Synthesise agent spec (reuse from bridge.ml)
2. Set up tool dispatch: bind PULL at tool_req endpoint, bind PUSH
at tool_resp endpoint, run pipe-adapter fibres to existing
`tool_dispatch_loop`
3. Spawn agent with `PLUMB_ENDPOINT_*` env vars + credentials
4. Wait for `ready` message on ctrl_out PULL socket
5. Fork reaper fibre (waitpid)
6. Fork per-socket EOF timeout fibres
### 3. Rewrite `bin/agent/main.ml`
Major changes:
- Remove `stdout_mutex`, `mux_write`, `start_stdin_demux` (~90 lines)
- Create ZMQ context + sockets from `PLUMB_ENDPOINT_*` env vars
- Replace `input_line data_ic` with `T.recv data_in_sock`; raise
`End_of_file` on empty payload
- Replace `mux_write "output"` with `T.send data_out_sock`
- Replace `mux_write "telemetry"` with `T.send telemetry_sock`
- Replace `mux_write "ctrl_out"` with `T.send ctrl_out_sock`
- Replace `mux_write "tool_req"` / `input_line tool_ic` with
`T.send tool_req_sock` / `T.recv tool_resp_sock`
- Control reader: daemon fibre doing `T.recv ctrl_in_sock`
- `control_state`: `Eio.Mutex.t` + `Eio.Condition.t`
- Send `ready` on ctrl_out after socket setup
- Shutdown: send EOF on all output sockets
**Fallback**: if `PLUMB_ENDPOINT_INPUT` is not set, keep existing
stdio path (for running agent binary standalone or under the bridge
during transition).
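The empty-payload EOF sentinel check that replaces `input_line` can be sketched as a thin wrapper over the receive call. `classify` and `recv_or_eof` are hypothetical helper names; the real code would call `T.recv` on the data PULL socket:

```ocaml
(* EOF protocol: an empty payload is the per-port EOF sentinel. *)
let classify payload =
  if String.length payload = 0 then `Eof else `Msg payload

(* Wrap a receive function so callers keep the existing
   End_of_file-based control flow from the stdio path. *)
let recv_or_eof recv_fn =
  match classify (recv_fn ()) with
  | `Eof -> raise End_of_file
  | `Msg s -> s
```

Keeping `End_of_file` as the signal means the drain-phase logic in the loop is untouched by the transport swap.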
### 4. Spawn lib — env var support
**`lib/spawn/spawn_lib.ml`**:
- Add `PLUMB_ENDPOINT_*` vars to `safe_env_keys`
- Helper to compute env var list from port→endpoint mapping
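The helper described above can be sketched as a pure mapping from the port→endpoint list to `KEY=value` bindings. `env_of_endpoints` is an illustrative name, assuming port names match the env var suffixes:

```ocaml
(* Turn a (port, endpoint) mapping into PLUMB_ENDPOINT_* bindings
   suitable for the spawn environment. *)
let env_of_endpoints endpoints =
  List.map
    (fun (port, uri) ->
       Printf.sprintf "PLUMB_ENDPOINT_%s=%s" (String.uppercase_ascii port) uri)
    endpoints
```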
### 5. Dune dependency
**`bin/agent/dune`**: add `transport` to libraries.
### 6. Tests (13 minimum)
**`test/test_fabric.ml`** — new tests:
1. `test_agent_identity_zmq` — identity response via ZMQ
2. `test_agent_eof_zmq` — EOF propagation via ZMQ
3. `test_agent_eof_cascade_zmq` — `empty ; agent ; id` terminates
4. `test_agent_telemetry_zmq` — telemetry arrives on correct socket
5. `test_agent_tool_dispatch_zmq` — tool req/resp over ZMQ
**`test/test_agent_control.ml`** — port key control tests to ZMQ:
6. `test_stop_zmq` — stop via control socket
7. `test_stdin_eof_zmq` — data EOF triggers drain
8. `test_pause_resume_zmq` — pause/get/set/resume flow
9. `test_drain_zmq` — drain completes protocol
10. `test_set_memory_malformed_zmq` — rejects malformed memory
11. `test_basic_response_zmq` — basic LLM response
**Error path tests**:
12. `test_missing_endpoint_env` — clear error on missing env var
13. `test_startup_ready_signal` — fabric waits for ready before send
Extract test helpers into a shared module parameterised by transport
backend to avoid duplicating test infrastructure.
### 7. Documentation
**`doc/agent-runtime.md`**:
- Add `PLUMB_ENDPOINT_*` env var table
- Update control channel section (fibre, not thread; socket, not fd)
- Note presence-based ZMQ detection
**`doc/processes.md`**:
- Annotate "Multiplexed I/O" section as stdio transport path
- Note that ZMQ-native agents use per-port sockets
**`doc/architecture.md`**:
- Add `lib/fabric/zmq_agent.ml` to module table
- Update dependency diagram: `bin/agent` → `transport`
## Files to modify
| File | Change |
|------|--------|
| `lib/fabric/fabric.ml` | Subprocess socket creation; extend callback |
| `lib/fabric/zmq_agent.ml` | New — ZMQ process handler |
| `bin/agent/main.ml` | ZMQ rewrite; threads→fibres |
| `bin/agent/dune` | Add `transport` dependency |
| `lib/spawn/spawn_lib.ml` | Endpoint env var helpers |
| `test/test_fabric.ml` | 5 new ZMQ agent tests |
| `test/test_agent_control.ml` | 6 control tests ported to ZMQ |
| `test/test_zmq_errors.ml` | 2 error path tests |
| `doc/agent-runtime.md` | Env vars, control channel |
| `doc/processes.md` | Annotate Multiplexed I/O |
| `doc/architecture.md` | Module table, dependency graph |
## Implementation order
1. **Fabric socket bifurcation** — modify fabric.ml to skip
agent-side sockets for subprocess processes; extend callback
with endpoints. Update bridge's `make_handler` to work with
the new callback signature. `dune build && dune test` — all
existing tests pass.
2. **ZMQ handler** — `zmq_agent.ml`: spec synthesis, tool dispatch
wiring, endpoint env vars, agent spawn, ready handshake.
`dune build`.
3. **Agent binary rewrite** — main.ml: ZMQ sockets, daemon fibres,
Eio.Mutex/Condition, ready signal, shutdown protocol.
`dune build`.
4. **Integration + tests** — wire ZMQ handler into fabric, run all
13 tests. `dune build && dune test`.
5. **Documentation** — agent-runtime.md, processes.md,
architecture.md.
## Verification
1. After step 1: `dune build && dune test` — all existing bridge
tests pass with extended callback
2. After step 3: `dune build` — agent binary compiles with ZMQ
3. After step 4: full test suite; all 13 new tests + existing tests
4. EOF cascade: `empty ; agent ; id` terminates cleanly (test 3)
5. Control protocol: pause/resume/memory works over ZMQ (tests 8-10)
6. Tool dispatch: req/resp round-trips over ZMQ (test 5)
7. Update PLMB ticket and engineering state in LDC
The file /Users/ww/.claude/plans/abundant-cuddling-oasis.md has been updated successfully.
Plan updated with all five reviewers' feedback incorporated. Key changes from the original:
- zmq_agent.ml module — separate from bridge.ml
- startup handshake (ready message) eliminates slow-joiner race
**Agent binary rewrite** — main.ml: ZMQ sockets, daemon fibres,\n Eio.Mutex/Condition, ready signal, shutdown protocol.\n `dune build`.\n\n4. **Integration + tests** — wire ZMQ handler into fabric, run all\n 13 tests. `dune build && dune test`.\n\n5. **Documentation** — agent-runtime.md, processes.md,\n architecture.md.\n\n## Verification\n\n1. After step 1: `dune build && dune test` — all existing bridge\n tests pass with extended callback\n2. After step 3: `dune build` — agent binary compiles with ZMQ\n3. After step 4: full test suite; all 13 new tests + existing tests\n4. EOF cascade: `empty ; agent ; id` terminates cleanly (test 3)\n5. Control protocol: pause/resume/memory works over ZMQ (tests 8-10)\n6. Tool dispatch: req/resp round-trips over ZMQ (test 5)\n7. Update PLMB ticket and engineering state in LDC\n"
}The user doesn't want to proceed with this tool use. The tool use was rejected (eg. if it was a file edit, the new_string was NOT written to the file). To tell you how to proceed, the user said: reviews were substantive. you made major changes to the plan. send for re-review
# PLMB-037 Step 5c: Agent binary ZMQ rewrite

## Context

The fabric runtime (Steps 0–5b) runs inline morphisms on ZMQ
sockets, but agent subprocesses still communicate via stdin/stdout
with `__port` JSON envelopes. A **bridge** module (`bridge.ml`,
~445 lines) translates between the two.

Step 5c eliminates the bridge for agents by having the agent binary
speak ZMQ directly — one socket per logical port, no envelopes.

**Worktree:** create from `plumbing-1` branch

## Review consensus (5 reviewers)

### Agreed changes from original plan

- **No transport abstraction**: agent goes ZMQ-only. The bridge IS
  the stdio compatibility layer for non-OCaml agents. `Loop.config`
  is already transport-agnostic via callbacks — no new abstraction
  needed in the agent binary.
- **Fabric binds both sides; agent always connects**: no
  close-and-rebind. For agent output routes, fabric binds PULL at
  the agent's endpoint; agent connects PUSH to it. For agent input
  routes, fabric binds PUSH (as today); agent connects PULL. Fabric
  skips creating agent-side sockets entirely for ZMQ-native
  subprocesses.
- **Thread→fibre migration is simultaneous**: no hybrid thread/fibre
  state. All agent I/O becomes Eio-cooperative with ZMQ; OCaml
  threads become daemon fibres. `Mutex`/`Condition` become
  `Eio.Mutex`/`Eio.Condition`.
- **Explicit startup and shutdown protocols**: state machines, not
  assumptions about ZMQ auto-reconnect timing.
- **ZMQ handler in a separate module**: not in `bridge.ml` (which is
  a natural transformation between representations; the ZMQ handler
  is not a bridge).
- **PUSH/PULL for tool dispatch** (not REQ/REP): no strict
  alternation constraint, supports future parallel tool calls.
- **Naming**: `PLUMB_ENDPOINT_*` (matching implementation plan D8).
  Presence-based detection (ZMQ mode if `PLUMB_ENDPOINT_INPUT` is
  set; no separate `PLUMB_TRANSPORT` selector).

## Architecture

### Current (bridge)

```
Fabric                                          Agent process
  PULL ──(ZMQ)──→ bridge mux ──(pipe)──→ demux → pipes → loop
  PUSH ←─(ZMQ)──← bridge demux ←─(pipe)──← mux_write ← loop
```

### After (ZMQ direct)

```
Fabric                                          Agent process
  PUSH ─bind──(ZMQ)──connect─ PULL → loop (Eio fibre)
  PULL ─bind──(ZMQ)──connect─ PUSH ← loop (Eio fibre)
```

Fabric binds all sockets. Agent connects. No pipes, no envelopes,
no demux thread.

## Design decisions

### D1: Agent goes ZMQ-only

The agent binary (`bin/agent/main.ml`) is rewritten to use ZMQ
sockets directly. No transport abstraction record — the bridge
remains as the stdio path for non-OCaml agents and nested plumbs.
`Loop.config` callbacks (`dispatch_tool`, `emit_telemetry_line`,
`get_model`, etc.) are already transport-agnostic; the main.ml
changes are confined to I/O setup and the main loop's read/write
calls.

### D2: Endpoint discovery

The fabric sets per-port env vars before spawning:

```
PLUMB_ENDPOINT_INPUT=ipc:///tmp/plumb-{pid}/{proc}/data_in
PLUMB_ENDPOINT_OUTPUT=ipc:///tmp/plumb-{pid}/{proc}/data_out
PLUMB_ENDPOINT_CTRL_IN=ipc:///tmp/plumb-{pid}/{proc}/ctrl_in
PLUMB_ENDPOINT_CTRL_OUT=ipc:///tmp/plumb-{pid}/{proc}/ctrl_out
PLUMB_ENDPOINT_TELEMETRY=ipc:///tmp/plumb-{pid}/{proc}/telemetry
PLUMB_ENDPOINT_TOOL_REQ=ipc:///tmp/plumb-{pid}/{proc}/tool_req
PLUMB_ENDPOINT_TOOL_RESP=ipc:///tmp/plumb-{pid}/{proc}/tool_resp
```

Absent var = port not wired. ZMQ mode detected by presence of
`PLUMB_ENDPOINT_INPUT`. If absent, the existing stdio path runs
(for backward compatibility during transition).

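The discovery rule is small enough to sketch with the stdlib alone. `endpoint_var` and `endpoint_of_port` are illustrative names, not the final API; only the variables from the table above are assumed.

```ocaml
(* Sketch of D2's presence-based discovery, assuming only the
   PLUMB_ENDPOINT_* variables tabulated above. The real main.ml
   would feed these URIs into ZMQ socket setup. *)

let endpoint_var port =
  "PLUMB_ENDPOINT_" ^ String.uppercase_ascii port

(* Absent var = port not wired. *)
let endpoint_of_port port = Sys.getenv_opt (endpoint_var port)

(* ZMQ mode iff the input endpoint is present. *)
let zmq_mode () = Sys.getenv_opt "PLUMB_ENDPOINT_INPUT" <> None
```

Keeping detection to a single `getenv` means the stdio fallback costs one branch at startup, not a persistent abstraction.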
### D3: Socket ownership — fabric binds, agent connects

The fabric creates sockets differently for subprocess vs inline:

**Inline morphisms** (as today): fabric creates both PUSH (bind)
and PULL (connect) per route.

**ZMQ-native subprocesses**: fabric creates only the non-agent-side
socket. For routes where data flows TO the agent, fabric binds
PUSH at the upstream endpoint. For routes where data flows FROM
the agent, fabric binds PULL at the agent's endpoint. Agent
connects complementary sockets to these endpoints.

This requires the fabric to distinguish subprocess from inline
processes before socket creation (available from `proc_kind` on
`process_entry`).

### D4: Threads → Eio fibres

| Current | After |
|---------|-------|
| `start_stdin_demux` (Thread) | Eliminated — one socket per port |
| `control_reader_thread` (Thread) | Daemon fibre, `T.recv` on ctrl PULL |
| `Mutex.t` on control_state | `Eio.Mutex.t` |
| `Condition.t` for pause/memory | `Eio.Condition.t` |
| `stdout_mutex` for mux_write | Eliminated — one socket per port |

**Critical invariant** (theory-critic): all I/O in the agent must
go through Eio-aware primitives or `Eio_unix.run_in_systhread`.
Do not hold `Eio.Mutex` across ZMQ recv calls (engineer deadlock
finding).

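The invariant can be illustrated without Eio or ZMQ by injecting the lock and recv operations; in the agent these would be `Eio.Mutex.lock`/`unlock` and `T.recv`. This is a sketch of the correct shape, not agent code, and the record fields are hypothetical names.

```ocaml
(* Illustration of the D4 invariant: never hold the state lock
   across a blocking recv. Operations are injected so the sketch
   stays library-free. *)

type 'msg control_loop = {
  lock : unit -> unit;            (* e.g. Eio.Mutex.lock *)
  unlock : unit -> unit;          (* e.g. Eio.Mutex.unlock *)
  recv : unit -> 'msg option;     (* e.g. T.recv; None = EOF *)
  apply : 'msg -> unit;           (* brief control_state update *)
}

(* Correct shape: block in recv with the lock released; take the
   lock only for the short state update. *)
let rec run_control_reader c =
  match c.recv () with            (* blocking call, lock NOT held *)
  | None -> ()                    (* EOF: the daemon fibre exits *)
  | Some msg ->
    c.lock ();
    (try c.apply msg with e -> c.unlock (); raise e);
    c.unlock ();
    run_control_reader c
```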
### D5: Startup synchronisation

After binding/connecting all sockets, the agent sends a `ready`
message on its ctrl_out PUSH socket. The fabric's ZMQ handler
waits for this before sending data. This eliminates the
slow-joiner race.

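The fabric-side wait can be prototyped against an injected `recv`. The `{"type":"ready"}` message shape and the attempt budget (standing in for a wall-clock timeout) are assumptions, since D5 does not fix the schema; the real handler would use `T.recv` on the ctrl_out PULL socket under an Eio timeout.

```ocaml
(* Sketch of the fabric-side ready wait (D5), message shape assumed. *)

let is_ready_msg line =
  (* Tolerate surrounding whitespace; the exact JSON schema is TBD. *)
  String.trim line = {|{"type":"ready"}|}

(* Pull messages until ready arrives or the budget is spent. *)
let rec await_ready ~attempts recv =
  if attempts <= 0 then Error `Timeout
  else
    match recv () with
    | None -> Error `Eof                      (* agent died before ready *)
    | Some line when is_ready_msg line -> Ok ()
    | Some _other -> await_ready ~attempts:(attempts - 1) recv
```

Distinguishing `Eof` from `Timeout` lets the handler report "agent crashed during startup" separately from "agent hung during startup".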
### D6: Shutdown protocol

1. Fabric sends EOF on data_in (empty payload sentinel)
2. Agent `recv_data` raises `End_of_file`, enters drain phase
3. Agent processes any pending control messages (pause/memory)
4. Fabric sends EOF on ctrl_in (or `{"stop":true}` + EOF)
5. Agent sends EOF on all output sockets (data_out, ctrl_out,
   telemetry, tool_req)
6. Agent closes sockets, destroys context, exits
7. Fabric detects EOF on its PULL sockets and reaps child

Per-socket EOF timeouts in the fabric — do not assume all EOFs
arrive atomically.

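Viewed from the agent, the seven steps reduce to a small state machine. The state and event names here are illustrative; a real implementation would drive it from socket events.

```ocaml
(* The agent-side half of the D6 shutdown protocol as a pure state
   machine. Names are illustrative, not final. *)

type state =
  | Running        (* normal processing *)
  | Draining       (* data EOF seen; still serving control msgs *)
  | Closing        (* ctrl EOF seen; send EOFs, close sockets *)
  | Done

type event =
  | Data_eof       (* step 2: empty payload on data_in *)
  | Ctrl_msg       (* step 3: pending pause/memory during drain *)
  | Ctrl_eof       (* step 4: EOF on ctrl_in *)
  | Outputs_closed (* steps 5-6 complete *)

let step state event =
  match state, event with
  | Running, Data_eof -> Some Draining
  | Draining, Ctrl_msg -> Some Draining       (* keep draining *)
  | Draining, Ctrl_eof -> Some Closing
  | Closing, Outputs_closed -> Some Done
  | _ -> None                                 (* protocol violation *)
```

Making violations explicit (`None`) gives the agent a single place to log out-of-order events instead of silently ignoring them.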
### D7: Tool dispatch stays in fabric

Agent PUSH → fabric PULL → dispatch thread → fabric PUSH → agent
PULL. Pipe adapters (ZMQ↔pipe) bridge to the existing
`tool_dispatch_loop`. Follow-on ticket to unify the interface.

### D8: Bridge retained

The bridge remains for:
- Nested plumb subprocesses (which speak stdio)
- Non-OCaml agent binaries
- Transition period (fallback if ZMQ path has issues)

### D9: Back-pressure

Set `ZMQ_SNDHWM = 0` (unlimited) on all agent sockets — matching
the existing pipe semantics (pipes block on full buffer, never
drop). Document this choice. A future step can add explicit
back-pressure policy.

## Changes

### 1. Fabric socket creation for subprocesses

**`lib/fabric/fabric.ml`** — modify socket creation loop:

For `Subprocess` processes, don't create agent-side sockets.
Instead, compute endpoint strings. For routes where agent is DST:
agent needs to connect its PULL to the upstream PUSH endpoint. For
routes where agent is SRC: fabric binds PULL at the agent's
endpoint; agent connects PUSH.

Extend `process_handler` callback to pass endpoint strings:

```ocaml
?process_handler:(
  sw:Eio.Switch.t ->
  proc_entry:process_entry ->
  push_sockets:(string * [`Push] T.socket) list ->
  pull_sockets:(string * [`Pull] T.socket) list ->
  endpoints:(string * string) list -> (* port_name, endpoint_uri *)
  unit)
```

`push_sockets`/`pull_sockets` contain only the fabric-side sockets
(for the bridge path). `endpoints` contains all port→URI mappings
(for the ZMQ-native path).

### 2. ZMQ process handler — `lib/fabric/zmq_agent.ml` (new)

Separate module (not in bridge.ml). `make_handler` function:

1. Synthesise agent spec (reuse from bridge.ml)
2. Set up tool dispatch: bind PULL at tool_req endpoint, bind PUSH
   at tool_resp endpoint, run pipe-adapter fibres to existing
   `tool_dispatch_loop`
3. Spawn agent with `PLUMB_ENDPOINT_*` env vars + credentials
4. Wait for `ready` message on ctrl_out PULL socket
5. Fork reaper fibre (waitpid)
6. Fork per-socket EOF timeout fibres

### 3. Rewrite `bin/agent/main.ml`

Major changes:
- Remove `stdout_mutex`, `mux_write`, `start_stdin_demux` (~90 lines)
- Create ZMQ context + sockets from `PLUMB_ENDPOINT_*` env vars
- Replace `input_line data_ic` with `T.recv data_in_sock`; raise
  `End_of_file` on empty payload
- Replace `mux_write "output"` with `T.send data_out_sock`
- Replace `mux_write "telemetry"` with `T.send telemetry_sock`
- Replace `mux_write "ctrl_out"` with `T.send ctrl_out_sock`
- Replace `mux_write "tool_req"` / `input_line tool_ic` with
  `T.send tool_req_sock` / `T.recv tool_resp_sock`
- Control reader: daemon fibre doing `T.recv ctrl_in_sock`
- `control_state`: `Eio.Mutex.t` + `Eio.Condition.t`
- Send `ready` on ctrl_out after socket setup
- Shutdown: send EOF on all output sockets

**Fallback**: if `PLUMB_ENDPOINT_INPUT` is not set, keep existing
stdio path (for running agent binary standalone or under the bridge
during transition).

### 4. Spawn lib — env var support

**`lib/spawn/spawn_lib.ml`**:
- Add `PLUMB_ENDPOINT_*` vars to `safe_env_keys`
- Helper to compute env var list from port→endpoint mapping

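The spawn-lib helper could be as small as the following; the exact signature is an assumption, but it pairs with the D2 naming rule so the fabric and agent derive variable names the same way.

```ocaml
(* Sketch of the spawn_lib.ml helper: compute PLUMB_ENDPOINT_*
   environment entries from a port -> endpoint mapping. Names follow
   the D2 table; the real signature may differ. *)

let env_of_endpoints (endpoints : (string * string) list) : string list =
  List.map
    (fun (port, uri) ->
       Printf.sprintf "PLUMB_ENDPOINT_%s=%s"
         (String.uppercase_ascii port) uri)
    endpoints
```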
### 5. Dune dependency

**`bin/agent/dune`**: add `transport` to libraries.

### 6. Tests (13 minimum)

**`test/test_fabric.ml`** — new tests:
1. `test_agent_identity_zmq` — identity response via ZMQ
2. `test_agent_eof_zmq` — EOF propagation via ZMQ
3. `test_agent_eof_cascade_zmq` — `empty ; agent ; id` terminates
4. `test_agent_telemetry_zmq` — telemetry arrives on correct socket
5. `test_agent_tool_dispatch_zmq` — tool req/resp over ZMQ

**`test/test_agent_control.ml`** — port the key control tests to ZMQ:
6. `test_stop_zmq` — stop via control socket
7. `test_stdin_eof_zmq` — data EOF triggers drain
8. `test_pause_resume_zmq` — pause/get/set/resume flow
9. `test_drain_zmq` — drain completes protocol
10. `test_set_memory_malformed_zmq` — rejects malformed memory
11. `test_basic_response_zmq` — basic LLM response

**Error path tests**:
12. `test_missing_endpoint_env` — clear error on missing env var
13. `test_startup_ready_signal` — fabric waits for ready before send

Extract test helpers into a shared module parameterised by transport
backend to avoid duplicating test infrastructure.

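One way to realise the shared helper module is a functor over a minimal transport signature. `TRANSPORT` and `round_trip` are hypothetical names sketching the parameterisation, not the final test API; the real module would wrap the stdio and ZMQ paths behind this signature.

```ocaml
(* Sketch of a test-helper functor parameterised by transport
   backend, so each test body is transport-agnostic. *)

module type TRANSPORT = sig
  type conn
  val send : conn -> string -> unit
  val recv : conn -> string option   (* None = EOF *)
end

module Make_helpers (T : TRANSPORT) = struct
  (* Send a prompt and collect output lines until EOF. *)
  let round_trip conn prompt =
    T.send conn prompt;
    let rec drain acc =
      match T.recv conn with
      | None -> List.rev acc
      | Some line -> drain (line :: acc)
    in
    drain []
end
```

Instantiating the functor once per backend keeps the 6 ported control tests identical across stdio and ZMQ runs.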
### 7. Documentation

**`doc/agent-runtime.md`**:
- Add `PLUMB_ENDPOINT_*` env var table
- Update control channel section (fibre, not thread; socket, not fd)
- Note presence-based ZMQ detection

**`doc/processes.md`**:
- Annotate "Multiplexed I/O" section as stdio transport path
- Note that ZMQ-native agents use per-port sockets

**`doc/architecture.md`**:
- Add `lib/fabric/zmq_agent.ml` to module table
- Update dependency diagram: `bin/agent` → `transport`

## Files to modify

| File | Change |
|------|--------|
| `lib/fabric/fabric.ml` | Subprocess socket creation; extend callback |
| `lib/fabric/zmq_agent.ml` | New — ZMQ process handler |
| `bin/agent/main.ml` | ZMQ rewrite; threads→fibres |
| `bin/agent/dune` | Add `transport` dependency |
| `lib/spawn/spawn_lib.ml` | Endpoint env var helpers |
| `test/test_fabric.ml` | 5 new ZMQ agent tests |
| `test/test_agent_control.ml` | 6 control tests ported to ZMQ |
| `test/test_zmq_errors.ml` | 2 error path tests |
| `doc/agent-runtime.md` | Env vars, control channel |
| `doc/processes.md` | Annotate Multiplexed I/O |
| `doc/architecture.md` | Module table, dependency graph |

## Implementation order

1. **Fabric socket bifurcation** — modify fabric.ml to skip
   agent-side sockets for subprocess processes; extend callback
   with endpoints. Update bridge's `make_handler` to work with
   the new callback signature. `dune build && dune test` — all
   existing tests pass.

2. **ZMQ handler** — `zmq_agent.ml`: spec synthesis, tool dispatch
   wiring, endpoint env vars, agent spawn, ready handshake.
   `dune build`.

3. **Agent binary rewrite** — main.ml: ZMQ sockets, daemon fibres,
   Eio.Mutex/Condition, ready signal, shutdown protocol.
   `dune build`.

4. **Integration + tests** — wire ZMQ handler into fabric, run all
   13 tests. `dune build && dune test`.

5. **Documentation** — agent-runtime.md, processes.md,
   architecture.md.

## Verification

1. After step 1: `dune build && dune test` — all existing bridge
   tests pass with extended callback
2. After step 3: `dune build` — agent binary compiles with ZMQ
3. After step 4: full test suite; all 13 new tests + existing tests
4. EOF cascade: `empty ; agent ; id` terminates cleanly (test 3)
5. Control protocol: pause/resume/memory works over ZMQ (tests 8-10)
6. Tool dispatch: req/resp round-trips over ZMQ (test 5)
7. Update PLMB ticket and engineering state in LDC
Task call — subagent_type: theory-critic, run_in_background: true. Prompt:

Re-review the revised plan for PLMB-037 Step 5c. The plan was substantially revised based on your first review and four other reviewers' feedback.

Key changes from the original:
1. Dropped the transport abstraction — agent goes ZMQ-only
2. Fabric binds both sides (PUSH for agent inputs, PULL for agent outputs), agent always connects
3. Thread→fibre migration simultaneous with ZMQ
4. Startup handshake (agent sends `ready` on ctrl_out)
5. Explicit shutdown protocol with per-socket EOF timeouts
6. ZMQ handler in separate module (`zmq_agent.ml`), not in bridge.ml
7. PUSH/PULL for tools (not REQ/REP)

Here is the full revised plan:

[Read the plan file at /Users/ww/.claude/plans/abundant-cuddling-oasis.md]

Please check:
1. Are there remaining algebraic/categorical concerns?
2. Does the "fabric binds PULL at agent output endpoints" approach compose correctly? The theory-critic recommended this in the first review.
3. Is the startup `ready` handshake well-defined? What does the message look like? Is it a JSON object on the ctrl_out socket?
4. The shutdown protocol (D6) — is the ordering sound? Any remaining gaps?
5. D9 (back-pressure with ZMQ_SNDHWM=0) — is unlimited buffering correct given pipes had finite buffers?

Async agent launched successfully. agentId: a0d951e18442017c3 (internal ID - do not mention to user. Use to resume later if needed.) The agent is working in the background. You will be notified automatically when it completes. Do not duplicate this agent's work — avoid working with the same files or topics it is using. Work on non-overlapping tasks, or briefly tell the user what you launched and end your response. output_file: /private/tmp/claude-501/-Users-ww-src-plumbing/0e6b86b5-3d3c-4483-9c52-ce740324d2d9/tasks/a0d951e18442017c3.output If asked, you can check progress before completion by using Read or Bash tail on the output file.
Task call — subagent_type: architect, run_in_background: true. Prompt:

Re-review the revised plan for PLMB-037 Step 5c. The plan was substantially revised based on your first review and four other reviewers' feedback.

Key changes from your recommendations:
1. Dropped the transport abstraction — agent ZMQ-only (your recommendation)
2. Fabric skips agent-side socket creation (your recommendation)
3. Thread→fibre simultaneous (your recommendation)
4. ZMQ handler in separate `zmq_agent.ml` (your recommendation)
5. Added startup handshake and shutdown protocol
6. 13 tests instead of 2

Read the full revised plan at /Users/ww/.claude/plans/abundant-cuddling-oasis.md

Please check:
1. Does the revised plan address your original concerns?
2. The fabric now binds PULL at agent output endpoints and PUSH at agent input endpoints. Agent connects. Does this work with the existing fabric socket creation loop (fabric.ml lines 100-150)?
3. The `endpoints` parameter on the process_handler callback — is this the right contract?
4. The fallback in main.ml (stdio if PLUMB_ENDPOINT_INPUT absent) — you recommended against a two-backend abstraction. Is a conditional branch in main() acceptable, or should it be agent-ZMQ-only with no fallback?
5. Any remaining boundary or layering concerns?

Async agent launched successfully. agentId: a943bcc69c5ad401a. output_file: /private/tmp/claude-501/-Users-ww-src-plumbing/0e6b86b5-3d3c-4483-9c52-ce740324d2d9/tasks/a943bcc69c5ad401a.output
Task call — subagent_type: engineer, run_in_background: true. Prompt:

Re-review the revised plan for PLMB-037 Step 5c. The plan was substantially revised based on your first review and four other reviewers' feedback.

Key changes addressing your concerns:
1. No close-and-rebind — fabric binds both sides, agent always connects
2. Startup handshake — agent sends `ready` on ctrl_out; fabric waits before sending data
3. Explicit shutdown protocol (D6) with per-socket EOF timeouts
4. Eio.Mutex not held across ZMQ recv (your deadlock finding)
5. ZMQ_SNDHWM=0 (unlimited buffering, matching pipe semantics)
6. 13 tests including error paths

Read the full revised plan at /Users/ww/.claude/plans/abundant-cuddling-oasis.md

Please check:
1. Does the startup handshake eliminate the slow-joiner race? The agent sends `ready` on ctrl_out PUSH (connected to fabric's ctrl_out PULL). Is this sufficient?
2. The shutdown protocol D6 — does it handle: (a) agent crash mid-conversation, (b) fabric crash while agent is running, (c) tool dispatch in-flight during shutdown?
3. The fabric binds PULL at agent output endpoints. This reverses the normal PUSH-binds pattern. Are there ZMQ gotchas with PULL-bind + PUSH-connect on ipc://?
4. Thread safety: all agent sockets are in one Eio domain. Control reader is a daemon fibre. Is there a risk of concurrent access to the same socket from multiple fibres?
5. The pipe adapter pattern for tool dispatch — any remaining concerns?
6. Any new bugs introduced by the revised approach?

Async agent launched successfully. agentId: a63b6d0989932da9c. output_file: /private/tmp/claude-501/-Users-ww-src-plumbing/0e6b86b5-3d3c-4483-9c52-ce740324d2d9/tasks/a63b6d0989932da9c.output
Task call — subagent_type: tech-writer, run_in_background: true. Prompt:

Re-review the revised plan for PLMB-037 Step 5c. The plan was substantially revised based on your first review and four other reviewers' feedback.

Key changes addressing your concerns:
1. Added 3 doc files to the change list (agent-runtime.md, processes.md, architecture.md)
2. Naming now uses PLUMB_ENDPOINT_* (matching D8)
3. Presence-based detection (no separate PLUMB_TRANSPORT selector)
4. ZMQ handler in separate zmq_agent.ml module

Read the full revised plan at /Users/ww/.claude/plans/abundant-cuddling-oasis.md

Please check:
1. Are all documentation updates accounted for?
2. The env var naming `PLUMB_ENDPOINT_INPUT` vs D8's `PLUMB_ENDPOINT_INPUT` — are these consistent now?
3. The "ready" handshake and shutdown protocol (D5, D6) — do these need documentation beyond agent-runtime.md?
4. Any remaining doc gaps?

Async agent launched successfully. agentId: ab18a2104f62aad79. output_file: /private/tmp/claude-501/-Users-ww-src-plumbing/0e6b86b5-3d3c-4483-9c52-ce740324d2d9/tasks/ab18a2104f62aad79.output
Task call — subagent_type: test-engineer, run_in_background: true. Prompt:

Re-review the revised plan for PLMB-037 Step 5c. The plan was substantially revised based on your first review and four other reviewers' feedback.

Key changes addressing your concerns:
1. 13 tests instead of 2, covering control protocol, tool dispatch, EOF cascade, telemetry, error paths
2. Control tests ported from test_agent_control.ml to ZMQ
3. Shared test helpers parameterised by transport backend
4. Error path tests (missing env var, startup ready signal)
5. EOF cascade test (empty ; agent ; id)

Read the full revised plan at /Users/ww/.claude/plans/abundant-cuddling-oasis.md

Please check:
1. Is 13 tests sufficient now? Any remaining gaps?
2. The test for startup ready signal (test 13) — what should this actually verify?
3. The shared test helper approach — is parameterisation by transport backend the right strategy, or should ZMQ tests be independent?
4. The plan says "extract test helpers into a shared module" — which helpers specifically? From which files?
5. Any concerns about test isolation (ZMQ sockets leaking between tests, port conflicts)?
6. The fallback in main.ml (stdio if PLUMB_ENDPOINT_INPUT absent) — should this fallback be tested?

Async agent launched successfully. agentId: a6c3865c3cc0095c8. output_file: /private/tmp/claude-501/-Users-ww-src-plumbing/0e6b86b5-3d3c-4483-9c52-ce740324d2d9/tasks/a6c3865c3cc0095c8.output
The revised plan addressed all five of my original concerns. The test count is solid, the module boundaries are clean, the fibre migration is properly simultaneous. Four of your five questions have straightforward answers. The fifth -- socket topology for agent outputs -- is where I found a real problem.
The plan states "fabric binds both sides; agent always connects" and specifically "for agent output routes, fabric binds PULL at the agent's endpoint; agent connects PUSH to it." This creates a problem.
Consider a route src=agent, src_port=out, dst=downstream, dst_port=in. Today, fabric creates PUSH at endpoint(agent, out) and PULL at endpoint(downstream, in), then the PULL connects to the PUSH. Data flows directly.
Under the plan: fabric binds PULL at endpoint(agent, out). Agent connects PUSH to it. Great, data arrives at the fabric's PULL socket. But now what? The downstream process reads from endpoint(downstream, in) -- that's a different endpoint. The data is stuck at the fabric's PULL socket with no path to the downstream consumer. You'd need a relay fibre to forward it, which re-introduces a bridge-like intermediary.
The fix is simple: adopt writer-binds, reader-connects. For agent input endpoints, fabric PUSH binds (same as today), agent PULL connects. For agent output endpoints, agent PUSH binds, fabric/downstream PULL connects. The fabric skips creating any socket at the agent's output endpoint -- it just creates the downstream PULL and connects it to the agent's endpoint URI (which the agent binds). The env vars tell the agent where to bind.
This is actually the natural ZMQ convention and preserves the zero-relay direct-path property. Update D3 and Section 1 accordingly.
On the `endpoints` callback parameter (Question 3): acceptable. The bridge ignores `endpoints`; the ZMQ handler ignores `push_sockets`/`pull_sockets` (receives empty lists). This avoids handler-type proliferation. A variant type or two callback slots would be worse. Just be explicit that for ZMQ-native agents, the socket lists are `[]`.
On the stdio fallback (Question 4): a conditional branch at the top of main() is fine. It is not the transport abstraction record I warned against -- it's a one-time startup dispatch. The two paths must not share mutable state (Eio.Mutex vs Mutex). Add a comment and a PLMB ticket for eventually removing the stdio path from the agent binary once only non-OCaml agents use the bridge.
Shutdown D6 step 4: "Fabric sends EOF on ctrl_in (or {"stop":true} + EOF)" -- pick one, don't leave "or" for the implementer. Send stop then EOF, matching the bridge's current behaviour (bridge.ml line 274-282).
ZMQ_SNDHWM = 0: Noted as a known tradeoff. Add a follow-on note about telemetry reporting queue depth -- pipes have 64KB natural back-pressure, ZMQ with HWM=0 has none, and operators need visibility into pressure build-up.
The full review is at /Users/ww/.claude/plans/abundant-cuddling-oasis-agent-a943bcc69c5ad401a.md.
Architect re-review caught a critical topology bug: fabric binding PULL at agent output endpoint creates a dead end — data arrives at the fabric's PULL with no path to downstream. Fix: revert to writer-binds/reader-connects. Agent PUSH binds at its output endpoint; downstream PULL connects to it. Fabric skips creating agent-side sockets entirely.
This aligns with what the theory-critic originally meant but I implemented incorrectly in the plan. Waiting for the other four.