vacant.runtime

P1 runtime — 6-state lifecycle (LOCAL / ACTIVE / HIBERNATING / STALE / SUNK / ARCHIVED), heartbeat, shadow-self drift, and the D1–D5 self-replication paths. Path A (human-written vacant) is intentionally absent.

state_machine

Vacant lifecycle state machine.

Six states (VacantState from P0) by seven events (Event below): 42 cells. The transition table is an explicit dict[(state, event), state']. Cells absent from the table are invalid — applying them raises InvalidEventError rather than silently no-op'ing, because at this layer a misrouted event is a programming bug (the envelope-level checks in P6 §3.2 are responsible for rejecting requests before they reach this state machine).

Transitions are derived from:

- architecture/components/P1_runtime.md §3.8 (state machine diagram)
- architecture/decisions/D001_hibernation_and_stale_revival.md (warmup is collapsed into REVIVE_REQUESTED here; P1's full warmup ceremony is a later component concern, see follow-up note in the PR description)
- architecture/decisions/D003_p1_runtime_reconciliation.md (can_review semantics for STALE)

Predicates exposed for downstream use:

- can_review(state) — used by P3 reputation
- can_be_called(state) / is_runnable(state) — LOCAL is fully runnable (CLAUDE.md §LOCAL); the only difference vs ACTIVE is registry visibility
- requires_revive(state) — STALE only

Sunk-heartbeat semantics (THEORY_V5 §4.2 / dispatch §1) are encoded in heartbeat_payload(state, ...) in runtime/heartbeat.py — the state machine itself doesn't know about payloads, just about transitions.

Event

Bases: StrEnum

Events the runtime feeds into the state machine.

TICK class-attribute instance-attribute

TICK = 'TICK'

Periodic housekeeping pulse.

HEARTBEAT class-attribute instance-attribute

HEARTBEAT = 'HEARTBEAT'

A heartbeat attestation is being emitted.

CALL_RECEIVED class-attribute instance-attribute

CALL_RECEIVED = 'CALL_RECEIVED'

An incoming A2A call is being accepted (passed §3.2 envelope checks).

REVIEW_RECEIVED class-attribute instance-attribute

REVIEW_RECEIVED = 'REVIEW_RECEIVED'

A peer/caller review landed in this vacant's logbook.

REVIVE_REQUESTED class-attribute instance-attribute

REVIVE_REQUESTED = 'REVIVE_REQUESTED'

Owner / scheduler requests transition out of HIBERNATING or STALE.

ARCHIVE_REQUESTED class-attribute instance-attribute

ARCHIVE_REQUESTED = 'ARCHIVE_REQUESTED'

A SUNK vacant has aged past ARCHIVED_AFTER_SUNK_DAYS.

SPAWN_REQUESTED class-attribute instance-attribute

SPAWN_REQUESTED = 'SPAWN_REQUESTED'

A spawn (D1-D5) ceremony is being initiated by the parent.

VacantStateMachine

VacantStateMachine(initial: VacantState = LOCAL)

Stateful wrapper around the transition table.

Constructed with a starting state; apply(event) mutates state in place and returns the new state. Use peek(state, event) if you want to evaluate a transition without mutating.

Source code in src/vacant/runtime/state_machine.py
def __init__(self, initial: VacantState = VacantState.LOCAL) -> None:
    self._state = initial

transitions classmethod

transitions() -> dict[tuple[VacantState, Event], VacantState]

A copy of the transition table; useful for exhaustive tests.

Source code in src/vacant/runtime/state_machine.py
@classmethod
def transitions(cls) -> dict[tuple[VacantState, Event], VacantState]:
    """A copy of the transition table; useful for exhaustive tests."""
    return dict(_TRANSITIONS)

peek classmethod

peek(state: VacantState, event: Event) -> VacantState

Pure transition lookup. Raises InvalidEventError if undefined.

Source code in src/vacant/runtime/state_machine.py
@classmethod
def peek(cls, state: VacantState, event: Event) -> VacantState:
    """Pure transition lookup. Raises `InvalidEventError` if undefined."""
    try:
        return _TRANSITIONS[(state, event)]
    except KeyError as exc:
        raise InvalidEventError(
            f"event {event.value} is not valid in state {state.value}"
        ) from exc

apply

apply(event: Event) -> VacantState

Apply event, mutate self.state, return the new state.

Source code in src/vacant/runtime/state_machine.py
def apply(self, event: Event) -> VacantState:
    """Apply `event`, mutate `self.state`, return the new state."""
    new_state = self.peek(self._state, event)
    self._state = new_state
    return new_state

can_review

can_review(state: VacantState) -> bool

True iff a vacant in state may emit new peer/caller reviews.

False for STALE, SUNK, ARCHIVED (THEORY_V5 §4.1; D003 §A).

Source code in src/vacant/runtime/state_machine.py
def can_review(state: VacantState) -> bool:
    """True iff a vacant in `state` may emit new peer/caller reviews.

    False for STALE, SUNK, ARCHIVED (THEORY_V5 §4.1; D003 §A).
    """
    return state in _REVIEW_OK

can_be_called

can_be_called(state: VacantState) -> bool

True iff this vacant accepts new A2A calls.

Source code in src/vacant/runtime/state_machine.py
def can_be_called(state: VacantState) -> bool:
    """True iff this vacant accepts new A2A calls."""
    return state in _RUNNABLE

is_runnable

is_runnable(state: VacantState) -> bool

True iff the runtime should serve traffic. LOCAL is runnable (CLAUDE.md §LOCAL: registry visibility=none, but everything else works).

Source code in src/vacant/runtime/state_machine.py
def is_runnable(state: VacantState) -> bool:
    """True iff the runtime should serve traffic. LOCAL is runnable
    (CLAUDE.md §LOCAL: registry visibility=none, but everything else works).
    """
    return state in _RUNNABLE

requires_revive

requires_revive(state: VacantState) -> bool

True iff the vacant is frozen pending a REVIVE_REQUESTED event.

Source code in src/vacant/runtime/state_machine.py
def requires_revive(state: VacantState) -> bool:
    """True iff the vacant is frozen pending a REVIVE_REQUESTED event."""
    return state == _S.STALE

heartbeat

Heartbeat scheduler.

Per-state cadence and payload (dispatch §2 / THEORY_V5 §4.2 / D003):

State        Period                            Payload
LOCAL        HEARTBEAT_BASE_PERIOD_S           {liveness: true}
ACTIVE       HEARTBEAT_BASE_PERIOD_S           {liveness: true}
HIBERNATING  HEARTBEAT_HIBERNATING_PERIOD_S    {liveness: "dormant", last_active: ts}
STALE        HEARTBEAT_HIBERNATING_PERIOD_S    {liveness: false, awaiting_revive: true}
SUNK         HEARTBEAT_SUNK_LIVENESS_PERIOD_S  {liveness: false, key_in_custody: true} ← load-bearing for lineage attribution (THEORY_V5 §4.2)
ARCHIVED     (scheduler does not run)          n/a

The SUNK payload is the load-bearing one: §4.2 explicitly notes that the sunk heartbeat is identity custody attestation, not liveness, so liveness=false and key_in_custody=true must both appear, and the entry kind is "HEARTBEAT_SUNK" so consumers can distinguish at a glance.

HeartbeatScheduler

HeartbeatScheduler(*, logbook: Logbook, signing_key: SigningKey, state_provider: Callable[[], VacantState], sleep: Callable[[float], Any] = asyncio.sleep, clock: Callable[[], datetime] = lambda: datetime.now(UTC), last_active_provider: Callable[[], datetime | None] = lambda: None)

Async scheduler that signs and appends heartbeat entries to a logbook.

Construction is dependency-injected: state is read via state_provider (a callable, since it can change between ticks), the signing key is pinned at construction, and the cadence is derived from the current state on each tick (so a transition from ACTIVE → HIBERNATING immediately stretches the next sleep interval).

Source code in src/vacant/runtime/heartbeat.py
def __init__(
    self,
    *,
    logbook: Logbook,
    signing_key: SigningKey,
    state_provider: Callable[[], VacantState],
    sleep: Callable[[float], Any] = asyncio.sleep,
    clock: Callable[[], datetime] = lambda: datetime.now(UTC),
    last_active_provider: Callable[[], datetime | None] = lambda: None,
) -> None:
    self._logbook = logbook
    self._signing_key = signing_key
    self._state_provider = state_provider
    self._sleep = sleep
    self._clock = clock
    self._last_active_provider = last_active_provider
    self._tick_count = 0
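The injected `sleep` and state provider make the cadence logic testable without real time. A minimal self-contained sketch of that property (`PERIODS`, `fake_sleep`, and `run` are illustrative stand-ins, not part of the module): re-deriving the interval from the current state on each tick means a state change stretches the very next sleep.

```python
import asyncio

# Illustrative per-state cadences (seconds); the real values come from
# the HEARTBEAT_*_PERIOD_S constants.
PERIODS = {"ACTIVE": 30, "HIBERNATING": 3600}


async def run(ticks: int, state_provider, sleep) -> int:
    """Emit `ticks` heartbeats, re-deriving the interval every tick."""
    emitted = 0
    for _ in range(ticks):
        state = state_provider()
        emitted += 1
        await sleep(PERIODS[state])
    return emitted


slept: list[int] = []


async def fake_sleep(seconds: float) -> None:
    slept.append(seconds)  # record the requested interval, don't sleep


states = iter(["ACTIVE", "HIBERNATING", "HIBERNATING"])
emitted = asyncio.run(run(3, lambda: next(states), fake_sleep))
# slept == [30, 3600, 3600]: the ACTIVE → HIBERNATING transition
# stretched the next interval immediately.
```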

tick async

tick(*, extra: dict[str, Any] | None = None) -> LogEntry

Append exactly one heartbeat entry for the current state.

Raises InvalidEventError if the current state is ARCHIVED.

Source code in src/vacant/runtime/heartbeat.py
async def tick(self, *, extra: dict[str, Any] | None = None) -> LogEntry:
    """Append exactly one heartbeat entry for the current state.

    Raises `InvalidEventError` if the current state is ARCHIVED.
    """
    state = self._state_provider()
    if state == VacantState.ARCHIVED:
        raise InvalidEventError("HeartbeatScheduler.tick() called in ARCHIVED")
    payload = heartbeat_payload(state, last_active=self._last_active_provider(), extra=extra)
    kind = heartbeat_kind(state)
    entry = self._logbook.append(kind, payload, self._signing_key, ts=self._clock())
    self._tick_count += 1
    return entry

run_until_archived async

run_until_archived(*, max_ticks: int | None = None) -> int

Drive tick() continuously, sleeping heartbeat_period_s(state) between ticks, and stop when state becomes ARCHIVED (or max_ticks is reached). Returns the number of ticks emitted.

Source code in src/vacant/runtime/heartbeat.py
async def run_until_archived(self, *, max_ticks: int | None = None) -> int:
    """Drive `tick()` continuously, sleeping `heartbeat_period_s(state)`
    between ticks, and stop when state becomes ARCHIVED (or `max_ticks`
    is reached). Returns the number of ticks emitted.
    """
    emitted = 0
    while True:
        state = self._state_provider()
        if state == VacantState.ARCHIVED:
            return emitted
        if max_ticks is not None and emitted >= max_ticks:
            return emitted
        await self.tick()
        emitted += 1
        next_state = self._state_provider()
        if next_state == VacantState.ARCHIVED:
            return emitted
        await self._sleep(heartbeat_period_s(next_state))

heartbeat_period_s

heartbeat_period_s(state: VacantState) -> int

Return the cadence (seconds) for state heartbeats.

Raises InvalidEventError for ARCHIVED — by spec, the scheduler does not run for archived vacants.

Source code in src/vacant/runtime/heartbeat.py
def heartbeat_period_s(state: VacantState) -> int:
    """Return the cadence (seconds) for `state` heartbeats.

    Raises `InvalidEventError` for ARCHIVED — by spec, the scheduler does
    not run for archived vacants.
    """
    match state:
        case VacantState.LOCAL | VacantState.ACTIVE:
            return HEARTBEAT_BASE_PERIOD_S
        case VacantState.HIBERNATING | VacantState.STALE:
            return HEARTBEAT_HIBERNATING_PERIOD_S
        case VacantState.SUNK:
            return HEARTBEAT_SUNK_LIVENESS_PERIOD_S
        case VacantState.ARCHIVED:
            raise InvalidEventError("scheduler does not run for ARCHIVED vacants")

heartbeat_kind

heartbeat_kind(state: VacantState) -> str

Log entry kind for the heartbeat. SUNK uses a distinct kind so downstream consumers can filter custody attestations from liveness pulses without re-inspecting the payload.

Source code in src/vacant/runtime/heartbeat.py
def heartbeat_kind(state: VacantState) -> str:
    """Log entry `kind` for the heartbeat. SUNK uses a distinct kind so
    downstream consumers can filter custody attestations from liveness pulses
    without re-inspecting the payload.
    """
    return HEARTBEAT_KIND_SUNK if state == VacantState.SUNK else HEARTBEAT_KIND_DEFAULT

heartbeat_payload

heartbeat_payload(state: VacantState, *, last_active: datetime | None = None, extra: dict[str, Any] | None = None) -> HeartbeatPayload

State-specific payload for the next heartbeat entry.

Source code in src/vacant/runtime/heartbeat.py
def heartbeat_payload(
    state: VacantState,
    *,
    last_active: datetime | None = None,
    extra: dict[str, Any] | None = None,
) -> HeartbeatPayload:
    """State-specific payload for the next heartbeat entry."""
    payload: HeartbeatPayload
    match state:
        case VacantState.LOCAL | VacantState.ACTIVE:
            payload = {"liveness": True}
        case VacantState.HIBERNATING:
            payload = {
                "liveness": "dormant",
                "last_active": (last_active or datetime.now(UTC)).isoformat(),
            }
        case VacantState.STALE:
            payload = {"liveness": False, "awaiting_revive": True}
        case VacantState.SUNK:
            payload = {"liveness": False, "key_in_custody": True}
        case VacantState.ARCHIVED:
            raise InvalidEventError("ARCHIVED vacants do not emit heartbeats")
    if extra:
        payload = {**payload, **extra}
    return payload

shadow_self

Shadow-self drift detection (P1 §3.4 stub).

P1 needs a behavioural fingerprint that downstream code (P3 honesty signal, P5 graduation gate) can call before the real STYLO Vec16 / PROBE embeddings land. This module provides:

  • compute_embedding(windows) — deterministic 16-dim float vector built from the BLAKE2b digest of N output windows. Pure stdlib, pure function.
  • AnchorDistribution — diagonal-covariance Gaussian over the embedding space (mean + per-dim std), enough to evaluate Mahalanobis-style drift without bringing in numpy. With diagonal covariance the Mahalanobis distance reduces to standardised Euclidean — a reasonable demo-scale approximation noted in architecture/research/T1_behavioral_fingerprint.md.
  • compute_drift(current, anchor) → float Mahalanobis-style distance.
  • is_drifting(drift, threshold=STYLO_DRIFT_THRESHOLD) → bool.
  • drift_log_entry(...) — convenience that writes a DRIFT_DETECTED log entry; no automatic state change (per dispatch §3, the policy layer decides what to do with the signal).

The real STYLO Vec16 lands with P3 (research/T1) and will replace compute_embedding; this module's API stays put.
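The hash-projection idea can be shown end to end. A self-contained sketch under the stated assumptions (the byte-to-unit normalisation and the `drift` helper signature are illustrative; the module's own `_bytes_to_unit_vec` and `AnchorDistribution` differ in detail):

```python
import hashlib
import math

EMBEDDING_DIM = 16


def compute_embedding(windows):
    """Hash-projection stand-in: BLAKE2b digest -> 16 floats in [0, 1]."""
    h = hashlib.blake2b(digest_size=EMBEDDING_DIM)
    for w in windows:
        h.update(len(w).to_bytes(4, "big"))  # length-prefix each window
        h.update(w)
    return [b / 255.0 for b in h.digest()]


def drift(current, mean, std, min_std=1e-3):
    # With diagonal covariance, Mahalanobis distance reduces to
    # standardised Euclidean; std is floored to avoid division by zero.
    return math.sqrt(sum(
        ((x - m) / max(s, min_std)) ** 2
        for x, m, s in zip(current, mean, std)
    ))
```

Deterministic and pure, so two runs over the same windows always agree, and an embedding's drift against an anchor centred on itself is exactly zero.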

EMBEDDING_DIM module-attribute

EMBEDDING_DIM = 16

STYLO Vec16 dimensionality (T1 research).

AnchorDistribution dataclass

AnchorDistribution(mean: tuple[float, ...], std: tuple[float, ...], min_std: float = 0.001)

Diagonal-covariance reference distribution over embedding space.

mean[i] and std[i] describe the historical distribution of feature i. Std values are floored to min_std to avoid division-by-zero on constant features (a known artefact of the demo-scale embedding).

from_history classmethod

from_history(history: Iterable[Sequence[float]]) -> AnchorDistribution

Build a diagonal Gaussian from an iterable of past embeddings.

Source code in src/vacant/runtime/shadow_self.py
@classmethod
def from_history(cls, history: Iterable[Sequence[float]]) -> AnchorDistribution:
    """Build a diagonal Gaussian from an iterable of past embeddings."""
    rows = [list(r) for r in history]
    if not rows:
        raise ValueError("AnchorDistribution.from_history: history is empty")
    dim = len(rows[0])
    if any(len(r) != dim for r in rows):
        raise ValueError("AnchorDistribution.from_history: ragged rows")
    n = len(rows)
    mean = [sum(col) / n for col in zip(*rows, strict=True)]
    if n == 1:
        std = [0.0] * dim
    else:
        std = [
            math.sqrt(sum((v - mean[i]) ** 2 for v in col) / (n - 1))
            for i, col in enumerate(zip(*rows, strict=True))
        ]
    return cls(mean=tuple(mean), std=tuple(std))

compute_embedding

compute_embedding(windows: Sequence[bytes]) -> list[float]

Hash-projection placeholder for STYLO Vec16.

Returns an EMBEDDING_DIM-dim float vector in [0, 1]^16. Empty input is treated as [0.0] * EMBEDDING_DIM.

Deterministic, pure, no LLM calls — suitable for unit tests and demo runs until P3 wires the real embedding.

Source code in src/vacant/runtime/shadow_self.py
def compute_embedding(windows: Sequence[bytes]) -> list[float]:
    """Hash-projection placeholder for STYLO Vec16.

    Returns an `EMBEDDING_DIM`-dim float vector in `[0, 1]^16`. Empty input
    is treated as `[0.0] * EMBEDDING_DIM`.

    Deterministic, pure, no LLM calls — suitable for unit tests and demo
    runs until P3 wires the real embedding.
    """
    h = hashlib.blake2b(digest_size=_EMBED_DIGEST_BYTES)
    for w in windows:
        h.update(len(w).to_bytes(4, "big"))
        h.update(w)
    return _bytes_to_unit_vec(h.digest())

compute_drift

compute_drift(current: Sequence[float], anchor: AnchorDistribution) -> float

Standardised-Euclidean distance (diagonal-Mahalanobis) from anchor.

Equivalent to sqrt(sum_i ((x_i - mean_i) / max(std_i, min_std))^2).

Source code in src/vacant/runtime/shadow_self.py
def compute_drift(current: Sequence[float], anchor: AnchorDistribution) -> float:
    """Standardised-Euclidean distance (diagonal-Mahalanobis) from `anchor`.

    Equivalent to `sqrt(sum_i ((x_i - mean_i) / max(std_i, min_std))^2)`.
    """
    if len(current) != len(anchor.mean):
        raise ValueError(
            f"compute_drift: vector dim {len(current)} != anchor dim {len(anchor.mean)}"
        )
    acc = 0.0
    for i, x in enumerate(current):
        sigma = max(anchor.std[i], anchor.min_std)
        z = (x - anchor.mean[i]) / sigma
        acc += z * z
    return math.sqrt(acc)

is_drifting

is_drifting(drift: float, threshold: float = STYLO_DRIFT_THRESHOLD) -> bool

True iff drift ≥ threshold (default = STYLO_DRIFT_THRESHOLD).

Source code in src/vacant/runtime/shadow_self.py
def is_drifting(drift: float, threshold: float = STYLO_DRIFT_THRESHOLD) -> bool:
    """True iff `drift` ≥ `threshold` (default = `STYLO_DRIFT_THRESHOLD`)."""
    return drift >= threshold

drift_log_entry

drift_log_entry(*, logbook: Logbook, signing_key: SigningKey, drift: float, embedding: Sequence[float], threshold: float = STYLO_DRIFT_THRESHOLD) -> LogEntry

Append a DRIFT_DETECTED log entry. No automatic state change.

Source code in src/vacant/runtime/shadow_self.py
def drift_log_entry(
    *,
    logbook: Logbook,
    signing_key: SigningKey,
    drift: float,
    embedding: Sequence[float],
    threshold: float = STYLO_DRIFT_THRESHOLD,
) -> LogEntry:
    """Append a `DRIFT_DETECTED` log entry. No automatic state change."""
    payload = {
        "drift": drift,
        "threshold": threshold,
        "embedding": list(embedding),
        "above_threshold": is_drifting(drift, threshold),
    }
    return logbook.append(DRIFT_LOG_KIND, payload, signing_key)

spawn

Self-replication paths (D1-D5).

Each path:

1. generates a fresh Ed25519 keypair (no key derivation — keypairs are independent so a parent compromise does not give the controller the child's private key; cf. D003)
2. assembles a new BehaviorBundle / SubstrateSpec per the path
3. seeds the child's logbook with a BIRTH entry that names the parent
4. appends a SPAWN entry to the parent's logbook that names the child
5. returns the new ResidentForm with parent_id set

Path A (human-written vacant) is deprecated and intentionally absent (CLAUDE.md §Things to NOT do).

D4 lineage-merge requires an explicit ParentConsent from the secondary parent — a detached signature over the spawn intent. Without it the call raises ConsentError.
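The shape of the consent check can be sketched without the real crypto. This stand-in uses HMAC in place of the actual detached Ed25519 signature (the payload layout and helper names here are illustrative, not the module's):

```python
import hashlib
import hmac


def consent_payload(parent_id: bytes, intent: str) -> bytes:
    # Canonical bytes the secondary parent signs; layout is illustrative.
    return parent_id + b"|" + intent.encode()


def make_consent(key: bytes, parent_id: bytes, intent: str) -> bytes:
    # HMAC stands in for the real detached Ed25519 signature.
    return hmac.new(key, consent_payload(parent_id, intent), hashlib.sha256).digest()


def verify_consent(key: bytes, parent_id: bytes, intent: str, signature: bytes) -> bool:
    expected = make_consent(key, parent_id, intent)
    return hmac.compare_digest(expected, signature)  # constant-time compare
```

The point the sketch preserves: the signature binds both the parent identity and the specific intent, so a consent token minted for one ceremony cannot be replayed for another.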

SpawnResult dataclass

SpawnResult(child: ResidentForm, child_signing_key: SigningKey, path: str)

Output of every spawn path.

path instance-attribute

path: str

One of D1..D5.

ParentConsent dataclass

ParentConsent(parent_id: VacantId, intent: str, signature: bytes)

Detached signature attesting that a parent agrees to a spawn.

signature is over consent_payload(parent_id, intent) — see consent(). Verified by verify_consent() inside spawn_lineage_merge.

consent

consent(parent_id: VacantId, parent_signing_key: SigningKey, intent: str) -> ParentConsent

Helper for tests + D4 callers: build a signed consent token.

Source code in src/vacant/runtime/spawn.py
def consent(parent_id: VacantId, parent_signing_key: SigningKey, intent: str) -> ParentConsent:
    """Helper for tests + D4 callers: build a signed consent token."""
    sig = sign(parent_signing_key, _consent_payload(parent_id, intent))
    return ParentConsent(parent_id=parent_id, intent=intent, signature=sig)

spawn_clone_with_mutation

spawn_clone_with_mutation(parent: ResidentForm, parent_signing_key: SigningKey, *, policy_mutation: str) -> SpawnResult

D1 — clone the parent's bundle, append policy_mutation to its DSL.

Same tool_whitelist, same system_prompt. Child starts in ACTIVE.

Source code in src/vacant/runtime/spawn.py
def spawn_clone_with_mutation(
    parent: ResidentForm,
    parent_signing_key: SigningKey,
    *,
    policy_mutation: str,
) -> SpawnResult:
    """D1 — clone the parent's bundle, append `policy_mutation` to its DSL.

    Same `tool_whitelist`, same `system_prompt`. Child starts in `ACTIVE`.
    """
    _ensure_parent_runnable(parent)
    if not policy_mutation.strip():
        raise SpawnError("D1 requires a non-empty policy_mutation")
    new_bundle = BehaviorBundle(
        system_prompt=parent.behavior_bundle.system_prompt,
        policy_dsl=(parent.behavior_bundle.policy_dsl + "\n" + policy_mutation).strip(),
        tool_whitelist=list(parent.behavior_bundle.tool_whitelist),
    )
    child, sk = _build_child(
        parent_id=parent.identity,
        behavior_bundle=new_bundle,
        substrate_spec=parent.substrate_spec,
        initial_state=VacantState.ACTIVE,
    )
    _seed_birth(
        child_logbook=child.logbook,
        child_signing_key=sk,
        parent_id=parent.identity,
        path="D1",
        extra={"policy_mutation": policy_mutation},
    )
    _record_spawn(
        parent_form=parent,
        parent_signing_key=parent_signing_key,
        child_id=child.identity,
        path="D1",
    )
    return SpawnResult(child=child, child_signing_key=sk, path="D1")

spawn_subagent_bud

spawn_subagent_bud(parent: ResidentForm, parent_signing_key: SigningKey, *, narrowed_tools: list[str]) -> SpawnResult

D2 — spawn a closed subagent (registry_visibility=none → LOCAL).

narrowed_tools must be a (possibly equal) subset of the parent's tool whitelist; child starts in LOCAL because composite-parent children are closed by default (P5 §5.1, CLAUDE.md §Closed children).

Source code in src/vacant/runtime/spawn.py
def spawn_subagent_bud(
    parent: ResidentForm,
    parent_signing_key: SigningKey,
    *,
    narrowed_tools: list[str],
) -> SpawnResult:
    """D2 — spawn a closed subagent (registry_visibility=none → LOCAL).

    `narrowed_tools` must be a (possibly equal) subset of the parent's
    tool whitelist; child starts in LOCAL because composite-parent
    children are closed by default (P5 §5.1, CLAUDE.md §Closed children).
    """
    _ensure_parent_runnable(parent)
    parent_tools = set(parent.behavior_bundle.tool_whitelist)
    extra_tools = set(narrowed_tools) - parent_tools
    if extra_tools:
        raise SpawnError(
            f"D2 narrowed_tools must be a subset of parent tools; extras: {sorted(extra_tools)}"
        )
    new_bundle = BehaviorBundle(
        system_prompt=parent.behavior_bundle.system_prompt,
        policy_dsl=parent.behavior_bundle.policy_dsl,
        tool_whitelist=list(narrowed_tools),
    )
    child, sk = _build_child(
        parent_id=parent.identity,
        behavior_bundle=new_bundle,
        substrate_spec=parent.substrate_spec,
        initial_state=VacantState.LOCAL,
    )
    _seed_birth(
        child_logbook=child.logbook,
        child_signing_key=sk,
        parent_id=parent.identity,
        path="D2",
        extra={"narrowed_tools": list(narrowed_tools)},
    )
    _record_spawn(
        parent_form=parent,
        parent_signing_key=parent_signing_key,
        child_id=child.identity,
        path="D2",
        extra={"closed_child": True},
    )
    return SpawnResult(child=child, child_signing_key=sk, path="D2")

spawn_capability_fork

spawn_capability_fork(parent: ResidentForm, parent_signing_key: SigningKey, *, new_capability_text: str, new_system_prompt: str) -> SpawnResult

D3 — fork into a different capability with a different system prompt.

Same substrate spec; child starts ACTIVE. new_capability_text is persisted on the BIRTH entry so downstream code can later mint a fresh CapabilityCard from it (P4 owns card publication).

Source code in src/vacant/runtime/spawn.py
def spawn_capability_fork(
    parent: ResidentForm,
    parent_signing_key: SigningKey,
    *,
    new_capability_text: str,
    new_system_prompt: str,
) -> SpawnResult:
    """D3 — fork into a different capability with a different system prompt.

    Same substrate spec; child starts ACTIVE. `new_capability_text` is
    persisted on the BIRTH entry so downstream code can later mint a fresh
    `CapabilityCard` from it (P4 owns card publication).
    """
    _ensure_parent_runnable(parent)
    if not new_capability_text.strip():
        raise SpawnError("D3 requires a non-empty new_capability_text")
    if not new_system_prompt.strip():
        raise SpawnError("D3 requires a non-empty new_system_prompt")
    new_bundle = BehaviorBundle(
        system_prompt=new_system_prompt,
        policy_dsl=parent.behavior_bundle.policy_dsl,
        tool_whitelist=list(parent.behavior_bundle.tool_whitelist),
    )
    child, sk = _build_child(
        parent_id=parent.identity,
        behavior_bundle=new_bundle,
        substrate_spec=parent.substrate_spec,
        initial_state=VacantState.ACTIVE,
    )
    _seed_birth(
        child_logbook=child.logbook,
        child_signing_key=sk,
        parent_id=parent.identity,
        path="D3",
        extra={"new_capability_text": new_capability_text},
    )
    _record_spawn(
        parent_form=parent,
        parent_signing_key=parent_signing_key,
        child_id=child.identity,
        path="D3",
    )
    return SpawnResult(child=child, child_signing_key=sk, path="D3")

spawn_lineage_merge

spawn_lineage_merge(parent_a: ResidentForm, parent_a_signing_key: SigningKey, parent_b: ResidentForm, parent_b_consent: ParentConsent, *, merged_system_prompt: str) -> SpawnResult

D4 — merge two parents' bundles. Requires parent_b's signed consent.

parent_a is the primary parent (the one running the ceremony). parent_b_consent must be a ParentConsent whose parent_id matches parent_b.identity and whose intent is _D4_INTENT. The secondary parent is recorded inside the BIRTH log entry payload (D003 §C); only the primary parent appears on child.parent_id.

Source code in src/vacant/runtime/spawn.py
def spawn_lineage_merge(
    parent_a: ResidentForm,
    parent_a_signing_key: SigningKey,
    parent_b: ResidentForm,
    parent_b_consent: ParentConsent,
    *,
    merged_system_prompt: str,
) -> SpawnResult:
    """D4 — merge two parents' bundles. Requires `parent_b`'s signed consent.

    `parent_a` is the *primary* parent (the one running the ceremony).
    `parent_b_consent` must be a `ParentConsent` whose `parent_id` matches
    `parent_b.identity` and whose `intent` is `_D4_INTENT`. The secondary
    parent is recorded inside the BIRTH log entry payload (D003 §C); only
    the primary parent appears on `child.parent_id`.
    """
    _ensure_parent_runnable(parent_a)
    _ensure_parent_runnable(parent_b)
    if parent_a.identity == parent_b.identity:
        raise SpawnError("D4 requires two distinct parents")
    if parent_b_consent.parent_id != parent_b.identity:
        raise ConsentError("D4 consent: parent_id does not match parent_b")
    _verify_consent(parent_b_consent, _D4_INTENT)

    merged_tools = sorted(
        set(parent_a.behavior_bundle.tool_whitelist) | set(parent_b.behavior_bundle.tool_whitelist)
    )
    merged_policy = "\n".join(
        s
        for s in (
            parent_a.behavior_bundle.policy_dsl,
            parent_b.behavior_bundle.policy_dsl,
        )
        if s
    )
    new_bundle = BehaviorBundle(
        system_prompt=merged_system_prompt,
        policy_dsl=merged_policy,
        tool_whitelist=merged_tools,
    )
    # Substrates: intersect allowed_substrates so the child can run on either parent's stack
    merged_substrates = sorted(
        set(parent_a.substrate_spec.allowed_substrates)
        & set(parent_b.substrate_spec.allowed_substrates)
    ) or list(parent_a.substrate_spec.allowed_substrates)
    new_substrate_spec = SubstrateSpec(
        allowed_substrates=merged_substrates,
        policy={**parent_a.substrate_spec.policy, **parent_b.substrate_spec.policy},
    )
    child, sk = _build_child(
        parent_id=parent_a.identity,
        behavior_bundle=new_bundle,
        substrate_spec=new_substrate_spec,
        initial_state=VacantState.ACTIVE,
    )
    _seed_birth(
        child_logbook=child.logbook,
        child_signing_key=sk,
        parent_id=parent_a.identity,
        path="D4",
        extra={
            "secondary_parent_id": parent_b.identity.hex(),
            "consent_signature": parent_b_consent.signature.hex(),
        },
    )
    _record_spawn(
        parent_form=parent_a,
        parent_signing_key=parent_a_signing_key,
        child_id=child.identity,
        path="D4",
        extra={"secondary_parent_id": parent_b.identity.hex()},
    )
    return SpawnResult(child=child, child_signing_key=sk, path="D4")

make_d4_consent

make_d4_consent(parent_b: ResidentForm, parent_b_signing_key: SigningKey) -> ParentConsent

Convenience: build a ParentConsent for the standard D4 intent.

Source code in src/vacant/runtime/spawn.py
def make_d4_consent(parent_b: ResidentForm, parent_b_signing_key: SigningKey) -> ParentConsent:
    """Convenience: build a ParentConsent for the standard D4 intent."""
    return consent(parent_b.identity, parent_b_signing_key, _D4_INTENT)

spawn_cross_substrate_respawn

spawn_cross_substrate_respawn(parent: ResidentForm, parent_signing_key: SigningKey, *, new_substrate_spec: SubstrateSpec) -> SpawnResult

D5 — same bundle, different substrate. Identity (keypair) is fresh, but the child carries the parent's prompt + policy + tools verbatim.

Source code in src/vacant/runtime/spawn.py
def spawn_cross_substrate_respawn(
    parent: ResidentForm,
    parent_signing_key: SigningKey,
    *,
    new_substrate_spec: SubstrateSpec,
) -> SpawnResult:
    """D5 — same bundle, different substrate. Identity (keypair) is fresh,
    but the child carries the parent's prompt + policy + tools verbatim.
    """
    _ensure_parent_runnable(parent)
    if not new_substrate_spec.allowed_substrates:
        raise SpawnError("D5 requires at least one allowed substrate in the new spec")
    if new_substrate_spec.allowed_substrates == parent.substrate_spec.allowed_substrates:
        raise SpawnError(
            "D5 requires a different substrate spec; new spec is identical to parent's"
        )
    new_bundle = BehaviorBundle(
        system_prompt=parent.behavior_bundle.system_prompt,
        policy_dsl=parent.behavior_bundle.policy_dsl,
        tool_whitelist=list(parent.behavior_bundle.tool_whitelist),
    )
    child, sk = _build_child(
        parent_id=parent.identity,
        behavior_bundle=new_bundle,
        substrate_spec=new_substrate_spec,
        initial_state=VacantState.ACTIVE,
    )
    _seed_birth(
        child_logbook=child.logbook,
        child_signing_key=sk,
        parent_id=parent.identity,
        path="D5",
        extra={
            "new_substrates": list(new_substrate_spec.allowed_substrates),
            "old_substrates": list(parent.substrate_spec.allowed_substrates),
        },
    )
    _record_spawn(
        parent_form=parent,
        parent_signing_key=parent_signing_key,
        child_id=child.identity,
        path="D5",
    )
    return SpawnResult(child=child, child_signing_key=sk, path="D5")

loop

Async lifecycle loop wiring state machine + heartbeat + store.

P1's RuntimeLoop is the minimal per-vacant runtime: it accepts Events, drives the state machine, persists logbook deltas through a LogbookStore, and (optionally) runs a HeartbeatScheduler in the background. Higher-level concerns — A2A endpoint, peer review pipeline, budget bookkeeping — are intentionally not in this loop; they belong to P3 / P4 / P6 and consume the events/logbook this loop produces.
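The loop's contract — apply the transition, then persist through the store — can be sketched with an in-memory stand-in (`InMemoryStore`, the plain-list logbook, and the string states are illustrative, not the real `LogbookStore` / `ResidentForm` types):

```python
import asyncio


class InMemoryStore:
    """Illustrative LogbookStore stand-in: save() persists a snapshot."""

    def __init__(self) -> None:
        self.saved: dict[str, list] = {}

    async def save(self, identity: str, logbook: list) -> None:
        self.saved[identity] = list(logbook)


async def submit(store, identity: str, logbook: list, transition, state, event):
    # The loop's contract: apply the transition first, then persist.
    new_state = transition(state, event)
    logbook.append((event, new_state))
    await store.save(identity, logbook)
    return new_state


store = InMemoryStore()
log: list = []
new_state = asyncio.run(
    submit(store, "vacant-1", log, lambda s, e: "HIBERNATING", "ACTIVE", "TICK")
)
```

Higher layers (A2A endpoint, review pipeline) would call `submit` and read the persisted logbook, never mutate state directly.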

RuntimeLoop

RuntimeLoop(*, form: ResidentForm, signing_key: SigningKey, store: LogbookStore, clock: Callable[[], datetime] = lambda: datetime.now(UTC))

Per-vacant lifecycle loop.

The loop is constructed around an existing ResidentForm; identity creation lives in P2. The signing_key is held in memory for as long as the loop runs (vacant.identity will later wrap this in a real custody boundary; see THEORY_V5 §0.1).

Source code in src/vacant/runtime/loop.py
def __init__(
    self,
    *,
    form: ResidentForm,
    signing_key: SigningKey,
    store: LogbookStore,
    clock: Callable[[], datetime] = lambda: datetime.now(UTC),
) -> None:
    self._form = form
    self._signing_key = signing_key
    self._store = store
    self._clock = clock
    self._sm = VacantStateMachine(form.runtime_state)
    self._last_active: datetime | None = (
        clock() if form.runtime_state in {VacantState.ACTIVE, VacantState.LOCAL} else None
    )

submit async

submit(event: Event) -> VacantState

Apply event, persist the resulting logbook, return the new state.

Source code in src/vacant/runtime/loop.py
async def submit(self, event: Event) -> VacantState:
    """Apply `event`, persist the resulting logbook, return the new state."""
    new_state = self._sm.apply(event)
    if new_state in {VacantState.ACTIVE, VacantState.LOCAL}:
        self._last_active = self._clock()
    self._form = self._form.model_copy(update={"runtime_state": new_state})
    await self._store.save(self._form.identity, self._form.logbook)
    return new_state
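Because `clock` is injected at construction, the `_last_active` stamping in `submit` is deterministic under test. A minimal sketch of that pattern, using a toy loop (the `MiniLoop` and `make_fake_clock` names are illustrative, not part of vacant):

```python
import asyncio
from datetime import datetime, timedelta, timezone


class MiniLoop:
    """Toy stand-in for RuntimeLoop: only the clock / last_active wiring."""

    def __init__(self, clock):
        self._clock = clock
        self.last_active: datetime | None = None

    async def submit(self, event: str) -> str:
        # Pretend every event lands us in a runnable state; stamp the injected clock.
        self.last_active = self._clock()
        return "ACTIVE"


def make_fake_clock(start: datetime, step: timedelta):
    """Deterministic clock: returns `start`, then advances by `step` per call."""
    state = {"now": start}

    def clock() -> datetime:
        current = state["now"]
        state["now"] = current + step
        return current

    return clock
```

A test can then assert exact timestamps instead of sleeping:

```python
t0 = datetime(2025, 1, 1, tzinfo=timezone.utc)
loop = MiniLoop(make_fake_clock(t0, timedelta(seconds=30)))
asyncio.run(loop.submit("TICK"))
assert loop.last_active == t0
```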

append_log async

append_log(kind: str, payload: dict[str, Any]) -> LogEntry

Append a free-form entry to the logbook and persist.

Source code in src/vacant/runtime/loop.py
async def append_log(self, kind: str, payload: dict[str, Any]) -> LogEntry:
    """Append a free-form entry to the logbook and persist."""
    entry = self._form.logbook.append(kind, payload, self._signing_key, ts=self._clock())
    await self._store.save(self._form.identity, self._form.logbook)
    return entry

heartbeat_scheduler

heartbeat_scheduler(*, sleep: Callable[[float], Any] = asyncio.sleep) -> HeartbeatScheduler

Build a HeartbeatScheduler bound to this loop's logbook + state.

Source code in src/vacant/runtime/loop.py
def heartbeat_scheduler(
    self,
    *,
    sleep: Callable[[float], Any] = asyncio.sleep,
) -> HeartbeatScheduler:
    """Build a `HeartbeatScheduler` bound to this loop's logbook + state."""
    return HeartbeatScheduler(
        logbook=self._form.logbook,
        signing_key=self._signing_key,
        state_provider=lambda: self._sm.state,
        sleep=sleep,
        clock=self._clock,
        last_active_provider=lambda: self._last_active,
    )
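The injectable `sleep` parameter is what lets a heartbeat scheduler be exercised in tests without real delays. A minimal sketch of the pattern, with a toy emit-then-sleep loop (`run_scheduler` is illustrative; the real HeartbeatScheduler lives in runtime/heartbeat.py):

```python
import asyncio


async def run_scheduler(emit, *, interval: float, ticks: int, sleep=asyncio.sleep) -> None:
    """Toy heartbeat loop: emit an attestation, then sleep, `ticks` times.

    `sleep` defaults to asyncio.sleep but is injectable, so a test can swap in
    a recorder and assert on the requested intervals instead of waiting.
    """
    for _ in range(ticks):
        emit()
        await sleep(interval)
```

In a test, the fake sleep records its arguments and returns immediately, so three five-second beats run in microseconds.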

store

Logbook persistence interface (P1) + in-memory impl.

P1's RuntimeLoop is wired through this LogbookStore Protocol so P4 can later swap in the SQLite-backed implementation without touching runtime code. The in-memory impl provided here is enough for unit + integration tests and the P7 demo dashboard.

LogbookStore

Bases: Protocol

Async key-value persistence over (VacantId, Logbook).

InMemoryLogbookStore

InMemoryLogbookStore()

Reference impl backed by a plain dict. Not thread-safe; intended for single-process tests and demos.

Source code in src/vacant/runtime/store.py
def __init__(self) -> None:
    self._data: dict[VacantId, Logbook] = {}

errors

Error hierarchy for vacant.runtime.

RuntimeError_

Bases: CoreError

Base class for vacant.runtime errors.

Suffix _ avoids shadowing the built-in RuntimeError.

InvalidEventError

Bases: RuntimeError_

An Event was applied to a VacantState that does not accept it.

Example: CALL_RECEIVED while in SUNK. The state machine does not silently no-op these — they signal a programming bug or a request that should have been rejected upstream (P6 envelope checks, §3.2).
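The fail-loud behavior falls out of the explicit transition table: a missing `(state, event)` cell raises instead of no-op'ing. A minimal sketch, with a tiny illustrative excerpt of the table (the cells shown are examples, not the authoritative 42-cell table from state_machine):

```python
class InvalidEventError(Exception):
    """Stand-in for vacant.runtime's InvalidEventError."""


# Explicit (state, event) -> state' dict; absent cells are invalid by construction.
TRANSITIONS: dict[tuple[str, str], str] = {
    ("ACTIVE", "TICK"): "ACTIVE",
    ("ACTIVE", "CALL_RECEIVED"): "ACTIVE",
    ("STALE", "REVIVE_REQUESTED"): "ACTIVE",
}


def apply(state: str, event: str) -> str:
    try:
        return TRANSITIONS[(state, event)]
    except KeyError:
        # No silent no-op: a missing cell signals a bug or an upstream filtering gap.
        raise InvalidEventError(f"{event} is not valid in {state}") from None
```

Trying `apply("SUNK", "CALL_RECEIVED")` raises rather than quietly returning `SUNK`, which surfaces the misrouted call at the point of the bug.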

SpawnError

Bases: RuntimeError_

A spawn (D1–D5) operation could not satisfy its preconditions.

ConsentError

Bases: SpawnError

A multi-parent spawn (D4) was missing or had an invalid parent consent.