RLM Runner¶
Module: `rlm_code.rlm.runner`
The RLMRunner is the multi-paradigm orchestrator at the center of RLM Code. It manages the complete lifecycle of RLM execution: task dispatch, environment selection, action proposal, sandbox execution, reward calculation, memory management, benchmark sweeps, and trajectory persistence.
Classes¶
RLMRunner¶
The primary orchestrator. Supports three paradigms out of the box:
| Paradigm | Environment | Description |
|---|---|---|
| Pure RLM | `pure_rlm` | Paper-compliant context-as-variable with `llm_query()` |
| CodeAct | `generic` | Context included directly in the token window |
| Traditional | `dspy` | DSPy-aware with file operations, search, and verifier suites |
Constructor¶
class RLMRunner:
def __init__(
self,
llm_connector: Any,
execution_engine: Any,
run_dir: Path | None = None,
workdir: Path | None = None,
observability: RLMObservability | None = None,
event_bus: RLMEventBus | None = None,
reward_profile: RLMRewardProfile | dict[str, Any] | None = None,
benchmark_pack_paths: list[str | Path] | None = None,
max_parallelism: int = 4,
):
| Parameter | Type | Default | Description |
|---|---|---|---|
| `llm_connector` | `Any` | required | LLM backend connector (must implement `generate_response()`) |
| `execution_engine` | `Any` | required | Code execution sandbox |
| `run_dir` | `Path \| None` | Auto-detected | Directory for JSONL trajectory files |
| `workdir` | `Path \| None` | `Path.cwd()` | Project working directory |
| `observability` | `RLMObservability \| None` | Auto-created | Observability sink manager |
| `event_bus` | `RLMEventBus \| None` | Auto-created | Event bus for pub-sub |
| `reward_profile` | `RLMRewardProfile \| dict \| None` | Default profile | Reward tuning knobs |
| `benchmark_pack_paths` | `list[str \| Path] \| None` | `None` | External benchmark pack file paths |
| `max_parallelism` | `int` | `4` | Maximum concurrent child tasks |
Run Directory Detection
The runner automatically detects the run directory, checking for `.rlm_code/rlm/runs` first and falling back to the legacy `.dspy_code/rlm/runs` path.
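A minimal construction sketch; `my_connector` and `my_engine` are placeholders for whatever connector and sandbox your setup provides:

from pathlib import Path

from rlm_code.rlm.runner import RLMRunner

# my_connector must implement generate_response(); my_engine is the code
# execution sandbox. Both are placeholders assumed to be created elsewhere.
runner = RLMRunner(
    llm_connector=my_connector,
    execution_engine=my_engine,
    workdir=Path("/project"),
    max_parallelism=4,
)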
Environment Registry¶
On construction, the runner initializes a dictionary of environments:
self.environments = {
"generic": GenericRLMEnvironment(...),
"rlm": GenericRLMEnvironment(...),
"dspy": DSPyCodingRLMEnvironment(...),
"dspy-coding": DSPyCodingRLMEnvironment(...),
"framework": DSPyCodingRLMEnvironment(...),
"pure_rlm": PureRLMEnvironment(...),
"pure-rlm": PureRLMEnvironment(...),
}
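The seven aliases resolve to the same three environment classes. `supported_environments()` (see Other Methods below) reports the registered aliases at runtime:

# Aliases are alternate names for the three environment classes above;
# supported_environments() lists whatever aliases are registered.
print(runner.supported_environments())
# e.g. ['generic', 'rlm', 'dspy', 'dspy-coding', 'framework', 'pure_rlm', 'pure-rlm']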
run_task()¶
The core execution method. Runs one RLM episode and persists the trajectory as JSONL.
def run_task(
self,
task: str,
max_steps: int = 4,
exec_timeout: int = 30,
environment: str = "generic",
sub_model: str | None = None,
sub_provider: str | None = None,
branch_width: int = 1,
framework: str | None = None,
max_depth: int = 2,
max_children_per_step: int = 4,
parallelism: int = 2,
time_budget_seconds: int | None = None,
) -> RLMRunResult:
| Parameter | Type | Default | Description |
|---|---|---|---|
| `task` | `str` | required | Task description for the LLM |
| `max_steps` | `int` | `4` | Maximum iterations before forced stop |
| `exec_timeout` | `int` | `30` | Timeout in seconds per code execution |
| `environment` | `str` | `"generic"` | Environment to use (see registry above) |
| `sub_model` | `str \| None` | `None` | Override model for sub-LLM calls |
| `sub_provider` | `str \| None` | `None` | Override provider for sub-LLM calls |
| `branch_width` | `int` | `1` | Number of candidate actions per step (best-of-N) |
| `framework` | `str \| None` | `None` | Framework adapter ID (`"dspy"`, `"pydantic_ai"`, `"google_adk"`) |
| `max_depth` | `int` | `2` | Maximum recursion depth for delegate actions |
| `max_children_per_step` | `int` | `4` | Maximum child tasks per delegate action |
| `parallelism` | `int` | `2` | Concurrent child execution limit |
| `time_budget_seconds` | `int \| None` | `None` | Global time budget (kills execution if exceeded) |
Execution Loop:
1. Build the planner prompt from environment, memory, and trajectory
2. Propose `branch_width` candidate actions via the LLM
3. Select the highest-scoring candidate
4. Execute the action (code execution, file operation, delegate, or final)
5. Calculate the reward with `RLMRewardProfile` and apply global scaling
6. Update memory (rolling window of the last 8 notes)
7. Persist the step as a JSONL event
8. Emit runtime events for observability
9. Repeat until `done=True` or `max_steps` is reached
Cycle Guard
Recursive delegate tasks are protected by a cycle guard. If a child task has the same fingerprint (task + environment hash) as an ancestor, it is skipped immediately with a reward of -0.25.
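Conceptually, the fingerprint is a hash over the task text and environment, along these lines (an illustrative sketch, not the actual implementation):

import hashlib

def task_fingerprint(task: str, environment: str) -> str:
    # Illustrative sketch only: a stable digest over task text plus environment,
    # so a child task identical to one of its ancestors can be detected and skipped.
    return hashlib.sha256(f"{environment}:{task}".encode("utf-8")).hexdigest()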
Example:
result = runner.run_task(
task="Create a DSPy Signature for sentiment analysis",
environment="dspy",
max_steps=6,
exec_timeout=60,
branch_width=3, # Best-of-3 candidate selection
)
print(f"Run ID: {result.run_id}")
print(f"Completed: {result.completed}")
print(f"Steps: {result.steps}")
print(f"Total Reward: {result.total_reward}")
print(f"Answer: {result.final_response[:200]}")
run_benchmark()¶
Execute a benchmark preset and persist an aggregate summary.
def run_benchmark(
self,
*,
preset: str = "dspy_quick",
limit: int | None = None,
environment: str | None = None,
framework: str | None = None,
max_steps: int | None = None,
exec_timeout: int | None = None,
branch_width: int = 1,
sub_model: str | None = None,
sub_provider: str | None = None,
pack_paths: list[str | Path] | None = None,
) -> RLMBenchmarkResult:
Iterates over all cases in the specified preset, running each through run_task() and collecting metrics. Results are persisted as JSON summaries in the benchmarks directory.
Example:
bench = runner.run_benchmark(
preset="dspy_quick",
limit=5,
environment="dspy",
max_steps=4,
)
print(f"Completed: {bench.completed_cases}/{bench.total_cases}")
print(f"Avg Reward: {bench.avg_reward}")
print(f"Avg Steps: {bench.avg_steps}")
compare_benchmarks()¶
Compare a candidate benchmark against a baseline, with CI-style gate pass/fail.
def compare_benchmarks(
self,
*,
candidate: str = "latest",
baseline: str = "previous",
min_reward_delta: float = 0.0,
min_completion_delta: float = 0.0,
max_steps_increase: float = 0.0,
fail_on_completion_regression: bool = True,
) -> RLMBenchmarkComparison:
Computes deltas for reward, completion rate, and step count. Detects per-case regressions. Returns a `passed` boolean suitable for CI gates.
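Example (thresholds are illustrative):

comparison = runner.compare_benchmarks(
    candidate="latest",
    baseline="previous",
    min_reward_delta=0.05,  # require candidate avg reward to beat baseline by 0.05
    fail_on_completion_regression=True,
)
print(f"Deltas: {comparison.deltas}")
if not comparison.passed:
    raise SystemExit(f"Benchmark gate failed: {comparison.gates}")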
run_chat_turn()¶
Run one persistent chat turn backed by RLM episodes. Manages session state across turns with automatic memory compaction.
def run_chat_turn(
self,
message: str,
session_id: str = "default",
*,
environment: str = "generic",
max_steps: int = 4,
enable_compaction: bool = True,
compaction_limit: int = 6,
keep_recent: int = 4,
# ... additional parameters
) -> RLMRunResult:
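Example:

# Returns an RLMRunResult, same as run_task().
reply = runner.run_chat_turn(
    "Summarize what this project does",
    session_id="docs-demo",
    environment="generic",
    max_steps=4,
)
print(reply.final_response)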
doctor()¶
Run readiness checks for RLM execution.
Checks include:
- Run directory writability
- Sandbox runtime health
- Model connection status
- Framework adapter availability
- Environment-specific checks (workdir, pytest, DSPy imports)
Other Methods¶
| Method | Description |
|---|---|
| `list_runs(limit=10)` | List recent RLM runs from persisted JSONL trajectories |
| `get_run_status(run_id)` | Get summarized status for one run |
| `load_run_events(run_id)` | Load raw JSONL events for one run |
| `visualize_run(run_id)` | Build nested visualization payload |
| `supported_environments()` | List available environment aliases |
| `supported_frameworks()` | List available framework adapter IDs |
| `benchmark_presets()` | List available benchmark preset metadata |
| `benchmark_pack_aliases()` | List bundled benchmark pack aliases on disk |
| `list_benchmark_runs(limit=20)` | List recent benchmark summaries |
| `get_chat_session(session_id)` | Get chat session metadata |
| `reset_chat_session(session_id)` | Delete persisted chat session |
| `observability_status()` | Get configured observability sink statuses |
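A typical inspection flow, reusing `result` from the `run_task()` example above:

# List recent runs, then drill into one run's status and raw JSONL events.
runs = runner.list_runs(limit=5)
status = runner.get_run_status(result.run_id)
events = runner.load_run_events(result.run_id)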
RLMRunResult¶
Dataclass returned by run_task().
@dataclass(slots=True)
class RLMRunResult:
run_id: str # Unique run identifier
run_path: Path # Path to JSONL trajectory file
completed: bool # Whether the task completed successfully
steps: int # Number of steps executed
total_reward: float # Cumulative reward across all steps
final_response: str # Final answer or synthesized response
started_at: str # ISO timestamp of run start
finished_at: str # ISO timestamp of run end
environment: str # Environment name used
task: str # Original task description
usage_summary: dict[str, int] | None # Token usage (total_calls, prompt_tokens, completion_tokens)
RLMBenchmarkResult¶
Dataclass returned by run_benchmark().
@dataclass(slots=True)
class RLMBenchmarkResult:
benchmark_id: str # Unique benchmark identifier
summary_path: Path # Path to JSON summary file
preset: str # Preset name used
started_at: str # ISO timestamp
finished_at: str # ISO timestamp
total_cases: int # Total benchmark cases
completed_cases: int # Cases that completed successfully
avg_reward: float # Average reward across cases
avg_steps: float # Average steps across cases
case_results: list[dict[str, Any]] # Per-case result dictionaries
Each entry in `case_results` contains:
| Field | Type | Description |
|---|---|---|
| `case_id` | `str` | Unique case identifier |
| `description` | `str` | Human-readable case description |
| `task` | `str` | Task text |
| `environment` | `str` | Environment used |
| `run_id` | `str` | RLM run ID for this case |
| `completed` | `bool` | Whether the case completed |
| `steps` | `int` | Steps taken |
| `total_reward` | `float` | Cumulative reward |
| `usage` | `dict` | Token usage |
| `final_response` | `str` | Final answer |
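For example, summarizing per-case outcomes from the `run_benchmark()` example above:

for case in bench.case_results:
    print(f"{case['case_id']}: completed={case['completed']} "
          f"reward={case['total_reward']:.2f} steps={case['steps']}")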
RLMBenchmarkComparison¶
Dataclass returned by compare_benchmarks().
@dataclass(slots=True)
class RLMBenchmarkComparison:
candidate_id: str # Candidate benchmark ID
baseline_id: str # Baseline benchmark ID
candidate_path: Path # Path to candidate JSON
baseline_path: Path # Path to baseline JSON
candidate_metrics: dict[str, float] # avg_reward, completion_rate, avg_steps
baseline_metrics: dict[str, float] # avg_reward, completion_rate, avg_steps
deltas: dict[str, float] # Metric deltas (candidate - baseline)
case_summary: dict[str, int] # common_cases, completion_regressions, reward_regressions
gates: dict[str, bool] # Gate pass/fail for each criterion
passed: bool # True if ALL gates passed
Event-Driven Architecture¶
The runner publishes events at every stage of execution through the RLMEventBus:
| Event | When Published |
|---|---|
| `run_start` | Beginning of `run_task()` |
| `step_start` | Before each action execution |
| `step_end` | After each action execution, with reward |
| `run_end` | End of `run_task()`, with final metrics |
| `run_cycle_guard` | When a recursive task is blocked by cycle detection |
All events include `run_id`, `depth`, and `parent_run_id` for tracing recursive execution trees.
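A subscriber sketch; the `event_bus` attribute and a `subscribe(event, handler)` method are assumptions here, not confirmed API:

# Sketch only: assumes RLMEventBus exposes subscribe(event_name, handler)
# and that the runner keeps its bus on an event_bus attribute.
def on_step_end(payload: dict) -> None:
    print(f"[{payload['run_id']}] depth={payload['depth']} step finished")

runner.event_bus.subscribe("step_end", on_step_end)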
Reward Calculation¶
Every action result passes through:
1. Environment reward -- computed by the environment based on the execution outcome
2. Global scaling -- `reward_profile.apply_global_scale(reward)` multiplies by `global_scale` and clamps to `[-1.0, 1.0]`
3. Accumulation -- added to `total_reward` for the run
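The scaling step behaves like this sketch (written from the description above, not taken from the actual source):

def apply_global_scale(reward: float, global_scale: float = 1.0) -> float:
    # Multiply by global_scale, then clamp into [-1.0, 1.0].
    return max(-1.0, min(1.0, reward * global_scale))

apply_global_scale(0.8, global_scale=1.5)  # -> 1.0 (clamped)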
See Environments for the full `RLMRewardProfile` specification.
Delegate Actions (Recursive Execution)¶
When the planner proposes a `delegate` or `delegate_batch` action, the runner:
1. Checks depth against the `max_depth` guard
2. Resolves context references from the `LazyFileContext` store
3. Spawns child `run_task()` calls (potentially in parallel)
4. Aggregates child results into a single `EnvironmentActionResult`
5. Applies cycle detection via task fingerprinting
Example:
result = runner.run_task(
task="Decompose this large analysis into subtasks",
environment="dspy",
max_depth=3,
max_children_per_step=4,
parallelism=2,
time_budget_seconds=300,
)
Runtime Health Detection¶
The doctor() method performs comprehensive readiness checks:
checks = runner.doctor(environment="dspy")
for check in checks:
print(f"[{check.status}] {check.name}: {check.detail}")
if check.recommendation:
print(f" Recommendation: {check.recommendation}")
Output example:
[pass] rlm_run_dir: Run directory: /project/.rlm_code/rlm/runs
[pass] sandbox_runtime: local: Python sandbox available
[pass] model_connection: Connected model: gpt-4o
[pass] workdir_exists: Workdir exists: /project
[pass] pytest_cli: pytest available at /usr/bin/pytest
[pass] dspy_import: DSPy import check passed.