3. The transport¶
What¶
src/zsnoop_mcp/transport.py — the
async layer that owns one persistent subprocess per host, frames JSON-RPC
over its stdio, and reconnects on failure.
Why this design¶
We want low per-request latency and a simple operational story. The combination we landed on:
| Choice | Reason |
|---|---|
| One persistent subprocess per host | Pay the SSH handshake once per session, not per call. Per-request overhead is < 1 ms after the connection is up. |
| Serial RPCs per connection (asyncio.Lock) | Avoids interleaved responses on stdin/stdout. Different hosts can still run concurrently because each has its own subprocess. |
| Transparent reconnect on EOFError / BrokenPipe | SSH connections do die. We retry once silently; the second failure raises. |
| Bounded recv_timeout (60 s) | A hung zfs find / shouldn't block the MCP server forever. |
| Bounded NDJSON line size (MAX_LINE_BYTES = 16 MiB) | Comfortably clears every agent-side hard cap; an over-budget response surfaces as a TransportError, not a raw asyncio ValueError. |
| Drain stderr to a logger AND a tail buffer | Used to be just the logger; we now also keep the last 50 lines in memory so failure messages include the actual underlying error. |
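
The two "bounded" rows can be illustrated with a small sketch. It is not the code from transport.py: `read_response` and the locally defined `TransportError` are hypothetical stand-ins. It relies only on the documented behaviour that `StreamReader.readline()` raises `ValueError` when a line exceeds the reader's limit, and that `asyncio.wait_for` raises a timeout error when the deadline passes.

```python
import asyncio
import json

MAX_LINE_BYTES = 16 * 1024 * 1024  # value quoted in the table above


class TransportError(Exception):
    """Hypothetical stand-in for the transport's own error type."""


async def read_response(reader: asyncio.StreamReader, recv_timeout: float = 60.0) -> dict:
    """Read one NDJSON line, mapping timeout and oversize to TransportError."""
    try:
        raw = await asyncio.wait_for(reader.readline(), timeout=recv_timeout)
    except asyncio.TimeoutError as e:
        raise TransportError(f"no response within {recv_timeout} s") from e
    except ValueError as e:
        # readline() raises ValueError when the line exceeds the reader's limit.
        raise TransportError("response exceeds the line-size limit") from e
    if not raw:
        # An empty read means the agent closed its stdout.
        raise EOFError("agent closed stdout")
    return json.loads(raw)


async def demo() -> tuple[dict, bool, bool]:
    # A tiny limit makes the oversize case easy to trigger in a demo.
    ok_reader = asyncio.StreamReader(limit=64)
    ok_reader.feed_data(b'{"id": 1, "result": "pong"}\n')
    ok = await read_response(ok_reader)

    big_reader = asyncio.StreamReader(limit=64)
    big_reader.feed_data(b'{"blob": "' + b"x" * 200 + b'"}\n')
    try:
        await read_response(big_reader)
        oversize_raised = False
    except TransportError:
        oversize_raised = True

    silent_reader = asyncio.StreamReader()  # never receives data
    try:
        await read_response(silent_reader, recv_timeout=0.01)
        timeout_raised = False
    except TransportError:
        timeout_raised = True
    return ok, oversize_raised, timeout_raised
```

With a real subprocess, the limit would presumably be passed as `limit=MAX_LINE_BYTES` to `asyncio.create_subprocess_exec`, which applies it to the stdout and stderr readers.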
How — guided tour¶
Command construction¶
The transport never spawns SSH directly. Three small builders:
def build_ssh_argv(config: HostConfig, agent_source: str) -> list[str]:
    """ssh -T BatchMode=yes ... -- <target> <remote-shell-command>"""

def build_local_argv(config: HostConfig, agent_source: str) -> list[str]:
    """Just the remote command, no SSH wrapper."""

def build_argv(config: HostConfig, agent_source: str) -> list[str]:
    """Dispatch on config.transport."""
Both ssh and local share _remote_command:
def _remote_command(config: HostConfig, agent_source: str) -> list[str]:
    parts: list[str] = []
    if config.sudo:
        parts.append("sudo")
    if config.agent_mode == "bootstrap":
        parts.extend([config.remote_python, "-c", _bootstrap_stub(agent_source)])
    else:
        parts.append(config.agent_path)
    return parts
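For SSH, each part is passed through shlex.quote, and the quoted parts are
joined with spaces and appended to the ssh argv as a single element. This is
needed because ssh host arg1 arg2 … concatenates its arguments and hands the
resulting string to the remote shell, effectively running sh -c "arg1 arg2 …"
on the remote.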
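
A minimal sketch of that quoting step, assuming a plain target string instead of a HostConfig. `ssh_argv_sketch` is a hypothetical name, and the exact option list is an assumption based on the docstring above, not a copy of build_ssh_argv.

```python
import shlex


def ssh_argv_sketch(target: str, remote_parts: list[str]) -> list[str]:
    """Build an ssh argv whose last element is one shell-quoted command string."""
    # Quote each part so spaces, quotes and newlines survive the remote shell.
    remote_command = " ".join(shlex.quote(part) for part in remote_parts)
    # Assumed options: -T (no pty) and BatchMode (never prompt for a password).
    return ["ssh", "-T", "-o", "BatchMode=yes", "--", target, remote_command]


parts = ["sudo", "python3", "-c", "import sys\nprint('hi')"]
argv = ssh_argv_sketch("root@nas", parts)
# Splitting the final element the way a POSIX shell would recovers the parts.
round_trip = shlex.split(argv[-1])
```

The round trip through `shlex.split` is a cheap way to check that a multi-line `-c` payload reaches the remote interpreter as a single argument.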
Bootstrap-on-connect — the base64 trick¶
_bootstrap_stub is the cute bit:
def _bootstrap_stub(agent_source: str) -> str:
    encoded = base64.b64encode(agent_source.encode("utf-8")).decode("ascii")
    return (
        f"import base64\n"
        f"exec(compile(base64.b64decode('{encoded}').decode(), '<zfs-snoop-agent>', 'exec'))\n"
    )
The remote shell ends up running:
python3 -c 'import base64
exec(compile(base64.b64decode("…<26 KB of base64>…").decode(), "<zfs-snoop-agent>", "exec"))
'
Why this and not cat | python3:
stdin is sacred. Once Python starts, its stdin is the SSH stdin, which is the JSON-RPC stream we need to send requests over. We can't use stdin to deliver the script; it has to come in via the command line.
The compile(..., '<zfs-snoop-agent>', 'exec') part gives tracebacks the
nice filename <zfs-snoop-agent> instead of <string>.
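
The filename effect can be checked locally without SSH. The sketch below re-implements the stub under a hypothetical public name (`bootstrap_stub`) and runs it in-process with `exec`, which is an assumption made for the demo; the real stub is run by the remote interpreter via `-c`.

```python
import base64
import traceback


def bootstrap_stub(agent_source: str) -> str:
    """Same shape as the stub above: decode the payload, compile, exec."""
    encoded = base64.b64encode(agent_source.encode("utf-8")).decode("ascii")
    return (
        "import base64\n"
        f"exec(compile(base64.b64decode('{encoded}').decode(), '<zfs-snoop-agent>', 'exec'))\n"
    )


def traceback_for(agent_source: str) -> str:
    """Run the stub in-process and return the traceback text, if any."""
    try:
        exec(bootstrap_stub(agent_source), {})
    except Exception:
        return traceback.format_exc()
    return ""


# A two-line "agent" that fails on its second line.
tb = traceback_for("x = 1\nraise RuntimeError('boom')\n")
```

The standard base64 alphabet contains no quote characters, which is why the payload can sit inside a single-quoted Python literal without escaping.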
AgentConnection — one process, one lock, one tail¶
The state machine, simplified:
             ┌──────────┐
             │   idle   │
             └────┬─────┘
                  │ first call()
                  ▼
             ┌──────────┐    EOFError    ┌──────────┐
send/recv ──►│  alive   │───────────────►│  closed  │
             └────┬─────┘                └────┬─────┘
                  │ close()                   │ next call() (one retry)
                  ▼                           ▼
             ┌──────────┐                ┌──────────┐
             │  closed  │                │  alive   │
             └──────────┘                └──────────┘
Implementation highlights from transport.py:
class AgentConnection:
    def __init__(self, name, argv, *, max_reconnects=1, spawn_timeout=10.0, recv_timeout=60.0):
        self._proc: asyncio.subprocess.Process | None = None
        self._lock = asyncio.Lock()  # one in-flight RPC per host
        self._next_id = 1
        self._stderr_task: asyncio.Task[None] | None = None
        self._stderr_tail: list[str] = []  # last N stderr lines, surfaced on failure
Each call() acquires the lock, then loops up to max_reconnects + 1 times:
async def call(self, method, params=None):
    async with self._lock:
        attempts = self._max_reconnects + 1
        for attempt in range(attempts):
            try:
                return await self._call_once(method, params)
            except (BrokenPipeError, ConnectionResetError, EOFError) as e:
                log.warning(...)
                stderr_blob = await self._capture_remaining_stderr()
                await self._close_proc()
                if attempt == attempts - 1:
                    msg = f"agent on {self.name!r} unreachable after {attempts} attempts: {e}"
                    if stderr_blob:
                        msg += f"\nagent stderr:\n{stderr_blob}"
                    raise TransportError(msg) from e
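
To see the retry budget in isolation, here is a self-contained sketch with no subprocess. `FlakyConnection` and its `failures` parameter are hypothetical, and `TransportError` is redefined locally; only the loop shape mirrors the excerpt above.

```python
import asyncio


class TransportError(Exception):
    """Hypothetical stand-in for the transport's own error type."""


class FlakyConnection:
    """Fails the first `failures` attempts with EOFError, then succeeds."""

    def __init__(self, failures: int, max_reconnects: int = 1) -> None:
        self._failures_left = failures
        self._max_reconnects = max_reconnects
        self._lock = asyncio.Lock()  # one in-flight RPC at a time
        self.attempts_made = 0

    async def _call_once(self, method: str) -> dict:
        self.attempts_made += 1
        if self._failures_left > 0:
            self._failures_left -= 1
            raise EOFError("agent closed stdout")
        return {"method": method, "ok": True}

    async def call(self, method: str) -> dict:
        async with self._lock:
            attempts = self._max_reconnects + 1
            for attempt in range(attempts):
                try:
                    return await self._call_once(method)
                except (BrokenPipeError, ConnectionResetError, EOFError) as e:
                    if attempt == attempts - 1:
                        raise TransportError(
                            f"unreachable after {attempts} attempts: {e}"
                        ) from e
            raise AssertionError("loop always returns or raises")


async def demo() -> tuple[dict, int, bool, int]:
    recovers = FlakyConnection(failures=1)  # one failure: retried silently
    result = await recovers.call("ping")
    gives_up = FlakyConnection(failures=2)  # two failures: budget exhausted
    try:
        await gives_up.call("ping")
        raised = False
    except TransportError:
        raised = True
    return result, recovers.attempts_made, raised, gives_up.attempts_made
```

With the default `max_reconnects=1`, a single dropped connection is invisible to the caller, while a second consecutive failure surfaces as `TransportError`.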
The stderr surfacing fix¶
A real bug we hit (and have a test for): the agent dies on a stripped env
(e.g. no SSH_AUTH_SOCK); SSH writes Permission denied (publickey) to
stderr; we previously swallowed that and reported a useless "agent
unreachable". The fix lives in
_capture_remaining_stderr:
async def _capture_remaining_stderr(self) -> str:
    # Poll the tail briefly so the background drainer can flush pending
    # lines. We don't read the stream directly; two concurrent readers
    # on a StreamReader produce undefined behaviour.
    deadline = ... + self._STDERR_FINAL_DRAIN_SECS
    last_seen = -1
    while loop.time() < deadline:
        if len(self._stderr_tail) == last_seen:
            await asyncio.sleep(0.02)
            if len(self._stderr_tail) == last_seen:
                break
        last_seen = len(self._stderr_tail)
        await asyncio.sleep(0.02)
    return "\n".join(self._stderr_tail)
The drainer task itself appends to a bounded tail (max 50 lines) AND logs.
On failure we wait briefly for the drainer to flush, then return whatever
landed. Test:
test_transport_local_stderr.py.
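
The drainer side might look roughly like the sketch below. It is an illustration, not the code in transport.py: `drain_stderr` and `TAIL_LINES` are hypothetical names, and a `collections.deque` with `maxlen` is swapped in for the bounded list the real class keeps.

```python
import asyncio
import logging
from collections import deque

log = logging.getLogger("zsnoop.sketch")  # hypothetical logger name
TAIL_LINES = 50  # matches the "last 50 lines" described above


async def drain_stderr(stream: asyncio.StreamReader, tail: deque) -> None:
    """Log every stderr line and keep only the most recent ones in `tail`."""
    while True:
        raw = await stream.readline()
        if not raw:  # EOF: the process closed its stderr
            return
        line = raw.decode("utf-8", errors="replace").rstrip("\n")
        log.debug("agent stderr: %s", line)
        tail.append(line)  # deque(maxlen=...) drops the oldest entry itself


async def demo() -> list[str]:
    # Feed a reader by hand instead of spawning a process.
    stream = asyncio.StreamReader()
    tail: deque = deque(maxlen=TAIL_LINES)
    for i in range(60):
        stream.feed_data(f"line {i}\n".encode())
    stream.feed_eof()
    await drain_stderr(stream, tail)
    return list(tail)
```

Because this task is the only reader of the stream, the failure path just needs to wait a moment and then join the tail.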
ConnectionPool — many hosts, one connection each¶
Thin wrapper around AgentConnection:
class ConnectionPool:
    def __init__(self, config: Config, agent_source: str): ...

    async def call(self, host: str, method: str, params=None) -> dict[str, Any]:
        conn = await self._get(host)
        return await conn.call(method, params)

    async def _get(self, host: str) -> AgentConnection:
        async with self._pool_lock:
            if host not in self._connections:
                cfg = self._config.host(host)
                argv = build_argv(cfg, self._agent_source)  # SSH or local
                self._connections[host] = AgentConnection(host, argv)
            return self._connections[host]
Two locks: the pool lock guards the connection map, and each per-host
AgentConnection has its own lock guarding its subprocess. Calls to different
hosts run concurrently; calls to the same host serialise.
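
The effect of the two locks can be observed with fake connections that only sleep. Everything in this sketch (`Stats`, `FakeConnection`, `FakePool`) is hypothetical scaffolding; it counts how many calls are in flight at once rather than talking to a real agent.

```python
import asyncio


class Stats:
    """Records how many calls were in flight at once, per host and overall."""

    def __init__(self) -> None:
        self.in_flight: dict[str, int] = {}
        self.max_per_host = 0
        self.max_total = 0

    def enter(self, host: str) -> None:
        self.in_flight[host] = self.in_flight.get(host, 0) + 1
        self.max_per_host = max(self.max_per_host, self.in_flight[host])
        self.max_total = max(self.max_total, sum(self.in_flight.values()))

    def leave(self, host: str) -> None:
        self.in_flight[host] -= 1


class FakeConnection:
    def __init__(self, name: str, stats: Stats) -> None:
        self.name = name
        self._lock = asyncio.Lock()  # serialises calls to this host
        self._stats = stats

    async def call(self, method: str) -> dict:
        async with self._lock:
            self._stats.enter(self.name)
            await asyncio.sleep(0.01)  # stands in for the RPC round trip
            self._stats.leave(self.name)
            return {"host": self.name, "method": method}


class FakePool:
    def __init__(self, stats: Stats) -> None:
        self._pool_lock = asyncio.Lock()  # guards the connection map only
        self._connections: dict[str, FakeConnection] = {}
        self._stats = stats

    async def call(self, host: str, method: str) -> dict:
        async with self._pool_lock:
            if host not in self._connections:
                self._connections[host] = FakeConnection(host, self._stats)
            conn = self._connections[host]
        # The pool lock is released before the (slow) per-host call.
        return await conn.call(method)


async def demo() -> Stats:
    stats = Stats()
    pool = FakePool(stats)
    hosts = ["alpha", "beta"]
    await asyncio.gather(*(pool.call(h, "ping") for h in hosts for _ in range(3)))
    return stats
```

Six calls across two hosts never overlap on the same host, but the two hosts do overlap with each other, which is the behaviour the two-lock layout is meant to give.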
What to read next¶
→ The MCP server — how the layer above turns FastMCP tool
calls into pool.call(host, method, params).