
vacant.registry

P4 registry — SQLite-backed halo store, RPC endpoints, halo aggregation (per-vacant, not routed-through), anti-tamper, and the Visibility axis that decides whether a halo is in the public index.

halo

Halo emission — per-vacant CapabilityCard publication + revocation.

Per THEORY_V5 §7.1 (Registry ontology), each vacant carries its own capability card; the registry stores a signed copy plus index entries so direct vacant-to-vacant calls can bypass it. LOCAL-state vacants are stored (so owner/parent direct lookup still works) but forced to visibility=NONE, so they never surface in the public index.

HaloRecord dataclass

HaloRecord(vacant_id: str, visibility: Visibility, event_seq: int, capability_card_hash: bytes)

Result of a successful publish_halo.

RevocationRecord dataclass

RevocationRecord(vacant_id: str, event_seq: int, reason: str)

Result of a successful revoke_halo.

RegisterEventDraftInputs dataclass

RegisterEventDraftInputs(vacant_id: str, capability_card_hash: bytes, halo_version: int, visibility: Visibility, ts_ms: int, actor_seq: int, idempotency_key: str)

Bag of inputs that name a single register event draft.

Both the client (CLI publishing over HTTP) and the server (HTTP handler verifying the request) construct the same canonical bytes from these fields, so the signature is bit-stable between sides.

publish_halo async

publish_halo(*, store: RegistryStore, card: CapabilityCard, runtime_state: VacantState, signing_key: SigningKey, base_model: str | None = None, base_model_family: str | None = None, owner_org: str | None = None, declared_capabilities: list[str] | None = None, parent_id: str | None = None, version: str | None = None, visibility: Visibility = PUBLIC) -> HaloRecord

Insert / update a vacant's halo + emit a register event.

Visibility rules (D006 §G + dispatch §"Visibility"):

  - LOCAL state forces Visibility.NONE regardless of visibility.
  - LOCAL halos are stored (so owner/parent direct lookup works) but effective_visibility returns NONE → discovery filters them out.

Republish kwargs policy (Pfix3 F2): base_model, base_model_family, owner_org, version are None-default. On new publish, they fall back to defaults ("unknown" / "0.0.1" / None). On republish, a None argument means "leave the existing column untouched"; only non-None arguments overwrite the row. This avoids accidentally clobbering version="0.5.0" with the default "0.0.1" when a caller republishes only to flip visibility. Card-derived columns (capability_card_*, declared_capabilities_json, visibility) always overwrite on republish — they are intrinsic to the new card.

Source code in src/vacant/registry/halo.py
async def publish_halo(
    *,
    store: RegistryStore,
    card: CapabilityCard,
    runtime_state: VacantState,
    signing_key: SigningKey,
    base_model: str | None = None,
    base_model_family: str | None = None,
    owner_org: str | None = None,
    declared_capabilities: list[str] | None = None,
    parent_id: str | None = None,
    version: str | None = None,
    visibility: Visibility = Visibility.PUBLIC,
) -> HaloRecord:
    """Insert / update a vacant's halo + emit a `register` event.

    Visibility rules (D006 §G + dispatch §"Visibility"):
    - LOCAL state forces `Visibility.NONE` regardless of `visibility`.
    - LOCAL halos are stored (so owner/parent direct lookup works) but
      `effective_visibility` returns NONE → discovery filters them out.

    Republish kwargs policy (Pfix3 F2): ``base_model``,
    ``base_model_family``, ``owner_org``, ``version`` are
    ``None``-default. On *new* publish, they fall back to defaults
    (``"unknown"`` / ``"0.0.1"`` / ``None``). On *republish*, a
    ``None`` argument means "leave the existing column untouched";
    only non-``None`` arguments overwrite the row. This avoids
    accidentally clobbering `version="0.5.0"` with the default
    `"0.0.1"` when a caller republishes only to flip visibility.
    Card-derived columns (``capability_card_*``,
    ``declared_capabilities_json``, ``visibility``) always overwrite
    on republish — they are intrinsic to the new card.
    """
    if not card.verify():
        raise RegistryWriteError("publish_halo: capability card signature invalid")

    vacant_id = card.vacant_id.hex()
    eff_vis = effective_visibility(runtime_state, visibility)
    capabilities = declared_capabilities or [card.capability_text]
    capability_card_hash = _capability_card_hash(card)
    capability_card_blob = serialize_card(card)
    ts = now_ms()

    existing = await store.get_vacant(vacant_id)
    vacant_to_insert: Vacant | None = None
    vacant_field_updates: dict[str, object] | None = None
    if existing is None:
        vacant_to_insert = Vacant(
            vacant_id=vacant_id,
            public_key=card.vacant_id.pubkey_bytes,
            owner_org=owner_org,
            base_model=base_model or "unknown",
            base_model_family=base_model_family or "unknown",
            parent_id=parent_id,
            version=version or "0.0.1",
            declared_capabilities_json=json.dumps(capabilities),
            capability_card_hash=capability_card_hash,
            capability_card_sig=card.signature,
            capability_card_blob=capability_card_blob,
            status="active",
            visibility=eff_vis.value,
            registered_at=ts,
        )
        next_seq = 1
        prev_halo_version = 0
    else:
        # Republish: enforce identity-custody invariants then build a
        # *partial* field-update payload — the card-derived columns
        # always overwrite (they ARE the card), but caller-supplied
        # metadata kwargs only overwrite when explicitly passed.
        last = await store.latest_event_for_actor(vacant_id)
        prev_halo_version = _extract_halo_version(last.payload_json) if last else 0
        _check_republish_invariants(
            existing=existing,
            card=card,
            new_parent_id=parent_id,
            prev_halo_version=prev_halo_version,
        )
        next_seq = (last.actor_seq if last else 0) + 1
        vacant_field_updates = {
            "capability_card_hash": capability_card_hash,
            "capability_card_sig": card.signature,
            "capability_card_blob": capability_card_blob,
            "declared_capabilities_json": json.dumps(capabilities),
            "visibility": eff_vis.value,
        }
        if base_model is not None:
            vacant_field_updates["base_model"] = base_model
        if base_model_family is not None:
            vacant_field_updates["base_model_family"] = base_model_family
        if owner_org is not None:
            vacant_field_updates["owner_org"] = owner_org
        if version is not None:
            vacant_field_updates["version"] = version

    # Emit signed `register` event so the publish lands in the audit chain.
    # F-A: vacant insert/update + event submit are bundled into a single
    # DB transaction by `submit_register_event_atomic`. If the event
    # fails (signature reject, idempotency conflict, race lost), the
    # vacant row insert / visibility flip is rolled back together — so
    # the public state and the audit chain can never diverge.
    draft_payload = {
        "vacant_id": vacant_id,
        "card_hash": capability_card_hash.hex(),
        "halo_version": card.halo_version,
        "visibility": eff_vis.value,
    }
    idempotency_key = f"register:{vacant_id}:{ts}:{uuid.uuid4()}"
    canonical_payload = json.dumps(draft_payload, sort_keys=True, separators=(",", ":")).encode(
        "utf-8"
    )
    payload_hash = hash_blake2b(canonical_payload)
    canonical = canonical_event_bytes(
        event_type="register",
        actor_vacant_id=vacant_id,
        subject_vacant_id=None,
        payload_hash=payload_hash,
        idempotency_key=idempotency_key,
        signed_by_pubkey=card.vacant_id.pubkey_bytes,
        ts=ts,
        actor_seq=next_seq,
    )
    sig = sign(signing_key, canonical)
    draft = SignedEventDraft(
        event_type="register",
        actor_vacant_id=vacant_id,
        subject_vacant_id=None,
        payload=draft_payload,
        idempotency_key=idempotency_key,
        signed_by_pubkey=card.vacant_id.pubkey_bytes,
        signature=sig,
        actor_seq=next_seq,
        ts=ts,
    )
    event = await store.submit_register_event_atomic(
        vacant_to_insert=vacant_to_insert,
        vacant_id_to_update=None if vacant_to_insert is not None else vacant_id,
        new_visibility=None if vacant_to_insert is not None else eff_vis.value,
        draft=draft,
        vacant_field_updates=vacant_field_updates,
    )

    return HaloRecord(
        vacant_id=vacant_id,
        visibility=eff_vis,
        event_seq=event.seq or 0,
        capability_card_hash=capability_card_hash,
    )
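A brief sketch of that republish policy in use. Hedged: the VacantState/Visibility import paths and the ACTIVE member name are assumptions, and card_v2 stands for a re-issued card with a bumped halo_version; the publish_halo calls themselves mirror the signature above.

from vacant.core.state import VacantState  # assumed import path
from vacant.registry.halo import publish_halo
from vacant.registry.visibility import Visibility  # assumed import path

async def flip_visibility(store, card_v1, card_v2, key):
    # First publish pins the version explicitly.
    await publish_halo(
        store=store, card=card_v1, runtime_state=VacantState.ACTIVE,
        signing_key=key, version="0.5.0",
    )
    # Republish with version=None (the default): the stored "0.5.0"
    # survives; only card-derived columns and visibility are rewritten.
    return await publish_halo(
        store=store, card=card_v2, runtime_state=VacantState.ACTIVE,
        signing_key=key, visibility=Visibility.NONE,
    )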

register_event_payload

register_event_payload(inp: RegisterEventDraftInputs) -> dict[str, object]

Canonical payload dict the register event carries.

Mirrors the in-process publish_halo payload (lines above) so HTTP-published rows produce the same audit footprint as direct calls.

Source code in src/vacant/registry/halo.py
def register_event_payload(inp: RegisterEventDraftInputs) -> dict[str, object]:
    """Canonical payload dict the ``register`` event carries.

    Mirrors the in-process ``publish_halo`` payload (lines above) so
    HTTP-published rows produce the same audit footprint as direct
    calls."""
    return {
        "vacant_id": inp.vacant_id,
        "card_hash": inp.capability_card_hash.hex(),
        "halo_version": inp.halo_version,
        "visibility": inp.visibility.value,
    }

register_event_canonical_bytes

register_event_canonical_bytes(inp: RegisterEventDraftInputs, *, signed_by_pubkey: bytes) -> bytes

Canonical Ed25519-signing payload for a register event.

The CLI publishes via HTTP by computing this byte-string, signing it under the vacant's own key, and POSTing card_blob + the signature to /v1/halo. The server reconstructs the same bytes and verifies before letting the row land in the audit chain.

Source code in src/vacant/registry/halo.py
def register_event_canonical_bytes(
    inp: RegisterEventDraftInputs, *, signed_by_pubkey: bytes
) -> bytes:
    """Canonical Ed25519-signing payload for a ``register`` event.

    The CLI publishes via HTTP by computing this byte-string, signing
    it under the vacant's own key, and POSTing card_blob + the
    signature to ``/v1/halo``. The server reconstructs the same bytes
    and verifies before letting the row land in the audit chain.
    """
    payload_bytes = json.dumps(
        register_event_payload(inp), sort_keys=True, separators=(",", ":")
    ).encode("utf-8")
    payload_hash = hash_blake2b(payload_bytes)
    return canonical_event_bytes(
        event_type="register",
        actor_vacant_id=inp.vacant_id,
        subject_vacant_id=None,
        payload_hash=payload_hash,
        idempotency_key=inp.idempotency_key,
        signed_by_pubkey=signed_by_pubkey,
        ts=inp.ts_ms,
        actor_seq=inp.actor_seq,
    )
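To make the client/server symmetry concrete, here is a hedged sketch of the CLI side of this flow. The httpx usage, the imports outside halo.py, and the BLAKE2b card-hash construction are assumptions — whatever _capability_card_hash actually computes, the client must reproduce it bit-for-bit; the request field names come from HaloPublishRequest below.

import hashlib
import uuid

import httpx

from vacant.core.crypto import sign  # assumed import path
from vacant.protocol.capability_card import serialize
from vacant.registry.halo import (
    RegisterEventDraftInputs,
    register_event_canonical_bytes,
)
from vacant.registry.store import now_ms
from vacant.registry.visibility import Visibility  # assumed import path

async def publish_over_http(card, signing_key, *, base_url: str) -> None:
    blob = serialize(card)
    # Assumption: card hash = 32-byte BLAKE2b over the serialized blob.
    # The real client must match _capability_card_hash exactly.
    card_hash = hashlib.blake2b(blob, digest_size=32).digest()
    ts = now_ms()
    inputs = RegisterEventDraftInputs(
        vacant_id=card.vacant_id.hex(),
        capability_card_hash=card_hash,
        halo_version=card.halo_version,
        visibility=Visibility.PUBLIC,
        ts_ms=ts,
        actor_seq=1,  # first publish; otherwise last actor_seq + 1
        idempotency_key=f"register:{card.vacant_id.hex()}:{ts}:{uuid.uuid4()}",
    )
    canonical = register_event_canonical_bytes(
        inputs, signed_by_pubkey=card.vacant_id.pubkey_bytes
    )
    body = {
        "capability_card_blob_hex": blob.hex(),
        "event_signature_hex": sign(signing_key, canonical).hex(),
        "runtime_state": "active",  # assumed VacantState wire value
        "visibility": Visibility.PUBLIC.value,
        "event_ts_ms": ts,
        "event_actor_seq": inputs.actor_seq,
        "event_idempotency_key": inputs.idempotency_key,
    }
    async with httpx.AsyncClient(base_url=base_url) as client:
        (await client.post("/v1/halo", json=body)).raise_for_status()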

publish_halo_signed async

publish_halo_signed(*, store: RegistryStore, card: CapabilityCard, runtime_state: VacantState, visibility: Visibility = PUBLIC, base_model: str | None = None, base_model_family: str | None = None, owner_org: str | None = None, declared_capabilities: list[str] | None = None, parent_id: str | None = None, version: str | None = None, event_ts_ms: int, event_actor_seq: int, event_idempotency_key: str, event_signature: bytes) -> HaloRecord

HTTP-friendly variant of publish_halo: the caller pre-signs the register event so the registry never needs the vacant's private key.

Server-side flow:

  1. Verify the capability card's own signature.
  2. Insert the vacant row if missing (so submit_event's actor lookup can succeed).
  3. Submit the pre-signed register event via store.submit_event, which re-runs L1 signature verification + L2 sequence check.

Republish kwargs policy mirrors publish_halo (Pfix3 F2): None arguments leave the existing column untouched on republish; only non-None values overwrite. New publishes fall back to "unknown" / "0.0.1" defaults.

Source code in src/vacant/registry/halo.py
async def publish_halo_signed(
    *,
    store: RegistryStore,
    card: CapabilityCard,
    runtime_state: VacantState,
    visibility: Visibility = Visibility.PUBLIC,
    base_model: str | None = None,
    base_model_family: str | None = None,
    owner_org: str | None = None,
    declared_capabilities: list[str] | None = None,
    parent_id: str | None = None,
    version: str | None = None,
    event_ts_ms: int,
    event_actor_seq: int,
    event_idempotency_key: str,
    event_signature: bytes,
) -> HaloRecord:
    """HTTP-friendly variant of `publish_halo`: the caller pre-signs the
    register event so the registry never needs the vacant's private key.

    Server-side flow:

    1. Verify the capability card's own signature.
    2. Insert the vacant row if missing (so ``submit_event``'s actor
       lookup can succeed).
    3. Submit the pre-signed register event via ``store.submit_event``,
       which re-runs L1 signature verification + L2 sequence check.

    Republish kwargs policy mirrors ``publish_halo`` (Pfix3 F2):
    ``None`` arguments leave the existing column untouched on
    republish; only non-``None`` values overwrite. New publishes fall
    back to ``"unknown"`` / ``"0.0.1"`` defaults.
    """
    if not card.verify():
        raise RegistryWriteError("publish_halo_signed: capability card signature invalid")

    vacant_id = card.vacant_id.hex()
    eff_vis = effective_visibility(runtime_state, visibility)
    capabilities = declared_capabilities or [card.capability_text]
    capability_card_hash = _capability_card_hash(card)
    capability_card_blob = serialize_card(card)

    existing = await store.get_vacant(vacant_id)
    vacant_to_insert: Vacant | None = None
    vacant_field_updates: dict[str, object] | None = None
    if existing is None:
        vacant_to_insert = Vacant(
            vacant_id=vacant_id,
            public_key=card.vacant_id.pubkey_bytes,
            owner_org=owner_org,
            base_model=base_model or "unknown",
            base_model_family=base_model_family or "unknown",
            parent_id=parent_id,
            version=version or "0.0.1",
            declared_capabilities_json=json.dumps(capabilities),
            capability_card_hash=capability_card_hash,
            capability_card_sig=card.signature,
            capability_card_blob=capability_card_blob,
            status="active",
            visibility=eff_vis.value,
            registered_at=event_ts_ms,
        )
    else:
        # Republish: enforce identity-custody invariants then build a
        # *partial* field-update payload — see publish_halo for the
        # detailed kwargs policy.
        last = await store.latest_event_for_actor(vacant_id)
        prev_halo_version = _extract_halo_version(last.payload_json) if last else 0
        _check_republish_invariants(
            existing=existing,
            card=card,
            new_parent_id=parent_id,
            prev_halo_version=prev_halo_version,
        )
        vacant_field_updates = {
            "capability_card_hash": capability_card_hash,
            "capability_card_sig": card.signature,
            "capability_card_blob": capability_card_blob,
            "declared_capabilities_json": json.dumps(capabilities),
            "visibility": eff_vis.value,
        }
        if base_model is not None:
            vacant_field_updates["base_model"] = base_model
        if base_model_family is not None:
            vacant_field_updates["base_model_family"] = base_model_family
        if owner_org is not None:
            vacant_field_updates["owner_org"] = owner_org
        if version is not None:
            vacant_field_updates["version"] = version

    inputs = RegisterEventDraftInputs(
        vacant_id=vacant_id,
        capability_card_hash=capability_card_hash,
        halo_version=card.halo_version,
        visibility=eff_vis,
        ts_ms=event_ts_ms,
        actor_seq=event_actor_seq,
        idempotency_key=event_idempotency_key,
    )
    draft = SignedEventDraft(
        event_type="register",
        actor_vacant_id=vacant_id,
        subject_vacant_id=None,
        payload=register_event_payload(inputs),
        idempotency_key=event_idempotency_key,
        signed_by_pubkey=card.vacant_id.pubkey_bytes,
        signature=event_signature,
        actor_seq=event_actor_seq,
        ts=event_ts_ms,
    )
    # F-A: vacant insert/update + register event are submitted in one
    # DB transaction so a failed `submit_event` (bad signature, race
    # lost, idempotency conflict) rolls back the row insert and we
    # never end up with a publicly-visible halo whose register event
    # is missing from the audit chain.
    event = await store.submit_register_event_atomic(
        vacant_to_insert=vacant_to_insert,
        vacant_id_to_update=None if vacant_to_insert is not None else vacant_id,
        new_visibility=None if vacant_to_insert is not None else eff_vis.value,
        draft=draft,
        vacant_field_updates=vacant_field_updates,
    )
    return HaloRecord(
        vacant_id=vacant_id,
        visibility=eff_vis,
        event_seq=event.seq or 0,
        capability_card_hash=capability_card_hash,
    )

revoke_halo async

revoke_halo(*, store: RegistryStore, vacant_id: str, reason: str, signing_key: SigningKey, pubkey_bytes: bytes) -> RevocationRecord

Mark a vacant as revoked — emit a signed revoke event and flip status to revoked. Append-only: the historical capability card stays in the table.

Source code in src/vacant/registry/halo.py
async def revoke_halo(
    *,
    store: RegistryStore,
    vacant_id: str,
    reason: str,
    signing_key: SigningKey,
    pubkey_bytes: bytes,
) -> RevocationRecord:
    """Mark a vacant as revoked — emit a signed `revoke` event and flip
    `status` to `revoked`. Append-only: the historical capability card
    stays in the table.
    """
    if not reason.strip():
        raise RegistryWriteError("revoke_halo: reason must be non-empty")
    v = await store.get_vacant(vacant_id)
    if v is None:
        raise RegistryWriteError(f"revoke_halo: vacant {vacant_id} not found")

    ts = now_ms()
    payload: dict[str, Any] = {"vacant_id": vacant_id, "reason": reason}
    payload_bytes = json.dumps(payload, sort_keys=True, separators=(",", ":")).encode("utf-8")
    payload_hash = hash_blake2b(payload_bytes)
    last = await store.latest_event_for_actor(vacant_id)
    next_seq = (last.actor_seq if last else 0) + 1
    from vacant.registry.antitamper import canonical_event_bytes

    canonical = canonical_event_bytes(
        event_type="revoke",
        actor_vacant_id=vacant_id,
        subject_vacant_id=None,
        payload_hash=payload_hash,
        idempotency_key=f"revoke:{vacant_id}:{ts}",
        signed_by_pubkey=pubkey_bytes,
        ts=ts,
        actor_seq=next_seq,
    )
    sig = sign(signing_key, canonical)
    draft = SignedEventDraft(
        event_type="revoke",
        actor_vacant_id=vacant_id,
        subject_vacant_id=None,
        payload=payload,
        idempotency_key=f"revoke:{vacant_id}:{ts}",
        signed_by_pubkey=pubkey_bytes,
        signature=sig,
        actor_seq=next_seq,
        ts=ts,
    )
    event = await store.submit_event(draft)
    await store.update_vacant_status(vacant_id, "revoked")
    return RevocationRecord(vacant_id=vacant_id, event_seq=event.seq or 0, reason=reason)
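Illustrative call (key handling elided; pubkey_bytes must be the verify key matching signing_key, or the L1 signature check rejects the event):

from vacant.registry.halo import revoke_halo

async def revoke(store, vid: str, key, pubkey: bytes) -> int:
    record = await revoke_halo(
        store=store,
        vacant_id=vid,
        reason="key compromised",  # blank reasons raise RegistryWriteError
        signing_key=key,
        pubkey_bytes=pubkey,
    )
    return record.event_seq  # seq of the revoke event in the audit chain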

aggregation

Aggregation/index layer over the central store.

search_capability does substring + filter matching on the index columns; rank_by_reputation defers to a ReputationOracle Protocol that P3 plugs in. For now the default oracle returns 0.0 for everyone, so ordering falls back to insertion order — but the API is stable.

lineage_query walks the lineage edge in either direction.

Result objects always include the halo signature so consumers can verify the card independently of the registry's say-so (THEORY_V5 §7.1 trust-anchor-not-trust-origin).

HaloMatch dataclass

HaloMatch(vacant_id: str, capability_card_hash: bytes, capability_card_sig: bytes, declared_capabilities_json: str, base_model_family: str, visibility: Visibility, score: float = 0.0, capability_card: CapabilityCard | None = None)

A single search/rank result.

Carries the halo signature and the full signed CapabilityCard (D015 §C) so the caller can verify the card and dispatch directly to card.endpoint in a single round-trip — no registry rehydration. capability_card is optional only for legacy rows written before the blob column existed; new rows always carry one.
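A minimal consumer-side check of that trust-anchor property — card.verify() is the same signature check publish_halo runs before accepting a publish:

from vacant.registry.aggregation import HaloMatch

def independently_verified(match: HaloMatch) -> bool:
    # Legacy rows may carry no card blob; treat those as unverified and
    # fall back to capability_card_sig + the index columns instead.
    card = match.capability_card
    return card is not None and card.verify()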

ReputationOracle

Bases: Protocol

P3 plugs in here; for P4 we ship a stub that returns 0.0.
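A minimal conforming oracle; the score signature is inferred from rank_by_reputation's call site below (await oracle.score(m.vacant_id, dimensions)):

from collections.abc import Sequence

class ConstantOracle:
    """Stand-in until P3's Aggregator is wired in (see build_app F6 note)."""

    def __init__(self, value: float = 0.0) -> None:
        self._value = value

    async def score(self, vacant_id: str, dimensions: Sequence[str]) -> float:
        return self._value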

search_capability async

search_capability(*, store: RegistryStore, query: str | None = None, family: str | None = None, limit: int = 20, include_local: bool = False) -> list[HaloMatch]

Search the registry index. NONE-visibility halos are excluded by default — include_local=True is for owner/parent direct paths and callers must additionally enforce that the requester is the owner.

Source code in src/vacant/registry/aggregation.py
async def search_capability(
    *,
    store: RegistryStore,
    query: str | None = None,
    family: str | None = None,
    limit: int = 20,
    include_local: bool = False,
) -> list[HaloMatch]:
    """Search the registry index. NONE-visibility halos are excluded by
    default — `include_local=True` is for owner/parent direct paths and
    callers must additionally enforce that the requester is the owner.
    """
    visibility_filter = None if include_local else Visibility.PUBLIC.value
    rows = await store.search_capability(
        capability=query,
        family=family,
        status="active",
        visibility=visibility_filter,
        limit=limit,
    )
    return [_to_match(r) for r in rows]

rank_by_reputation async

rank_by_reputation(matches: Sequence[HaloMatch], *, dimensions: Sequence[str] = ('factual', 'logical', 'relevance'), oracle: ReputationOracle = DEFAULT_REPUTATION_ORACLE) -> list[HaloMatch]

Re-score and sort matches using oracle. Stable for ties.

Source code in src/vacant/registry/aggregation.py
async def rank_by_reputation(
    matches: Sequence[HaloMatch],
    *,
    dimensions: Sequence[str] = ("factual", "logical", "relevance"),
    oracle: ReputationOracle = DEFAULT_REPUTATION_ORACLE,
) -> list[HaloMatch]:
    """Re-score and sort `matches` using `oracle`. Stable for ties."""
    scored: list[HaloMatch] = []
    for m in matches:
        s = await oracle.score(m.vacant_id, dimensions)
        scored.append(
            HaloMatch(
                vacant_id=m.vacant_id,
                capability_card_hash=m.capability_card_hash,
                capability_card_sig=m.capability_card_sig,
                declared_capabilities_json=m.declared_capabilities_json,
                base_model_family=m.base_model_family,
                visibility=m.visibility,
                score=s,
                capability_card=m.capability_card,
            )
        )
    scored.sort(key=lambda m: m.score, reverse=True)
    return scored
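Typical search-then-rank round trip (ConstantOracle is the sketch above; under the default zero-score oracle the sort is a stable no-op):

from vacant.registry.aggregation import rank_by_reputation, search_capability

async def top_matches(store, capability: str):
    matches = await search_capability(store=store, query=capability, limit=10)
    return await rank_by_reputation(matches, oracle=ConstantOracle(0.5))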

lineage_query async

lineage_query(*, store: RegistryStore, vacant_id: str, direction: Literal['descendants', 'ancestors'] = 'descendants', depth: int = 8) -> list[str]

Walk the lineage edge and return a list of vacant_ids.

Source code in src/vacant/registry/aggregation.py
async def lineage_query(
    *,
    store: RegistryStore,
    vacant_id: str,
    direction: Literal["descendants", "ancestors"] = "descendants",
    depth: int = 8,
) -> list[str]:
    """Walk the lineage edge and return a list of vacant_ids."""
    target = await store.get_vacant(vacant_id)
    if target is None:
        raise NotFoundError(vacant_id)
    if direction == "descendants":
        rows = await store.list_descendants(vacant_id, max_depth=depth)
    else:
        rows = await store.list_ancestors(vacant_id, max_depth=depth)
    return [r.vacant_id for r in rows]

store

Central-MVP registry store (SQLAlchemy AsyncEngine + aiosqlite).

Implements RegistryBackend for SQLite. Anti-tamper layers L1-L3 are checked here at write time; L4 (Merkle snapshots) is exposed via seal_epoch(); L5 (anomaly counters) is wired into submit_event as a post-write signal; L6 (append-only) is enforced by exposing no delete_* methods + raising AppendOnlyViolation if a caller tries SQL-direct DELETE.

Async only. Construction takes a SQLAlchemy AsyncEngine (DI seam — tests use sqlite+aiosqlite:///:memory:; production uses a file path).
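Construction as the tests do it — nothing here is hypothetical beyond the database URL:

from sqlalchemy.ext.asyncio import create_async_engine

from vacant.registry.store import RegistryStore

async def make_store() -> RegistryStore:
    engine = create_async_engine("sqlite+aiosqlite:///:memory:")
    store = RegistryStore(engine)  # installs the L6 append-only guard
    await store.init_schema()  # idempotent CREATE TABLE pass
    return store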

SignedEventDraft dataclass

SignedEventDraft(event_type: str, actor_vacant_id: str, subject_vacant_id: str | None, payload: dict[str, object], idempotency_key: str, signed_by_pubkey: bytes, signature: bytes, actor_seq: int, ts: int)

A pre-signed event draft handed to the store. The store re-derives the hash chain + verifies the signature before insert.

RegistryStore

RegistryStore(engine: AsyncEngine)

SQLite-backed RegistryBackend impl with anti-tamper hooks.

Source code in src/vacant/registry/store.py
def __init__(self, engine: AsyncEngine) -> None:
    self._engine = engine
    self._sessionmaker = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
    # L6: prohibit DELETE on append-only tables.
    self._install_append_only_guard()
    self._write_lock = asyncio.Lock()

init_schema async

init_schema() -> None

Create all tables. Idempotent.

Source code in src/vacant/registry/store.py
async def init_schema(self) -> None:
    """Create all tables. Idempotent."""
    async with self._engine.begin() as conn:
        await conn.run_sync(SQLModel.metadata.create_all)

submit_event async

submit_event(draft: SignedEventDraft) -> Event

The hot path: idempotency → sig verify → seq monotone → chain → insert. Returns the persisted Event with seq and event_hash.

Race protection (F-B): the in-process asyncio.Lock is a fast-path mutex within a single worker; the load-bearing defense is the (actor_vacant_id, actor_seq) UNIQUE on the event table, which turns concurrent inserts of the same actor_seq into IntegrityError and back into SequenceMonotonicityError for the loser of the race.

Source code in src/vacant/registry/store.py
async def submit_event(self, draft: SignedEventDraft) -> Event:
    """The hot path: idempotency → sig verify → seq monotone → chain
    → insert. Returns the persisted `Event` with `seq` and `event_hash`.

    Race protection (F-B): the in-process `asyncio.Lock` is a
    fast-path mutex within a single worker; the load-bearing defense
    is the `(actor_vacant_id, actor_seq)` UNIQUE on the `event`
    table, which turns concurrent inserts of the same `actor_seq`
    into `IntegrityError` and back into `SequenceMonotonicityError`
    for the loser of the race.
    """
    async with self._write_lock:
        async with self._sessionmaker() as s:
            async with s.begin():
                return await self._submit_event_in_session(s, draft)

submit_register_event_atomic async

submit_register_event_atomic(*, vacant_to_insert: Vacant | None, vacant_id_to_update: str | None, new_visibility: str | None, draft: SignedEventDraft, vacant_field_updates: dict[str, object] | None = None) -> Event

F-A defense: insert/update vacant + submit register event in ONE transaction. If submit_event fails (signature rejected, idempotency conflict, sequence race), the vacant row insert / visibility update is rolled back, so the audit chain and the publicly-visible state can never diverge.

Exactly one of vacant_to_insert or vacant_id_to_update should be non-None per call site. If both are None, only the event is submitted (used by tests).

vacant_field_updates (Pfix3 B5): when vacant_id_to_update is set, callers can pass a dict of column-name → new-value to apply onto the existing row before the register event lands. Used by publish_halo republish so the row's capability_card_* columns track the new card instead of going stale while the audit chain advances. new_visibility is the legacy single-field path; if vacant_field_updates contains a visibility key it takes precedence.

Source code in src/vacant/registry/store.py
async def submit_register_event_atomic(
    self,
    *,
    vacant_to_insert: Vacant | None,
    vacant_id_to_update: str | None,
    new_visibility: str | None,
    draft: SignedEventDraft,
    vacant_field_updates: dict[str, object] | None = None,
) -> Event:
    """F-A defense: insert/update vacant + submit register event in
    ONE transaction. If `submit_event` fails (signature rejected,
    idempotency conflict, sequence race), the vacant row insert /
    visibility update is rolled back, so the audit chain and the
    publicly-visible state can never diverge.

    Exactly one of `vacant_to_insert` or `vacant_id_to_update` should
    be non-None per call site. If both are None, only the event is
    submitted (used by tests).

    ``vacant_field_updates`` (Pfix3 B5): when ``vacant_id_to_update``
    is set, callers can pass a dict of column-name → new-value to
    apply onto the existing row before the register event lands.
    Used by ``publish_halo`` republish so the row's
    ``capability_card_*`` columns track the new card instead of
    going stale while the audit chain advances. ``new_visibility``
    is the legacy single-field path; if ``vacant_field_updates``
    contains a ``visibility`` key it takes precedence.
    """
    async with self._write_lock:
        async with self._sessionmaker() as s:
            async with s.begin():
                if vacant_to_insert is not None:
                    s.add(vacant_to_insert)
                    try:
                        await s.flush()
                    except IntegrityError as exc:
                        raise RegistryWriteError(
                            f"vacant {vacant_to_insert.vacant_id} already exists"
                        ) from exc
                elif vacant_id_to_update is not None:
                    v = await s.get(Vacant, vacant_id_to_update)
                    if v is None:
                        raise NotFoundError(f"vacant {vacant_id_to_update} not found")
                    if vacant_field_updates:
                        for fname, fval in vacant_field_updates.items():
                            setattr(v, fname, fval)
                    elif new_visibility is not None:
                        v.visibility = new_visibility
                    await s.flush()
                return await self._submit_event_in_session(s, draft)

verify_event_chain async

verify_event_chain() -> bool

Recompute every stored event's payload_hash, signature, and event_hash from payload_json + canonical bytes. Returns False on any mismatch — i.e. detects in-place tampering that bypassed the signed write path (UPDATE instead of INSERT). The append-only guard (L6) catches DELETE; this catches UPDATE.

Source code in src/vacant/registry/store.py
async def verify_event_chain(self) -> bool:
    """Recompute every stored event's `payload_hash`, signature, and
    `event_hash` from `payload_json` + canonical bytes. Returns False on
    any mismatch — i.e. detects in-place tampering that bypassed the
    signed write path (UPDATE instead of INSERT). The append-only
    guard (L6) catches DELETE; this catches UPDATE.
    """
    expected_prev = b"\x00" * 32
    async with self._sessionmaker() as s:
        res = await s.execute(select(Event).order_by(Event.seq))  # type: ignore[arg-type]
        for ev in res.scalars().all():
            if ev.prev_event_hash != expected_prev:
                return False
            # Recompute payload_hash from stored canonical payload_json.
            recomputed_payload_hash = hash_blake2b(ev.payload_json.encode("utf-8"))
            if recomputed_payload_hash != ev.payload_hash:
                return False
            # Recompute canonical bytes.
            canonical = canonical_event_bytes(
                event_type=ev.event_type,
                actor_vacant_id=ev.actor_vacant_id,
                subject_vacant_id=ev.subject_vacant_id,
                payload_hash=ev.payload_hash,
                idempotency_key=ev.idempotency_key,
                signed_by_pubkey=ev.signed_by_pubkey,
                ts=ev.ts,
                actor_seq=ev.actor_seq,
            )
            # Re-verify signature.
            try:
                verify_event_signature(
                    pubkey_bytes=ev.signed_by_pubkey,
                    canonical_bytes=canonical,
                    signature=ev.signature,
                )
            except Exception:
                return False
            # Re-derive event_hash.
            recomputed_hash = compute_event_hash(
                prev_event_hash=ev.prev_event_hash,
                canonical_bytes=canonical,
                signature=ev.signature,
            )
            if recomputed_hash != ev.event_hash:
                return False
            expected_prev = ev.event_hash
    return True

verify_vacant_index_consistent async

verify_vacant_index_consistent(vacant_id: str) -> bool

True iff the indexed vacant.visibility column matches the visibility recorded on the most recent register event for that vacant. Catches direct SQL UPDATE of the visibility column — Padv-P4 §2.

Returns False if the vacant has no register events on file: every vacant in the index must have a register event, so a missing one is itself treated as inconsistent.

Source code in src/vacant/registry/store.py
async def verify_vacant_index_consistent(self, vacant_id: str) -> bool:
    """True iff the indexed `vacant.visibility` column matches the
    visibility recorded on the most recent `register` event for that
    vacant. Catches direct SQL UPDATE of the visibility column —
    Padv-P4 §2.

    Returns False if the vacant has no register events on file: every
    vacant in the index must have a register event, so a missing one is
    itself treated as inconsistent.
    """
    v = await self.get_vacant(vacant_id)
    if v is None:
        return False
    async with self._sessionmaker() as s:
        res = await s.execute(
            select(Event)
            .where(Event.actor_vacant_id == vacant_id)
            .where(Event.event_type == "register")
            .order_by(sa_desc(Event.seq))  # type: ignore[arg-type]
            .limit(1)
        )
        latest_register = res.scalar_one_or_none()
    if latest_register is None:
        # No register events: nothing to compare. Treat as inconsistent
        # because every vacant in the index *must* have a register event.
        return False
    try:
        payload = json.loads(latest_register.payload_json)
    except json.JSONDecodeError:
        return False
    return bool(v.visibility == payload.get("visibility"))

seal_epoch async

seal_epoch(*, signing_key: SigningKey) -> MerkleEpoch

Build a Merkle root over all unsealed events, store it, and attach epoch_id back to each leaf event. Operator-signed.

Source code in src/vacant/registry/store.py
async def seal_epoch(self, *, signing_key: SigningKey) -> MerkleEpoch:
    """Build a Merkle root over all unsealed events, store it, and
    attach `epoch_id` back to each leaf event. Operator-signed.
    """
    unsealed = await self.list_unsealed_events()
    if not unsealed:
        raise RegistryWriteError("seal_epoch: no unsealed events")
    leaves = [e.event_hash for e in unsealed]
    root = build_merkle_root(leaves)
    sig = sign_epoch_root(root=root, signing_key=signing_key)
    epoch = MerkleEpoch(
        first_seq=unsealed[0].seq or 0,
        last_seq=unsealed[-1].seq or 0,
        tree_size=len(unsealed),
        root_hash=root,
        sealed_at=now_ms(),
        registry_signature=sig,
    )
    async with self._sessionmaker() as s:
        s.add(epoch)
        await s.commit()
        await s.refresh(epoch)
        # Assign epoch_id to events.
        for e in unsealed:
            e_db = await s.get(Event, e.seq)
            if e_db is not None:
                e_db.epoch_id = epoch.epoch_id
        await s.commit()
    return epoch

lookup_halo_for_caller async

lookup_halo_for_caller(target_vacant_id: str, *, caller_pubkey_hex: str | None = None) -> Vacant

Visibility-aware halo lookup. Returns the halo iff:

  • target.visibility == PUBLIC, OR
  • caller == target (owner-direct), OR
  • caller == target.parent (parent-direct).

Raises VisibilityViolation for stranger lookups against NONE halos. Raises NotFoundError if the target doesn't exist.

Source code in src/vacant/registry/store.py
async def lookup_halo_for_caller(
    self,
    target_vacant_id: str,
    *,
    caller_pubkey_hex: str | None = None,
) -> Vacant:
    """Visibility-aware halo lookup. Returns the halo iff:

    - `target.visibility == PUBLIC`, OR
    - `caller == target` (owner-direct), OR
    - `caller == target.parent` (parent-direct).

    Raises `VisibilityViolation` for stranger lookups against NONE
    halos. Raises `NotFoundError` if the target doesn't exist.
    """
    v = await self.get_vacant(target_vacant_id)
    if v is None:
        raise NotFoundError(target_vacant_id)
    # Compute effective visibility from runtime status.
    runtime = self._status_to_state(v.status)
    eff = effective_visibility(runtime, Visibility(v.visibility))
    if eff == Visibility.NONE:
        if caller_pubkey_hex is None:
            raise VisibilityViolation(
                f"vacant {target_vacant_id} is NONE-visibility; caller required"
            )
        if caller_pubkey_hex == v.vacant_id:
            return v
        if v.parent_id is not None and caller_pubkey_hex == v.parent_id:
            return v
        raise VisibilityViolation(
            f"vacant {target_vacant_id} is NONE-visibility; caller is not owner/parent"
        )
    return v

now_ms

now_ms() -> int

Current time in millis since epoch (P4 §3.1 timestamp convention).

Source code in src/vacant/registry/store.py
def now_ms() -> int:
    """Current time in millis since epoch (P4 §3.1 timestamp convention)."""
    return int(time.time() * 1000)

canonical_json

canonical_json(payload: dict[str, object]) -> str

JSON canonicalisation for payload_json storage. D006 §F: same sort_keys + tight separators form used by P0 logbooks; JCS-strict is future work.

Source code in src/vacant/registry/store.py
def canonical_json(payload: dict[str, object]) -> str:
    """JSON canonicalisation for `payload_json` storage. D006 §F: same
    `sort_keys + tight separators` form used by P0 logbooks; JCS-strict
    is future work."""
    return json.dumps(payload, sort_keys=True, separators=(",", ":"), ensure_ascii=False)

backend

RegistryBackend Protocol — the seam for swapping central → federated/DHT.

Acceptance criterion (dispatch §"Acceptance"): "Architected so the swap from central → federated/DHT is local to one module (a RegistryBackend Protocol)". This module declares the contract; central.py (mid-PR class wired into store.py) implements it for SQLite. Federated and DHT backends are post-MVP.

RegistryBackend

Bases: Protocol

Storage contract every backend implementation honours.

Methods are async; return types use SQLModel rows directly so the aggregation layer can index them without a translation step. A federated backend would implement the same interface using cross-shard reads + witness-verified writes (post-MVP).

models

SQLModel tables for the central-MVP registry backend.

13 tables per architecture/components/P4_registry.md §3.1, mapped onto SQLModel for typed CRUD + Alembic-generated migrations. Field names that collide with Python keywords are suffixed _.

Schema decisions reconciled in D006:

  - Hashes are 32-byte BLAKE2b digests (HASH_DIGEST_BYTES), not BLAKE3.
  - Timestamps are stored as int milliseconds since epoch (matches spec §3.1 "all timestamps millis since epoch").
  - vacant_id is stored as the lowercase hex pubkey (matches VacantId.hex()).

Vacant

Bases: SQLModel

Per-vacant capability-card snapshot. P4 §3.1 table 1.

capability_card_blob class-attribute instance-attribute

capability_card_blob: bytes = Field(default=b'', sa_column=Column(LargeBinary, nullable=False))

Canonical-JSON serialized signed CapabilityCard (D015 §C). Carried verbatim through HaloMatch so dispatch can call card.endpoint without rehydrating from individual columns.

Attestation

Bases: SQLModel

Identity attestation issued by a developer / org / peer / oracle.

Event

Bases: SQLModel

Append-only signed event log. P4 §3.1 table 3.

The (actor_vacant_id, actor_seq) UniqueConstraint is the load-bearing race defense against codex F-B: an in-process asyncio.Lock only guards concurrent submits inside a single worker, so two workers reading the same latest_event_for_actor could both pass check_sequence_monotonic and both try to insert the same actor_seq. The DB-level UNIQUE turns that race into an IntegrityError at insert time, which the store layer catches and re-raises as SequenceMonotonicityError.

actor_seq class-attribute instance-attribute

actor_seq: int = Field(default=0, index=True)

Per-actor monotonic sequence (anti-tamper L2).
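Abbreviated sketch of that constraint as a SQLModel table — the real Event table carries many more columns; this shows only the race-defense shape:

from sqlalchemy import UniqueConstraint
from sqlmodel import Field, SQLModel

class EventSketch(SQLModel, table=True):
    # The load-bearing F-B defense: two workers racing on the same
    # actor_seq collide here with IntegrityError at insert time.
    __table_args__ = (UniqueConstraint("actor_vacant_id", "actor_seq"),)

    seq: int | None = Field(default=None, primary_key=True)
    actor_vacant_id: str = Field(index=True)
    actor_seq: int = Field(default=0, index=True)  # anti-tamper L2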

EventFinalization

Bases: SQLModel

N-of-M attestation finalization for an event. P4 §3.1 table 4.

MerkleEpoch

Bases: SQLModel

Periodic Merkle root over the event log. P4 §3.1 table 5.

EpochWitness

Bases: SQLModel

L6 federated witness cosignature on an epoch root.

ReputationSnapshot

Bases: SQLModel

Per-vacant + per-epoch five-dimensional reputation snapshot.

CompositionLink

Bases: SQLModel

Bilateral composition agreement between two vacants.

SinkRecord

Bases: SQLModel

Sunk vacant terminal record. P4 §3.1 table 9.

Freeze

Bases: SQLModel

Temporary freeze (anomaly / governance / self).

Revocation

Bases: SQLModel

Public-key revocation record.

ReadAudit

Bases: SQLModel

Optional read-side audit log (P4 §3.1 table 12; off by default).

AnomalyWindow

Bases: SQLModel

Rolling-window anomaly counter (rule-based MVP, P4 §3.2 table).

rpc

FastAPI RPC surface — 25 endpoints documented in OpenAPI.

Per dispatch §"Acceptance": "13 tables present, 25 RPC endpoints documented in OpenAPI". This module wires every endpoint listed in architecture/components/P4_registry.md §3.2 to a Pydantic v2 request/response model and a thin handler that delegates to RegistryStore / aggregation.py / halo.py.

Endpoints whose backing logic belongs to other components (P3 reputation snapshots, P5 composition links, P6 envelope dispatch) carry a not_implemented_in_p4 flag in the response so callers can plan around the stubs without the endpoint disappearing later.

HaloPublishRequest

Bases: _Base

Body for POST /v1/halo.

The caller serialises a signed CapabilityCard (via vacant.protocol.capability_card.serialize) and pre-signs the audit-chain register event under their own Ed25519 key. The server reconstructs the canonical event bytes from these fields, re-verifies the signature, and submits the event to the store.

capability_card_blob_hex instance-attribute

capability_card_blob_hex: str

Hex-encoded serialize(card) bytes — full signed CapabilityCard.

HaloMatchResponse

Bases: _Base

capability_card_blob_hex class-attribute instance-attribute

capability_card_blob_hex: str = ''

Hex of the canonical-JSON serialized signed card. Empty for legacy rows written before the blob column existed; clients should treat empty as an indication to fall back to capability_card_sig_hex + the index columns.

StubResponse

Bases: _Base

Returned by endpoints whose backing logic belongs to a later component.

build_app

build_app(store: RegistryStore, *, reputation_oracle: ReputationOracle | None = None) -> FastAPI

Build the FastAPI app with all 25 endpoints wired to store.

reputation_oracle is consulted by /v1/query_capability to rank halo matches by 5-D Beta means (P3). When omitted the DEFAULT_REPUTATION_ORACLE (zero-score stub) is used — that falls back to insertion order, which is fine for unit tests but not for the demo dashboard / production. The MVP demo wires a real vacant.reputation.aggregator.Aggregator here (F6).

Source code in src/vacant/registry/rpc.py
def build_app(
    store: RegistryStore,
    *,
    reputation_oracle: ReputationOracle | None = None,
) -> FastAPI:
    """Build the FastAPI app with all 25 endpoints wired to `store`.

    `reputation_oracle` is consulted by `/v1/query_capability` to
    rank halo matches by 5-D Beta means (P3). When omitted the
    `DEFAULT_REPUTATION_ORACLE` (zero-score stub) is used — that
    falls back to insertion order, which is fine for unit tests but
    not for the demo dashboard / production. The MVP demo wires a
    real `vacant.reputation.aggregator.Aggregator` here (F6).
    """
    oracle: ReputationOracle = reputation_oracle or DEFAULT_REPUTATION_ORACLE

    app = FastAPI(
        title="Vacant Registry (P4 — central MVP)",
        version="0.1.0",
        description=(
            "Per-vacant capability-card publication + aggregation index. "
            "13 tables, 25 endpoints, 6 anti-tamper layers. See P4_registry.md."
        ),
    )

    # --- writes (12) -------------------------------------------------------

    @app.post("/v1/halo", response_model=HaloResponse, tags=["write"])
    async def publish(req: HaloPublishRequest) -> HaloResponse:
        # F5: the registry must accept HTTP halo publishes so the
        # `vacant publish` CLI command can put a signed capability card
        # on the wire without going through Python imports. The caller
        # pre-signs the register-event canonical bytes; the server
        # reconstructs them inside `publish_halo_signed` and rejects
        # bad signatures via `submit_event`'s L1 verifier.
        try:
            blob = bytes.fromhex(req.capability_card_blob_hex)
        except ValueError as exc:
            raise HTTPException(
                status_code=400, detail=f"capability_card_blob_hex not hex: {exc}"
            ) from exc
        try:
            card = deserialize_card(blob)
        except (EnvelopeFormatError, UnsupportedHaloVersionError) as exc:
            raise HTTPException(
                status_code=400, detail=f"capability card parse failed: {exc}"
            ) from exc
        try:
            signature = bytes.fromhex(req.event_signature_hex)
        except ValueError as exc:
            raise HTTPException(
                status_code=400, detail=f"event_signature_hex not hex: {exc}"
            ) from exc

        try:
            record = await publish_halo_signed(
                store=store,
                card=card,
                runtime_state=VacantState(req.runtime_state),
                visibility=Visibility(req.visibility),
                base_model=req.base_model,
                base_model_family=req.base_model_family,
                owner_org=req.owner_org,
                declared_capabilities=req.declared_capabilities,
                parent_id=req.parent_id,
                version=req.version,
                event_ts_ms=req.event_ts_ms,
                event_actor_seq=req.event_actor_seq,
                event_idempotency_key=req.event_idempotency_key,
                event_signature=signature,
            )
        except SignatureRejected as exc:
            raise HTTPException(status_code=401, detail=str(exc)) from exc
        except SequenceMonotonicityError as exc:
            raise HTTPException(status_code=409, detail=str(exc)) from exc
        except IdempotencyConflict as exc:
            raise HTTPException(status_code=409, detail=str(exc)) from exc
        except RegistryWriteError as exc:
            raise HTTPException(status_code=400, detail=str(exc)) from exc
        return HaloResponse(
            vacant_id=record.vacant_id,
            visibility=record.visibility.value,
            event_seq=record.event_seq,
            capability_card_hash_hex=record.capability_card_hash.hex(),
        )

    @app.post("/v1/revoke_halo", response_model=RevokeHaloResponse, tags=["write"])
    async def revoke(req: RevokeHaloRequest) -> RevokeHaloResponse:
        try:
            pubkey_bytes = bytes.fromhex(req.pubkey_hex)
            signature = bytes.fromhex(req.signature_hex)
        except ValueError as exc:
            raise HTTPException(status_code=400, detail=f"hex decode failed: {exc}") from exc
        try:
            from vacant.core.crypto import SigningKey  # noqa: F401

            # We cannot reconstruct a SigningKey from the public key alone;
            # this endpoint expects the caller to have the private key.
            # P6 envelope work will replace this with a signed envelope.
            _ = (pubkey_bytes, signature)
            raise HTTPException(
                status_code=501,
                detail="revoke_halo HTTP path lands with P6 envelope; use halo.revoke_halo()",
            )
        except RegistryWriteError as exc:
            raise HTTPException(status_code=400, detail=str(exc)) from exc

    @app.post("/v1/submit_event", response_model=StubResponse, tags=["write"])
    async def submit_event() -> StubResponse:
        return StubResponse(component="P6", message="generic envelope dispatcher")

    @app.post("/v1/submit_review", response_model=StubResponse, tags=["write"])
    async def submit_review() -> StubResponse:
        return StubResponse(component="P3", message="reviews land with P3 reputation")

    @app.post("/v1/submit_peer_review", response_model=StubResponse, tags=["write"])
    async def submit_peer_review() -> StubResponse:
        return StubResponse(component="P3", message="peer reviews land with P3 reputation")

    @app.post("/v1/spawn", response_model=StubResponse, tags=["write"])
    async def spawn() -> StubResponse:
        return StubResponse(
            component="P1+P5",
            message="spawn flow goes through runtime/spawn + composite ChildManifest",
        )

    @app.post("/v1/submit_composition_link", response_model=StubResponse, tags=["write"])
    async def submit_composition_link() -> StubResponse:
        return StubResponse(component="P5", message="composition links land with P5")

    @app.post("/v1/submit_finalization", response_model=StubResponse, tags=["write"])
    async def submit_finalization() -> StubResponse:
        return StubResponse(
            component="P3",
            message="N-of-M finalization signals land with P3 reputation",
        )

    @app.post("/v1/submit_attestation", response_model=StubResponse, tags=["write"])
    async def submit_attestation() -> StubResponse:
        return StubResponse(
            component="P2",
            message=(
                "use vacant.identity.issue_attestation + halo.publish; HTTP "
                "envelope schema lands with P6"
            ),
        )

    @app.post("/v1/sink", response_model=StubResponse, tags=["write"])
    async def sink() -> StubResponse:
        return StubResponse(component="P1", message="sink flow lives in runtime")

    @app.post("/v1/report_anomaly", response_model=StubResponse, tags=["write"])
    async def report_anomaly() -> StubResponse:
        return StubResponse(
            component="P3",
            message=("report-only stub; auto-freeze rules wired in P3 + P4 anomaly engine"),
        )

    @app.post("/v1/seal_epoch", response_model=EpochResponse, tags=["write"])
    async def seal_epoch_endpoint() -> EpochResponse:
        # Internal — exposed for ops scripts. Production cron drives this.
        raise HTTPException(
            status_code=501,
            detail="seal_epoch HTTP path is internal; call store.seal_epoch() from cron",
        )

    # --- reads (13) --------------------------------------------------------

    @app.get(
        "/v1/capability_card/{vacant_id}",
        response_model=HaloMatchResponse,
        tags=["read"],
    )
    async def get_capability_card(
        vacant_id: str,
        caller: str | None = Query(default=None, description="caller vacant_id hex"),
    ) -> HaloMatchResponse:
        try:
            v = await store.lookup_halo_for_caller(vacant_id, caller_pubkey_hex=caller)
        except NotFoundError as exc:
            raise HTTPException(status_code=404, detail=str(exc)) from exc
        except VisibilityViolation as exc:
            raise HTTPException(status_code=403, detail=str(exc)) from exc
        from vacant.registry.aggregation import _to_match

        return _match_to_response(_to_match(v))

    @app.post(
        "/v1/query_capability",
        response_model=CapabilitySearchResponse,
        tags=["read"],
    )
    async def query_capability(
        capability: str | None = Query(default=None),
        family: str | None = Query(default=None),
        limit: int = Query(default=20, ge=1, le=100),
    ) -> CapabilitySearchResponse:
        matches = await search_capability(store=store, query=capability, family=family, limit=limit)
        # F6: use the wired-in oracle (P3 Aggregator in production /
        # demo) instead of the zero-score default, otherwise the public
        # capability search returns matches in arbitrary insertion order.
        ranked = await rank_by_reputation(matches, oracle=oracle)
        return CapabilitySearchResponse(matches=[_match_to_response(m) for m in ranked])

    @app.get("/v1/reputation/{vacant_id}", response_model=StubResponse, tags=["read"])
    async def get_reputation(vacant_id: str) -> StubResponse:
        _ = vacant_id
        return StubResponse(component="P3", message="reputation snapshots land with P3")

    @app.get(
        "/v1/reputation_history/{vacant_id}",
        response_model=StubResponse,
        tags=["read"],
    )
    async def get_reputation_history(vacant_id: str) -> StubResponse:
        _ = vacant_id
        return StubResponse(component="P3", message="reputation history lands with P3")

    @app.get("/v1/event_log/{vacant_id}", response_model=list[EventResponse], tags=["read"])
    async def get_event_log(
        vacant_id: str,
        from_seq: int = Query(default=0, ge=0),
        limit: int = Query(default=100, ge=1, le=500),
    ) -> list[EventResponse]:
        rows = await store.list_events_for_vacant(vacant_id, from_seq=from_seq, limit=limit)
        return [
            EventResponse(
                seq=r.seq or 0,
                event_type=r.event_type,
                actor_vacant_id=r.actor_vacant_id,
                subject_vacant_id=r.subject_vacant_id,
                payload_json=r.payload_json,
                event_hash_hex=r.event_hash.hex(),
                actor_seq=r.actor_seq,
                ts=r.ts,
            )
            for r in rows
        ]

    @app.get("/v1/event/{seq}", response_model=EventResponse, tags=["read"])
    async def get_event(seq: int) -> EventResponse:
        row = await store.get_event(seq)
        if row is None:
            raise HTTPException(status_code=404, detail=f"event seq={seq} not found")
        return EventResponse(
            seq=row.seq or 0,
            event_type=row.event_type,
            actor_vacant_id=row.actor_vacant_id,
            subject_vacant_id=row.subject_vacant_id,
            payload_json=row.payload_json,
            event_hash_hex=row.event_hash.hex(),
            actor_seq=row.actor_seq,
            ts=row.ts,
        )

    @app.get("/v1/lineage/{vacant_id}", response_model=LineageResponse, tags=["read"])
    async def get_lineage(
        vacant_id: str,
        direction: Literal["descendants", "ancestors"] = Query(default="descendants"),
        depth: int = Query(default=8, ge=1, le=32),
    ) -> LineageResponse:
        try:
            chain = await lineage_query(
                store=store, vacant_id=vacant_id, direction=direction, depth=depth
            )
        except NotFoundError as exc:
            raise HTTPException(status_code=404, detail=str(exc)) from exc
        return LineageResponse(vacant_id=vacant_id, direction=direction, chain=chain)

    @app.get(
        "/v1/composition_links/{vacant_id}",
        response_model=StubResponse,
        tags=["read"],
    )
    async def get_composition_links(vacant_id: str) -> StubResponse:
        _ = vacant_id
        return StubResponse(component="P5", message="composition links land with P5")

    @app.get("/v1/sink_record/{vacant_id}", response_model=StubResponse, tags=["read"])
    async def get_sink_record(vacant_id: str) -> StubResponse:
        _ = vacant_id
        return StubResponse(
            component="P1+P3",
            message="sink_record table is populated by runtime+reputation",
        )

    @app.get("/v1/freeze_status/{vacant_id}", response_model=StubResponse, tags=["read"])
    async def get_freeze_status(vacant_id: str) -> StubResponse:
        _ = vacant_id
        return StubResponse(
            component="P3+P4",
            message="freeze table populated by anomaly engine + governance",
        )

    @app.get("/v1/revocation_list", response_model=list[str], tags=["read"])
    async def get_revocation_list() -> list[str]:
        # Returns vacant_ids whose status is `revoked`.
        rows = await store.search_capability(
            capability=None,
            family=None,
            status="revoked",
            visibility=None,
            limit=10_000,
        )
        return [r.vacant_id for r in rows]

    @app.get("/v1/epoch/{epoch_id}", response_model=EpochResponse, tags=["read"])
    async def get_epoch(epoch_id: int) -> EpochResponse:
        epoch = await store.get_merkle_epoch(epoch_id)
        if epoch is None:
            raise HTTPException(status_code=404, detail=f"epoch_id={epoch_id} not found")
        return EpochResponse(
            epoch_id=epoch.epoch_id or 0,
            first_seq=epoch.first_seq,
            last_seq=epoch.last_seq,
            tree_size=epoch.tree_size,
            root_hash_hex=epoch.root_hash.hex(),
            sealed_at=epoch.sealed_at,
            registry_signature_hex=epoch.registry_signature.hex(),
        )

    @app.get("/v1/epoch_root/latest", response_model=EpochResponse, tags=["read"])
    async def get_latest_epoch_root() -> EpochResponse:
        epoch = await store.latest_merkle_epoch()
        if epoch is None:
            raise HTTPException(status_code=404, detail="no sealed epoch yet")
        return EpochResponse(
            epoch_id=epoch.epoch_id or 0,
            first_seq=epoch.first_seq,
            last_seq=epoch.last_seq,
            tree_size=epoch.tree_size,
            root_hash_hex=epoch.root_hash.hex(),
            sealed_at=epoch.sealed_at,
            registry_signature_hex=epoch.registry_signature.hex(),
        )

    return app
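
A minimal client-side sketch against the read endpoints above, paging through /v1/event_log. The base URL is an assumption, httpx is used purely for illustration, and from_seq is treated as inclusive here:

import asyncio

import httpx


async def dump_event_log(vacant_id: str) -> None:
    # Assumed base URL; point this at wherever the app is actually served.
    async with httpx.AsyncClient(base_url="http://localhost:8000") as client:
        from_seq = 0
        while True:
            resp = await client.get(
                f"/v1/event_log/{vacant_id}",
                params={"from_seq": from_seq, "limit": 100},
            )
            resp.raise_for_status()
            events = resp.json()
            if not events:
                break
            for ev in events:
                print(ev["seq"], ev["event_type"], ev["event_hash_hex"][:16])
            # Resume after the last seq we saw (assumes from_seq is inclusive).
            from_seq = events[-1]["seq"] + 1


asyncio.run(dump_event_log("ab" * 32))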

antitamper

Six anti-tamper layers (dispatch §5):

  1. Signature verify — every write checks the actor's Ed25519 signature against the canonical event bytes before insert.
  2. Sequence-number monotonicity — per-vacant actor_seq strictly increases; out-of-order writes are rejected.
  3. Freshness window — attestations carry a validity window; stale attestations fail at submit_attestation time and again at consume time.
  4. Merkle-root snapshots — seal_epoch() builds a balanced Merkle tree over all unsealed event hashes and stores the root + the registry operator's signature on it.
  5. Anomaly counters — rule-based windows over rep-jump, review bursts, spawn rates; surfaced as a triggered flag, not a hard block.
  6. Append-only audit log — every signed write also lands in the event log; DELETE on event is rejected at the store layer.

The checks behind these layers are pure functions (no I/O); the store wires them in before commit, as sketched below.
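
A condensed sketch of that wiring for layers 1, 2, and 6. Here last_seq_for, last_event_hash, and insert_event_row are hypothetical store internals, and envelope is a hypothetical parsed request; the documented pure functions do the actual checking:

async def accept_event(store, envelope) -> bytes:
    # Rebuild the canonical bytes the actor claims to have signed.
    canonical = canonical_event_bytes(
        event_type=envelope.event_type,
        actor_vacant_id=envelope.actor_vacant_id,
        subject_vacant_id=envelope.subject_vacant_id,
        payload_hash=envelope.payload_hash,
        idempotency_key=envelope.idempotency_key,
        signed_by_pubkey=envelope.pubkey,
        ts=envelope.ts,
        actor_seq=envelope.actor_seq,
    )
    # Layer 1: the signature must verify over the canonical bytes.
    verify_event_signature(
        pubkey_bytes=envelope.pubkey,
        canonical_bytes=canonical,
        signature=envelope.signature,
    )
    # Layer 2: actor_seq must be exactly last_seq + 1.
    last = await store.last_seq_for(envelope.actor_vacant_id)
    check_sequence_monotonic(last_seq=last, candidate_seq=envelope.actor_seq)
    # Layer 6: extend the hash chain and append to the append-only log.
    prev = await store.last_event_hash()
    event_hash = compute_event_hash(
        prev_event_hash=prev,
        canonical_bytes=canonical,
        signature=envelope.signature,
    )
    await store.insert_event_row(envelope, event_hash=event_hash)
    return event_hash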

MerkleProof dataclass

MerkleProof(leaf_index: int, leaf: bytes, siblings: tuple[bytes, ...])

Inclusion proof: sibling hashes from leaf up to (but excluding) root.

The position of each sibling (left vs right) is reconstructed by walking the bits of leaf_index rather than tagging each sibling explicitly — saves a byte per level and matches RFC 6962.

Attributes:

Name Type Description
leaf_index int

Position of the leaf in the original sequence.

leaf bytes

The hashed leaf (BLAKE2b(b"\x00" || preimage)).

siblings tuple[bytes, ...]

Sibling hash at each level, ordered from the leaf level upward. Length equals log2(padded_n).

AnomalyAssessment dataclass

AnomalyAssessment(metric: str, value: float, threshold: float, triggered: bool)

Outcome of evaluating one anomaly counter against its threshold.

Attributes:

Name Type Description
metric str

Counter name (e.g. "rep_jump_24h").

value float

Measured value.

threshold float

Threshold from the operator's configuration.

triggered bool

True iff value >= threshold. Surfaced to the operator as a flag, not as a hard reject — anomaly counters are detection signals, not authorisation gates.

canonical_event_bytes

canonical_event_bytes(*, event_type: str, actor_vacant_id: str, subject_vacant_id: str | None, payload_hash: bytes, idempotency_key: str, signed_by_pubkey: bytes, ts: int, actor_seq: int) -> bytes

Build the canonical byte form of a registry event.

The same byte string is fed to both sign() (when the actor creates the event) and verify() (when the registry accepts it). It is also the pre-image of event_hash. Matches P4 §3.1 hash-chain canonical rules (modulo BLAKE2b vs BLAKE3 — see D006 §A).

Parameters:

Name Type Description Default
event_type str

Event kind (e.g. "halo_publish", "review").

required
actor_vacant_id str

Hex of the vacant submitting the event.

required
subject_vacant_id str | None

Optional hex of the vacant the event is about (e.g. the target of a review). Empty string when absent.

required
payload_hash bytes

BLAKE2b of the event-specific payload.

required
idempotency_key str

Caller-supplied identifier for de-dup.

required
signed_by_pubkey bytes

Raw 32-byte Ed25519 pubkey expected to have signed the event.

required
ts int

Unix timestamp in seconds (or epoch-resolution of choice).

required
actor_seq int

Strictly-increasing per-actor sequence number.

required

Returns:

Type Description
bytes

Bytes with the eight fields joined by the 0x1f separator, suitable for signing or verification.

Source code in src/vacant/registry/antitamper.py
def canonical_event_bytes(
    *,
    event_type: str,
    actor_vacant_id: str,
    subject_vacant_id: str | None,
    payload_hash: bytes,
    idempotency_key: str,
    signed_by_pubkey: bytes,
    ts: int,
    actor_seq: int,
) -> bytes:
    """Build the canonical byte form of a registry event.

    The same byte string is fed to both `sign()` (when the actor
    creates the event) and `verify()` (when the registry accepts it).
    It is also the pre-image of `event_hash`. Matches P4 §3.1 hash-
    chain canonical rules (modulo BLAKE2b vs BLAKE3 — see D006 §A).

    Args:
        event_type: Event kind (e.g. `"halo_publish"`, `"review"`).
        actor_vacant_id: Hex of the vacant submitting the event.
        subject_vacant_id: Optional hex of the vacant the event is
            *about* (e.g. the target of a review). Empty string when
            absent.
        payload_hash: BLAKE2b of the event-specific payload.
        idempotency_key: Caller-supplied identifier for de-dup.
        signed_by_pubkey: Raw 32-byte Ed25519 pubkey expected to have
            signed the event.
        ts: Unix timestamp in seconds (or epoch-resolution of choice).
        actor_seq: Strictly-increasing per-actor sequence number.

    Returns:
        Bytes with the eight fields joined by the `0x1f` separator,
        suitable for signing or verification.
    """
    return b"\x1f".join(
        [
            event_type.encode("utf-8"),
            actor_vacant_id.encode("utf-8"),
            (subject_vacant_id or "").encode("utf-8"),
            payload_hash,
            idempotency_key.encode("utf-8"),
            signed_by_pubkey,
            ts.to_bytes(8, "big"),
            actor_seq.to_bytes(8, "big"),
        ]
    )
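
A determinism check worth spelling out: both sides build the same bytes from the same fields, and any field change flips them. hashlib.blake2b with digest_size=32 stands in for the project's hash_blake2b, which is an assumption about its digest size:

import hashlib

payload = b'{"capability": "summarise"}'
fields = dict(
    event_type="halo_publish",
    actor_vacant_id="ab" * 32,
    subject_vacant_id=None,
    payload_hash=hashlib.blake2b(payload, digest_size=32).digest(),
    idempotency_key="publish-001",
    signed_by_pubkey=b"\x01" * 32,
    ts=1_700_000_000,
    actor_seq=1,
)
client_bytes = canonical_event_bytes(**fields)
server_bytes = canonical_event_bytes(**fields)
assert client_bytes == server_bytes  # bit-stable between CLI and handler

# Changing any one field changes the pre-image, so the old signature dies.
assert canonical_event_bytes(**{**fields, "actor_seq": 2}) != client_bytes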

verify_event_signature

verify_event_signature(*, pubkey_bytes: bytes, canonical_bytes: bytes, signature: bytes) -> None

Verify an event signature, raising on any failure.

Parameters:

Name Type Description Default
pubkey_bytes bytes

Raw 32-byte Ed25519 pubkey to verify under.

required
canonical_bytes bytes

Output of canonical_event_bytes(...).

required
signature bytes

Ed25519 signature claimed by the actor.

required

Raises:

Type Description
SignatureRejected

If pubkey_bytes is malformed, or the signature does not validate over canonical_bytes.

Source code in src/vacant/registry/antitamper.py
def verify_event_signature(
    *,
    pubkey_bytes: bytes,
    canonical_bytes: bytes,
    signature: bytes,
) -> None:
    """Verify an event signature, raising on any failure.

    Args:
        pubkey_bytes: Raw 32-byte Ed25519 pubkey to verify under.
        canonical_bytes: Output of `canonical_event_bytes(...)`.
        signature: Ed25519 signature claimed by the actor.

    Raises:
        SignatureRejected: If `pubkey_bytes` is malformed, or the
            signature does not validate over `canonical_bytes`.
    """
    try:
        vk = pubkey_from_bytes(pubkey_bytes)
    except Exception as exc:
        raise SignatureRejected(f"invalid pubkey: {exc}") from exc
    if not verify(vk, canonical_bytes, signature):
        raise SignatureRejected("event signature did not verify")

compute_event_hash

compute_event_hash(*, prev_event_hash: bytes, canonical_bytes: bytes, signature: bytes) -> bytes

Compute the hash that links one event to the next in the chain.

The signature is mixed in so two events with identical canonical bytes but distinct actors (one impersonating the other) cannot collide — defensive against an adversary who somehow forged a canonical-byte collision.

Parameters:

Name Type Description Default
prev_event_hash bytes

Hash of the previous event in the chain.

required
canonical_bytes bytes

Output of canonical_event_bytes(...).

required
signature bytes

Actor's Ed25519 signature over canonical_bytes.

required

Returns:

Type Description
bytes

BLAKE2b(prev || canonical || signature). Stored on the event row and used as prev_event_hash for the next insert.

Source code in src/vacant/registry/antitamper.py
def compute_event_hash(
    *, prev_event_hash: bytes, canonical_bytes: bytes, signature: bytes
) -> bytes:
    """Compute the hash that links one event to the next in the chain.

    The signature is mixed in so two events with identical canonical
    bytes but distinct actors (one impersonating the other) cannot
    collide — defensive against an adversary who somehow forged a
    canonical-byte collision.

    Args:
        prev_event_hash: Hash of the previous event in the chain.
        canonical_bytes: Output of `canonical_event_bytes(...)`.
        signature: Actor's Ed25519 signature over `canonical_bytes`.

    Returns:
        `BLAKE2b(prev || canonical || signature)`. Stored on the event
        row and used as `prev_event_hash` for the next insert.
    """
    return hash_blake2b(prev_event_hash + canonical_bytes + signature)
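
This makes offline chain verification a straight fold. A sketch, where rows and genesis are hypothetical: rows are event rows in seq order carrying the signed fields plus the stored signature and event_hash, and genesis is whatever prev_event_hash the store seeds the chain with:

def verify_chain(rows, genesis: bytes) -> bool:
    prev = genesis
    for row in rows:
        canonical = canonical_event_bytes(
            event_type=row.event_type,
            actor_vacant_id=row.actor_vacant_id,
            subject_vacant_id=row.subject_vacant_id,
            payload_hash=row.payload_hash,
            idempotency_key=row.idempotency_key,
            signed_by_pubkey=row.signed_by_pubkey,
            ts=row.ts,
            actor_seq=row.actor_seq,
        )
        recomputed = compute_event_hash(
            prev_event_hash=prev,
            canonical_bytes=canonical,
            signature=row.signature,
        )
        if recomputed != row.event_hash:
            return False  # stored hash disagrees with recomputation: tampered
        prev = row.event_hash
    return True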

check_sequence_monotonic

check_sequence_monotonic(*, last_seq: int, candidate_seq: int) -> None

Enforce strict-by-one monotonicity for a per-actor sequence number.

CONSTANTS.md pins "Sequence-number monotonicity tolerance: 0 (strict)" — the candidate must be exactly last_seq + 1, not just > last_seq. The strict form catches both reordering attacks (where a stale event is replayed) and gap-introduction attacks (where a malicious actor bumps the sequence to skip auditable history).

Parameters:

Name Type Description Default
last_seq int

Highest actor_seq already accepted for this actor. Use 0 for a fresh actor (their first event must claim actor_seq=1).

required
candidate_seq int

The actor_seq claimed by the inbound event.

required

Raises:

Type Description
SequenceMonotonicityError

If candidate_seq != last_seq + 1.

Source code in src/vacant/registry/antitamper.py
def check_sequence_monotonic(*, last_seq: int, candidate_seq: int) -> None:
    """Enforce strict-by-one monotonicity for a per-actor sequence number.

    `CONSTANTS.md` pins "Sequence-number monotonicity tolerance: 0
    (strict)" — the candidate must be **exactly** `last_seq + 1`, not
    just `> last_seq`. The strict form catches both reordering attacks
    (where a stale event is replayed) and gap-introduction attacks
    (where a malicious actor bumps the sequence to skip auditable
    history).

    Args:
        last_seq: Highest `actor_seq` already accepted for this actor.
            Use `0` for a fresh actor (their first event must claim
            `actor_seq=1`).
        candidate_seq: The `actor_seq` claimed by the inbound event.

    Raises:
        SequenceMonotonicityError: If `candidate_seq != last_seq + 1`.
    """
    expected = last_seq + 1
    if candidate_seq != expected:
        raise SequenceMonotonicityError(
            f"actor_seq must equal last_seq + 1 = {expected}, got {candidate_seq}"
        )
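
Usage is deliberately unforgiving:

check_sequence_monotonic(last_seq=0, candidate_seq=1)  # fresh actor: ok

try:
    check_sequence_monotonic(last_seq=5, candidate_seq=7)  # gap
except SequenceMonotonicityError:
    pass  # rejected: a gap would skip auditable history

try:
    check_sequence_monotonic(last_seq=5, candidate_seq=5)  # replay
except SequenceMonotonicityError:
    pass  # rejected: stale or replayed event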

check_attestation_freshness

check_attestation_freshness(*, valid_from_ms: int, valid_until_ms: int | None, now_ms: int) -> None

Reject attestations that are outside their validity window.

Parameters:

Name Type Description Default
valid_from_ms int

Earliest moment the attestation should be accepted, in milliseconds since epoch.

required
valid_until_ms int | None

Latest moment, or None for no upstream ceiling. The aggregator may still apply its own ceiling at consume time.

required
now_ms int

Wall-clock timestamp the registry will compare against.

required

Raises:

Type Description
FreshnessError

If now_ms < valid_from_ms ("not yet valid") or, when valid_until_ms is set, now_ms > valid_until_ms ("expired").

Source code in src/vacant/registry/antitamper.py
def check_attestation_freshness(
    *,
    valid_from_ms: int,
    valid_until_ms: int | None,
    now_ms: int,
) -> None:
    """Reject attestations that are outside their validity window.

    Args:
        valid_from_ms: Earliest moment the attestation should be
            accepted, in milliseconds since epoch.
        valid_until_ms: Latest moment, or `None` for no upstream
            ceiling. The aggregator may still apply its own ceiling at
            consume time.
        now_ms: Wall-clock timestamp the registry will compare
            against.

    Raises:
        FreshnessError: If `now_ms < valid_from_ms` ("not yet valid")
            or, when `valid_until_ms` is set, `now_ms > valid_until_ms`
            ("expired").
    """
    if now_ms < valid_from_ms:
        raise FreshnessError(
            f"attestation not yet valid (now={now_ms} < valid_from={valid_from_ms})"
        )
    if valid_until_ms is not None and now_ms > valid_until_ms:
        raise FreshnessError(f"attestation expired (now={now_ms} > valid_until={valid_until_ms})")
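
Both bounds in one place:

now = 1_700_000_000_000  # ms since epoch

# Inside the window, or no upstream ceiling at all: accepted.
check_attestation_freshness(valid_from_ms=now - 1_000, valid_until_ms=now + 60_000, now_ms=now)
check_attestation_freshness(valid_from_ms=now - 1_000, valid_until_ms=None, now_ms=now)

try:
    check_attestation_freshness(valid_from_ms=now + 1, valid_until_ms=None, now_ms=now)
except FreshnessError:
    pass  # not yet valid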

build_merkle_tree

build_merkle_tree(leaves: Sequence[bytes]) -> list[list[bytes]]

Build the full Merkle tree as a list of levels.

Parameters:

Name Type Description Default
leaves Sequence[bytes]

Pre-image bytes for each leaf. Order is significant — inclusion proofs index into this order.

required

Returns:

Type Description
list[list[bytes]]

A list of levels, leaves first, root last (so the root is at tree[-1][0]). For an empty input the tree is [[BLAKE2b(b"\x00")]] so empty epochs still have a stable root shape.

Source code in src/vacant/registry/antitamper.py
def build_merkle_tree(leaves: Sequence[bytes]) -> list[list[bytes]]:
    """Build the full Merkle tree as a list of levels.

    Args:
        leaves: Pre-image bytes for each leaf. Order is significant —
            inclusion proofs index into this order.

    Returns:
        A list of levels, leaves first, root last (so the root is at
        `tree[-1][0]`). For an empty input the tree is
        `[[BLAKE2b(b"\\x00")]]` so empty epochs still have a stable
        root shape.
    """
    hashed = [_leaf(b) for b in leaves]
    if not hashed:
        return [[hash_blake2b(b"\x00")]]
    padded = _pad_to_power_of_two(hashed)
    levels = [padded]
    while len(levels[-1]) > 1:
        prev = levels[-1]
        nxt = [_node(prev[i], prev[i + 1]) for i in range(0, len(prev), 2)]
        levels.append(nxt)
    return levels

build_merkle_root

build_merkle_root(leaves: Sequence[bytes]) -> bytes

Build only the root (convenience wrapper).

Parameters:

Name Type Description Default
leaves Sequence[bytes]

Pre-image bytes for each leaf, in deterministic order.

required

Returns:

Type Description
bytes

The 32-byte root hash. For empty input, a stable empty-epoch root.

Source code in src/vacant/registry/antitamper.py
def build_merkle_root(leaves: Sequence[bytes]) -> bytes:
    """Build only the root (convenience wrapper).

    Args:
        leaves: Pre-image bytes for each leaf, in deterministic order.

    Returns:
        The 32-byte root hash. For empty input, a stable empty-epoch
        root.
    """
    return build_merkle_tree(leaves)[-1][0]

merkle_inclusion_proof

merkle_inclusion_proof(leaves: Sequence[bytes], leaf_index: int) -> MerkleProof

Build an inclusion proof for the leaf at leaf_index.

Parameters:

Name Type Description Default
leaves Sequence[bytes]

The full leaf sequence the tree was built over.

required
leaf_index int

Index into leaves.

required

Returns:

Type Description
MerkleProof

A MerkleProof whose verify_inclusion_proof(...) will succeed against the root of the same tree.

Raises:

Type Description
IndexError

If leaf_index is out of range.

Source code in src/vacant/registry/antitamper.py
def merkle_inclusion_proof(leaves: Sequence[bytes], leaf_index: int) -> MerkleProof:
    """Build an inclusion proof for the leaf at `leaf_index`.

    Args:
        leaves: The full leaf sequence the tree was built over.
        leaf_index: Index into `leaves`.

    Returns:
        A `MerkleProof` whose `verify_inclusion_proof(...)` will
        succeed against the root of the same tree.

    Raises:
        IndexError: If `leaf_index` is out of range.
    """
    if leaf_index < 0 or leaf_index >= len(leaves):
        raise IndexError(f"leaf_index {leaf_index} out of range for {len(leaves)} leaves")
    levels = build_merkle_tree(leaves)
    siblings: list[bytes] = []
    idx = leaf_index
    for level in levels[:-1]:
        sibling_idx = idx ^ 1
        siblings.append(level[sibling_idx])
        idx //= 2
    return MerkleProof(
        leaf_index=leaf_index,
        leaf=_leaf(leaves[leaf_index]),
        siblings=tuple(siblings),
    )

verify_inclusion_proof

verify_inclusion_proof(proof: MerkleProof, root: bytes) -> bool

Verify that proof.leaf is included in a tree with root.

Parameters:

Name Type Description Default
proof MerkleProof

The proof returned by merkle_inclusion_proof.

required
root bytes

The expected Merkle root.

required

Returns:

Type Description
bool

True iff folding proof.leaf upward with proof.siblings (using proof.leaf_index to decide left/right at each level) yields root.

Source code in src/vacant/registry/antitamper.py
def verify_inclusion_proof(proof: MerkleProof, root: bytes) -> bool:
    """Verify that `proof.leaf` is included in a tree with `root`.

    Args:
        proof: The proof returned by `merkle_inclusion_proof`.
        root: The expected Merkle root.

    Returns:
        `True` iff folding `proof.leaf` upward with `proof.siblings`
        (using `proof.leaf_index` to decide left/right at each level)
        yields `root`.
    """
    h = proof.leaf
    idx = proof.leaf_index
    for sib in proof.siblings:
        if idx % 2 == 0:
            h = _node(h, sib)
        else:
            h = _node(sib, h)
        idx //= 2
    return h == root
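
A round trip over the three Merkle helpers:

leaves = [b"event-1", b"event-2", b"event-3", b"event-4"]
root = build_merkle_root(leaves)

proof = merkle_inclusion_proof(leaves, leaf_index=2)
assert verify_inclusion_proof(proof, root)               # genuine leaf verifies
assert not verify_inclusion_proof(proof, b"\x00" * 32)   # wrong root fails

# Four leaves are already a power of two, so the proof carries
# log2(4) = 2 sibling hashes.
assert len(proof.siblings) == 2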

sign_epoch_root

sign_epoch_root(*, root: bytes, signing_key: SigningKey) -> bytes

Operator-key signature over an epoch root.

Parameters:

Name Type Description Default
root bytes

The epoch's Merkle root.

required
signing_key SigningKey

The registry operator's private key.

required

Returns:

Type Description
bytes

Ed25519 signature over b"vacant:registry:epoch:" || root. The domain-separation prefix prevents the signature from being replayed against a non-epoch payload that happens to start with these bytes.

Source code in src/vacant/registry/antitamper.py
def sign_epoch_root(*, root: bytes, signing_key: SigningKey) -> bytes:
    """Operator-key signature over an epoch root.

    Args:
        root: The epoch's Merkle root.
        signing_key: The registry operator's private key.

    Returns:
        Ed25519 signature over `b"vacant:registry:epoch:" || root`.
        The domain-separation prefix prevents the signature from being
        replayed against a non-epoch payload that happens to start
        with these bytes.
    """
    return sign(signing_key, b"vacant:registry:epoch:" + root)

verify_epoch_signature

verify_epoch_signature(*, root: bytes, signature: bytes, operator_pubkey: VerifyKey) -> bool

Verify a previously-signed epoch root.

Parameters:

Name Type Description Default
root bytes

The epoch's Merkle root.

required
signature bytes

Output of sign_epoch_root.

required
operator_pubkey VerifyKey

The expected operator verify-key.

required

Returns:

Type Description
bool

True iff signature validates over the domain-separated epoch payload.

Source code in src/vacant/registry/antitamper.py
def verify_epoch_signature(*, root: bytes, signature: bytes, operator_pubkey: VerifyKey) -> bool:
    """Verify a previously-signed epoch root.

    Args:
        root: The epoch's Merkle root.
        signature: Output of `sign_epoch_root`.
        operator_pubkey: The expected operator verify-key.

    Returns:
        `True` iff `signature` validates over the domain-separated
        epoch payload.
    """
    return verify(operator_pubkey, b"vacant:registry:epoch:" + root, signature)
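
A seal-and-check sketch. Key generation is shown with PyNaCl directly on the assumption that SigningKey / VerifyKey are (or wrap) nacl.signing types; substitute the project's own key helpers if they differ:

from nacl.signing import SigningKey

operator_key = SigningKey.generate()
root = build_merkle_root([b"event-1", b"event-2"])

sig = sign_epoch_root(root=root, signing_key=operator_key)
assert verify_epoch_signature(
    root=root, signature=sig, operator_pubkey=operator_key.verify_key
)

# The signature covers b"vacant:registry:epoch:" + root, so it cannot be
# replayed against a different root (or a non-epoch payload).
other_root = build_merkle_root([b"event-3"])
assert not verify_epoch_signature(
    root=other_root, signature=sig, operator_pubkey=operator_key.verify_key
)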

assess_anomaly

assess_anomaly(*, metric: str, value: float, threshold: float) -> AnomalyAssessment

Compare value to threshold and package as an AnomalyAssessment.

Parameters:

Name Type Description Default
metric str

Counter name.

required
value float

Measured value.

required
threshold float

Threshold to compare against.

required

Returns:

Type Description
AnomalyAssessment

An AnomalyAssessment with triggered=True iff value >= threshold.

Source code in src/vacant/registry/antitamper.py
def assess_anomaly(*, metric: str, value: float, threshold: float) -> AnomalyAssessment:
    """Compare `value` to `threshold` and package as an `AnomalyAssessment`.

    Args:
        metric: Counter name.
        value: Measured value.
        threshold: Threshold to compare against.

    Returns:
        An `AnomalyAssessment` with `triggered=True` iff
        `value >= threshold`.
    """
    return AnomalyAssessment(
        metric=metric,
        value=value,
        threshold=threshold,
        triggered=value >= threshold,
    )

sha256_hex

sha256_hex(data: bytes) -> str

SHA-256 hex digest helper for test-fixture ergonomics.

BLAKE2b is the canonical hash everywhere in the registry; this helper exists only because some fixtures predate the BLAKE2b canonicalisation and retain SHA-256 inputs.

Parameters:

Name Type Description Default
data bytes

Bytes to digest.

required

Returns:

Type Description
str

Lowercase 64-character hex digest.

Source code in src/vacant/registry/antitamper.py
def sha256_hex(data: bytes) -> str:
    """SHA-256 hex digest helper for test-fixture ergonomics.

    BLAKE2b is the canonical hash everywhere in the registry; this
    helper exists only because some fixtures predate the BLAKE2b
    canonicalisation and retain SHA-256 inputs.

    Args:
        data: Bytes to digest.

    Returns:
        Lowercase 64-character hex digest.
    """
    return hashlib.sha256(data).hexdigest()

visibility

Halo visibility — the registry_visibility axis of THEORY_V5 §1.1's three-axis ontology (registry_visibility x endpoint_reachability x outbound_policy).

Visibility.NONE matches VacantState.LOCAL per CLAUDE.md §LOCAL: a LOCAL vacant runs and signs but is not published to the public index. The two concepts overlap at the discovery layer: effective_visibility collapses the runtime state into the externally-observable visibility.

Visibility

Bases: StrEnum

Discovery visibility for a halo record.

NONE class-attribute instance-attribute

NONE = 'NONE'

Not in any public index. Reachable only via owner/parent direct path.

RESTRICTED class-attribute instance-attribute

RESTRICTED = 'RESTRICTED'

Indexed but only revealed to authenticated callers (P5/P6 future).

PUBLIC class-attribute instance-attribute

PUBLIC = 'PUBLIC'

Default for ACTIVE-state vacants — fully discoverable.

effective_visibility

effective_visibility(state: VacantState, registry_visibility: Visibility) -> Visibility

Compute the discovery-layer visibility from runtime state + setting.

LOCAL state forces NONE regardless of registry_visibility — CLAUDE.md §LOCAL is load-bearing: a LOCAL vacant must not appear in the public index even if its capability card was previously published.

Sunk / Archived states keep their existing visibility (the halo is historically retained per THEORY_V5 §4.1) — but is_runnable(state) is False, so callers see the record but cannot make new calls.

Source code in src/vacant/registry/visibility.py
def effective_visibility(state: VacantState, registry_visibility: Visibility) -> Visibility:
    """Compute the discovery-layer visibility from runtime state + setting.

    LOCAL state forces `NONE` regardless of `registry_visibility` —
    CLAUDE.md §LOCAL is load-bearing: a LOCAL vacant must not appear in
    the public index even if its capability card was previously published.

    Sunk / Archived states keep their existing visibility (the halo is
    historically retained per THEORY_V5 §4.1) — but `is_runnable(state)`
    is False, so callers see the record but cannot make new calls.
    """
    if state == VacantState.LOCAL:
        return Visibility.NONE
    return registry_visibility
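
The collapse in practice (VacantState.ACTIVE is taken from the PUBLIC default noted above):

# LOCAL always collapses to NONE, whatever was stored.
assert effective_visibility(VacantState.LOCAL, Visibility.PUBLIC) == Visibility.NONE

# Every other state passes the stored setting straight through.
assert effective_visibility(VacantState.ACTIVE, Visibility.PUBLIC) == Visibility.PUBLIC
assert effective_visibility(VacantState.ACTIVE, Visibility.RESTRICTED) == Visibility.RESTRICTED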

errors

Error hierarchy for vacant.registry.

RegistryError

Bases: CoreError

Base class for vacant.registry errors.

RegistryWriteError

Bases: RegistryError

A write violated an anti-tamper invariant before commit.

SignatureRejected

Bases: RegistryWriteError

A submitted envelope's signature did not verify (anti-tamper L1).

SequenceMonotonicityError

Bases: RegistryWriteError

A submitted event's per-vacant sequence is not exactly last_seq + 1 (anti-tamper L2).

FreshnessError

Bases: RegistryWriteError

An attestation is outside its freshness window (anti-tamper L3).

IdempotencyConflict

Bases: RegistryWriteError

The same idempotency_key was used with a different canonical payload hash (P4 §2.6 double-spend protection).

VisibilityViolation

Bases: RegistryError

A read attempt crossed a visibility boundary (e.g. stranger requesting a LOCAL vacant's halo).

NotFoundError

Bases: RegistryError

The requested record does not exist.

AppendOnlyViolation

Bases: RegistryWriteError

A DELETE was attempted against an append-only table (anti-tamper L6).
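
One plausible way a write handler could fold this hierarchy into HTTP statuses (only the NotFoundError → 404 mapping appears in the handlers above; the rest is an illustrative, conventional assignment):

from fastapi import HTTPException


def to_http(exc: RegistryError) -> HTTPException:
    if isinstance(exc, NotFoundError):
        return HTTPException(status_code=404, detail=str(exc))
    if isinstance(exc, VisibilityViolation):
        return HTTPException(status_code=403, detail=str(exc))
    if isinstance(exc, IdempotencyConflict):
        return HTTPException(status_code=409, detail=str(exc))
    if isinstance(exc, RegistryWriteError):
        # signature, sequence, freshness, append-only violations
        return HTTPException(status_code=422, detail=str(exc))
    return HTTPException(status_code=500, detail=str(exc))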