
fix: address CodeRabbit findings on PRs #56 + #59 (wrappers + context/session/streaming)#64

Merged
rohitg00 merged 1 commit into main from fix/cr-wave2-wrappers-context on Apr 29, 2026
Conversation


rohitg00 (Collaborator) commented Apr 29, 2026

Summary

Addresses 32 CodeRabbit findings across 11 workers from merged PRs #56 (wrappers) and #59 (context/session/streaming). Re-registers the affected workers in Cargo.toml workspace members so they actually compile under cargo check/test.

Findings by severity

| Severity | PR #56 | PR #59 | Total |
| --- | --- | --- | --- |
| 🔴 Critical | 5 | 0 | 5 |
| 🟠 Major | 9 | 14 | 23 |
| 🟡 Minor | 0 | 1 | 1 |
| Nitpicks | 1 | 0 | 1 (dead-code loop) |
| Deferred | 0 | 1 | 1 (stream::sse true-streaming) |

PR #56 — wrappers (workers/{browser, code-agent, skillkit-bridge, mcp-client, orchestrator})

workers/browser/src/main.rs

  • 🔴 assert_no_ssrf (~73): match on url::Host and use Ipv4Addr/Ipv6Addr classification methods (is_loopback/is_private/is_link_local/is_unspecified + IPv6 segment masks for fc00::/7 & fe80::/10 + IPv4-mapped re-check). Domain branch keeps explicit blocklist plus *.localhost / ip6-localhost aliases.
  • 🟠 run_browser_script (~183): replace Command::output() with spawn() + kill_on_drop(true) + wait_with_output() so the python3 bridge is killed on timeout instead of leaked.
  • 🔴 navigate/click/read/screenshot/type (~390-430): re-validate session.current_url (and any post-navigation URL) before reuse so a redirect to an internal host is no longer a reusable SSRF primitive.
  • 🔴 trigger list (~587): add tool::browser_click / tool::browser_type HTTP triggers for route parity.
  • 🟠 create_session (~293): reserve the slot via atomic state::update (merge into _index.ids) before any I/O, then roll back the reservation if MAX_SESSIONS is exceeded; index helpers accept both legacy flat-array and new { ids: [...] } shapes.
  • Bonus: replace String::truncate calls with a truncate_to_char_boundary helper (UTF-8 safety).
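The host classification above can be sketched with only `std::net`; this is an illustrative reconstruction (helper names are not the worker's exact API), covering loopback/private/link-local/unspecified plus the IPv6 segment masks and the IPv4-mapped re-check:

```rust
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};

// Returns true if the address must be blocked as an internal/private target.
fn is_blocked_v4(ip: Ipv4Addr) -> bool {
    ip.is_loopback()
        || ip.is_private()
        || ip.is_link_local()
        || ip.is_unspecified()
        || ip.is_broadcast()
}

fn is_blocked_v6(ip: Ipv6Addr) -> bool {
    // Re-check IPv4-mapped addresses (::ffff:a.b.c.d) with the IPv4 rules.
    if let Some(v4) = ip.to_ipv4_mapped() {
        return is_blocked_v4(v4);
    }
    let seg = ip.segments();
    ip.is_loopback()
        || ip.is_unspecified()
        || (seg[0] & 0xfe00) == 0xfc00 // fc00::/7 unique-local
        || (seg[0] & 0xffc0) == 0xfe80 // fe80::/10 link-local
}

fn is_blocked(ip: IpAddr) -> bool {
    match ip {
        IpAddr::V4(v4) => is_blocked_v4(v4),
        IpAddr::V6(v6) => is_blocked_v6(v6),
    }
}

fn main() {
    assert!(is_blocked("127.0.0.1".parse().unwrap()));
    assert!(is_blocked("10.0.0.1".parse().unwrap()));
    assert!(is_blocked("fe80::1".parse().unwrap()));
    assert!(is_blocked("::ffff:192.168.1.1".parse().unwrap()));
    assert!(!is_blocked("93.184.216.34".parse().unwrap()));
    println!("ok");
}
```

The classification methods come from `std::net`, so no string heuristics are needed; the domain branch's blocklist handling is separate and not shown here.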

workers/code-agent/src/main.rs

  • 🔴 run_sandboxed_js (~123): plumb a shared Instant deadline + Arc<AtomicBool> and install Runtime::set_interrupt_handler; the outer tokio::time::timeout flips the flag so QuickJS aborts within 10k bytecode ops instead of letting while(true){} keep burning the blocking thread.
  • 🔴 stdout truncation (~77): switch to truncate_to_char_boundary so multi-byte stdout never panics.
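The cancellation wiring can be illustrated with std threads standing in for the tokio timeout and the QuickJS interrupt handler (this is a sketch of the flag plumbing only, not the actual rquickjs code): a watchdog flips a shared `AtomicBool`, and the guest loop observes it the way the interrupt handler would.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};

// Returns true if the "guest" loop was stopped by the cancellation flag.
fn run_with_deadline(deadline: Duration) -> bool {
    let cancel = Arc::new(AtomicBool::new(false));
    let watchdog_flag = Arc::clone(&cancel);
    let watchdog = thread::spawn(move || {
        thread::sleep(deadline);
        watchdog_flag.store(true, Ordering::Relaxed);
    });

    let start = Instant::now();
    let mut interrupted = false;
    // Stand-in for `while(true){}` guest code: spins until interrupted.
    loop {
        if cancel.load(Ordering::Relaxed) {
            interrupted = true; // the interrupt handler would return true here
            break;
        }
        if start.elapsed() > Duration::from_secs(5) {
            break; // safety valve for the sketch
        }
    }
    watchdog.join().unwrap();
    interrupted
}

fn main() {
    assert!(run_with_deadline(Duration::from_millis(50)));
    println!("interrupted cooperatively");
}
```

In the real worker the flag is checked from inside the engine via the interrupt handler, which is what lets a hostile `while(true){}` be aborted rather than pinning a blocking thread.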

workers/skillkit-bridge/src/main.rs

  • 🟠 run_skillkit (~34): cmd.kill_on_drop(true) + spawn() + wait_with_output() so timed-out npx skillkit is reaped.
  • 🔴 skillkit::scan (~209): canonicalize WORKSPACE_ROOT and the joined target, reject absolute paths and any .. segments, and require the canonical target to start with the canonical workspace root.
  • Char-boundary safe truncation across parse_json_or_raw and the install handler.
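One plausible implementation of the `truncate_to_char_boundary` helper used across these workers (the exact body in the PR may differ): `String::truncate` panics when the cut lands inside a multi-byte character, so back the index up to a boundary first.

```rust
// Truncate `s` to at most `max` bytes without splitting a UTF-8 character.
fn truncate_to_char_boundary(s: &mut String, mut max: usize) {
    if max >= s.len() {
        return;
    }
    while !s.is_char_boundary(max) {
        max -= 1;
    }
    s.truncate(max);
}

fn main() {
    let mut s = String::from("héllo"); // 'é' occupies bytes 1..3
    truncate_to_char_boundary(&mut s, 2); // byte 2 would split 'é'
    assert_eq!(s, "h"); // backed up to the previous boundary
    println!("{s}");
}
```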

workers/mcp-client/src/main.rs

  • 🟠 connect (~179): drain piped stderr in a tokio::spawn'd log task, opt into kill_on_drop(true), and reject SSE / unknown transports up front. Move state.connections.insert() until AFTER initialize + tools/list succeed so a failed handshake never leaves a dead entry behind.
  • Nitpick: drop the dead for kv in conn.pending.iter() loop in disconnect.

workers/orchestrator/src/main.rs

  • 🟠 execute_handler (~250): match on iii.trigger("task::spawn_workers", …) and roll plan back to planned + run to failed on failure (instead of unwrap_or(Value::Null) swallowing it).
  • 🟠 intervene_handler::resume (~385): await the trigger and revert plan/run to paused on failure.
  • 🟠 sanitize_id → validate_id (~52): reject empty / >256 / non-[A-Za-z0-9_:.-] ids instead of silently filtering & truncating, matching task-decomposer/types.rs:53-75 semantics.
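The reject-on-invalid behavior can be sketched as follows (error type simplified; the rules mirror the documented non-empty, ≤256, `[A-Za-z0-9_:.-]` constraints):

```rust
// Validate an id instead of silently sanitizing it.
fn validate_id(id: &str) -> Result<&str, String> {
    if id.is_empty() {
        return Err("id must not be empty".into());
    }
    if id.len() > 256 {
        return Err("id exceeds 256 bytes".into());
    }
    if !id
        .chars()
        .all(|c| c.is_ascii_alphanumeric() || matches!(c, '_' | ':' | '.' | '-'))
    {
        return Err(format!("id contains invalid characters: {id}"));
    }
    Ok(id)
}

fn main() {
    assert!(validate_id("plan:wave2.run-1").is_ok());
    assert!(validate_id("").is_err());
    assert!(validate_id("bad id!").is_err());
    assert!(validate_id(&"x".repeat(257)).is_err());
    println!("ok");
}
```

Rejecting rather than rewriting means two distinct caller-supplied ids can never silently collapse into the same sanitized key.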

PR #59 — context / session / streaming

workers/context-cache/

  • 🟠 invalidate (~165): only cleared += 1 when state::delete returns Ok.
  • 🟠 stats_fn (~186): sanitize_id(agent_id_raw) to match the key used at recording time.
  • 🟠 types::sanitize_id: allow . and cap at 256 chars (TS-parity).

workers/context-manager/src/main.rs

  • 🟠 budget (~62, ~111): track effective_total after maxTokensPerHour clamping and use it for the response total / remaining / per-section allocations.
  • 🟠 trim_conversation (~139): only preserve the leading system message when tail_start > 0; truncate the system message itself if it alone exceeds budget; use full per-message accounting on tail candidates so the result is always ≤ maxTokens.
  • 🟠 build_prompt (~336, ~402): assemble the full system_message (with section headers and joined skills/memories), then truncate it again against total - conversation_tokens so the assembled prompt never exceeds budget.
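The trimming rule can be sketched as below. This is a hypothetical simplification (not the worker's code): seed with the system message, truncated if it alone exceeds the budget, then admit tail messages newest-first with full per-message accounting; token counting is faked as whitespace-separated word count.

```rust
fn tokens(msg: &str) -> usize {
    msg.split_whitespace().count()
}

fn trim_conversation(messages: &[&str], max_tokens: usize) -> Vec<String> {
    let mut budget = max_tokens;
    let mut kept: Vec<String> = Vec::new();

    // System message first; truncate it if it alone exceeds the budget.
    if let Some(first) = messages.first() {
        let take = tokens(first).min(budget);
        kept.push(first.split_whitespace().take(take).collect::<Vec<_>>().join(" "));
        budget -= take;
    }

    // Walk the tail newest-first, charging each message in full.
    let mut tail: Vec<String> = Vec::new();
    for msg in messages.iter().skip(1).rev() {
        let t = tokens(msg);
        if t > budget {
            break;
        }
        tail.push((*msg).to_string());
        budget -= t;
    }
    tail.reverse();
    kept.extend(tail);
    kept
}

fn main() {
    let msgs = ["sys prompt here", "one two three", "four five", "six"];
    let out = trim_conversation(&msgs, 7);
    // 3 (system) + 1 ("six") + 2 ("four five") = 6 <= 7; "one two three" dropped.
    assert_eq!(out, vec!["sys prompt here", "four five", "six"]);
    println!("{out:?}");
}
```

Because every candidate is charged in full before admission, the result can never exceed the budget, which is the invariant the fix restores.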

workers/context-monitor/

  • 🟠 sanitize_tool_pairs (~75): build the result in original order, inserting synthetic tool stubs immediately after the originating assistant message (was: append at the end, breaking ordering).
  • 🟠 compress (~127): run sanitize_tool_pairs before the under-budget early return so callers always get a self-consistent message list.
  • 🟠 stats (~361): unwrap payload_body(&input) and register the matching POST api/context/stats HTTP trigger.
  • 🟡 score_relevance_decay (~91): clamp age to >= 0 and age_decay to [0,1] so future timestamps cannot push the section score above 25.
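The clamping fix can be illustrated numerically (the 25-point section cap is from the finding; the exponential form and half-life constant are illustrative assumptions): a future timestamp yields a negative age, which unclamped would push the decay multiplier above 1.

```rust
fn score_relevance_decay(age_ms: i64, half_life_ms: f64) -> f64 {
    let age = age_ms.max(0) as f64;                    // clamp: no negative ages
    let age_decay = (-age / half_life_ms).exp().clamp(0.0, 1.0);
    25.0 * age_decay                                   // section score capped at 25
}

fn main() {
    assert_eq!(score_relevance_decay(0, 60_000.0), 25.0);
    assert!(score_relevance_decay(-5_000, 60_000.0) <= 25.0); // future timestamp
    assert!(score_relevance_decay(600_000, 60_000.0) < 1.0);  // old content decays
    println!("ok");
}
```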

workers/session-lifecycle/src/main.rs

  • 🟠 transition::auto_recover (~186): include classification (default wake-up) in the recovery::recover payload.
  • 🟠 add_reaction (~258): allow optional agentId so global-scope reactions can be created (matching what transition already evaluates).

workers/session-replay/src/main.rs

  • 🟠 record (~37): propagate state::update failures instead of .ok() + now_ms fallback (which breaks atomic ordering and causes key collisions under load).
  • 🟠 main: register the missing replay::record POST api/replay HTTP trigger.

workers/streaming/src/types.rs

  • 🟠 chunk_markdown_aware (~22): track the byte offset of the last opening fence and search for the closing fence strictly after it; otherwise find_substring_after(&remaining, "```", 3) happily re-matches the same opener and splits between the opener and the first line of the code block.
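The offset-tracking fix can be sketched as follows (`find_substring_after` here is a stand-in for the worker's helper): remember where the last opener starts and search for the closer strictly past it, so the opener can never be re-matched as its own closer.

```rust
// Find `needle` in `text` at or after byte offset `start`.
fn find_substring_after(text: &str, needle: &str, start: usize) -> Option<usize> {
    text.get(start..)?.find(needle).map(|i| start + i)
}

fn main() {
    let chunk = "intro\n```rust\nlet x = 1;\n```\n";
    let open = chunk.find("```").unwrap();
    // The buggy variant searched from a fixed offset and could re-find `open`;
    // searching from `open + 3` skips past the opener itself.
    let close = find_substring_after(chunk, "```", open + 3).unwrap();
    assert!(close > open);
    assert_eq!(&chunk[close..close + 3], "```");
    println!("open at {open}, close at {close}");
}
```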

Deferred

  • 🟠 stream::sse true incremental streaming requires a chunked HTTP response API the iii-sdk function-trigger model does not currently expose. Called out as a follow-up.

Test plan

  • cargo check --workspace
  • cargo test --workspace --release (all suites green)
  • cargo clippy -p agentos-skillkit-bridge -p agentos-browser -p agentos-code-agent -p agentos-mcp-client -p agentos-orchestrator -p agentos-context-cache -p agentos-context-manager -p agentos-context-monitor -p agentos-session-lifecycle -p agentos-session-replay -p agentos-streaming --all-targets -- -D warnings
  • Updated validate_id test in orchestrator covers reject-on-invalid behavior


Summary by CodeRabbit

Release Notes

  • New Features

    • Added new worker modules for browser control, code execution, context management, session lifecycle, session replay, and streaming capabilities.
    • New HTTP endpoints for context stats and replay recording.
  • Bug Fixes

    • Fixed UTF-8 boundary truncation issues in subprocess output and text processing.
    • Improved subprocess process termination and timeout handling.
    • Enhanced message ordering in conversation sequences.
  • Security & Improvements

    • Expanded SSRF protection with structured URL validation and post-navigation re-validation.
    • Added stricter input validation for workspace and file paths.
    • Improved timeout handling with cooperative cancellation support.


coderabbitai Bot commented Apr 29, 2026

Warning

Rate limit exceeded

@rohitg00 has exceeded the limit for the number of commits that can be reviewed per hour. Please wait 9 minutes and 22 seconds before requesting another review.

To keep reviews running without waiting, you can enable usage-based add-on for your organization. This allows additional reviews beyond the hourly cap. Account admins can enable it under billing.

⌛ How to resolve this issue?

After the wait time has elapsed, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.

🚦 How do rate limits work?

CodeRabbit enforces hourly rate limits for each developer per organization.

Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout.

Please see our FAQ for further information.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro Plus

Run ID: a6d99467-ad22-4e4d-b2c4-87c1c8d24617

📥 Commits

Reviewing files that changed from the base of the PR and between ef6c2ed and 0f44d89.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (14)
  • Cargo.toml
  • workers/browser/src/main.rs
  • workers/code-agent/src/main.rs
  • workers/context-cache/src/main.rs
  • workers/context-cache/src/types.rs
  • workers/context-manager/src/main.rs
  • workers/context-monitor/src/main.rs
  • workers/context-monitor/src/types.rs
  • workers/mcp-client/src/main.rs
  • workers/orchestrator/src/main.rs
  • workers/session-lifecycle/src/main.rs
  • workers/session-replay/src/main.rs
  • workers/skillkit-bridge/src/main.rs
  • workers/streaming/src/types.rs
📝 Walkthrough

Walkthrough

This PR extends the Cargo workspace to include additional worker crates and implements security hardening (SSRF prevention, path validation, ID sanitization), error propagation improvements, UTF-8 boundary-safe text truncation, atomic session management, cancellation support in sandboxed execution, and process lifecycle management improvements across browser automation, code execution, session handling, context management, and MCP integration services.

Changes

  • Workspace Configuration — Cargo.toml: Extends the workspace members array to include 8 new/relocated worker crates: browser, code-agent, context-cache, context-manager, context-monitor, session-lifecycle, session-replay, and streaming.
  • Security & Input Validation — workers/browser/src/main.rs, workers/orchestrator/src/main.rs, workers/skillkit-bridge/src/main.rs: Browser replaces string heuristics with structured url::Host matching for IPv4/IPv6/domains, expands coverage to loopback/private/link-local/unspecified/broadcast/multicast, and adds SSRF re-validation after navigations. Orchestrator replaces ID "sanitization" with a strict validate_id that rejects empty/oversized/invalid-character inputs. Skillkit validates provided paths, rejects .. traversal, canonicalizes, and ensures resolution stays within the workspace.
  • Process Management & Execution — workers/browser/src/main.rs, workers/code-agent/src/main.rs, workers/skillkit-bridge/src/main.rs: Browser switches subprocess execution from .output() with timeout to spawn() + kill_on_drop + wait_with_output(). Code-agent extends run_sandboxed_js to accept a deadline and atomic cancellation flag and registers a QuickJS interrupt handler. Skillkit uses spawn() + wait_with_output() under timeout with kill_on_drop(true).
  • UTF-8 Boundary-Safe Truncation — workers/browser/src/main.rs, workers/code-agent/src/main.rs, workers/skillkit-bridge/src/main.rs, workers/context-cache/src/types.rs: Introduces a truncate_to_char_boundary helper and applies it to stderr/stdout and extracted text to prevent panics when a cut lands inside a multi-byte character. Context-cache extends sanitize_id to permit . and truncate to 256 characters.
  • Session & State Management — workers/browser/src/main.rs, workers/session-lifecycle/src/main.rs, workers/session-replay/src/main.rs: Browser updates session index storage to tolerate both the legacy array and the new { ids: [...] } format and makes session cap enforcement atomic via state::update reservation with rollback. Session-lifecycle makes agentId optional in add_reaction, switches scope selection based on ID validity, and derives the recovery classification from the payload. Session-replay treats the replay-counter increment as required, tightens sequence number parsing, and exposes the handler via a new HTTP endpoint.
  • Error Handling & Propagation — workers/orchestrator/src/main.rs, workers/context-cache/src/main.rs, workers/context-monitor/src/main.rs: Orchestrator propagates task::spawn_workers trigger failures instead of masking them with unwrap_or, and updates intervene_handler "resume" to await and roll back on trigger failure. Context-cache only counts cleared entries when state::delete succeeds. Context-monitor now runs sanitize_tool_pairs before the early return so callers always receive sanitized messages.
  • Context Budget & Message Management — workers/context-manager/src/main.rs: Introduces effective_total when maxTokensPerHour reduces capacity, refines system-message preservation to avoid duplicates, truncates over-budget system messages, truncates the system prompt after headers if it exceeds the remaining budget, and clamps negative values consistently.
  • MCP Client & Connection Management — workers/mcp-client/src/main.rs: Pipes stderr with an async drain task for stdio connections, validates the SSE url and prevents fallthrough, removes the implicit stdio handshake for unsupported transports, inserts connection state only after successful initialize and tools/list, and simplifies disconnect cleanup.
  • Context Monitoring & Statistics — workers/context-monitor/src/main.rs, workers/context-monitor/src/types.rs: Rebuilds the message sequence in order during sanitize_tool_pairs to preserve assistant→tool ordering, adds an HTTP endpoint for context::stats, and bounds score_relevance_decay by clamping negative age and the age_decay multiplier to [0.0, 1.0].
  • Code Fence Tracking — workers/streaming/src/types.rs: Records the opening fence byte offset and searches for the closing fence starting from last_open_fence_byte + 3 instead of a fixed offset, preventing immediate re-match of the same opener in a UTF-8-aware manner.

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant Browser Worker
    participant SSRF Validator
    participant State Storage
    participant Remote URL

    Client->>Browser Worker: Perform click action with URL
    Browser Worker->>SSRF Validator: Check initial URL
    SSRF Validator->>SSRF Validator: Parse as url::Host (IPv4/IPv6/Domain)
    SSRF Validator-->>Browser Worker: Valid or rejected
    
    Browser Worker->>Remote URL: Navigate/click
    Remote URL-->>Browser Worker: Redirect response
    Browser Worker->>SSRF Validator: Re-validate after redirect
    SSRF Validator->>SSRF Validator: Check loopback/private/link-local/unspecified
    SSRF Validator-->>Browser Worker: Final validation result
    
    Browser Worker->>State Storage: Check if session URL changed
    alt URL Changed or Reused
        Browser Worker->>SSRF Validator: Re-check stored current_url
        SSRF Validator-->>Browser Worker: Approval to proceed
    end
    
    Browser Worker-->>Client: Return result
sequenceDiagram
    participant Client
    participant Orchestrator
    participant State Storage
    participant Worker Task
    participant External API

    Client->>Orchestrator: Request task spawn
    Orchestrator->>State Storage: Update plan/run state
    Orchestrator->>Worker Task: Trigger spawn_workers
    
    alt Trigger Success
        Worker Task->>External API: Execute actions
        External API-->>Worker Task: Results
        Worker Task-->>Orchestrator: Success response
    else Trigger Failure
        Worker Task-->>Orchestrator: Error
        Orchestrator->>State Storage: Reset plan/run to prior state
        Orchestrator-->>Client: Return error
    end
    
    Client->>Orchestrator: Resume from paused
    Orchestrator->>Worker Task: Await spawn_workers
    
    alt Spawn Success
        Orchestrator-->>Client: Resumed
    else Spawn Failure
        Orchestrator->>State Storage: Rollback to paused
        Orchestrator-->>Client: Error response
    end

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

Poem

A rabbit hops through SSRF gates,
Validates at every turn,
Sessions managed, truncation safe,
Processes rise, then softly burn! 🐰✨

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
| --- | --- | --- | --- |
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 40.00%, below the required 80.00% threshold. | Write docstrings for the functions missing them to satisfy the coverage threshold. |
✅ Passed checks (4 passed)
| Check name | Status | Explanation |
| --- | --- | --- |
| Description Check | ✅ Passed | Check skipped — CodeRabbit's high-level summary is enabled. |
| Title check | ✅ Passed | The title accurately describes the main objective: addressing CodeRabbit findings from two prior PRs across multiple workers. |
| Linked Issues check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |
| Out of Scope Changes check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


Review rate limit: 0/3 reviews remaining, refill in 9 minutes and 22 seconds.

Comment @coderabbitai help to get the list of available commands and usage tips.


devin-ai-integration Bot left a comment


Devin Review found 2 potential issues.

View 7 additional findings in Devin Review.


Comment on lines +116 to 125
"total": effective_total,
"used": used,
"remaining": (total - used).max(0),
"remaining": (effective_total - used).max(0),
"allocation": alloc,
"sections": {
"systemPrompt": budget_section(total, alloc.system_prompt, system_tokens),
"skills": budget_section(total, alloc.skills, skills_tokens),
"memories": budget_section(total, alloc.memories, memories_tokens),
"conversation": budget_section(total, alloc.conversation, conversation_tokens),
"systemPrompt": budget_section(effective_total, alloc.system_prompt, system_tokens),
"skills": budget_section(effective_total, alloc.skills, skills_tokens),
"memories": budget_section(effective_total, alloc.memories, memories_tokens),
"conversation": budget_section(effective_total, alloc.conversation, conversation_tokens),
}


🔴 Double-scaling of budget allocation ratios when maxTokensPerHour is active

The budget() function applies the rate-limiting reduction twice: once by scaling the allocation ratios (alloc.system_prompt * ratio) and again by passing the already-reduced effective_total to budget_section(). The old code passed the unscaled total to budget_section(), yielding total × (default_ratio × ratio) = effective × default_ratio — correct. The new code passes effective_total yielding effective × (default_ratio × ratio) = effective² × default_ratio / total, which is wrong by a factor of effective/total.

Worked example

With total = 200_000 and maxTokensPerHour = 100_000:

  • ratio = 0.5, alloc.system_prompt = 0.2 × 0.5 = 0.1
  • Old: budget_section(200_000, 0.1, …) → allocated = 20,000 ✓
  • New: budget_section(100_000, 0.1, …) → allocated = 10,000 ✗ (should be 20,000)

Every section's allocated budget is halved again, and the reported remaining is inflated relative to what is actually available.

Prompt for agents
The bug is in the `budget()` function in workers/context-manager/src/main.rs. The allocation ratios (`alloc`) are already scaled by `ratio = effective/total` at lines 69-75, but then `budget_section` is called with `effective_total` (which is already `effective = total * ratio`) instead of the original `total`. This double-applies the reduction.

Two correct fixes:

Option A (minimal change): Keep using `total` in `budget_section` calls (reverting lines 121-124 to pass `total` instead of `effective_total`) while keeping the other changes that report `effective_total` in the top-level `total` and `remaining` fields.

Option B (cleaner): Don't scale `alloc` at all — keep the default ratios — and use `effective_total` everywhere. This means removing the ratio multiplication at lines 70-74 and keeping the `budget_section(effective_total, ...)` calls as-is.

Either way, the invariant is: for each section, `allocated = effective_budget × default_ratio`.
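The invariant can be checked numerically with the worked example's figures (total = 200_000, maxTokensPerHour = 100_000, and an assumed default system-prompt ratio of 0.2); `budget_section` here is a simplified stand-in that only computes the allocation.

```rust
fn budget_section(total: u64, ratio: f64) -> u64 {
    (total as f64 * ratio) as u64
}

fn main() {
    let total: u64 = 200_000;
    let effective: u64 = 100_000;
    let rate_ratio = effective as f64 / total as f64; // 0.5
    let default_ratio = 0.2;

    // Option A: scaled ratios against the original total.
    let option_a = budget_section(total, default_ratio * rate_ratio);
    // Option B: default ratios against the effective total.
    let option_b = budget_section(effective, default_ratio);
    // Buggy combination: scaled ratios AND the effective total.
    let buggy = budget_section(effective, default_ratio * rate_ratio);

    assert_eq!(option_a, 20_000);
    assert_eq!(option_b, 20_000);
    assert_eq!(buggy, 10_000); // halved again, as the review describes
    println!("a={option_a} b={option_b} buggy={buggy}");
}
```

Both correct options agree on allocated = effective_budget × default_ratio; the bug applies the rate reduction on both sides at once.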

Comment on lines +544 to +545
// Re-validate the stored URL before reusing it as a navigation target.
assert_no_ssrf(&session.current_url)?;


🔴 SSRF check on stored current_url rejects about:blank on fresh sessions

The PR adds assert_no_ssrf(&session.current_url)? to browser_click, browser_type, browser_screenshot, and browser_read_page. A newly created session has current_url = "about:blank" (workers/browser/src/main.rs:385). Since about:blank has scheme about (not http/https), the SSRF check at line 76 rejects it with "blocked scheme: about". This is a regression: all four handlers now error on any session that hasn't called browser_navigate yet, with a confusing security error message.

Suggested change
- // Re-validate the stored URL before reusing it as a navigation target.
- assert_no_ssrf(&session.current_url)?;
+ // Re-validate the stored URL before reusing it as a navigation target.
+ if session.current_url != "about:blank" {
+     assert_no_ssrf(&session.current_url)?;
+ }


coderabbitai Bot left a comment


Actionable comments posted: 7

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
workers/mcp-client/src/main.rs (1)

148-150: ⚠️ Potential issue | 🟠 Major

Reserve the connection before async work, and make cleanup identity-aware.

Moving the insert until after the handshake reintroduces a TOCTOU race: two concurrent mcp::connect calls can both pass contains_key, and the stdout task removes by name only, so a reconnect with the same name can be deleted by the previous process's cleanup.

♻️ Suggested direction
- if state.connections.contains_key(&name) {
-     return Err(IIIError::Handler(format!("Connection '{name}' already exists")));
- }
+ // Reserve the name here, or keep a separate "connecting" set, so only one
+ // connect path can own the name at a time.
- state_for_reader.connections.remove(&conn_for_reader.name);
+ if state_for_reader
+     .connections
+     .get(&conn_for_reader.name)
+     .map(|current| current.id == conn_for_reader.id)
+     .unwrap_or(false)
+ {
+     state_for_reader.connections.remove(&conn_for_reader.name);
+ }

Also applies to: 199-215, 275-277

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@workers/mcp-client/src/main.rs` around lines 148 - 150, Reserve the
connection entry in state.connections before doing async handshake to avoid
TOCTOU: insert a placeholder connection (marked as Pending) keyed by name at the
start of mcp::connect, then perform the async handshake; on success replace the
placeholder with the real Connection, on failure remove only the placeholder if
its identity matches. Add an identity field (e.g., a UUID or unique
connection_id) to the connection struct and use that identity in the stdout
cleanup task and any removal logic so cleanup only removes the exact connection
instance (compare connection_id), and apply the same pattern wherever
connections are inserted/removed (including the other connect/remove sites
referenced).
workers/context-manager/src/main.rs (1)

62-79: ⚠️ Potential issue | 🟠 Major

Budget is being reduced twice when maxTokensPerHour applies.

You both (1) reduce total to effective_total and (2) scale alloc ratios by effective/total. That double-shrinks section allocations.

Suggested fix
-    let mut alloc = DEFAULT_ALLOCATION;
+    let alloc = DEFAULT_ALLOCATION;
@@
-            let ratio = effective as f64 / total as f64;
-            alloc = BudgetAllocation {
-                system_prompt: alloc.system_prompt * ratio,
-                skills: alloc.skills * ratio,
-                memories: alloc.memories * ratio,
-                conversation: alloc.conversation * ratio,
-            };
             // Report the post-scale budget so callers don't see headroom that
             // is not actually usable.
             effective_total = effective;

Also applies to: 121-124

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@workers/context-manager/src/main.rs` around lines 62 - 79, The code currently
applies maxTokensPerHour twice: it both reduces the global total
(effective_total) and then scales each BudgetAllocation field by
effective/total, causing double reduction; change the logic in the block where
config -> maxTokensPerHour is read (the variables effective_total, total, alloc,
BudgetAllocation) to either (A) set effective_total = effective and leave alloc
unchanged (so callers see reduced total but allocations keep their original
proportions), or (B) if you must scale alloc, do not also reduce effective_total
again — pick one approach and apply consistently; update the identical pattern
elsewhere in the file where maxTokensPerHour is enforced (the later block with
the same effective/total scaling) so allocations are only reduced once.
🧹 Nitpick comments (1)
workers/streaming/src/types.rs (1)

22-49: Add a regression case for non-leading code fences.

This fix is subtle, and the current tests still do not cover the case where regular text appears before the opening ``` inside the scan window. A focused regression case here would lock in the last_open_fence_byte behavior and keep the old opener re-match bug from slipping back in.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@workers/streaming/src/types.rs` around lines 22 - 49, Tests are missing a
regression case for when text precedes an opening code fence inside the scan
window; add a focused unit test that constructs a window/remaining pair where
the window contains some regular chars followed by "```" (so window ->
window_chars/byte_cursor logic will set last_open_fence_byte non-zero) and
assert that find_substring_after(&remaining, "```", search_start) correctly
finds the closing fence and that in_code_fence toggling behavior (the code that
uses last_open_fence_byte and in_code_fence) does not re-match the same opener;
reference the symbols last_open_fence_byte, in_code_fence, window, remaining,
window_chars, byte_cursor, and find_substring_after when locating where to
exercise this behavior in the tests and make the assertion deterministic.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@workers/browser/src/main.rs`:
- Around line 124-136: The domain branch in assert_no_ssrf only checks a small
denylist and allows domains that resolve to internal/private IPs; update
assert_no_ssrf so when matching url::Host::Domain(host) it performs DNS
resolution of host (e.g., getaddrinfo/lookup) and for each resolved IpAddr run
the same private/loopback/link-local/metadata checks used for the Ip branch, and
return Err(IIIError::Handler(...)) if any resolved address is disallowed; ensure
the error includes the host and preserve the existing denylist checks
(localhost, metadata.google.internal, *.localhost, ip6-localhost) while adding
the resolver+IpAddr checks.
- Around line 397-443: After successfully reserving the slot via iii.trigger
with function_id "state::update" (result held in updated_index), the code must
rollback that reservation if subsequent operations fail; wrap the
tokio::fs::write(&script_path, BRIDGE_SCRIPT).await and save_session(&iii,
&session).await calls so that on any error you call iii.trigger(TriggerRequest {
function_id: "state::set", payload:
json!({"scope":"browser_sessions","key":"_index","value":{"ids": rolled }}), ...
}).await (where rolled = ids.into_iter().filter(|id| id !=
&agent_id).collect::<Vec<String>>()) before returning the error; factor the
rollback into a small helper (e.g., rollback_reservation(iii, &agent_id, ids) or
inline the trigger call) and ensure you propagate the original error after
attempting the rollback.

In `@workers/mcp-client/src/main.rs`:
- Around line 181-184: The stderr/stdout drain tasks currently use
BufReader::lines() (reader.next_line()) which stops on invalid UTF-8 and can let
the pipe fill; replace the line-based reader with a bytes-based loop that
repeatedly reads into a byte buffer (e.g. an 8KB Vec<u8>) using
AsyncReadExt::read or read_buf on the stderr/stdout handle and then log using
lossily-decoded text (String::from_utf8_lossy) for server_name. Update the
tokio::spawn blocks that reference stderr/stdout and server_name (the reader
variable and its next_line() calls) to perform a continuous byte-read until EOF
or error, and ensure errors are logged but do not terminate the drain early so
the child process cannot stall.

In `@workers/orchestrator/src/main.rs`:
- Around line 266-301: The rollback is not idempotent because earlier
side-effect writes (the lifecycle_reactions writes and the workspace _meta write
performed before calling iii.trigger("task::spawn_workers")) remain committed on
spawn failure; update orchestrator::execute to either (A) move the
lifecycle_reactions and workspace _meta state_set calls to occur only after
spawn_result (the Ok branch of iii.trigger) succeeds, or (B) add explicit
cleanup in the Err branch where you currently reset plan/run (the Err(e) block)
to delete or revert those specific keys (the lifecycle_reactions entries and the
workspace _meta key) via state_set/remove so a retry won’t double-register
reactions; reference the existing symbols iii, plan, run, plan_id, state_set,
and the trigger("task::spawn_workers") call to locate where to move or what to
delete.

In `@workers/session-lifecycle/src/main.rs`:
- Around line 187-197: The code sets a `classification` from `reaction.payload`
and passes it to `recovery::recover` via fire_void, but `recovery::recover`
actually ignores that field and calls `recovery::classify` internally, so the
reaction-specific path and the "wake-up" fallback are ineffective; fix by either
updating `recovery::recover` to accept and validate an explicit classification
parameter or (preferred here) map the incoming `reaction.payload` to the
concrete recovery action names expected by `recovery::classify` and pass that
canonical action in the JSON sent by the existing fire_void call (i.e., replace
the current `classification` logic with translation logic that converts
`reaction.payload` values into the classifier's recognized tokens before
invoking `fire_void(..., "recovery::recover", json!({ "agentId": agent_id,
"classification": canonical_action }))`).

In `@workers/session-replay/src/main.rs`:
- Around line 316-321: The trigger exposes POST api/replay to the function
replay::record, but record() still reads sessionId, agentId, and action from the
top-level input instead of normalizing HTTP payloads the same way other POST
handlers do; update replay::record to check for and prefer input.body (e.g., if
input.body is Some(obj) use that object for sessionId/agentId/action) while
falling back to top-level fields so it accepts both direct and HTTP-wrapped
requests, ensuring the same normalization logic used by other POST handlers is
applied before validation/processing.

In `@workers/skillkit-bridge/src/main.rs`:
- Around line 240-277: The scan handler currently calls scan_dir which truncates
SKILL.md and .well-known contents using String::truncate(5_000), which can panic
if byte 5000 splits a UTF-8 multibyte char; update scan_dir to use the existing
truncate_to_char_boundary() helper (instead of String::truncate) when limiting
content length so truncation happens on valid UTF-8 boundaries (apply to places
in scan_dir that truncate file contents and any other places in the
skillkit::scan path handling that perform raw byte-index truncation).
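For reference, a char-boundary-safe truncation helper of the kind the finding asks scan_dir to reuse looks like this (a minimal sketch; the repo's own truncate_to_char_boundary may differ in detail). `String::truncate(n)` panics when `n` is not a character boundary:

```rust
// Truncate to at most max_bytes without ever splitting a UTF-8 character.
fn truncate_to_char_boundary(s: &mut String, max_bytes: usize) {
    if s.len() <= max_bytes {
        return;
    }
    let mut idx = max_bytes;
    // Walk back to the nearest UTF-8 boundary (at most 3 steps).
    while !s.is_char_boundary(idx) {
        idx -= 1;
    }
    s.truncate(idx);
}
```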

---

Outside diff comments:
In `@workers/context-manager/src/main.rs`:
- Around line 62-79: maxTokensPerHour is currently applied twice: the code both reduces
the global total (effective_total) and scales every BudgetAllocation field by
effective/total, so allocations end up doubly reduced. In the block that reads config ->
maxTokensPerHour (the variables effective_total, total, alloc, BudgetAllocation), pick
one approach and apply it consistently: (A) set effective_total = effective and leave
alloc unchanged, so callers see the reduced total while allocations keep their original
proportions, or (B) scale alloc but do not also reduce effective_total again. Apply the
same fix to the identical effective/total scaling block later in the file so allocations
are only reduced once.
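A sketch of option (B): scale the allocations once and derive the effective total from the scaled sum, rather than also reducing the total separately. The field names mirror the comment, but the struct and function are illustrative, not the worker's real types:

```rust
// Apply the hourly cap exactly once: either the total shrinks or the
// allocations shrink, never both.
#[derive(Debug, PartialEq, Clone)]
struct BudgetAllocation {
    system: u64,
    history: u64,
    skills: u64,
}

fn apply_hourly_cap(
    total: u64,
    cap: u64,
    alloc: &BudgetAllocation,
) -> (u64, BudgetAllocation) {
    if cap >= total {
        return (total, alloc.clone()); // cap not binding: nothing changes
    }
    // Scale each field by cap/total exactly once (integer math truncates).
    let scale = |v: u64| v * cap / total;
    let scaled = BudgetAllocation {
        system: scale(alloc.system),
        history: scale(alloc.history),
        skills: scale(alloc.skills),
    };
    let effective = scaled.system + scaled.history + scaled.skills;
    (effective, scaled)
}
```

The double-reduction bug corresponds to returning `scale(effective)`-style values while also reporting a separately reduced total; here the reported total is just the sum of what was actually allocated.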

In `@workers/mcp-client/src/main.rs`:
- Around line 148-150: Reserve the connection entry in state.connections before
doing async handshake to avoid TOCTOU: insert a placeholder connection (marked
as Pending) keyed by name at the start of mcp::connect, then perform the async
handshake; on success replace the placeholder with the real Connection, on
failure remove only the placeholder if its identity matches. Add an identity
field (e.g., a UUID or unique connection_id) to the connection struct and use
that identity in the stdout cleanup task and any removal logic so cleanup only
removes the exact connection instance (compare connection_id), and apply the
same pattern wherever connections are inserted/removed (including the other
connect/remove sites referenced).
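The reserve-then-replace pattern with identity-matched removal can be modeled with a plain map; the enum, registry, and counter-based connection_id below are illustrative stand-ins for the worker's Arc-wrapped connection state:

```rust
use std::collections::HashMap;

// A Pending placeholder is inserted before any async handshake work, and
// removal only succeeds when the caller's connection_id matches, so a stale
// cleanup task can never evict a newer connection that reused the same name.
#[derive(Debug, PartialEq)]
enum Conn {
    Pending { id: u64 },
    Ready { id: u64 },
}

#[derive(Default)]
struct Registry {
    next_id: u64,
    connections: HashMap<String, Conn>,
}

impl Registry {
    /// Reserve the name before the handshake; None if already taken.
    fn reserve(&mut self, name: &str) -> Option<u64> {
        if self.connections.contains_key(name) {
            return None;
        }
        self.next_id += 1;
        let id = self.next_id;
        self.connections.insert(name.to_string(), Conn::Pending { id });
        Some(id)
    }

    /// Promote the placeholder after a successful handshake.
    fn complete(&mut self, name: &str, id: u64) {
        if matches!(self.connections.get(name), Some(Conn::Pending { id: cur }) if *cur == id) {
            self.connections.insert(name.to_string(), Conn::Ready { id });
        }
    }

    /// Remove only if the stored identity matches the caller's.
    fn remove_if(&mut self, name: &str, id: u64) -> bool {
        let matches_id = match self.connections.get(name) {
            Some(Conn::Pending { id: cur }) | Some(Conn::Ready { id: cur }) => *cur == id,
            None => false,
        };
        if matches_id {
            self.connections.remove(name);
        }
        matches_id
    }
}
```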

---

Nitpick comments:
In `@workers/streaming/src/types.rs`:
- Around line 22-49: Tests are missing a regression case for when text precedes
an opening code fence inside the scan window; add a focused unit test that
constructs a window/remaining pair where the window contains some regular chars
followed by "```" (so window -> window_chars/byte_cursor logic will set
last_open_fence_byte non-zero) and assert that find_substring_after(&remaining,
"```", search_start) correctly finds the closing fence and that in_code_fence
toggling behavior (the code that uses last_open_fence_byte and in_code_fence)
does not re-match the same opener; reference the symbols last_open_fence_byte,
in_code_fence, window, remaining, window_chars, byte_cursor, and
find_substring_after when locating where to exercise this behavior in the tests
and make the assertion deterministic.
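A minimal std-only reproduction of the behavior such a test should pin down: once an opening fence is found at some byte offset, the closing fence must be searched strictly after it so the opener cannot re-match itself. The helper is reimplemented here from the signature described in the comment, not taken from the streaming worker:

```rust
// Find `needle` in `haystack` at or after byte offset `start`.
fn find_substring_after(haystack: &str, needle: &str, start: usize) -> Option<usize> {
    haystack.get(start..)?.find(needle).map(|i| start + i)
}
```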
ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro Plus

Run ID: f7e249a9-7900-4263-a408-5e5c29137a3e

📥 Commits

Reviewing files that changed from the base of the PR and between ab72977 and ef6c2ed.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (14)
  • Cargo.toml
  • workers/browser/src/main.rs
  • workers/code-agent/src/main.rs
  • workers/context-cache/src/main.rs
  • workers/context-cache/src/types.rs
  • workers/context-manager/src/main.rs
  • workers/context-monitor/src/main.rs
  • workers/context-monitor/src/types.rs
  • workers/mcp-client/src/main.rs
  • workers/orchestrator/src/main.rs
  • workers/session-lifecycle/src/main.rs
  • workers/session-replay/src/main.rs
  • workers/skillkit-bridge/src/main.rs
  • workers/streaming/src/types.rs

Comment on lines +124 to +136
Some(url::Host::Domain(host)) => {
let lower = host.to_ascii_lowercase();
if matches!(
lower.as_str(),
"localhost" | "metadata.google.internal" | "metadata"
) {
return Err(IIIError::Handler(format!("blocked host: {host}")));
}
// Reject common loopback aliases that bypass IP parsing.
if lower.ends_with(".localhost") || lower == "ip6-localhost" {
return Err(IIIError::Handler(format!("blocked host: {host}")));
}
}
⚠️ Potential issue | 🔴 Critical

Domain-based SSRF bypass is still possible via DNS resolution to private/internal IPs.

The domain branch only blocks a small denylist. A hostile domain that resolves to loopback/private/link-local/metadata IPs can still pass assert_no_ssrf and be fetched.

Suggested fix
-fn assert_no_ssrf(url_str: &str) -> Result<(), IIIError> {
+async fn assert_no_ssrf(url_str: &str) -> Result<(), IIIError> {
     let parsed = url::Url::parse(url_str).map_err(|e| IIIError::Handler(format!("invalid url: {e}")))?;
+    let port = parsed.port_or_known_default().unwrap_or(80);

     match parsed.host() {
         Some(url::Host::Ipv4(ip)) => { /* existing checks */ }
         Some(url::Host::Ipv6(ip)) => { /* existing checks */ }
         Some(url::Host::Domain(host)) => {
             let lower = host.to_ascii_lowercase();
             if matches!(lower.as_str(), "localhost" | "metadata.google.internal" | "metadata")
                 || lower.ends_with(".localhost")
                 || lower == "ip6-localhost"
             {
                 return Err(IIIError::Handler(format!("blocked host: {host}")));
             }
+            // Resolve and validate every returned address.
+            let resolved = tokio::net::lookup_host((host, port))
+                .await
+                .map_err(|e| IIIError::Handler(format!("dns lookup failed: {e}")))?;
+            for addr in resolved {
+                match addr.ip() {
+                    std::net::IpAddr::V4(v4) if v4.is_loopback() || v4.is_private() || v4.is_link_local() || v4.is_unspecified() || v4.is_multicast() || v4.is_broadcast() => {
+                        return Err(IIIError::Handler(format!("blocked resolved host: {v4}")));
+                    }
+                    std::net::IpAddr::V6(v6) if v6.is_loopback() || v6.is_unspecified() || v6.is_multicast() => {
+                        return Err(IIIError::Handler(format!("blocked resolved host: {v6}")));
+                    }
+                    _ => {}
+                }
+            }
         }
         None => return Err(IIIError::Handler("missing host".into())),
     }
     Ok(())
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@workers/browser/src/main.rs` around lines 124 - 136, The domain branch in
assert_no_ssrf only checks a small denylist and allows domains that resolve to
internal/private IPs; update assert_no_ssrf so when matching
url::Host::Domain(host) it performs DNS resolution of host (e.g.,
getaddrinfo/lookup) and for each resolved IpAddr run the same
private/loopback/link-local/metadata checks used for the Ip branch, and return
Err(IIIError::Handler(...)) if any resolved address is disallowed; ensure the
error includes the host and preserve the existing denylist checks (localhost,
metadata.google.internal, *.localhost, ip6-localhost) while adding the
resolver+IpAddr checks.
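The per-address classification that the resolver loop needs can be written with the standard library alone, including the fc00::/7 and fe80::/10 segment masks and the IPv4-mapped re-check mentioned in the PR description. This is a sketch of the check itself; the real assert_no_ssrf wraps it in DNS resolution and the hostname denylist:

```rust
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};

// True when the address must not be fetched (loopback, private, link-local,
// unique-local, unspecified, or an IPv4-mapped form of any of those).
fn is_blocked(ip: IpAddr) -> bool {
    match ip {
        IpAddr::V4(v4) => is_blocked_v4(v4),
        IpAddr::V6(v6) => {
            // Re-check ::ffff:a.b.c.d as the embedded IPv4 address.
            let oct = v6.octets();
            if oct[..10] == [0; 10] && oct[10] == 0xff && oct[11] == 0xff {
                return is_blocked_v4(Ipv4Addr::new(oct[12], oct[13], oct[14], oct[15]));
            }
            let seg = v6.segments();
            v6.is_loopback()
                || v6.is_unspecified()
                || (seg[0] & 0xfe00) == 0xfc00 // fc00::/7 unique-local
                || (seg[0] & 0xffc0) == 0xfe80 // fe80::/10 link-local
        }
    }
}

fn is_blocked_v4(v4: Ipv4Addr) -> bool {
    v4.is_loopback() || v4.is_private() || v4.is_link_local() || v4.is_unspecified()
}
```

Note that IP-level checks still leave DNS-rebinding open (the resolved address may differ from the one the HTTP client later connects to); pinning the resolved address in the request is a separate follow-up.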

Comment on lines +397 to 443
let updated_index = iii
.trigger(TriggerRequest {
function_id: "state::update".into(),
payload: json!({
"scope": "browser_sessions",
"key": "_index",
"operations": [
{ "type": "merge", "path": "ids", "value": [agent_id.clone()] }
],
"upsert": { "ids": [agent_id.clone()] }
}),
action: None,
timeout_ms: None,
})
.await
.map_err(|e| IIIError::Handler(format!("reserve session slot: {e}")))?;
let ids: Vec<String> = updated_index
.get("ids")
.and_then(|v| serde_json::from_value(v.clone()).ok())
.unwrap_or_default();
if ids.len() > MAX_SESSIONS {
// Roll the reservation back.
let rolled: Vec<String> =
ids.into_iter().filter(|id| id != &agent_id).collect();
let _ = iii
.trigger(TriggerRequest {
function_id: "state::set".into(),
payload: json!({
"scope": "browser_sessions",
"key": "_index",
"value": { "ids": rolled }
}),
action: None,
timeout_ms: None,
})
.await;
return Err(IIIError::Handler(format!(
"Max sessions ({MAX_SESSIONS}) reached"
)));
}

tokio::fs::write(&script_path, BRIDGE_SCRIPT)
.await
.map_err(|e| IIIError::Handler(format!("write script failed: {e}")))?;

save_session(&iii, &session).await?;
index.push(agent_id.clone());
set_session_index(&iii, index).await?;

⚠️ Potential issue | 🟠 Major

Reserved session slot is not rolled back on post-reservation failures.

After reserving _index, failures in script write or save_session return early without rollback. This can leak slots and hit MAX_SESSIONS prematurely.

Suggested fix
-                tokio::fs::write(&script_path, BRIDGE_SCRIPT)
-                    .await
-                    .map_err(|e| IIIError::Handler(format!("write script failed: {e}")))?;
-
-                save_session(&iii, &session).await?;
+                if let Err(e) = tokio::fs::write(&script_path, BRIDGE_SCRIPT).await {
+                    let rolled: Vec<String> = ids.iter().filter(|id| *id != &agent_id).cloned().collect();
+                    let _ = iii.trigger(TriggerRequest {
+                        function_id: "state::set".into(),
+                        payload: json!({ "scope": "browser_sessions", "key": "_index", "value": { "ids": rolled } }),
+                        action: None,
+                        timeout_ms: None,
+                    }).await;
+                    return Err(IIIError::Handler(format!("write script failed: {e}")));
+                }
+
+                if let Err(e) = save_session(&iii, &session).await {
+                    let rolled: Vec<String> = ids.iter().filter(|id| *id != &agent_id).cloned().collect();
+                    let _ = iii.trigger(TriggerRequest {
+                        function_id: "state::set".into(),
+                        payload: json!({ "scope": "browser_sessions", "key": "_index", "value": { "ids": rolled } }),
+                        action: None,
+                        timeout_ms: None,
+                    }).await;
+                    return Err(e);
+                }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@workers/browser/src/main.rs` around lines 397 - 443, After successfully
reserving the slot via iii.trigger with function_id "state::update" (result held
in updated_index), the code must rollback that reservation if subsequent
operations fail; wrap the tokio::fs::write(&script_path, BRIDGE_SCRIPT).await
and save_session(&iii, &session).await calls so that on any error you call
iii.trigger(TriggerRequest { function_id: "state::set", payload:
json!({"scope":"browser_sessions","key":"_index","value":{"ids": rolled }}), ...
}).await (where rolled = ids.into_iter().filter(|id| id !=
&agent_id).collect::<Vec<String>>()) before returning the error; factor the
rollback into a small helper (e.g., rollback_reservation(iii, &agent_id, ids) or
inline the trigger call) and ensure you propagate the original error after
attempting the rollback.

Comment on lines +181 to +184
// Pipe + drain stderr explicitly so a chatty MCP server can't fill the
// pipe buffer and stall the session by blocking the writer.
child_cmd.stderr(std::process::Stdio::piped());
child_cmd.kill_on_drop(true);
⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Look for any protocol/docs in this repo that guarantee UTF-8, line-delimited stderr
rg -n --hidden --glob '!target' --glob '!**/node_modules/**' \
  'stderr|UTF-8|utf8|line-delimited|BufReader::lines|mcp' \
  README.md docs workers . 2>/dev/null || true

Repository: iii-experimental/agentos

Length of output: 20300


🏁 Script executed:

sed -n '215,230p' workers/mcp-client/src/main.rs

Repository: iii-experimental/agentos

Length of output: 639


🏁 Script executed:

rg -n 'MCP|Model Context Protocol|specification|spec|protocol' \
  --type md --type toml --type rs \
  README.md ARCHITECTURE.md workers/mcp-client/ 2>/dev/null | head -40

Repository: iii-experimental/agentos

Length of output: 50


🏁 Script executed:

rg -A 5 'while let Ok\(Some\(line\)\)' workers/mcp-client/src/main.rs

Repository: iii-experimental/agentos

Length of output: 611


Use bytes-based stderr draining instead of UTF-8 line parsing.

BufReader::lines() will stop the drain task on invalid UTF-8 or read errors, allowing stderr to fill back up and stall the child process. The Model Context Protocol specification does not guarantee UTF-8 or line-delimited stderr output. Drain bytes directly and decode only for logging.

Affected code
        if let Some(stderr) = stderr {
            let server_name = conn.name.clone();
            tokio::spawn(async move {
                let mut reader = BufReader::new(stderr).lines();
                while let Ok(Some(line)) = reader.next_line().await {
                    tracing::debug!(server = %server_name, "mcp stderr: {line}");
                }
            });
        }

Also applies to: 196-206 (stdout reader).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@workers/mcp-client/src/main.rs` around lines 181 - 184, The stderr/stdout
drain tasks currently use BufReader::lines() (reader.next_line()) which stops on
invalid UTF-8 and can let the pipe fill; replace the line-based reader with a
bytes-based loop that repeatedly reads into a byte buffer (e.g. an 8KB Vec<u8>)
using AsyncReadExt::read or read_buf on the stderr/stdout handle and then log
using lossily-decoded text (String::from_utf8_lossy) for server_name. Update the
tokio::spawn blocks that reference stderr/stdout and server_name (the reader
variable and its next_line() calls) to perform a continuous byte-read until EOF
or error, and ensure errors are logged but do not terminate the drain early so
the child process cannot stall.
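The contrast can be shown with the synchronous std types (the worker itself uses the tokio equivalents): `lines()` aborts at the first invalid-UTF-8 read, while a raw byte loop drains to EOF and lossily decodes only for logging. A `Cursor` stands in for the child's stderr pipe:

```rust
use std::io::{BufRead, BufReader, Cursor, Read};

// Drain a pipe to EOF regardless of encoding; decode lossily only for the log.
fn drain_lossy<R: Read>(mut pipe: R) -> String {
    let mut logged = String::new();
    let mut buf = [0u8; 8192];
    loop {
        match pipe.read(&mut buf) {
            Ok(0) => break, // EOF: the child closed its end
            Ok(n) => logged.push_str(&String::from_utf8_lossy(&buf[..n])),
            Err(e) => {
                // Log and stop only on a hard read error.
                logged.push_str(&format!("<read error: {e}>"));
                break;
            }
        }
    }
    logged
}
```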

Comment thread workers/orchestrator/src/main.rs
Comment thread workers/session-lifecycle/src/main.rs
Comment on lines +316 to +321
iii.register_trigger(RegisterTriggerInput {
trigger_type: "http".into(),
function_id: "replay::record".into(),
config: json!({ "http_method": "POST", "api_path": "api/replay" }),
metadata: None,
})?;
⚠️ Potential issue | 🟠 Major

Normalize HTTP bodies before exposing replay::record.

replay::record still reads sessionId, agentId, and action from the top level, while the POST handlers elsewhere in this repo first unwrap input.body. As written, POST api/replay will reject normal HTTP requests unless record() normalizes the payload first.

💡 Suggested fix
 async fn record(iii: &III, input: Value) -> Result<Value, IIIError> {
-    let session_id = input["sessionId"]
+    let body = input.get("body").cloned().unwrap_or(input);
+    let session_id = body["sessionId"]
         .as_str()
         .filter(|s| !s.is_empty())
         .map(String::from);
-    let agent_id = input["agentId"]
+    let agent_id = body["agentId"]
         .as_str()
         .filter(|s| !s.is_empty())
         .map(String::from);
-    let action = input["action"]
+    let action = body["action"]
         .as_str()
         .filter(|s| !s.is_empty())
         .map(String::from);
@@
-    let data = input.get("data").cloned().unwrap_or_else(|| json!({}));
-    let duration_ms = input["durationMs"].as_i64().unwrap_or(0);
-    let iteration = input["iteration"].as_i64().unwrap_or(0);
+    let data = body.get("data").cloned().unwrap_or_else(|| json!({}));
+    let duration_ms = body["durationMs"].as_i64().unwrap_or(0);
+    let iteration = body["iteration"].as_i64().unwrap_or(0);
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@workers/session-replay/src/main.rs` around lines 316 - 321, The trigger
exposes POST api/replay to the function replay::record, but record() still reads
sessionId, agentId, and action from the top-level input instead of normalizing
HTTP payloads the same way other POST handlers do; update replay::record to
check for and prefer input.body (e.g., if input.body is Some(obj) use that
object for sessionId/agentId/action) while falling back to top-level fields so
it accepts both direct and HTTP-wrapped requests, ensuring the same
normalization logic used by other POST handlers is applied before
validation/processing.

Comment on lines 240 to +277
let scan_path = input["path"].as_str().unwrap_or(".");
let root = workspace_root().join(scan_path);
// Reject obvious traversal attempts BEFORE touching the filesystem
// so a malicious caller can never canonicalize a target outside the
// workspace via symlinks they happen to control.
if scan_path.contains("..") {
return Err(IIIError::Handler("scan path contains '..'".into()));
}
let workspace = workspace_root();
let workspace_canonical = tokio::fs::canonicalize(&workspace)
.await
.map_err(|e| IIIError::Handler(format!("workspace canonicalize: {e}")))?;
let candidate = workspace.join(scan_path);
if Path::new(scan_path).is_absolute() {
return Err(IIIError::Handler("scan path must be relative".into()));
}
// Canonicalize the joined path; if the path doesn't exist yet, the
// join is treated as the requested root and rejected so we never
// walk into something we can't verify.
let target = match tokio::fs::canonicalize(&candidate).await {
Ok(p) => p,
Err(e) => {
return Err(IIIError::Handler(format!(
"scan path canonicalize: {e}"
)));
}
};
if !target.starts_with(&workspace_canonical) {
return Err(IIIError::Handler(
"scan path escapes workspace root".into(),
));
}
let mut found: Vec<Value> = Vec::new();
scan_dir(&root, 0, &mut found).await;
scan_dir(&target, 0, &mut found).await;
let count = found.len();
Ok::<Value, IIIError>(json!({
"found": found,
"count": count,
"root": root.to_string_lossy(),
"root": target.to_string_lossy(),
⚠️ Potential issue | 🟠 Major

skillkit::scan can still panic on non-ASCII file contents.

This handler now hardens path resolution, but it still calls scan_dir(), which truncates SKILL.md and .well-known contents with String::truncate(5_000). If byte 5000 lands inside a multibyte character, the worker panics before returning this response. Please reuse truncate_to_char_boundary() there as well.

💡 Suggested fix
-                let mut snippet = content;
-                snippet.truncate(5_000);
+                let mut snippet = content;
+                truncate_to_char_boundary(&mut snippet, 5_000);
                 found.push(json!({
@@
-                        let mut snippet = content;
-                        snippet.truncate(5_000);
+                        let mut snippet = content;
+                        truncate_to_char_boundary(&mut snippet, 5_000);
                         found.push(json!({
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@workers/skillkit-bridge/src/main.rs` around lines 240 - 277, The scan handler
currently calls scan_dir which truncates SKILL.md and .well-known contents using
String::truncate(5_000), which can panic if byte 5000 splits a UTF-8 multibyte
char; update scan_dir to use the existing truncate_to_char_boundary() helper
(instead of String::truncate) when limiting content length so truncation happens
on valid UTF-8 boundaries (apply to places in scan_dir that truncate file
contents and any other places in the skillkit::scan path handling that perform
raw byte-index truncation).

…/session/streaming)

Addresses 32 CodeRabbit findings across 11 workers from PRs #56 (wrappers) and
#59 (context/session/streaming). Re-registers the affected workers in the
workspace so they actually compile under cargo check/test.

PR #56 — wrappers
- browser: tighten SSRF to IP-classification (loopback/private/link-local/
  unique-local + IPv4-mapped IPv6); revalidate post-navigation URLs in
  navigate/click/read/screenshot/type to close the redirect bypass; spawn +
  kill_on_drop the python3 bridge so timeouts don't leak; add missing HTTP
  triggers for browser_click and browser_type; reserve session slots via
  atomic state::update so two concurrent create_session calls cannot exceed
  MAX_SESSIONS or clobber the index.
- code-agent: install rquickjs interrupt handler against a shared deadline +
  AtomicBool so wall-clock timeouts actually stop runaway JS instead of just
  dropping the join handle; replace String::truncate with a UTF-8 char-
  boundary safe helper.
- skillkit-bridge: switch to spawn + kill_on_drop so timed-out npx skillkit
  children are reaped; canonicalize scan_path against WORKSPACE_ROOT and
  reject absolute / `..` paths; use char-boundary safe truncation.
- mcp-client: drain piped stderr with a logging task so chatty servers can't
  fill the pipe; reject non-stdio transports up front and only insert into
  state.connections after the initialize + tools/list handshake; remove dead
  pending iter loop.
- orchestrator: propagate task::spawn_workers errors and roll plan/run state
  back on failure (execute + resume); replace silent sanitize_id with a
  validate_id that rejects invalid characters / oversized / empty ids,
  matching task-decomposer parity.

PR #59 — context/session/streaming
- context-cache: only count cleared on successful state::delete; sanitize the
  agentId on stats lookup so it matches the recording key; expand sanitize_id
  to allow `.` and cap at 256 chars.
- context-manager: report the post-scale effective budget (not the original
  total) when maxTokensPerHour clamps allocations; never duplicate the first
  system message when keep_last_n covers the whole conversation; account for
  per-message overhead while picking tail messages and truncate the preserved
  system prompt so trim never returns above maxTokens; re-truncate the
  assembled system_message in build_prompt so headers + skills + memories
  stay within the total budget.
- context-monitor: insert synthetic tool-result stubs in original message
  order (right after the originating assistant message); run
  sanitize_tool_pairs before the under-budget early return; register the HTTP
  trigger for context::stats and unwrap payload_body inside it; clamp future
  timestamps in score_relevance_decay so the score cannot exceed 25.
- session-lifecycle: include classification in auto_recover payload (default
  "wake-up"); allow add_reaction to omit agentId so global-scope reactions
  match what transition() already evaluates.
- session-replay: propagate state::update failures instead of falling back to
  now_ms (which breaks atomic ordering and causes key collisions); register
  the missing HTTP trigger for replay::record.
- streaming: track the byte offset of the opening fence and search for the
  closing fence after it so chunk_markdown_aware can no longer split between
  the opening ``` and the next character.

Skipped:
- stream::sse incremental flushing — requires a chunked HTTP response API
  the iii-sdk function-trigger model does not currently expose; called out as
  a follow-up rather than a wave-2 fix.
@rohitg00 rohitg00 force-pushed the fix/cr-wave2-wrappers-context branch from ef6c2ed to 0f44d89 Compare April 29, 2026 16:59
@devin-ai-integration devin-ai-integration Bot left a comment

Devin Review found 1 new potential issue.

View 14 additional findings in Devin Review.


Comment on lines +275 to +277
// Only insert AFTER initialize + tools/list succeed so a partial failure
// never leaves a dead handle behind for a future call to find.
state.connections.insert(name.clone(), conn.clone());
🔴 MCP child process leaked when initialize/tools-list RPC fails after spawn

Moving state.connections.insert after the send_rpc calls (workers/mcp-client/src/main.rs:277) prevents dead entries on failure, but introduces a resource leak. If send_rpc("initialize", ...) or send_rpc("tools/list", ...) fails, the connect function returns an error without inserting conn into state.connections. However, the stdout reader task spawned at line 199 holds an Arc<McpConnection> clone that keeps the child process handle alive inside conn.child. Since the connection is never registered, mcp::disconnect cannot be used to clean it up. The child process continues running until its stdout closes naturally, which may never happen for a server waiting on stdin. The old code inserted the connection first, allowing explicit cleanup via mcp::disconnect.

Prompt for agents
In workers/mcp-client/src/main.rs, the connect function spawns a child process and stdout/stderr reader tasks before calling send_rpc for initialize and tools/list. If either RPC fails, the function returns Err without inserting into state.connections. The stdout reader task holds an Arc clone of the McpConnection containing the child handle, so the process is never killed.

To fix: when send_rpc fails after spawn, explicitly kill the child before returning the error. For example, add cleanup logic in the error path that acquires conn.child.lock() and calls child.kill(). Alternatively, insert the connection into state.connections before the RPCs (as the old code did) so that mcp::disconnect can clean it up, and instead mark failed connections with a status flag.
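The first suggested shape (kill before returning the error) can be sketched with the synchronous std types; the worker's real code uses a tokio child behind an Arc, and the boolean below is a stand-in for the initialize/tools-list RPCs:

```rust
use std::process::{Command, Stdio};

// Illustrative connect: if the post-spawn handshake fails, kill and reap the
// child before propagating the error so nothing is left running unregistered.
fn connect(handshake_ok: bool) -> Result<std::process::Child, String> {
    let mut child = Command::new("sleep")
        .arg("5")
        .stdout(Stdio::null())
        .spawn()
        .map_err(|e| format!("spawn: {e}"))?;

    if !handshake_ok {
        // Error path: clean up the process we just created.
        let _ = child.kill();
        let _ = child.wait(); // reap so it doesn't linger as a zombie
        return Err("initialize failed".to_string());
    }
    Ok(child)
}
```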


@rohitg00 rohitg00 merged commit 49a2940 into main Apr 29, 2026
7 checks passed
