
vacant.reputation

P3 reputation — a five-dimensional Beta posterior per (vacant, substrate) pair, UCB exploration with the cold-start mechanism (§3.6), STYLO drift discount, a portability factor, and same-controller / same-substrate / same-stylo detection (cost-raising, never preventing).

posterior

Beta posterior + Beta5D -- five-dimensional reputation state.

Per P3 §3.1-§3.2, each reputation event updates a per-dimension Beta posterior: accumulated evidence decays toward the prior over time, while the prior itself never erodes. The per-dimension half-life is set in core.constants.DIM_HALF_LIFE_DAYS.

Update rule (§3.2):

gamma = exp(-ln(2) * Δt_days / half_life_days)
alpha ← alpha0 + gamma * (alpha - alpha0)   # decay accumulated evidence, keep the prior
beta  ← beta0 + gamma * (beta - beta0)
n_eff ← gamma * n_eff
alpha ← alpha + w * s
beta  ← beta + w * (1 - s)
n_eff ← n_eff + w

s ∈ [0, 1] is the signal's positive rate; w is the source-weighted evidence contribution.
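The decay-then-add sequence can be checked with plain floats. This is a standalone sketch of the rule above, not the library's Beta model; the 30-day half-life is illustrative (the real value comes from core.constants.DIM_HALF_LIFE_DAYS):

```python
import math

def decay_then_update(alpha, beta, n_eff, *, alpha0=1.0, beta0=1.0,
                      dt_days=0.0, half_life_days=30.0, s=1.0, w=1.0):
    """Apply the §3.2 rule: decay evidence toward the prior, then add a pulse."""
    gamma = math.exp(-math.log(2.0) * dt_days / half_life_days)
    alpha = alpha0 + gamma * (alpha - alpha0)  # evidence halves per half-life
    beta = beta0 + gamma * (beta - beta0)      # prior (alpha0, beta0) is kept
    n_eff = gamma * n_eff
    return alpha + w * s, beta + w * (1 - s), n_eff + w

# One half-life elapses before a fully positive (s=1, w=1) event:
# accumulated evidence halves, the prior stays put, then the pulse lands.
a, b, n = decay_then_update(3.0, 1.0, 2.0, dt_days=30.0, half_life_days=30.0)
assert abs(a - 3.0) < 1e-9   # 1 + 0.5*(3-1) + 1*1
assert abs(b - 1.0) < 1e-9   # 1 + 0.5*(1-1) + 1*0
assert abs(n - 2.0) < 1e-9   # 0.5*2 + 1
```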

Beta

Bases: BaseModel

Single-dimension Beta posterior with time-decay state.

alpha0 class-attribute instance-attribute

alpha0: float = 1.0

Prior alpha0 -- kept across decays so the prior never erodes.

beta0 class-attribute instance-attribute

beta0: float = 1.0

Prior beta0.

n_eff class-attribute instance-attribute

n_eff: float = 0.0

Effective sample size = alpha + beta - alpha0 - beta0 (post-decay). Tracks how much evidence the posterior carries beyond the prior.

last_update_ts class-attribute instance-attribute

last_update_ts: float = 0.0

Unix-epoch seconds. Used to compute decay against the current time.

decayed

decayed(*, now_ts: float | datetime) -> Beta

Return a new Beta with alpha, beta, n_eff decayed to now_ts.

The prior never decays -- only the accumulated evidence above the prior shrinks (P3 §3.2). Identity if now_ts <= last_update_ts.

Source code in src/vacant/reputation/posterior.py
def decayed(self, *, now_ts: float | datetime) -> Beta:
    """Return a *new* `Beta` with alpha, beta, n_eff decayed to `now_ts`.

    The prior never decays -- only the accumulated evidence above the
    prior shrinks (P3 §3.2). Identity if `now_ts <= last_update_ts`.
    """
    target = _ts_to_seconds(now_ts)
    dt = target - self.last_update_ts
    if dt <= 0:
        return self
    gamma = decay_factor(dt, self.half_life_days)
    new_alpha = self.alpha0 + gamma * (self.alpha - self.alpha0)
    new_beta = self.beta0 + gamma * (self.beta - self.beta0)
    new_n_eff = gamma * self.n_eff
    return self.model_copy(
        update={
            "alpha": new_alpha,
            "beta": new_beta,
            "n_eff": new_n_eff,
            "last_update_ts": target,
        }
    )

update

update(*, positive_weight: float, negative_weight: float, now_ts: float | datetime) -> Beta

Decay to now_ts then apply a weighted positive/negative pulse.

positive_weight and negative_weight are the w * s and w * (1 - s) terms from §3.2. Both must be >= 0; the caller has already factored s ∈ [0, 1] into them.

Source code in src/vacant/reputation/posterior.py
def update(
    self,
    *,
    positive_weight: float,
    negative_weight: float,
    now_ts: float | datetime,
) -> Beta:
    """Decay to `now_ts` then apply a weighted positive/negative pulse.

    `positive_weight` and `negative_weight` are the `w * s` and
    `w * (1 - s)` terms from §3.2. Both must be >= 0; the caller has
    already factored `s ∈ [0, 1]` into them.
    """
    if positive_weight < 0 or negative_weight < 0:
        raise InvalidSignalError(
            f"update weights must be >= 0; got pos={positive_weight}, neg={negative_weight}"
        )
    decayed = self.decayed(now_ts=now_ts)
    return decayed.model_copy(
        update={
            "alpha": decayed.alpha + positive_weight,
            "beta": decayed.beta + negative_weight,
            "n_eff": decayed.n_eff + positive_weight + negative_weight,
        }
    )

update_with_signal

update_with_signal(*, signal: float, weight: float, now_ts: float | datetime) -> Beta

Convenience wrapper: split (signal, weight) into pos/neg pulses.

signal is the positive rate ∈ [0, 1]; weight >= 0 is the source-weighted evidence contribution.

Source code in src/vacant/reputation/posterior.py
def update_with_signal(self, *, signal: float, weight: float, now_ts: float | datetime) -> Beta:
    """Convenience wrapper: split `(signal, weight)` into pos/neg pulses.

    `signal` is the positive rate ∈ [0, 1]; `weight` >= 0 is the
    source-weighted evidence contribution.
    """
    if not (0.0 <= signal <= 1.0):
        raise InvalidSignalError(f"signal must be in [0, 1], got {signal}")
    if weight < 0:
        raise InvalidSignalError(f"weight must be >= 0, got {weight}")
    return self.update(
        positive_weight=weight * signal,
        negative_weight=weight * (1.0 - signal),
        now_ts=now_ts,
    )

Beta5D

Bases: BaseModel

Five-dimensional reputation state for one (vacant, substrate) pair.

decay_factor

decay_factor(dt_seconds: float, half_life_days: float) -> float

Exponential half-life decay factor: exp(-ln(2) * Δt_days / half_life_days).

Source code in src/vacant/reputation/posterior.py
def decay_factor(dt_seconds: float, half_life_days: float) -> float:
    """Exponential half-life decay factor: `exp(-ln(2) * Deltat_days / half_life_d)`."""
    if dt_seconds <= 0:
        return 1.0
    if half_life_days <= 0:
        return 0.0 if dt_seconds > 0 else 1.0
    days = dt_seconds / 86_400.0
    return math.exp(-math.log(2.0) * days / half_life_days)
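A quick sanity check (self-contained copy of the function above, assuming the same 86 400-second day): after exactly one half-life the weight is 0.5, after two it is 0.25.

```python
import math

def decay_factor(dt_seconds: float, half_life_days: float) -> float:
    """Exponential half-life decay: 1.0 at dt=0, halving every half-life."""
    if dt_seconds <= 0:
        return 1.0
    if half_life_days <= 0:
        return 0.0
    days = dt_seconds / 86_400.0
    return math.exp(-math.log(2.0) * days / half_life_days)

assert decay_factor(0, 7.0) == 1.0                          # no time elapsed
assert abs(decay_factor(7 * 86_400, 7.0) - 0.5) < 1e-12     # one half-life
assert abs(decay_factor(14 * 86_400, 7.0) - 0.25) < 1e-12   # two half-lives
```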

five_d_with_priors

five_d_with_priors(*, now_ts: float | datetime = 0.0) -> Beta5D

Construct a Beta5D with the canonical base priors.

Cold-start adjustments (L1 attestation, stake, vouchers, sibling inheritance) are applied separately in cold_start.py.

Source code in src/vacant/reputation/posterior.py
def five_d_with_priors(*, now_ts: float | datetime = 0.0) -> Beta5D:
    """Construct a `Beta5D` with the canonical base priors.

    Cold-start adjustments (L1 attestation, stake, vouchers, sibling
    inheritance) are applied separately in `cold_start.py`.
    """
    target = _ts_to_seconds(now_ts)
    return Beta5D(
        factual=_make_dim_beta("factual", now_ts=target),
        logical=_make_dim_beta("logical", now_ts=target),
        relevance=_make_dim_beta("relevance", now_ts=target),
        honesty=_make_dim_beta("honesty", now_ts=target),
        adoption=_make_dim_beta("adoption", now_ts=target),
    )

aggregator

Reputation aggregator -- the public API surface that the registry queries.

Implements P4's vacant.registry.aggregation.ReputationOracle Protocol (via score(vacant_id, dimensions)), plus the richer dispatch §7 API:

  • get_reputation(vid, substrate) -> Beta5D
  • get_ranked(capability_query, n) -> list[(VacantId, score)]
  • record_review(reviewer, target, dimensions, substrate) -> None

record_review enforces the dispatch's reviewer-eligibility check: reviews from SUNK / ARCHIVED / STALE vacants are rejected at the API surface (P1 can_review), and reviews from suspected-collusion sets are downweighted via the same-* detector signals.

VacantContext dataclass

VacantContext(vacant_id: VacantId, base_model_family: str = 'unknown', state: VacantState = ACTIVE, capability_text: str = '', attestation_level: str = 'L0', stake_amount: float = 0.0)

Per-vacant metadata the aggregator tracks alongside its posterior.

Sourced from P4's vacant table when wired in. Held here so unit tests can run without a registry.

ReviewRecord dataclass

ReviewRecord(reviewer: VacantId, target: VacantId, dimensions: dict[str, float], substrate: str, source: str = 'peer_review', ts: float = (lambda: time())(), same_signals: tuple[SameDetectSignal, ...] = ())

One review event submitted to record_review.

Aggregator

Aggregator(contexts: dict[VacantId, VacantContext] | None = None, *, review_limit_per_target_24h: int | None = None, logbooks: dict[VacantId, Logbook] | None = None, signing_keys: dict[VacantId, SigningKey] | None = None)

In-memory reputation aggregator. Persists state externally via P4.

Construction: pass a registry of VacantContext keyed by VacantId. Tests typically build this fresh per-case; the demo dashboard constructs one and seeds from the registry's vacant table.

Source code in src/vacant/reputation/aggregator.py
def __init__(
    self,
    contexts: dict[VacantId, VacantContext] | None = None,
    *,
    review_limit_per_target_24h: int | None = None,
    logbooks: dict[VacantId, Logbook] | None = None,
    signing_keys: dict[VacantId, SigningKey] | None = None,
) -> None:
    self._contexts: dict[VacantId, VacantContext] = dict(contexts or {})
    # Audit trail (D015 §D). The aggregator runs in one of two modes:
    #
    #   audit-aware  — `_logbooks` is non-empty (constructor seeded
    #                  or `register_audit` was called). Every
    #                  `record_review` requires the reviewer to be
    #                  registered; missing registration raises
    #                  `MissingAuditKeyError` (Pfix3 B4 fail-closed).
    #                  README's "record_review first appends signed
    #                  REVIEW_EVENT" claim holds in this mode.
    #
    #   no-audit     — `_logbooks` is empty and stays empty. Used by
    #                  unit tests + offline tooling that don't care
    #                  about D015 §D. Mutation is permitted without
    #                  any audit append.
    #
    # The discriminant is `_audit_mode_active` below. Once a logbook
    # has been registered (either via constructor or `register_audit`)
    # the aggregator is locked into audit-aware mode for the rest of
    # its life — a partial fixture cannot bypass the gate.
    #
    # ONE-WAY LATCH: `_audit_mode_active` is intentionally never
    # cleared back to False. Manually flipping it (e.g., in a test)
    # silently re-introduces the silent-skip behaviour Pfix3 B4
    # explicitly removed; do not do that. If a future use case
    # genuinely needs to "drop audit mode" mid-life, build a fresh
    # Aggregator instead.
    self._logbooks: dict[VacantId, Logbook] = dict(logbooks or {})
    self._signing_keys: dict[VacantId, SigningKey] = dict(signing_keys or {})
    self._audit_mode_active: bool = bool(self._logbooks)
    # Per (vacant, substrate) Beta5D.
    self._posteriors: dict[tuple[VacantId, str], Beta5D] = {}
    # Per (reviewer, target) review-count for novelty discount.
    self._review_counts: dict[tuple[VacantId, VacantId], int] = {}
    # Per (reviewer, target) sliding-window review timestamps for the
    # per-(reviewer, target) rate limit. Spec P1 line 259: "每 24h 對同一
    # target_did 的 review 上限: 3" (at most 3 reviews of the same
    # target_did per 24h) — a reviewer-side spam cap, not an absolute
    # cap. Padv-P3 D010 §1's sniping defense (a single peer flooding one
    # target) is satisfied because it is the (reviewer, target) pair
    # that's capped; popular targets can still receive many reviews from
    # many distinct reviewers.
    self._target_review_timestamps: dict[tuple[VacantId, VacantId], deque[float]] = {}
    self._review_limit_per_target_24h = (
        review_limit_per_target_24h
        if review_limit_per_target_24h is not None
        else REVIEW_LIMIT_PER_TARGET_24H
    )
    self._lock = asyncio.Lock()

add_context

add_context(ctx: VacantContext) -> None

Register a vacant + its metadata.

Source code in src/vacant/reputation/aggregator.py
def add_context(self, ctx: VacantContext) -> None:
    """Register a vacant + its metadata."""
    self._contexts[ctx.vacant_id] = ctx

register_audit

register_audit(vid: VacantId, *, logbook: Logbook, signing_key: SigningKey) -> None

Attach a Logbook + SigningKey for vid. Registering ANY reviewer flips the aggregator into audit-aware mode for the rest of its lifetime: subsequent record_review calls require the reviewer to be registered, otherwise MissingAuditKeyError.

Source code in src/vacant/reputation/aggregator.py
def register_audit(self, vid: VacantId, *, logbook: Logbook, signing_key: SigningKey) -> None:
    """Attach a `Logbook` + `SigningKey` for `vid`. Registering ANY
    reviewer flips the aggregator into audit-aware mode for the rest
    of its lifetime: subsequent `record_review` calls require the
    reviewer to be registered, otherwise `MissingAuditKeyError`."""
    self._logbooks[vid] = logbook
    self._signing_keys[vid] = signing_key
    self._audit_mode_active = True

get_reputation async

get_reputation(vid: VacantId, substrate: str) -> Beta5D

Return the per-substrate Beta5D, building a cold-start prior if absent.

Source code in src/vacant/reputation/aggregator.py
async def get_reputation(self, vid: VacantId, substrate: str) -> Beta5D:
    """Return the per-substrate Beta5D, building a cold-start prior if absent."""
    key = (vid, substrate)
    async with self._lock:
        rep = self._posteriors.get(key)
        if rep is None:
            rep = five_d_with_priors(now_ts=time.time())
            self._posteriors[key] = rep
    return rep

get_ranked async

get_ranked(capability_query: str, n: int, *, substrate: str = 'default', weights: Mapping[str, float] | None = None) -> list[tuple[VacantId, float]]

UCB-scored top-N candidates for a capability query.

Filters to is_runnable vacants whose capability_text contains the query as a substring (cheap MVP search; P4's aggregation layer does the real index lookup).

Source code in src/vacant/reputation/aggregator.py
async def get_ranked(
    self,
    capability_query: str,
    n: int,
    *,
    substrate: str = "default",
    weights: Mapping[str, float] | None = None,
) -> list[tuple[VacantId, float]]:
    """UCB-scored top-N candidates for a capability query.

    Filters to `is_runnable` vacants whose `capability_text` contains
    the query as a substring (cheap MVP search; P4's aggregation
    layer does the real index lookup).
    """
    async with self._lock:
        n_global = max(1, len(self._contexts))
        scored: list[tuple[VacantId, float]] = []
        for vid, ctx in self._contexts.items():
            if ctx.state not in (VacantState.ACTIVE, VacantState.LOCAL):
                continue
            if capability_query and capability_query not in ctx.capability_text:
                continue
            rep = self._posteriors.get((vid, substrate))
            if rep is None:
                rep = five_d_with_priors(now_ts=time.time())
            score = ucb_call_score(
                rep,
                weights=weights,
                n_global=n_global,
                stake_amount=ctx.stake_amount,
                attestation_level=ctx.attestation_level,
            )
            scored.append((vid, score))
    scored.sort(key=lambda p: p[1], reverse=True)
    return scored[:n]

record_review async

record_review(reviewer: VacantId, target: VacantId, dimensions: Mapping[str, float], substrate: str, *, source: str = 'peer_review', same_signals: Sequence[SameDetectSignal] = (), ts: float | None = None) -> None

Apply a review to the target's posterior. Raises IneligibleReviewerError if reviewer's runtime state forbids new reviews (P1 §4.1) and InvalidDimensionError for unknown dims.

Source code in src/vacant/reputation/aggregator.py
async def record_review(
    self,
    reviewer: VacantId,
    target: VacantId,
    dimensions: Mapping[str, float],
    substrate: str,
    *,
    source: str = "peer_review",
    same_signals: Sequence[SameDetectSignal] = (),
    ts: float | None = None,
) -> None:
    """Apply a review to the target's posterior. Raises
    `IneligibleReviewerError` if reviewer's runtime state forbids
    new reviews (P1 §4.1) and `InvalidDimensionError` for unknown
    dims.
    """
    if reviewer not in self._contexts:
        raise InvalidSignalError(f"unknown reviewer {reviewer}")
    if target not in self._contexts:
        raise InvalidSignalError(f"unknown target {target}")
    if reviewer == target:
        raise InvalidSignalError("reviewer == target (self-review)")

    reviewer_ctx = self._contexts[reviewer]
    if not can_review(reviewer_ctx.state):
        raise IneligibleReviewerError(
            f"reviewer {reviewer} state {reviewer_ctx.state.value} cannot review"
        )

    if source not in SOURCE_BASE_WEIGHTS:
        raise InvalidSignalError(f"unknown source {source!r}")
    for d in dimensions:
        if d not in REPUTATION_DIMS:
            raise InvalidDimensionError(f"unknown dim {d!r}")
    for d, s in dimensions.items():
        if not (0.0 <= float(s) <= 1.0):
            raise InvalidSignalError(f"dim {d} signal must be in [0, 1]; got {s}")

    target_ctx = self._contexts[target]
    when = ts if ts is not None else time.time()

    # --- Audit gate (Pfix3 B4) -----------------------------------------
    # Once any reviewer is registered the aggregator is in audit-aware
    # mode and every record_review must have audit covered for the
    # reviewer. This makes the README claim ("first appends signed
    # REVIEW_EVENT") honest in production. Tests that don't construct
    # with logbooks stay in no-audit mode and skip the append.
    if self._audit_mode_active and not self._audit_enabled_for(reviewer):
        raise MissingAuditKeyError(
            f"reviewer {reviewer} has no logbook/signing_key registered "
            "but the aggregator is in audit-aware mode (D015 §D); "
            "call `register_audit(reviewer, logbook=..., signing_key=...)` "
            "before record_review"
        )

    # --- Atomic mutation under a single lock ---------------------------
    # Order: rate-limit → tentative window append → signed REVIEW_EVENT
    # append (rollback window on failure) → composition → posterior
    # update. Holding the lock for the whole sequence guarantees that
    # an exception at any step leaves the aggregator's observable
    # state (window, logbook, posterior) coherent.
    pair = (reviewer, target)
    async with self._lock:
        # L2: per-(reviewer, target) rate limit (Padv-P3 D010 §1).
        # Spec P1 line 259: "每 24h 對同一 target_did 的 review 上限: 3"
        # (at most 3 reviews of the same target_did per 24h).
        window = self._target_review_timestamps.setdefault(pair, deque())
        cutoff = when - 86_400.0
        while window and window[0] <= cutoff:
            window.popleft()
        if len(window) >= self._review_limit_per_target_24h:
            raise ReviewRateLimitError(
                f"reviewer {reviewer} → target {target}: {len(window)} "
                f"reviews in past 24h (limit {self._review_limit_per_target_24h})"
            )
        window.append(when)

        # D015 §D audit append. Failure rolls back the window
        # timestamp so retry isn't penalised by a phantom slot.
        if self._audit_enabled_for(reviewer):
            try:
                self._append_signed_review_event(
                    reviewer=reviewer,
                    target=target,
                    dimensions=dimensions,
                    substrate=substrate,
                    source=source,
                    when=when,
                )
            except Exception:
                window.pop()  # remove the tentative timestamp we just appended
                raise

        # Weight composition (§3.4).
        base_weight = SOURCE_BASE_WEIGHTS[source]
        same_model_w = 1.0
        if reviewer_ctx.base_model_family == target_ctx.base_model_family:
            same_model_w *= SAME_BASE_MODEL_DISCOUNT

        # Novelty (§3.4.3).
        self._review_counts.setdefault(pair, 0)
        self._review_counts[pair] += 1
        k = self._review_counts[pair]
        novelty = 1.0 / (1.0 + NOVELTY_DECAY_COEFFICIENT * max(0, k - 1))
        if reviewer_ctx.base_model_family == target_ctx.base_model_family and k > 5:
            same_model_w = SAME_MODEL_HEAVY_DISCOUNT

        reviewer_rep = self._posteriors.get((reviewer, substrate))
        sig_discount = discount_from_signals(same_signals)
        composed = base_weight * same_model_w * novelty * sig_discount

        # Apply per-dim posterior update (§3.4.2 reviewer credibility).
        key = (target, substrate)
        rep = self._posteriors.get(key) or five_d_with_priors(now_ts=when)
        for d, s in dimensions.items():
            cred = REVIEWER_CREDIBILITY_FLOOR
            if reviewer_rep is not None:
                cred = (
                    REVIEWER_CREDIBILITY_FLOOR
                    + (1.0 - REVIEWER_CREDIBILITY_FLOOR) * reviewer_rep.get(d).mean
                )
            w = composed * cred
            rep = rep.update_dim(d, signal=float(s), weight=w, now_ts=when)
        self._posteriors[key] = rep
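The §3.4 weight composition in the body above can be sketched in isolation. The constants here (same-model discount 0.5, novelty coefficient 0.5, credibility floor 0.3) are illustrative stand-ins, not the values in core.constants:

```python
def composed_weight(base_weight, *, same_family, review_count,
                    signal_discount=1.0, reviewer_mean=None,
                    same_model_discount=0.5, novelty_coeff=0.5,
                    credibility_floor=0.3):
    """Sketch of §3.4: base weight x same-model x novelty x signals x credibility."""
    same_model_w = same_model_discount if same_family else 1.0
    # Novelty (§3.4.3): the k-th repeat review from the same pair decays.
    novelty = 1.0 / (1.0 + novelty_coeff * max(0, review_count - 1))
    # Reviewer credibility (§3.4.2): floor plus headroom scaled by reviewer mean.
    cred = credibility_floor
    if reviewer_mean is not None:
        cred = credibility_floor + (1.0 - credibility_floor) * reviewer_mean
    return base_weight * same_model_w * novelty * signal_discount * cred

first = composed_weight(1.0, same_family=True, review_count=1, reviewer_mean=0.8)
third = composed_weight(1.0, same_family=True, review_count=3, reviewer_mean=0.8)
assert abs(first - 0.43) < 1e-9   # 1.0 * 0.5 * 1.0 * 1.0 * 0.86
assert third < first              # repeat reviews carry less weight
```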

score async

score(vacant_id: str, dimensions: Sequence[str]) -> float

Implements vacant.registry.aggregation.ReputationOracle.score.

vacant_id is the hex form (P4's storage convention). We map it back to a VacantId against our context registry.

Source code in src/vacant/reputation/aggregator.py
async def score(self, vacant_id: str, dimensions: Sequence[str]) -> float:
    """Implements `vacant.registry.aggregation.ReputationOracle.score`.

    `vacant_id` is the hex form (P4's storage convention). We map it
    back to a `VacantId` against our context registry.
    """
    # Locate by hex.
    ctx = next(
        (c for c in self._contexts.values() if c.vacant_id.hex() == vacant_id),
        None,
    )
    if ctx is None:
        return 0.0
    async with self._lock:
        rep = self._posteriors.get((ctx.vacant_id, "default"))
    if rep is None:
        return 0.0
    if not dimensions:
        dims = REPUTATION_DIMS
    else:
        dims = tuple(d for d in dimensions if d in REPUTATION_DIMS)
        if not dims:
            return 0.0
    means = rep.means()
    return sum(means[d] for d in dims) / len(dims)

apply_drift_discount async

apply_drift_discount(vid: VacantId, *, substrate: str, discount: float) -> None

Apply a STYLO-distance discount to a (vacant, substrate) posterior. Called by P1 / shadow-self when drift is detected.

Source code in src/vacant/reputation/aggregator.py
async def apply_drift_discount(self, vid: VacantId, *, substrate: str, discount: float) -> None:
    """Apply a STYLO-distance discount to a (vacant, substrate)
    posterior. Called by P1 / shadow-self when drift is detected.
    """
    async with self._lock:
        key = (vid, substrate)
        rep = self._posteriors.get(key)
        if rep is None:
            return
        self._posteriors[key] = apply_discount_5d(rep, discount)

ucb

UCB scoring for vacant selection.

P3 §3.7 extends standard UCB1 (DRF, arXiv:2509.05764) to a multi-dimensional, uncertainty-aware form:

mu_w    = Σ w_d * mu_d
sigma_w = √(Σ w_d² * sigma_d²)          # treats dims as independent
n_w     = harmonic_mean(n_eff_d for w_d > 0.05)
explore = c_explore * √(log N / max(n_w, 1e-3))
score   = mu_w + c_base * sigma_w + explore

Lineage and UCB scoring are decoupled (D015 §B). parent_id is caller-side metadata — useful for filtering ("show me descendants of root R") or sort tie-breaks — but the parent's posterior MUST NOT bleed into the child's UCB score. CLAUDE.md §Load-bearing theory decisions makes the lineage-not-individuals-evolve invariant load-bearing: individuals do not inherit reputation from their parent; new lineage members reset the clock.

lineage_prior_alpha is kept as a public helper for research callers that explicitly want a lineage-weighted Beta prior outside the UCB path (e.g. seeding a research probe). It is not used by ucb_score, call_score, or ucb_with_lineage_prior.

ucb_score

ucb_score(rep: Beta5D, *, weights: Mapping[str, float] | None = None, n_global: int = 1, c_base: float = UCB_C_BASE, c_explore: float = UCB_C_EXPLORE, significant_weight: float = 0.05) -> float

Multi-dim Bayesian UCB. P3 §3.7.

Source code in src/vacant/reputation/ucb.py
def ucb_score(
    rep: Beta5D,
    *,
    weights: Mapping[str, float] | None = None,
    n_global: int = 1,
    c_base: float = UCB_C_BASE,
    c_explore: float = UCB_C_EXPLORE,
    significant_weight: float = 0.05,
) -> float:
    """Multi-dim Bayesian UCB. P3 §3.7."""
    w = _normalise_weights(weights or {d: 0.2 for d in REPUTATION_DIMS})
    means = rep.means()
    vars_ = rep.variances()
    n_effs = rep.n_effs()

    mu_w = sum(w[d] * means[d] for d in REPUTATION_DIMS)
    sigma_w_sq = sum((w[d] ** 2) * vars_[d] for d in REPUTATION_DIMS)
    sigma_w = math.sqrt(sigma_w_sq)

    significant_dims = [d for d in REPUTATION_DIMS if w[d] > significant_weight]
    if not significant_dims:
        n_w = 1.0
    else:
        # `harmonic_mean` rejects 0 inputs -- clamp to a tiny positive floor so
        # cold-start (n_eff=0) doesn't hit ZeroDivisionError.
        clamped = [max(n_effs[d], 1e-6) for d in significant_dims]
        n_w = float(harmonic_mean(clamped))

    log_n = math.log(max(n_global, 2))  # log(1) is 0; UCB needs >= log(2)
    # Floor at 1e-3 (not 1.0) so cold-start vacants (n_eff ~ 0) get a
    # genuinely larger explore term than warmed-up vacants with n_eff < 1.
    # A 1.0 floor would lump all sub-1 n_eff values into the same bucket
    # and kill cold-start exploration differentiation.
    explore = c_explore * math.sqrt(log_n / max(n_w, 1e-3))

    return mu_w + c_base * sigma_w + explore
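A by-hand walk-through of the score for uniform weights, with illustrative values for c_base (1.0), c_explore (0.5), and n_global (10); the real constants live in UCB_C_BASE / UCB_C_EXPLORE:

```python
import math
from statistics import harmonic_mean

dims = ("factual", "logical", "relevance", "honesty", "adoption")
w = {d: 0.2 for d in dims}                    # uniform, already normalised
means = {d: 0.7 for d in dims}
vars_ = {d: 0.01 for d in dims}
n_effs = {d: 4.0 for d in dims}

mu_w = sum(w[d] * means[d] for d in dims)                         # 0.7
sigma_w = math.sqrt(sum(w[d] ** 2 * vars_[d] for d in dims))      # ≈ 0.0447
n_w = harmonic_mean([max(n_effs[d], 1e-6) for d in dims])         # 4.0
explore = 0.5 * math.sqrt(math.log(max(10, 2)) / max(n_w, 1e-3))  # n_global=10
score = mu_w + 1.0 * sigma_w + explore

assert abs(mu_w - 0.7) < 1e-9
assert abs(n_w - 4.0) < 1e-9
assert score > mu_w   # uncertainty and exploration only add, never subtract
```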

lineage_prior_alpha

lineage_prior_alpha(*, base_alpha: float, base_beta: float, parent_alpha: float, parent_beta: float, depth: int, inherit_fraction: float = 0.25, decay_lambda: float = 0.5) -> tuple[float, float]

Lineage prior shaping (§4.3): blend parent posterior into child prior.

kappa(d) = inherit_fraction * exp(-decay_lambda * d) shrinks with lineage depth so a long fork chain doesn't trivially inherit root. Returns the blended (alpha, beta) for the child's prior.

Source code in src/vacant/reputation/ucb.py
def lineage_prior_alpha(
    *,
    base_alpha: float,
    base_beta: float,
    parent_alpha: float,
    parent_beta: float,
    depth: int,
    inherit_fraction: float = 0.25,
    decay_lambda: float = 0.5,
) -> tuple[float, float]:
    """Lineage prior shaping (§4.3): blend parent posterior into child prior.

    `kappa(d) = inherit_fraction * exp(-decay_lambda * d)` shrinks with
    lineage depth so a long fork chain doesn't trivially inherit root.
    Returns the blended `(alpha, beta)` for the child's prior.
    """
    if depth < 0:
        raise ValueError(f"lineage depth must be >= 0; got {depth}")
    if not (0.0 <= inherit_fraction <= 1.0):
        raise ValueError(f"inherit_fraction must be in [0, 1]; got {inherit_fraction}")
    if decay_lambda < 0:
        raise ValueError(f"decay_lambda must be >= 0; got {decay_lambda}")
    kappa = inherit_fraction * math.exp(-decay_lambda * depth)
    return (base_alpha + kappa * parent_alpha, base_beta + kappa * parent_beta)
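A worked example, assuming the default inherit_fraction = 0.25 and decay_lambda = 0.5: a strong parent posterior (alpha=9, beta=3) shifts a uniform child prior noticeably at depth 0 and only slightly at depth 3.

```python
import math

def lineage_prior(base_a, base_b, parent_a, parent_b, depth,
                  inherit_fraction=0.25, decay_lambda=0.5):
    """§4.3 blend: kappa shrinks exponentially with lineage depth."""
    kappa = inherit_fraction * math.exp(-decay_lambda * depth)
    return base_a + kappa * parent_a, base_b + kappa * parent_b

shallow = lineage_prior(1.0, 1.0, 9.0, 3.0, depth=0)   # kappa = 0.25
deep = lineage_prior(1.0, 1.0, 9.0, 3.0, depth=3)      # kappa ≈ 0.0558

assert abs(shallow[0] - 3.25) < 1e-9 and abs(shallow[1] - 1.75) < 1e-9
assert deep[0] < shallow[0]   # a depth-3 fork inherits far less of the root
```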

ucb_with_lineage_prior

ucb_with_lineage_prior(child_beta: Beta, parent_beta: Beta | None = None, *, n_global: int, depth: int = 0, c_explore: float = UCB_C_EXPLORE, inherit_fraction: float = 0.25, decay_lambda: float = 0.5) -> float

Single-dim UCB on a child posterior. Parent posterior is ignored.

D015 §B: lineage is caller-side metadata; individual vacants do not inherit reputation from their parent (CLAUDE.md §Load-bearing theory decisions). The parameters parent_beta, depth, inherit_fraction, decay_lambda are accepted for back-compatible call sites and for future caller-side filtering (depth may still be used to sort descendants), but they have no effect on the score.

Use lineage_prior_alpha(...) directly if you genuinely need a lineage-weighted Beta prior outside the UCB pipeline.

Source code in src/vacant/reputation/ucb.py
def ucb_with_lineage_prior(
    child_beta: Beta,
    parent_beta: Beta | None = None,
    *,
    n_global: int,
    depth: int = 0,
    c_explore: float = UCB_C_EXPLORE,
    inherit_fraction: float = 0.25,
    decay_lambda: float = 0.5,
) -> float:
    """Single-dim UCB on a *child* posterior. Parent posterior is ignored.

    D015 §B: lineage is caller-side metadata; individual vacants do not
    inherit reputation from their parent (CLAUDE.md §Load-bearing theory
    decisions). The parameters `parent_beta`, `depth`, `inherit_fraction`,
    `decay_lambda` are accepted for back-compatible call sites and for
    future caller-side filtering (`depth` may still be used to sort
    descendants), but they have no effect on the score.

    Use `lineage_prior_alpha(...)` directly if you genuinely need a
    lineage-weighted Beta prior outside the UCB pipeline.
    """
    _ = (parent_beta, depth, inherit_fraction, decay_lambda)  # documented no-ops
    s = child_beta.alpha + child_beta.beta
    mean = child_beta.alpha / s if s > 0 else 0.0
    n_w = max(child_beta.n_eff, 1e-6)
    log_n = math.log(max(n_global, 2))
    # Floor at 1e-3 (not 1.0) so cold-start vacants (n_eff ~ 0) get a
    # genuinely larger explore term than warmed-up vacants with n_eff < 1.
    explore = c_explore * math.sqrt(log_n / max(n_w, 1e-3))
    return mean + explore

exploration_boost

exploration_boost(*, n_eff: float, n_min: int = N_MIN_FOR_STABLE_SCORE, n_global: int, c_explore: float = UCB_C_EXPLORE) -> float

Cold-start exploration bonus (§3.8 stage 2): boost UCB explore term while n_eff < n_min. Boost is 1 + (n_min - n_eff) / n_min, multiplied into the explore term.

Source code in src/vacant/reputation/ucb.py
def exploration_boost(
    *,
    n_eff: float,
    n_min: int = N_MIN_FOR_STABLE_SCORE,
    n_global: int,
    c_explore: float = UCB_C_EXPLORE,
) -> float:
    """Cold-start exploration bonus (§3.8 stage 2): boost UCB explore term
    while `n_eff < n_min`. Boost is `1 + (n_min - n_eff) / n_min`,
    multiplied into the explore term.
    """
    if n_eff >= n_min:
        return 0.0
    log_n = math.log(max(n_global, 2))
    boost = 1.0 + (n_min - n_eff) / n_min
    return c_explore * math.sqrt(log_n / max(n_eff, 1.0)) * boost
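Numerically (with illustrative n_min = 10, n_global = 100, c_explore = 0.5, not the real constants): the bonus is largest near zero evidence, shrinks as n_eff approaches n_min, and vanishes at n_min.

```python
import math

def exploration_boost(n_eff, *, n_min=10, n_global=100, c_explore=0.5):
    """§3.8 stage-2 sketch: boosted explore term while n_eff < n_min."""
    if n_eff >= n_min:
        return 0.0
    log_n = math.log(max(n_global, 2))
    boost = 1.0 + (n_min - n_eff) / n_min
    return c_explore * math.sqrt(log_n / max(n_eff, 1.0)) * boost

cold = exploration_boost(1.0)     # near-zero evidence: large bonus
warm = exploration_boost(8.0)     # nearly warmed up: small bonus
done = exploration_boost(10.0)    # at n_min the bonus cuts off entirely

assert cold > warm > done == 0.0
```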

cold_start_floor

cold_start_floor(attestation_level: str) -> float

UCB lower bound by attestation level (§3.7 line 338).

Source code in src/vacant/reputation/ucb.py
def cold_start_floor(attestation_level: str) -> float:
    """UCB lower bound by attestation level (§3.7 line 338)."""
    return COLD_START_FLOORS_BY_LEVEL.get(attestation_level, 0.0)

call_score

call_score(rep: Beta5D, *, weights: Mapping[str, float] | None = None, n_global: int, stake_amount: float = 0.0, attestation_level: str = 'L0', portability_bonus: float = 0.0, c_base: float = UCB_C_BASE, c_explore: float = UCB_C_EXPLORE) -> float

Full call scoring (§3.7 + §3.8):

score = ucb(...) + stake_bonus + att_floor + portability_bonus

  • stake_bonus = 0.1 * log(1 + stake / S_REF) -- affects exploration tolerance, not the mean (anti-Goodhart-against-stake §3.7).
  • att_floor from cold_start_floor(attestation_level).
  • portability_bonus is supplied by portability.py (capped).
Source code in src/vacant/reputation/ucb.py
def call_score(
    rep: Beta5D,
    *,
    weights: Mapping[str, float] | None = None,
    n_global: int,
    stake_amount: float = 0.0,
    attestation_level: str = "L0",
    portability_bonus: float = 0.0,
    c_base: float = UCB_C_BASE,
    c_explore: float = UCB_C_EXPLORE,
) -> float:
    """Full call scoring (§3.7 + §3.8):

    `score = ucb(...) + stake_bonus + att_floor + portability_bonus`

    * `stake_bonus = 0.1 * log(1 + stake / S_REF)` -- affects exploration
      tolerance, not the mean (anti-Goodhart-against-stake §3.7).
    * `att_floor` from `cold_start_floor(attestation_level)`.
    * `portability_bonus` is supplied by `portability.py` (capped).
    """
    if stake_amount < 0:
        raise ValueError(f"stake_amount must be >= 0; got {stake_amount}")
    if portability_bonus < 0:
        raise ValueError(f"portability_bonus must be >= 0; got {portability_bonus}")
    base = ucb_score(
        rep,
        weights=weights,
        n_global=n_global,
        c_base=c_base,
        c_explore=c_explore,
    )
    stake_bonus = 0.1 * math.log(1.0 + stake_amount / S_REF_USDC)
    return base + stake_bonus + cold_start_floor(attestation_level) + portability_bonus
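The logarithm keeps the stake bonus sub-linear: a 10x larger stake buys well under a 10x larger bonus, so stake widens exploration tolerance without dominating the score. S_REF = 100.0 here is an illustrative stand-in for S_REF_USDC:

```python
import math

S_REF = 100.0   # illustrative reference stake; the real value is S_REF_USDC

def stake_bonus(stake: float) -> float:
    """§3.7 anti-Goodhart-against-stake: logarithmic, never touches the mean."""
    return 0.1 * math.log(1.0 + stake / S_REF)

small = stake_bonus(100.0)     # 0.1 * ln(2)  ≈ 0.0693
large = stake_bonus(1000.0)    # 0.1 * ln(11) ≈ 0.2398

assert abs(small - 0.1 * math.log(2.0)) < 1e-12
assert large < 4 * small   # 10x the stake, under 4x the bonus
```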

cold_start

Cold-start mechanism (P3 §3.8 + dispatch §4).

Five components, per dispatch:

  1. UCB exploration -- implemented in ucb.py (exploration_boost).
  2. Birth-path startup signals -- birth_path_bonus enum table.
  3. Niche uniqueness bonus -- niche_bonus from capability-supply count.
  4. Low-stakes probes -- is_eligible_for_low_stakes_probe policy hook.
  5. Idle peer review -- should_idle_review_target policy hook.

Stage 1 (initial prior, §3.8) is initial_prior(...): takes attestation level / stake / vouchers / sibling and returns a Beta5D.

BirthPath

Bases: StrEnum

Birth-path enumeration (THEORY_V5 §3.6).

  • PATH_ZERO: human-built infrastructure (one-time).
  • PATH_B: subagent graduation (legacy; permanent transitional path).
  • PATH_C: client-mediated spawn (transitional; folds into D5 long-term).
  • D1..D5: agent self-replication paths (THEORY_V5 §3.6 D-series).

ColdStartCaveats dataclass

ColdStartCaveats(insufficient_data: bool, n_eff_min: float, partial_dims: tuple[str, ...])

Caveats accompanying a reputation read for new vacants.

partial_dims instance-attribute

partial_dims: tuple[str, ...]

Dims whose n_eff is below N_SHOW.

InsufficientDataLabel dataclass

InsufficientDataLabel(show_scalar: bool, label: str, caveats: ColdStartCaveats)

Surface-level marker returned to UI clients when n_eff is low.

birth_path_bonus

birth_path_bonus(path: BirthPath) -> tuple[float, float]

Return (alpha_boost for F/L/R each, alpha_boost for H) per birth path.

D-series paths return small boosts because lineage prior shaping (see ucb.lineage_prior_alpha) carries the parent-reputation signal.

Source code in src/vacant/reputation/cold_start.py
def birth_path_bonus(path: BirthPath) -> tuple[float, float]:
    """Return `(alpha_boost for F/L/R each, alpha_boost for H)` per birth path.

    D-series paths return small boosts because lineage prior shaping
    (see `ucb.lineage_prior_alpha`) carries the parent-reputation signal.
    """
    return _BIRTH_PATH_BOOSTS[path]

initial_prior

initial_prior(*, attestation_level: str = 'L0', stake_amount: float = 0.0, n_l1_plus_vouchers: int = 0, sibling: Beta5D | None = None, birth_path: BirthPath | None = None, now_ts: float = 0.0) -> Beta5D

Build a cold-start Beta5D per P3 §3.8 stage 1.

Composition order:

  1. Base priors (CONSTANTS.md §Reputation, see D008 §A).
  2. L1 attestation: +L1_ATTESTATION_ALPHA_BOOST to F/L/R alpha (skipped for L0).
  3. Stake bonus: min(2.0, log(1 + stake/S_REF)) split half-each across F/L/R.
  4. L3 vouches: +L3_VOUCH_ALPHA_BOOST * n_l1_plus_vouchers to H alpha.
  5. Sibling inheritance under same owner: alpha/4, beta/4 blended in (capped, decayed evidence).
  6. Birth-path boost: per birth_path_bonus table.
Source code in src/vacant/reputation/cold_start.py
def initial_prior(
    *,
    attestation_level: str = "L0",
    stake_amount: float = 0.0,
    n_l1_plus_vouchers: int = 0,
    sibling: Beta5D | None = None,
    birth_path: BirthPath | None = None,
    now_ts: float = 0.0,
) -> Beta5D:
    """Build a cold-start `Beta5D` per P3 §3.8 stage 1.

    Composition order:

    1. Base priors (CONSTANTS.md §Reputation, see D008 §A).
    2. L1 attestation: +`L1_ATTESTATION_ALPHA_BOOST` to F/L/R alpha
       (skipped for L0).
    3. Stake bonus: `min(2.0, log(1 + stake/S_REF))` split half-each
       across F/L/R.
    4. L3 vouches: `+L3_VOUCH_ALPHA_BOOST * n_l1_plus_vouchers` to H alpha.
    5. Sibling inheritance under same owner: `alpha/4`, `beta/4` blended in
       (capped, decayed evidence).
    6. Birth-path boost: per `birth_path_bonus` table.
    """
    rep = five_d_with_priors(now_ts=now_ts)

    # 2. L1 attestation
    if attestation_level in ("L1", "L2", "L3"):
        for d in ("factual", "logical", "relevance"):
            beta = rep.get(d)
            rep = rep.with_dim(
                d,
                beta.model_copy(
                    update={
                        "alpha": beta.alpha + L1_ATTESTATION_ALPHA_BOOST,
                        "alpha0": beta.alpha0 + L1_ATTESTATION_ALPHA_BOOST,
                    }
                ),
            )

    # 3. Stake
    if stake_amount > 0:
        bonus = min(2.0, math.log(1.0 + stake_amount / S_REF_USDC))
        per_dim = bonus / 2.0
        for d in ("factual", "logical", "relevance"):
            beta = rep.get(d)
            rep = rep.with_dim(
                d,
                beta.model_copy(
                    update={
                        "alpha": beta.alpha + per_dim,
                        "alpha0": beta.alpha0 + per_dim,
                    }
                ),
            )

    # 4. L3 vouches → H alpha
    if n_l1_plus_vouchers > 0:
        boost = L3_VOUCH_ALPHA_BOOST * n_l1_plus_vouchers
        h = rep.honesty
        rep = rep.with_dim(
            "honesty",
            h.model_copy(update={"alpha": h.alpha + boost, "alpha0": h.alpha0 + boost}),
        )

    # 5. Sibling inheritance (capped via /4)
    if sibling is not None:
        new_dims: dict[str, Beta] = {}
        for d in REPUTATION_DIMS:
            child = rep.get(d)
            sib = sibling.get(d)
            inherited_alpha = sib.alpha / 4.0
            inherited_beta = sib.beta / 4.0
            new_dims[d] = child.model_copy(
                update={
                    "alpha": child.alpha + inherited_alpha,
                    "beta": child.beta + inherited_beta,
                    # The inherited evidence is decayed-state, so it
                    # contributes to n_eff (not the prior).
                    "n_eff": child.n_eff + inherited_alpha + inherited_beta,
                }
            )
        rep = Beta5D(**new_dims)

    # 6. Birth-path boost
    if birth_path is not None:
        flr_boost, h_boost = birth_path_bonus(birth_path)
        if flr_boost > 0:
            for d in ("factual", "logical", "relevance"):
                beta = rep.get(d)
                rep = rep.with_dim(
                    d,
                    beta.model_copy(
                        update={
                            "alpha": beta.alpha + flr_boost,
                            "alpha0": beta.alpha0 + flr_boost,
                        }
                    ),
                )
        if h_boost > 0:
            beta = rep.honesty
            rep = rep.with_dim(
                "honesty",
                beta.model_copy(
                    update={
                        "alpha": beta.alpha + h_boost,
                        "alpha0": beta.alpha0 + h_boost,
                    }
                ),
            )

    return rep
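Step 5 above can be sketched with plain floats (the sibling posterior values are hypothetical; the real code works on `Beta` models):

```python
# Step-5 arithmetic with plain floats; the sibling posterior is hypothetical.
alpha0 = beta0 = 1.0                     # child base prior
child_alpha, child_beta, child_n = alpha0, beta0, 0.0

sib_alpha, sib_beta = 9.0, 3.0           # sibling carries 12 units of Beta mass

inherited_alpha = sib_alpha / 4.0        # 2.25 -- the /4 cap on inheritance
inherited_beta = sib_beta / 4.0          # 0.75

child_alpha += inherited_alpha           # 3.25
child_beta += inherited_beta             # 1.75
# Inherited mass counts as (decayed) evidence, so it lands in n_eff,
# not in the prior alpha0/beta0 -- it can erode under later decay.
child_n += inherited_alpha + inherited_beta  # 3.0
```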

niche_bonus

niche_bonus(*, capability_supply: int, saturation_supply: int = 10, max_bonus: float = 0.1) -> float

Niche uniqueness bonus: rarer capability → larger bonus.

capability_supply is the count of vacants currently offering this capability. The bonus is max_bonus * (1 - supply / saturation), floored at 0.

Intuition: a niche with no competing suppliers (supply = 0) earns the full max_bonus; the bonus falls off linearly with supply and reaches 0 once 10+ vacants compete on the same capability.

Source code in src/vacant/reputation/cold_start.py
def niche_bonus(
    *,
    capability_supply: int,
    saturation_supply: int = 10,
    max_bonus: float = 0.10,
) -> float:
    """Niche uniqueness bonus: rarer capability → larger bonus.

    `capability_supply` is the count of vacants currently offering this
    capability. The bonus is `max_bonus * (1 - supply / saturation)`,
    floored at 0.

    Intuition: a niche with no competing suppliers (`supply = 0`) earns
    the full `max_bonus`; the bonus falls off linearly with supply and
    reaches 0 once 10+ vacants compete on the same capability.
    """
    if capability_supply < 0:
        raise ValueError(f"capability_supply must be >= 0; got {capability_supply}")
    if saturation_supply <= 0:
        raise ValueError(f"saturation_supply must be > 0; got {saturation_supply}")
    if max_bonus < 0:
        raise ValueError(f"max_bonus must be >= 0; got {max_bonus}")
    if capability_supply >= saturation_supply:
        return 0.0
    fraction_filled = capability_supply / saturation_supply
    return max_bonus * (1.0 - fraction_filled)
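Usage sketch with the default saturation of 10; the values follow directly from the linear formula:

```python
def niche_bonus(capability_supply: int, saturation_supply: int = 10, max_bonus: float = 0.10) -> float:
    # Linear fall-off: full bonus at supply 0, zero at or beyond saturation.
    if capability_supply >= saturation_supply:
        return 0.0
    return max_bonus * (1.0 - capability_supply / saturation_supply)

assert niche_bonus(0) == 0.10                  # empty niche: full bonus
assert abs(niche_bonus(5) - 0.05) < 1e-12      # half-saturated: half bonus
assert niche_bonus(10) == 0.0                  # saturated: no bonus
```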

is_eligible_for_low_stakes_probe

is_eligible_for_low_stakes_probe(rep: Beta5D, *, n_min: int = N_MIN_FOR_STABLE_SCORE) -> bool

Policy hook: caller-side proxy routes a small fraction of low-stakes requests to vacants that have not yet crossed n_min. Reactivated by P4 / P7 caller routing.

Source code in src/vacant/reputation/cold_start.py
def is_eligible_for_low_stakes_probe(rep: Beta5D, *, n_min: int = N_MIN_FOR_STABLE_SCORE) -> bool:
    """Policy hook: caller-side proxy routes a small fraction of low-stakes
    requests to vacants that have not yet crossed `n_min`. Reactivated by
    P4 / P7 caller routing.
    """
    return any(rep.get(d).n_eff < n_min for d in REPUTATION_DIMS)

should_idle_review_target

should_idle_review_target(*, reviewer_idle_seconds: float, target_n_eff_min: float, n_min: int = N_MIN_FOR_STABLE_SCORE, idle_threshold_s: int = IDLE_REVIEW_THRESHOLD_S) -> bool

Policy hook: an idle reviewer should peer-review a target with n_eff < n_min. P1 idle-time scheduler consumes this.

Source code in src/vacant/reputation/cold_start.py
def should_idle_review_target(
    *,
    reviewer_idle_seconds: float,
    target_n_eff_min: float,
    n_min: int = N_MIN_FOR_STABLE_SCORE,
    idle_threshold_s: int = IDLE_REVIEW_THRESHOLD_S,
) -> bool:
    """Policy hook: an idle reviewer should peer-review a target with
    `n_eff < n_min`. P1 idle-time scheduler consumes this.
    """
    return reviewer_idle_seconds >= idle_threshold_s and target_n_eff_min < n_min

show_label

show_label(rep: Beta5D, *, n_show: int = N_SHOW_MIN_THRESHOLD) -> InsufficientDataLabel

Return whether to show a scalar reputation or INSUFFICIENT_DATA.

n_show=10 per CONSTANTS.md / D008 §A: if any dim is below this, callers must not render a scalar score; show the caveat instead.

Source code in src/vacant/reputation/cold_start.py
def show_label(rep: Beta5D, *, n_show: int = N_SHOW_MIN_THRESHOLD) -> InsufficientDataLabel:
    """Return whether to show a scalar reputation or `INSUFFICIENT_DATA`.

    `n_show=10` per CONSTANTS.md / D008 §A: if any dim is below this,
    callers must not render a scalar score; show the caveat instead.
    """
    n_effs = rep.n_effs()
    partials = tuple(d for d, n in n_effs.items() if n < n_show)
    insufficient = bool(partials)
    return InsufficientDataLabel(
        show_scalar=not insufficient,
        label="INSUFFICIENT_DATA" if insufficient else "OK",
        caveats=ColdStartCaveats(
            insufficient_data=insufficient,
            n_eff_min=min(n_effs.values()) if n_effs else 0.0,
            partial_dims=partials,
        ),
    )
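The gating logic, sketched over a plain dict of per-dimension `n_eff` values (the real function takes a `Beta5D`; `n_show = 10` per CONSTANTS.md):

```python
def show_label(n_effs: dict[str, float], n_show: int = 10) -> tuple[bool, str, tuple[str, ...]]:
    # Any single dimension under n_show suppresses the scalar score entirely.
    partial = tuple(d for d, n in n_effs.items() if n < n_show)
    ok = not partial
    return ok, "OK" if ok else "INSUFFICIENT_DATA", partial

show, label, dims = show_label({"factual": 12.0, "honesty": 3.5})
# honesty has only 3.5 effective samples, so the scalar is suppressed
assert not show and label == "INSUFFICIENT_DATA" and dims == ("honesty",)
```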

discount

STYLO-distance reputation discount (P3 §4.3 / dispatch §3).

When a vacant's behavioral fingerprint drifts substantially between epochs, the old evidence is less informative about the new behavior. The discount shrinks the effective sample size: the evidence above the prior scales down proportionally (its alpha:beta ratio is preserved), widening uncertainty while the posterior mean relaxes toward the prior.

This is the mechanism that bites self-evolution at the individual vacant level. New lineage members get a clean posterior -- see reputation/cold_start.py::initial_prior and §4.3 ("lineage as the subject of evolution").

Discount curve:

distance = 0                             → discount = 1.0  (no change)
distance ~ 1.5σ                          → discount ~ 0.85
distance ~ STYLO_DRIFT_THRESHOLD (3.5)   → discount ~ 0.40
distance → ∞                             → discount → discount_floor (0.10)

compute_discount is shaped as a sigmoid centered just below the drift threshold; apply_discount rescales alpha and beta toward their priors, preserving the ratio of the accumulated evidence while shedding effective sample size.

CumulativeDriftTracker

CumulativeDriftTracker(*, window: int = CUMULATIVE_DRIFT_WINDOW_EPOCHS, threshold: float = STYLO_DRIFT_THRESHOLD, threshold_multiplier: float = CUMULATIVE_DRIFT_THRESHOLD_MULTIPLIER)

Rolling-window sum of per-epoch STYLO drifts.

Single-shot compute_discount is fooled by an attacker who keeps each epoch's drift just below STYLO_DRIFT_THRESHOLD while accumulating change across many epochs (P3 §3.4 Padv-P3 §2). This tracker keeps a rolling window of the last window per-epoch drifts and trips when their sum exceeds threshold * threshold_multiplier.

Tracker is per (vacant, substrate); the aggregator owns one per target it scores.

Source code in src/vacant/reputation/discount.py
def __init__(
    self,
    *,
    window: int = CUMULATIVE_DRIFT_WINDOW_EPOCHS,
    threshold: float = STYLO_DRIFT_THRESHOLD,
    threshold_multiplier: float = CUMULATIVE_DRIFT_THRESHOLD_MULTIPLIER,
) -> None:
    if window < 1:
        raise ValueError(f"window must be >= 1; got {window}")
    if threshold <= 0:
        raise ValueError(f"threshold must be > 0; got {threshold}")
    if threshold_multiplier <= 0:
        raise ValueError(f"threshold_multiplier must be > 0; got {threshold_multiplier}")
    self._window: deque[float] = deque(maxlen=window)
    self._threshold = threshold
    self._threshold_multiplier = threshold_multiplier

compute_discount

compute_discount(stylo_distance: float, *, threshold: float = STYLO_DRIFT_THRESHOLD) -> float

Map STYLO distance → discount multiplier in (discount_floor, 1.0].

Curve shape:

  • distance == 0 → 1.0 (no change).
  • Smooth sigmoid descent passing through ~0.85 at _MIDPOINT_OFFSET.
  • Approaches _DISCOUNT_FLOOR as distance → ∞.

Source code in src/vacant/reputation/discount.py
def compute_discount(stylo_distance: float, *, threshold: float = STYLO_DRIFT_THRESHOLD) -> float:
    """Map STYLO distance → discount multiplier in `(discount_floor, 1.0]`.

    Curve shape:
    - `distance == 0` → 1.0 (no change).
    - Smooth sigmoid descent passing through ~0.85 at `_MIDPOINT_OFFSET`.
    - Approaches `_DISCOUNT_FLOOR` as distance → ∞.
    """
    if stylo_distance < 0:
        raise ValueError(f"stylo_distance must be >= 0; got {stylo_distance}")
    if stylo_distance == 0:
        return 1.0
    # Logistic centered around the threshold, scaled so distance == 0 → ~1.0.
    # `1 / (1 + exp(k * (d - threshold)))` is in (0, 1); we re-scale to
    # ensure `d == 0 → 1.0` and `d → ∞ → _DISCOUNT_FLOOR`.
    k = math.log(_HIGH_PLATEAU / (1.0 - _HIGH_PLATEAU)) / max(threshold - _MIDPOINT_OFFSET, 1e-9)
    sigmoid = 1.0 / (1.0 + math.exp(k * (stylo_distance - threshold)))
    span = 1.0 - _DISCOUNT_FLOOR
    return _DISCOUNT_FLOOR + span * sigmoid
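A runnable sketch with assumed values for the module-private constants (only the 0.10 floor and the 3.5 threshold are stated above; `_MIDPOINT_OFFSET = 1.5` and `_HIGH_PLATEAU = 0.85` are illustrative guesses), exercising bounds and monotonicity rather than exact outputs:

```python
import math

# Assumed values: only the 0.10 floor and the 3.5 threshold are documented.
DISCOUNT_FLOOR = 0.10
MIDPOINT_OFFSET = 1.5     # illustrative guess
HIGH_PLATEAU = 0.85       # illustrative guess

def compute_discount(distance: float, threshold: float = 3.5) -> float:
    if distance < 0:
        raise ValueError(f"distance must be >= 0; got {distance}")
    if distance == 0:
        return 1.0
    # Logistic centered at the threshold, rescaled into (DISCOUNT_FLOOR, 1.0].
    k = math.log(HIGH_PLATEAU / (1.0 - HIGH_PLATEAU)) / max(threshold - MIDPOINT_OFFSET, 1e-9)
    sigmoid = 1.0 / (1.0 + math.exp(k * (distance - threshold)))
    return DISCOUNT_FLOOR + (1.0 - DISCOUNT_FLOOR) * sigmoid

# Bounds and monotonicity hold regardless of the private constants' values:
assert compute_discount(0.0) == 1.0
assert compute_discount(1.5) > compute_discount(3.5) > compute_discount(10.0)
assert compute_discount(10.0) > DISCOUNT_FLOOR
```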

apply_discount

apply_discount(beta: Beta, discount: float) -> Beta

Shrink alpha and beta toward priors by discount ∈ (0, 1].

Preserves the alpha:beta ratio of the evidence above the prior while reducing effective sample size:

alpha' = alpha0 + discount * (alpha - alpha0)
beta' = beta0 + discount * (beta - beta0)
n_eff' = discount * n_eff
Source code in src/vacant/reputation/discount.py
def apply_discount(beta: Beta, discount: float) -> Beta:
    """Shrink alpha and beta toward priors by `discount` ∈ (0, 1].

    Preserves the alpha:beta ratio of the evidence above the prior while
    reducing effective sample size:

    ```
    alpha' = alpha0 + discount * (alpha - alpha0)
    beta' = beta0 + discount * (beta - beta0)
    n_eff' = discount * n_eff
    ```
    """
    if not (0.0 < discount <= 1.0):
        raise ValueError(f"discount must be in (0, 1]; got {discount}")
    return beta.model_copy(
        update={
            "alpha": beta.alpha0 + discount * (beta.alpha - beta.alpha0),
            "beta": beta.beta0 + discount * (beta.beta - beta.beta0),
            "n_eff": discount * beta.n_eff,
        }
    )
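The same arithmetic on plain floats shows what the discount does and does not preserve (hypothetical posterior alpha=9, beta=3 over a uniform prior):

```python
def apply_discount(alpha, beta, alpha0, beta0, n_eff, discount):
    # Only the evidence above the prior shrinks; its alpha:beta ratio is
    # preserved, so as discount -> 0 the posterior relaxes to the prior.
    return (
        alpha0 + discount * (alpha - alpha0),
        beta0 + discount * (beta - beta0),
        discount * n_eff,
    )

a, b, n = apply_discount(9.0, 3.0, 1.0, 1.0, 10.0, 0.5)
assert (a, b, n) == (5.0, 2.0, 5.0)
# Posterior mean moved from 9/12 = 0.75 toward the 0.5 prior mean:
assert 0.5 < a / (a + b) < 0.75
```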

apply_discount_5d

apply_discount_5d(rep: Beta5D, discount: float) -> Beta5D

Apply a single discount to all five dimensions.

Source code in src/vacant/reputation/discount.py
def apply_discount_5d(rep: Beta5D, discount: float) -> Beta5D:
    """Apply a single discount to all five dimensions."""
    return Beta5D(
        factual=apply_discount(rep.factual, discount),
        logical=apply_discount(rep.logical, discount),
        relevance=apply_discount(rep.relevance, discount),
        honesty=apply_discount(rep.honesty, discount),
        adoption=apply_discount(rep.adoption, discount),
    )

dimension_imbalance_alert

dimension_imbalance_alert(rep: Beta5D, *, threshold: float | None = None) -> bool

Detect dimension imbalance (P3 §3.6 defense line 4 / dispatch §"Dimension imbalance").

True iff the gap between the highest and the lowest dimension mean is at least the threshold. This catches the attack pattern "pump only F while leaving A low":

  • all-similar means → False (healthy distribution)
  • one dim spiked while others stay near prior → True (imbalanced)

threshold defaults to DIMENSION_CORRELATION_ALERT_THRESHOLD = 0.6. The implementation uses max - min as a quick proxy for the correlation alert; a full pairwise correlation matrix is future work.

Source code in src/vacant/reputation/discount.py
def dimension_imbalance_alert(rep: Beta5D, *, threshold: float | None = None) -> bool:
    """Detect dimension imbalance (P3 §3.6 防線 4 / dispatch §"Dimension imbalance").

    True iff at least one dimension's mean exceeds the threshold's gap
    above the **lowest** dimension's mean. This catches the attack
    pattern "pump only F while leaving A low":

    - all-similar means → False (healthy distribution)
    - one dim spiked while others stay near prior → True (imbalanced)

    `threshold` defaults to `DIMENSION_CORRELATION_ALERT_THRESHOLD = 0.6`.
    The implementation uses `max - min` as a quick proxy for the
    correlation alert; a full pairwise correlation matrix is future work.
    """
    from vacant.core.constants import DIMENSION_CORRELATION_ALERT_THRESHOLD

    thr = threshold if threshold is not None else DIMENSION_CORRELATION_ALERT_THRESHOLD
    means = list(rep.means().values())
    if not means:
        return False
    return (max(means) - min(means)) >= thr
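The max - min proxy in isolation (threshold 0.6 per `DIMENSION_CORRELATION_ALERT_THRESHOLD`; the mean vectors are hypothetical):

```python
def imbalance_alert(means: list[float], threshold: float = 0.6) -> bool:
    # Fires when one dimension's mean is pumped far above the lowest one.
    return bool(means) and (max(means) - min(means)) >= threshold

healthy = [0.55, 0.60, 0.58, 0.52, 0.57]   # all dims move together
pumped = [0.95, 0.52, 0.50, 0.30, 0.51]    # one dim spiked, another near prior

assert not imbalance_alert(healthy)        # spread 0.08 < 0.6
assert imbalance_alert(pumped)             # spread 0.65 >= 0.6
```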

portability

Portability factor (dispatch §6 / P3 §"resilience-as-independent-metric").

THEORY_V5 §3.1 split portability out of raw reputation: it became an independent resilience_score. The dispatch §6 still asks for a small call_score bonus rewarding ecological contribution -- vacants that serve across multiple substrates with high success.

Implementation:

diversity_factor = min(1, log(1 + n_substrates) / log(1 + 4))
success_factor = mean(success_rate_per_substrate over served substrates)
portability = max_bonus * diversity_factor * success_factor

The bonus is capped at PORTABILITY_FACTOR_MAX_BONUS so it can't dominate the UCB call_score (anti-Goodhart-against-portability).

compute_portability

compute_portability(*, substrates_served: list[str], success_rate_per_substrate: Mapping[str, float], max_bonus: float = PORTABILITY_FACTOR_MAX_BONUS) -> float

Return a portability bonus in [0, max_bonus].

  • substrates_served is the list of substrates this vacant has executed on.
  • success_rate_per_substrate is the per-substrate success rate ∈ [0, 1].
  • Bonus is max_bonus * diversity_factor * success_factor, where:
    • diversity_factor saturates at 1.0 once n_substrates >= 4 (log curve with reference 4).
    • success_factor = mean(success_rates over served substrates).
Source code in src/vacant/reputation/portability.py
def compute_portability(
    *,
    substrates_served: list[str],
    success_rate_per_substrate: Mapping[str, float],
    max_bonus: float = PORTABILITY_FACTOR_MAX_BONUS,
) -> float:
    """Return a portability bonus in `[0, max_bonus]`.

    - `substrates_served` is the list of substrates this vacant has
      executed on.
    - `success_rate_per_substrate` is the per-substrate success rate
      ∈ [0, 1].
    - Bonus is `max_bonus * diversity_factor * success_factor`, where:
        - `diversity_factor` saturates at 1.0 once `n_substrates >= 4`
          (log curve with reference 4).
        - `success_factor = mean(success_rates over served substrates)`.
    """
    if max_bonus < 0:
        raise ValueError(f"max_bonus must be >= 0; got {max_bonus}")
    if not substrates_served:
        return 0.0
    n = len(substrates_served)
    diversity_factor = min(1.0, math.log(1.0 + n) / math.log(1.0 + 4.0))
    rates = [
        max(0.0, min(1.0, float(success_rate_per_substrate.get(s, 0.0)))) for s in substrates_served
    ]
    success_factor = sum(rates) / n if rates else 0.0
    return max_bonus * diversity_factor * success_factor
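A standalone sketch (`max_bonus` assumed 0.1 for illustration) showing that substrate diversity is rewarded even at a lower per-substrate success rate:

```python
import math

def compute_portability(substrates, success_rates, max_bonus=0.1):
    # max_bonus assumed 0.1 here; the real cap is PORTABILITY_FACTOR_MAX_BONUS.
    if not substrates:
        return 0.0
    n = len(substrates)
    diversity = min(1.0, math.log(1.0 + n) / math.log(5.0))  # saturates at n >= 4
    rates = [max(0.0, min(1.0, success_rates.get(s, 0.0))) for s in substrates]
    return max_bonus * diversity * (sum(rates) / n)

# One substrate at 100% success earns less than four substrates at 80%:
one = compute_portability(["s1"], {"s1": 1.0})
four = compute_portability(["s1", "s2", "s3", "s4"],
                           {"s1": 0.8, "s2": 0.8, "s3": 0.8, "s4": 0.8})
assert one < four <= 0.1
```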

same_detect

Same-* detection (P3 §3.4 / T5 / dispatch §5).

Three lines, each returning a SameDetectSignal:

  • same_controller: T5's three-layer pipeline (declared link → temporal correlation → behavioural similarity).
  • same_substrate: shared base-model family.
  • same_stylo: STYLO-Vec16 cosine similarity (consumes P1's shadow_self.compute_embedding).

Framing (CLAUDE.md §Things to NOT do): these raise cost, they do not prevent. The signal output biases per-review weight in the aggregator; nothing here blocks writes.

SameDetectSignal dataclass

SameDetectSignal(strength: float, suspected_cluster: frozenset[VacantId], rationale: str)

Output of every same-* detector.

strength ∈ [0, 1] is monotone in suspicion. The aggregator uses max(SAME_SIGNAL_DISCOUNT_FLOOR, 1 - max(strength)) as a per-review cost-raising multiplier, so strength=0 → no penalty and strength=1 leaves at least the floor (CLAUDE.md "same-* is cost-raising, not preventing" — D015).

suspected_cluster includes both probe and target vacant ids when the signal fires; empty when strength is 0.

cosine_similarity

cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float

Cosine similarity in [-1, 1] (returns 0.0 for zero-norm inputs).

Source code in src/vacant/reputation/same_detect.py
def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine similarity in [-1, 1] (returns 0.0 for zero-norm inputs)."""
    if len(a) != len(b):
        raise ValueError(f"vector dim mismatch: {len(a)} vs {len(b)}")
    dot = sum(x * y for x, y in zip(a, b, strict=True))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(y * y for y in b))
    if na == 0 or nb == 0:
        return 0.0
    return dot / (na * nb)

cross_correlation

cross_correlation(a: Sequence[float], b: Sequence[float]) -> float

Pearson correlation coefficient ∈ [-1, 1] for two equal-length series. Returns 0.0 if either has zero variance.

Source code in src/vacant/reputation/same_detect.py
def cross_correlation(a: Sequence[float], b: Sequence[float]) -> float:
    """Pearson correlation coefficient ∈ [-1, 1] for two equal-length
    series. Returns 0.0 if either has zero variance.
    """
    if len(a) != len(b):
        raise ValueError(f"series dim mismatch: {len(a)} vs {len(b)}")
    n = len(a)
    if n == 0:
        return 0.0
    mean_a = sum(a) / n
    mean_b = sum(b) / n
    cov = sum((x - mean_a) * (y - mean_b) for x, y in zip(a, b, strict=True))
    var_a = sum((x - mean_a) ** 2 for x in a)
    var_b = sum((y - mean_b) ** 2 for y in b)
    if var_a == 0 or var_b == 0:
        return 0.0
    return cov / math.sqrt(var_a * var_b)
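Why the temporal layer catches scaled copies: Pearson correlation is invariant to affine rescaling, so a controller that replays the same heartbeat rhythm at a different amplitude still correlates at 1.0. A minimal sketch of the same formula:

```python
def pearson(a, b):
    # Pearson correlation; 0.0 by convention when either series is flat.
    n = len(a)
    ma, mb = sum(a) / n, sum(b) / n
    cov = sum((x - ma) * (y - mb) for x, y in zip(a, b))
    va = sum((x - ma) ** 2 for x in a)
    vb = sum((y - mb) ** 2 for y in b)
    return 0.0 if va == 0 or vb == 0 else cov / (va * vb) ** 0.5

beat = [1.0, 0.0, 1.0, 0.0, 1.0, 0.0]
copycat = [2.0, 1.0, 2.0, 1.0, 2.0, 1.0]   # same rhythm, shifted scale
flat = [1.0] * 6                           # zero variance

assert abs(pearson(beat, copycat) - 1.0) < 1e-9   # still perfectly correlated
assert pearson(beat, flat) == 0.0
```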

same_controller

same_controller(a: VacantId, b: VacantId, *, declared_same: bool = False, common_ancestor: bool = False, heartbeat_a: Sequence[float] | None = None, heartbeat_b: Sequence[float] | None = None, behavior_a: Sequence[float] | None = None, behavior_b: Sequence[float] | None = None, temporal_threshold: float = SAME_CONTROLLER_TEMPORAL_THRESHOLD, behavior_threshold: float = SAME_CONTROLLER_BEHAVIOR_THRESHOLD) -> SameDetectSignal

T5 §3.2 three-layer same-controller detection.

Layer 0 (declared): controller_id match or shared ancestor → 1.0. Layer 1 (temporal): cross-correlation of heartbeat series; strength proportional to (corr - threshold) / (1 - threshold). Layer 2 (behaviour): cosine similarity over behavioural fingerprints (capability text embedding or STYLO Vec16); strength proportional to (sim - threshold) / (1 - threshold).

The detector takes the max of all layers that fire -- once any layer is positive, downstream weight discount applies.

Source code in src/vacant/reputation/same_detect.py
def same_controller(
    a: VacantId,
    b: VacantId,
    *,
    declared_same: bool = False,
    common_ancestor: bool = False,
    heartbeat_a: Sequence[float] | None = None,
    heartbeat_b: Sequence[float] | None = None,
    behavior_a: Sequence[float] | None = None,
    behavior_b: Sequence[float] | None = None,
    temporal_threshold: float = SAME_CONTROLLER_TEMPORAL_THRESHOLD,
    behavior_threshold: float = SAME_CONTROLLER_BEHAVIOR_THRESHOLD,
) -> SameDetectSignal:
    """T5 §3.2 three-layer same-controller detection.

    Layer 0 (declared): `controller_id` match or shared ancestor → 1.0.
    Layer 1 (temporal): cross-correlation of heartbeat series; strength
    proportional to `(corr - threshold) / (1 - threshold)`.
    Layer 2 (behaviour): cosine similarity over behavioural fingerprints
    (capability text embedding or STYLO Vec16); strength proportional
    to `(sim - threshold) / (1 - threshold)`.

    The detector takes the **max** of all layers that fire -- once any
    layer is positive, downstream weight discount applies.
    """
    if a == b:
        return SameDetectSignal(
            strength=0.0,
            suspected_cluster=frozenset(),
            rationale="self-pair (a == b)",
        )

    rationale_parts: list[str] = []
    strength = 0.0

    # Layer 0
    if declared_same:
        rationale_parts.append("declared controller_id match")
        strength = max(strength, SAME_CONTROLLER_DECLARED_STRENGTH)
    elif common_ancestor:
        rationale_parts.append("common parent_id ancestor")
        strength = max(strength, SAME_CONTROLLER_DECLARED_STRENGTH)

    # Layer 1
    if heartbeat_a is not None and heartbeat_b is not None:
        corr = cross_correlation(heartbeat_a, heartbeat_b)
        if corr > temporal_threshold:
            l1 = (corr - temporal_threshold) / max(1.0 - temporal_threshold, 1e-9)
            l1 = min(max(l1, 0.0), 1.0)
            rationale_parts.append(f"heartbeat corr {corr:.2f} > {temporal_threshold:.2f}")
            strength = max(strength, l1)

    # Layer 2
    if behavior_a is not None and behavior_b is not None:
        sim = cosine_similarity(behavior_a, behavior_b)
        if sim > behavior_threshold:
            l2 = (sim - behavior_threshold) / max(1.0 - behavior_threshold, 1e-9)
            l2 = min(max(l2, 0.0), 1.0)
            rationale_parts.append(f"behaviour sim {sim:.2f} > {behavior_threshold:.2f}")
            strength = max(strength, l2)

    cluster = frozenset({a, b}) if strength > 0 else frozenset()
    return SameDetectSignal(
        strength=strength,
        suspected_cluster=cluster,
        rationale=" + ".join(rationale_parts) if rationale_parts else "no signal",
    )

same_substrate

same_substrate(a: VacantId, b: VacantId, *, family_a: str, family_b: str) -> SameDetectSignal

Strength = 1.0 iff family_a == family_b, else 0.

"Strength = 1" means full P3 §3.4.1 discount applies; the aggregator halves weight (and quarter-weight if many recent reviews) at the review-weighting step. The actual numeric discount lives in aggregator.py; this detector just emits the binary signal.

Source code in src/vacant/reputation/same_detect.py
def same_substrate(
    a: VacantId,
    b: VacantId,
    *,
    family_a: str,
    family_b: str,
) -> SameDetectSignal:
    """Strength = 1.0 iff `family_a == family_b`, else 0.

    "Strength = 1" means full P3 §3.4.1 discount applies; the aggregator
    halves weight (and quarter-weight if many recent reviews) at the
    review-weighting step. The actual numeric discount lives in
    `aggregator.py`; this detector just emits the binary signal.
    """
    if a == b:
        return SameDetectSignal(
            strength=0.0,
            suspected_cluster=frozenset(),
            rationale="self-pair (a == b)",
        )
    if family_a == family_b:
        return SameDetectSignal(
            strength=1.0,
            suspected_cluster=frozenset({a, b}),
            rationale=f"shared base_model_family {family_a!r}",
        )
    return SameDetectSignal(
        strength=0.0,
        suspected_cluster=frozenset(),
        rationale=f"distinct families ({family_a} vs {family_b})",
    )

same_stylo

same_stylo(a: VacantId, b: VacantId, *, embedding_a: Sequence[float], embedding_b: Sequence[float], threshold: float = SAME_CONTROLLER_BEHAVIOR_THRESHOLD) -> SameDetectSignal

STYLO-Vec16 similarity detector.

Consumes embeddings produced by P1's vacant.runtime.shadow_self.compute_embedding (or the real STYLO encoder once it lands). Strength is (sim - threshold) / (1 - threshold) clipped to [0, 1].

Source code in src/vacant/reputation/same_detect.py
def same_stylo(
    a: VacantId,
    b: VacantId,
    *,
    embedding_a: Sequence[float],
    embedding_b: Sequence[float],
    threshold: float = SAME_CONTROLLER_BEHAVIOR_THRESHOLD,
) -> SameDetectSignal:
    """STYLO-Vec16 similarity detector.

    Consumes embeddings produced by P1's
    `vacant.runtime.shadow_self.compute_embedding` (or the real STYLO
    encoder once it lands). Strength is `(sim - threshold) /
    (1 - threshold)` clipped to [0, 1].
    """
    if a == b:
        return SameDetectSignal(
            strength=0.0,
            suspected_cluster=frozenset(),
            rationale="self-pair (a == b)",
        )
    sim = cosine_similarity(embedding_a, embedding_b)
    if sim <= threshold:
        return SameDetectSignal(
            strength=0.0,
            suspected_cluster=frozenset(),
            rationale=f"sim {sim:.2f} <= {threshold:.2f}",
        )
    raw = (sim - threshold) / max(1.0 - threshold, 1e-9)
    strength = min(max(raw, 0.0), 1.0)
    return SameDetectSignal(
        strength=strength,
        suspected_cluster=frozenset({a, b}),
        rationale=f"STYLO sim {sim:.2f} > {threshold:.2f}",
    )
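The margin normalization `(sim - threshold) / (1 - threshold)`, sketched in isolation:

```python
def strength(sim: float, threshold: float) -> float:
    # Normalize the margin above threshold into [0, 1]; at or below -> 0.
    if sim <= threshold:
        return 0.0
    raw = (sim - threshold) / max(1.0 - threshold, 1e-9)
    return min(max(raw, 0.0), 1.0)

assert strength(0.5, 0.8) == 0.0               # under threshold: no signal
assert abs(strength(0.9, 0.8) - 0.5) < 1e-9    # halfway between threshold and 1
assert strength(1.0, 0.8) == 1.0               # perfect similarity: full strength
```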

discount_from_signals

discount_from_signals(signals: Sequence[SameDetectSignal]) -> float

Compose multiple SameDetectSignals into a single weight multiplier.

max(SAME_SIGNAL_DISCOUNT_FLOOR, 1 - max(strength)) is conservative: any one detector firing reduces weight by its strength; the strongest detector dominates so we don't compound penalties (the dispatch's explicit framing — these are signals, not evidence to be summed).

The floor (D015) is load-bearing: same-* detection is cost-raising, not preventing (CLAUDE.md §Load-bearing theory decisions). Even at strength=1.0 we must not zero a reviewer's contribution — that would convert a probabilistic suspicion into a unilateral mute.

Source code in src/vacant/reputation/same_detect.py
def discount_from_signals(signals: Sequence[SameDetectSignal]) -> float:
    """Compose multiple `SameDetectSignal`s into a single weight multiplier.

    `max(SAME_SIGNAL_DISCOUNT_FLOOR, 1 - max(strength))` is conservative:
    any one detector firing reduces weight by its `strength`; the strongest
    detector dominates so we don't compound penalties (the dispatch's
    explicit framing — these are signals, not evidence to be summed).

    The floor (D015) is load-bearing: same-* detection is *cost-raising,
    not preventing* (CLAUDE.md §Load-bearing theory decisions). Even at
    `strength=1.0` we must not zero a reviewer's contribution — that
    would convert a probabilistic suspicion into a unilateral mute.
    """
    if not signals:
        return 1.0
    max_strength = max(s.strength for s in signals)
    return max(SAME_SIGNAL_DISCOUNT_FLOOR, 1.0 - max_strength)
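The composition rule on bare strengths (`SAME_SIGNAL_DISCOUNT_FLOOR` assumed 0.1 for illustration):

```python
FLOOR = 0.1  # assumed value of SAME_SIGNAL_DISCOUNT_FLOOR

def discount_from_strengths(strengths):
    # Strongest signal dominates; penalties never compound, never hit zero.
    if not strengths:
        return 1.0
    return max(FLOOR, 1.0 - max(strengths))

assert discount_from_strengths([]) == 1.0            # no signals: no discount
assert discount_from_strengths([0.3, 0.6]) == 0.4    # strongest dominates
assert discount_from_strengths([1.0]) == FLOOR       # never muted to zero
```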

errors

Error hierarchy for vacant.reputation.

ReputationError

Bases: CoreError

Base class for vacant.reputation errors.

IneligibleReviewerError

Bases: ReputationError

A reviewer's runtime state forbids new reviews (P1 §4.1).

InvalidDimensionError

Bases: ReputationError

An unknown reputation dimension was referenced.

InvalidSignalError

Bases: ReputationError

A signal is malformed (e.g. score outside [0,1]).

ReviewRateLimitError

Bases: ReputationError

A target's per-window review rate limit was exceeded (Padv-P3 finding D010 §1).

ChainTamperError

Bases: ReputationError

A reviewer's logbook failed verify_chain during record_review audit (D015 §D). The posterior update is rolled back and the review is rejected — the auditable history of reputation events is the load-bearing claim of the thesis (CLAUDE.md §load-bearing decisions).

MissingAuditKeyError

Bases: ReputationError

The aggregator was constructed in audit mode but the reviewer's signing key / logbook is not registered (D015 §D).