Skip to content

vacant.identity

P2 identity surface — Ed25519 keypairs, layered identity (L0–L3), wash cost, peer attestations, and the federated root set / rotation chain.

keys

Keypair lifecycle: vaults + rotation + revocation.

P2 owns the lifecycle of Ed25519 keypairs that P0 placed in core/crypto.py. This module exposes:

  • KeyVault ABC + two concrete impls (InMemoryVault for tests, FileVault for encrypted-at-rest local storage)
  • rotate_key(...) — atomic rotation that emits a KEY_ROTATION log entry signed by both the old and the new key, so a future verifier can reconstruct the chain of custody from the logbook alone.
  • revoke_key(...) — terminal KEY_REVOCATION log entry signed by the key being revoked. Subsequent attempts to sign with that key are a caller bug; downstream code that consults the logbook can detect the revocation by inspecting the trailing entry.

Real HSM / TEE integration (THEORY_V5 §0.1) is intentionally a TODO; the KeyVault ABC is the seam.

KeyVault

Bases: ABC

Abstract key-of-record store. Real HSM / TEE impls plug in here.

All operations are sync (vaults are typically tiny local stores; making them async forces every caller into asyncio for no benefit). I/O-heavy implementations can wrap themselves in asyncio.to_thread at the call site.

store abstractmethod

store(key_id: str, signing_key: SigningKey) -> None

Persist signing_key under key_id.

Parameters:

Name Type Description Default
key_id str

Non-empty identifier. Implementations may impose additional restrictions (e.g. FileVault rejects separators).

required
signing_key SigningKey

The Ed25519 private key to store.

required

Raises:

Type Description
KeyVaultError

On invalid key_id or write failure.

Source code in src/vacant/identity/keys.py
@abstractmethod
def store(self, key_id: str, signing_key: SigningKey) -> None:
    """Persist `signing_key` under `key_id`.

    Args:
        key_id: Non-empty identifier. Implementations may impose
            additional restrictions (e.g. `FileVault` rejects
            separators).
        signing_key: The Ed25519 private key to store.

    Raises:
        KeyVaultError: On invalid `key_id` or write failure.
    """

load abstractmethod

load(key_id: str) -> SigningKey

Retrieve the signing key registered under key_id.

Parameters:

Name Type Description Default
key_id str

Identifier previously passed to store().

required

Returns:

Type Description
SigningKey

The reconstituted SigningKey.

Raises:

Type Description
KeyNotFoundError

If key_id is not registered.

KeyVaultError

On decryption / decode failure (e.g. wrong passphrase for FileVault).

Source code in src/vacant/identity/keys.py
@abstractmethod
def load(self, key_id: str) -> SigningKey:
    """Retrieve the signing key registered under `key_id`.

    Args:
        key_id: Identifier previously passed to `store()`.

    Returns:
        The reconstituted `SigningKey`.

    Raises:
        KeyNotFoundError: If `key_id` is not registered.
        KeyVaultError: On decryption / decode failure (e.g.
            wrong passphrase for `FileVault`).
    """

delete abstractmethod

delete(key_id: str) -> None

Remove key_id from the vault.

Parameters:

Name Type Description Default
key_id str

Identifier previously passed to store().

required

Raises:

Type Description
KeyNotFoundError

If key_id is not registered.

Source code in src/vacant/identity/keys.py
@abstractmethod
def delete(self, key_id: str) -> None:
    """Remove `key_id` from the vault.

    Args:
        key_id: Identifier previously passed to `store()`.

    Raises:
        KeyNotFoundError: If `key_id` is not registered.
    """

has abstractmethod

has(key_id: str) -> bool

Check whether key_id is registered.

Parameters:

Name Type Description Default
key_id str

Identifier to look up.

required

Returns:

Type Description
bool

True if load(key_id) would succeed, False otherwise.

bool

Implementations should not raise for missing keys here.

Source code in src/vacant/identity/keys.py
@abstractmethod
def has(self, key_id: str) -> bool:
    """Check whether `key_id` is registered.

    Args:
        key_id: Identifier to look up.

    Returns:
        `True` if `load(key_id)` would succeed, `False` otherwise.
        Implementations should not raise for missing keys here.
    """

InMemoryVault

InMemoryVault()

Bases: KeyVault

Reference impl. Not durable; not thread-safe; for tests + demo.

Source code in src/vacant/identity/keys.py
def __init__(self) -> None:
    self._data: dict[str, bytes] = {}

FileVault

FileVault(root: PathLike[str] | str, passphrase: bytes | str)

Bases: KeyVault

File-backed vault, AES-GCM under PBKDF2 via cryptography.fernet.

The passphrase is supplied at construction (callers should source it from an env var or OS keyring; the vault never logs or stringifies it). Each key_id becomes one file <root>/<key_id>.vault. PBKDF2 iteration count tracks 2026 OWASP guidance for SHA-256 (>= 600k); rotating that constant requires re-encrypting existing blobs.

Parameters:

Name Type Description Default
root PathLike[str] | str

Directory to write vault files into. Created if missing.

required
passphrase bytes | str

Non-empty secret. str is encoded as UTF-8.

required

Raises:

Type Description
KeyVaultError

If passphrase is empty.

Source code in src/vacant/identity/keys.py
def __init__(self, root: os.PathLike[str] | str, passphrase: bytes | str) -> None:
    from pathlib import Path

    self._root = Path(root)
    self._root.mkdir(parents=True, exist_ok=True)
    self._pass = passphrase.encode("utf-8") if isinstance(passphrase, str) else passphrase
    if not self._pass:
        raise KeyVaultError("FileVault: passphrase must be non-empty")

RotationRecord dataclass

RotationRecord(new_signing_key: SigningKey, new_verify_key: VerifyKey, entry: LogEntry)

Result of rotate_key.

Attributes:

Name Type Description
new_signing_key SigningKey

The freshly-generated private key. Caller is responsible for storing it in the vault under whatever key_id they want.

new_verify_key VerifyKey

Public side of new_signing_key.

entry LogEntry

The KEY_ROTATION log entry that was appended to the logbook in this same call. Its payload carries the old/new pubkey hashes and the new key's consent signature.

RevocationRecord dataclass

RevocationRecord(entry: LogEntry, reason: str)

Terminal record for a revoked key.

Attributes:

Name Type Description
entry LogEntry

The appended KEY_REVOCATION log entry.

reason str

Free-form explanation (echoed from the call site).

rotate_key

rotate_key(*, old_signing_key: SigningKey, logbook: Logbook) -> RotationRecord

Rotate to a fresh keypair, atomically appending a chain-of-custody entry.

The new entry is signed by both keys: the old key proves it consented to handing off custody (it owns the entry signature); the new key proves it accepted (its consent signature lives inside the payload). Without the new-side consent, a leaked old key could rotate-to-attacker against an unwilling target. With it, a future verifier can reconstruct the entire rotation chain from the logbook alone — no out-of-band state needed.

Parameters:

Name Type Description Default
old_signing_key SigningKey

The currently-active private key. Will sign the resulting log entry.

required
logbook Logbook

The vacant's logbook. A new KEY_ROTATION entry is appended in-place.

required

Returns:

Type Description
RotationRecord

A RotationRecord carrying the new keypair and the appended

RotationRecord

log entry. The caller MUST persist new_signing_key (e.g.

RotationRecord

vault.store(key_id, record.new_signing_key)) and stop using

RotationRecord

old_signing_key for new entries.

Source code in src/vacant/identity/keys.py
def rotate_key(
    *,
    old_signing_key: SigningKey,
    logbook: Logbook,
) -> RotationRecord:
    """Rotate to a fresh keypair, atomically appending a chain-of-custody entry.

    The new entry is signed by both keys: the old key proves it
    consented to handing off custody (it owns the entry signature); the
    new key proves it accepted (its consent signature lives inside the
    payload). Without the new-side consent, a leaked old key could
    rotate-to-attacker against an unwilling target. With it, a future
    verifier can reconstruct the entire rotation chain from the logbook
    alone — no out-of-band state needed.

    Args:
        old_signing_key: The currently-active private key. Will sign
            the resulting log entry.
        logbook: The vacant's logbook. A new `KEY_ROTATION` entry is
            appended in-place.

    Returns:
        A `RotationRecord` carrying the new keypair and the appended
        log entry. The caller MUST persist `new_signing_key` (e.g.
        `vault.store(key_id, record.new_signing_key)`) and stop using
        `old_signing_key` for new entries.
    """
    old_vk = old_signing_key.verify_key
    new_sk, new_vk = keygen()

    old_pubkey_hash = _pubkey_hash(old_vk)
    new_pubkey_hash = _pubkey_hash(new_vk)
    handoff_payload = old_pubkey_hash + b"\x1f" + new_pubkey_hash
    new_key_consent = sign(new_sk, handoff_payload)

    payload = {
        "old_pubkey_hash": old_pubkey_hash.hex(),
        "new_pubkey_hash": new_pubkey_hash.hex(),
        "new_pubkey": bytes(new_vk).hex(),
        "new_key_consent": new_key_consent.hex(),
    }
    entry = logbook.append(KEY_ROTATION_KIND, payload, old_signing_key)
    return RotationRecord(new_signing_key=new_sk, new_verify_key=new_vk, entry=entry)

revoke_key

revoke_key(*, signing_key: SigningKey, logbook: Logbook, reason: str) -> RevocationRecord

Append a terminal KEY_REVOCATION entry signed by the key itself.

The signing key remains cryptographically capable of producing valid signatures after this call — that's precisely why the revocation is published into the auditable logbook: future verifiers consult is_key_revoked() and reject any subsequent signatures from this key. Callers MUST stop signing with the revoked key; this is a contract, not a runtime guard.

Parameters:

Name Type Description Default
signing_key SigningKey

The key being revoked. Signs its own revocation.

required
logbook Logbook

The vacant's logbook. A new KEY_REVOCATION entry is appended in-place.

required
reason str

Non-empty explanation. Stored verbatim in the entry payload.

required

Returns:

Type Description
RevocationRecord

A RevocationRecord carrying the appended entry and the

RevocationRecord

reason.

Raises:

Type Description
KeyRevokedError

If reason is blank or whitespace-only.

Source code in src/vacant/identity/keys.py
def revoke_key(
    *,
    signing_key: SigningKey,
    logbook: Logbook,
    reason: str,
) -> RevocationRecord:
    """Append a terminal `KEY_REVOCATION` entry signed by the key itself.

    The signing key remains cryptographically capable of producing
    valid signatures after this call — that's precisely why the
    revocation is published into the auditable logbook: future
    verifiers consult `is_key_revoked()` and reject any subsequent
    signatures from this key. Callers MUST stop signing with the
    revoked key; this is a contract, not a runtime guard.

    Args:
        signing_key: The key being revoked. Signs its own revocation.
        logbook: The vacant's logbook. A new `KEY_REVOCATION` entry is
            appended in-place.
        reason: Non-empty explanation. Stored verbatim in the entry
            payload.

    Returns:
        A `RevocationRecord` carrying the appended entry and the
        reason.

    Raises:
        KeyRevokedError: If `reason` is blank or whitespace-only.
    """
    if not reason.strip():
        raise KeyRevokedError("revoke_key: reason must be non-empty")
    payload = {
        "pubkey_hash": _pubkey_hash(signing_key.verify_key).hex(),
        "reason": reason,
    }
    entry = logbook.append(KEY_REVOCATION_KIND, payload, signing_key)
    return RevocationRecord(entry=entry, reason=reason)

is_key_revoked

is_key_revoked(logbook: Logbook, vk: VerifyKey) -> bool

Check the logbook for a KEY_REVOCATION entry naming vk.

Parameters:

Name Type Description Default
logbook Logbook

A logbook that has already been verified (verify_chain / verify_chain_or_raise); this function does not re-verify, it only scans the kind + payload.

required
vk VerifyKey

The verify-key whose pubkey_hash is being checked.

required

Returns:

Type Description
bool

True iff at least one entry of kind KEY_REVOCATION whose

bool

payload's pubkey_hash matches vk appears anywhere in

bool

logbook.entries. Order is irrelevant — once revoked, always

bool

revoked.

Source code in src/vacant/identity/keys.py
def is_key_revoked(logbook: Logbook, vk: VerifyKey) -> bool:
    """Check the logbook for a `KEY_REVOCATION` entry naming `vk`.

    Args:
        logbook: A logbook that has already been verified
            (`verify_chain` / `verify_chain_or_raise`); this function
            does not re-verify, it only scans the kind + payload.
        vk: The verify-key whose pubkey_hash is being checked.

    Returns:
        `True` iff at least one entry of kind `KEY_REVOCATION` whose
        payload's `pubkey_hash` matches `vk` appears anywhere in
        `logbook.entries`. Order is irrelevant — once revoked, always
        revoked.
    """
    target = _pubkey_hash(vk).hex()
    for entry in logbook.entries:
        if entry.kind == KEY_REVOCATION_KIND and entry.payload.get("pubkey_hash") == target:
            return True
    return False

layers

L0-L3 layered identity (P2 §2 / dispatch §2).

Distinct frozen Pydantic types — not a class hierarchy — so that a function annotated f(x: L3Identity) rejects an L1Identity at type-check time. Promotion is one-way and explicit:

L0Identity → L1Identity → L2Identity → L3Identity
            (logbook)    (cap card)    (>= N peer attestations)

Each promotion verifies the relevant invariants and raises LayerPromotionError with a precise message on failure. There is no implicit downgrade — once you have an L2Identity, the corresponding logbook is known-good, and callers can rely on that without re-checking.

The did:key textual form (§3.1) is exposed via vacant_id_did_key so downstream code that emits attestations / capability cards can produce the W3C did:key:z… string without re-implementing multibase encoding.

L0Identity dataclass

L0Identity(vacant_id: VacantId)

Just a VacantId — the bare keypair, no logbook checked yet.

L1Identity dataclass

L1Identity(vacant_id: VacantId, logbook: Logbook)

L0 + a logbook whose hash chain + signatures verify against vacant_id.

L2Identity dataclass

L2Identity(vacant_id: VacantId, logbook: Logbook, capability_card: CapabilityCard)

L1 + a CapabilityCard whose vacant_id matches and whose signature verifies against the same key.

L3Identity dataclass

L3Identity(vacant_id: VacantId, logbook: Logbook, capability_card: CapabilityCard, attestations: tuple[object, ...])

L2 + at least MIN_VOUCHERS_FOR_L3_PROMOTION peer attestations.

Attestations are stored as a tuple so the type stays hashable / frozen; the actual PeerAttestation model lives in identity.attestation.

attestations instance-attribute

attestations: tuple[object, ...]

Tuple of PeerAttestation (kept as object to avoid an import cycle; promote_to_l3 validates the concrete type at runtime).

vacant_id_did_key

vacant_id_did_key(vid: VacantId) -> str

Return the did:key:z… form of vid (W3C did:key §6.1).

Encoding: multibase58btc(0xed01 || pubkey_bytes), prefixed did:key:. Uses standard Bitcoin Base58 with the z multibase prefix.

Source code in src/vacant/identity/layers.py
def vacant_id_did_key(vid: VacantId) -> str:
    """Return the `did:key:z…` form of `vid` (W3C did:key §6.1).

    Encoding: multibase58btc(`0xed01` || pubkey_bytes), prefixed `did:key:`.
    Uses standard Bitcoin Base58 with the `z` multibase prefix.
    """
    payload = ED25519_MULTICODEC_PREFIX + vid.pubkey_bytes
    return f"did:key:z{_b58encode(payload)}"

promote_to_l1

promote_to_l1(l0: L0Identity, logbook: Logbook) -> L1Identity

Verify the logbook chain + signatures against l0.vacant_id and return an L1Identity. Raises LayerPromotionError on failure.

Source code in src/vacant/identity/layers.py
def promote_to_l1(l0: L0Identity, logbook: Logbook) -> L1Identity:
    """Verify the logbook chain + signatures against `l0.vacant_id` and
    return an `L1Identity`. Raises `LayerPromotionError` on failure.
    """
    pubkey = l0.vacant_id.verify_key()
    if not logbook.verify_chain(pubkey):
        raise LayerPromotionError(
            f"L0 → L1: logbook chain does not verify against {l0.vacant_id.short()}"
        )
    return L1Identity(vacant_id=l0.vacant_id, logbook=logbook)

promote_to_l2

promote_to_l2(l1: L1Identity, capability_card: CapabilityCard) -> L2Identity

Verify the capability card belongs to the L1 identity and is self-signed; return an L2Identity.

Source code in src/vacant/identity/layers.py
def promote_to_l2(l1: L1Identity, capability_card: CapabilityCard) -> L2Identity:
    """Verify the capability card belongs to the L1 identity and is
    self-signed; return an `L2Identity`.
    """
    if capability_card.vacant_id != l1.vacant_id:
        raise LayerPromotionError(
            f"L1 → L2: capability_card.vacant_id {capability_card.vacant_id} "
            f"does not match L1 vacant_id {l1.vacant_id}"
        )
    if not capability_card.verify():
        raise LayerPromotionError("L1 → L2: capability_card signature does not verify")
    return L2Identity(
        vacant_id=l1.vacant_id,
        logbook=l1.logbook,
        capability_card=capability_card,
    )

promote_to_l3

promote_to_l3(l2: L2Identity, attestations: object, *, min_vouchers: int = MIN_VOUCHERS_FOR_L3_PROMOTION) -> L3Identity

Verify enough peer attestations name this vacant and return L3.

attestations is taken as object to avoid an import cycle with identity.attestation; _validate_attestations checks the concrete type at runtime. Each attestation is verified for: (a) attestee == l2.vacant_id, (b) signature against attester pubkey, (c) freshness window (not expired). Distinct attesters are required — N copies from one attester count as one.

Source code in src/vacant/identity/layers.py
def promote_to_l3(
    l2: L2Identity,
    attestations: object,
    *,
    min_vouchers: int = MIN_VOUCHERS_FOR_L3_PROMOTION,
) -> L3Identity:
    """Verify enough peer attestations name this vacant and return `L3`.

    `attestations` is taken as `object` to avoid an import cycle with
    `identity.attestation`; `_validate_attestations` checks the concrete
    type at runtime. Each attestation is verified for: (a) `attestee` ==
    `l2.vacant_id`, (b) signature against `attester` pubkey, (c) freshness
    window (not expired). Distinct attesters are required — N copies from
    one attester count as one.
    """
    valid = _validate_attestations(attestations, attestee=l2.vacant_id)
    distinct_attesters = {a.attester for a in valid}
    if len(distinct_attesters) < min_vouchers:
        raise LayerPromotionError(
            f"L2 → L3: have {len(distinct_attesters)} distinct valid attesters, need {min_vouchers}"
        )
    return L3Identity(
        vacant_id=l2.vacant_id,
        logbook=l2.logbook,
        capability_card=l2.capability_card,
        attestations=tuple(valid),
    )

wash_cost

Whitewashing cost (P2 §3 / dispatch §3 / D004 §A).

The dispatch contract is:

inputs:  claimed_history_depth, attestation_count, substrate_diversity
output:  WashCost (network-cycles units; type-tagged)
properties:
  - monotonic non-decreasing in claimed_history_depth
  - increasing in false_claim_weight (parameterised so tests vary it)

The richer §3.4 economic formula (c_stake / c_history_loss / opportunity_cost) is future work — see D004 §A. This module exposes the narrower, testable contract; P3 / P4 can wrap it in an economic adapter.

Units: "network cycles" is a deliberately abstract unit. It is not USD, not tokens, not Ethereum gas. Downstream consumers (P3) decide how to weight it inside their reputation formulas; tests here only check ordering invariants.

WASH_COST_FALSE_CLAIM_WEIGHT_DEFAULT module-attribute

WASH_COST_FALSE_CLAIM_WEIGHT_DEFAULT: Final[float] = 1.0

Multiplier on the per-history-entry forgery cost (P2 §3 / D004 §A). Tests vary this to verify cost increases monotonically with false-claim weight.

WashCost module-attribute

WashCost = NewType('WashCost', float)

Type-tagged float in 'network cycles' units (D004 §A).

mypy treats WashCost and float as distinct types — passing a raw float to a function expecting WashCost requires an explicit WashCost(...) conversion.

WashCostWeights dataclass

WashCostWeights(history_unit_cost: float = 1.0, attestation_unit_cost: float = 0.5, substrate_unit_cost: float = 0.25)

Per-dimension unit costs.

Defaults are chosen so that: - history (forging entries) dominates for typical attacker scenarios (per-entry cost is the largest of the three) - attestations are mid-cost (a real organisation must vouch) - substrate diversity is the smallest cost (claiming you run on more runtimes is cheap-talk; the network only believes it after seeing proofs handled elsewhere)

All values are >= 0 and the dataclass enforces it.

compute_wash_cost

compute_wash_cost(claimed_history_depth: int, attestation_count: int, substrate_diversity: int, *, false_claim_weight: float = WASH_COST_FALSE_CLAIM_WEIGHT_DEFAULT, weights: WashCostWeights | None = None) -> WashCost

Cost (in network-cycles units) of standing up a fresh identity that claims claimed_history_depth past entries, attestation_count peer vouches, and operation across substrate_diversity substrates.

Formula:

cost = history_unit_cost   * claimed_history_depth * (1 + false_claim_weight)
     + attestation_unit_cost * attestation_count
     + substrate_unit_cost   * substrate_diversity

Notes: - The (1 + false_claim_weight) factor on the history term encodes "claiming history you don't have is more expensive than claiming history you do". Tests vary false_claim_weight to verify the cost increases. - All three count inputs must be >= 0. Negative inputs raise IdentityError. - Output is WashCost (type-tagged float); callers can compare two WashCost values directly because WashCost is a NewType over float.

Source code in src/vacant/identity/wash_cost.py
def compute_wash_cost(
    claimed_history_depth: int,
    attestation_count: int,
    substrate_diversity: int,
    *,
    false_claim_weight: float = WASH_COST_FALSE_CLAIM_WEIGHT_DEFAULT,
    weights: WashCostWeights | None = None,
) -> WashCost:
    """Cost (in network-cycles units) of standing up a fresh identity that
    claims `claimed_history_depth` past entries, `attestation_count` peer
    vouches, and operation across `substrate_diversity` substrates.

    Formula:

    ```
    cost = history_unit_cost   * claimed_history_depth * (1 + false_claim_weight)
         + attestation_unit_cost * attestation_count
         + substrate_unit_cost   * substrate_diversity
    ```

    Notes:
    - The `(1 + false_claim_weight)` factor on the history term encodes
      "claiming history you don't have is more expensive than claiming
      history you do". Tests vary `false_claim_weight` to verify the
      cost increases.
    - All three count inputs must be >= 0. Negative inputs raise
      `IdentityError`.
    - Output is `WashCost` (type-tagged float); callers can compare two
      `WashCost` values directly because `WashCost` is a `NewType` over
      `float`.
    """
    if claimed_history_depth < 0:
        raise IdentityError("claimed_history_depth must be >= 0")
    if attestation_count < 0:
        raise IdentityError("attestation_count must be >= 0")
    if substrate_diversity < 0:
        raise IdentityError("substrate_diversity must be >= 0")
    if false_claim_weight < 0:
        raise IdentityError("false_claim_weight must be >= 0")

    w = weights or WashCostWeights()
    cost = (
        w.history_unit_cost * claimed_history_depth * (1.0 + false_claim_weight)
        + w.attestation_unit_cost * attestation_count
        + w.substrate_unit_cost * substrate_diversity
    )
    return WashCost(cost)

attestation

Peer attestations: signed vouches one vacant gives another.

PeerAttestation is the L3 building block (identity/layers.py). Each attestation is a frozen Pydantic model carrying an Ed25519 signature over a canonical byte payload of (attester, attestee, claim, issued_at, expires_at). Verification checks signature and freshness; the freshness window defaults to PEER_ATTESTATION_FRESHNESS_WINDOW_DAYS (D004 §D, CONSTANTS.md §Identity).

Revocation: revoke_attestation(att, attester_signing_key) returns a RevocationRecord carrying a signed revocation token. Holders can present this token to a verifier; is_revoked(att, revocations) returns True iff the attester signed an explicit revocation for that attestation. We do not silently downgrade — the check is explicit so the caller knows whether they trusted a revoked claim.

PeerAttestation

Bases: BaseModel

One vacant's signed claim about another.

fingerprint

fingerprint() -> bytes

BLAKE2b digest of the signing payload — names the attestation in revocation records.

Source code in src/vacant/identity/attestation.py
def fingerprint(self) -> bytes:
    """BLAKE2b digest of the signing payload — names the attestation
    in revocation records.
    """
    return hash_blake2b(self.signing_payload())

RevocationRecord

Bases: BaseModel

Signed revocation token for one attestation.

issue_attestation

issue_attestation(*, attester: VacantId, attestee: VacantId, claim: str, attester_signing_key: SigningKey, issued_at: datetime | None = None, expires_at: datetime | None = None) -> PeerAttestation

Construct and sign a fresh PeerAttestation.

Defaults: - issued_at = now UTC - expires_at = issued_at + PEER_ATTESTATION_FRESHNESS_WINDOW_DAYS

Source code in src/vacant/identity/attestation.py
def issue_attestation(
    *,
    attester: VacantId,
    attestee: VacantId,
    claim: str,
    attester_signing_key: SigningKey,
    issued_at: datetime | None = None,
    expires_at: datetime | None = None,
) -> PeerAttestation:
    """Construct and sign a fresh `PeerAttestation`.

    Defaults:
    - `issued_at` = now UTC
    - `expires_at` = `issued_at + PEER_ATTESTATION_FRESHNESS_WINDOW_DAYS`
    """
    if not claim.strip():
        raise AttestationError("issue_attestation: claim must be non-empty")
    if attester == attestee:
        raise AttestationError("issue_attestation: attester == attestee (self-attest)")
    issued = (issued_at or datetime.now(UTC)).astimezone(UTC)
    expires = (
        expires_at or issued + timedelta(days=PEER_ATTESTATION_FRESHNESS_WINDOW_DAYS)
    ).astimezone(UTC)
    if expires <= issued:
        raise AttestationError("issue_attestation: expires_at must be after issued_at")
    unsigned = PeerAttestation(
        attester=attester,
        attestee=attestee,
        claim=claim,
        issued_at=issued,
        expires_at=expires,
    )
    sig = sign(attester_signing_key, unsigned.signing_payload())
    return unsigned.model_copy(update={"signature": sig})

verify_attestation

verify_attestation(att: PeerAttestation, *, now: datetime | None = None) -> bool

True iff: - att.signature verifies against att.attester's pubkey - now ∈ [issued_at, expires_at]

Source code in src/vacant/identity/attestation.py
def verify_attestation(att: PeerAttestation, *, now: datetime | None = None) -> bool:
    """True iff:
    - `att.signature` verifies against `att.attester`'s pubkey
    - `now ∈ [issued_at, expires_at]`
    """
    if not att.signature:
        return False
    current = (now or datetime.now(UTC)).astimezone(UTC)
    issued = att.issued_at.astimezone(UTC)
    expires = att.expires_at.astimezone(UTC)
    if current < issued or current > expires:
        return False
    return verify(att.attester.verify_key(), att.signing_payload(), att.signature)

revoke_attestation

revoke_attestation(att: PeerAttestation, attester_signing_key: SigningKey, *, revoked_at: datetime | None = None) -> RevocationRecord

Build a signed revocation token for att.

The signing key must correspond to att.attester (only the original attester can revoke their own claim — verified by the signature).

Source code in src/vacant/identity/attestation.py
def revoke_attestation(
    att: PeerAttestation,
    attester_signing_key: SigningKey,
    *,
    revoked_at: datetime | None = None,
) -> RevocationRecord:
    """Build a signed revocation token for `att`.

    The signing key must correspond to `att.attester` (only the original
    attester can revoke their own claim — verified by the signature).
    """
    when = (revoked_at or datetime.now(UTC)).astimezone(UTC)
    unsigned = RevocationRecord(
        attestation_fingerprint=att.fingerprint(),
        attester=att.attester,
        revoked_at=when,
    )
    sig = sign(attester_signing_key, unsigned.signing_payload())
    record = unsigned.model_copy(update={"signature": sig})
    if not record.verify():
        # Defensive: the caller passed a key that doesn't match `att.attester`.
        raise AttestationError("revoke_attestation: signing key does not match attester pubkey")
    return record

is_revoked

is_revoked(att: PeerAttestation, revocations: list[RevocationRecord]) -> bool

True iff revocations contains a valid revocation naming att.

Source code in src/vacant/identity/attestation.py
def is_revoked(att: PeerAttestation, revocations: list[RevocationRecord]) -> bool:
    """True iff `revocations` contains a *valid* revocation naming `att`."""
    fp = att.fingerprint()
    for r in revocations:
        if r.attestation_fingerprint == fp and r.attester == att.attester and r.verify():
            return True
    return False

federation

Federation root set + M-of-N attestations (T4 / dispatch §4 / D004 §C / D016).

RootSet carries (threshold, roots, revision) where threshold is the number of distinct root signatures required to validate a FederatedAttestation. MVP defaults are 2-of-5 (CONSTANTS.md); the long-term target is 3-of-9.

RootSetHistory is the append-only chain of revisions: every successful rotate_root produces a new revision that is appended without discarding the previous one. This lets the network verify FederatedAttestations issued before a rotation against the rootset that was active at issuance time, without needing the verifier to remember which rootset was current at which moment (D016).

Each FederatedAttestation records issued_under_revision, the revision it was signed against. signing_payload() mixes this revision into the digest so a signature collected for revision R cannot be replayed against an attestation envelope claiming a different revision.

verify_federated(att, history_or_rootset) looks up the revision in the history and verifies under that historical rootset. The caller may also pass a single RootSet (the verification still demands rootset.revision == att.issued_under_revision); this is the backward-compatible path used by tests that operate on a single revision.

RootSet dataclass

RootSet(threshold: int, roots: tuple[VacantId, ...] = tuple(), revision: int = 0)

An (M, N) root set for federated attestations.

revision is a monotonic counter incremented by every successful rotation. It exists so that "rotate-out-then-back-in" sequences (which return to the same membership) still produce a distinct state hash and therefore reject replayed rotation signatures (D005 §1). It is also the key under which a RootSetHistory indexes this revision (D016).

state_hash

state_hash() -> bytes

BLAKE2b digest binding rotation signatures to this rootset state.

Without this binding, a quorum's rotation signature for (old, new) could be replayed against any future rootset that still contains old and lacks new — including a state arrived at by re-adding old after a previous rotation removed it (Padv-P2 finding D005 §1). Including threshold + sorted pubkeys + monotonic revision makes signatures single-state even across rotate-out-then-back-in sequences.

Source code in src/vacant/identity/federation.py
def state_hash(self) -> bytes:
    """BLAKE2b digest binding rotation signatures to *this* rootset state.

    Without this binding, a quorum's rotation signature for `(old, new)`
    could be replayed against any future rootset that still contains
    `old` and lacks `new` — including a state arrived at by re-adding
    `old` after a previous rotation removed it (Padv-P2 finding D005
    §1). Including threshold + sorted pubkeys + monotonic `revision`
    makes signatures single-state even across rotate-out-then-back-in
    sequences.
    """
    return hash_blake2b(
        b"vacant:rootset:state"
        + b"\x1f"
        + str(self.threshold).encode("utf-8")
        + b"\x1f"
        + str(self.revision).encode("utf-8")
        + b"\x1f"
        + b"\x1f".join(sorted(r.pubkey_bytes for r in self.roots))
    )

RootSetHistory dataclass

RootSetHistory(revisions: tuple[RootSet, ...])

Append-only chain of RootSet revisions.

revisions[i].revision == i for every i. This lets a verifier look up the rootset that was active at any past revision in O(1) and confirms that the chain has no gaps. The head (revisions[-1]) is the current rootset; older entries are kept so FederatedAttestations issued before a rotation remain verifiable (D016).

Construct via RootSetHistory.from_initial(rootset) (revision 0) and grow via extend(new_rootset) or the convenience apply_rotation(...) wrapper around rotate_root.

from_initial classmethod

from_initial(rootset: RootSet) -> RootSetHistory

Build a history with a single revision (the initial rootset).

The rootset's revision field must be 0 (a fresh history starts at revision 0). Use apply_rotation / extend to grow it.

Source code in src/vacant/identity/federation.py
@classmethod
def from_initial(cls, rootset: RootSet) -> RootSetHistory:
    """Build a history with a single revision (the initial rootset).

    The rootset's `revision` field must be 0 (a fresh history starts
    at revision 0). Use `apply_rotation` / `extend` to grow it.
    """
    if rootset.revision != 0:
        raise FederationError(
            f"RootSetHistory.from_initial: rootset.revision must be 0, got {rootset.revision}"
        )
    return cls(revisions=(rootset,))

at

at(revision: int) -> RootSet

Return the rootset that was active at revision. Raises FederationError if the revision is not in the history.

Source code in src/vacant/identity/federation.py
def at(self, revision: int) -> RootSet:
    """Return the rootset that was active at `revision`. Raises
    `FederationError` if the revision is not in the history."""
    if revision < 0 or revision >= len(self.revisions):
        raise FederationError(
            f"RootSetHistory has no revision {revision} "
            f"(history covers 0..{len(self.revisions) - 1})"
        )
    return self.revisions[revision]

extend

extend(new_rootset: RootSet) -> RootSetHistory

Append a new revision. The new rootset's revision must be exactly current_revision + 1 (the rotation chain is dense).

Source code in src/vacant/identity/federation.py
def extend(self, new_rootset: RootSet) -> RootSetHistory:
    """Append a new revision. The new rootset's `revision` must be
    exactly `current_revision + 1` (the rotation chain is dense)."""
    expected = self.current_revision + 1
    if new_rootset.revision != expected:
        raise FederationError(
            f"RootSetHistory.extend: new revision must be {expected}, "
            f"got {new_rootset.revision}"
        )
    return RootSetHistory(revisions=(*self.revisions, new_rootset))

apply_rotation

apply_rotation(*, old_root: VacantId, new_root: VacantId, signatures: list[RootSignature]) -> RootSetHistory

Convenience wrapper: apply rotate_root to the current revision and append the result to the history.

Source code in src/vacant/identity/federation.py
def apply_rotation(
    self,
    *,
    old_root: VacantId,
    new_root: VacantId,
    signatures: list[RootSignature],
) -> RootSetHistory:
    """Convenience wrapper: apply `rotate_root` to the current
    revision and append the result to the history."""
    new_rs = rotate_root(
        self.current, old_root=old_root, new_root=new_root, signatures=signatures
    )
    return self.extend(new_rs)

RootSignature

Bases: BaseModel

One root's contribution to a federated attestation.

FederatedAttestation

Bases: BaseModel

An attestation cosigned by ≥ M roots from a RootSet.

issued_under_revision records which RootSetHistory revision the signatures were collected against. The signing payload mixes the revision in so a signature collected for revision R cannot be moved into an envelope claiming a different revision (D016). Older callers that operate on a single rootset may rely on the default of 0; verifiers will demand rootset.revision == 0 to accept such attestations.

subject instance-attribute

subject: VacantId

The vacant the attestation is about.

claim instance-attribute

claim: str

Free-form claim string, hashed into the signing payload.

issued_under_revision class-attribute instance-attribute

issued_under_revision: int = 0

Revision of the RootSetHistory the signatures were issued under.

default_mvp_rootset

default_mvp_rootset(*, vacant_ids: list[VacantId] | None = None) -> RootSet

Build a 2-of-5 rootset. If vacant_ids is given, use them (must have at least FEDERATION_ROOT_COUNT_MVP entries). Otherwise raise — we don't synthesise root identities silently.

Source code in src/vacant/identity/federation.py
def default_mvp_rootset(*, vacant_ids: list[VacantId] | None = None) -> RootSet:
    """Build a 2-of-5 rootset. If `vacant_ids` is given, use them
    (must have at least `FEDERATION_ROOT_COUNT_MVP` entries). Otherwise
    raise — we don't synthesise root identities silently.
    """
    if vacant_ids is None:
        raise FederationError("default_mvp_rootset() requires explicit vacant_ids")
    if len(vacant_ids) < FEDERATION_ROOT_COUNT_MVP:
        raise FederationError(
            f"need >= {FEDERATION_ROOT_COUNT_MVP} vacant_ids for MVP rootset, got {len(vacant_ids)}"
        )
    return RootSet(
        threshold=FEDERATION_ROOT_THRESHOLD_MVP,
        roots=tuple(vacant_ids[:FEDERATION_ROOT_COUNT_MVP]),
    )

issue_root_signature

issue_root_signature(*, root: VacantId, root_signing_key: SigningKey, subject: VacantId, claim: str, issued_under_revision: int = 0) -> RootSignature

Helper: produce a single root's contribution to an attestation.

The signing payload includes issued_under_revision, so a signature collected for one revision will not validate inside an envelope that claims a different revision (D016).

Source code in src/vacant/identity/federation.py
def issue_root_signature(
    *,
    root: VacantId,
    root_signing_key: SigningKey,
    subject: VacantId,
    claim: str,
    issued_under_revision: int = 0,
) -> RootSignature:
    """Helper: produce a single root's contribution to an attestation.

    The signing payload includes `issued_under_revision`, so a signature
    collected for one revision will not validate inside an envelope
    that claims a different revision (D016).
    """
    payload = FederatedAttestation(
        subject=subject,
        claim=claim,
        signatures=[],
        issued_under_revision=issued_under_revision,
    ).signing_payload()
    sig = sign(root_signing_key, payload)
    return RootSignature(root=root, signature=sig)

build_federated_attestation

build_federated_attestation(*, history: RootSetHistory, subject: VacantId, claim: str, signatures: list[RootSignature]) -> FederatedAttestation

Construct a FederatedAttestation tagged with the current revision of history.

Use this instead of constructing FederatedAttestation directly whenever you have the live history available — it prevents a caller from accidentally tagging a fresh attestation with a stale revision and is the ergonomic counterpart to RootSetHistory.apply_rotation (D016).

Signatures must already be collected via issue_root_signature(..., issued_under_revision=history.current_revision). The resulting attestation will fail verify_federated if signers used the wrong revision in their payload — which is the point: a stale-revision issuance is detectable at verification time.

Source code in src/vacant/identity/federation.py
def build_federated_attestation(
    *,
    history: RootSetHistory,
    subject: VacantId,
    claim: str,
    signatures: list[RootSignature],
) -> FederatedAttestation:
    """Construct a `FederatedAttestation` tagged with the *current*
    revision of `history`.

    Use this instead of constructing `FederatedAttestation` directly
    whenever you have the live history available — it prevents a caller
    from accidentally tagging a fresh attestation with a stale revision
    and is the ergonomic counterpart to `RootSetHistory.apply_rotation`
    (D016).

    Signatures must already be collected via `issue_root_signature(...,
    issued_under_revision=history.current_revision)`. The resulting
    attestation will fail `verify_federated` if signers used the wrong
    revision in their payload — which is the point: a stale-revision
    issuance is detectable at verification time.
    """
    return FederatedAttestation(
        subject=subject,
        claim=claim,
        signatures=signatures,
        issued_under_revision=history.current_revision,
    )

verify_federated

verify_federated(attestation: FederatedAttestation, rootset_or_history: RootSet | RootSetHistory) -> bool

True iff ≥ rootset.threshold distinct signatures from the rootset that was active at attestation.issued_under_revision validly cover attestation.signing_payload().

When given a RootSetHistory, the function looks up the rootset active at the attestation's revision (D016). When given a single RootSet, the function additionally requires rootset.revision == attestation.issued_under_revision; this is the back-compat path for callers that already know they are operating against a single revision.

Source code in src/vacant/identity/federation.py
def verify_federated(
    attestation: FederatedAttestation,
    rootset_or_history: RootSet | RootSetHistory,
) -> bool:
    """True iff ≥ `rootset.threshold` *distinct* signatures from the
    rootset that was active at `attestation.issued_under_revision`
    validly cover `attestation.signing_payload()`.

    When given a `RootSetHistory`, the function looks up the rootset
    active at the attestation's revision (D016). When given a single
    `RootSet`, the function additionally requires
    `rootset.revision == attestation.issued_under_revision`; this is
    the back-compat path for callers that already know they are
    operating against a single revision.
    """
    if isinstance(rootset_or_history, RootSetHistory):
        try:
            rootset = rootset_or_history.at(attestation.issued_under_revision)
        except FederationError:
            return False
    else:
        rootset = rootset_or_history
        if rootset.revision != attestation.issued_under_revision:
            return False

    payload = attestation.signing_payload()
    seen: set[VacantId] = set()
    for rs in attestation.signatures:
        if rs.root in seen:
            continue
        if not rootset.contains(rs.root):
            continue
        if verify(rs.root.verify_key(), payload, rs.signature):
            seen.add(rs.root)
        if len(seen) >= rootset.threshold:
            return True
    return False

rotate_root

rotate_root(rootset: RootSet, *, old_root: VacantId, new_root: VacantId, signatures: list[RootSignature]) -> RootSet

Swap old_root for new_root in rootset.

The rotation must itself be authorised by ≥ rootset.threshold valid signatures from the current rootset over the rotation payload (state_hash || old_root || new_root). Callers are responsible for collecting those signatures from the current quorum.

Constraints: - old_root must be in rootset - new_root must NOT already be in rootset (rotation is a swap, not a duplicate) - rotation signatures must reach quorum under the current rootset - rotation signatures are bound to the current rootset state hash and cannot be replayed against a different rootset (Padv-P2 / D005).

Source code in src/vacant/identity/federation.py
def rotate_root(
    rootset: RootSet,
    *,
    old_root: VacantId,
    new_root: VacantId,
    signatures: list[RootSignature],
) -> RootSet:
    """Swap `old_root` for `new_root` in `rootset`.

    The rotation must itself be authorised by ≥ `rootset.threshold` valid
    signatures from the *current* rootset over the rotation payload
    (`state_hash || old_root || new_root`). Callers are responsible for
    collecting those signatures from the current quorum.

    Constraints:
    - `old_root` must be in `rootset`
    - `new_root` must NOT already be in `rootset` (rotation is a swap, not a duplicate)
    - rotation signatures must reach quorum under the *current* rootset
    - rotation signatures are bound to the current rootset state hash and
      cannot be replayed against a different rootset (Padv-P2 / D005).
    """
    if not rootset.contains(old_root):
        raise FederationError(f"rotate_root: {old_root} is not in the current rootset")
    if rootset.contains(new_root):
        raise FederationError(f"rotate_root: {new_root} is already in the rootset")

    payload = _rotation_payload(rootset, old_root, new_root)
    seen: set[VacantId] = set()
    for rs in signatures:
        if rs.root in seen or not rootset.contains(rs.root):
            continue
        if verify(rs.root.verify_key(), payload, rs.signature):
            seen.add(rs.root)
    if len(seen) < rootset.threshold:
        raise FederationError(
            f"rotate_root: only {len(seen)} valid quorum signatures, need {rootset.threshold}"
        )
    new_roots = tuple(new_root if r == old_root else r for r in rootset.roots)
    return replace(rootset, roots=new_roots, revision=rootset.revision + 1)

sign_rotation

sign_rotation(*, rootset: RootSet, root: VacantId, root_signing_key: SigningKey, old_root: VacantId, new_root: VacantId) -> RootSignature

Helper: build a single root's signature on a rotation request.

Includes rootset so the signature is bound to a specific rootset state — a quorum's rotation signature is single-use against the rootset it was collected for (Padv-P2 / D005).

Source code in src/vacant/identity/federation.py
def sign_rotation(
    *,
    rootset: RootSet,
    root: VacantId,
    root_signing_key: SigningKey,
    old_root: VacantId,
    new_root: VacantId,
) -> RootSignature:
    """Helper: build a single root's signature on a rotation request.

    Includes `rootset` so the signature is bound to a specific rootset
    state — a quorum's rotation signature is single-use against the
    rootset it was collected for (Padv-P2 / D005).
    """
    payload = _rotation_payload(rootset, old_root, new_root)
    sig = sign(root_signing_key, payload)
    return RootSignature(root=root, signature=sig)

errors

Error hierarchy for vacant.identity.

IdentityError

Bases: CoreError

Base class for vacant.identity errors.

KeyVaultError

Bases: IdentityError

The vault could not satisfy a store / load / delete operation.

KeyNotFoundError

Bases: KeyVaultError

The requested key_id is absent from the vault.

KeyRevokedError

Bases: IdentityError

A revoked key was used (or attempted to be used) for signing.

LayerPromotionError

Bases: IdentityError

An L0 → L1 → L2 → L3 promotion failed an invariant check.

AttestationError

Bases: IdentityError

A peer attestation failed signature, freshness, or revocation checks.

FederationError

Bases: IdentityError

A federated attestation failed M-of-N verification or rotation.