# Auditor Development (SDK)

The Lucid SDK provides a decorator-based API for building ClaimsAuditors: observation-only components that produce claims about AI traffic. Auditors never make enforcement decisions; the Gateway evaluates claims against Cedar policies.
## Quick Start

Every auditor follows three steps: subclass `ClaimsAuditor`, decorate methods with `@claims`, and deploy with `serve()`.
```python
from lucid_auditor_sdk import ClaimsAuditor, claims, serve, Phase
from lucid_schemas import Claim


class SafetyAuditor(ClaimsAuditor):
    def __init__(self):
        super().__init__("safety-auditor", "1.0.0")
        self.model = load_safety_model()

    @claims(phase=Phase.REQUEST)
    def observe_request(self, request: dict) -> list[Claim]:
        score = self.model.analyze(request.get("prompt", ""))
        return [
            Claim(name="safety.score", type="score_normalized", value=score),
            Claim(name="safety.is_safe", type="boolean", value=score < 0.5),
        ]


serve(SafetyAuditor(), port=8080)
```
This auditor:
- Registers /claims, /health, and /vocabulary HTTP endpoints
- Accepts request data via POST /claims
- Returns typed observations (never decisions)
- Can be deployed as a sidecar in any TEE environment
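The wire format is defined by the Gateway contract; as a rough sketch (the field names here are illustrative assumptions, not the authoritative schema), a request-phase exchange over `POST /claims` might look like:

```python
# Hypothetical body the Gateway POSTs to /claims (field names are assumptions)
request_body = {
    "phase": "request",
    "data": {"prompt": "Hello world"},
}

# Hypothetical auditor response: a list of typed observations, never decisions
response_body = {
    "claims": [
        {"name": "safety.score", "type": "score_normalized", "value": 0.12},
        {"name": "safety.is_safe", "type": "boolean", "value": True},
    ]
}

# Each claim carries at least a name, a type, and a value
for claim in response_body["claims"]:
    assert {"name", "type", "value"} <= claim.keys()
```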
## The ClaimsAuditor Base Class

All auditors extend `ClaimsAuditor`:
```python
class ClaimsAuditor:
    def __init__(self, auditor_id: str, version: str):
        """
        Args:
            auditor_id: Unique identifier for this auditor
            version: Semantic version string
        """
```
The base class provides:
- Automatic vocabulary generation from @claims methods
- Health check endpoint implementation
- Configuration loading from @claims parameters and AuditorPolicy.detection overrides
- Metrics collection
## The @claims Decorator

The `@claims` decorator marks methods as claim producers for a specific lifecycle phase:
```python
@claims(phase=Phase.REQUEST)
def observe_request(self, request: dict) -> list[Claim]: ...

@claims(phase=Phase.RESPONSE)
def observe_response(self, response: dict) -> list[Claim]: ...

@claims(phase=Phase.ARTIFACT)
def verify_artifact(self, manifest: dict) -> list[Claim]: ...

@claims(phase=Phase.EXECUTION)
def monitor_runtime(self, context: dict) -> list[Claim]: ...
```
| Parameter | Description |
|---|---|
| `phase` | Lifecycle phase: `Phase.ARTIFACT`, `Phase.REQUEST`, `Phase.EXECUTION`, `Phase.RESPONSE` |
| `name` | Optional name override for the claim group (defaults to method name) |
A single auditor can have multiple @claims methods across different phases.
## The serve() Function

`serve()` deploys any `ClaimsAuditor` as an HTTP service:
```python
from lucid_auditor_sdk import serve

serve(
    auditor=MyAuditor(),
    port=8080,
    host="0.0.0.0",
    workers=1,
)
```
This creates a FastAPI application with the following endpoints:
| Endpoint | Method | Purpose |
|---|---|---|
| `/health` | GET | Liveness/readiness check |
| `/claims` | POST | Accept data, return claims |
| `/vocabulary` | GET | Declare claim names and types |
| `/metrics` | GET | Prometheus metrics (optional) |
## Lifecycle Phases

### 1. Artifact Verification (Phase.ARTIFACT)

Runs at deployment time to verify static assets before the workload starts.
```python
@claims(phase=Phase.ARTIFACT)
def verify_model_weights(self, manifest: dict) -> list[Claim]:
    hash_match = manifest.get("hash") == EXPECTED_HASH
    return [
        Claim(name="artifact.hash_valid", type="boolean", value=hash_match),
        Claim(name="artifact.model_id", type="string", value=manifest.get("model_id")),
    ]
```
### 2. Request Observation (Phase.REQUEST)

Observes the user prompt before it reaches the AI model.
```python
@claims(phase=Phase.REQUEST)
def observe_input(self, request: dict) -> list[Claim]:
    prompt = request.get("prompt", "")
    return [
        Claim(name="input.language", type="string", value=detect_language(prompt)),
        Claim(name="input.token_count", type="count", value=count_tokens(prompt)),
    ]
```
### 3. Execution Monitoring (Phase.EXECUTION)

Observes the model during inference.
```python
@claims(phase=Phase.EXECUTION)
def monitor_runtime(self, context: dict) -> list[Claim]:
    return [
        Claim(name="latency_ms", type="duration_ms", value=context.get("latency")),
        Claim(name="token_count", type="count", value=context.get("tokens")),
    ]
```
### 4. Response Observation (Phase.RESPONSE)

Observes the model's response before it reaches the user.
```python
@claims(phase=Phase.RESPONSE)
def observe_output(self, response: dict) -> list[Claim]:
    content = response.get("content", "")
    score = analyze_toxicity(content)
    return [
        Claim(name="toxic_content", type="score_normalized", value=score),
    ]
```
## Context-Aware Claims

Response-phase methods can access the original request for context:
```python
@claims(phase=Phase.RESPONSE)
def check_alignment(self, response: dict, request: dict) -> list[Claim]:
    query = request.get("prompt", "")
    answer = response.get("content", "")
    groundedness = compute_groundedness(query, answer)
    return [
        Claim(name="faithfulness", type="score_normalized", value=groundedness),
    ]
```
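`compute_groundedness` above stands in for whatever grounding metric the auditor uses. As an illustrative toy (simple token overlap; a real auditor would use an NLI or embedding-based model instead), it could be sketched as:

```python
def compute_groundedness(query: str, answer: str) -> float:
    """Toy groundedness: fraction of answer tokens that also appear in the query.

    Illustrative stand-in only, not the SDK's implementation.
    """
    query_tokens = set(query.lower().split())
    answer_tokens = answer.lower().split()
    if not answer_tokens:
        return 0.0
    overlap = sum(1 for token in answer_tokens if token in query_tokens)
    return overlap / len(answer_tokens)

# Always a score_normalized value in [0.0, 1.0]
score = compute_groundedness("What color is the sky", "the sky is blue")
```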
## Claim Construction

Claims are simple typed observations:
```python
Claim(
    name="toxic_content",         # Flat descriptive name
    type="score_normalized",      # Value type
    value=0.42,                   # The observation
    confidence=0.95,              # Optional: how confident the auditor is
    metadata={"model": "v2"},     # Optional: additional context
)
```
### Standard Claim Types

| Type | Python Type | Description |
|---|---|---|
| `score_normalized` | `float` | Score between 0.0 and 1.0 |
| `boolean` | `bool` | True/false observation |
| `string` | `str` | String value |
| `string_list` | `list[str]` | List of string labels |
| `count` | `int` | Integer count |
| `duration_ms` | `float` | Duration in milliseconds |
| `object` | `dict` | Structured observation |
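The table maps directly to a runtime check. A minimal validation sketch (the helper name and logic are illustrative, not part of the SDK):

```python
# Maps standard claim type names to their expected Python types
CLAIM_TYPES = {
    "score_normalized": float,
    "boolean": bool,
    "string": str,
    "string_list": list,
    "count": int,
    "duration_ms": float,
    "object": dict,
}


def validate_claim_value(claim_type: str, value) -> bool:
    """Check a value against the standard claim types (illustrative helper)."""
    expected = CLAIM_TYPES.get(claim_type)
    if expected is None:
        return False
    # bool is a subclass of int in Python, so reject True/False for numeric types
    if expected in (int, float) and isinstance(value, bool):
        return False
    if expected is float:
        ok = isinstance(value, (int, float))  # ints are acceptable floats
    else:
        ok = isinstance(value, expected)
    if claim_type == "score_normalized":
        ok = ok and 0.0 <= value <= 1.0      # enforce the normalized range
    return ok


assert validate_claim_value("score_normalized", 0.42)
assert not validate_claim_value("score_normalized", 1.5)
assert validate_claim_value("count", 7)
```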
## Pre-defined Claim Helpers

The SDK provides typed helpers for common claim patterns. See the SDK Reference for the full list of available claim helpers (`PIIDetectionClaim`, `ToxicityClaim`, `InjectionDetectionClaim`, etc.).
## Optional Dependencies

Many auditors depend on heavy ML libraries. Use `optional_import()` for graceful degradation:
```python
from lucid_auditor_sdk import optional_import

presidio = optional_import("presidio_analyzer")


class PIIAuditor(ClaimsAuditor):
    @claims(phase=Phase.REQUEST)
    def detect_pii(self, request: dict) -> list[Claim]:
        if not presidio:
            return [Claim(name="pii.available", type="boolean", value=False)]
        analyzer = presidio.AnalyzerEngine()
        results = analyzer.analyze(text=request.get("prompt", ""), language="en")
        entities = [r.entity_type for r in results]
        return [
            Claim(name="pii_types", type="string_list", value=entities),
            Claim(name="pii_count", type="count", value=len(entities)),
        ]
```
Available fallbacks: `FALLBACK_PRESIDIO`, `FALLBACK_LLM_GUARD`, `FALLBACK_DETECT_SECRETS`, `FALLBACK_FAIRLEARN`, `FALLBACK_RAGAS`.
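Under the hood, `optional_import()` can be pictured as a thin wrapper over `importlib` (a sketch of the idea; the SDK's actual implementation and fallback handling may differ):

```python
import importlib


def optional_import(module_name: str):
    """Return the module if importable, else None (graceful degradation sketch)."""
    try:
        return importlib.import_module(module_name)
    except ImportError:
        return None


json_mod = optional_import("json")         # stdlib module: import succeeds
missing = optional_import("no_such_lib")   # absent: returns None instead of raising
```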
## Configuration

### Detection Settings via @claims Parameters

Detection settings are declared as keyword-only parameters on `@claims`-decorated methods. No separate config class is needed:
```python
class MyAuditor(ClaimsAuditor):
    def __init__(self):
        super().__init__("my-auditor", "1.0.0")

    @claims(phase=Phase.REQUEST, produces=["risk_score", "risk_detected"])
    def observe(
        self, request: dict, *,
        threshold: float = 0.8,
        model_name: str = "default-model",
    ) -> list[Claim]:
        score = self.analyze(request, model=model_name)
        return [
            Claim(name="risk_score", type="score_normalized", value=score),
            Claim(name="risk_detected", type="boolean", value=score > threshold),
        ]

# provenance auto-stamped: {"threshold": 0.8, "model_name": "default-model"}
```
Settings metadata (type, default) is auto-introspected from the decorator parameters and exposed via the /vocabulary endpoint. The Observer UI auto-generates settings panels from this metadata.
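The introspection can be pictured with `inspect.signature`: keyword-only parameters with defaults become settings metadata. A simplified sketch of the idea (not the SDK's internals):

```python
import inspect


def extract_settings(func) -> dict:
    """Collect keyword-only parameters with defaults as settings metadata."""
    settings = {}
    for name, param in inspect.signature(func).parameters.items():
        if (param.kind is inspect.Parameter.KEYWORD_ONLY
                and param.default is not inspect.Parameter.empty):
            settings[name] = {
                "type": type(param.default).__name__,
                "default": param.default,
            }
    return settings


def observe(self, request: dict, *,
            threshold: float = 0.8,
            model_name: str = "default-model"):
    ...


# → {"threshold": {"type": "float", "default": 0.8},
#    "model_name": {"type": "str", "default": "default-model"}}
settings = extract_settings(observe)
```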
### Detection Overrides in AuditorPolicy

Detection overrides are stored in the `AuditorPolicy.detection` section alongside Cedar response rules. This means detection and response configuration live in one policy document:
```json
{
  "detection_overrides": {
    "risk_score": { "threshold": 0.5, "model_name": "strict-model" }
  }
}
```
The Gateway resolves detection overrides from the policy and injects them into @claims method parameters at runtime. Auditors receive the effective values transparently.
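Conceptually, the Gateway merges the policy's `detection_overrides` over the decorator defaults before each call. A sketch of that merge (exact semantics are the Gateway's; here unknown keys are assumed to be dropped):

```python
def resolve_settings(defaults: dict, overrides: dict) -> dict:
    """Policy overrides win over decorator defaults (illustrative merge)."""
    # Keep only overrides for declared settings, then layer them over defaults
    return {**defaults, **{k: v for k, v in overrides.items() if k in defaults}}


defaults = {"threshold": 0.8, "model_name": "default-model"}
overrides = {"threshold": 0.5, "model_name": "strict-model"}

# effective == {"threshold": 0.5, "model_name": "strict-model"}
effective = resolve_settings(defaults, overrides)
```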
### Enforcement Modes

Each field in `AuditorPolicy.detection` can carry an enforcement mode that defines how overrides are constrained at the policy scope:
| Mode | Behavior | Valid For |
|---|---|---|
| `exact` | Override must use the specified value | All field types |
| `floor` | Override value must be >= the specified value | Numeric fields |
| `ceiling` | Override value must be <= the specified value | Numeric fields |
| `superset` | Override must include all specified items, may add more | Array fields |
| `unlocked` | No constraint (default) | All field types |
Enforcement is validated when policy overrides are saved. If an override violates a constraint, the API returns 422 Unprocessable Entity:
```json
{
  "detail": "Field 'injection_threshold' has floor enforcement (0.7). Override value 0.5 is below the floor."
}
```
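The constraint semantics in the table can be sketched as a single validation function (illustrative only, not the platform's implementation):

```python
def check_override(mode: str, constraint, override) -> bool:
    """Validate a policy override against an enforcement mode (sketch)."""
    if mode == "unlocked":
        return True                               # no constraint
    if mode == "exact":
        return override == constraint             # must match exactly
    if mode == "floor":
        return override >= constraint             # numeric lower bound
    if mode == "ceiling":
        return override <= constraint             # numeric upper bound
    if mode == "superset":
        return set(constraint) <= set(override)   # must include all items
    raise ValueError(f"unknown enforcement mode: {mode}")


assert not check_override("floor", 0.7, 0.5)   # below floor: rejected with 422
assert check_override("superset", ["pii"], ["pii", "secrets"])
```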
### Defining Presets

Presets are policy templates that bundle both detection overrides and Cedar response rules:
```json
{
  "preset": "balanced",
  "detection_overrides": {
    "injection_risk": { "injection_threshold": 0.85 },
    "toxic_content": { "toxicity_threshold": 0.7 }
  },
  "cedar": "forbid(principal, action == Action::\"invoke\", resource)\nwhen { context.claims.injection_risk > 0.85 };"
}
```
Presets are applied via the API:
```
# Apply a single auditor preset
POST /api/v1/workspaces/{id}/apply-preset
{ "auditor": "guardrails", "tier": "balanced" }

# Apply a quick-start bundle (configures multiple auditors)
POST /api/v1/workspaces/{id}/apply-preset
{ "bundle": "production_team" }
```
Three quick-start bundles are available: Solo Builder (4 auditors at Starter), Production Team (7 auditors at Balanced), and Regulated Enterprise (all 11 auditors, mostly Strict). See the Auditor Concepts page for full details.
## Testing

### Unit Testing

Test claim production directly without HTTP:
```python
from my_auditor import SafetyAuditor

auditor = SafetyAuditor()


def test_clean_input():
    claims = auditor.observe_request({"prompt": "Hello world"})
    score_claim = next(c for c in claims if c.name == "safety.score")
    assert score_claim.value < 0.5


def test_dangerous_input():
    claims = auditor.observe_request({"prompt": "How to make explosives"})
    score_claim = next(c for c in claims if c.name == "safety.score")
    assert score_claim.value > 0.7
```
### SDK Test Fixtures
```python
from lucid_auditor_sdk.testing import (
    mock_config,
    test_client,
    generate_pii_text,
    generate_injection_text,
    generate_clean_text,
)


def test_pii_detection():
    text = generate_pii_text(include_ssn=True, include_email=True)
    claims = my_auditor.detect_pii({"prompt": text})
    entities = next(c for c in claims if c.name == "pii_types")
    assert "SSN" in entities.value
```
### Contract Testing (CLI)

Validate the full HTTP interface with the CLI's contract tests (`lucid auditor test --claims`):

```
[+] /claims accepts POST and returns claims array
[+] /vocabulary returns valid claim declarations
[+] All claim names in /claims response are declared in /vocabulary
[*] Contract tests passed.
```
## Registering Custom Auditors

Once your auditor is built and tested, register it with the Lucid platform.

### Registration via CLI
```bash
# Scaffold a new auditor project
lucid auditor init my-custom-auditor

# Run local dev server
lucid auditor dev

# Run contract compliance tests
lucid auditor test --claims

# Register with the platform
lucid auditor register --id my-custom-auditor --endpoint https://my-auditor.internal:8090 --mode external
```
### Registration via API
```
POST /v1/auditors/register
{
  "auditor_id": "my-custom-auditor",
  "deployment_mode": "external",
  "endpoint": "https://my-auditor.internal:8090",
  "auth": {
    "type": "mtls",
    "cert_pem": "-----BEGIN CERTIFICATE-----..."
  },
  "supported_phases": ["request", "response"],
  "vocabulary_url": "https://my-auditor.internal:8090/vocabulary"
}
```
## Deployment Modes and Trust Tiers
| Mode | Trust Tier | When to Use |
|---|---|---|
| Sidecar | TEE-attested | Lucid-managed or customer CoCo cluster with Lucid Operator |
| In-cluster | mTLS-verified | Customer K8s cluster, no Lucid Operator |
| External | mTLS or API-key | Non-CoCo environments, on-prem, third-party services |
Trust tier is recorded per auditor in the passport. Higher trust tiers provide stronger guarantees to downstream relying parties.
## WASM Crypto Core

Cryptographic operations in auditors (hashing, signing, receipt chain) delegate to the `lucid-wasm` WASM modules. The signing key lives inside the WASM sandbox (unreachable by host code), and the `.wasm` binary has a deterministic hash for TEE attestation.
```python
from lucid_auditor_sdk._wasm.receipt import ReceiptChain, hash_data

# Hash data using the same SHA-256 used for receipt hashing
content_hash = hash_data(b"request content")
```
See the Architecture Overview for how the receipt chain fits into the gateway filter chain.