Claude Code transcript - page 2/2

ae12f2b3ebbbfe98c toolu_012GdcEgN2LbQcD8oinMhRzV /private/tmp/claude-501/-Users-ww-src-plumbing/0e6b86b5-3d3c-4483-9c52-ce740324d2d9/tasks/ae12f2b3ebbbfe98c.output completed

Agent "Theory-critic review D6 plan" completed

Good. Now I have a thorough understanding of both the pipe-based reference implementation and the ZMQ fabric. Let me render my review.


Review: D6 EOF/Disconnect Sentinel Semantics

1. The categorical question: is in-band EOF sound?

Yes, but only if you treat it correctly. Here is the precise framing.

In the pipe-based runtime, EOF is not a message -- it is the absence of future messages. It is a property of the channel, not a value in the channel's type. The read_lines loop in forward.ml discovers EOF via End_of_file from input_line -- a side effect of the underlying fd being closed. This is out-of-band: EOF inhabits the transport layer, not the data layer.

The sentinel {"__eof":true} promotes EOF into the data layer. This is a categorically different thing. You are now sending a value on a channel whose type is, say, !T (a stream of T). But {"__eof":true} is not a T. You have two options:

(a) EOF is outside the type. The sentinel bypasses validation -- recv_loop checks for it before type-checking. This means the actual wire protocol is not !T but !(T + EOF), where EOF is a distinguished sum injection that every morphism must handle. The type system says !T; the runtime implements !(T + EOF). This is a lie, but a disciplined one, as long as the sentinel is never confused with a legitimate T value.

(b) EOF is inside the type. You extend the type algebra with an explicit EOF or Bottom type. This is cleaner but invasive.

Your plan chooses (a). This is pragmatically correct and categorically defensible if and only if the sentinel is intercepted at the recv_loop level, before any morphism logic sees it. The sentinel must be invisible to the morphism's type contract. Looking at the current recv_loop:

let recv_loop ~label sock f =
  try while true do
    let (_topic, payload) = T.recv sock in
    if String.length payload > 0 then begin
      let json = ... in
      f json       (* <-- f must NEVER see the EOF sentinel *)
    end
  done with ...

The sentinel check must go here, between the recv and the call to f. If it leaks into f, you have broken the type contract. This is straightforward but the implementation must be disciplined about it.

Verdict on question 1: Sound, with the caveat below.

2. The critical caveat: sentinel uniqueness

The sentinel {"__eof":true} lives in the same namespace as legitimate JSON payloads. If any schema permits an object with a __eof key, you have a collision. The double-underscore convention is a social contract, not a type-theoretic guarantee.

In the pipe-based runtime, this problem does not exist because EOF is not a value -- it is fd closure, which is categorically impossible to confuse with a JSON message.

This is the single most important weakness of the sentinel approach. Three mitigations, in order of preference:

  1. Validate at schema boundaries. The type checker should reject schemas that permit a __eof field. This makes the sentinel's reserved status a type-level guarantee, not a naming convention.

  2. Use a non-JSON-object sentinel. A bare string like "__EOF__" is less likely to collide with structured payloads, but has the same fundamental problem.

  3. Use a transport-level sentinel. Encode EOF in the topic frame rather than the payload. Your existing frame format is <4-byte topic length><topic><payload>. A zero-length payload with a reserved topic (e.g., topic = __eof) would be unambiguous and invisible to the JSON layer. This is strictly better: it keeps EOF out-of-band at the wire level while remaining in-band at the ZMQ level (no separate socket).

I strongly recommend option 3. It preserves the architectural simplicity of "no separate control channel" while avoiding the type-safety hole of a JSON sentinel. The check in recv_loop becomes a topic check, not a payload parse:

let recv_loop ~label sock f =
  try while true do
    let (topic, payload) = T.recv sock in
    if topic = "__eof" then raise Eof_received;
    if String.length payload > 0 then ...
  done with ...

This is the simplest thing that works. The sentinel is in the framing layer, not the data layer. No risk of collision with any T.
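A minimal sketch of why the topic check cannot collide with a payload, assuming the 4-byte topic length is big-endian (the byte order is not stated in this review). The function bodies below are illustrative stand-ins, not the project's actual transport code.

```ocaml
(* Sketch only: frame layout <4-byte topic length><topic><payload>.
   Big-endian length is an assumption; the real transport may differ. *)
let eof_topic = "__eof"

let encode_frame ~topic payload =
  let tlen = String.length topic in
  let plen = String.length payload in
  let buf = Bytes.create (4 + tlen + plen) in
  Bytes.set_int32_be buf 0 (Int32.of_int tlen);
  Bytes.blit_string topic 0 buf 4 tlen;
  Bytes.blit_string payload 0 buf (4 + tlen) plen;
  Bytes.to_string buf

let decode_frame frame =
  let tlen = Int32.to_int (Bytes.get_int32_be (Bytes.of_string frame) 0) in
  let topic = String.sub frame 4 tlen in
  let payload = String.sub frame (4 + tlen) (String.length frame - 4 - tlen) in
  (topic, payload)

(* EOF is decided from the topic alone, before any JSON parsing. *)
let is_eof frame = fst (decode_frame frame) = eof_topic
```

A data frame whose payload happens to be {"__eof":true} still decodes as data here, because only the topic is inspected.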

3. Composition: does ; remain associative?

Consider f ; g ; h where each morphism has one input and one output. EOF propagation must satisfy:

  • f receives EOF, sends EOF downstream, exits.
  • g receives EOF from f, sends EOF downstream, exits.
  • h receives EOF from g, sends EOF downstream, exits.

This is a chain of cause-and-effect. The question is whether (f ; g) ; h and f ; (g ; h) produce the same observable behaviour. They do, because EOF propagation is a causal property: each morphism's EOF output depends only on its EOF input(s), and the sentinel travels through the same data channel as regular messages, so it is ordered with respect to them. FIFO ordering of PUSH/PULL guarantees that all data messages from f arrive at g before the EOF sentinel. This is correct.

The pipe-based runtime has the same property -- fd closure happens after all buffered data is flushed -- and the sentinel preserves it because ZMQ PUSH/PULL is FIFO.

Verdict: associativity of sequential composition is preserved.
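The argument can be illustrated with a toy model in which a channel is a list and the wire type is T + EOF. This is only a sketch of the observable behaviour; the names wire, stage and >>> are hypothetical and nothing here touches ZMQ or Eio.

```ocaml
(* Model: the wire carries T + EOF even though the morphism is typed on T. *)
type 'a wire = Data of 'a | Eof

(* A stage applies [f] to data in FIFO order, stops at the first Eof and
   forwards exactly one Eof; anything queued after Eof is never read. *)
let stage (f : 'a -> 'b) (input : 'a wire list) : 'b wire list =
  let rec go acc = function
    | [] -> List.rev acc (* upstream never sent Eof: nothing to forward *)
    | Eof :: _ -> List.rev (Eof :: acc)
    | Data x :: rest -> go (Data (f x) :: acc) rest
  in
  go [] input

(* Sequential composition of stages. *)
let ( >>> ) s1 s2 = fun input -> s2 (s1 input)

let f = stage (fun x -> x + 1)
let g = stage (fun x -> x * 2)
let h = stage string_of_int

let left = (f >>> g) >>> h
let right = f >>> (g >>> h)
```

Both bracketings yield the same output list, with the single Eof arriving after all data.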

4. Copy and discard: naturality

Copy (A -> A x A): On EOF, copy must send EOF on both outputs. The plan states this. Correct. The natural transformation diagram commutes: in copy ; (f x g), copy receives EOF and sends it on both legs, and f and g each receive EOF independently. (The mirrored form (f ; g) ; copy does not type-check in general, so it is not a useful comparison.) The point is that copy's EOF behaviour is componentwise, which is what naturality demands.

Discard (A -> I): On EOF, discard has nothing to propagate (it has no output). It simply exits. Correct -- the counit of the comonoid requires no downstream notification because there is no downstream.

Verdict: copy and discard are fine.

5. Merge: the all-EOF vs first-EOF distinction

This is where the design gets interesting. You have two merge variants:

  • merge (all-EOF): both inputs must EOF before output EOFs. This is the categorical coproduct injection merge: A + B -> C. The merge continues accepting messages from either input until both are exhausted.

  • merge_any (first-EOF): first input EOF triggers output EOF. This is the "race" merge used in session protocol envelopes.

Both are categorically legitimate -- they are different morphisms with different types and different operational semantics. The question is whether each is internally consistent.

merge (all-EOF): Currently uses Eio.Fiber.both, which waits for both reader fibres to complete. A reader fibre completes when recv_loop exits, which currently happens on transport error. With the sentinel, recv_loop will raise Eof_received, the reader catches it, and the fibre returns. Fiber.both waits for both returns, then sends EOF on the output. This is correct.

merge_any (first-EOF): Currently uses Eio.Fiber.first, which cancels the other fibre when one completes. With the sentinel, the first reader to receive EOF returns and Fiber.first cancels the other reader (via Eio cancellation, which will interrupt the Eio_unix.await_readable in recv_raw). The morphism then sends EOF on the output. This is also correct.

However, there is a subtlety. When Fiber.first cancels the other reader, that reader may have already received data messages that have not yet been forwarded to the output. Eio cancellation is asynchronous -- the cancelled fibre may be mid-send_json. If the cancellation interrupts between the recv and the send, a message is silently dropped.

In the pipe-based forward_merge_any, this is handled by force-closing both input channels (close_in_noerr ic0; close_in_noerr ic1), which causes the surviving reader to get a Sys_error on its next input_line and exit via the read_lines exception handler. The message currently being processed completes before the next read attempt. This is orderly shutdown.

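Eio cancellation is not orderly in the same way. Eio.Cancel.protect blocks would be needed around the validate-send sequence that follows each recv to make it atomic; the recv itself must stay cancellable, otherwise Fiber.first could never stop the other reader. Without this, merge_any can drop messages.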

Recommendation: Wrap the body of each reader's loop iteration in Eio.Cancel.protect (or equivalent) so that a message in flight is always either fully processed or not started. Alternatively, restructure merge_any to use a shared flag (like the pipe-based version does with its closed ref) rather than relying on Eio cancellation for the "cancel other reader" semantics.

6. Barrier: any-EOF with partial round drop

The barrier's any-EOF semantics are correct: once one input closes, no complete round can ever be assembled, so the barrier must exit. Partial rounds are dropped. This matches the pipe-based implementation.

The sentinel approach actually simplifies the barrier compared to the pipe-based version, because you do not need the write-fd-closing trick to unblock readers (the macOS close(read_fd) problem documented in forward.ml lines 368-372). In the ZMQ version, the sentinel arrives as a normal message, and the reader processes it without any fd-closure gymnastics. This is a genuine improvement.

One concern: the current barrier implementation (lines 224-227 of inline.ml) sets eof := true and broadcasts after recv_loop returns:

    (* Mark EOF *)
    Eio.Mutex.use_rw ~protect:false m (fun () ->
      eof := true;
      Eio.Condition.broadcast cond)

With the sentinel approach, recv_loop will raise Eof_received instead of returning. You need to ensure this exception is caught and the EOF flag is still set. The current structure has the EOF-flag code after recv_loop, which works because recv_loop currently returns normally when the transport closes. If Eof_received is an exception, this code will be skipped unless it is in a handler. Make sure the implementation handles this.
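The failure mode and one possible fix can be shown with the standard library alone. This is a sketch under simplifying assumptions: a plain bool ref stands in for the mutex-protected flag and condition broadcast, and recv_loop_raising is a hypothetical stand-in for a recv_loop that signals EOF by raising.

```ocaml
exception Eof_received

(* Stand-in for a recv_loop that signals EOF by raising. *)
let recv_loop_raising () : unit = raise Eof_received

(* Post-loop code written after the call is skipped when the loop raises. *)
let reader_naive eof =
  recv_loop_raising ();
  eof := true

(* Fun.protect runs [finally] on both normal return and exception. *)
let reader_protected eof =
  Fun.protect
    ~finally:(fun () -> eof := true)
    (fun () -> try recv_loop_raising () with Eof_received -> ())
```

In reader_naive the flag is never set, so sibling readers waiting on the condition would block; reader_protected sets it on every exit path.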

7. Edge cases

Empty pipeline (id only): id receives EOF, sends EOF, exits. Correct.

Copy followed by discard on one leg: copy ; (f x discard). Copy sends EOF on both outputs. f receives EOF, propagates. discard receives EOF, exits (no output). The live leg completes correctly. Fine.

Filter that drops everything: A filter that drops all messages will never send any data downstream, but it will receive the EOF sentinel and must propagate it. This means filter must not suppress the EOF sentinel -- it must be intercepted before the filter predicate runs. If you implement the sentinel check inside recv_loop (as recommended), this is automatic.

Multiple EOFs: What if a morphism receives EOF more than once? (E.g., ZMQ socket receives from multiple upstream PUSHers that each send EOF.) This cannot happen with PUSH/PULL in the current wiring -- each PULL socket connects to exactly one PUSH socket per the routing table. But verify this invariant. If fan-in at the ZMQ level is ever introduced (multiple PUSHers to one PULLer), you would need EOF counting, not just EOF detection. For now this is not a concern, but document the assumption.
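If fan-in were ever introduced, EOF counting could look roughly like the sketch below. The eof_counter type and both functions are hypothetical; nothing like this exists in the current wiring.

```ocaml
(* Hypothetical helper: track how many upstream senders still owe an EOF. *)
type eof_counter = { mutable remaining : int }

let make_counter ~upstreams =
  if upstreams <= 0 then invalid_arg "make_counter: upstreams must be positive";
  { remaining = upstreams }

(* Call once per EOF sentinel received. Returns true once every upstream
   has sent its EOF, i.e. when EOF should be propagated downstream. *)
let note_eof c =
  if c.remaining > 0 then c.remaining <- c.remaining - 1;
  c.remaining = 0
```

With one upstream per PULL socket, as today, this degenerates to plain EOF detection.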

Race between data and EOF from different branches in a tensor product: Consider (f x g) ; barrier. Suppose f sends its last data message and then EOF while g still has data to send. The barrier is synchronised (one message per input per round), so g's remaining data cannot form further rounds once f is exhausted. Once f sends EOF, the barrier's reader for f sets eof := true, and all other readers exit on their next Eio.Condition.await. Any message g sent before f's EOF will have been deposited in g's slot. If f's EOF arrives when f's slot is filled and g's is not, the round is incomplete and correctly dropped. If both slots are filled (round complete), the product is emitted, then f's EOF arrives and sets the flag. This is correct.

8. Overall verdict

The sentinel approach is sound and preserves the algebraic properties of the language, subject to three requirements:

  1. Use a topic-level sentinel, not a JSON-level sentinel. This eliminates the type-safety hole where {"__eof":true} could collide with a legitimate payload. The framing layer already has a topic field; use it.

  2. Protect merge_any's reader loop iterations from Eio cancellation. Without this, messages in flight can be silently dropped when the first reader EOFs and Fiber.first cancels the other.

  3. Ensure barrier's EOF-flag code executes even when recv_loop raises an exception. Structure it as a Fun.protect ~finally or explicit exception handler.

The decision to avoid a separate control channel is correct for the current architecture (inline Eio fibres). The sentinel-on-data-channel approach is the right level of abstraction for now, and extends naturally to subprocesses in Step 5 as described.

The merge/merge_any distinction is categorically sound and operationally well-motivated. The barrier's any-EOF semantics are the only correct choice (partial rounds are meaningless).

This is a clean design. The three issues above are implementation concerns, not architectural ones.

396327106662 Full transcript available at: /private/tmp/claude-501/-Users-ww-src-plumbing/0e6b86b5-3d3c-4483-9c52-ce740324d2d9/tasks/ae12f2b3ebbbfe98c.output

Theory-critic review is in. Key findings:

  1. Strong recommendation: Use a topic-level sentinel (topic = "__eof") instead of a JSON payload sentinel. This keeps EOF out of the data layer entirely — no collision risk, no type-safety hole. The framing layer already has a topic field.
  2. Must do: Protect merge_any reader loop iterations from Eio cancellation (messages in flight can be dropped).
  3. Must do: Ensure barrier's EOF-flag code executes even when recv_loop raises (use Fun.protect ~finally or explicit handler).
  4. Confirms: The sentinel approach is categorically sound, composition remains associative, copy/discard naturality preserved.

The topic-level sentinel is a significant improvement — simpler check, no collision risk. Waiting for the engineer review.

what about code simplification?

Thinking

The engineer review (which has the most detailed code simplification questions) is still running. The architect already flagged one simplification: integrate EOF handling into recv_loop itself rather than requiring every morphism to independently catch Eof_received and send EOF — the loop could take the output sockets and handle propagation automatically.

Let me check on the engineer review.

$ Bash
Check engineer review progress
tail -20 /private/tmp/claude-501/-Users-ww-src-plumbing/0e6b86b5-3d3c-4483-9c52-ce740324d2d9/tasks/a3ddf887b1c34264b.output 2>/dev/null || echo "Still running or file not ready"
<persisted-output>
Output too large (53.9KB). Full output saved to: /Users/ww/.claude/projects/-Users-ww-src-plumbing/d1d325d0-be10-4874-bce5-4034de0cde74/tool-results/borxnvi4w.txt

Preview (first 2KB):
{"parentUuid":"4ac54085-b298-4c47-8b54-c344681af3ca","isSidechain":true,"agentId":"a3ddf887b1c34264b","message":{"model":"claude-sonnet-4-6","id":"msg_01CqU42ddwVSGir5nn7iX98f","type":"message","role":"assistant","content":[{"type":"tool_use","id":"toolu_01Ndwap8tUSSPTLW8217gGju","name":"Glob","input":{"pattern":"/Users/ww/src/plumbing/lib/transport/*.ml"},"caller":{"type":"direct"}}],"stop_reason":null,"stop_sequence":null,"usage":{"input_tokens":1,"cache_creation_input_tokens":9935,"cache_read_input_tokens":16568,"cache_creation":{"ephemeral_5m_input_tokens":9935,"ephemeral_1h_input_tokens":0},"output_tokens":4,"service_tier":"standard","inference_geo":"not_available"}},"requestId":"req_011CZ1zFCB2tQX7SmF96Cb1z","type":"assistant","uuid":"18f8f4e4-924d-4ea8-94d1-4854cc1e1c74","timestamp":"2026-03-13T23:59:24.209Z","userType":"external","cwd":"/Users/ww/src/plumbing","sessionId":"d1d325d0-be10-4874-bce5-4034de0cde74","version":"2.1.75","gitBranch":"plumbing-1","slug":"abundant-cuddling-oasis"}
{"parentUuid":"18f8f4e4-924d-4ea8-94d1-4854cc1e1c74","isSidechain":true,"agentId":"a3ddf887b1c34264b","type":"progress","data":{"type":"hook_progress","hookEvent":"PostToolUse","hookName":"PostToolUse:Glob","command":"callback"},"parentToolUseID":"toolu_01Ndwap8tUSSPTLW8217gGju","toolUseID":"toolu_01Ndwap8tUSSPTLW8217gGju","timestamp":"2026-03-13T23:59:24.234Z","uuid":"d40236bd-d1a9-4386-9b6e-2222f0b2967a","userType":"external","cwd":"/Users/ww/src/plumbing","sessionId":"d1d325d0-be10-4874-bce5-4034de0cde74","version":"2.1.75","gitBranch":"plumbing-1","slug":"abundant-cuddling-oasis"}
...
</persisted-output>

Still running — it's doing thorough code exploration. The output file is growing. I'll incorporate its findings once it completes. The key simplification theme emerging from the first two reviews is:

Architect: Push EOF propagation into recv_loop itself. Instead of every morphism doing (try recv_loop ... with Eof_received -> ()); send_eof ..., have the loop take output sockets and send EOF automatically on exit. Each morphism just specifies the data transform.

Theory-critic: Use topic-level sentinel (topic = "__eof") — this is simpler than the JSON payload check and eliminates the type collision concern. One if topic = "__eof" check vs string equality on the whole payload.

Both point toward the same thing: less boilerplate per morphism, fewer places to get it wrong.

a3ddf887b1c34264b toolu_0144cjCCd2uJgk7GNUke9zYA /private/tmp/claude-501/-Users-ww-src-plumbing/0e6b86b5-3d3c-4483-9c52-ce740324d2d9/tasks/a3ddf887b1c34264b.output completed

Agent "Engineer review D6 plan" completed

Here is the full review.


Review: D6 EOF/disconnect semantics

Overall verdict

The approach is directionally correct but has several correctness holes that need resolution before implementation. The most serious are a structural flaw in the sentinel design, exception flow blindspots in merge and barrier, and a significant simplification opportunity that should be taken rather than left open as a question.


Issue 1 — Sentinel design: wrong abstraction level (serious)

Sending {"__eof":true} as a regular data message on the PUSH socket is fragile for two reasons:

Ambiguity: Any legitimate message whose type happens to contain an __eof field would be misidentified. The type system does not structurally prevent this — it is a silent convention.

Framing: The sentinel travels through the same encode_frame/decode_frame path as data. The exact-string check fires on the payload after decoding. This is checkable, but it is an implicit dependency on yojson's serialisation producing {"__eof":true} with exactly those bytes — a fact that is true today but is not guaranteed by any interface contract.

Better alternative: Add a frame-type byte to encode_frame. One byte prepended to every frame: 0x00 = data, 0x01 = EOF. Checked in recv before JSON parse, structurally impossible to confuse with a data payload, and requires no change to the type system. The change to transport_zmq.ml is small: modify encode_frame and decode_frame to handle the prefix byte, and add a send_eof_frame function.
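A rough sketch of the tag byte, written as a wrapper around an already-encoded frame body. The names frame, encode_tagged and decode_tagged are hypothetical; the real change would live inside encode_frame and decode_frame, whose signatures are not shown here.

```ocaml
(* Sketch of the proposed one-byte frame tag: 0x00 = data, 0x01 = EOF. *)
type frame = Data_frame of string | Eof_frame

let encode_tagged = function
  | Data_frame body -> "\x00" ^ body
  | Eof_frame -> "\x01"

(* The tag is inspected before the body is touched, so no payload content
   can be mistaken for EOF. *)
let decode_tagged (raw : string) : (frame, string) result =
  if String.length raw = 0 then Error "empty frame"
  else
    match raw.[0] with
    | '\x00' -> Ok (Data_frame (String.sub raw 1 (String.length raw - 1)))
    | '\x01' -> Ok Eof_frame
    | c -> Error (Printf.sprintf "unknown frame tag 0x%02x" (Char.code c))
```

The cost relative to a topic-based sentinel is that every existing frame changes shape, which is a transport-level change.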

If you stay with in-band JSON: define eof_sentinel as a constant opaque byte string (not assembled via Yojson.Safe.to_string), document it explicitly, and add a test that a message containing __eof: true as a schema-valid field is not treated as EOF.


Issue 2 — merge: mixed-failure case is unhandled (serious)

The plan proposes each reader catching Eof_received internally and returning (), then Fiber.both waiting for both, then sending EOF downstream.

The hole: if reader0 catches Eof_received and returns (), but reader1 raises Morphism_fatal (which is not caught inside the reader), Fiber.both raises Morphism_fatal and the EOF send is skipped entirely. The downstream morphism blocks forever.

The fix is for readers to return a typed result rather than ():

type term_reason = Clean_eof | Transport_closed | Crashed of exn

After Fiber.both (or in a handler around it), inspect both reasons. If both are Clean_eof or Transport_closed, send EOF downstream. If either is Crashed, reraise the exception — do not silently send EOF as though it was a clean shutdown.
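The inspection step could be written as a pure function, sketched below. The merge_action type and decide_merge are hypothetical names introduced for illustration; only term_reason comes from the review.

```ocaml
type term_reason = Clean_eof | Transport_closed | Crashed of exn

type merge_action = Send_eof | Reraise of exn

(* All-EOF merge: propagate EOF only when neither reader crashed.
   If both crashed, the first reader's exception is the one reported. *)
let decide_merge r0 r1 =
  match r0, r1 with
  | Crashed e, _ | _, Crashed e -> Reraise e
  | (Clean_eof | Transport_closed), (Clean_eof | Transport_closed) -> Send_eof
```

Keeping the decision separate from the fibres makes the mixed-failure case testable without a running fabric.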


Issue 3 — merge_any: cancellation interaction needs analysis (serious)

Fiber.first cancels the non-winning fibre by raising Eio.Cancel.Cancelled into await_readable. This propagates up through recv_loop to Fiber.first, which is correct Eio behaviour — do not add a Cancelled catch in recv_loop.

The problem: if the plan's readers catch Eof_received and return (), then a reader that crashes via Morphism_fatal will also return () if the fatal exception is caught inside the reader. Fiber.first cannot distinguish "finished cleanly with EOF" from "finished due to crash". Same typed-result fix applies.


Issue 4 — Barrier: post-loop code is skipped when Eof_received raises (serious)

The barrier has this structure:

let reader idx in_type recv_sock =
  recv_loop ~label recv_sock (fun json -> ...);
  (* post-loop: *)
  Eio.Mutex.use_rw ~protect:false m (fun () ->
    eof := true;
    Eio.Condition.broadcast cond)

The eof := true; broadcast is on the line after recv_loop. If recv_loop raises Eof_received (as the plan proposes), the post-loop code is never reached. Other barrier readers waiting in Eio.Condition.await cond m will block forever.

This is directly fixed by the simplification below.


Issue 5 — Take the simplification: return value instead of exception (design decision)

You asked whether recv_loop should return a value instead of raising. Yes. Do it. This resolves Issues 2, 3, and 4 simultaneously.

Change recv_loop from:

val recv_loop : label:string -> 'a T.socket -> (Yojson.Safe.t -> unit) -> unit

to:

type loop_exit = Eof | Closed | Error

val recv_loop : label:string -> 'a T.socket -> (Yojson.Safe.t -> unit) -> loop_exit

  • Eof: sentinel received
  • Closed: Transport_error (disconnect)
  • Error: callback raised Exit (validation/parse error)

Each morphism then matches on the return value:

let exit = recv_loop ~label recv_sock (fun json -> ...) in
(match exit with
 | Eof -> send_eof send_sock
 | Closed | Error -> ())

The barrier post-loop code becomes unconditionally reachable because recv_loop now returns rather than raises. The try ... with Eof_received -> () boilerplate in every morphism disappears. Eof_received as an exception is eliminated entirely — no daemon-level catch needed.

The only existing exception that needs preserving is raise Exit inside callbacks as the early-abort signal. That stays; it is the callback's mechanism, not the loop's termination mechanism. recv_loop catches it internally and returns Error.


Issue 6 — send_eof must suppress Transport_error (moderate)

If a downstream morphism has already exited when EOF is sent (its PULL socket closed), T.send raises Transport_error. The send_eof function must catch and suppress this with a debug log — same pattern as forward_copy:

(try T.send send_sock ~topic:"" eof_payload with
 | Transport.Transport_error _ ->
   Runtime.Log.debug label ["event", `String "eof_send_skipped_closed"])

Without this, a clean shutdown path raises Transport_error at the fabric daemon handler and logs as an error. Misleading and noisy.


Issue 7 — Tests: timeouts are mandatory, not optional (moderate)

recv_until_eof blocking forever is not a test reliability concern — it is a test correctness concern. A broken EOF propagation (e.g., the barrier issue above) would cause the test suite to hang indefinitely with no indication of which test failed.

The ZMQ receive timeout set in make_socket (1000ms) retries — it does not surface as an error until the socket is closed. It does not help here.

Required: use Eio.Time.with_timeout_exn around the receive side of each EOF test. The with_fabric helper needs to pass the Eio environment to f:

let with_fabric src f =
  ...
  Eio_main.run (fun env ->
    Eio.Switch.run (fun sw ->
      let fabric = Fabric.execute ~sw ~routing_table ~type_env ~pipeline_id () in
      Unix.sleepf 0.1;
      f env push_in pull_out;
      fabric.cancel ()))

Five seconds is a reasonable timeout for these tests.


Issue 8 — Copy: sequential EOF on two sockets is correct (minor)

Confirmed correct. ZMQ PUSH/PULL is FIFO per socket. Each downstream morphism receives EOF after all preceding data because EOF travels in the same queue. No race.


Issue 9 — Fabric daemon handler (minor)

The plan says to catch Eof_received in the daemon handler. If the return-value approach is adopted, there is no Eof_received exception to catch. The daemon handler stays exactly as it is. If you stay with exceptions, the handler needs the catch — but its presence signals that Eof_received is escaping morphisms when it should not.


Required changes before implementation

| # | What | Severity |
|---|------|----------|
| 1 | Replace JSON sentinel with frame-type byte in encode_frame/decode_frame, or carefully document and test the in-band sentinel | Serious |
| 2 | recv_loop returns loop_exit (not unit, not raises) | Serious |
| 3 | Merge and merge_any readers return typed results, not () | Serious |
| 4 | Barrier post-loop code is unconditionally reachable (solved by 2) | Serious |
| 5 | send_eof catches and suppresses Transport_error | Moderate |
| 6 | with_fabric passes Eio env to f; all EOF tests use with_timeout_exn | Moderate |
| 7 | No Cancelled catch anywhere in recv_loop | Correctness |

Items 2–4 are one change: switch recv_loop to return a typed exit status. That single design decision resolves the three most serious correctness issues and eliminates more code than it adds.

4020810156947 Full transcript available at: /private/tmp/claude-501/-Users-ww-src-plumbing/0e6b86b5-3d3c-4483-9c52-ce740324d2d9/tasks/a3ddf887b1c34264b.output

Thinking

All three reviews are in. Strong consensus on three key changes:

  1. Out-of-band sentinel (theory-critic + engineer): Don't use a JSON payload. The theory-critic proposed topic = "__eof" and the engineer proposed a frame-type byte. The topic approach is simpler: no collision risk, no transport changes needed.
  2. Return value, not exception (architect + engineer): recv_loop returns Eof | Closed | Error, not unit. Fixes barrier post-loop bug, eliminates per-morphism boilerplate.
  3. EOF on failure (architect + engineer): Fabric daemon handler must inject EOF on outputs when a morphism crashes.

Plus: send_eof must suppress Transport_error, tests need timeouts, merge_any cancellation needs care.

Let me update the plan.

📝 Write abundant-cuddling-oasis.md
/Users/ww/.claude/plans/abundant-cuddling-oasis.md
# D6: EOF sentinel for ZMQ fabric

## Context

The ZMQ fabric (PLMB-037 Steps 0–4) has no EOF propagation. The only
shutdown path is `fabric.cancel()` which force-closes all sockets.
In the pipe-based runtime, EOF propagates naturally when a process
exits and its fds close. ZMQ sockets have no EOF — `recv` blocks
until data arrives (via `Eio_unix.await_readable`, cancellable).

This blocks Step 5 (agent subprocess integration) and proper
pipeline lifecycle.

## Approach: topic-level EOF sentinel

Send an EOF marker using the existing topic field: `topic = "__eof"`,
empty payload. No new socket types, no routing table changes, no
transport layer changes. The sentinel travels the same FIFO path as
data, arriving after all data messages.

**Why topic-level, not JSON payload** (theory-critic review): A JSON
sentinel `{"__eof":true}` lives in the data layer — it could collide
with a legitimate message type. The topic field is already part of
every frame (framing layer, not data layer). Checking `topic` is a
simple string comparison before JSON parsing. No type-safety hole.

**Why not control PAIR sockets**: All morphisms are inline Eio fibres.
Sentinel is simpler, transport-agnostic, extends to subprocesses
(Step 5).

## Design decisions from review

### R1: `recv_loop` returns typed exit status (not exception)

Recommended by the architect and engineer reviews; the theory-critic
flagged the same barrier problem. Returning a value instead of raising
`Eof_received` fixes:
- Barrier post-loop code being skipped (was unreachable on exception)
- Per-morphism `try...with Eof_received` boilerplate
- Mixed-failure cases in merge (crash vs clean EOF)

```ocaml
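type loop_exit = Eof | Closed | Error

(* Internal signal: raised and caught inside recv_loop only. *)
exception Exit_eof

let recv_loop ~label sock f : loop_exit =
  try while true do
    let (topic, payload) = T.recv sock in
    (* Topic check comes first, so f never sees the sentinel. *)
    if topic = eof_topic then raise_notrace Exit_eof;
    if String.length payload > 0 then begin
      let json = ... in f json
    end
  done; Error (* unreachable *)
  with
  | Exit_eof -> Eof                          (* sentinel received *)
  | Exit -> Error                            (* callback aborted *)
  | Transport.Transport_error _ -> Closed    (* disconnect *)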

Local exception `Exit_eof` is internal to `recv_loop` — never
escapes. Callers match on the return value.
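
The three exit paths can be exercised without ZMQ or Eio using a
queue-backed stand-in for the socket. This is a sketch under stated
assumptions: `recv` and the local `Transport_error` are hypothetical
substitutes for `T.recv` and `Transport.Transport_error`, JSON parsing
is omitted, and an empty queue models a closed transport (the real
`T.recv` blocks instead).

```ocaml
exception Transport_error of string (* stand-in for Transport.Transport_error *)
exception Exit_eof                  (* local to recv_loop, never escapes *)

type loop_exit = Eof | Closed | Error

let eof_topic = "__eof"

(* Fake socket: a queue of (topic, payload) frames. *)
let recv (q : (string * string) Queue.t) =
  match Queue.take_opt q with
  | Some frame -> frame
  | None -> raise (Transport_error "closed")

let recv_loop sock (f : string -> unit) : loop_exit =
  try
    while true do
      let topic, payload = recv sock in
      (* Sentinel is intercepted here, before the callback runs. *)
      if topic = eof_topic then raise_notrace Exit_eof;
      if String.length payload > 0 then f payload
    done;
    Error (* unreachable *)
  with
  | Exit_eof -> Eof
  | Exit -> Error
  | Transport_error _ -> Closed
```

Frames queued after the sentinel are never delivered to the callback,
which matches the FIFO argument in the reviews.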

### R2: EOF-on-failure in fabric daemon handler

When a morphism crashes (`Morphism_fatal` or `Transport_error`),
the fabric must inject EOF on all its output sockets. Without this,
downstream morphisms block forever. The daemon handler already knows
the process entry, so it knows the output ports.

### R3: `send_eof` suppresses `Transport_error`

If a downstream socket is already closed, `T.send` raises. The
`send_eof` helper catches and logs at debug level — same pattern
as `forward_copy`'s output-closed handling.

### R4: Test timeouts are mandatory

Use `Eio.Time.with_timeout_exn` around the receive side of EOF
tests. `with_fabric` passes `env` to the test function.

## Changes

### 1. EOF primitives in `inline.ml`

```ocaml
let eof_topic = "__eof"

type loop_exit = Eof | Closed | Error

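let send_eof sock ~topic:_ =
  try T.send sock ~topic:eof_topic ""
  with Transport.Transport_error _ ->
    (* Downstream already closed: expected during shutdown, log at debug. *)
    Runtime.Log.debug "send_eof" ["event", `String "eof_send_skipped_closed"]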
```

`send_eof` ignores the caller's topic — the topic is always
`"__eof"`. The `~topic:_` parameter keeps the call-site API
consistent but is unused. Suppresses transport errors.

### 2. Modify `recv_loop`

Return `loop_exit` instead of `unit`. Check topic for EOF before
JSON parsing. Internal `Exit_eof` exception never escapes.

### 3. Update each morphism

**Single-input, single-output** (identity, map, filter, format_json,
parse_json, project, tagger):

```ocaml
let forward_id ~name type_env input_type recv_sock send_sock =
  let label = Printf.sprintf "id[%s]" name in
  let exit = recv_loop ~label recv_sock (fun json ->
    match Validate.check type_env input_type json with
    | Ok cleaned -> send_json send_sock ~topic:name cleaned
    | Error errs -> ...; raise Exit)
  in
  if exit = Eof then send_eof send_sock ~topic:name
```

**Copy** (single-input, two outputs):

```ocaml
let exit = recv_loop ~label recv_sock (fun json -> ...) in
if exit = Eof then begin
  send_eof send_sock0 ~topic:name;
  send_eof send_sock1 ~topic:name
end
```

**Discard / log** (no output): ignore exit status.

**Merge** (two inputs, all-EOF — `Eio.Fiber.both`):

Each reader returns `loop_exit`. After both complete, send EOF
downstream if at least one reader exited on a clean EOF:

```ocaml
let exit0 = ref Error in
let exit1 = ref Error in
Eio.Fiber.both
  (fun () -> exit0 := recv_loop ~label:(label ^ "/0") recv_sock0
    (fun json -> ...))
  (fun () -> exit1 := recv_loop ~label:(label ^ "/1") recv_sock1
    (fun json -> ...));
if !exit0 = Eof || !exit1 = Eof then
  send_eof send_sock ~topic:name
```

Note: if one reader gets `Eof` and the other gets `Closed`, we
still send EOF downstream — at least one input completed normally.

**Merge_any** (two inputs, first-EOF — `Eio.Fiber.first`):

`Fiber.first` cancels the other fibre when one returns. The
cancelled fibre gets `Eio.Cancel.Cancelled`, which propagates out
of `await_readable` and through `recv_loop` (which does not catch
it) back to `Fiber.first`. After `Fiber.first` returns, send EOF:

```ocaml
let _exit = Eio.Fiber.first
  (fun () -> recv_loop ~label:(label ^ "/0") recv_sock0
    (fun json -> ...))
  (fun () -> recv_loop ~label:(label ^ "/1") recv_sock1
    (fun json -> ...))
in
send_eof send_sock ~topic:name
```

`Eio.Cancel.Cancelled` must reach `Fiber.first` unchanged, so the
losing reader never returns a `loop_exit`. Do NOT catch `Cancelled`
in `recv_loop`.

**Barrier** (N inputs, any-EOF):

`recv_loop` returns, so post-loop code is always reached:

```ocaml
let reader idx in_type recv_sock =
  let sub_label = Printf.sprintf "barrier[%s]/%d" name idx in
  let _exit = recv_loop ~label:sub_label recv_sock (fun json ->
    ...) in
  Eio.Mutex.use_rw ~protect:false m (fun () ->
    eof := true;
    Eio.Condition.broadcast cond)
in
Eio.Fiber.all (List.map ...);
send_eof send_sock ~topic:barrier_label
```

The `eof := true; broadcast` now executes whether `recv_loop`
returned `Eof`, `Closed`, or `Error`.

### 4. Fabric daemon handler — EOF on failure

In Phase 4 of `fabric.ml`, the `fork_daemon` wrapper catches
`Morphism_fatal` and `Transport_error`. Add EOF injection on all
output sockets of the failed morphism:

```ocaml
Eio.Fiber.fork_daemon ~sw (fun () ->
  let output_socks = find_output_socks pe in
  (try match pe.proc_impl with
    | Id -> Inline.forward_id ...
    | ...
  with
  | Inline.Morphism_fatal (label, _) ->
    Runtime.Log.error "fabric" [...];
    List.iter (fun s -> Inline.send_eof s ~topic:pe.proc_name)
      output_socks
  | Transport.Transport_error (_, msg) ->
    Runtime.Log.debug "fabric" [...];
    List.iter (fun s -> Inline.send_eof s ~topic:pe.proc_name)
      output_socks);
  `Stop_daemon)
```

`find_output_socks` looks up the PUSH sockets for the process's
output ports in the socket table. The list is built from the
routing-table routes where `r.src = pe.proc_name`.
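One way the lookup could work is sketched below. The plan does not show the real route or socket-table types, so the `route` record, the `(process, port)` table key and the `~routes`/`~sockets` arguments are all assumptions made for illustration; the real helper takes `pe`.

```ocaml
(* Hypothetical shapes: the real route and socket-table types are not
   shown in the plan. The socket type is left polymorphic. *)
type route = {
  src : string;       (* producing process name *)
  src_port : string;  (* output port on that process *)
  dst : string;       (* consuming process name *)
  dst_port : string;  (* input port on the consumer *)
}

(* Collect the sockets registered for every output port of [proc_name].
   Ports with no socket in the table are skipped. *)
let find_output_socks_sketch ~routes ~sockets proc_name =
  routes
  |> List.filter (fun r -> r.src = proc_name)
  |> List.filter_map (fun r -> Hashtbl.find_opt sockets (r.src, r.src_port))

(* Example: strings stand in for PUSH sockets. *)
let () =
  let routes = [
    { src = "copy0"; src_port = "out0"; dst = "id0"; dst_port = "in" };
    { src = "copy0"; src_port = "out1"; dst = "id1"; dst_port = "in" };
    { src = "id0"; src_port = "out"; dst = "merge0"; dst_port = "in0" };
  ] in
  let sockets = Hashtbl.create 8 in
  Hashtbl.replace sockets ("copy0", "out0") "push:copy0/out0";
  Hashtbl.replace sockets ("copy0", "out1") "push:copy0/out1";
  Hashtbl.replace sockets ("id0", "out") "push:id0/out";
  find_output_socks_sketch ~routes ~sockets "copy0"
  |> List.iter print_endline
```

Computing the list before the `try` means the handler does not need to consult the routing table while it is already handling a failure.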

### 5. Fabric boundary helpers

```ocaml
let send_boundary_eof t port_name =
  let sock = boundary_input t port_name in
  Inline.send_eof sock ~topic:port_name
```

### 6. Update `with_fabric` in tests

Pass `env` so tests can use timeouts:

```ocaml
let with_fabric src f =
  let routing_table, type_env = parse_and_compile src in
  let pipeline_id = fresh_id () in
  Eio_main.run (fun env ->
    Eio.Switch.run (fun sw ->
      let fabric = Fabric.execute ~sw ~routing_table ~type_env
        ~pipeline_id () in
      (* Sleep via Eio so other fibres keep running during the pause. *)
      Eio.Time.sleep (Eio.Stdenv.clock env) 0.1;
      let push_in = Fabric.boundary_input fabric "input" in
      let pull_out = Fabric.boundary_output fabric "output" in
      f env push_in pull_out;
      fabric.cancel ()))
```

Helper to receive until EOF with timeout:

```ocaml
let recv_until_eof env sock =
  let msgs = ref [] in
  Eio.Time.with_timeout_exn (Eio.Stdenv.clock env) 5.0 (fun () ->
    let rec loop () =
      let (topic, payload) = T.recv sock in
      if topic = Inline.eof_topic then ()
      else begin
        msgs := payload :: !msgs;
        loop ()
      end
    in loop ());
  List.rev !msgs
```

### 7. Tests

| Test | Asserts |
|------|---------|
| `test_identity_eof` | Send msg + EOF → receive msg, then EOF on output |
| `test_copy_merge_eof` | Msg + EOF through copy+merge → 2 copies + EOF |
| `test_eof_chain` | 3-stage (id→id→id), EOF propagates end-to-end |
| `test_merge_any_eof` | EOF on one input → output EOF, other cancelled |

## Files to modify

| File | Change |
|------|--------|
| `lib/fabric/inline.ml` | EOF topic, `loop_exit` type, `recv_loop` returns status, all morphisms propagate EOF |
| `lib/fabric/fabric.ml` | EOF-on-failure in daemon handler, `send_boundary_eof`, `find_output_socks` |
| `test/test_fabric.ml` | `with_fabric` passes env, `recv_until_eof` helper, 4 EOF tests |

## What does NOT change

- `lib/transport/transport_zmq.ml` — sentinel is application-level
- `lib/transport/transport_intf.ml` — no new types
- `lib/runtime/routing.ml` — no new channel kinds
- `lib/supervisor/supervisor.ml` — no changes for inline morphisms

## Verification

1. `nix develop --command dune build --root=.` in worktree
2. `nix develop --command dune test --root=.` — all tests pass
3. New EOF tests pass with timeouts (no hangs)
4. Existing identity and copy+merge tests still pass

## Review issues addressed

| Issue | Source | Resolution |
|-------|--------|------------|
| JSON sentinel collision risk | Theory-critic, Engineer | Topic-level sentinel (`"__eof"`) |
| `recv_loop` should return, not raise | All three | `loop_exit` return type |
| Barrier post-loop code skipped | Engineer | Fixed by return-value approach |
| Merge mixed-failure case | Engineer | Check both exit statuses |
| EOF on morphism failure | Architect | Daemon handler injects EOF |
| `send_eof` must suppress errors | Engineer | Catch `Transport_error` |
| Test timeouts mandatory | Engineer | `with_timeout_exn`, 5s |
| merge_any cancellation safety | Theory-critic | `Cancelled` → `Transport_error` → `Closed` (no special handling) |
| Post-EOF send guard | Architect | Deferred — low risk for inline morphisms |

## Future extension (Step 5)

When agent subprocesses are added, the boundary adapter fibre
translates between pipe EOF and the topic sentinel. The subprocess
never sees `"__eof"`; it sees native pipe EOF. The same adapter
handles both directions (pipe→ZMQ and ZMQ→pipe), so the sentinel
protocol itself does not change.
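
The translation in each direction might look like the following stdlib-only sketch. It assumes line-delimited pipes and passes the transport in as plain functions; `pump_pipe_to_fabric`, `pump_fabric_to_pipe`, `send` and `recv` are hypothetical names, not part of the plan.

```ocaml
let eof_topic = "__eof"

(* Pipe -> fabric: forward each line as a frame; when the pipe reaches
   native EOF, emit the topic sentinel instead. [send] is a hypothetical
   stand-in for the transport's send function. *)
let pump_pipe_to_fabric ~topic ic (send : topic:string -> string -> unit) =
  (try
     while true do
       send ~topic (input_line ic)
     done
   with End_of_file -> ());
  send ~topic:eof_topic ""

(* Fabric -> pipe: write payloads as lines; on the sentinel, close the
   pipe so the subprocess sees ordinary EOF. [recv] is likewise a
   hypothetical stand-in for the transport's receive function. *)
let pump_fabric_to_pipe (recv : unit -> string * string) oc =
  let rec loop () =
    let (topic, payload) = recv () in
    if topic = eof_topic then close_out oc
    else begin
      output_string oc payload;
      output_char oc '\n';
      loop ()
    end
  in
  loop ()
```

Because the sentinel is consumed inside the adapter, the subprocess side only ever observes a closed file descriptor.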
The file /Users/ww/.claude/plans/abundant-cuddling-oasis.md has been updated successfully.
ExitPlanMode
{
  "allowedPrompts": [
    {
      "tool": "Bash",
      "prompt": "build the project with dune"
    },
    {
      "tool": "Bash",
      "prompt": "run tests with dune"
    },
    {
      "tool": "Bash",
      "prompt": "create git worktree and branch"
    },
    {
      "tool": "Bash",
      "prompt": "git operations (status, diff, add, commit)"
    },
    {
      "tool": "Bash",
      "prompt": "remove git worktree"
    }
  ],
  "plan": "# D6: EOF sentinel for ZMQ fabric\n\n## Context\n\nThe ZMQ fabric (PLMB-037 Steps 0–4) has no EOF propagation. The only\nshutdown path is `fabric.cancel()` which force-closes all sockets.\nIn the pipe-based runtime, EOF propagates naturally when a process\nexits and its fds close. ZMQ sockets have no EOF — `recv` blocks\nuntil data arrives (via `Eio_unix.await_readable`, cancellable).\n\nThis blocks Step 5 (agent subprocess integration) and proper\npipeline lifecycle.\n\n## Approach: topic-level EOF sentinel\n\nSend an EOF marker using the existing topic field: `topic = \"__eof\"`,\nempty payload. No new socket types, no routing table changes, no\ntransport layer changes. The sentinel travels the same FIFO path as\ndata, arriving after all data messages.\n\n**Why topic-level, not JSON payload** (theory-critic review): A JSON\nsentinel `{\"__eof\":true}` lives in the data layer — it could collide\nwith a legitimate message type. The topic field is already part of\nevery frame (framing layer, not data layer). Checking `topic` is a\nsimple string comparison before JSON parsing. No type-safety hole.\n\n**Why not control PAIR sockets**: All morphisms are inline Eio fibres.\nSentinel is simpler, transport-agnostic, extends to subprocesses\n(Step 5).\n\n## Design decisions from review\n\n### R1: `recv_loop` returns typed exit status (not exception)\n\nAll three reviewers agreed. Returning a value instead of raising\n`Eof_received` fixes:\n- Barrier post-loop code being skipped (was unreachable on exception)\n- Per-morphism `try...with Eof_received` boilerplate\n- Mixed-failure cases in merge (crash vs clean EOF)\n\n```ocaml\ntype loop_exit = Eof | Closed | Error\n\nlet recv_loop ~label sock f : loop_exit =\n  try while true do\n    let (topic, payload) = T.recv sock in\n    if topic = eof_topic then raise_notrace Exit_eof;\n    if String.length payload > 0 then begin\n      let json = ... 
in f json\n    end\n  done; Error (* unreachable *)\n  with\n  | Exit_eof -> Eof\n  | Exit -> Error\n  | Transport.Transport_error _ -> Closed\n```\n\nLocal exception `Exit_eof` is internal to `recv_loop` — never\nescapes. Callers match on the return value.\n\n### R2: EOF-on-failure in fabric daemon handler\n\nWhen a morphism crashes (`Morphism_fatal` or `Transport_error`),\nthe fabric must inject EOF on all its output sockets. Without this,\ndownstream morphisms block forever. The daemon handler already knows\nthe process entry, so it knows the output ports.\n\n### R3: `send_eof` suppresses `Transport_error`\n\nIf a downstream socket is already closed, `T.send` raises. The\n`send_eof` helper catches and logs at debug level — same pattern\nas `forward_copy`'s output-closed handling.\n\n### R4: Test timeouts are mandatory\n\nUse `Eio.Time.with_timeout_exn` around the receive side of EOF\ntests. `with_fabric` passes `env` to the test function.\n\n## Changes\n\n### 1. EOF primitives in `inline.ml`\n\n```ocaml\nlet eof_topic = \"__eof\"\n\ntype loop_exit = Eof | Closed | Error\n\nlet send_eof sock ~topic:_ =\n  try T.send sock ~topic:eof_topic \"\"\n  with Transport.Transport_error _ -> ()\n```\n\n`send_eof` ignores the caller's topic — the topic is always\n`\"__eof\"`. The `~topic:_` parameter keeps the call-site API\nconsistent but is unused. Suppresses transport errors.\n\n### 2. Modify `recv_loop`\n\nReturn `loop_exit` instead of `unit`. Check topic for EOF before\nJSON parsing. Internal `Exit_eof` exception never escapes.\n\n### 3. 
Update each morphism\n\n**Single-input, single-output** (identity, map, filter, format_json,\nparse_json, project, tagger):\n\n```ocaml\nlet forward_id ~name type_env input_type recv_sock send_sock =\n  let label = Printf.sprintf \"id[%s]\" name in\n  let exit = recv_loop ~label recv_sock (fun json ->\n    match Validate.check type_env input_type json with\n    | Ok cleaned -> send_json send_sock ~topic:name cleaned\n    | Error errs -> ...; raise Exit)\n  in\n  if exit = Eof then send_eof send_sock ~topic:name\n```\n\n**Copy** (single-input, two outputs):\n\n```ocaml\nlet exit = recv_loop ~label recv_sock (fun json -> ...) in\nif exit = Eof then begin\n  send_eof send_sock0 ~topic:name;\n  send_eof send_sock1 ~topic:name\nend\n```\n\n**Discard / log** (no output): ignore exit status.\n\n**Merge** (two inputs, all-EOF — `Eio.Fiber.both`):\n\nEach reader returns `loop_exit`. After both complete, send EOF\nonly if both exited cleanly:\n\n```ocaml\nlet exit0 = ref Error in\nlet exit1 = ref Error in\nEio.Fiber.both\n  (fun () -> exit0 := recv_loop ~label:(label ^ \"/0\") recv_sock0\n    (fun json -> ...))\n  (fun () -> exit1 := recv_loop ~label:(label ^ \"/1\") recv_sock1\n    (fun json -> ...));\nif !exit0 = Eof || !exit1 = Eof then\n  send_eof send_sock ~topic:name\n```\n\nNote: if one reader gets `Eof` and the other gets `Closed`, we\nstill send EOF downstream — at least one input completed normally.\n\n**Merge_any** (two inputs, first-EOF — `Eio.Fiber.first`):\n\n`Fiber.first` cancels the other fibre when one returns. The\ncancelled fibre gets `Eio.Cancel.Cancelled` which propagates out\nof `await_readable` — `recv_loop` catches `Transport_error` and\nreturns `Closed`. 
After `Fiber.first` returns, send EOF:\n\n```ocaml\nlet _exit = Eio.Fiber.first\n  (fun () -> recv_loop ~label:(label ^ \"/0\") recv_sock0\n    (fun json -> ...))\n  (fun () -> recv_loop ~label:(label ^ \"/1\") recv_sock1\n    (fun json -> ...))\nin\nsend_eof send_sock ~topic:name\n```\n\n`Eio.Cancel.Cancelled` propagates through `recv_raw` →\n`Transport_error` → `recv_loop` returns `Closed`. Do NOT catch\n`Cancelled` in `recv_loop`.\n\n**Barrier** (N inputs, any-EOF):\n\n`recv_loop` returns, so post-loop code is always reached:\n\n```ocaml\nlet reader idx in_type recv_sock =\n  let sub_label = Printf.sprintf \"barrier[%s]/%d\" name idx in\n  let _exit = recv_loop ~label:sub_label recv_sock (fun json ->\n    ...) in\n  Eio.Mutex.use_rw ~protect:false m (fun () ->\n    eof := true;\n    Eio.Condition.broadcast cond)\nin\nEio.Fiber.all (List.map ...);\nsend_eof send_sock ~topic:barrier_label\n```\n\nThe `eof := true; broadcast` now executes whether `recv_loop`\nreturned `Eof`, `Closed`, or `Error`.\n\n### 4. Fabric daemon handler — EOF on failure\n\nIn Phase 4 of `fabric.ml`, the `fork_daemon` wrapper catches\n`Morphism_fatal` and `Transport_error`. Add EOF injection on all\noutput sockets of the failed morphism:\n\n```ocaml\nEio.Fiber.fork_daemon ~sw (fun () ->\n  let output_socks = find_output_socks pe in\n  (try match pe.proc_impl with\n    | Id -> Inline.forward_id ...\n    | ...\n  with\n  | Inline.Morphism_fatal (label, _) ->\n    Runtime.Log.error \"fabric\" [...];\n    List.iter (fun s -> Inline.send_eof s ~topic:pe.proc_name)\n      output_socks\n  | Transport.Transport_error (_, msg) ->\n    Runtime.Log.debug \"fabric\" [...];\n    List.iter (fun s -> Inline.send_eof s ~topic:pe.proc_name)\n      output_socks);\n  `Stop_daemon)\n```\n\n`find_output_socks` looks up PUSH sockets for the process's output\nports in the socket table. Built from the routing table routes where\n`r.src = pe.proc_name`.\n\n### 5. 
Fabric boundary helpers\n\n```ocaml\nlet send_boundary_eof t port_name =\n  let sock = boundary_input t port_name in\n  Inline.send_eof sock ~topic:port_name\n```\n\n### 6. Update `with_fabric` in tests\n\nPass `env` so tests can use timeouts:\n\n```ocaml\nlet with_fabric src f =\n  let routing_table, type_env = parse_and_compile src in\n  let pipeline_id = fresh_id () in\n  Eio_main.run (fun env ->\n    Eio.Switch.run (fun sw ->\n      let fabric = Fabric.execute ~sw ~routing_table ~type_env\n        ~pipeline_id () in\n      Unix.sleepf 0.1;\n      let push_in = Fabric.boundary_input fabric \"input\" in\n      let pull_out = Fabric.boundary_output fabric \"output\" in\n      f env push_in pull_out;\n      fabric.cancel ()))\n```\n\nHelper to receive until EOF with timeout:\n\n```ocaml\nlet recv_until_eof env sock =\n  let msgs = ref [] in\n  Eio.Time.with_timeout_exn (Eio.Stdenv.clock env) 5.0 (fun () ->\n    let rec loop () =\n      let (topic, payload) = T.recv sock in\n      if topic = Inline.eof_topic then ()\n      else begin\n        msgs := payload :: !msgs;\n        loop ()\n      end\n    in loop ());\n  List.rev !msgs\n```\n\n### 7. 
Tests\n\n| Test | Asserts |\n|------|---------|\n| `test_identity_eof` | Send msg + EOF → receive msg, then EOF on output |\n| `test_copy_merge_eof` | Msg + EOF through copy+merge → 2 copies + EOF |\n| `test_eof_chain` | 3-stage (id→id→id), EOF propagates end-to-end |\n| `test_merge_any_eof` | EOF on one input → output EOF, other cancelled |\n\n## Files to modify\n\n| File | Change |\n|------|--------|\n| `lib/fabric/inline.ml` | EOF topic, `loop_exit` type, `recv_loop` returns status, all morphisms propagate EOF |\n| `lib/fabric/fabric.ml` | EOF-on-failure in daemon handler, `send_boundary_eof`, `find_output_socks` |\n| `test/test_fabric.ml` | `with_fabric` passes env, `recv_until_eof` helper, 4 EOF tests |\n\n## What does NOT change\n\n- `lib/transport/transport_zmq.ml` — sentinel is application-level\n- `lib/transport/transport_intf.ml` — no new types\n- `lib/runtime/routing.ml` — no new channel kinds\n- `lib/supervisor/supervisor.ml` — no changes for inline morphisms\n\n## Verification\n\n1. `nix develop --command dune build --root=.` in worktree\n2. `nix develop --command dune test --root=.` — all tests pass\n3. New EOF tests pass with timeouts (no hangs)\n4. 
Existing identity and copy+merge tests still pass\n\n## Review issues addressed\n\n| Issue | Source | Resolution |\n|-------|--------|------------|\n| JSON sentinel collision risk | Theory-critic, Engineer | Topic-level sentinel (`\"__eof\"`) |\n| `recv_loop` should return, not raise | All three | `loop_exit` return type |\n| Barrier post-loop code skipped | Engineer | Fixed by return-value approach |\n| Merge mixed-failure case | Engineer | Check both exit statuses |\n| EOF on morphism failure | Architect | Daemon handler injects EOF |\n| `send_eof` must suppress errors | Engineer | Catch `Transport_error` |\n| Test timeouts mandatory | Engineer | `with_timeout_exn`, 5s |\n| merge_any cancellation safety | Theory-critic | `Cancelled` → `Transport_error` → `Closed` (no special handling) |\n| Post-EOF send guard | Architect | Deferred — low risk for inline morphisms |\n\n## Future extension (Step 5)\n\nWhen agent subprocesses are added, the boundary adapter fibre\ntranslates between pipe EOF and the topic sentinel. Subprocess\nnever sees `\"__eof\"` — it sees native pipe EOF. Both directions\n(pipe→ZMQ and ZMQ→pipe) handled by the same adapter. No changes\nto the sentinel protocol.\n"
}