vacant.protocol
P6 protocol — A2A / MCP envelope, dispatch (direct vacant-to-vacant,
not routed through registry), per-pair envelope chain replay
protection, capability cards, MCP server adapter, and vacant serve
under FastAPI / uvicorn.
envelope
VacantEnvelope — A2A-compatible message wrapper with per-pair chain.
Each envelope carries:
- from_vacant_id / to_vacant_id: the call's endpoints (Ed25519 ids).
- sequence_no: per-pair monotonic counter starting at 1.
- timestamp: UTC datetime of issuance.
- prev_envelope_hash: SHA-equivalent (BLAKE2b) hash of the prior envelope on this (from, to) pair. The first envelope on a pair uses EMPTY_PREV_HASH (32 zero bytes).
- payload: an A2AMessage carrying the actual request/response.
- signature: Ed25519 signature over signing_payload().
The chain is per-pair (D009 §B): unlike P0 logbooks (per-vacant) or P4 events (global), the envelope chain links call-level interactions between two specific vacants, used by replay protection.
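A minimal sketch of how such a per-pair chain links together, using only the standard library. The canonical JSON encoding, the 32-byte digest size, and the helper names `envelope_hash` and `next_envelope` are assumptions made for illustration; the real envelope hashes `signing_payload()` and also carries a timestamp and signature.

```python
from __future__ import annotations

import hashlib
import json

EMPTY_PREV_HASH = bytes(32)  # 32 zero bytes, as described above


def envelope_hash(fields: dict) -> bytes:
    """BLAKE2b over a canonical JSON encoding (encoding is an assumption)."""
    canonical = json.dumps(fields, sort_keys=True, separators=(",", ":")).encode()
    return hashlib.blake2b(canonical, digest_size=32).digest()


def next_envelope(prev: dict | None, from_id: str, to_id: str, text: str) -> dict:
    """Build the next unsigned envelope on the (from_id, to_id) chain."""
    prev_hash = EMPTY_PREV_HASH if prev is None else envelope_hash(prev)
    return {
        "from_vacant_id": from_id,
        "to_vacant_id": to_id,
        "sequence_no": 1 if prev is None else prev["sequence_no"] + 1,
        "prev_envelope_hash": prev_hash.hex(),
        "payload": text,
    }


first = next_envelope(None, "alice", "bob", "hello")
second = next_envelope(first, "alice", "bob", "again")
```

Each envelope commits to the full content of its predecessor, so a receiver that stores only the latest hash for a pair can detect a dropped, reordered, or substituted message.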
A2A wire format: to_a2a_jsonrpc(envelope) produces a JSON-RPC 2.0
message/send request whose params.message.metadata["urn:vacant:v1"]
carries the envelope's signature, sequence_no, prev_hash, and
caller/callee ids; from_a2a_jsonrpc(body) parses the same shape in
the reverse direction. Parsing does not verify the signature; call
verify_or_raise on the returned envelope for that.
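To make the wire shape concrete, here is a hedged sketch of the encode/parse round trip over plain dicts. Only the `message/send` method and the `urn:vacant:v1` metadata key come from the text above; the field names inside the metadata block, the `role`/`parts` layout, and the functions `to_jsonrpc` / `from_jsonrpc` are illustrative assumptions, and `ValueError` stands in for EnvelopeFormatError.

```python
from typing import Any

VACANT_KEY = "urn:vacant:v1"  # metadata key named in the text above
# Assumed field names for the metadata block (illustrative only).
REQUIRED = ("from_vacant_id", "to_vacant_id", "sequence_no", "prev_hash", "signature")


def to_jsonrpc(fields: dict[str, Any], text: str, request_id: str = "1") -> dict[str, Any]:
    """Wrap envelope fields and a text part in a JSON-RPC 2.0 request."""
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "message/send",
        "params": {
            "message": {
                "role": "user",
                "parts": [{"kind": "text", "text": text}],
                "metadata": {VACANT_KEY: dict(fields)},
            }
        },
    }


def from_jsonrpc(body: dict[str, Any]) -> tuple[dict[str, Any], str]:
    """Parse the shape above; does NOT verify the signature."""
    try:
        message = body["params"]["message"]
        fields = message["metadata"][VACANT_KEY]
        text = message["parts"][0]["text"]
    except (KeyError, IndexError, TypeError) as exc:
        raise ValueError(f"malformed A2A body: {exc!r}") from exc
    missing = [name for name in REQUIRED if name not in fields]
    if missing:
        raise ValueError(f"missing envelope fields: {missing}")
    return dict(fields), text


fields = {
    "from_vacant_id": "alice",
    "to_vacant_id": "bob",
    "sequence_no": 1,
    "prev_hash": "00" * 32,
    "signature": "placeholder",
}
body = to_jsonrpc(fields, "hello")
parsed_fields, parsed_text = from_jsonrpc(body)
```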
A2A_VACANT_METADATA_KEY
module-attribute
A2A metadata key under which Vacant envelope fields are mounted (P6 §3.2).
A2APart
Bases: BaseModel
A single part inside an A2A message/send payload.
For MVP we ship text parts only (D009 §F); image/audio/file parts
are reserved field-shape-wise but not implemented.
A2AMessage
Bases: BaseModel
A2A message/send payload (extracted shape, MVP subset).
VacantEnvelope
Bases: BaseModel
A signed A2A message exchanged directly between two vacants.
signing_dict
Canonical dict over which the envelope is signed and hashed.
Excludes signature (the field being computed) but includes
every other field — including prev_envelope_hash and
sequence_no so an attacker can't change either after issuance.
Source code in src/vacant/protocol/envelope.py
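The effect of excluding only `signature` can be illustrated with a small stand-in class. The `Envelope` dataclass below is hypothetical (the real VacantEnvelope is a Pydantic model with more fields), and the JSON canonicalisation is an assumption.

```python
import json
from dataclasses import asdict, dataclass, replace


@dataclass(frozen=True)
class Envelope:  # hypothetical stand-in for VacantEnvelope
    from_vacant_id: str
    to_vacant_id: str
    sequence_no: int
    prev_envelope_hash: str
    payload: str
    signature: str = ""

    def signing_dict(self) -> dict:
        """Every field except the signature itself."""
        fields = asdict(self)
        fields.pop("signature")
        return fields

    def signing_payload(self) -> bytes:
        """Canonical bytes: sorted keys, tight separators (an assumption)."""
        return json.dumps(
            self.signing_dict(), sort_keys=True, separators=(",", ":")
        ).encode()


env = Envelope("alice", "bob", 1, "00" * 32, "hello")
resigned = replace(env, signature="deadbeef")  # signature changes only
tampered = replace(env, sequence_no=2)         # a covered field changes
```

Attaching a signature leaves the signed bytes unchanged, while altering `sequence_no` (or `prev_envelope_hash`) changes them, so a signature issued over the original no longer verifies.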
compute_hash
BLAKE2b of signing_payload() — used as the next envelope's
prev_envelope_hash on this pair.
signed
signed(signing_key: SigningKey) -> VacantEnvelope
Return a copy with signature produced by signing_key.
The caller is responsible for ensuring signing_key corresponds
to from_vacant_id's pubkey; the envelope's verify(pubkey)
re-checks at the receiving end.
Source code in src/vacant/protocol/envelope.py
verify
True iff signature is a valid Ed25519 sig over
signing_payload() for pubkey.
Source code in src/vacant/protocol/envelope.py
to_a2a_jsonrpc
to_a2a_jsonrpc(env: VacantEnvelope) -> dict[str, Any]
Encode env as an A2A JSON-RPC 2.0 message/send request.
The Vacant-specific fields (caller_signature, sequence_no, prev_hash,
idempotency_key) are mounted under
params.message.metadata[A2A_VACANT_METADATA_KEY] per P6 §3.2.
Source code in src/vacant/protocol/envelope.py
from_a2a_jsonrpc
from_a2a_jsonrpc(body: dict[str, Any]) -> VacantEnvelope
Parse an A2A JSON-RPC body into a VacantEnvelope.
Raises EnvelopeFormatError on missing/invalid fields. Does not
verify the signature — callers should call verify_or_raise on the
returned envelope.
Source code in src/vacant/protocol/envelope.py
dispatch
Outgoing call dispatch.
call_capability(query, requester, ...):
- Look up via the registry's aggregation.search_capability(query) (excludes LOCAL by default).
- (Optionally) score with a ReputationOracle and pick the UCB winner.
- Build a VacantEnvelope, sign with the requester's key, POST direct to card.endpoint. The registry is never POSTed through.
call_local(target_card, requester, ...): bypass discovery and post
directly to a known target_card.endpoint — for owner / parent direct
paths against LOCAL-visibility vacants.
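The call_capability flow described above can be sketched with stubbed discovery and transport. Everything here is illustrative: `call_capability_sketch`, the dict-shaped cards, the `reputation` field, and the plain `max` over a score function (standing in for the UCB selection) are assumptions, and envelope building and signing are omitted.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional

Card = dict[str, Any]
Search = Callable[[str], Awaitable[list[Card]]]
Transport = Callable[[str, dict[str, Any]], Awaitable[dict[str, Any]]]


async def call_capability_sketch(
    query: str,
    *,
    search: Search,
    transport: Transport,
    score: Optional[Callable[[Card], float]] = None,
) -> tuple[Card, dict[str, Any]]:
    cards = await search(query)  # registry is used for discovery only
    if not cards:
        raise LookupError(f"no vacant offers {query!r}")
    # With a scorer, take the best card; otherwise the first match.
    target = max(cards, key=score) if score else cards[0]
    body = {
        "jsonrpc": "2.0",
        "id": "1",
        "method": "message/send",
        "params": {"message": {"parts": [{"kind": "text", "text": query}]}},
    }
    reply = await transport(target["endpoint"], body)  # direct POST to the card
    return target, reply


posted_to: list[str] = []


async def stub_search(query: str) -> list[Card]:
    return [
        {"id": "a", "endpoint": "http://a.invalid/a2a", "reputation": 0.2},
        {"id": "b", "endpoint": "http://b.invalid/a2a", "reputation": 0.9},
    ]


async def stub_transport(url: str, body: dict[str, Any]) -> dict[str, Any]:
    posted_to.append(url)  # record where the request went
    return {"jsonrpc": "2.0", "id": body["id"], "result": {"ok": True}}


target, reply = asyncio.run(
    call_capability_sketch(
        "summarise",
        search=stub_search,
        transport=stub_transport,
        score=lambda card: card["reputation"],
    )
)
```

The only URL the transport ever sees is the selected card's endpoint, which mirrors the rule that the registry is never POSTed through.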
DispatchResult
DispatchResult(*, request_envelope: VacantEnvelope, response_envelope: VacantEnvelope, target: CapabilityCard)
Result of a successful dispatch.
Source code in src/vacant/protocol/dispatch.py
build_envelope
build_envelope(*, from_vid: VacantId, to_vid: VacantId, payload: A2AMessage, sequence_no: int = 1, prev_envelope_hash: bytes = EMPTY_PREV_HASH, idempotency_key: str | None = None, timestamp: datetime | None = None, signing_key: SigningKey) -> VacantEnvelope
Build + sign a VacantEnvelope for direct dispatch.
Source code in src/vacant/protocol/dispatch.py
call_capability
async
call_capability(query: str, *, requester: ResidentForm, requester_signing_key: SigningKey, payload: A2AMessage, transport: DispatchTransport, aggregation_search: Callable[..., Awaitable[list[Any]]] | None = None, reputation_oracle: Any | None = None, sequence_no: int = 1, prev_envelope_hash: bytes = EMPTY_PREV_HASH, caller_response_replay_store: ReplayStore | None = None) -> DispatchResult
Discover + call a remote vacant offering query.
aggregation_search is the registry's
vacant.registry.aggregation.search_capability (or a test stub
matching the same signature). The function is kept abstract so
P6 doesn't hard-import P4 — making P6 unit tests independent of
the registry stack.
reputation_oracle.score(vacant_hex, dims) is consulted to pick the
UCB winner if provided; otherwise the first match is used.
The registry is queried for discovery only; the call goes directly
to card.endpoint via transport. No registry write endpoint is
invoked from this path (D009 §C, dispatch acceptance).
Source code in src/vacant/protocol/dispatch.py
call_local
async
call_local(*, target_card: CapabilityCard, requester: ResidentForm, requester_signing_key: SigningKey, payload: A2AMessage, transport: DispatchTransport, sequence_no: int = 1, prev_envelope_hash: bytes = EMPTY_PREV_HASH, caller_response_replay_store: ReplayStore | None = None) -> DispatchResult
Direct call against a known capability card. Used by owner / parent paths to reach LOCAL-visibility vacants the public lookup excludes.
caller_response_replay_store (Pfix3 B6): when provided, the
incoming response envelope is run through check_and_advance on
the (target → requester) chain, so responses can be checked for
replay / out-of-order / chain-fork on the caller side. Default
None keeps existing in-process tests (which use synthetic
transports that don't track response chains) green.
Source code in src/vacant/protocol/dispatch.py
make_httpx_transport
Build a transport callable that POSTs JSON-RPC via httpx.
Imports httpx lazily so the module is testable without a network
stack — tests pass a custom DispatchTransport callable instead.
Source code in src/vacant/protocol/dispatch.py
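A sketch of the lazy-import pattern described above. The transport signature `(url, body) -> dict`, the `timeout` parameter, and the names `make_transport` and `fake_transport` are assumptions; the actual DispatchTransport contract may differ.

```python
import asyncio
from typing import Any, Awaitable, Callable

Transport = Callable[[str, dict[str, Any]], Awaitable[dict[str, Any]]]


def make_transport(timeout: float = 10.0) -> Transport:
    """Return a callable that POSTs a JSON-RPC body and returns the JSON reply."""

    async def post(url: str, body: dict[str, Any]) -> dict[str, Any]:
        import httpx  # deferred: only needed once a real call is made

        async with httpx.AsyncClient(timeout=timeout) as client:
            response = await client.post(url, json=body)
            response.raise_for_status()
            return response.json()

    return post


async def fake_transport(url: str, body: dict[str, Any]) -> dict[str, Any]:
    """Test double with the same shape; never touches the network."""
    return {"jsonrpc": "2.0", "id": body.get("id"), "result": {"echo": url}}


real = make_transport()  # building the transport does not import httpx
reply = asyncio.run(fake_transport("http://example.invalid/a2a", {"id": "1"}))
```

Because the import sits inside the inner coroutine, code that only builds the transport, or swaps in a fake, does not need httpx at import time.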
capability_card
Capability card serialization + halo_version forward-compat gate.
serialize / deserialize produce / consume canonical JSON for halo
emission. Both halt loudly via UnsupportedHaloVersionError when a
deserialized card carries a halo_version this build does not recognise
— this is the forward-compat hook for future halo schema upgrades.
serialize
serialize(card: CapabilityCard) -> bytes
Canonical JSON bytes for card. Sorted keys + tight separators
so the same card always serialises to identical bytes (cross-check
against card.signing_payload() for signature stability).
Source code in src/vacant/protocol/capability_card.py
deserialize
deserialize(blob: bytes) -> CapabilityCard
Inverse of serialize. Raises:
- UnsupportedHaloVersionError if halo_version is outside the [MIN, MAX] supported range.
- EnvelopeFormatError on shape / decode errors.
Source code in src/vacant/protocol/capability_card.py
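The canonical-JSON round trip and the version gate can be sketched over plain dicts as follows. The bounds `MIN_HALO_VERSION` / `MAX_HALO_VERSION`, the dict-shaped card, and the two exception classes are hypothetical stand-ins for the library's own.

```python
import json

MIN_HALO_VERSION = 1  # hypothetical supported range
MAX_HALO_VERSION = 1


class CardFormatError(ValueError):
    """Stand-in for EnvelopeFormatError."""


class UnsupportedHaloVersion(ValueError):
    """Stand-in for UnsupportedHaloVersionError."""


def serialize(card: dict) -> bytes:
    # Sorted keys + tight separators: equal cards give identical bytes.
    return json.dumps(card, sort_keys=True, separators=(",", ":")).encode("utf-8")


def deserialize(blob: bytes) -> dict:
    try:
        card = json.loads(blob)
    except (UnicodeDecodeError, json.JSONDecodeError) as exc:
        raise CardFormatError(f"cannot decode card: {exc}") from exc
    if not isinstance(card, dict) or not isinstance(card.get("halo_version"), int):
        raise CardFormatError("missing or non-integer halo_version")
    if not MIN_HALO_VERSION <= card["halo_version"] <= MAX_HALO_VERSION:
        raise UnsupportedHaloVersion(f"halo_version {card['halo_version']}")
    return card


blob = serialize({"b": 1, "a": 2, "halo_version": 1})
card = deserialize(blob)
```

Stable bytes matter because a signature over the card is only reproducible if every party serialises the same card to the same byte string.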
replay_protect
Replay protection — per-pair sequence + chain-tip tracking.
P6 §6 / dispatch §6: every (from_vacant_id, to_vacant_id) pair has its
own monotonic sequence_no counter and chain_tip (last envelope's
hash). An incoming envelope is rejected if:
- sequence_no <= last_seen[(from, to)], OR
- prev_envelope_hash != stored_chain_tip[(from, to)].
A new pair starts at sequence_no = 1 and chain_tip = EMPTY_PREV_HASH.
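The two rejection rules can be sketched as a small synchronous tracker. The class and exception names are stand-ins, the initial `last_sequence_no = 0` is an assumption chosen so that sequence 1 is accepted on a fresh pair, and the real stores are async.

```python
from dataclasses import dataclass

EMPTY_PREV_HASH = bytes(32)


class ReplayDetected(Exception):
    """Stand-in for ReplayDetectedError."""


class ChainFork(Exception):
    """Stand-in for ChainForkError."""


@dataclass
class PairState:
    last_sequence_no: int = 0  # assumption: nothing seen yet
    chain_tip: bytes = EMPTY_PREV_HASH


class ReplayTracker:
    def __init__(self) -> None:
        self._state: dict[tuple[str, str], PairState] = {}

    def check_and_advance(
        self, from_id: str, to_id: str, sequence_no: int,
        prev_hash: bytes, env_hash: bytes,
    ) -> None:
        state = self._state.setdefault((from_id, to_id), PairState())
        if sequence_no <= state.last_sequence_no:
            raise ReplayDetected(f"seq {sequence_no} already seen")
        if prev_hash != state.chain_tip:
            raise ChainFork("prev_envelope_hash does not match chain tip")
        # Accept: this envelope becomes the new tip for the pair.
        state.last_sequence_no = sequence_no
        state.chain_tip = env_hash


tracker = ReplayTracker()
h1 = b"\x01" * 32  # placeholder envelope hashes
tracker.check_and_advance("alice", "bob", 1, EMPTY_PREV_HASH, h1)
```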
Race protection (F-C). The MVP previously stored one row per (from,
to) pair with last_sequence_no updated in place, plus an in-process
asyncio.Lock. Under multi-worker deployment two workers could both
read the same last_sequence_no = N, both pass the monotonicity
check, and both try to advance to N + 1. The fix: store one row
per accepted envelope, with composite primary key
(from_vid_hex, to_vid_hex, sequence_no). Concurrent writes claiming
the same triple collide on the PK at INSERT time and surface as
IntegrityError, which the store re-raises as
ReplayDetectedError. The "current state" of a pair is simply the
row with the largest sequence_no for that pair.
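The row-per-envelope scheme can be illustrated with the standard-library sqlite3 module instead of SQLAlchemy/aiosqlite. The table name `accepted_envelopes`, the `envelope_hash` column, and the helper functions are assumptions; only the composite key `(from_vid_hex, to_vid_hex, sequence_no)` comes from the text above.

```python
import sqlite3


class ReplayDetected(Exception):
    """Stand-in for ReplayDetectedError."""


def open_store(path: str = ":memory:") -> sqlite3.Connection:
    conn = sqlite3.connect(path)
    conn.execute(
        """
        CREATE TABLE IF NOT EXISTS accepted_envelopes (
            from_vid_hex  TEXT    NOT NULL,
            to_vid_hex    TEXT    NOT NULL,
            sequence_no   INTEGER NOT NULL,
            envelope_hash BLOB    NOT NULL,
            PRIMARY KEY (from_vid_hex, to_vid_hex, sequence_no)
        )
        """
    )
    return conn


def record(conn: sqlite3.Connection, from_hex: str, to_hex: str,
           seq: int, env_hash: bytes) -> None:
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute(
                "INSERT INTO accepted_envelopes VALUES (?, ?, ?, ?)",
                (from_hex, to_hex, seq, env_hash),
            )
    except sqlite3.IntegrityError as exc:
        # A second writer claiming the same triple loses at INSERT time.
        raise ReplayDetected(f"({from_hex}, {to_hex}, {seq}) already taken") from exc


def latest(conn: sqlite3.Connection, from_hex: str, to_hex: str) -> tuple[int, bytes]:
    row = conn.execute(
        "SELECT sequence_no, envelope_hash FROM accepted_envelopes "
        "WHERE from_vid_hex = ? AND to_vid_hex = ? "
        "ORDER BY sequence_no DESC LIMIT 1",
        (from_hex, to_hex),
    ).fetchone()
    return row if row else (0, bytes(32))  # empty state for a new pair


conn = open_store()
record(conn, "aa", "bb", 1, b"\x01" * 32)
record(conn, "aa", "bb", 2, b"\x02" * 32)
```

The uniqueness check is performed by the database rather than by application code, which is why it holds across worker processes that share no in-memory lock.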
PairKey
dataclass
Ordered (sender, recipient) pair key for the replay store.
ReplayState
dataclass
ReplayStore
Bases: Protocol
Backend contract. Both impls must be safe under concurrent writes.
check_and_advance
async
check_and_advance(env: VacantEnvelope) -> None
Advance the per-pair state for env, raising
ReplayDetectedError / ChainForkError on rejection.
InMemoryReplayStore
Reference impl backed by a dict. Used by tests + demo orchestrator.
Not durable; not shared across processes. The SqliteReplayStore
wraps the same contract over SQLAlchemy.
Source code in src/vacant/protocol/replay_protect.py
seed
seed(key: PairKey, state: ReplayState) -> None
Pre-load the per-pair state from disk / another snapshot.
Used by the CLI (Pfix3 B6) to rehydrate the response chain on
vacant call so a target's reply seq=N+1 is recognised after
a process restart. Synchronous (no lock): callers must seed
before the store sees concurrent traffic.
Source code in src/vacant/protocol/replay_protect.py
SqliteReplayStore
SQLAlchemy/aiosqlite-backed replay store with PK-enforced uniqueness.
Source code in src/vacant/protocol/replay_protect.py
get
async
get(key: PairKey) -> ReplayState
Read the latest row for a pair (largest sequence_no).
The PK already guarantees no duplicate (from, to, seq) so the
ordering is well-defined. New pairs return the empty state.
Source code in src/vacant/protocol/replay_protect.py
check_and_advance
async
check_and_advance(env: VacantEnvelope) -> None
Validate the envelope and atomically record its acceptance.
The fast-path check uses the in-process _lock to avoid wasted
work; the load-bearing race defense is the PK uniqueness on
(from, to, sequence_no). If two workers (or two coroutines
that both hold their own copy of _lock) both pass _check
and both try to insert the same triple, the second INSERT
raises IntegrityError and we surface it as
ReplayDetectedError.
Source code in src/vacant/protocol/replay_protect.py
check_envelope
async
check_envelope(store: ReplayStore, env: VacantEnvelope) -> None
serve
Incoming-call serve: FastAPI router mounted at /a2a (and /mcp).
Per dispatch §4 the inbound flow is:
- Verify envelope signature against from_vacant_id's pubkey.
- Check state_machine.can_be_called(my_state): reject SUNK/ARCHIVED with 410 GONE and HIBERNATING/STALE with 423 LOCKED.
- Verify sequence_no monotonicity for the (from, to) pair via replay_protect.ReplayStore.
- Hand the payload to the vacant's behavior_bundle (this is where the substrate runs).
- Sign and return a response envelope; both directions advance the per-pair envelope chain via replay_protect.
The behavior parameter is a callable that takes a VacantEnvelope
and returns an A2AMessage. P7 demo wires this to a real substrate;
unit tests pass a lambda.
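The ordering of the inbound checks can be sketched as a plain function with pluggable steps. `handle_inbound`, `state_status`, and the `Rejected` exception are hypothetical names; the 410 and 423 codes come from the text above, while signing the response envelope is left out of the sketch.

```python
from typing import Any, Callable, Optional


class Rejected(Exception):
    """Hypothetical rejection carrying an optional HTTP status."""

    def __init__(self, reason: str, status: Optional[int] = None) -> None:
        super().__init__(reason)
        self.status = status


def state_status(state: str) -> Optional[int]:
    if state in ("SUNK", "ARCHIVED"):
        return 410  # GONE
    if state in ("HIBERNATING", "STALE"):
        return 423  # LOCKED
    return None  # LOCAL / ACTIVE: accepted


def handle_inbound(
    env: dict[str, Any],
    *,
    verify: Callable[[dict[str, Any]], bool],
    state: str,
    replay_check: Callable[[dict[str, Any]], None],
    behavior: Callable[[dict[str, Any]], Any],
) -> Any:
    if not verify(env):                      # 1. signature
        raise Rejected("bad signature")
    status = state_status(state)             # 2. lifecycle state
    if status is not None:
        raise Rejected(f"state {state}", status)
    replay_check(env)                        # 3. raises on replay or fork
    return behavior(env)                     # 4. run the behaviour


reply = handle_inbound(
    {"text": "hi"},
    verify=lambda env: True,
    state="ACTIVE",
    replay_check=lambda env: None,
    behavior=lambda env: env["text"].upper(),
)
```

The behaviour runs last, so an unverified, unavailable, or replayed call never reaches the substrate.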
make_response_envelope
async
make_response_envelope(*, request: VacantEnvelope, response_payload: A2AMessage, self_signing_key: SigningKey, response_replay_store: ReplayStore, self_form: ResidentForm) -> VacantEnvelope
Build the response envelope (vacant → caller).
Uses the (self → caller) direction of the per-pair chain (ours).
Source code in src/vacant/protocol/serve.py
build_a2a_router
build_a2a_router(*, self_form: ResidentForm, self_signing_key: SigningKey, behavior: BehaviorHandler, replay_store: ReplayStore, state_provider: Callable[[], VacantState] | None = None, prefix: str = '/a2a') -> APIRouter
Build a FastAPI router serving inbound A2A message/send requests.
state_provider (defaults to lambda: self_form.runtime_state)
determines whether the vacant accepts the call:
- SUNK / ARCHIVED → 410 GONE
- HIBERNATING / STALE → 423 LOCKED
- LOCAL / ACTIVE → accepted
Source code in src/vacant/protocol/serve.py
build_a2a_app
build_a2a_app(*, self_form: ResidentForm, self_signing_key: SigningKey, behavior: BehaviorHandler, replay_store: ReplayStore, state_provider: Callable[[], VacantState] | None = None) -> FastAPI
Convenience: a FastAPI app with the A2A router mounted.
Source code in src/vacant/protocol/serve.py
mcp_adapter
MCP bridge adapters (P6 §3.4 / D009 §G).
Two adapters:
- VacantAsMCPServer: wraps a serving vacant + behaviour callback so existing MCP-aware clients (Claude Code, OpenClaw plugin, etc.) can call it via the standard tools/list, tools/call shape. The internal flow is identical to serve.py's /a2a/message/send: same envelope verification, same replay protection.
- MCPClientSubstrate: lets a vacant call out to an MCP server as part of its behaviour. Implements the P0 SubstrateBackend contract.
The full MCP wire protocol is not re-implemented here (would require
pulling in the MCP SDK). Both adapters take a small transport
callable that the caller wires to a real MCP runtime when needed; for
unit tests we pass an in-process function. P7 demo will swap in the
real transport.
VacantAsMCPServer
dataclass
VacantAsMCPServer(self_form: ResidentForm, self_signing_key: SigningKey, behavior: BehaviorHandler, replay_store: ReplayStore)
Expose a vacant's capabilities as MCP tools.
Tools:
- vacant_call: dispatches into the vacant's behaviour through the standard envelope path (signature verify + replay protect).
- vacant_describe: returns the vacant's capability text + halo version (reads from the vacant's signed capability card).
The MCP transport (real wire protocol) is wired by the caller. This adapter is the bridge — what runs inside the MCP server when an MCP client calls a tool.
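For orientation, a sketch of what the two tools might look like in MCP's tools/list and tools/call result shapes. The tool names come from the list above; the descriptions, input schemas, the `handlers` mapping, and the synchronous signatures are assumptions made for illustration.

```python
from typing import Any, Callable

Handler = Callable[[dict[str, Any]], str]


def list_tools() -> dict[str, Any]:
    """A tools/list result naming the two tools (schemas are assumed)."""
    return {
        "tools": [
            {
                "name": "vacant_call",
                "description": "Send a signed envelope to this vacant.",
                "inputSchema": {
                    "type": "object",
                    "properties": {"envelope": {"type": "object"}},
                    "required": ["envelope"],
                },
            },
            {
                "name": "vacant_describe",
                "description": "Return capability text and halo version.",
                "inputSchema": {"type": "object", "properties": {}},
            },
        ]
    }


def call_tool(name: str, arguments: dict[str, Any],
              handlers: dict[str, Handler]) -> dict[str, Any]:
    """Route a tools/call to a handler and wrap its text in a result."""
    handler = handlers.get(name)
    if handler is None:
        return {"content": [{"type": "text", "text": f"unknown tool: {name}"}],
                "isError": True}
    return {"content": [{"type": "text", "text": handler(arguments)}],
            "isError": False}


handlers = {"vacant_describe": lambda args: "summarise text (halo_version 1)"}
described = call_tool("vacant_describe", {}, handlers)
unknown = call_tool("nope", {}, handlers)
```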
list_tools
Mirror P6 §3.4 tools/list.
Source code in src/vacant/protocol/mcp_adapter.py
call_tool
async
Dispatch an MCP tools/call.
Source code in src/vacant/protocol/mcp_adapter.py
MCPClientSubstrate
dataclass
Bases: SubstrateBackend
A SubstrateBackend that calls an MCP server tool as inference.
transport(server_url, body) -> dict is wired by the caller.
tool_name is the MCP tool to invoke; the substrate forwards
req.system_prompt + req.user_prompt as a params.message-shaped
JSON-RPC request to the server.
errors
Error hierarchy for vacant.protocol.
EnvelopeSignatureError
Bases: ProtocolError
Envelope signature failed to verify against the claimed sender.
EnvelopeFormatError
Bases: ProtocolError
Envelope is malformed (wrong shape, missing fields, etc.).
UnsupportedHaloVersionError
Bases: ProtocolError
Capability card halo_version is unknown to this build.
ReplayDetectedError
Bases: ProtocolError
An incoming envelope was a replay (sequence_no <= last seen).
ChainForkError
Bases: ProtocolError
An incoming envelope's prev_envelope_hash does not match the stored per-pair chain tip.
TargetUnavailableError
Bases: ProtocolError
Target vacant cannot accept calls (SUNK / ARCHIVED / HIBERNATING / STALE).
TargetNotFoundError
Bases: ProtocolError
Target vacant has no capability card / no endpoint.
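Because every error above derives from ProtocolError, a caller can handle specific failures first and fall back to the base class. The sketch below redefines a few of the classes locally so it runs on its own; that ProtocolError derives directly from `Exception` is an assumption, and `classify` is a hypothetical helper.

```python
from typing import Callable


class ProtocolError(Exception):
    """Local stand-in; the real base class may differ."""


class EnvelopeSignatureError(ProtocolError):
    pass


class ReplayDetectedError(ProtocolError):
    pass


class ChainForkError(ProtocolError):
    pass


def classify(action: Callable[[], None]) -> str:
    """Run action and report which family of error, if any, it raised."""
    try:
        action()
    except (ReplayDetectedError, ChainForkError) as exc:
        return f"replay-protection: {type(exc).__name__}"
    except ProtocolError as exc:
        return f"protocol: {type(exc).__name__}"
    return "ok"


def raiser(exc: Exception) -> Callable[[], None]:
    def action() -> None:
        raise exc
    return action


outcomes = [
    classify(lambda: None),
    classify(raiser(ChainForkError("tip mismatch"))),
    classify(raiser(EnvelopeSignatureError("bad sig"))),
]
```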