vacant.protocol

P6 protocol — A2A / MCP envelope, dispatch (direct vacant-to-vacant, not routed through registry), per-pair envelope chain replay protection, capability cards, MCP server adapter, and vacant serve under FastAPI / uvicorn.

envelope

VacantEnvelope — A2A-compatible message wrapper with per-pair chain.

Each envelope carries:

  • from_vacant_id / to_vacant_id: the call's endpoints (Ed25519 ids).
  • sequence_no: per-pair monotonic counter starting at 1.
  • timestamp: UTC datetime of issuance.
  • prev_envelope_hash: BLAKE2b hash of the prior envelope on this (from, to) pair. The first envelope on a pair uses EMPTY_PREV_HASH (32 zero bytes).
  • payload: an A2AMessage carrying the actual request/response.
  • signature: Ed25519 signature over signing_payload().

The chain is per-pair (D009 §B): unlike P0 logbooks (per-vacant) or P4 events (global), the envelope chain links call-level interactions between two specific vacants, and it is the chain that replay protection checks.
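To make the per-pair linkage concrete, here is a minimal sketch using only the standard library. The canonical JSON encoding, the 32-byte digest size, and the helper names `envelope_hash` and `next_envelope` are assumptions for illustration; the real envelope hashes `signing_payload()`, whose encoding is not shown on this page.

```python
import hashlib
import json

EMPTY_PREV_HASH = bytes(32)  # 32 zero bytes, as in the field list above


def envelope_hash(fields: dict) -> bytes:
    """BLAKE2b over a canonical JSON encoding (the encoding is an assumption)."""
    blob = json.dumps(fields, sort_keys=True, separators=(",", ":")).encode("utf-8")
    return hashlib.blake2b(blob, digest_size=32).digest()


def next_envelope(chain: list, sender: str, recipient: str, text: str) -> dict:
    """Append an envelope whose prev hash points at the pair's previous envelope."""
    prev = envelope_hash(chain[-1]) if chain else EMPTY_PREV_HASH
    env = {
        "from": sender,
        "to": recipient,
        "seq": len(chain) + 1,  # per-pair counter starting at 1
        "prev": prev.hex(),
        "payload": text,
    }
    chain.append(env)
    return env


chain_ab: list = []  # one chain per (from, to) pair; ids here are placeholders
first = next_envelope(chain_ab, "aa", "bb", "hello")
second = next_envelope(chain_ab, "aa", "bb", "again")
```

The second envelope's `prev` equals the hash of the first, so dropping or reordering an envelope on the pair breaks the link.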

A2A wire format: to_a2a_jsonrpc(envelope) produces a JSON-RPC 2.0 message/send request whose params.message.metadata["urn:vacant:v1"] carries the envelope's signature, sequence_no, prev_envelope_hash, timestamp, idempotency_key, and caller/callee ids; from_a2a_jsonrpc(body) parses in the reverse direction. Parsing does not verify the signature, so callers follow it with verify_or_raise.
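As an illustration of the wire shape, the sketch below builds a request body by hand and reads the Vacant metadata back out. All values are placeholders, the part shape `{"text": ...}` is an assumption (the A2APart fields are not listed on this page), and `vacant_metadata` is a hypothetical helper rather than part of the module.

```python
A2A_VACANT_METADATA_KEY = "urn:vacant:v1"

example_request = {
    "jsonrpc": "2.0",
    "id": "example-idempotency-key",
    "method": "message/send",
    "params": {
        "message": {
            "role": "ROLE_USER",
            "parts": [{"text": "ping"}],  # placeholder part shape
            "contextId": None,
            "messageId": "m-1",
            "metadata": {
                A2A_VACANT_METADATA_KEY: {
                    "from_vacant_id": "aa" * 32,      # hex of a 32-byte Ed25519 pubkey
                    "to_vacant_id": "bb" * 32,
                    "sequence_no": 1,
                    "timestamp": "2025-01-01T00:00:00+00:00",
                    "prev_envelope_hash": "00" * 32,  # first envelope on the pair
                    "idempotency_key": "example-idempotency-key",
                    "caller_signature": "cc" * 64,    # hex of a 64-byte signature
                },
            },
        },
    },
}


def vacant_metadata(body: dict) -> dict:
    """Return the Vacant-specific metadata block from a JSON-RPC body."""
    return body["params"]["message"]["metadata"][A2A_VACANT_METADATA_KEY]


meta = vacant_metadata(example_request)
```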

A2A_VACANT_METADATA_KEY module-attribute

A2A_VACANT_METADATA_KEY: Final[str] = 'urn:vacant:v1'

A2A metadata key under which Vacant envelope fields are mounted (P6 §3.2).

A2APart

Bases: BaseModel

A single part inside an A2A message/send payload.

For MVP we ship text parts only (D009 §F); image/audio/file parts are reserved field-shape-wise but not implemented.

A2AMessage

Bases: BaseModel

A2A message/send payload (extracted shape, MVP subset).

VacantEnvelope

Bases: BaseModel

A signed A2A message exchanged directly between two vacants.

signing_dict

signing_dict() -> dict[str, Any]

Canonical dict over which the envelope is signed and hashed.

Excludes signature (the field being computed) but includes every other field — including prev_envelope_hash and sequence_no so an attacker can't change either after issuance.

Source code in src/vacant/protocol/envelope.py
def signing_dict(self) -> dict[str, Any]:
    """Canonical dict over which the envelope is signed and hashed.

    Excludes `signature` (the field being computed) but includes
    every other field — including `prev_envelope_hash` and
    `sequence_no` so an attacker can't change either after issuance.
    """
    return {
        "from": self.from_vacant_id.hex(),
        "to": self.to_vacant_id.hex(),
        "seq": self.sequence_no,
        "ts": _utc_iso(self.timestamp),
        "prev": self.prev_envelope_hash.hex(),
        "idem": self.idempotency_key,
        "payload": self.payload.canonical_dict(),
    }

compute_hash

compute_hash() -> bytes

BLAKE2b of signing_payload() — used as the next envelope's prev_envelope_hash on this pair.

Source code in src/vacant/protocol/envelope.py
def compute_hash(self) -> bytes:
    """BLAKE2b of `signing_payload()` — used as the next envelope's
    `prev_envelope_hash` on this pair."""
    return hash_blake2b(self.signing_payload())
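The property described for signing_dict and compute_hash (the signature is excluded, everything else is covered) can be sketched as follows. This is a stand-in, not the module's code: `signing_payload` here is assumed to be canonical JSON of the unsigned fields, and the 32-byte digest size is inferred from EMPTY_PREV_HASH.

```python
import hashlib
import json


def signing_payload(fields: dict) -> bytes:
    """Canonical bytes over every field except the signature (assumed encoding)."""
    unsigned = {k: v for k, v in fields.items() if k != "signature"}
    return json.dumps(unsigned, sort_keys=True, separators=(",", ":")).encode("utf-8")


def compute_hash(fields: dict) -> bytes:
    """BLAKE2b of the signing payload, usable as the next envelope's prev hash."""
    return hashlib.blake2b(signing_payload(fields), digest_size=32).digest()


unsigned_env = {"from": "aa", "to": "bb", "seq": 1, "prev": "00" * 32, "signature": ""}
signed_env = {**unsigned_env, "signature": "ab" * 64}  # attaching a signature
tampered_env = {**signed_env, "seq": 2}                 # changing a covered field

same_before_and_after_signing = compute_hash(unsigned_env) == compute_hash(signed_env)
tamper_changes_hash = compute_hash(signed_env) != compute_hash(tampered_env)
```

Because the hash ignores the signature, the chain tip can be computed before or after signing; because it covers `seq` and `prev`, altering either invalidates both the signature and the chain link.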

signed

signed(signing_key: SigningKey) -> VacantEnvelope

Return a copy with signature produced by signing_key.

The caller is responsible for ensuring signing_key corresponds to from_vacant_id's pubkey; the envelope's verify(pubkey) re-checks at the receiving end.

Source code in src/vacant/protocol/envelope.py
def signed(self, signing_key: SigningKey) -> VacantEnvelope:
    """Return a copy with `signature` produced by `signing_key`.

    The caller is responsible for ensuring `signing_key` corresponds
    to `from_vacant_id`'s pubkey; the envelope's `verify(pubkey)`
    re-checks at the receiving end.
    """
    sig = sign(signing_key, self.signing_payload())
    return self.model_copy(update={"signature": sig})

verify

verify(pubkey: VerifyKey) -> bool

True iff signature is a valid Ed25519 sig over signing_payload() for pubkey.

Source code in src/vacant/protocol/envelope.py
def verify(self, pubkey: VerifyKey) -> bool:
    """True iff `signature` is a valid Ed25519 sig over
    `signing_payload()` for `pubkey`."""
    if not self.signature:
        return False
    return verify(pubkey, self.signing_payload(), self.signature)

to_a2a_jsonrpc

to_a2a_jsonrpc(env: VacantEnvelope) -> dict[str, Any]

Encode env as an A2A JSON-RPC 2.0 message/send request.

The Vacant-specific fields (from_vacant_id, to_vacant_id, sequence_no, timestamp, prev_envelope_hash, idempotency_key, caller_signature) are mounted under params.message.metadata[A2A_VACANT_METADATA_KEY] per P6 §3.2.

Source code in src/vacant/protocol/envelope.py
def to_a2a_jsonrpc(env: VacantEnvelope) -> dict[str, Any]:
    """Encode `env` as an A2A JSON-RPC 2.0 `message/send` request.

    The Vacant-specific fields (caller_signature, sequence_no, prev_hash,
    idempotency_key) are mounted under
    `params.message.metadata[A2A_VACANT_METADATA_KEY]` per P6 §3.2.
    """
    return {
        "jsonrpc": "2.0",
        "id": env.idempotency_key or env.compute_hash().hex(),
        "method": "message/send",
        "params": {
            "message": {
                "role": env.payload.role,
                "parts": [p.canonical_dict() for p in env.payload.parts],
                "contextId": env.payload.context_id,
                "messageId": env.payload.message_id,
                "metadata": {
                    A2A_VACANT_METADATA_KEY: {
                        "from_vacant_id": env.from_vacant_id.hex(),
                        "to_vacant_id": env.to_vacant_id.hex(),
                        "sequence_no": env.sequence_no,
                        "timestamp": _utc_iso(env.timestamp),
                        "prev_envelope_hash": env.prev_envelope_hash.hex(),
                        "idempotency_key": env.idempotency_key,
                        "caller_signature": env.signature.hex(),
                    },
                },
            },
        },
    }

from_a2a_jsonrpc

from_a2a_jsonrpc(body: dict[str, Any]) -> VacantEnvelope

Parse an A2A JSON-RPC body into a VacantEnvelope.

Raises EnvelopeFormatError on missing/invalid fields. Does not verify the signature — callers should call verify_or_raise on the returned envelope.

Source code in src/vacant/protocol/envelope.py
def from_a2a_jsonrpc(body: dict[str, Any]) -> VacantEnvelope:
    """Parse an A2A JSON-RPC body into a `VacantEnvelope`.

    Raises `EnvelopeFormatError` on missing/invalid fields. Does *not*
    verify the signature — callers should call `verify_or_raise` on the
    returned envelope.
    """
    try:
        msg = body["params"]["message"]
        meta = msg["metadata"][A2A_VACANT_METADATA_KEY]
    except KeyError as exc:
        raise EnvelopeFormatError(f"missing field: {exc}") from exc

    try:
        from_id = VacantId(pubkey_bytes=bytes.fromhex(meta["from_vacant_id"]))
        to_id = VacantId(pubkey_bytes=bytes.fromhex(meta["to_vacant_id"]))
        prev = bytes.fromhex(meta["prev_envelope_hash"])
        sig = bytes.fromhex(meta["caller_signature"])
        ts = datetime.fromisoformat(meta["timestamp"])
        seq = int(meta["sequence_no"])
        idem = str(meta.get("idempotency_key", ""))
    except (ValueError, KeyError, TypeError) as exc:
        raise EnvelopeFormatError(f"invalid metadata: {exc}") from exc

    parts = [A2APart(**p) for p in msg.get("parts", [])]
    payload = A2AMessage(
        role=msg.get("role", "ROLE_USER"),
        parts=parts,
        context_id=msg.get("contextId"),
        message_id=msg.get("messageId"),
    )

    return VacantEnvelope(
        from_vacant_id=from_id,
        to_vacant_id=to_id,
        sequence_no=seq,
        timestamp=ts,
        prev_envelope_hash=prev,
        payload=payload,
        idempotency_key=idem,
        signature=sig,
    )

dispatch

Outgoing call dispatch.

call_capability(query, requester, ...):

  1. Look up via the registry's aggregation.search_capability(query) (excludes LOCAL by default).
  2. (Optionally) score with a ReputationOracle and pick the UCB winner.
  3. Build a VacantEnvelope, sign with the requester's key, POST direct to card.endpoint. The registry is never POSTed through.

call_local(target_card, requester, ...): bypass discovery and post directly to a known target_card.endpoint — for owner / parent direct paths against LOCAL-visibility vacants.

DispatchResult

DispatchResult(*, request_envelope: VacantEnvelope, response_envelope: VacantEnvelope, target: CapabilityCard)

Result of a successful dispatch.

Source code in src/vacant/protocol/dispatch.py
def __init__(
    self,
    *,
    request_envelope: VacantEnvelope,
    response_envelope: VacantEnvelope,
    target: CapabilityCard,
) -> None:
    self.request_envelope = request_envelope
    self.response_envelope = response_envelope
    self.target = target

build_envelope

build_envelope(*, from_vid: VacantId, to_vid: VacantId, payload: A2AMessage, sequence_no: int = 1, prev_envelope_hash: bytes = EMPTY_PREV_HASH, idempotency_key: str | None = None, timestamp: datetime | None = None, signing_key: SigningKey) -> VacantEnvelope

Build + sign a VacantEnvelope for direct dispatch.

Source code in src/vacant/protocol/dispatch.py
def build_envelope(
    *,
    from_vid: VacantId,
    to_vid: VacantId,
    payload: A2AMessage,
    sequence_no: int = 1,
    prev_envelope_hash: bytes = EMPTY_PREV_HASH,
    idempotency_key: str | None = None,
    timestamp: datetime | None = None,
    signing_key: SigningKey,
) -> VacantEnvelope:
    """Build + sign a `VacantEnvelope` for direct dispatch."""
    return VacantEnvelope(
        from_vacant_id=from_vid,
        to_vacant_id=to_vid,
        sequence_no=sequence_no,
        timestamp=timestamp or datetime.now(UTC),
        prev_envelope_hash=prev_envelope_hash,
        payload=payload,
        idempotency_key=idempotency_key or str(uuid.uuid4()),
    ).signed(signing_key)

call_capability async

call_capability(query: str, *, requester: ResidentForm, requester_signing_key: SigningKey, payload: A2AMessage, transport: DispatchTransport, aggregation_search: Callable[..., Awaitable[list[Any]]] | None = None, reputation_oracle: Any | None = None, sequence_no: int = 1, prev_envelope_hash: bytes = EMPTY_PREV_HASH, caller_response_replay_store: ReplayStore | None = None) -> DispatchResult

Discover + call a remote vacant offering query.

aggregation_search is the registry's vacant.registry.aggregation.search_capability (or a test stub matching the same signature). The function is kept abstract so P6 doesn't hard-import P4 — making P6 unit tests independent of the registry stack.

reputation_oracle.score(vacant_hex, dims) is consulted to pick the UCB winner if provided; otherwise the first match is used.

The registry is queried for discovery only; the call goes directly to card.endpoint via transport. No registry write endpoint is invoked from this path (D009 §C, dispatch acceptance).

Source code in src/vacant/protocol/dispatch.py
async def call_capability(
    query: str,
    *,
    requester: ResidentForm,
    requester_signing_key: SigningKey,
    payload: A2AMessage,
    transport: DispatchTransport,
    aggregation_search: Callable[..., Awaitable[list[Any]]] | None = None,
    reputation_oracle: Any | None = None,
    sequence_no: int = 1,
    prev_envelope_hash: bytes = EMPTY_PREV_HASH,
    caller_response_replay_store: ReplayStore | None = None,
) -> DispatchResult:
    """Discover + call a remote vacant offering `query`.

    `aggregation_search` is the registry's
    `vacant.registry.aggregation.search_capability` (or a test stub
    matching the same signature). The function is *kept abstract* so
    P6 doesn't hard-import P4 — making P6 unit tests independent of
    the registry stack.

    `reputation_oracle.score(vacant_hex, dims)` is consulted to pick the
    UCB winner if provided; otherwise the first match is used.

    The registry is queried for discovery only; the call goes directly
    to `card.endpoint` via `transport`. **No registry write endpoint is
    invoked from this path** (D009 §C, dispatch acceptance).
    """
    if aggregation_search is None:
        raise TargetNotFoundError("call_capability: aggregation_search is required for discovery")
    matches = await aggregation_search(query=query, include_local=False, limit=20)
    matches = [m for m in matches if _match_endpoint(m)]
    if not matches:
        raise TargetNotFoundError(f"no public vacant offers capability {query!r}")

    chosen = await _pick_winner(matches, reputation_oracle)
    target_card = _match_to_card(chosen)
    return await call_local(
        target_card=target_card,
        requester=requester,
        requester_signing_key=requester_signing_key,
        payload=payload,
        transport=transport,
        sequence_no=sequence_no,
        prev_envelope_hash=prev_envelope_hash,
        caller_response_replay_store=caller_response_replay_store,
    )
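The internals of `_pick_winner` are not shown on this page. As a rough illustration of what picking a UCB winner can mean, the sketch below applies the classic UCB1 rule (mean score plus an exploration bonus). The `Candidate` fields, the `exploration` constant, and both function names are hypothetical and not the module's API.

```python
import math
from dataclasses import dataclass


@dataclass(frozen=True)
class Candidate:
    vacant_hex: str
    mean_score: float   # average reputation score so far (hypothetical field)
    observations: int   # number of scored calls so far (hypothetical field)


def ucb_value(c: Candidate, total: int, exploration: float = 1.0) -> float:
    """UCB1: exploit the mean, plus a bonus that shrinks as observations grow."""
    if c.observations == 0:
        return math.inf  # never-tried candidates are explored first
    return c.mean_score + exploration * math.sqrt(math.log(total) / c.observations)


def pick_winner(candidates: list) -> Candidate:
    """Return the candidate with the highest upper confidence bound."""
    total = max(sum(c.observations for c in candidates), 1)
    return max(candidates, key=lambda c: ucb_value(c, total))


veteran = Candidate("aa", mean_score=0.9, observations=100)
newcomer = Candidate("bb", mean_score=0.8, observations=1)
winner = pick_winner([veteran, newcomer])
```

With these numbers the newcomer wins despite the lower mean, because its single observation leaves a wide confidence bound.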

call_local async

call_local(*, target_card: CapabilityCard, requester: ResidentForm, requester_signing_key: SigningKey, payload: A2AMessage, transport: DispatchTransport, sequence_no: int = 1, prev_envelope_hash: bytes = EMPTY_PREV_HASH, caller_response_replay_store: ReplayStore | None = None) -> DispatchResult

Direct call against a known capability card. Used by owner / parent paths to reach LOCAL-visibility vacants the public lookup excludes.

caller_response_replay_store (Pfix3 B6): when provided, the incoming response envelope is run through check_and_advance on the (target → requester) chain, so responses can be checked for replay / out-of-order / chain-fork on the caller side. Default None keeps existing in-process tests (which use synthetic transports that don't track response chains) green.

Source code in src/vacant/protocol/dispatch.py
async def call_local(
    *,
    target_card: CapabilityCard,
    requester: ResidentForm,
    requester_signing_key: SigningKey,
    payload: A2AMessage,
    transport: DispatchTransport,
    sequence_no: int = 1,
    prev_envelope_hash: bytes = EMPTY_PREV_HASH,
    caller_response_replay_store: ReplayStore | None = None,
) -> DispatchResult:
    """Direct call against a known capability card. Used by owner /
    parent paths to reach LOCAL-visibility vacants the public lookup
    excludes.

    ``caller_response_replay_store`` (Pfix3 B6): when provided, the
    incoming response envelope is run through ``check_and_advance`` on
    the ``(target → requester)`` chain, so responses can be checked for
    replay / out-of-order / chain-fork on the caller side. Default
    ``None`` keeps existing in-process tests (which use synthetic
    transports that don't track response chains) green.
    """
    if not target_card.endpoint:
        raise TargetNotFoundError(f"target {target_card.vacant_id.short()} has no endpoint URL")
    if not target_card.verify():
        raise EnvelopeSignatureError(
            f"target capability card for {target_card.vacant_id.short()} does not verify"
        )

    request = build_envelope(
        from_vid=requester.identity,
        to_vid=target_card.vacant_id,
        payload=payload,
        sequence_no=sequence_no,
        prev_envelope_hash=prev_envelope_hash,
        signing_key=requester_signing_key,
    )

    body = to_a2a_jsonrpc(request)
    response_body = await transport(target_card.endpoint, body)

    try:
        result = response_body["result"]
        response_message = result["message"]
        wrapped = {
            "jsonrpc": "2.0",
            "id": "rsp",
            "method": "message/send",
            "params": {"message": response_message},
        }
    except (KeyError, TypeError) as exc:
        raise EnvelopeFormatError(f"transport response is not a valid A2A result: {exc}") from exc

    response_env = from_a2a_jsonrpc(wrapped)
    response_env.verify_or_raise(target_card.vacant_id.verify_key())

    # Pfix3 B6: caller-side response validation. The signature check
    # above only proves "someone with target's key signed this"; the
    # routing checks below prove "this response is on our (target → me)
    # chain", and the replay store catches duplicate / out-of-order /
    # forked responses.
    if response_env.from_vacant_id != target_card.vacant_id:
        raise EnvelopeFormatError(
            "response envelope from_vacant_id "
            f"{response_env.from_vacant_id.short()} != target "
            f"{target_card.vacant_id.short()}"
        )
    if response_env.to_vacant_id != requester.identity:
        raise EnvelopeFormatError(
            "response envelope to_vacant_id "
            f"{response_env.to_vacant_id.short()} != requester "
            f"{requester.identity.short()}"
        )
    if caller_response_replay_store is not None:
        await caller_response_replay_store.check_and_advance(response_env)

    return DispatchResult(
        request_envelope=request,
        response_envelope=response_env,
        target=target_card,
    )

make_httpx_transport

make_httpx_transport(*, timeout: float = 60.0) -> DispatchTransport

Build a transport callable that POSTs JSON-RPC via httpx.

Imports httpx lazily so the module is testable without a network stack — tests pass a custom DispatchTransport callable instead.

Source code in src/vacant/protocol/dispatch.py
def make_httpx_transport(
    *,
    timeout: float = 60.0,
) -> DispatchTransport:
    """Build a transport callable that POSTs JSON-RPC via httpx.

    Imports httpx lazily so the module is testable without a network
    stack — tests pass a custom `DispatchTransport` callable instead.
    """
    import httpx

    async def _transport(url: str, body: dict[str, Any]) -> dict[str, Any]:
        async with httpx.AsyncClient(timeout=timeout) as client:
            r = await client.post(url, json=body)
            r.raise_for_status()
            data: dict[str, Any] = r.json()
            return data

    return _transport
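For tests, a custom transport only needs to match the `(url, body) -> dict` async call shape used above. The sketch below is one possible stub that records calls and returns a canned response; `make_recording_transport` and the example URL are hypothetical, and a real test would need a response that parses and verifies as a signed envelope.

```python
import asyncio
from typing import Any


def make_recording_transport(canned_response: dict):
    """Build an async transport stub that records calls instead of using the network."""
    calls: list = []

    async def _transport(url: str, body: dict) -> dict:
        calls.append((url, body))
        return canned_response

    return _transport, calls


transport, calls = make_recording_transport({"result": {"message": {}}})
response: Any = asyncio.run(
    transport("http://vacant.example/a2a/message/send", {"jsonrpc": "2.0"})
)
```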

capability_card

Capability card serialization + halo_version forward-compat gate.

serialize / deserialize produce / consume canonical JSON for halo emission. Both halt loudly via UnsupportedHaloVersionError when a deserialized card carries a halo_version this build does not recognise — this is the forward-compat hook for future halo schema upgrades.

serialize

serialize(card: CapabilityCard) -> bytes

Canonical JSON bytes for card. Sorted keys + tight separators so the same card always serialises to identical bytes (cross-check against card.signing_payload() for signature stability).

Source code in src/vacant/protocol/capability_card.py
def serialize(card: CapabilityCard) -> bytes:
    """Canonical JSON bytes for `card`. Sorted keys + tight separators
    so the same card always serialises to identical bytes (cross-check
    against `card.signing_payload()` for signature stability)."""
    return json.dumps(
        _to_dict(card), sort_keys=True, separators=(",", ":"), ensure_ascii=False
    ).encode("utf-8")
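The effect of sorted keys and tight separators can be checked in isolation. This sketch applies the same `json.dumps` options to two plain dicts with different key order; the dict contents are made-up stand-ins for whatever `_to_dict(card)` returns.

```python
import json


def canonical_json(obj: dict) -> bytes:
    """Same json.dumps options as serialize, applied to a plain dict."""
    return json.dumps(
        obj, sort_keys=True, separators=(",", ":"), ensure_ascii=False
    ).encode("utf-8")


# Same content, different insertion order; \u00e9 is a non-ASCII character.
card_a = {"capability_text": "caf\u00e9 reviews", "halo_version": 1}
card_b = {"halo_version": 1, "capability_text": "caf\u00e9 reviews"}

bytes_a = canonical_json(card_a)
bytes_b = canonical_json(card_b)
```

Both dicts produce identical bytes, which is what lets a signature computed over one serialization verify against another. With `ensure_ascii=False` the non-ASCII character is written as UTF-8 rather than as a `\u` escape.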

deserialize

deserialize(blob: bytes) -> CapabilityCard

Inverse of serialize. Raises:

  • UnsupportedHaloVersionError if halo_version is outside the [MIN, MAX] supported range.
  • EnvelopeFormatError on shape / decode errors.

Source code in src/vacant/protocol/capability_card.py
def deserialize(blob: bytes) -> CapabilityCard:
    """Inverse of `serialize`. Raises:

    - `UnsupportedHaloVersionError` if `halo_version` is outside the
      `[MIN, MAX]` supported range.
    - `EnvelopeFormatError` on shape / decode errors.
    """
    try:
        obj = json.loads(blob.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError) as exc:
        raise EnvelopeFormatError(f"capability card not valid JSON: {exc}") from exc
    if not isinstance(obj, dict):
        raise EnvelopeFormatError("capability card must be a JSON object")
    try:
        version = int(obj.get("halo_version", DEFAULT_HALO_VERSION))
    except (ValueError, TypeError) as exc:
        raise EnvelopeFormatError(
            f"capability card halo_version must be int; got {obj.get('halo_version')!r}"
        ) from exc
    if not (MIN_SUPPORTED_HALO_VERSION <= version <= MAX_SUPPORTED_HALO_VERSION):
        raise UnsupportedHaloVersionError(
            f"halo_version {version} not in [{MIN_SUPPORTED_HALO_VERSION}, "
            f"{MAX_SUPPORTED_HALO_VERSION}]"
        )
    try:
        spec = obj.get("substrate_spec", {}) or {}
        substrate_spec = SubstrateSpec(
            allowed_substrates=list(spec.get("allowed_substrates", [])),
            policy=dict(spec.get("policy", {})),
        )
        return CapabilityCard(
            vacant_id=VacantId(pubkey_bytes=bytes.fromhex(obj["vacant_id"])),
            capability_text=str(obj["capability_text"]),
            substrate_spec=substrate_spec,
            halo_version=version,
            endpoint=obj.get("endpoint"),
            signature=bytes.fromhex(obj.get("signature", "")),
        )
    except (KeyError, ValueError, TypeError) as exc:
        raise EnvelopeFormatError(f"invalid capability card: {exc}") from exc

replay_protect

Replay protection — per-pair sequence + chain-tip tracking.

P6 §6 / dispatch §6: every (from_vacant_id, to_vacant_id) pair has its own monotonic sequence_no counter and chain_tip (last envelope's hash). An incoming envelope is rejected if:

  • sequence_no <= last_seen[(from, to)], OR
  • prev_envelope_hash != stored_chain_tip[(from, to)].

A new pair starts at sequence_no = 1 and chain_tip = EMPTY_PREV_HASH.
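The two rejection rules can be written as a small pure function. This is a sketch of the documented rules only: the module's `_check` is not shown on this page, and the mapping of each rule to `ReplayDetectedError` or `ChainForkError` is an assumption based on the error names.

```python
from dataclasses import dataclass

EMPTY_PREV_HASH = bytes(32)


class ReplayDetectedError(Exception):
    """Stand-in for the module's error of the same name."""


class ChainForkError(Exception):
    """Stand-in for the module's error of the same name."""


@dataclass(frozen=True)
class ReplayState:
    last_sequence_no: int = 0            # 0 means nothing accepted yet
    chain_tip: bytes = EMPTY_PREV_HASH   # hash of the last accepted envelope


def check(sequence_no: int, prev_envelope_hash: bytes, state: ReplayState) -> None:
    """Apply the two rejection rules to one incoming envelope."""
    if sequence_no <= state.last_sequence_no:
        raise ReplayDetectedError(f"seq {sequence_no} already seen")
    if prev_envelope_hash != state.chain_tip:
        raise ChainForkError("prev_envelope_hash does not match the chain tip")


new_pair = ReplayState()
check(1, EMPTY_PREV_HASH, new_pair)  # the first envelope on a new pair passes
```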

Race protection (F-C). The MVP previously stored one row per (from, to) pair with last_sequence_no updated in place, plus an in-process asyncio.Lock. Under multi-worker deployment two workers could both read the same last_sequence_no = N, both pass the monotonicity check, and both try to advance to N + 1. The fix: store one row per accepted envelope, with composite primary key (from_vid_hex, to_vid_hex, sequence_no). Concurrent writes claiming the same triple collide on the PK at INSERT time and surface as IntegrityError, which the store re-raises as ReplayDetectedError. The "current state" of a pair is simply the row with the largest sequence_no for that pair.
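The primary-key defense can be demonstrated with the standard-library `sqlite3` module. The table name `replay` and the helper `record` are made up for this sketch; the real store goes through SQLAlchemy and raises ReplayDetectedError where this sketch returns False.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE replay (
        from_vid_hex TEXT NOT NULL,
        to_vid_hex   TEXT NOT NULL,
        sequence_no  INTEGER NOT NULL,
        chain_tip    BLOB NOT NULL,
        PRIMARY KEY (from_vid_hex, to_vid_hex, sequence_no)
    )
    """
)


def record(db: sqlite3.Connection, from_hex: str, to_hex: str, seq: int, tip: bytes) -> bool:
    """Insert one accepted envelope; False means another writer claimed the triple."""
    try:
        with db:  # commits on success, rolls back on error
            db.execute("INSERT INTO replay VALUES (?, ?, ?, ?)", (from_hex, to_hex, seq, tip))
        return True
    except sqlite3.IntegrityError:
        return False


first_writer = record(conn, "aa", "bb", 1, b"\x01" * 32)
second_writer = record(conn, "aa", "bb", 1, b"\x02" * 32)  # same triple, loses

# The pair's current state is the row with the largest sequence_no.
latest_seq = conn.execute(
    "SELECT MAX(sequence_no) FROM replay WHERE from_vid_hex = ? AND to_vid_hex = ?",
    ("aa", "bb"),
).fetchone()[0]
```

The database rejects the second insert even if both writers passed the monotonicity check, which is why the primary key carries the race defense and the in-process lock is only an optimization.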

PairKey dataclass

PairKey(from_vid: VacantId, to_vid: VacantId)

Ordered (sender, recipient) pair key for the replay store. Each direction of a pair has its own chain, so (a, b) and (b, a) are distinct keys.

ReplayState dataclass

ReplayState(last_sequence_no: int, chain_tip: bytes)

Replay store state for one pair.

last_sequence_no instance-attribute

last_sequence_no: int

Last accepted envelope's sequence_no on this pair (0 = none yet).

chain_tip instance-attribute

chain_tip: bytes

Hash of the last accepted envelope (or EMPTY_PREV_HASH for new pair).

ReplayStore

Bases: Protocol

Backend contract. Both impls must be safe under concurrent writes.

check_and_advance async

check_and_advance(env: VacantEnvelope) -> None

Advance the per-pair state for env, raising ReplayDetectedError / ChainForkError on rejection.

Source code in src/vacant/protocol/replay_protect.py
async def check_and_advance(
    self,
    env: VacantEnvelope,
) -> None:
    """Advance the per-pair state for `env`, raising
    `ReplayDetectedError` / `ChainForkError` on rejection."""
    ...

InMemoryReplayStore

InMemoryReplayStore()

Reference impl backed by a dict. Used by tests + demo orchestrator.

Not durable; not shared across processes. The SqliteReplayStore wraps the same contract over SQLAlchemy.

Source code in src/vacant/protocol/replay_protect.py
def __init__(self) -> None:
    self._state: dict[PairKey, ReplayState] = {}
    self._lock = asyncio.Lock()

seed

seed(key: PairKey, state: ReplayState) -> None

Pre-load the per-pair state from disk / another snapshot.

Used by the CLI (Pfix3 B6) to rehydrate the response chain on vacant call so a target's reply seq=N+1 is recognised after a process restart. Synchronous (no lock): callers must seed before the store sees concurrent traffic.

Source code in src/vacant/protocol/replay_protect.py
def seed(self, key: PairKey, state: ReplayState) -> None:
    """Pre-load the per-pair state from disk / another snapshot.

    Used by the CLI (Pfix3 B6) to rehydrate the response chain on
    ``vacant call`` so a target's reply seq=N+1 is recognised after
    a process restart. Synchronous (no lock): callers must seed
    before the store sees concurrent traffic.
    """
    self._state[key] = state

SqliteReplayStore

SqliteReplayStore(engine: AsyncEngine)

SQLAlchemy/aiosqlite-backed replay store with PK-enforced uniqueness.

Source code in src/vacant/protocol/replay_protect.py
def __init__(self, engine: AsyncEngine) -> None:
    self._engine = engine
    self._sm = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
    self._lock = asyncio.Lock()

get async

get(key: PairKey) -> ReplayState

Read the latest row for a pair (largest sequence_no).

The PK already guarantees no duplicate (from, to, seq) so the ordering is well-defined. New pairs return the empty state.

Source code in src/vacant/protocol/replay_protect.py
async def get(self, key: PairKey) -> ReplayState:
    """Read the latest row for a pair (largest `sequence_no`).

    The PK already guarantees no duplicate `(from, to, seq)` so the
    ordering is well-defined. New pairs return the empty state.
    """
    async with self._sm() as s:
        row = await s.execute(
            select(_ReplayRow)
            .where(
                _ReplayRow.from_vid_hex == key.from_vid.hex(),
                _ReplayRow.to_vid_hex == key.to_vid.hex(),
            )
            .order_by(sa_desc(_ReplayRow.sequence_no))  # type: ignore[arg-type]
            .limit(1)
        )
        r = row.scalar_one_or_none()
        if r is None:
            return ReplayState(last_sequence_no=0, chain_tip=EMPTY_PREV_HASH)
        return ReplayState(last_sequence_no=r.sequence_no, chain_tip=r.chain_tip)

check_and_advance async

check_and_advance(env: VacantEnvelope) -> None

Validate the envelope and atomically record its acceptance.

The fast-path check uses the in-process _lock to avoid wasted work; the load-bearing race defense is the PK uniqueness on (from, to, sequence_no). If two workers (or two coroutines that both hold their own copy of _lock) both pass _check and both try to insert the same triple, the second INSERT raises IntegrityError and we surface it as ReplayDetectedError.

Source code in src/vacant/protocol/replay_protect.py
async def check_and_advance(self, env: VacantEnvelope) -> None:
    """Validate the envelope and atomically record its acceptance.

    The fast-path check uses the in-process `_lock` to avoid wasted
    work; the load-bearing race defense is the PK uniqueness on
    `(from, to, sequence_no)`. If two workers (or two coroutines
    that both hold their own copy of `_lock`) both pass `_check`
    and both try to insert the same triple, the second INSERT
    raises `IntegrityError` and we surface it as
    `ReplayDetectedError`.
    """
    key = PairKey.from_envelope(env)
    async with self._lock:
        cur = await self.get(key)
        _check(env, cur)
        row = _ReplayRow(
            from_vid_hex=key.from_vid.hex(),
            to_vid_hex=key.to_vid.hex(),
            sequence_no=env.sequence_no,
            chain_tip=env.compute_hash(),
        )
        async with self._sm() as s:
            s.add(row)
            try:
                await s.commit()
            except IntegrityError as exc:
                await s.rollback()
                raise ReplayDetectedError(
                    f"replay/race: envelope (from={key.from_vid.short()}, "
                    f"to={key.to_vid.short()}, seq={env.sequence_no}) "
                    "already accepted (PK collision; concurrent writer beat us)"
                ) from exc

check_envelope async

check_envelope(store: ReplayStore, env: VacantEnvelope) -> None

Convenience wrapper: delegate to store.check_and_advance.

Source code in src/vacant/protocol/replay_protect.py
async def check_envelope(store: ReplayStore, env: VacantEnvelope) -> None:
    """Convenience wrapper: delegate to `store.check_and_advance`."""
    await store.check_and_advance(env)

serve

Incoming-call serve: FastAPI router mounted at /a2a (and /mcp).

Per dispatch §4 the inbound flow is:

  1. Verify envelope signature against from_vacant_id's pubkey.
  2. Check state_machine.can_be_called(my_state) — reject SUNK/ARCHIVED with 410 GONE and HIBERNATING/STALE with 423 LOCKED.
  3. Verify sequence_no monotonicity for the (from, to) pair via replay_protect.ReplayStore.
  4. Hand the payload to the vacant's behavior_bundle (this is where the substrate runs).
  5. Sign and return a response envelope; both directions advance the per-pair envelope chain via replay_protect.

The behavior parameter is a callable that takes a VacantEnvelope and returns an A2AMessage. P7 demo wires this to a real substrate; unit tests pass a lambda.
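The state gate in step 2 amounts to a mapping from vacant state to an HTTP status. The sketch below restates it with a local enum; the enum's string values and the helper `gate_status` are assumptions, and the real router uses `can_be_called` from the state machine.

```python
from enum import Enum
from typing import Optional


class VacantState(Enum):
    # Member names come from this page; the string values are placeholders.
    LOCAL = "local"
    ACTIVE = "active"
    HIBERNATING = "hibernating"
    STALE = "stale"
    SUNK = "sunk"
    ARCHIVED = "archived"


def gate_status(state: VacantState) -> Optional[int]:
    """Return the HTTP status to reject with, or None if the call is accepted."""
    if state in (VacantState.SUNK, VacantState.ARCHIVED):
        return 410  # Gone: calls are permanently rejected
    if state in (VacantState.HIBERNATING, VacantState.STALE):
        return 423  # Locked: not accepting calls right now
    return None     # LOCAL / ACTIVE


statuses = {state.name: gate_status(state) for state in VacantState}
```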

make_response_envelope async

make_response_envelope(*, request: VacantEnvelope, response_payload: A2AMessage, self_signing_key: SigningKey, response_replay_store: ReplayStore, self_form: ResidentForm) -> VacantEnvelope

Build the response envelope (vacant → caller).

Uses the (self → caller) direction of the per-pair chain (ours).

Source code in src/vacant/protocol/serve.py
async def make_response_envelope(
    *,
    request: VacantEnvelope,
    response_payload: A2AMessage,
    self_signing_key: SigningKey,
    response_replay_store: ReplayStore,
    self_form: ResidentForm,
) -> VacantEnvelope:
    """Build the response envelope (vacant → caller).

    Uses the `(self → caller)` direction of the per-pair chain (ours).
    """
    inverse_key = PairKey(from_vid=self_form.identity, to_vid=request.from_vacant_id)
    cur = await response_replay_store.get(inverse_key)
    response = VacantEnvelope(
        from_vacant_id=self_form.identity,
        to_vacant_id=request.from_vacant_id,
        sequence_no=cur.last_sequence_no + 1,
        timestamp=datetime.now(UTC),
        prev_envelope_hash=cur.chain_tip if cur.last_sequence_no > 0 else EMPTY_PREV_HASH,
        payload=response_payload,
        idempotency_key=str(uuid.uuid4()),
    ).signed(self_signing_key)
    # Record the response so the next response on this pair chains correctly.
    await response_replay_store.check_and_advance(response)
    return response
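Because the response uses the (self → caller) key, requests and responses between the same two vacants advance separate counters. A minimal sketch with a plain dict, where the string ids and the helper `advance` are placeholders:

```python
# Per-direction counters keyed by an ordered (from, to) tuple.
last_seq: dict = {}


def advance(from_hex: str, to_hex: str) -> int:
    """Advance and return the sequence number for one direction of a pair."""
    key = (from_hex, to_hex)
    last_seq[key] = last_seq.get(key, 0) + 1
    return last_seq[key]


request_1 = advance("caller", "self")   # caller -> self chain
response_1 = advance("self", "caller")  # self -> caller chain starts at 1 too
request_2 = advance("caller", "self")
```

Each direction starts at 1 independently, matching how the response envelope above takes its sequence number from the inverse key's state rather than from the request.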

build_a2a_router

build_a2a_router(*, self_form: ResidentForm, self_signing_key: SigningKey, behavior: BehaviorHandler, replay_store: ReplayStore, state_provider: Callable[[], VacantState] | None = None, prefix: str = '/a2a') -> APIRouter

Build a FastAPI router serving inbound A2A message/send requests.

state_provider (defaults to lambda: self_form.runtime_state) determines whether the vacant accepts the call:

  • SUNK / ARCHIVED → 410 GONE
  • HIBERNATING / STALE → 423 LOCKED
  • LOCAL / ACTIVE → accepted

Source code in src/vacant/protocol/serve.py
def build_a2a_router(
    *,
    self_form: ResidentForm,
    self_signing_key: SigningKey,
    behavior: BehaviorHandler,
    replay_store: ReplayStore,
    state_provider: Callable[[], VacantState] | None = None,
    prefix: str = "/a2a",
) -> APIRouter:
    """Build a FastAPI router serving inbound A2A `message/send` requests.

    `state_provider` (defaults to `lambda: self_form.runtime_state`)
    determines whether the vacant accepts the call:

    - SUNK / ARCHIVED → 410 GONE
    - HIBERNATING / STALE → 423 LOCKED
    - LOCAL / ACTIVE → accepted
    """
    router = APIRouter(prefix=prefix)
    state_fn = state_provider or (lambda: self_form.runtime_state)

    @router.post("/message/send")
    async def message_send(request: Request, body: dict[str, Any]) -> dict[str, Any]:
        # F3: spec-shape validation BEFORE we hand bytes to the parser.
        # FastAPI normally parses application/json automatically, but a
        # client sending another content type (e.g. text/plain) must be
        # rejected with a structured 415, and a non-JSON-RPC envelope
        # with a structured 400; silently coercing causes subtle
        # replay-protection bugs downstream.
        content_type = (request.headers.get("content-type") or "").split(";")[0].strip().lower()
        if content_type and content_type != "application/json":
            raise HTTPException(
                status_code=415,
                detail=f"unsupported content-type {content_type!r}; expected application/json",
            )
        if body.get("jsonrpc") != "2.0":
            raise HTTPException(
                status_code=400,
                detail=f"jsonrpc field must be '2.0'; got {body.get('jsonrpc')!r}",
            )
        method = body.get("method")
        if method != "message/send":
            raise HTTPException(
                status_code=400,
                detail=f"method must be 'message/send'; got {method!r}",
            )

        # 1. Parse envelope.
        try:
            request_env = from_a2a_jsonrpc(body)
        except EnvelopeFormatError as exc:
            raise HTTPException(status_code=400, detail=str(exc)) from exc

        if request_env.to_vacant_id != self_form.identity:
            raise HTTPException(
                status_code=421,  # Misdirected Request
                detail=(
                    f"envelope addressed to {request_env.to_vacant_id.short()}; "
                    f"this server is {self_form.identity.short()}"
                ),
            )

        # 2. Verify state can_be_called.
        state = state_fn()
        if state in (VacantState.SUNK, VacantState.ARCHIVED):
            raise HTTPException(
                status_code=410,
                detail=f"vacant is {state.value}; calls permanently rejected",
            )
        if not can_be_called(state):
            raise HTTPException(
                status_code=423,
                detail=f"vacant is {state.value}; not accepting calls",
            )

        # 3. Verify signature.
        try:
            request_env.verify_or_raise(request_env.from_vacant_id.verify_key())
        except EnvelopeSignatureError as exc:
            raise HTTPException(status_code=401, detail=str(exc)) from exc

        # 4. Replay-protect on the (caller → self) chain.
        try:
            await replay_store.check_and_advance(request_env)
        except Exception as exc:
            raise HTTPException(status_code=409, detail=str(exc)) from exc

        # 5. Dispatch to behavior.
        response_payload = await behavior(request_env)

        # 6. Build + return response envelope on the (self → caller) chain.
        response_env = await make_response_envelope(
            request=request_env,
            response_payload=response_payload,
            self_signing_key=self_signing_key,
            response_replay_store=replay_store,
            self_form=self_form,
        )
        # Wrap the response envelope inside a JSON-RPC 2.0 `result`. The
        # dispatcher unwraps `result.message` and re-parses it via
        # `from_a2a_jsonrpc`, so the carried message keeps the same
        # `params.message` field shape.
        wire = to_a2a_jsonrpc(response_env)
        return {
            "jsonrpc": "2.0",
            "id": body.get("id"),
            "result": {"message": wire["params"]["message"]},
        }

    return router
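
Seen from the wire, the handler above accepts a JSON-RPC 2.0 body whose `method` is `message/send` and returns the response message under `result.message`. The sketch below reproduces only the shape checks and the final wrapping step on plain dicts. `check_shape` and `wrap_result` are hypothetical helper names, and the message content is a placeholder, not a real signed envelope.

```python
from typing import Any, Optional


def check_shape(content_type_header: str, body: dict[str, Any]) -> Optional[int]:
    """Return the HTTP status for a malformed request, or None if well-shaped."""
    # Drop parameters such as "; charset=utf-8" before comparing.
    content_type = content_type_header.split(";")[0].strip().lower()
    if content_type and content_type != "application/json":
        return 415
    if body.get("jsonrpc") != "2.0":
        return 400
    if body.get("method") != "message/send":
        return 400
    return None


def wrap_result(request_body: dict[str, Any], response_wire: dict[str, Any]) -> dict[str, Any]:
    """Carry the response's `params.message` inside a JSON-RPC 2.0 `result`."""
    return {
        "jsonrpc": "2.0",
        "id": request_body.get("id"),
        "result": {"message": response_wire["params"]["message"]},
    }


request_body = {"jsonrpc": "2.0", "id": 7, "method": "message/send", "params": {"message": {}}}
response_wire = {"params": {"message": {"placeholder": True}}}  # not a real envelope

print(check_shape("application/json; charset=utf-8", request_body))
print(wrap_result(request_body, response_wire))
```

The response echoes the request `id`, so a caller can match replies to requests before re-parsing `result.message`.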

build_a2a_app

build_a2a_app(*, self_form: ResidentForm, self_signing_key: SigningKey, behavior: BehaviorHandler, replay_store: ReplayStore, state_provider: Callable[[], VacantState] | None = None) -> FastAPI

Convenience: a FastAPI app with the A2A router mounted.

Source code in src/vacant/protocol/serve.py
def build_a2a_app(
    *,
    self_form: ResidentForm,
    self_signing_key: SigningKey,
    behavior: BehaviorHandler,
    replay_store: ReplayStore,
    state_provider: Callable[[], VacantState] | None = None,
) -> FastAPI:
    """Convenience: a `FastAPI` app with the A2A router mounted."""
    app = FastAPI(
        title=f"Vacant A2A serve ({self_form.identity.short()})",
        version="0.1.0",
    )
    app.include_router(
        build_a2a_router(
            self_form=self_form,
            self_signing_key=self_signing_key,
            behavior=behavior,
            replay_store=replay_store,
            state_provider=state_provider,
        )
    )
    return app

mcp_adapter

MCP bridge adapters (P6 §3.4 / D009 §G).

Two adapters:

  • VacantAsMCPServer: wraps a serving vacant and its behaviour callback so existing MCP-aware clients (Claude Code, OpenClaw plugin, etc.) can call it via the standard tools/list and tools/call shapes. The internal flow is identical to serve.py's /a2a/message/send: same envelope verification, same replay protection.
  • MCPClientSubstrate: lets a vacant call out to an MCP server as part of its behaviour. Implements the P0 SubstrateBackend contract.

The full MCP wire protocol is not re-implemented here, since doing so would require pulling in the MCP SDK. Both adapters take a small transport callable that the caller wires to a real MCP runtime when needed; unit tests pass an in-process function instead. The P7 demo will swap in the real transport.

VacantAsMCPServer dataclass

VacantAsMCPServer(self_form: ResidentForm, self_signing_key: SigningKey, behavior: BehaviorHandler, replay_store: ReplayStore)

Expose a vacant's capabilities as MCP tools.

Tools:

  • vacant_call: dispatches into the vacant's behaviour through the standard envelope path (signature verify + replay protect).
  • vacant_describe: returns the vacant's id, capability text, halo version, and endpoint (the last three are read from the vacant's signed capability card and are null when no card is set).

The MCP transport (real wire protocol) is wired by the caller. This adapter is the bridge — what runs inside the MCP server when an MCP client calls a tool.

list_tools

list_tools() -> list[dict[str, Any]]

Mirror P6 §3.4 tools/list.

Source code in src/vacant/protocol/mcp_adapter.py
def list_tools(self) -> list[dict[str, Any]]:
    """Mirror P6 §3.4 `tools/list`."""
    return [
        {
            "name": "vacant_call",
            "title": "Call this vacant",
            "description": (
                "Send a task to this vacant via the standard A2A "
                "envelope path. Verifies the supplied signed envelope "
                "internally and returns the signed response."
            ),
            "inputSchema": {
                "type": "object",
                "required": ["envelope"],
                "properties": {
                    "envelope": {
                        "type": "object",
                        "description": "A signed A2A JSON-RPC body",
                    }
                },
            },
        },
        {
            "name": "vacant_describe",
            "title": "Describe this vacant",
            "description": "Return capability text + halo metadata.",
            "inputSchema": {"type": "object", "properties": {}},
        },
    ]
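
A caller can use the `inputSchema` entries above to pre-check arguments before issuing `tools/call`. The sketch below checks only the `required` keys against a hand-copied subset of the tool list. `missing_arguments` is a hypothetical helper, not part of the adapter, and a full JSON Schema validator would also check types.

```python
from typing import Any

# Hand-copied subset of the list_tools() output shown above.
TOOLS: list[dict[str, Any]] = [
    {
        "name": "vacant_call",
        "inputSchema": {
            "type": "object",
            "required": ["envelope"],
            "properties": {"envelope": {"type": "object"}},
        },
    },
    {
        "name": "vacant_describe",
        "inputSchema": {"type": "object", "properties": {}},
    },
]


def missing_arguments(tools: list[dict[str, Any]], name: str, arguments: dict[str, Any]) -> list[str]:
    """Return the required keys that `arguments` lacks for tool `name`."""
    for tool in tools:
        if tool["name"] == name:
            required = tool["inputSchema"].get("required", [])
            return [key for key in required if key not in arguments]
    raise KeyError(f"unknown tool {name!r}")


print(missing_arguments(TOOLS, "vacant_call", {}))
print(missing_arguments(TOOLS, "vacant_describe", {}))
```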

call_tool async

call_tool(name: str, arguments: dict[str, Any]) -> dict[str, Any]

Dispatch an MCP tools/call.

Source code in src/vacant/protocol/mcp_adapter.py
async def call_tool(self, name: str, arguments: dict[str, Any]) -> dict[str, Any]:
    """Dispatch an MCP `tools/call`."""
    if name == "vacant_describe":
        card = self.self_form.capability_card
        return {
            "vacant_id": self.self_form.identity.hex(),
            "capability_text": card.capability_text if card else None,
            "halo_version": card.halo_version if card else None,
            "endpoint": card.endpoint if card else None,
        }
    if name == "vacant_call":
        envelope_body = arguments["envelope"]
        request_env = from_a2a_jsonrpc(envelope_body)

        if request_env.to_vacant_id != self.self_form.identity:
            return {
                "error": (
                    "envelope_to_mismatch: expected "
                    f"{self.self_form.identity.hex()}, got "
                    f"{request_env.to_vacant_id.hex()}"
                )
            }
        if not can_be_called(self.self_form.runtime_state):
            return {
                "error": f"vacant is {self.self_form.runtime_state.value}; not accepting calls"
            }
        request_env.verify_or_raise(request_env.from_vacant_id.verify_key())

        await self.replay_store.check_and_advance(request_env)
        response_payload = await self.behavior(request_env)
        response_env = await make_response_envelope(
            request=request_env,
            response_payload=response_payload,
            self_signing_key=self.self_signing_key,
            response_replay_store=self.replay_store,
            self_form=self.self_form,
        )
        wire = to_a2a_jsonrpc(response_env)
        return {"message": wire["params"]["message"]}

    return {"error": f"unknown tool {name!r}"}
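
The function above reports addressing, state, and unknown-tool problems as a dict with an `error` key, and a successful call as a dict with a `message` key. Signature and replay failures are not wrapped and propagate as exceptions. A caller-side sketch for the dict results might look like this; `ToolCallError` and `unwrap_tool_result` are hypothetical names, and the message content is a placeholder.

```python
from typing import Any


class ToolCallError(RuntimeError):
    """Hypothetical client-side error for a tool result carrying `error`."""


def unwrap_tool_result(result: dict[str, Any]) -> dict[str, Any]:
    """Return the response message, or raise if the adapter reported an error."""
    if "error" in result:
        raise ToolCallError(result["error"])
    return result["message"]


message = unwrap_tool_result({"message": {"placeholder": True}})
print(message)

try:
    unwrap_tool_result({"error": "unknown tool 'vacant_ping'"})
except ToolCallError as exc:
    print("tool call failed:", exc)
```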

MCPClientSubstrate dataclass

MCPClientSubstrate(server_url: str, tool_name: str, transport: MCPTransport)

Bases: SubstrateBackend

A SubstrateBackend that calls an MCP server tool as inference.

transport(server_url, body) -> dict is wired by the caller. tool_name is the MCP tool to invoke; the substrate forwards req.system_prompt + req.user_prompt as a params.message-shaped JSON-RPC request to the server.
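
For unit tests the transport can be an in-process function. The sketch below assumes a synchronous callable with the `transport(server_url, body) -> dict` shape described above; whether the real `MCPTransport` type is sync or async is not stated here. `make_fake_transport`, the placeholder URL, and the request body are illustrative assumptions, not the substrate's actual request.

```python
from typing import Any, Callable

# Assumed shape of the transport callable.
Transport = Callable[[str, dict[str, Any]], dict[str, Any]]


def make_fake_transport(calls: list[tuple[str, dict[str, Any]]]) -> Transport:
    """Build an in-process transport that records calls and echoes the tool name."""

    def transport(server_url: str, body: dict[str, Any]) -> dict[str, Any]:
        calls.append((server_url, body))
        tool = body.get("params", {}).get("name")
        return {"jsonrpc": "2.0", "id": body.get("id"), "result": {"echo": tool}}

    return transport


calls: list[tuple[str, dict[str, Any]]] = []
transport = make_fake_transport(calls)
reply = transport(
    "http://mcp.invalid",  # placeholder URL, never contacted
    {"jsonrpc": "2.0", "id": 1, "method": "tools/call",
     "params": {"name": "summarize", "arguments": {}}},
)
print(reply)
```

Recording the calls lets a test assert on what the substrate sent without running an MCP server.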

errors

Error hierarchy for vacant.protocol.

ProtocolError

Bases: CoreError

Base class for vacant.protocol errors.

EnvelopeSignatureError

Bases: ProtocolError

Envelope signature failed to verify against the claimed sender.

EnvelopeFormatError

Bases: ProtocolError

Envelope is malformed (wrong shape, missing fields, etc.).

UnsupportedHaloVersionError

Bases: ProtocolError

Capability card halo_version is unknown to this build.

ReplayDetectedError

Bases: ProtocolError

An incoming envelope was a replay (sequence_no <= last seen).

ChainForkError

Bases: ProtocolError

An incoming envelope's prev_envelope_hash does not match the stored per-pair chain tip.
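
The two replay-protection failures above can be illustrated with a small in-memory check. This is a sketch under stated assumptions, not the package's `ReplayStore`: `PairState`, `envelope_hash`, this `check_and_advance`, and the local exception classes are stand-ins, the envelope is reduced to a sequence number, a previous hash, and opaque bytes, and the hash is assumed to be BLAKE2b with a 32-byte digest. Accepting any `sequence_no` greater than the last one seen is also an assumption.

```python
import hashlib
from dataclasses import dataclass

EMPTY_PREV_HASH = bytes(32)  # 32 zero bytes, used before any envelope is seen


class ReplayDetectedError(Exception):
    """Stand-in: sequence_no <= last seen on this pair."""


class ChainForkError(Exception):
    """Stand-in: prev hash does not match the stored chain tip."""


@dataclass
class PairState:
    """Replay state for one (from, to) pair."""

    last_sequence_no: int = 0
    chain_tip: bytes = EMPTY_PREV_HASH


def envelope_hash(body: bytes) -> bytes:
    """Hash the envelope bytes (assumed: BLAKE2b, 32-byte digest)."""
    return hashlib.blake2b(body, digest_size=32).digest()


def check_and_advance(state: PairState, sequence_no: int, prev_hash: bytes, body: bytes) -> None:
    """Reject replays and forks; otherwise move the chain tip forward."""
    if sequence_no <= state.last_sequence_no:
        raise ReplayDetectedError(f"sequence_no {sequence_no} <= {state.last_sequence_no}")
    if prev_hash != state.chain_tip:
        raise ChainForkError("prev_envelope_hash does not match the chain tip")
    state.last_sequence_no = sequence_no
    state.chain_tip = envelope_hash(body)


state = PairState()
check_and_advance(state, 1, EMPTY_PREV_HASH, b"first")
check_and_advance(state, 2, state.chain_tip, b"second")
print(state.last_sequence_no, state.chain_tip.hex())
```

Checking the sequence number before the hash means a resent old envelope is reported as a replay, while a fresh sequence number with the wrong predecessor is reported as a fork.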

TargetUnavailableError

Bases: ProtocolError

Target vacant cannot accept calls (SUNK / ARCHIVED / HIBERNATING).

TargetNotFoundError

Bases: ProtocolError

Target vacant has no capability card / no endpoint.
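
Because every class above derives from `ProtocolError`, a caller can catch the base class and branch on the concrete type. The sketch below uses stand-in classes that mirror part of the hierarchy and maps them to the statuses the A2A router uses for the same failures (400 for format, 401 for signature, 409 for replay-protection failures). `http_status_for` is a hypothetical helper, and the 500 fallback is an assumption.

```python
class CoreError(Exception):
    """Stand-in for the package's CoreError."""


class ProtocolError(CoreError):
    """Stand-in base class for protocol errors."""


class EnvelopeSignatureError(ProtocolError):
    pass


class EnvelopeFormatError(ProtocolError):
    pass


class ReplayDetectedError(ProtocolError):
    pass


class ChainForkError(ProtocolError):
    pass


STATUS_BY_ERROR = {
    EnvelopeFormatError: 400,
    EnvelopeSignatureError: 401,
    ReplayDetectedError: 409,
    ChainForkError: 409,
}


def http_status_for(exc: ProtocolError) -> int:
    """Pick an HTTP status for a protocol error."""
    for error_type, status in STATUS_BY_ERROR.items():
        if isinstance(exc, error_type):
            return status
    return 500  # assumption: unmapped protocol errors surface as server errors


try:
    raise ChainForkError("prev_envelope_hash mismatch")
except ProtocolError as exc:
    print(type(exc).__name__, http_status_for(exc))
```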