From f76311f9d69e4e7d9e133f4b99b93a9e23a336e9 Mon Sep 17 00:00:00 2001 From: Yeachan-Heo Date: Fri, 3 Apr 2026 15:04:52 +0000 Subject: [PATCH] Prevent worker prompts from outrunning boot readiness Add a foundational worker_boot control plane and tool surface for reliable startup. The new registry tracks trust gates, ready-for-prompt handshakes, prompt delivery attempts, and shell misdelivery recovery so callers can coordinate worker boot above raw terminal transport. Constraint: Current main has no tmux-backed worker control API to extend directly Constraint: First slice must stay deterministic and fully testable in-process Rejected: Wire the first implementation straight to tmux panes | would couple transport details to unfinished state semantics Rejected: Ship parser helpers without control tools | would not enforce the ready-before-prompt contract end to end Confidence: high Scope-risk: moderate Reversibility: clean Directive: Treat WorkerObserve heuristics as a temporary transport adapter and replace them with typed runtime events before widening automation policy Tested: cargo test -p runtime worker_boot Tested: cargo test -p tools worker_tools Tested: cargo check -p runtime -p tools Not-tested: Real tmux/TTY trust prompts and live worker boot on an actual coding session Not-tested: Full cargo clippy -p runtime -p tools --all-targets -- -D warnings (fails on pre-existing warnings outside this slice) --- rust/crates/runtime/src/lib.rs | 5 + rust/crates/runtime/src/worker_boot.rs | 732 +++++++++++++++++++++++++ rust/crates/tools/src/lib.rs | 357 +++++++++++- 3 files changed, 1090 insertions(+), 4 deletions(-) create mode 100644 rust/crates/runtime/src/worker_boot.rs diff --git a/rust/crates/runtime/src/lib.rs b/rust/crates/runtime/src/lib.rs index 420bf0f..1c01a3f 100644 --- a/rust/crates/runtime/src/lib.rs +++ b/rust/crates/runtime/src/lib.rs @@ -23,6 +23,7 @@ mod sse; pub mod task_registry; pub mod team_cron_registry; mod usage; +pub mod worker_boot; pub use bash::{execute_bash, BashCommandInput, BashCommandOutput}; pub use bootstrap::{BootstrapPhase, BootstrapPlan}; @@ -99,6 +100,10 @@ pub use session::{ SessionFork, }; pub use sse::{IncrementalSseParser, SseEvent}; +pub use worker_boot::{ + Worker, WorkerEvent, WorkerEventKind, WorkerFailure, WorkerFailureKind, WorkerReadySnapshot, + WorkerRegistry, WorkerStatus, +}; pub use usage::{ format_usd, pricing_for_model, ModelPricing, TokenUsage, UsageCostEstimate, UsageTracker, }; diff --git a/rust/crates/runtime/src/worker_boot.rs b/rust/crates/runtime/src/worker_boot.rs new file mode 100644 index 0000000..d276779 --- /dev/null +++ b/rust/crates/runtime/src/worker_boot.rs @@ -0,0 +1,732 @@ +//! In-memory worker-boot state machine and control registry. +//! +//! This provides a foundational control plane for reliable worker startup: +//! trust-gate detection, ready-for-prompt handshakes, and prompt-misdelivery +//! detection/recovery all live above raw terminal transport. + +use std::collections::HashMap; +use std::path::{Path, PathBuf}; +use std::sync::{Arc, Mutex}; +use std::time::{SystemTime, UNIX_EPOCH}; + +use serde::{Deserialize, Serialize}; + +fn now_secs() -> u64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs() +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum WorkerStatus { + Spawning, + TrustRequired, + ReadyForPrompt, + PromptAccepted, + Running, + Blocked, + Finished, + Failed, +} + +impl std::fmt::Display for WorkerStatus { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Spawning => write!(f, "spawning"), + Self::TrustRequired => write!(f, "trust_required"), + Self::ReadyForPrompt => write!(f, "ready_for_prompt"), + Self::PromptAccepted => write!(f, "prompt_accepted"), + Self::Running => write!(f, "running"), + Self::Blocked => write!(f, "blocked"), + Self::Finished => write!(f, "finished"), + Self::Failed => write!(f, "failed"), + } + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum WorkerFailureKind { + TrustGate, + PromptDelivery, + Protocol, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct WorkerFailure { + pub kind: WorkerFailureKind, + pub message: String, + pub created_at: u64, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum WorkerEventKind { + Spawning, + TrustRequired, + TrustResolved, + ReadyForPrompt, + PromptAccepted, + PromptMisdelivery, + PromptReplayArmed, + Running, + Restarted, + Finished, + Failed, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct WorkerEvent { + pub seq: u64, + pub kind: WorkerEventKind, + pub status: WorkerStatus, + pub detail: Option, + pub timestamp: u64, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct Worker { + pub worker_id: String, + pub cwd: String, + pub status: WorkerStatus, + pub trust_auto_resolve: bool, + pub trust_gate_cleared: bool, + pub auto_recover_prompt_misdelivery: bool, + pub prompt_delivery_attempts: u32, + pub last_prompt: Option, + pub replay_prompt: Option, + pub last_error: Option, + pub created_at: u64, + pub updated_at: u64, + pub events: Vec, +} + +#[derive(Debug, Clone, Default)] +pub struct WorkerRegistry { + inner: Arc>, +} + +#[derive(Debug, Default)] +struct WorkerRegistryInner { + workers: HashMap, + counter: u64, +} + +impl WorkerRegistry { + #[must_use] + pub fn new() -> Self { + Self::default() + } + + #[must_use] + pub fn create( + &self, + cwd: &str, + trusted_roots: &[String], + auto_recover_prompt_misdelivery: bool, + ) -> Worker { + let mut inner = self.inner.lock().expect("worker registry lock poisoned"); + inner.counter += 1; + let ts = now_secs(); + let worker_id = format!("worker_{:08x}_{}", ts, inner.counter); + let trust_auto_resolve = trusted_roots + .iter() + .any(|root| path_matches_allowlist(cwd, root)); + let mut worker = Worker { + worker_id: worker_id.clone(), + cwd: cwd.to_owned(), + status: WorkerStatus::Spawning, + trust_auto_resolve, + trust_gate_cleared: false, + auto_recover_prompt_misdelivery, + prompt_delivery_attempts: 0, + last_prompt: None, + replay_prompt: None, + last_error: None, + created_at: ts, + updated_at: ts, + events: Vec::new(), + }; + push_event( + &mut worker, + WorkerEventKind::Spawning, + WorkerStatus::Spawning, + Some("worker created".to_string()), + ); + inner.workers.insert(worker_id, worker.clone()); + worker + } + + #[must_use] + pub fn get(&self, worker_id: &str) -> Option { + let inner = self.inner.lock().expect("worker registry lock poisoned"); + inner.workers.get(worker_id).cloned() + } + + pub fn observe(&self, worker_id: &str, screen_text: &str) -> Result { + let mut inner = self.inner.lock().expect("worker registry lock poisoned"); + let worker = inner + .workers + .get_mut(worker_id) + .ok_or_else(|| format!("worker not found: {worker_id}"))?; + let lowered = screen_text.to_ascii_lowercase(); + + if !worker.trust_gate_cleared && detect_trust_prompt(&lowered) { + worker.status = WorkerStatus::TrustRequired; + worker.last_error = Some(WorkerFailure { + kind: WorkerFailureKind::TrustGate, + message: "worker boot blocked on trust prompt".to_string(), + created_at: now_secs(), + }); + push_event( + worker, + WorkerEventKind::TrustRequired, + WorkerStatus::TrustRequired, + Some("trust prompt detected".to_string()), + ); + + if worker.trust_auto_resolve { + worker.trust_gate_cleared = true; + worker.last_error = None; + worker.status = WorkerStatus::Spawning; + push_event( + worker, + WorkerEventKind::TrustResolved, + WorkerStatus::Spawning, + Some("allowlisted repo auto-resolved trust prompt".to_string()), + ); + } else { + return Ok(worker.clone()); + } + } + + if prompt_misdelivery_is_relevant(worker) + && detect_prompt_misdelivery(&lowered, worker.last_prompt.as_deref()) + { + let detail = prompt_preview(worker.last_prompt.as_deref().unwrap_or_default()); + worker.last_error = Some(WorkerFailure { + kind: WorkerFailureKind::PromptDelivery, + message: format!("worker prompt landed in shell instead of coding agent: {detail}"), + created_at: now_secs(), + }); + push_event( + worker, + WorkerEventKind::PromptMisdelivery, + WorkerStatus::Blocked, + Some("shell misdelivery detected".to_string()), + ); + if worker.auto_recover_prompt_misdelivery { + worker.replay_prompt = worker.last_prompt.clone(); + worker.status = WorkerStatus::ReadyForPrompt; + push_event( + worker, + WorkerEventKind::PromptReplayArmed, + WorkerStatus::ReadyForPrompt, + Some("prompt replay armed after shell misdelivery".to_string()), + ); + } else { + worker.status = WorkerStatus::Blocked; + } + return Ok(worker.clone()); + } + + if detect_running_cue(&lowered) + && matches!( + worker.status, + WorkerStatus::PromptAccepted | WorkerStatus::ReadyForPrompt + ) + { + worker.status = WorkerStatus::Running; + worker.last_error = None; + push_event( + worker, + WorkerEventKind::Running, + WorkerStatus::Running, + Some("worker accepted prompt and started running".to_string()), + ); + } + + if detect_ready_for_prompt(screen_text, &lowered) + && !matches!( + worker.status, + WorkerStatus::ReadyForPrompt | WorkerStatus::Running + ) + { + worker.status = WorkerStatus::ReadyForPrompt; + if matches!( + worker.last_error.as_ref().map(|failure| failure.kind), + Some(WorkerFailureKind::TrustGate) + ) { + worker.last_error = None; + } + push_event( + worker, + WorkerEventKind::ReadyForPrompt, + WorkerStatus::ReadyForPrompt, + Some("worker is ready for prompt delivery".to_string()), + ); + } + + Ok(worker.clone()) + } + + pub fn resolve_trust(&self, worker_id: &str) -> Result { + let mut inner = self.inner.lock().expect("worker registry lock poisoned"); + let worker = inner + .workers + .get_mut(worker_id) + .ok_or_else(|| format!("worker not found: {worker_id}"))?; + + if worker.status != WorkerStatus::TrustRequired { + return Err(format!( + "worker {worker_id} is not waiting on trust; current status: {}", + worker.status + )); + } + + worker.trust_gate_cleared = true; + worker.last_error = None; + worker.status = WorkerStatus::Spawning; + push_event( + worker, + WorkerEventKind::TrustResolved, + WorkerStatus::Spawning, + Some("trust prompt resolved manually".to_string()), + ); + Ok(worker.clone()) + } + + pub fn send_prompt(&self, worker_id: &str, prompt: Option<&str>) -> Result { + let mut inner = self.inner.lock().expect("worker registry lock poisoned"); + let worker = inner + .workers + .get_mut(worker_id) + .ok_or_else(|| format!("worker not found: {worker_id}"))?; + + if worker.status != WorkerStatus::ReadyForPrompt { + return Err(format!( + "worker {worker_id} is not ready for prompt delivery; current status: {}", + worker.status + )); + } + + let next_prompt = prompt + .map(str::trim) + .filter(|value| !value.is_empty()) + .map(str::to_owned) + .or_else(|| worker.replay_prompt.clone()) + .ok_or_else(|| format!("worker {worker_id} has no prompt to send or replay"))?; + + worker.prompt_delivery_attempts += 1; + worker.last_prompt = Some(next_prompt.clone()); + worker.replay_prompt = None; + worker.last_error = None; + worker.status = WorkerStatus::PromptAccepted; + push_event( + worker, + WorkerEventKind::PromptAccepted, + WorkerStatus::PromptAccepted, + Some(format!( + "prompt accepted for delivery: {}", + prompt_preview(&next_prompt) + )), + ); + Ok(worker.clone()) + } + + pub fn await_ready(&self, worker_id: &str) -> Result { + let worker = self + .get(worker_id) + .ok_or_else(|| format!("worker not found: {worker_id}"))?; + + Ok(WorkerReadySnapshot { + worker_id: worker.worker_id.clone(), + status: worker.status, + ready: worker.status == WorkerStatus::ReadyForPrompt, + blocked: matches!( + worker.status, + WorkerStatus::TrustRequired | WorkerStatus::Blocked + ), + replay_prompt_ready: worker.replay_prompt.is_some(), + last_error: worker.last_error.clone(), + }) + } + + pub fn restart(&self, worker_id: &str) -> Result { + let mut inner = self.inner.lock().expect("worker registry lock poisoned"); + let worker = inner + .workers + .get_mut(worker_id) + .ok_or_else(|| format!("worker not found: {worker_id}"))?; + worker.status = WorkerStatus::Spawning; + worker.trust_gate_cleared = false; + worker.last_prompt = None; + worker.replay_prompt = None; + worker.last_error = None; + worker.prompt_delivery_attempts = 0; + push_event( + worker, + WorkerEventKind::Restarted, + WorkerStatus::Spawning, + Some("worker restarted".to_string()), + ); + Ok(worker.clone()) + } + + pub fn terminate(&self, worker_id: &str) -> Result { + let mut inner = self.inner.lock().expect("worker registry lock poisoned"); + let worker = inner + .workers + .get_mut(worker_id) + .ok_or_else(|| format!("worker not found: {worker_id}"))?; + worker.status = WorkerStatus::Finished; + push_event( + worker, + WorkerEventKind::Finished, + WorkerStatus::Finished, + Some("worker terminated by control plane".to_string()), + ); + Ok(worker.clone()) + } +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct WorkerReadySnapshot { + pub worker_id: String, + pub status: WorkerStatus, + pub ready: bool, + pub blocked: bool, + pub replay_prompt_ready: bool, + pub last_error: Option, +} + +fn prompt_misdelivery_is_relevant(worker: &Worker) -> bool { + matches!( + worker.status, + WorkerStatus::PromptAccepted | WorkerStatus::Running + ) && worker.last_prompt.is_some() +} + +fn push_event( + worker: &mut Worker, + kind: WorkerEventKind, + status: WorkerStatus, + detail: Option, +) { + let timestamp = now_secs(); + let seq = worker.events.len() as u64 + 1; + worker.updated_at = timestamp; + worker.events.push(WorkerEvent { + seq, + kind, + status, + detail, + timestamp, + }); +} + +fn path_matches_allowlist(cwd: &str, trusted_root: &str) -> bool { + let cwd = normalize_path(cwd); + let trusted_root = normalize_path(trusted_root); + cwd == trusted_root || cwd.starts_with(&trusted_root) +} + +fn normalize_path(path: &str) -> PathBuf { + std::fs::canonicalize(path).unwrap_or_else(|_| Path::new(path).to_path_buf()) +} + +fn detect_trust_prompt(lowered: &str) -> bool { + [ + "do you trust the files in this folder", + "trust the files in this folder", + "trust this folder", + "allow and continue", + "yes, proceed", + ] + .iter() + .any(|needle| lowered.contains(needle)) +} + +fn detect_ready_for_prompt(screen_text: &str, lowered: &str) -> bool { + if [ + "ready for input", + "ready for your input", + "ready for prompt", + "send a message", + ] + .iter() + .any(|needle| lowered.contains(needle)) + { + return true; + } + + let Some(last_non_empty) = screen_text + .lines() + .rev() + .find(|line| !line.trim().is_empty()) + else { + return false; + }; + let trimmed = last_non_empty.trim(); + if is_shell_prompt(trimmed) { + return false; + } + + trimmed == ">" + || trimmed == "›" + || trimmed == "❯" + || trimmed.starts_with("> ") + || trimmed.starts_with("› ") + || trimmed.starts_with("❯ ") + || trimmed.contains("│ >") + || trimmed.contains("│ ›") + || trimmed.contains("│ ❯") +} + +fn detect_running_cue(lowered: &str) -> bool { + [ + "thinking", + "working", + "running tests", + "inspecting", + "analyzing", + ] + .iter() + .any(|needle| lowered.contains(needle)) +} + +fn is_shell_prompt(trimmed: &str) -> bool { + trimmed.ends_with('$') + || trimmed.ends_with('%') + || trimmed.ends_with('#') + || trimmed.starts_with('$') + || trimmed.starts_with('%') + || trimmed.starts_with('#') +} + +fn detect_prompt_misdelivery(lowered: &str, prompt: Option<&str>) -> bool { + let Some(prompt) = prompt else { + return false; + }; + + let shell_error = [ + "command not found", + "syntax error near unexpected token", + "parse error near", + "no such file or directory", + "unknown command", + ] + .iter() + .any(|needle| lowered.contains(needle)); + + if !shell_error { + return false; + } + + let first_prompt_line = prompt + .lines() + .find(|line| !line.trim().is_empty()) + .map(|line| line.trim().to_ascii_lowercase()) + .unwrap_or_default(); + + first_prompt_line.is_empty() || lowered.contains(&first_prompt_line) +} + +fn prompt_preview(prompt: &str) -> String { + let trimmed = prompt.trim(); + if trimmed.chars().count() <= 48 { + return trimmed.to_string(); + } + let preview = trimmed.chars().take(48).collect::(); + format!("{}…", preview.trim_end()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn allowlisted_trust_prompt_auto_resolves_then_reaches_ready_state() { + let registry = WorkerRegistry::new(); + let worker = registry.create( + "/tmp/worktrees/repo-a", + &["/tmp/worktrees".to_string()], + true, + ); + + let after_trust = registry + .observe( + &worker.worker_id, + "Do you trust the files in this folder?\n1. Yes, proceed\n2. No", + ) + .expect("trust observe should succeed"); + assert_eq!(after_trust.status, WorkerStatus::Spawning); + assert!(after_trust.trust_gate_cleared); + assert!(after_trust + .events + .iter() + .any(|event| event.kind == WorkerEventKind::TrustRequired)); + assert!(after_trust + .events + .iter() + .any(|event| event.kind == WorkerEventKind::TrustResolved)); + + let ready = registry + .observe(&worker.worker_id, "Ready for your input\n>") + .expect("ready observe should succeed"); + assert_eq!(ready.status, WorkerStatus::ReadyForPrompt); + assert!(ready.last_error.is_none()); + } + + #[test] + fn trust_prompt_blocks_non_allowlisted_worker_until_resolved() { + let registry = WorkerRegistry::new(); + let worker = registry.create("/tmp/repo-b", &[], true); + + let blocked = registry + .observe( + &worker.worker_id, + "Do you trust the files in this folder?\n1. Yes, proceed\n2. No", + ) + .expect("trust observe should succeed"); + assert_eq!(blocked.status, WorkerStatus::TrustRequired); + assert_eq!( + blocked.last_error.expect("trust error should exist").kind, + WorkerFailureKind::TrustGate + ); + + let send_before_resolve = registry.send_prompt(&worker.worker_id, Some("ship it")); + assert!(send_before_resolve + .expect_err("prompt delivery should be gated") + .contains("not ready for prompt delivery")); + + let resolved = registry + .resolve_trust(&worker.worker_id) + .expect("manual trust resolution should succeed"); + assert_eq!(resolved.status, WorkerStatus::Spawning); + assert!(resolved.trust_gate_cleared); + } + + #[test] + fn ready_detection_ignores_plain_shell_prompts() { + assert!(!detect_ready_for_prompt("bellman@host %", "bellman@host %")); + assert!(!detect_ready_for_prompt("/tmp/repo $", "/tmp/repo $")); + assert!(detect_ready_for_prompt("│ >", "│ >")); + } + + #[test] + fn prompt_misdelivery_is_detected_and_replay_can_be_rearmed() { + let registry = WorkerRegistry::new(); + let worker = registry.create("/tmp/repo-c", &[], true); + registry + .observe(&worker.worker_id, "Ready for input\n>") + .expect("ready observe should succeed"); + + let accepted = registry + .send_prompt(&worker.worker_id, Some("Implement worker handshake")) + .expect("prompt send should succeed"); + assert_eq!(accepted.status, WorkerStatus::PromptAccepted); + assert_eq!(accepted.prompt_delivery_attempts, 1); + + let recovered = registry + .observe( + &worker.worker_id, + "% Implement worker handshake\nzsh: command not found: Implement", + ) + .expect("misdelivery observe should succeed"); + assert_eq!(recovered.status, WorkerStatus::ReadyForPrompt); + assert_eq!( + recovered + .last_error + .expect("misdelivery error should exist") + .kind, + WorkerFailureKind::PromptDelivery + ); + assert_eq!( + recovered.replay_prompt.as_deref(), + Some("Implement worker handshake") + ); + assert!(recovered + .events + .iter() + .any(|event| event.kind == WorkerEventKind::PromptMisdelivery)); + assert!(recovered + .events + .iter() + .any(|event| event.kind == WorkerEventKind::PromptReplayArmed)); + + let replayed = registry + .send_prompt(&worker.worker_id, None) + .expect("replay send should succeed"); + assert_eq!(replayed.status, WorkerStatus::PromptAccepted); + assert!(replayed.replay_prompt.is_none()); + assert_eq!(replayed.prompt_delivery_attempts, 2); + } + + #[test] + fn await_ready_surfaces_blocked_or_ready_worker_state() { + let registry = WorkerRegistry::new(); + let worker = registry.create("/tmp/repo-d", &[], false); + + let initial = registry + .await_ready(&worker.worker_id) + .expect("await should succeed"); + assert!(!initial.ready); + assert!(!initial.blocked); + + registry + .observe( + &worker.worker_id, + "Do you trust the files in this folder?\n1. Yes, proceed\n2. No", + ) + .expect("trust observe should succeed"); + let blocked = registry + .await_ready(&worker.worker_id) + .expect("await should succeed"); + assert!(!blocked.ready); + assert!(blocked.blocked); + + registry + .resolve_trust(&worker.worker_id) + .expect("manual trust resolution should succeed"); + registry + .observe(&worker.worker_id, "Ready for your input\n>") + .expect("ready observe should succeed"); + let ready = registry + .await_ready(&worker.worker_id) + .expect("await should succeed"); + assert!(ready.ready); + assert!(!ready.blocked); + assert!(ready.last_error.is_none()); + } + + #[test] + fn restart_and_terminate_reset_or_finish_worker() { + let registry = WorkerRegistry::new(); + let worker = registry.create("/tmp/repo-e", &[], true); + registry + .observe(&worker.worker_id, "Ready for input\n>") + .expect("ready observe should succeed"); + registry + .send_prompt(&worker.worker_id, Some("Run tests")) + .expect("prompt send should succeed"); + + let restarted = registry + .restart(&worker.worker_id) + .expect("restart should succeed"); + assert_eq!(restarted.status, WorkerStatus::Spawning); + assert_eq!(restarted.prompt_delivery_attempts, 0); + assert!(restarted.last_prompt.is_none()); + + let finished = registry + .terminate(&worker.worker_id) + .expect("terminate should succeed"); + assert_eq!(finished.status, WorkerStatus::Finished); + assert!(finished + .events + .iter() + .any(|event| event.kind == WorkerEventKind::Finished)); + } +} diff --git a/rust/crates/tools/src/lib.rs b/rust/crates/tools/src/lib.rs index e42b687..870a07c 100644 --- a/rust/crates/tools/src/lib.rs +++ b/rust/crates/tools/src/lib.rs @@ -18,6 +18,7 @@ use runtime::{ read_file, task_registry::TaskRegistry, team_cron_registry::{CronRegistry, TeamRegistry}, + worker_boot::{WorkerReadySnapshot, WorkerRegistry}, write_file, ApiClient, ApiRequest, AssistantEvent, BashCommandInput, ContentBlock, ConversationMessage, ConversationRuntime, GrepSearchInput, MessageRole, PermissionMode, PermissionPolicy, PromptCacheEvent, RuntimeError, Session, ToolError, ToolExecutor, @@ -56,6 +57,12 @@ fn global_task_registry() -> &'static TaskRegistry { REGISTRY.get_or_init(TaskRegistry::new) } +fn global_worker_registry() -> &'static WorkerRegistry { + use std::sync::OnceLock; + static REGISTRY: OnceLock = OnceLock::new(); + REGISTRY.get_or_init(WorkerRegistry::new) +} + #[derive(Debug, Clone, PartialEq, Eq)] pub struct ToolManifestEntry { pub name: String, @@ -806,6 +813,117 @@ pub fn mvp_tool_specs() -> Vec { }), required_permission: PermissionMode::ReadOnly, }, + ToolSpec { + name: "WorkerCreate", + description: "Create a coding worker boot session with trust-gate and prompt-delivery guards.", + input_schema: json!({ + "type": "object", + "properties": { + "cwd": { "type": "string" }, + "trusted_roots": { + "type": "array", + "items": { "type": "string" } + }, + "auto_recover_prompt_misdelivery": { "type": "boolean" } + }, + "required": ["cwd"], + "additionalProperties": false + }), + required_permission: PermissionMode::DangerFullAccess, + }, + ToolSpec { + name: "WorkerGet", + description: "Fetch the current worker boot state, last error, and event history.", + input_schema: json!({ + "type": "object", + "properties": { + "worker_id": { "type": "string" } + }, + "required": ["worker_id"], + "additionalProperties": false + }), + required_permission: PermissionMode::ReadOnly, + }, + ToolSpec { + name: "WorkerObserve", + description: "Feed a terminal snapshot into worker boot detection to resolve trust gates, ready handshakes, and prompt misdelivery.", + input_schema: json!({ + "type": "object", + "properties": { + "worker_id": { "type": "string" }, + "screen_text": { "type": "string" } + }, + "required": ["worker_id", "screen_text"], + "additionalProperties": false + }), + required_permission: PermissionMode::ReadOnly, + }, + ToolSpec { + name: "WorkerResolveTrust", + description: "Resolve a detected trust prompt so worker boot can continue.", + input_schema: json!({ + "type": "object", + "properties": { + "worker_id": { "type": "string" } + }, + "required": ["worker_id"], + "additionalProperties": false + }), + required_permission: PermissionMode::DangerFullAccess, + }, + ToolSpec { + name: "WorkerAwaitReady", + description: "Return the current ready-handshake verdict for a coding worker.", + input_schema: json!({ + "type": "object", + "properties": { + "worker_id": { "type": "string" } + }, + "required": ["worker_id"], + "additionalProperties": false + }), + required_permission: PermissionMode::ReadOnly, + }, + ToolSpec { + name: "WorkerSendPrompt", + description: "Send a task prompt only after the worker reaches ready_for_prompt; can replay a recovered prompt.", + input_schema: json!({ + "type": "object", + "properties": { + "worker_id": { "type": "string" }, + "prompt": { "type": "string" } + }, + "required": ["worker_id"], + "additionalProperties": false + }), + required_permission: PermissionMode::DangerFullAccess, + }, + ToolSpec { + name: "WorkerRestart", + description: "Restart worker boot state after a failed or stale startup.", + input_schema: json!({ + "type": "object", + "properties": { + "worker_id": { "type": "string" } + }, + "required": ["worker_id"], + "additionalProperties": false + }), + required_permission: PermissionMode::DangerFullAccess, + }, + ToolSpec { + name: "WorkerTerminate", + description: "Terminate a worker and mark the lane finished from the control plane.", + input_schema: json!({ + "type": "object", + "properties": { + "worker_id": { "type": "string" } + }, + "required": ["worker_id"], + "additionalProperties": false + }), + required_permission: PermissionMode::DangerFullAccess, + }, ToolSpec { name: "TeamCreate", description: "Create a team of sub-agents for parallel task execution.", @@ -1059,6 +1177,18 @@ fn execute_tool_with_enforcer( "TaskStop" => from_value::(input).and_then(run_task_stop), "TaskUpdate" => from_value::(input).and_then(run_task_update), "TaskOutput" => from_value::(input).and_then(run_task_output), + "WorkerCreate" => from_value::(input).and_then(run_worker_create), + "WorkerGet" => from_value::(input).and_then(run_worker_get), + "WorkerObserve" => from_value::(input).and_then(run_worker_observe), + "WorkerResolveTrust" => { + from_value::(input).and_then(run_worker_resolve_trust) + } + "WorkerAwaitReady" => from_value::(input).and_then(run_worker_await_ready), + "WorkerSendPrompt" => { + from_value::(input).and_then(run_worker_send_prompt) + } + "WorkerRestart" => from_value::(input).and_then(run_worker_restart), + "WorkerTerminate" => from_value::(input).and_then(run_worker_terminate), "TeamCreate" => from_value::(input).and_then(run_team_create), "TeamDelete" => from_value::(input).and_then(run_team_delete), "CronCreate" => from_value::(input).and_then(run_cron_create), @@ -1232,6 +1362,60 @@ fn run_task_output(input: TaskIdInput) -> Result { } } +#[allow(clippy::needless_pass_by_value)] +fn run_worker_create(input: WorkerCreateInput) -> Result { + let worker = global_worker_registry().create( + &input.cwd, + &input.trusted_roots, + input.auto_recover_prompt_misdelivery, + ); + to_pretty_json(worker) +} + +#[allow(clippy::needless_pass_by_value)] +fn run_worker_get(input: WorkerIdInput) -> Result { + global_worker_registry().get(&input.worker_id).map_or_else( + || Err(format!("worker not found: {}", input.worker_id)), + to_pretty_json, + ) +} + +#[allow(clippy::needless_pass_by_value)] +fn run_worker_observe(input: WorkerObserveInput) -> Result { + let worker = global_worker_registry().observe(&input.worker_id, &input.screen_text)?; + to_pretty_json(worker) +} + +#[allow(clippy::needless_pass_by_value)] +fn run_worker_resolve_trust(input: WorkerIdInput) -> Result { + let worker = global_worker_registry().resolve_trust(&input.worker_id)?; + to_pretty_json(worker) +} + +#[allow(clippy::needless_pass_by_value)] +fn run_worker_await_ready(input: WorkerIdInput) -> Result { + let snapshot: WorkerReadySnapshot = global_worker_registry().await_ready(&input.worker_id)?; + to_pretty_json(snapshot) +} + +#[allow(clippy::needless_pass_by_value)] +fn run_worker_send_prompt(input: WorkerSendPromptInput) -> Result { + let worker = global_worker_registry().send_prompt(&input.worker_id, input.prompt.as_deref())?; + to_pretty_json(worker) +} + +#[allow(clippy::needless_pass_by_value)] +fn run_worker_restart(input: WorkerIdInput) -> Result { + let worker = global_worker_registry().restart(&input.worker_id)?; + to_pretty_json(worker) +} + +#[allow(clippy::needless_pass_by_value)] +fn run_worker_terminate(input: WorkerIdInput) -> Result { + let worker = global_worker_registry().terminate(&input.worker_id)?; + to_pretty_json(worker) +} + #[allow(clippy::needless_pass_by_value)] fn run_team_create(input: TeamCreateInput) -> Result { let task_ids: Vec = input @@ -1799,6 +1983,37 @@ struct TaskUpdateInput { message: String, } +#[derive(Debug, Deserialize)] +struct WorkerCreateInput { + cwd: String, + #[serde(default)] + trusted_roots: Vec, + #[serde(default = "default_auto_recover_prompt_misdelivery")] + auto_recover_prompt_misdelivery: bool, +} + +#[derive(Debug, Deserialize)] +struct WorkerIdInput { + worker_id: String, +} + +#[derive(Debug, Deserialize)] +struct WorkerObserveInput { + worker_id: String, + screen_text: String, +} + +#[derive(Debug, Deserialize)] +struct WorkerSendPromptInput { + worker_id: String, + #[serde(default)] + prompt: Option, +} + +const fn default_auto_recover_prompt_misdelivery() -> bool { + true +} + #[derive(Debug, Deserialize)] struct TeamCreateInput { name: String, @@ -4623,6 +4838,10 @@ mod tests { assert!(names.contains(&"StructuredOutput")); assert!(names.contains(&"REPL")); assert!(names.contains(&"PowerShell")); + assert!(names.contains(&"WorkerCreate")); + assert!(names.contains(&"WorkerObserve")); + assert!(names.contains(&"WorkerAwaitReady")); + assert!(names.contains(&"WorkerSendPrompt")); } #[test] @@ -4631,6 +4850,139 @@ mod tests { assert!(error.contains("unsupported tool")); } + #[test] + fn worker_tools_gate_prompt_delivery_until_ready_and_support_auto_trust() { + let created = execute_tool( + "WorkerCreate", + &json!({ + "cwd": "/tmp/worktree/repo", + "trusted_roots": ["/tmp/worktree"] + }), + ) + .expect("WorkerCreate should succeed"); + let created_output: serde_json::Value = serde_json::from_str(&created).expect("json"); + let worker_id = created_output["worker_id"] + .as_str() + .expect("worker id") + .to_string(); + assert_eq!(created_output["status"], "spawning"); + assert_eq!(created_output["trust_auto_resolve"], true); + + let gated = execute_tool( + "WorkerSendPrompt", + &json!({ + "worker_id": worker_id, + "prompt": "ship the change" + }), + ) + .expect_err("prompt delivery before ready should fail"); + assert!(gated.contains("not ready for prompt delivery")); + + let observed = execute_tool( + "WorkerObserve", + &json!({ + "worker_id": created_output["worker_id"], + "screen_text": "Do you trust the files in this folder?\n1. Yes, proceed\n2. No" + }), + ) + .expect("WorkerObserve should auto-resolve trust"); + let observed_output: serde_json::Value = serde_json::from_str(&observed).expect("json"); + assert_eq!(observed_output["status"], "spawning"); + assert_eq!(observed_output["trust_gate_cleared"], true); + + let ready = execute_tool( + "WorkerObserve", + &json!({ + "worker_id": created_output["worker_id"], + "screen_text": "Ready for your input\n>" + }), + ) + .expect("WorkerObserve should mark worker ready"); + let ready_output: serde_json::Value = serde_json::from_str(&ready).expect("json"); + assert_eq!(ready_output["status"], "ready_for_prompt"); + + let await_ready = execute_tool( + "WorkerAwaitReady", + &json!({ + "worker_id": created_output["worker_id"] + }), + ) + .expect("WorkerAwaitReady should succeed"); + let await_ready_output: serde_json::Value = + serde_json::from_str(&await_ready).expect("json"); + assert_eq!(await_ready_output["ready"], true); + + let accepted = execute_tool( + "WorkerSendPrompt", + &json!({ + "worker_id": created_output["worker_id"], + "prompt": "ship the change" + }), + ) + .expect("WorkerSendPrompt should succeed after ready"); + let accepted_output: serde_json::Value = serde_json::from_str(&accepted).expect("json"); + assert_eq!(accepted_output["status"], "prompt_accepted"); + assert_eq!(accepted_output["prompt_delivery_attempts"], 1); + } + + #[test] + fn worker_tools_detect_misdelivery_and_arm_prompt_replay() { + let created = execute_tool( + "WorkerCreate", + &json!({ + "cwd": "/tmp/repo/worker-misdelivery" + }), + ) + .expect("WorkerCreate should succeed"); + let created_output: serde_json::Value = serde_json::from_str(&created).expect("json"); + let worker_id = created_output["worker_id"] + .as_str() + .expect("worker id") + .to_string(); + + execute_tool( + "WorkerObserve", + &json!({ + "worker_id": worker_id, + "screen_text": "Ready for input\n>" + }), + ) + .expect("worker should become ready"); + + execute_tool( + "WorkerSendPrompt", + &json!({ + "worker_id": worker_id, + "prompt": "Investigate flaky boot" + }), + ) + .expect("prompt send should succeed"); + + let recovered = execute_tool( + "WorkerObserve", + &json!({ + "worker_id": worker_id, + "screen_text": "% Investigate flaky boot\nzsh: command not found: Investigate" + }), + ) + .expect("misdelivery observe should succeed"); + let recovered_output: serde_json::Value = serde_json::from_str(&recovered).expect("json"); + assert_eq!(recovered_output["status"], "ready_for_prompt"); + assert_eq!(recovered_output["last_error"]["kind"], "prompt_delivery"); + assert_eq!(recovered_output["replay_prompt"], "Investigate flaky boot"); + + let replayed = execute_tool( + "WorkerSendPrompt", + &json!({ + "worker_id": worker_id + }), + ) + .expect("WorkerSendPrompt should replay recovered prompt"); + let replayed_output: serde_json::Value = serde_json::from_str(&replayed).expect("json"); + assert_eq!(replayed_output["status"], "prompt_accepted"); + assert_eq!(replayed_output["prompt_delivery_attempts"], 2); + } + #[test] fn global_tool_registry_denies_blocked_tool_before_dispatch() { // given @@ -6326,10 +6678,7 @@ printf 'pwsh:%s' "$1" fs::write(&file, "content\n").expect("write test file"); let registry = read_only_registry(); - let result = registry.execute( - "read_file", - &json!({ "path": file.display().to_string() }), - ); + let result = registry.execute("read_file", &json!({ "path": file.display().to_string() })); assert!(result.is_ok(), "read_file should be allowed: {result:?}"); let _ = fs::remove_dir_all(root);