Risk Assessment

Overview

The RiskAssessor evaluates agent actions against a comprehensive set of risk rules to determine their potential for harm. It uses pattern matching, both regex- and callable-based, to scan action code for dangerous operations, extract affected resources, estimate impact, and produce actionable recommendations. The default rule set covers 40+ common risk patterns across file operations, network access, system commands, database operations, and more.
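
For example, a default assessor can be constructed and queried in a few lines (the Examples section below covers more cases):

from rlm_code.rlm.approval import RiskAssessor

assessor = RiskAssessor()  # uses DEFAULT_RULES
assessment = assessor.assess({"action": "code", "code": "shutil.rmtree('/tmp/cache')"})
print(assessment.level)              # ToolRiskLevel.HIGH (file_delete rule)
print(assessment.requires_approval)  # True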


RiskAssessor Class

class RiskAssessor:
    """Assesses risk of tool actions."""

    def __init__(
        self,
        rules: list[RiskRule] | None = None,
        custom_assessor: Callable[[dict[str, Any]], RiskAssessment] | None = None,
    ):
        self.rules = rules or self.DEFAULT_RULES.copy()
        self.custom_assessor = custom_assessor

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| rules | list[RiskRule] \| None | DEFAULT_RULES | List of risk rules to evaluate against |
| custom_assessor | Callable \| None | None | Custom assessor function that bypasses rule-based evaluation |

Methods

| Method | Signature | Description |
| --- | --- | --- |
| assess | (action: dict) -> RiskAssessment | Evaluate the risk of an action |
| add_rule | (rule: RiskRule) -> None | Add a custom risk rule |
| remove_rule | (name: str) -> bool | Remove a rule by name; returns True if found and removed |

ToolRiskLevel Enum

Risk is classified into five levels, from harmless to potentially catastrophic:

class ToolRiskLevel(Enum):
    SAFE     = "safe"      # No risk, auto-approve
    LOW      = "low"       # Minor risk, usually approve
    MEDIUM   = "medium"    # Moderate risk, consider carefully
    HIGH     = "high"      # High risk, require confirmation
    CRITICAL = "critical"  # Critical risk, require explicit approval

| Level | Typical Actions | Approval Recommendation |
| --- | --- | --- |
| SAFE | print(), display operations | Auto-approve |
| LOW | File reads, variable inspection | Usually approve |
| MEDIUM | File writes, subprocess calls, pip install, git commit | Consider carefully |
| HIGH | File deletion, sudo, network POST/PUT/DELETE, git force-push | Require confirmation |
| CRITICAL | rm -rf, DROP DATABASE, disk formatting | Require explicit approval |
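
ToolRiskLevel members carry string values, so they do not order or compare by severity on their own. If you need to rank levels yourself, an explicit ordering works; max_level below is an illustrative helper, not library API:

from rlm_code.rlm.approval import ToolRiskLevel

RISK_ORDER = [
    ToolRiskLevel.SAFE,
    ToolRiskLevel.LOW,
    ToolRiskLevel.MEDIUM,
    ToolRiskLevel.HIGH,
    ToolRiskLevel.CRITICAL,
]

def max_level(levels: list[ToolRiskLevel]) -> ToolRiskLevel:
    # Illustrative helper: pick the most severe level by its rank.
    return max(levels, key=RISK_ORDER.index)

assert max_level([ToolRiskLevel.LOW, ToolRiskLevel.HIGH]) is ToolRiskLevel.HIGH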

RiskAssessment Data Class

The output of a risk evaluation:

@dataclass
class RiskAssessment:
    level: ToolRiskLevel                 # Overall risk level (highest triggered)
    reasons: list[str]                   # Why this risk level was assigned
    affected_resources: list[str] = field(default_factory=list)  # Resources that may be impacted
    reversible: bool = True              # Whether the action can be undone
    estimated_impact: str = ""           # Human-readable impact estimate
    recommendations: list[str] = field(default_factory=list)     # Suggested precautions

| Field | Type | Description |
| --- | --- | --- |
| level | ToolRiskLevel | The highest risk level among all triggered rules |
| reasons | list[str] | Explanations from each triggered rule |
| affected_resources | list[str] | Extracted resources (files, URLs, tables) with type prefixes |
| reversible | bool | True only if all triggered rules are reversible |
| estimated_impact | str | Human-readable impact summary based on risk level |
| recommendations | list[str] | Actionable advice (e.g., "Review carefully before approving") |

The requires_approval Property

@property
def requires_approval(self) -> bool:
    """Check if this risk level typically requires approval."""
    return self.level in (ToolRiskLevel.HIGH, ToolRiskLevel.CRITICAL)

This property is used by the CUSTOM approval policy mode.
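
For example, an approval gate might auto-run anything below HIGH and prompt a human otherwise. The confirm and gate helpers below are illustrative sketches, not part of the library:

from rlm_code.rlm.approval import RiskAssessor, RiskAssessment

def confirm(assessment: RiskAssessment) -> bool:
    # Illustrative stand-in for an interactive confirmation prompt.
    print(f"[{assessment.level.value}] {'; '.join(assessment.reasons)}")
    return input("Approve? [y/N] ").strip().lower() == "y"

def gate(assessor: RiskAssessor, action: dict) -> bool:
    """Hypothetical approval gate driven by requires_approval."""
    assessment = assessor.assess(action)
    if assessment.requires_approval:  # HIGH or CRITICAL
        return confirm(assessment)
    return True                       # SAFE/LOW/MEDIUM proceed automatically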


RiskRule Data Class

Individual rules that the assessor evaluates:

@dataclass
class RiskRule:
    name: str                                          # Unique rule identifier
    pattern: str | Callable[[dict[str, Any]], bool]    # Regex pattern or callable
    risk_level: ToolRiskLevel                          # Risk level when triggered
    reason: str                                        # Human-readable explanation
    reversible: bool = True                            # Whether the action is reversible

| Field | Type | Description |
| --- | --- | --- |
| name | str | Unique identifier for the rule (used for remove_rule) |
| pattern | str \| Callable | Regex pattern matched against action["code"], or a callable that receives the full action dict |
| risk_level | ToolRiskLevel | Risk level assigned when this rule triggers |
| reason | str | Explanation shown to the user |
| reversible | bool | Whether the action can be undone |

Default Risk Rules

The RiskAssessor ships with a comprehensive set of default rules organized by risk level:

CRITICAL Risk Rules

| Rule Name | Pattern | Reason | Reversible |
| --- | --- | --- | --- |
| rm_recursive | rm\s+-rf?\s+ | Recursive file deletion can cause irreversible data loss | No |
| drop_database | DROP\s+(DATABASE\|TABLE\|SCHEMA) | Database deletion is typically irreversible | No |
| format_disk | (mkfs\|format\|fdisk) | Disk formatting destroys all data | No |

HIGH Risk Rules

| Rule Name | Pattern | Reason | Reversible |
| --- | --- | --- | --- |
| file_delete | (os\.remove\|os\.unlink\|shutil\.rmtree\|Path.*\.unlink) | File deletion may cause data loss | No |
| git_force_push | git\s+push\s+.*(-f\|--force) | Force push can overwrite remote history | No |
| git_reset_hard | git\s+reset\s+--hard | Hard reset discards uncommitted changes | No |
| sudo_command | sudo\s+ | Elevated privileges can affect system stability | Yes |
| network_request | (requests\.(post\|put\|delete\|patch)\|urllib\|httpx\.(post\|put\|delete)) | Modifying external resources via network | No |

MEDIUM Risk Rules

| Rule Name | Pattern | Reason | Reversible |
| --- | --- | --- | --- |
| file_write | (open\(.*['\"]w\|\.write\(\|Path.*\.write_) | File modification may overwrite existing content | Yes |
| subprocess_exec | (subprocess\.(run\|call\|Popen)\|os\.system) | Executing system commands | Yes |
| git_commit | git\s+commit | Creating git commits | Yes |
| pip_install | pip\s+install | Installing packages may affect environment | Yes |

LOW Risk Rules

| Rule Name | Pattern | Reason | Reversible |
| --- | --- | --- | --- |
| file_read | (open\(.*['\"]r\|\.read\(\|Path.*\.read_) | Reading files | Yes |

SAFE Rules

| Rule Name | Pattern | Reason | Reversible |
| --- | --- | --- | --- |
| print_output | print\( | Output display only | Yes |

Rule evaluation

All regex patterns are evaluated with re.IGNORECASE. When multiple rules trigger, the assessor uses the highest risk level among them. The reversible flag is True only if all triggered rules are reversible.
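
A short demonstration of both behaviors:

from rlm_code.rlm.approval import RiskAssessor, ToolRiskLevel

assessor = RiskAssessor()

# Matching is case-insensitive: "RM -RF" still triggers rm_recursive.
result = assessor.assess({"action": "code", "code": "os.system('RM -RF /tmp/x')"})
assert result.level == ToolRiskLevel.CRITICAL  # CRITICAL outranks subprocess_exec's MEDIUM
assert result.reversible is False              # rm_recursive is not reversible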


Assessment Process

When assess() is called, the following process occurs (sketched in code after the list):

1. If custom_assessor is set, delegate entirely to it and return
2. Extract code and action_type from the action dict
3. For each rule:
   a. If pattern is a string: regex search against code (case-insensitive)
   b. If pattern is a callable: call with the full action dict
   c. If match: add to triggered_rules list
4. If no rules triggered: return SAFE assessment
5. Determine highest risk level among triggered rules
6. Collect all reasons from triggered rules
7. Check if all triggered rules are reversible
8. Extract affected resources from code (files, URLs, tables)
9. Generate estimated impact text based on risk level
10. Generate recommendations for HIGH/CRITICAL levels
11. Return RiskAssessment
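
A minimal sketch of this flow; the _ORDER ranking and the SAFE-case reason string are assumptions, and the real RiskAssessor.assess may differ in detail (steps 8-10 are omitted here):

import re
from rlm_code.rlm.approval import RiskAssessor, RiskAssessment, ToolRiskLevel

# Assumed severity ranking, least to most severe.
_ORDER = [ToolRiskLevel.SAFE, ToolRiskLevel.LOW, ToolRiskLevel.MEDIUM,
          ToolRiskLevel.HIGH, ToolRiskLevel.CRITICAL]

def assess_sketch(assessor: RiskAssessor, action: dict) -> RiskAssessment:
    """Illustrative re-implementation of the flow above, not the real method."""
    if assessor.custom_assessor is not None:  # step 1: full delegation
        return assessor.custom_assessor(action)

    code = action.get("code", "")  # step 2
    triggered = []
    for rule in assessor.rules:  # step 3
        if callable(rule.pattern):
            matched = rule.pattern(action)  # 3b: callable gets the full action dict
        else:
            matched = re.search(rule.pattern, code, re.IGNORECASE) is not None  # 3a
        if matched:
            triggered.append(rule)

    if not triggered:  # step 4
        return RiskAssessment(level=ToolRiskLevel.SAFE, reasons=["No risk rules triggered"])

    return RiskAssessment(
        level=max((r.risk_level for r in triggered), key=_ORDER.index),  # step 5
        reasons=[r.reason for r in triggered],                           # step 6
        reversible=all(r.reversible for r in triggered),                 # step 7
    )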

Resource Extraction

The assessor automatically extracts potentially affected resources from code:

| Resource Type | Detection Pattern | Example |
| --- | --- | --- |
| Files | Quoted paths, Path() calls | file:/tmp/data.csv |
| URLs | http:// or https:// patterns | url:https://api.example.com/data |
| Database tables | FROM, INTO, UPDATE, DROP clauses | table:users |

Resources are prefixed with their type and limited to 10 total to prevent excessive output.
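
The exact extraction regexes are internal; the sketch below shows what comparable extraction could look like, with all three patterns assumed rather than copied from the library:

import re

def extract_resources_sketch(code: str) -> list[str]:
    """Illustrative resource extraction; the patterns here are assumptions."""
    resources = []
    # Files: quoted absolute paths, or relative paths with an extension
    for m in re.findall(r"['\"](/[^'\"]+|[\w./-]+\.\w+)['\"]", code):
        resources.append(f"file:{m}")
    # URLs: anything starting with http:// or https://
    for m in re.findall(r"https?://[^\s'\"]+", code):
        resources.append(f"url:{m}")
    # Tables: identifiers after FROM / INTO / UPDATE / DROP TABLE
    for m in re.findall(r"(?:FROM|INTO|UPDATE|DROP\s+TABLE)\s+(\w+)", code, re.IGNORECASE):
        resources.append(f"table:{m}")
    return resources[:10]  # capped at 10 total, as described above

print(extract_resources_sketch(
    "pd.read_csv('/tmp/data.csv'); requests.get('https://api.example.com/data')"
))
# ['file:/tmp/data.csv', 'url:https://api.example.com/data']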

Impact Estimation

| Risk Level | Estimated Impact |
| --- | --- |
| CRITICAL | "Potentially severe and irreversible impact" |
| HIGH | "Significant impact, may require manual intervention to undo" |
| MEDIUM | "Moderate impact, generally reversible" |
| LOW | "Minor impact, easily reversible" |
| SAFE | "No significant impact expected" |

Examples

Basic Assessment

from rlm_code.rlm.approval import RiskAssessor, ToolRiskLevel

assessor = RiskAssessor()

# SAFE action
result = assessor.assess({"action": "code", "code": "print('hello')"})
assert result.level == ToolRiskLevel.SAFE

# MEDIUM action (file write)
result = assessor.assess({
    "action": "code",
    "code": "with open('/tmp/output.txt', 'w') as f: f.write('data')",
})
assert result.level == ToolRiskLevel.MEDIUM
assert "File modification may overwrite existing content" in result.reasons
assert result.reversible is True

# CRITICAL action (recursive delete). Note: the rm_recursive pattern matches
# the literal shell text "rm -rf ", so it must appear in the code string;
# the list form ['rm', '-rf', ...] would not trigger it.
result = assessor.assess({
    "action": "code",
    "code": "import os; os.system('rm -rf ' + '/home/user/data')",
})
assert result.level == ToolRiskLevel.CRITICAL
assert result.reversible is False
assert "file:/home/user/data" in result.affected_resources

Multiple Rules Triggered

# This code triggers both file_delete (HIGH) and subprocess_exec (MEDIUM)
result = assessor.assess({
    "action": "code",
    "code": """
import subprocess
import os
subprocess.run(['make', 'clean'])
os.remove('/tmp/build.log')
""",
})
# level = HIGH (highest of MEDIUM and HIGH)
# reasons = [
#     "File deletion may cause data loss",    # file_delete
#     "Executing system commands",            # subprocess_exec
# ]
# reversible = False (file_delete is not reversible)

Custom Risk Rules

Adding Pattern-Based Rules

from rlm_code.rlm.approval import RiskAssessor
from rlm_code.rlm.approval.policy import RiskRule, ToolRiskLevel

assessor = RiskAssessor()

# Add a rule for detecting API key exposure
assessor.add_rule(RiskRule(
    name="api_key_exposure",
    pattern=r"(API_KEY|SECRET_KEY|PRIVATE_KEY|ACCESS_TOKEN)\s*=",
    risk_level=ToolRiskLevel.HIGH,
    reason="Potential hardcoded API key or secret detected",
    reversible=True,
))

# Add a rule for database modifications
assessor.add_rule(RiskRule(
    name="db_insert",
    pattern=r"INSERT\s+INTO",
    risk_level=ToolRiskLevel.MEDIUM,
    reason="Database insertion detected",
    reversible=True,
))

# Add a rule for Docker operations
assessor.add_rule(RiskRule(
    name="docker_run",
    pattern=r"docker\s+(run|exec|build)",
    risk_level=ToolRiskLevel.MEDIUM,
    reason="Docker container operation detected",
    reversible=True,
))
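
Once added, custom rules participate in assessment exactly like the defaults:

result = assessor.assess({
    "action": "code",
    "code": "API_KEY = 'dummy-value'",
})
assert result.level == ToolRiskLevel.HIGH
assert "Potential hardcoded API key or secret detected" in result.reasons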

Adding Callable-Based Rules

For complex risk patterns that cannot be expressed as a single regex, use callable rules:

def check_large_file_operation(action: dict) -> bool:
    """Flag operations that touch directories known to hold large files."""
    code = action.get("code", "")
    # Heuristic: match known large-data path prefixes (no actual size check).
    large_paths = ["/data/warehouse/", "/backup/", "/var/log/"]
    return any(path in code for path in large_paths)

assessor.add_rule(RiskRule(
    name="large_file_operation",
    pattern=check_large_file_operation,
    risk_level=ToolRiskLevel.HIGH,
    reason="Operation on potentially large file/directory",
    reversible=True,
))


import re

def check_multiple_system_calls(action: dict) -> bool:
    """Flag code with more than 3 subprocess calls."""
    code = action.get("code", "")
    matches = re.findall(r"subprocess\.(run|call|Popen)|os\.system", code)
    return len(matches) > 3

assessor.add_rule(RiskRule(
    name="excessive_system_calls",
    pattern=check_multiple_system_calls,
    risk_level=ToolRiskLevel.HIGH,
    reason="Multiple system command executions detected (potential script injection)",
    reversible=True,
))
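
Callable rules combine with the rest of the rule set in the usual way; here the HIGH callable rule outranks the MEDIUM subprocess_exec rule:

code = "subprocess.run(['ls'])\n" * 5  # five system calls, so > 3
result = assessor.assess({"action": "code", "code": code})
assert result.level == ToolRiskLevel.HIGH  # excessive_system_calls outranks subprocess_exec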

Removing Rules

# Remove a rule by name
removed = assessor.remove_rule("print_output")
# removed = True (rule existed and was removed)

removed = assessor.remove_rule("nonexistent")
# removed = False (no rule with that name)

Replacing the Entire Rule Set

from rlm_code.rlm.approval.policy import RiskRule, ToolRiskLevel

# Start from scratch with a minimal rule set
minimal_rules = [
    RiskRule(
        name="any_file_operation",
        pattern=r"(open|Path|os\.(remove|unlink)|shutil)",
        risk_level=ToolRiskLevel.MEDIUM,
        reason="File operation detected",
        reversible=True,
    ),
    RiskRule(
        name="any_network",
        pattern=r"(requests|urllib|httpx|socket)",
        risk_level=ToolRiskLevel.HIGH,
        reason="Network operation detected",
        reversible=False,
    ),
]

assessor = RiskAssessor(rules=minimal_rules)

Custom Assessor Function

For complete control over risk assessment, provide a custom assessor function that bypasses the rule engine entirely:

from rlm_code.rlm.approval import RiskAssessor, RiskAssessment, ToolRiskLevel

def my_custom_assessor(action: dict) -> RiskAssessment:
    """Domain-specific risk assessment for a financial application."""
    code = action.get("code", "")

    # Financial-specific checks
    if "transfer" in code.lower() or "withdraw" in code.lower():
        return RiskAssessment(
            level=ToolRiskLevel.CRITICAL,
            reasons=["Financial transaction detected"],
            affected_resources=["financial_system"],
            reversible=False,
            estimated_impact="Direct financial impact",
            recommendations=[
                "Verify transaction amount and recipient",
                "Check authorization level",
            ],
        )

    if "balance" in code.lower() or "account" in code.lower():
        return RiskAssessment(
            level=ToolRiskLevel.MEDIUM,
            reasons=["Account data access detected"],
            affected_resources=["account_database"],
            reversible=True,
            estimated_impact="Potential PII exposure",
            recommendations=["Ensure proper access logging"],
        )

    return RiskAssessment(
        level=ToolRiskLevel.SAFE,
        reasons=["No financial operations detected"],
        reversible=True,
    )

assessor = RiskAssessor(custom_assessor=my_custom_assessor)

Custom assessor responsibility

When using a custom assessor function, the default rules are completely bypassed. Your custom function is solely responsible for all risk evaluation. Ensure it covers all relevant risk categories for your application.
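
If you want domain-specific checks layered on top of the default rules rather than replacing them, one option is to fall back to a rule-based assessor inside your custom function. This is a sketch, not a built-in feature:

from rlm_code.rlm.approval import RiskAssessor, RiskAssessment, ToolRiskLevel

_rule_based = RiskAssessor()  # default rules, used as a fallback

def layered_assessor(action: dict) -> RiskAssessment:
    """Illustrative hybrid: domain checks first, default rules otherwise."""
    code = action.get("code", "").lower()
    if "transfer" in code or "withdraw" in code:
        return RiskAssessment(
            level=ToolRiskLevel.CRITICAL,
            reasons=["Financial transaction detected"],
            reversible=False,
        )
    return _rule_based.assess(action)  # fall back to the default rule engine

assessor = RiskAssessor(custom_assessor=layered_assessor)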