Claude Code transcript - page 3/3

a0d951e18442017c3 toolu_01LbjN3GNjkXtho5WRk225CV /private/tmp/claude-501/-Users-ww-src-plumbing/0e6b86b5-3d3c-4483-9c52-ce740324d2d9/tasks/a0d951e18442017c3.output completed

Agent "Theory re-review" completed

Here is my review of the revised PLMB-037 Step 5c plan.


Overall Verdict

The revised plan is substantially improved. The major categorical concerns from the first review have been addressed correctly: the transport abstraction was dropped (right -- Loop.config is already transport-agnostic), fabric-binds-both-sides eliminates the close-and-rebind problem, and the bridge's role as a natural transformation between representations is cleanly preserved while zmq_agent.ml eliminates the transformation entirely for native agents. Five specific issues remain, two significant.


1. Remaining Algebraic/Categorical Concerns

None of substance. The natural transformation (bridge) is preserved for stdio agents, and the ZMQ handler is correctly understood as a direct embedding rather than a bridge. PUSH/PULL for tool dispatch is right -- session discipline belongs in the application protocol, not the socket type. The single-writer/single-reader invariant per channel is maintained.


2. Fabric-Binds-PULL at Agent Output: Underspecified Forwarding Path

This is the most significant gap. Consider a route (agent, "out") -> (filter, "in"). Currently:

  • Fabric creates PUSH at agent/out (bind), PULL at filter/in (connect to agent/out)
  • The inline filter reads from its PULL socket

Under the new plan, the fabric skips creating the agent-side PUSH. But the plan says "fabric binds PULL at the agent's endpoint." How does data reach the downstream filter? Three options:

(a) The PULL bound at the agent's output endpoint IS the downstream morphism's input socket -- same socket object, discovered via find_pull. This is the cleanest: no forwarding hop, no extra fibre.

(b) A forwarding fibre reads from the fabric's PULL and writes to the downstream morphism's PUSH/PULL pair. Adds latency, changes nothing about inline wiring.

(c) The agent's PUSH connects directly to the downstream morphism's PULL endpoint. The agent learns the neighbour's name via env var. Efficient but leaks topology into the agent.

Recommendation: Option (a). State explicitly that the PULL socket bound at the agent's output endpoint is the same socket used by the downstream consumer. No forwarding fibre needed.


3. Startup ready Handshake

Three issues:

Wire format unspecified. The plan says "sends a ready message" without specifying the bytes on the wire. Since ctrl_out carries JSON objects (memory dumps, acknowledgements), the ready signal should be a JSON object: {"status":"ready"}. A bare string would violate the JSON Lines protocol. This is forward-compatible -- you can add version, capabilities, etc. later.

Crash before ready causes deadlock. If the agent exits before sending ready, the fabric's PULL recv blocks forever. The plan lists "fork reaper fibre (waitpid)" as step 5 in the ZMQ handler construction -- but the ready-wait is step 4. The reaper must be forked BEFORE waiting for ready. If the agent crashes during startup, the reaper detects the exit and cancels the ready-wait (via promise failure or switch cancellation). Add a timeout (30s) as a safety net.

Ready signal must be consumed, not forwarded. After the handshake, ctrl_out carries operational messages. The handshake logic must consume the {"status":"ready"} message and not forward it to the ctrl_out PULL socket's normal consumer (e.g. a downstream filter(kind = "memory")). Note this explicitly.
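The two ctrl_out rules above -- JSON Lines framing for the ready signal, and consuming it rather than forwarding it -- can be sketched in a few lines. This is an illustrative Python stand-in, not the project's OCaml; `frame` and `handshake_then_forward` are hypothetical names.

```python
import json

READY = {"status": "ready"}

def frame(msg: dict) -> bytes:
    # JSON Lines framing: one JSON object per newline-terminated line.
    # A bare string like b"ready\n" would violate the protocol.
    return (json.dumps(msg, separators=(",", ":")) + "\n").encode()

def handshake_then_forward(lines):
    """Consume the first message iff it is the ready signal; forward
    everything after it to the normal ctrl_out consumer."""
    it = iter(lines)
    first = json.loads(next(it))
    if first.get("status") != "ready":
        raise RuntimeError("agent spoke before sending ready")
    return [json.loads(line) for line in it]

# The agent sends frame(READY) before any other ctrl_out traffic; the
# fabric's handshake swallows it and forwards only later messages.
stream = [frame(READY).decode(), '{"kind":"memory","data":{}}\n']
forwarded = handshake_then_forward(stream)
assert forwarded == [{"kind": "memory", "data": {}}]
```

The forward-compatibility point falls out of the framing: extra keys (version, capabilities) can be added to the ready object without breaking a consumer that only checks `status`.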


4. Shutdown Protocol (D6)

Ordering is mostly sound. The sequence matches the existing drain-phase semantics. The invariant -- data EOF triggers drain, stop terminates drain -- is preserved.

Gap: who sends stop on ctrl_in? In the bridge, the data mux fibre sends {"stop":true} on ctrl_in after data EOF (bridge.ml lines 274-282). In the ZMQ-native case there is no bridge. The zmq_agent.ml handler must do this. The plan should specify: after the fabric detects that the upstream data_in PUSH has sent EOF, the ZMQ handler sends {"stop":true} on ctrl_in followed by EOF. This must be sequenced AFTER the data EOF reaches the agent.

Missing hard timeout. If the agent is stuck in an LLM API call when EOF arrives, it cannot enter the drain loop until the call returns. The plan should specify a hard backstop: SIGTERM after N seconds of no EOF on output sockets, SIGKILL after N+5.
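The SIGTERM-then-SIGKILL backstop is a pure escalation policy, sketched here in Python for illustration (the real enforcement would live in the OCaml fabric; `N` and `backstop_signal` are hypothetical):

```python
import signal

# Hypothetical backstop threshold: N seconds of no EOF on the agent's
# output sockets after shutdown began. The value 10 is illustrative.
N = 10

def backstop_signal(elapsed: float):
    """Which signal, if any, the fabric should send given seconds since
    shutdown started with no EOF observed: None, SIGTERM, or SIGKILL."""
    if elapsed >= N + 5:
        return signal.SIGKILL
    if elapsed >= N:
        return signal.SIGTERM
    return None

assert backstop_signal(3.0) is None
assert backstop_signal(N) == signal.SIGTERM
assert backstop_signal(N + 5) == signal.SIGKILL
```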


5. Back-pressure: ZMQ_SNDHWM=0

The plan's claim is factually wrong. It says ZMQ_SNDHWM = 0 matches "the existing pipe semantics (pipes block on full buffer, never drop)." This is incorrect:

  • Unix pipes have a finite buffer (64KB Linux, ~16 pages macOS). When full, write() blocks. This is natural back-pressure.
  • ZMQ_SNDHWM = 0 means unlimited userspace buffering. A fast producer enqueues without bound, consuming unbounded memory. It never blocks.

These are different semantics. Pipes give "never drop, block when full." ZMQ_SNDHWM=0 gives "never drop, never block, unbounded memory."

Is this a problem in practice? No, not for LLM workloads. Messages are large (KB), infrequent (seconds apart), and the API call is the bottleneck. Unbounded buffering of a handful of messages is fine.

Recommendation: Keep ZMQ_SNDHWM = 0 for now, but:

1. Do not claim it matches pipe semantics. It does not.
2. Document it as a deliberate simplification for LLM-speed workloads.
3. Note that high-throughput pipelines will need application-level flow control.
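The semantic difference can be made concrete with Python's `queue.Queue` as a stand-in for the two buffering policies (illustration only; this is not the ZMQ API):

```python
import queue

# A pipe behaves like a bounded queue: writes block (or fail, for a
# non-blocking write) once the buffer is full. Natural back-pressure.
pipe_like = queue.Queue(maxsize=2)
pipe_like.put("a")
pipe_like.put("b")
assert pipe_like.full()
try:
    pipe_like.put_nowait("c")   # a full pipe rejects/blocks the writer
    blocked = False
except queue.Full:
    blocked = True
assert blocked

# ZMQ_SNDHWM=0 behaves like an unbounded queue: the producer never
# blocks; the backlog simply grows, consuming memory without bound.
hwm0_like = queue.Queue(maxsize=0)  # maxsize<=0 means unlimited
for i in range(10_000):
    hwm0_like.put_nowait(i)
assert hwm0_like.qsize() == 10_000
```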


Summary

| # | Severity | Issue |
|---|----------|-------|
| 1 | Significant | Forwarding path from agent output to downstream consumer is underspecified -- clarify socket reuse |
| 2 | Significant | Reaper fibre must start BEFORE ready-wait, not after (deadlock on agent crash) |
| 3 | Medium | ready message wire format unspecified -- use {"status":"ready"} |
| 4 | Medium | Back-pressure claim is factually incorrect -- document the actual tradeoff |
| 5 | Medium | Who sends stop on ctrl_in in the ZMQ-native path? Specify the sequencing |
| 6 | Low | Ready-wait needs timeout + reaper-cancellation |
| 7 | Low | Ready signal consumed by handshake, not forwarded to ctrl_out consumer |

None of these are architectural. The overall design is sound. Items 1 and 2 are specification gaps that could cause implementation confusion or deadlock. The rest are documentation precision.

The full review is at /Users/ww/.claude/plans/abundant-cuddling-oasis-agent-a0d951e18442017c3.md. Full transcript available at: /private/tmp/claude-501/-Users-ww-src-plumbing/0e6b86b5-3d3c-4483-9c52-ce740324d2d9/tasks/a0d951e18442017c3.output

Thinking

Theory re-review confirms the same topology issue as architect. Both agree: writer-binds, reader-connects. Agent PUSH binds; downstream PULL connects directly. No intermediate socket.

Additional findings: reaper fibre must start BEFORE ready-wait (deadlock on agent crash), ready message needs wire format spec ({"status":"ready"}), back-pressure claim is factually wrong (HWM=0 ≠ pipe semantics).

Waiting for engineer, tech-writer, test-engineer.

ab18a2104f62aad79 toolu_01193Q4RcPbAcaV9D6mbF2ED /private/tmp/claude-501/-Users-ww-src-plumbing/0e6b86b5-3d3c-4483-9c52-ce740324d2d9/tasks/ab18a2104f62aad79.output completed

Agent "Tech writer re-review" completed

Here is the full review.


Review: PLMB-037 Step 5c documentation coverage

Q1: Are all documentation updates accounted for?

The three files listed are the right targets, but the scope within them is underspecified in two cases and there are two unlisted files that warrant checking.

doc/agent-runtime.md — correct target, insufficient scope.

The plan says "update control channel section (fibre, not thread; socket, not fd)". That is necessary but five more areas in the same file become false after Step 5c:

  • Telemetry section (lines 38–40): describes telemetry as __port: "telemetry" envelopes on stdout. ZMQ-native agents send telemetry on a dedicated socket — no envelopes, no stdout.
  • Streaming section (line 231): "the process interface remains synchronous JSON Lines on stdin/stdout." ZMQ-native agents have no stdin/stdout at all.
  • Ready handshake (D5): not mentioned in the doc update spec. This is an externally observable contract — any non-OCaml ZMQ-native agent needs to know it must send ready before data arrives.
  • Shutdown protocol (D6): likewise absent. The 7-step sequence is a contract, not an implementation detail.
  • Back-pressure policy (D9): the plan says "Document this choice" in D9 but the doc update step does not name a location or author the text.

doc/processes.md — the Multiplexed I/O annotation is correct but the Port names table (lines 306–321) also needs a note. It lists ports decomposed by file descriptor. ZMQ-native agents expose the same logical ports via named sockets. Without a note, the fd framing implies stdio universally.

doc/architecture.md — the two listed changes are correct and sufficient. One adjacent issue: the architecture.md executor section (lines 46–48) describes the old exec.ml two-pass execution model. This was already partially stale before Step 5c and becomes more misleading after it. Worth a follow-on ticket if not addressed here.

doc/executor.md — not listed, probably fine for this PR. The PLMB-034 mux/demux section (lines 36–52) will continue to describe the bridge path correctly. A brief scope note ("this describes the bridge/stdio path") would prevent confusion. Low priority — file a follow-on if not done here.

doc/plumb-agent.1.md — needs a check. Not listed. If the man page documents the agent's stdin/stdout interface it will require qualification. Worth reading before finalising the change list.


Q2: Env var naming — PLUMB_ENDPOINT_*

Internally consistent throughout the revised plan. D2, the documentation step, and the spawn lib section (item 4) all use PLUMB_ENDPOINT_* with the same seven names. No discrepancy found.

One implementation note worth adding to the doc update spec: the env var table in agent-runtime.md should include the detection logic explicitly ("presence of PLUMB_ENDPOINT_INPUT selects ZMQ mode; absence falls back to stdio"), not just the list of vars.
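The detection logic the doc should spell out is a single presence check. A Python sketch of the rule (the real check would be OCaml in main.ml; `zmq_mode` is a hypothetical name, the env var name comes from the plan's D2/D8):

```python
def zmq_mode(env: dict) -> bool:
    # Presence of PLUMB_ENDPOINT_INPUT selects ZMQ mode;
    # absence falls back to stdio.
    return "PLUMB_ENDPOINT_INPUT" in env

assert zmq_mode({"PLUMB_ENDPOINT_INPUT": "ipc:///tmp/plumb/agent/in"})
assert not zmq_mode({})
```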


Q3: Ready handshake and shutdown protocol (D5, D6)

doc/agent-runtime.md is the right home — agreed. The problem is that the planned update ("update control channel section") does not include them.

D5 and D6 are externally observable contracts, not internal implementation notes. A future implementer of a non-OCaml ZMQ-native agent binary needs both:

  • D5: send ready on ctrl_out after connecting all sockets; fabric blocks until this arrives.
  • D6: empty payload on data_in signals drain entry; the agent must send EOF on all output sockets before exiting; per-socket timeouts are enforced by the fabric.

These belong in agent-runtime.md with the same level of specificity as the existing control channel protocol table. The plan's doc step should be expanded to include them.


Q4: Remaining doc gaps

Gap: No cohesive ZMQ-native agent reference.

After Step 5c, a developer writing a ZMQ-native agent binary needs: which env vars to read, what socket types to create, whether to bind or connect (always connect), the ready handshake, the shutdown protocol, and the per-socket message format (JSON Lines, no envelopes). The planned updates distribute this across sections of two files with no single gathering point.

Recommendation: add a "ZMQ transport mode" subsection to doc/agent-runtime.md. A socket summary table (name, ZMQ type, direction, message format) plus D5 and D6 as written protocol specs. The existing Multiplexed I/O annotation in processes.md then cross-references it with a forward link.

Gap: Implementation order.

Documentation is step 5 (last). The env var table (D2), ready handshake (D5), and shutdown protocol (D6) should be drafted at step 2 alongside the ZMQ handler, not after. These are design contracts; writing them first reduces drift between what was designed and what gets built.


Verdict

Env var naming: consistent, no issues.

Documentation coverage: correct targets, but the agent-runtime.md scope is underspecified in ways that would leave false content in the file after the PR merges. The ready/shutdown protocol is missing from the doc task entirely. This is the most significant remaining gap.

Required before implementation:

1. Expand doc/agent-runtime.md update scope to cover: telemetry section, Streaming section, ready handshake as a contract, shutdown protocol as a contract, back-pressure policy note.
2. Add the Port names table to doc/processes.md update scope.
3. Add a "ZMQ transport mode" subsection to doc/agent-runtime.md with a cross-reference from processes.md.
4. Check doc/plumb-agent.1.md before finalising the change list.

Recommended:

5. Move D2/D5/D6 documentation to step 2 (draft before implementing, not after).

Full transcript available at: /private/tmp/claude-501/-Users-ww-src-plumbing/0e6b86b5-3d3c-4483-9c52-ce740324d2d9/tasks/ab18a2104f62aad79.output

Tech writer re-review: doc scope for agent-runtime.md is underspecified — needs telemetry, streaming, ready handshake, shutdown protocol, back-pressure as contracts. Recommends a "ZMQ transport mode" subsection. Wants docs drafted at step 2, not step 5.

Waiting for engineer and test-engineer.

a63b6d0989932da9c toolu_013RhbjLUHUzSH9qnqrzVhEM /private/tmp/claude-501/-Users-ww-src-plumbing/0e6b86b5-3d3c-4483-9c52-ce740324d2d9/tasks/a63b6d0989932da9c.output completed

Agent "Engineer re-review" completed

Here is my review of the revised plan, grounded in the actual source code at /Users/ww/src/plumbing/lib/fabric/fabric.ml, bridge.ml, bin/agent/main.ml, and lib/transport/transport_zmq.ml.


Q1: Does the startup handshake eliminate the slow-joiner race?

Yes, it is sufficient — with one caveat worth documenting.

The timing is safe: fabric binds all sockets in Phase 1 of fabric.ml before any subprocess is spawned in Phase 4. The agent connects its ctrl_out PUSH to an already-bound PULL socket. With ipc:// and SNDHWM=0, the ready message is queued in ZMQ's send buffer immediately on zmq_connect return and delivered once the connection is established. The fabric will receive it. There is no window where the message can be dropped.

The slow-joiner problem only bites when the sender tries to send before the binding process has created the socket at all. That cannot happen here because the fabric binds before it spawns. Add a comment to zmq_agent.ml making the ordering invariant explicit — future developers must not change the spawn order.


Q2: Shutdown protocol D6

(a) Agent crash mid-conversation

This is a real bug in the plan. If the agent crashes (SIGSEGV, kill -9, OOM), it never executes D6 steps 5–6, so it never sends EOF sentinels on data_out, ctrl_out, or telemetry. The fabric's recv fibres on those sockets will block until "per-socket EOF timeouts" fire. But the plan does not specify the implementation of those timeouts.

The bridge.ml avoids this by driving the reaper from the demux fibre exit: waitpid_timed is called after the child's stdout closes naturally. In the ZMQ path, no analogous natural event exists.

The correct fix: start a reaper fibre with Eio_unix.run_in_systhread (fun () -> waitpid pid) immediately after spawning the agent. When waitpid returns (for any reason — normal exit or crash), the reaper must call T.force_close on all fabric-side PULL sockets for this agent. This unblocks any stuck recv fibre. The reaper must run concurrently with the EOF protocol, not after it. Eio.Fiber.first is the right pattern to race normal shutdown against crash detection.
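The race between the normal EOF protocol and crash detection has the shape of Eio.Fiber.first: run both paths, act on whichever completes first. A Python analogue using threads (illustration only; `first` is a hypothetical helper, not the Eio API):

```python
import queue
import threading

def first(*thunks):
    """Analogue of Eio.Fiber.first: run all thunks concurrently and
    return the result of whichever finishes first."""
    results = queue.Queue()
    for t in thunks:
        threading.Thread(target=lambda t=t: results.put(t()), daemon=True).start()
    return results.get()

stalled = threading.Event()      # the EOF protocol never completes
winner = first(
    lambda: (stalled.wait(), "eof-protocol")[1],  # blocks forever
    lambda: "agent-died",        # waitpid returned: the reaper fires
)
assert winner == "agent-died"
```

In the OCaml version the losing branch is the blocked recv fibre; the winning reaper branch then force-closes the agent's fabric-side sockets to unblock it.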

(b) Fabric crash while agent is running

Handled. transport_zmq.ml line 45 sets linger=0 on all sockets. When the fabric dies, the agent's next T.recv or T.send will get a Transport_error. The existing bridge does this silently (line 273: Transport.Transport_error()). The ZMQ agent should log at warn level — an unexpected fabric disappearance is operationally significant and should appear in logs.

(c) Tool dispatch in-flight during shutdown

Needs explicit specification in the plan. Data EOF arrives first (D6 step 1); the agent may be mid-tool-call when that happens. The fabric's tool_req PULL and tool_resp PUSH fibres must stay alive past data EOF. They must not be cancelled until the agent sends EOF on tool_req (or the reaper fires). The plan should state this ordering explicitly: tool sockets remain active until ctrl_out EOF or agent death.


Q3: PULL-bind + PUSH-connect on ipc://

No ZMQ gotchas. PUSH/PULL is symmetric — either side can bind. The confusion about "PUSH-binds being normal" applies to PUB/SUB, not PUSH/PULL. The ZMQ Guide explicitly supports both patterns.

The one real constraint — bind must precede connect — is already satisfied because the fabric binds in Phase 1 before spawning the agent in Phase 4. Socket file permissions are a non-issue since the agent subprocess is spawned by the same process (same UID).


Q4: Thread safety

The plan is correct. Looking at recv_raw in transport_zmq.ml lines 141–160: it uses Eio_unix.await_readable on the ZMQ fd, which yields cooperatively to the Eio scheduler rather than blocking a systhread. The control reader daemon fibre and the main loop fibre access different sockets — this is safe.

The critical invariant must be codified as a comment: one fibre per socket, no sharing. If future work needs to cancel a recv, the mechanism is Eio.Fiber.cancel on the owning fibre, not closing the socket from another fibre (which would destroy it for all users and is not safe on a non-owned socket).


Q5: Pipe adapter pattern — remaining concern

One limitation the plan should document: PUSH/PULL for tool dispatch is forward-compatible with parallel tool calls, but it does not currently deliver them. The pipe adapter serialises requests through tool_dispatch_loop, and the agent's dispatch_tool (main.ml lines 610–621) does a synchronous send-then-input_line pattern. Two concurrent tool calls would interleave their responses with no correlation-ID matching.

This is not a blocker — the agent is single-threaded w.r.t. tool calls today. But the plan should reference a follow-on ticket for correlation-ID routing, otherwise "supports future parallel tool calls" is misleading.


Three new bugs introduced by the revised approach

Bug 1: Breaking change to process_handler callback

The plan adds endpoints:(string * string) list to process_handler. The bridge's make_handler at bridge.ml line 104 creates the concrete handler closure — it must accept this new labelled argument even though it ignores it. The call site in fabric.ml line 219 must also be updated. The plan's implementation step 1 says "Update bridge's make_handler" but the specific change (adding ~endpoints:_ to the inner closure) must be explicit. Missing this causes a build failure but the symptom will look like a type error in fabric.ml, not bridge.ml.

Bug 2: SNDHWM=0 is opt-in, not default

transport_zmq.ml make_socket (lines 43–47) does not call set_unlimited_hwm. The function exists but must be called explicitly. The plan mandates SNDHWM=0 in D9 but does not add it to the changes table for either zmq_agent.ml (fabric-side sockets) or main.ml (agent-side sockets). If missed, PUSH sockets will drop messages silently when the receiver is slow — exactly the failure mode the plan intends to prevent. Recommend making set_unlimited_hwm a call inside make_socket itself (changing the default), or adding an explicit line-by-line checklist in the implementation plan.

Bug 3: ctrl_out PULL socket creation path is unspecified

D2 lists PLUMB_ENDPOINT_CTRL_OUT as an endpoint the agent connects its PUSH to. D3 says "fabric binds PULL at the agent's output endpoints." But ctrl_out is a control channel, not a data route. Looking at fabric.ml Phase 1 (lines 112–136): it creates PULL sockets only for routes in routing_table.routes where r.dst = proc_name. If ctrl_out is not a route in the routing table — which is plausible since it is a control side-channel, not a data wire — no PULL socket is created for it by the general loop.

The plan says zmq_agent.ml step 4 "waits for ready message on ctrl_out PULL socket," implying zmq_agent.ml creates this PULL socket itself. But this is never stated explicitly. If zmq_agent.ml must create it, bind it, and pass its endpoint via PLUMB_ENDPOINT_CTRL_OUT, that must be spelled out in the implementation. If ctrl_out is in the routing table as a route, the general loop handles it but then something downstream must receive from it. The plan is ambiguous and the implementation will diverge on this point.


Summary

| Issue | Severity | Resolution needed |
|-------|----------|-------------------|
| Q1: Handshake sufficient | OK | Add ordering comment |
| Q2a: Agent crash → fabric blocks | Bug | Concurrent reaper fibre; force-close on exit |
| Q2b: Fabric crash handled by linger=0 | OK | Add warn log |
| Q2c: Tool in-flight during shutdown | Needs spec | Explicitly state tool sockets outlive data EOF |
| Q3: PULL-bind/PUSH-connect on ipc:// | OK | No action |
| Q4: One fibre per socket | OK | Codify as invariant |
| Q5: Parallel tools -- correlation gap | Limitation | Document; reference follow-on ticket |
| Bug 1: process_handler callback breakage | Bug | Explicit in step 1 checklist |
| Bug 2: SNDHWM=0 not called by default | Bug | Make default or add to checklist |
| Bug 3: ctrl_out PULL creation path | Needs clarification | State explicitly who creates it and how |

Three items are blockers before implementation: Q2a (concurrent reaper), Bug 2 (SNDHWM default), and Bug 3 (ctrl_out socket ownership). Address those and the plan is ready. Full transcript available at: /private/tmp/claude-501/-Users-ww-src-plumbing/0e6b86b5-3d3c-4483-9c52-ce740324d2d9/tasks/a63b6d0989932da9c.output

Thinking

Engineer re-review: 3 blockers found — (1) concurrent reaper fibre needed for agent crash (force-close sockets via Eio.Fiber.first), (2) SNDHWM=0 not default in transport_zmq.ml — must be explicit, (3) ctrl_out PULL socket creation path unspecified (zmq_agent.ml creates it).

Waiting for test-engineer, then I'll incorporate all re-review findings.

a6c3865c3cc0095c8 toolu_01E5maSED47PvUueyYyih5CU /private/tmp/claude-501/-Users-ww-src-plumbing/0e6b86b5-3d3c-4483-9c52-ce740324d2d9/tasks/a6c3865c3cc0095c8.output completed

Agent "Test engineer re-review" completed

Here is my review. I will address each of your six questions directly, then raise a few additional findings.


1. Is 13 tests sufficient? Any remaining gaps?

Close, but I would add three more to reach 16:

  • Concurrent data + control delivery. With stdio, the demux thread serialised everything. With per-socket ZMQ, data and control arrive on independent sockets into independent Eio fibres. The plan replaces Mutex/Condition with Eio.Mutex/Eio.Condition (D4). A test that sends a data message and a pause command concurrently verifies the new synchronisation actually works. This is the semantic property most likely to regress.

  • Multi-message ordering. Send 3+ data messages, verify 3+ responses arrive in order. PUSH/PULL preserves ordering, but the Eio fibre read loop, send loop, and EOF handling could break it. A serial-in/serial-out test with >1 message is the simplest way to catch an accidental reordering.

  • IPC socket file cleanup. After agent exit, verify that /tmp/plumb-{pid}/{proc}/ is removed. ZMQ IPC sockets leave files on the filesystem. The plan says "closes sockets, destroys context, exits" (D6 step 6) but nothing tests it. Leaked socket files cause EADDRINUSE on re-run and accumulate in CI.

The existing stdio tests (test_agent_control.ml and test_fabric.ml agent tests) implicitly cover the fallback path -- see question 6.


2. Test 13 (startup ready signal) -- what should it verify?

The observable property is: the fabric sends zero data messages before it receives ready on the ctrl_out socket.

With D9 (ZMQ_SNDHWM = 0, unlimited buffering), the slow-joiner problem is masked -- PUSH with no connected peer buffers indefinitely rather than dropping. Messages sent before ready will eventually arrive. This means you cannot observe the bug from the agent side (it looks like everything works). The ready handshake prevents unbounded pre-connect buffering, not message loss.

This is best tested as a white-box unit test on zmq_agent.ml. The handler should track whether it has received ready before calling T.send on data sockets. The test instruments or wraps the handler to assert send_count_before_ready == 0. Trying to test this as a black-box integration test (timing-based or with artificial delays) will be flaky.
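The white-box assertion has a simple shape: instrument the handler, count data sends that happen before ready is observed, and require that count to be zero. A Python sketch of the test double (hypothetical; the real handler lives in zmq_agent.ml):

```python
class InstrumentedHandler:
    """Records whether any data send happened before the ready
    handshake completed."""
    def __init__(self):
        self.ready_seen = False
        self.sends_before_ready = 0

    def on_ctrl_out(self, msg: dict):
        if msg.get("status") == "ready":
            self.ready_seen = True

    def send_data(self, payload: dict):
        if not self.ready_seen:
            self.sends_before_ready += 1
        # ... real handler would T.send on the data socket here ...

h = InstrumentedHandler()
h.on_ctrl_out({"status": "ready"})   # handshake arrives first
h.send_data({"text": "hello"})
assert h.sends_before_ready == 0
```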


3. Shared test helpers -- parameterisation by transport backend?

No. The stdio and ZMQ paths have fundamentally different setup: fork+execve vs Eio fibres, pipe fds vs ZMQ sockets, read_line_timeout with Unix.select vs T.recv, mux envelopes vs topic framing. A transport-parameterised helper would need to abstract over all of these, producing a miniature framework that obscures what each test actually does.

What should be shared is the stuff that is genuinely transport-independent: parsing plumb specs, finding the agent binary, generating pipeline IDs, resolving fixture paths, and JSON assertion helpers.

The ZMQ tests should have their own with_fabric_zmq_agent function (analogous to the existing with_fabric_agent) that uses the new zmq_agent.ml handler and T.send/T.recv directly.


4. Which helpers specifically, from which files?

| Helper | Current location | Destination |
|--------|------------------|-------------|
| parse_and_compile | test_fabric.ml lines 13-27 | test_helpers.ml |
| find_agent_binary | test_fabric.ml lines 44-53, test_agent_control.ml lines 13-22 (duplicated) | test_helpers.ml |
| fresh_id | test_fabric.ml lines 8-11 | test_helpers.ml |
| fixture | test_agent_control.ml lines 24-33 | test_helpers.ml |

New helpers to write in test_helpers.ml:

  • assert_json_string / assert_json_int -- the pattern of matching JSON, extracting a field, and checking it appears ~15 times in test_agent_control.ml
  • assert_ctrl_kind -- parse a ctrl_out response and assert the kind field value

Things that should NOT be shared:

  • spawn_agent, send_control, read_port, mux_write helpers, read_line_timeout -- all stdio-specific
  • with_fabric, with_fabric_agent -- architecture-specific; they stay in their respective test files


5. Test isolation concerns?

Three things to watch:

IPC path collisions. fresh_id uses pid + counter, which is safe: parallel dune test executables get different PIDs, and Alcotest runs tests sequentially within an executable.

Leftover socket files. If a test crashes, IPC socket files persist and cause EADDRINUSE on re-run. Every test that creates ZMQ sockets should wrap the body in Fun.protect that calls cleanup_fabric_dir. The existing with_fabric does this via Eio.Switch -- maintain this pattern in the new with_fabric_zmq_agent.

Socket linger on test failure. If the agent dies or the test times out, ZMQ sockets in LINGER state can cause 30-second hangs. Set ZMQ_LINGER = 0 on all sockets created in tests. This is test-only -- production can keep the default.
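The Fun.protect pattern for socket-file cleanup translates to try/finally in any language. A Python sketch of the shape (hypothetical `fabric_dir` helper, not the project's with_fabric_zmq_agent):

```python
import contextlib
import os
import tempfile

@contextlib.contextmanager
def fabric_dir():
    """Cleanup runs even when the test body raises, so leaked socket
    files cannot cause EADDRINUSE on the next run."""
    d = tempfile.mkdtemp(prefix="plumb-test-")
    try:
        yield d
    finally:
        for name in os.listdir(d):
            os.unlink(os.path.join(d, name))
        os.rmdir(d)

with fabric_dir() as d:
    # Stand-in for an IPC socket file a test would create.
    open(os.path.join(d, "data_in.sock"), "w").close()
    path = d
assert not os.path.exists(path)
```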


6. Should the stdio fallback be tested?

The fallback is already implicitly tested by the existing suite: all 13 tests in test_agent_control.ml and the 2 agent tests in test_fabric.ml exercise the stdio path. As long as these remain in the suite and pass after the rewrite, the fallback is verified.

The plan should state this explicitly -- "existing stdio tests (test_agent_control.ml, test_fabric.ml agent suite) serve as regression tests for the stdio fallback path" -- so the implementer doesn't accidentally remove or skip them.


Additional findings:

  • Telemetry test (test 4) is too vague as specified. It should verify the telemetry socket receives a {"kind":"config",...} message and that this message does NOT appear on the data output socket. The whole point of per-socket ZMQ is channel separation; test it.

  • Tool dispatch test (test 5) goes through pipe adapters (D7). The test should cover the full chain: agent PUSH to fabric PULL to pipe adapter to tool loop and back. A mock tool that echoes its input would suffice.

  • EOF cascade test (test 3) is well-chosen. Assert both a clean exit code AND that no zombie processes remain (check that waitpid returns for all spawned children).

  • Missing follow-on: What happens when the agent crashes mid-stream (SIGKILL)? With ZMQ_SNDHWM = 0, the fabric's PUSH to a dead peer blocks forever. This is probably out of scope for 5c but should be filed as a follow-on ticket with a specific test scenario.

The full review is at /Users/ww/.claude/plans/abundant-cuddling-oasis-agent-a6c3865c3cc0095c8.md. Full transcript available at: /private/tmp/claude-501/-Users-ww-src-plumbing/0e6b86b5-3d3c-4483-9c52-ce740324d2d9/tasks/a6c3865c3cc0095c8.output

Thinking

All five re-reviews complete. Let me update the plan with the consolidated findings.

📝 Write abundant-cuddling-oasis.md
/Users/ww/.claude/plans/abundant-cuddling-oasis.md
# PLMB-037 Step 5c: Agent binary ZMQ rewrite

## Context

The fabric runtime (Steps 0–5b) runs inline morphisms on ZMQ
sockets, but agent subprocesses still communicate via stdin/stdout
with `__port` JSON envelopes.  A **bridge** module (`bridge.ml`,
~445 lines) translates between the two.

Step 5c eliminates the bridge for agents by having the agent binary
speak ZMQ directly — one socket per logical port, no envelopes.

**Worktree:** create from `plumbing-1` branch

## Review consensus (5 reviewers, 2 rounds)

### Topology

**Writer-binds, reader-connects** (architect + theory-critic).  For
agent output endpoints, the agent PUSH binds; the fabric's
downstream PULL connects directly to it.  For agent input endpoints,
the upstream PUSH binds (as today); the agent PULL connects.  Fabric
skips creating any socket at agent-side endpoints.  No intermediate
sockets, no forwarding fibres, no close-and-rebind.

### Agent binary

**ZMQ-only** — no transport abstraction.  `Loop.config` is already
transport-agnostic via callbacks.  The bridge IS the stdio compat
layer.  A conditional branch in `main()` falls back to stdio if
`PLUMB_ENDPOINT_INPUT` is absent (for standalone/bridge use during
transition).

**Thread→fibre simultaneous** with ZMQ migration.  No hybrid state.
`Mutex`/`Condition` → `Eio.Mutex`/`Eio.Condition`.  Critical
invariant: never hold `Eio.Mutex` across ZMQ recv.  One fibre per
socket, no sharing.

### Protocols

**Startup handshake**: agent sends `{"status":"ready"}` on ctrl_out
after connecting all sockets.  Fabric waits before sending data.
Reaper fibre starts BEFORE ready-wait (theory-critic: crash before
ready → deadlock otherwise).  Ready signal consumed by handshake,
not forwarded to ctrl_out consumer.  30s timeout.

**Shutdown**: (1) fabric sends EOF on data_in; (2) agent enters
drain phase; (3) agent handles pending control; (4) fabric sends
`{"stop":true}` then EOF on ctrl_in; (5) agent sends EOF on all
output sockets; (6) agent closes sockets, destroys context, exits;
(7) fabric detects EOF on PULLs, reaps child.  Per-socket EOF
timeouts.  Tool sockets remain active until ctrl_out EOF or agent
death.

**Crash handling**: concurrent reaper fibre (`waitpid`) races
normal shutdown via `Eio.Fiber.first`.  On agent death, reaper
force-closes all fabric-side sockets for that agent.  Hard backstop:
SIGTERM after N seconds, SIGKILL after N+5.

**Back-pressure**: `ZMQ_SNDHWM = 0` (unlimited buffering) on all
agent sockets — deliberate simplification for LLM-speed workloads.
Not equivalent to pipe semantics (pipes block on full 64KB buffer;
HWM=0 never blocks, unbounded memory).  Follow-on for telemetry
queue depth observability.

### Module boundaries

**ZMQ handler in `zmq_agent.ml`** — not in bridge.ml (architect:
bridge is a natural transformation; ZMQ handler is a direct
embedding).

**PUSH/PULL for tool dispatch** (not REQ/REP) — no strict
alternation; supports future parallel tool calls.  Pipe adapters
to existing `tool_dispatch_loop`.  Follow-on ticket for
correlation-ID routing.

### Naming

`PLUMB_ENDPOINT_*` (matching implementation plan D8).  Presence of
`PLUMB_ENDPOINT_INPUT` selects ZMQ mode.

## Architecture

### Current (bridge)

```
Fabric                                    Agent process
  PULL ──(ZMQ)──→ bridge mux ──(pipe)──→ demux → pipes → loop
  PUSH ←─(ZMQ)──← bridge demux ←─(pipe)──← mux_write ← loop
```

### After (ZMQ direct)

```
Fabric                     Agent process
  PUSH ─bind────(ZMQ)──connect─ PULL → main fibre
  PULL ─connect─(ZMQ)──bind──── PUSH ← main fibre
```

Writer binds, reader connects.  No pipes, no envelopes, no demux
thread.

## Changes

### 1. Fabric socket bifurcation

**`lib/fabric/fabric.ml`** — modify socket creation loop (lines
100–150):

For routes involving `Subprocess` processes (checked via
`proc_kind` on `process_entry`), skip creating the agent-side
socket.  Specifically:

- Route where `src = subprocess`: skip creating PUSH at
  `endpoint(src, src_port)`.  Agent will bind its own PUSH there.
- Route where `dst = subprocess`: skip creating PULL at
  `endpoint(dst, dst_port)`.  Agent will connect its own PULL to
  the upstream PUSH endpoint.

The connect phase also skips these routes (no PULL to connect).
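
The skip condition, as an illustrative sketch (only `proc_kind` and `process_entry` come from the plan; the other names are hypothetical):

```ocaml
let is_agent_side (entry : process_entry) =
  match entry.proc_kind with
  | Subprocess -> true       (* agent owns this socket *)
  | _ -> false

(* Per route (src, src_port) -> (dst, dst_port): *)
let create_route_sockets route =
  if not (is_agent_side route.src_entry) then
    bind_push (endpoint route.src route.src_port);
  if not (is_agent_side route.dst_entry) then
    (* reader connects to the UPSTREAM (src) endpoint, whoever bound it *)
    connect_pull (endpoint route.src route.src_port)
```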

Extend `process_handler` callback:

```ocaml
?process_handler:(
  sw:Eio.Switch.t ->
  proc_entry:process_entry ->
  push_sockets:(string * [`Push] T.socket) list ->
  pull_sockets:(string * [`Pull] T.socket) list ->
  endpoints:(string * string) list ->
  unit)
```

`push_sockets`/`pull_sockets`: fabric-side sockets only (for
bridge path).  `endpoints`: all port→URI mappings (for ZMQ path).
For bridge handlers, `endpoints` is ignored.  For ZMQ handlers,
socket lists are `[]`.

Update bridge's `make_handler` to accept `~endpoints:_`.

### 2. ZMQ process handler — `lib/fabric/zmq_agent.ml` (new)

`make_handler` function:

1. Synthesise agent spec (reuse `Bridge.synthesise_spec` or
   extract shared helper)
2. Create tool dispatch sockets: bind PUSH at the tool_resp
   endpoint (the agent's PULL connects); connect a PULL to the
   tool_req endpoint (the agent's PUSH binds there — writer-binds).
   Set `ZMQ_SNDHWM = 0`.  Run pipe-adapter fibres to
   `tool_dispatch_loop`.
3. Create ctrl_out PULL socket: connect to the ctrl_out endpoint
   (the agent's ctrl_out PUSH binds there).  This socket is created
   by zmq_agent.ml, not by the general fabric loop (ctrl_out may
   not appear as a route).
4. Spawn agent with `PLUMB_ENDPOINT_*` env vars + credentials
5. Fork reaper fibre (`waitpid`) — BEFORE ready-wait
6. Wait for `{"status":"ready"}` on ctrl_out PULL (30s timeout;
   if agent dies first, reaper cancels the wait)
7. Consume the ready message (do not forward to downstream)
8. Fork per-socket EOF timeout fibres

Use `Eio.Fiber.first` to race normal shutdown vs crash reaper.
On crash: force-close all fabric-side sockets for this agent.
Hard backstop: SIGTERM after 30s of no output EOF, SIGKILL +5s.

### 3. Rewrite `bin/agent/main.ml`

- Remove `stdout_mutex`, `mux_write`, `start_stdin_demux` (~90
  lines)
- Read `PLUMB_ENDPOINT_*` env vars.  If `PLUMB_ENDPOINT_INPUT`
  absent, fall back to existing stdio path (preserves bridge
  compatibility during transition).
- Create ZMQ context.  For each endpoint env var:
  - Input sockets (data_in, ctrl_in, tool_resp): create PULL,
    connect to endpoint
  - Output sockets (data_out, ctrl_out, telemetry, tool_req):
    create PUSH, bind at endpoint
  - Set `ZMQ_SNDHWM = 0` on all sockets
- Send `{"status":"ready"}` on ctrl_out PUSH
- Main loop: `T.recv data_in_sock` instead of `input_line`;
  raise `End_of_file` on empty payload (EOF sentinel)
- Output: `T.send data_out_sock`, `T.send telemetry_sock`, etc.
- Tool dispatch: `T.send tool_req_sock` / `T.recv tool_resp_sock`
- Control reader: daemon fibre doing `T.recv ctrl_in_sock`
- `control_state`: `Eio.Mutex.t` + `Eio.Condition.t`
- Shutdown: send EOF on all output sockets, close context
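
The main-loop conversion, as a sketch under the plan's empty-payload EOF convention (`handle` stands in for the existing `Loop` callback):

```ocaml
let main_loop ~data_in ~data_out ~handle =
  let rec loop () =
    match T.recv data_in with
    | "" -> raise End_of_file      (* EOF sentinel replaces EOF on stdin *)
    | line ->
        T.send data_out (handle line);
        loop ()
  in
  (try loop () with End_of_file -> ());
  T.send data_out ""               (* propagate EOF downstream *)
```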

### 4. Spawn lib — env var support

**`lib/spawn/spawn_lib.ml`**:
- Add `PLUMB_ENDPOINT_*` to `safe_env_keys`
- Helper to compute env var list from port→endpoint mapping
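
The helper could be as simple as the following sketch (the `PLUMB_ENDPOINT_<PORT>=<uri>` shape follows the plan's D8 naming; the function name is hypothetical):

```ocaml
(* ("data_in", "ipc:///tmp/x") -> "PLUMB_ENDPOINT_DATA_IN=ipc:///tmp/x" *)
let endpoint_env (endpoints : (string * string) list) : string list =
  List.map
    (fun (port, uri) ->
       Printf.sprintf "PLUMB_ENDPOINT_%s=%s"
         (String.uppercase_ascii port) uri)
    endpoints
```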

### 5. Dune dependency

**`bin/agent/dune`**: add `transport` to libraries.

### 6. Tests (16)

**`test/test_fabric.ml`** — ZMQ agent tests:
1. `test_agent_identity_zmq` — identity response
2. `test_agent_eof_zmq` — EOF propagation
3. `test_agent_eof_cascade_zmq` — `empty ; agent ; id` terminates
   (clean exit code + no zombies)
4. `test_agent_telemetry_zmq` — `kind:config` on telemetry socket,
   NOT on data socket (channel separation)
5. `test_agent_tool_dispatch_zmq` — full chain through pipe adapter
6. `test_agent_multi_message_zmq` — 3 messages in, 3 responses in
   order
7. `test_agent_concurrent_data_ctrl_zmq` — data + pause arrive
   simultaneously; fibre synchronisation works

**`test/test_agent_control.ml`** — port key tests to ZMQ:
8. `test_stop_zmq`
9. `test_stdin_eof_zmq` — data EOF triggers drain
10. `test_pause_resume_zmq` — pause/get/set/resume flow
11. `test_drain_zmq` — drain completes protocol
12. `test_set_memory_malformed_zmq`
13. `test_basic_response_zmq`

**Error/lifecycle tests**:
14. `test_missing_endpoint_env` — clear error on missing env var
15. `test_startup_ready_signal` — white-box: assert
    `send_count_before_ready == 0`
16. `test_ipc_cleanup` — no socket files remain after agent exit

Test infrastructure:
- New `with_fabric_zmq_agent` helper (NOT parameterised — separate
  from `with_fabric_agent`)
- Extract transport-independent helpers to `test_helpers.ml`:
  `parse_and_compile`, `find_agent_binary`, `fresh_id`, `fixture`,
  JSON assertion helpers
- Set `ZMQ_LINGER = 0` on all test sockets (prevent 30s hangs)
- `Fun.protect` wrapper for IPC cleanup on test failure
- Existing stdio tests serve as regression for fallback path

### 7. Documentation

**`doc/agent-runtime.md`** — draft at step 2 (contracts before
code):
- New "ZMQ transport mode" subsection: socket summary table
  (name, ZMQ type, bind/connect, message format), env var table
  with detection logic, ready handshake contract (D5), shutdown
  protocol contract (D6), back-pressure policy
- Update telemetry section (dedicated socket, no envelopes)
- Update streaming section (no stdin/stdout in ZMQ mode)
- Update control channel section (fibre, not thread; socket,
  not fd)

**`doc/processes.md`**:
- Annotate "Multiplexed I/O" as stdio transport path
- Note Port names table applies to both paths (same logical
  ports, different physical channels)
- Cross-reference agent-runtime.md ZMQ section

**`doc/architecture.md`**:
- Add `lib/fabric/zmq_agent.ml` to module table
- Update dependency diagram: `bin/agent` → `transport`

## Files to modify

| File | Change |
|------|--------|
| `lib/fabric/fabric.ml` | Socket bifurcation; extend callback |
| `lib/fabric/zmq_agent.ml` | New — ZMQ process handler |
| `bin/agent/main.ml` | ZMQ rewrite; threads→fibres |
| `bin/agent/dune` | Add `transport` dependency |
| `lib/spawn/spawn_lib.ml` | Endpoint env var helpers |
| `test/test_fabric.ml` | 7 new ZMQ agent tests |
| `test/test_agent_control.ml` | 6 control tests ported to ZMQ |
| `test/test_zmq_errors.ml` | 3 error/lifecycle tests |
| `test/test_helpers.ml` | New — shared test infrastructure |
| `doc/agent-runtime.md` | ZMQ transport mode subsection |
| `doc/processes.md` | Annotate Multiplexed I/O |
| `doc/architecture.md` | Module table, dependency graph |

## Implementation order

1. **Documentation contracts** — draft ZMQ transport mode subsection
   in agent-runtime.md: env vars, ready handshake, shutdown
   protocol, socket table.  These are design contracts; writing
   them first reduces drift.

2. **Fabric socket bifurcation** — modify fabric.ml to skip
   agent-side sockets for subprocess processes; extend callback
   with endpoints.  Update bridge's `make_handler` to accept new
   arg.  `dune build && dune test` — all existing tests pass.

3. **ZMQ handler** — `zmq_agent.ml`: spec synthesis, tool dispatch
   wiring, endpoint env vars, agent spawn, reaper, ready handshake.
   `dune build`.

4. **Agent binary rewrite** — main.ml: ZMQ sockets, daemon fibres,
   Eio.Mutex/Condition, ready signal, shutdown protocol, stdio
   fallback.  `dune build`.

5. **Integration + tests** — wire ZMQ handler into fabric, extract
   test helpers, run all 16 tests + existing suite.
   `dune build && dune test`.

6. **Documentation finalisation** — update processes.md,
   architecture.md; review agent-runtime.md against implementation.

## Verification

1. After step 2: `dune build && dune test` — all existing bridge
   tests pass with extended callback
2. After step 4: `dune build` — agent binary compiles with ZMQ
3. After step 5: full test suite; all 16 new tests + existing
4. EOF cascade: `empty ; agent ; id` terminates cleanly (test 3)
5. Control protocol: pause/resume/memory over ZMQ (tests 10-12)
6. Tool dispatch: round-trip through pipe adapter (test 5)
7. Channel separation: telemetry on telemetry socket only (test 4)
8. Crash handling: agent death → fabric cleans up (reaper fibre)
9. IPC cleanup: no socket files after exit (test 16)
10. Update PLMB ticket and engineering state in LDC

## Follow-on tickets

- Remove stdio fallback from agent binary (after bridge retirement)
- Correlation-ID routing for parallel tool calls
- Telemetry queue depth observability (HWM=0 monitoring)
- `doc/executor.md` scope note for bridge/stdio path

The file /Users/ww/.claude/plans/abundant-cuddling-oasis.md has been updated successfully.

Plan updated with all re-review findings:

  • Fixed topology: writer-binds, reader-connects. Agent PUSH binds at output endpoints; downstream PULL connects directly. No intermediate sockets.
  • Concurrent reaper: starts before ready-wait, races via Eio.Fiber.first, force-closes on crash.
  • Ready handshake specified: {"status":"ready"} wire format, consumed not forwarded, 30s timeout.
  • Shutdown specified: stop-then-EOF (matching bridge), tool sockets outlive data EOF, hard SIGTERM/SIGKILL backstop.
  • SNDHWM=0 explicit: must be called on all sockets (not default in transport_zmq.ml).
  • ctrl_out PULL: explicitly created by zmq_agent.ml.
  • Back-pressure claim corrected: documented as deliberate simplification, not pipe-equivalent.
  • 16 tests: added concurrent data+control, multi-message ordering, IPC cleanup.
  • Test infrastructure: separate with_fabric_zmq_agent, shared helpers in test_helpers.ml, ZMQ_LINGER=0.
  • Docs drafted at step 1: contracts before code.
  • Follow-on tickets listed.