Audit Logging¶
Overview¶
The ApprovalAuditLog records every approval decision made by the system, providing a persistent, queryable trail for compliance, debugging, and operational analysis. Every action that passes through an ApprovalGate -- whether approved, denied, auto-approved, or timed out -- generates an AuditEntry that captures the full context: what was requested, who decided, why, and when.
The audit log supports both in-memory storage for fast querying and file-based persistence for long-term retention.
ApprovalAuditLog Class¶
class ApprovalAuditLog:
"""Audit log for approval decisions."""
def __init__(
self,
log_file: str | Path | None = None,
max_memory_entries: int = 1000,
):
...
| Parameter | Type | Default | Description |
|---|---|---|---|
log_file | str \| Path \| None | None | Path to JSONL file for persistent storage. Parent directories are created automatically |
max_memory_entries | int | 1000 | Maximum entries to keep in memory. Oldest are evicted when exceeded |
Methods¶
| Method | Signature | Description |
|---|---|---|
log | (request, response) -> AuditEntry | Record an approval decision |
get_entries | (limit, approved_only, denied_only, risk_level) -> list[AuditEntry] | Query entries with filters |
get_summary | () -> dict | Get aggregate statistics |
export_report | (output_path) -> None | Export a Markdown compliance report |
load_from_file | () -> int | Load entries from the log file into memory |
clear | () -> None | Clear in-memory entries (does not affect file) |
AuditEntry Data Class¶
Each audit entry captures the complete context of an approval decision:
@dataclass
class AuditEntry:
entry_id: str # Unique entry identifier
timestamp: str # ISO 8601 timestamp of the decision
request_id: str # Matching approval request ID
action_type: str # Action type (e.g., "code", "final")
risk_level: str # Risk level string (e.g., "high", "critical")
approved: bool # Whether the action was approved
status: str # ApprovalStatus value (e.g., "approved", "auto_denied")
reason: str # Explanation for the decision
approver: str # Who/what made the decision
code_preview: str # First 200 chars of the code (truncated)
affected_resources: list[str] # Resources impacted by the action
metadata: dict[str, Any] # Additional data (reversible, risk_reasons)
| Field | Type | Description |
|---|---|---|
entry_id | str | Composite ID: "{request_id}-{date}" |
timestamp | str | When the decision was made (UTC ISO 8601) |
request_id | str | The original approval request's ID |
action_type | str | What kind of action was requested |
risk_level | str | Risk level as a string value |
approved | bool | Final boolean decision |
status | str | Detailed status (e.g., "approved", "auto_denied", "timeout") |
reason | str | Human-readable explanation |
approver | str | Identity of the decision-maker (e.g., "console_user", "auto_approve_handler") |
code_preview | str | Truncated code (first 200 characters + "..." if longer) |
affected_resources | list[str] | Resources from the risk assessment |
metadata | dict | Extra data including reversible flag and risk_reasons list |
Factory Method¶
Entries are normally created automatically by the audit log, but can be constructed manually:
Serialization¶
Setup¶
In-Memory Only¶
from rlm_code.rlm.approval import ApprovalAuditLog
# Memory-only audit log (no file persistence)
audit_log = ApprovalAuditLog(max_memory_entries=500)
With File Persistence¶
# Persistent audit log (JSONL format)
audit_log = ApprovalAuditLog(
log_file="logs/approval_audit.jsonl",
max_memory_entries=1000,
)
The log file uses JSON Lines format -- one JSON object per line. Parent directories are created automatically if they do not exist.
JSONL format
Each line in the log file is a complete JSON object representing one AuditEntry:
{"entry_id": "abc12345-2025-01-15", "timestamp": "2025-01-15T10:30:00+00:00", "request_id": "abc12345", "action_type": "code", "risk_level": "high", "approved": true, "status": "approved", "reason": "User approved via console", "approver": "console_user", "code_preview": "import shutil; shutil.rmtree('/tmp/data')", "affected_resources": ["file:/tmp/data"], "metadata": {"reversible": false, "risk_reasons": ["File deletion may cause data loss"]}}
Integration with ApprovalGate¶
from rlm_code.rlm.approval import ApprovalGate, ApprovalPolicy, ApprovalAuditLog
audit_log = ApprovalAuditLog(log_file="audit.jsonl")
gate = ApprovalGate(
policy=ApprovalPolicy.CONFIRM_HIGH_RISK,
audit_log=audit_log,
)
# All decisions through this gate are automatically logged
request = gate.check_action({"action": "code", "code": "os.remove('file.txt')"})
response = await gate.request_approval(request)
# Audit entry created automatically
Logging Decisions¶
The log() method is called automatically by ApprovalGate but can also be called directly:
from rlm_code.rlm.approval import (
ApprovalAuditLog,
ApprovalRequest,
ApprovalResponse,
ApprovalStatus,
RiskAssessment,
ToolRiskLevel,
)
audit_log = ApprovalAuditLog(log_file="audit.jsonl")
# Log is called automatically by ApprovalGate, but can be manual:
entry = audit_log.log(request, response)
print(f"Logged: {entry.entry_id} - {entry.status}")
Fail-safe logging
File writes in the audit log use silent error handling. If the log file cannot be written (permissions, disk full, etc.), the error is silently ignored and the in-memory log continues to function. This ensures that audit logging never disrupts the agent's execution.
Querying Entries¶
Basic Queries¶
# Get all entries
all_entries = audit_log.get_entries()
# Get last 10 entries
recent = audit_log.get_entries(limit=10)
# Get only approved entries
approved = audit_log.get_entries(approved_only=True)
# Get only denied entries
denied = audit_log.get_entries(denied_only=True)
# Filter by risk level
high_risk = audit_log.get_entries(risk_level="high")
critical = audit_log.get_entries(risk_level="critical")
Query Parameters¶
| Parameter | Type | Default | Description |
|---|---|---|---|
limit | int \| None | None | Maximum number of entries to return (from most recent) |
approved_only | bool | False | Only return approved entries |
denied_only | bool | False | Only return denied entries |
risk_level | str \| None | None | Filter by risk level string ("safe", "low", "medium", "high", "critical") |
Filter precedence
If both approved_only and denied_only are set to True, approved_only takes precedence and denied_only is ignored.
Example: Investigating Denied Actions¶
denied_entries = audit_log.get_entries(denied_only=True, risk_level="critical")
for entry in denied_entries:
print(f"[{entry.timestamp}] {entry.action_type}")
print(f" Risk: {entry.risk_level}")
print(f" Reason: {entry.reason}")
print(f" Code: {entry.code_preview}")
print(f" Resources: {', '.join(entry.affected_resources)}")
print()
Summary Statistics¶
The get_summary() method provides aggregate statistics across all logged entries:
Returns:
{
"total": 150,
"approved": 120,
"denied": 30,
"approval_rate": 0.8,
"by_risk_level": {
"safe": {"total": 80, "approved": 80, "denied": 0},
"low": {"total": 30, "approved": 28, "denied": 2},
"medium": {"total": 25, "approved": 10, "denied": 15},
"high": {"total": 12, "approved": 2, "denied": 10},
"critical": {"total": 3, "approved": 0, "denied": 3},
},
}
| Field | Type | Description |
|---|---|---|
total | int | Total number of logged decisions |
approved | int | Number of approved actions |
denied | int | Number of denied actions |
approval_rate | float | Ratio of approved to total (0.0 to 1.0) |
by_risk_level | dict | Breakdown by risk level with per-level totals, approved, and denied counts |
Empty Log Summary¶
If no entries have been logged:
Exporting Compliance Reports¶
The export_report() method generates a Markdown compliance report:
The generated report includes:
# Approval Audit Report
Generated: 2025-01-15T18:30:00+00:00
## Summary
- Total decisions: 150
- Approved: 120
- Denied: 30
- Approval rate: 80.0%
## By Risk Level
- SAFE: 80 total, 80 approved (100%)
- LOW: 30 total, 28 approved (93%)
- MEDIUM: 25 total, 10 approved (40%)
- HIGH: 12 total, 2 approved (17%)
- CRITICAL: 3 total, 0 approved (0%)
## Recent Entries
- [2025-01-15T18:29:45] APPROVED code (low) - File read operation approved
- [2025-01-15T18:29:30] DENIED code (high) - User denied via console
- [2025-01-15T18:29:15] AUTO_APPROVED code (safe) - No approval required per po
...
Compliance use cases
The exported report is useful for:
- SOC 2 audits: Demonstrating that risky actions require approval
- Incident investigation: Understanding the sequence of approved/denied actions
- Security reviews: Identifying patterns in approval decisions
- Team reporting: Sharing agent activity summaries
Loading from File¶
If the audit log has file persistence, you can reload entries from disk:
audit_log = ApprovalAuditLog(log_file="audit.jsonl")
# Load previous entries from file
loaded_count = audit_log.load_from_file()
print(f"Loaded {loaded_count} entries from file")
# Now get_entries() includes loaded entries
all_entries = audit_log.get_entries()
Memory limits
Loaded entries are subject to max_memory_entries. If the file contains more entries than the limit, only the most recent entries are kept in memory.
Error handling
Malformed lines in the log file are silently skipped. This ensures that a single corrupted entry does not prevent loading the rest of the audit trail.
Clearing the Log¶
# Clear in-memory entries (file is NOT affected)
audit_log.clear()
# After clear, get_entries() returns empty
entries = audit_log.get_entries() # []
# But the file still contains all previous entries
# You can reload them:
loaded = audit_log.load_from_file()
Complete Example¶
from rlm_code.rlm.approval import (
ApprovalGate,
ApprovalPolicy,
ApprovalAuditLog,
ConsoleApprovalHandler,
RiskAssessor,
)
from rlm_code.rlm.approval.policy import RiskRule, ToolRiskLevel
# 1. Set up audit log with file persistence
audit_log = ApprovalAuditLog(
log_file="logs/session_audit.jsonl",
max_memory_entries=2000,
)
# 2. Set up risk assessor with custom rules
assessor = RiskAssessor()
assessor.add_rule(RiskRule(
name="env_var_modification",
pattern=r"os\.environ\[",
risk_level=ToolRiskLevel.MEDIUM,
reason="Environment variable modification detected",
reversible=True,
))
# 3. Set up gate with all components
gate = ApprovalGate(
policy=ApprovalPolicy.CONFIRM_MEDIUM_AND_UP,
risk_assessor=assessor,
approval_handler=ConsoleApprovalHandler(timeout_seconds=120).handle,
audit_log=audit_log,
)
# 4. Agent execution loop
async def agent_loop(actions):
for action in actions:
request = gate.check_action(action)
if request.requires_approval:
response = await gate.request_approval(request)
if not response.approved:
print(f"Skipping: {response.reason}")
continue
# Execute approved action
execute(action)
# 5. After execution, analyze the audit trail
summary = audit_log.get_summary()
print(f"Total decisions: {summary['total']}")
print(f"Approval rate: {summary['approval_rate']:.0%}")
print(f"Denied high-risk: {summary['by_risk_level'].get('high', {}).get('denied', 0)}")
# 6. Query specific entries
denied_critical = audit_log.get_entries(denied_only=True, risk_level="critical")
for entry in denied_critical:
print(f"DENIED CRITICAL: {entry.code_preview}")
print(f" Reason: {entry.reason}")
print(f" Resources: {entry.affected_resources}")
# 7. Export compliance report
audit_log.export_report("reports/session_compliance.md")