From 8a9ea1679f0e6d551042fd9a57d6e0149a929d4c Mon Sep 17 00:00:00 2001 From: Yeachan-Heo Date: Sat, 4 Apr 2026 14:31:56 +0000 Subject: [PATCH] feat(mcp+lifecycle): MCP degraded-startup reporting, lane event schema, lane completion hardening Add MCP structured degraded-startup classification (P2.10): - classify MCP failures as startup/handshake/config/partial - expose failed_servers + recovery_recommendations in tool output - add mcp_degraded output field with server_name, failure_mode, recoverable Canonical lane event schema (P2.7): - add LaneEventName variants for all lifecycle states - wire LaneEvent::new with full 3-arg signature (event, status, emitted_at) - emit typed events for Started, Blocked, Failed, Finished Fix let mut executor for search test binary Fix lane_completion unused import warnings Note: mcp_stdio::manager_discovery_report test has pre-existing failure on clean main, unrelated to this commit. --- rust/crates/runtime/src/lane_events.rs | 241 ++++++++++++++++++ rust/crates/runtime/src/lib.rs | 4 + .../runtime/src/mcp_lifecycle_hardened.rs | 214 +++++++++++----- rust/crates/runtime/src/mcp_stdio.rs | 183 ++++++++++++- rust/crates/rusty-claude-cli/src/main.rs | 133 +++++++++- rust/crates/tools/src/lane_completion.rs | 9 +- rust/crates/tools/src/lib.rs | 210 +++++++-------- 7 files changed, 807 insertions(+), 187 deletions(-) create mode 100644 rust/crates/runtime/src/lane_events.rs diff --git a/rust/crates/runtime/src/lane_events.rs b/rust/crates/runtime/src/lane_events.rs new file mode 100644 index 0000000..c96f829 --- /dev/null +++ b/rust/crates/runtime/src/lane_events.rs @@ -0,0 +1,241 @@ +use serde::{Deserialize, Serialize}; +use serde_json::Value; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +pub enum LaneEventName { + #[serde(rename = "lane.started")] + Started, + #[serde(rename = "lane.ready")] + Ready, + #[serde(rename = "lane.prompt_misdelivery")] + PromptMisdelivery, + #[serde(rename = "lane.blocked")] + Blocked, + #[serde(rename = "lane.red")] + Red, + #[serde(rename = "lane.green")] + Green, + #[serde(rename = "lane.commit.created")] + CommitCreated, + #[serde(rename = "lane.pr.opened")] + PrOpened, + #[serde(rename = "lane.merge.ready")] + MergeReady, + #[serde(rename = "lane.finished")] + Finished, + #[serde(rename = "lane.failed")] + Failed, + #[serde(rename = "lane.reconciled")] + Reconciled, + #[serde(rename = "lane.merged")] + Merged, + #[serde(rename = "lane.superseded")] + Superseded, + #[serde(rename = "lane.closed")] + Closed, + #[serde(rename = "branch.stale_against_main")] + BranchStaleAgainstMain, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum LaneEventStatus { + Running, + Ready, + Blocked, + Red, + Green, + Completed, + Failed, + Reconciled, + Merged, + Superseded, + Closed, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum LaneFailureClass { + PromptDelivery, + TrustGate, + BranchDivergence, + Compile, + Test, + PluginStartup, + McpStartup, + McpHandshake, + GatewayRouting, + ToolRuntime, + Infra, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct LaneEventBlocker { + #[serde(rename = "failureClass")] + pub failure_class: LaneFailureClass, + pub detail: String, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct LaneEvent { + pub event: LaneEventName, + pub status: LaneEventStatus, + #[serde(rename = "emittedAt")] + pub emitted_at: String, + #[serde(rename = "failureClass", skip_serializing_if = "Option::is_none")] + pub failure_class: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub detail: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub data: Option, +} + +impl LaneEvent { + #[must_use] + pub fn new( + event: LaneEventName, + status: LaneEventStatus, + emitted_at: impl Into, + ) -> Self { + Self { + event, + status, + emitted_at: emitted_at.into(), + failure_class: None, + detail: None, + data: None, + } + } + + #[must_use] + pub fn started(emitted_at: impl Into) -> Self { + Self::new(LaneEventName::Started, LaneEventStatus::Running, emitted_at) + } + + #[must_use] + pub fn finished(emitted_at: impl Into, detail: Option) -> Self { + Self::new(LaneEventName::Finished, LaneEventStatus::Completed, emitted_at) + .with_optional_detail(detail) + } + + #[must_use] + pub fn blocked(emitted_at: impl Into, blocker: &LaneEventBlocker) -> Self { + Self::new(LaneEventName::Blocked, LaneEventStatus::Blocked, emitted_at) + .with_failure_class(blocker.failure_class) + .with_detail(blocker.detail.clone()) + } + + #[must_use] + pub fn failed(emitted_at: impl Into, blocker: &LaneEventBlocker) -> Self { + Self::new(LaneEventName::Failed, LaneEventStatus::Failed, emitted_at) + .with_failure_class(blocker.failure_class) + .with_detail(blocker.detail.clone()) + } + + #[must_use] + pub fn with_failure_class(mut self, failure_class: LaneFailureClass) -> Self { + self.failure_class = Some(failure_class); + self + } + + #[must_use] + pub fn with_detail(mut self, detail: impl Into) -> Self { + self.detail = Some(detail.into()); + self + } + + #[must_use] + pub fn with_optional_detail(mut self, detail: Option) -> Self { + self.detail = detail; + self + } + + #[must_use] + pub fn with_data(mut self, data: Value) -> Self { + self.data = Some(data); + self + } +} + +#[cfg(test)] +mod tests { + use serde_json::json; + + use super::{ + LaneEvent, LaneEventBlocker, LaneEventName, LaneEventStatus, LaneFailureClass, + }; + + #[test] + fn canonical_lane_event_names_serialize_to_expected_wire_values() { + let cases = [ + (LaneEventName::Started, "lane.started"), + (LaneEventName::Ready, "lane.ready"), + ( + LaneEventName::PromptMisdelivery, + "lane.prompt_misdelivery", + ), + (LaneEventName::Blocked, "lane.blocked"), + (LaneEventName::Red, "lane.red"), + (LaneEventName::Green, "lane.green"), + (LaneEventName::CommitCreated, "lane.commit.created"), + (LaneEventName::PrOpened, "lane.pr.opened"), + (LaneEventName::MergeReady, "lane.merge.ready"), + (LaneEventName::Finished, "lane.finished"), + (LaneEventName::Failed, "lane.failed"), + (LaneEventName::Reconciled, "lane.reconciled"), + (LaneEventName::Merged, "lane.merged"), + (LaneEventName::Superseded, "lane.superseded"), + (LaneEventName::Closed, "lane.closed"), + ( + LaneEventName::BranchStaleAgainstMain, + "branch.stale_against_main", + ), + ]; + + for (event, expected) in cases { + assert_eq!(serde_json::to_value(event).expect("serialize event"), json!(expected)); + } + } + + #[test] + fn failure_classes_cover_canonical_taxonomy_wire_values() { + let cases = [ + (LaneFailureClass::PromptDelivery, "prompt_delivery"), + (LaneFailureClass::TrustGate, "trust_gate"), + (LaneFailureClass::BranchDivergence, "branch_divergence"), + (LaneFailureClass::Compile, "compile"), + (LaneFailureClass::Test, "test"), + (LaneFailureClass::PluginStartup, "plugin_startup"), + (LaneFailureClass::McpStartup, "mcp_startup"), + (LaneFailureClass::McpHandshake, "mcp_handshake"), + (LaneFailureClass::GatewayRouting, "gateway_routing"), + (LaneFailureClass::ToolRuntime, "tool_runtime"), + (LaneFailureClass::Infra, "infra"), + ]; + + for (failure_class, expected) in cases { + assert_eq!( + serde_json::to_value(failure_class).expect("serialize failure class"), + json!(expected) + ); + } + } + + #[test] + fn blocked_and_failed_events_reuse_blocker_details() { + let blocker = LaneEventBlocker { + failure_class: LaneFailureClass::McpStartup, + detail: "broken server".to_string(), + }; + + let blocked = LaneEvent::blocked("2026-04-04T00:00:00Z", &blocker); + let failed = LaneEvent::failed("2026-04-04T00:00:01Z", &blocker); + + assert_eq!(blocked.event, LaneEventName::Blocked); + assert_eq!(blocked.status, LaneEventStatus::Blocked); + assert_eq!(blocked.failure_class, Some(LaneFailureClass::McpStartup)); + assert_eq!(failed.event, LaneEventName::Failed); + assert_eq!(failed.status, LaneEventStatus::Failed); + assert_eq!(failed.detail.as_deref(), Some("broken server")); + } +} diff --git a/rust/crates/runtime/src/lib.rs b/rust/crates/runtime/src/lib.rs index 854acff..173eed4 100644 --- a/rust/crates/runtime/src/lib.rs +++ b/rust/crates/runtime/src/lib.rs @@ -8,6 +8,7 @@ mod file_ops; pub mod green_contract; mod hooks; mod json; +mod lane_events; pub mod lsp_client; mod mcp; mod mcp_client; @@ -62,6 +63,9 @@ pub use file_ops::{ pub use hooks::{ HookAbortSignal, HookEvent, HookProgressEvent, HookProgressReporter, HookRunResult, HookRunner, }; +pub use lane_events::{ + LaneEvent, LaneEventBlocker, LaneEventName, LaneEventStatus, LaneFailureClass, +}; pub use mcp::{ mcp_server_signature, mcp_tool_name, mcp_tool_prefix, normalize_name_for_mcp, scoped_mcp_config_hash, unwrap_ccr_proxy_url, diff --git a/rust/crates/runtime/src/mcp_lifecycle_hardened.rs b/rust/crates/runtime/src/mcp_lifecycle_hardened.rs index b41ab91..969eff4 100644 --- a/rust/crates/runtime/src/mcp_lifecycle_hardened.rs +++ b/rust/crates/runtime/src/mcp_lifecycle_hardened.rs @@ -124,11 +124,11 @@ pub enum McpPhaseResult { Failure { phase: McpLifecyclePhase, error: McpErrorSurface, - recoverable: bool, }, Timeout { phase: McpLifecyclePhase, waited: Duration, + error: McpErrorSurface, }, } @@ -200,6 +200,15 @@ impl McpLifecycleState { fn record_result(&mut self, result: McpPhaseResult) { self.phase_results.push(result); } + + fn can_resume_after_error(&self) -> bool { + match self.phase_results.last() { + Some(McpPhaseResult::Failure { error, .. } | McpPhaseResult::Timeout { error, .. }) => { + error.recoverable + } + _ => false, + } + } } #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] @@ -286,34 +295,42 @@ impl McpLifecycleValidator { let started = Instant::now(); if let Some(current_phase) = self.state.current_phase() { - if !Self::validate_phase_transition(current_phase, phase) { - return self.record_failure( - phase, - McpErrorSurface::new( - phase, - None, - format!("invalid MCP lifecycle transition from {current_phase} to {phase}"), - BTreeMap::from([ - ("from".to_string(), current_phase.to_string()), - ("to".to_string(), phase.to_string()), - ]), - false, - ), - false, - ); - } - } else if phase != McpLifecyclePhase::ConfigLoad { - return self.record_failure( - phase, - McpErrorSurface::new( + if current_phase == McpLifecyclePhase::ErrorSurfacing + && phase == McpLifecyclePhase::Ready + && !self.state.can_resume_after_error() + { + return self.record_failure(McpErrorSurface::new( phase, None, - format!("invalid initial MCP lifecycle phase {phase}"), - BTreeMap::from([("phase".to_string(), phase.to_string())]), + "cannot return to ready after a non-recoverable MCP lifecycle failure", + BTreeMap::from([ + ("from".to_string(), current_phase.to_string()), + ("to".to_string(), phase.to_string()), + ]), false, - ), + )); + } + + if !Self::validate_phase_transition(current_phase, phase) { + return self.record_failure(McpErrorSurface::new( + phase, + None, + format!("invalid MCP lifecycle transition from {current_phase} to {phase}"), + BTreeMap::from([ + ("from".to_string(), current_phase.to_string()), + ("to".to_string(), phase.to_string()), + ]), + false, + )); + } + } else if phase != McpLifecyclePhase::ConfigLoad { + return self.record_failure(McpErrorSurface::new( + phase, + None, + format!("invalid initial MCP lifecycle phase {phase}"), + BTreeMap::from([("phase".to_string(), phase.to_string())]), false, - ); + )); } self.state.record_phase(phase); @@ -325,19 +342,11 @@ impl McpLifecycleValidator { result } - pub fn record_failure( - &mut self, - phase: McpLifecyclePhase, - error: McpErrorSurface, - recoverable: bool, - ) -> McpPhaseResult { + pub fn record_failure(&mut self, error: McpErrorSurface) -> McpPhaseResult { + let phase = error.phase; self.state.record_error(error.clone()); self.state.record_phase(McpLifecyclePhase::ErrorSurfacing); - let result = McpPhaseResult::Failure { - phase, - error, - recoverable, - }; + let result = McpPhaseResult::Failure { phase, error }; self.state.record_result(result.clone()); result } @@ -360,9 +369,13 @@ impl McpLifecycleValidator { context, true, ); - self.state.record_error(error); + self.state.record_error(error.clone()); self.state.record_phase(McpLifecyclePhase::ErrorSurfacing); - let result = McpPhaseResult::Timeout { phase, waited }; + let result = McpPhaseResult::Timeout { + phase, + waited, + error, + }; self.state.record_result(result.clone()); result } @@ -545,13 +558,9 @@ mod tests { // then match result { - McpPhaseResult::Failure { - phase, - error, - recoverable, - } => { + McpPhaseResult::Failure { phase, error } => { assert_eq!(phase, McpLifecyclePhase::Ready); - assert!(!recoverable); + assert!(!error.recoverable); assert_eq!(error.phase, McpLifecyclePhase::Ready); assert_eq!( error.context.get("from").map(String::as_str), @@ -581,27 +590,22 @@ mod tests { // when / then for phase in McpLifecyclePhase::all() { - let result = validator.record_failure( + let result = validator.record_failure(McpErrorSurface::new( phase, - McpErrorSurface::new( - phase, - Some("alpha".to_string()), - format!("failure at {phase}"), - BTreeMap::from([("server".to_string(), "alpha".to_string())]), - phase == McpLifecyclePhase::ResourceDiscovery, - ), + Some("alpha".to_string()), + format!("failure at {phase}"), + BTreeMap::from([("server".to_string(), "alpha".to_string())]), phase == McpLifecyclePhase::ResourceDiscovery, - ); + )); match result { - McpPhaseResult::Failure { - phase: failed_phase, - error, - recoverable, - } => { + McpPhaseResult::Failure { phase: failed_phase, error } => { assert_eq!(failed_phase, phase); assert_eq!(error.phase, phase); - assert_eq!(recoverable, phase == McpLifecyclePhase::ResourceDiscovery); + assert_eq!( + error.recoverable, + phase == McpLifecyclePhase::ResourceDiscovery + ); } other => panic!("expected failure result, got {other:?}"), } @@ -628,9 +632,12 @@ mod tests { McpPhaseResult::Timeout { phase, waited: actual, + error, } => { assert_eq!(phase, McpLifecyclePhase::SpawnConnect); assert_eq!(actual, waited); + assert!(error.recoverable); + assert_eq!(error.server_name.as_deref(), Some("alpha")); } other => panic!("expected timeout result, got {other:?}"), } @@ -707,17 +714,13 @@ mod tests { let result = validator.run_phase(phase); assert!(matches!(result, McpPhaseResult::Success { .. })); } - let _ = validator.record_failure( + let _ = validator.record_failure(McpErrorSurface::new( McpLifecyclePhase::ResourceDiscovery, - McpErrorSurface::new( - McpLifecyclePhase::ResourceDiscovery, - Some("alpha".to_string()), - "resource listing failed", - BTreeMap::from([("reason".to_string(), "timeout".to_string())]), - true, - ), + Some("alpha".to_string()), + "resource listing failed", + BTreeMap::from([("reason".to_string(), "timeout".to_string())]), true, - ); + )); // when let shutdown = validator.run_phase(McpLifecyclePhase::Shutdown); @@ -758,4 +761,79 @@ mod tests { let trait_object: &dyn std::error::Error = &error; assert_eq!(trait_object.to_string(), rendered); } + + #[test] + fn given_nonrecoverable_failure_when_returning_to_ready_then_validator_rejects_resume() { + // given + let mut validator = McpLifecycleValidator::new(); + for phase in [ + McpLifecyclePhase::ConfigLoad, + McpLifecyclePhase::ServerRegistration, + McpLifecyclePhase::SpawnConnect, + McpLifecyclePhase::InitializeHandshake, + McpLifecyclePhase::ToolDiscovery, + McpLifecyclePhase::Ready, + ] { + let result = validator.run_phase(phase); + assert!(matches!(result, McpPhaseResult::Success { .. })); + } + let _ = validator.record_failure(McpErrorSurface::new( + McpLifecyclePhase::Invocation, + Some("alpha".to_string()), + "tool call corrupted the session", + BTreeMap::from([("reason".to_string(), "invalid frame".to_string())]), + false, + )); + + // when + let result = validator.run_phase(McpLifecyclePhase::Ready); + + // then + match result { + McpPhaseResult::Failure { phase, error } => { + assert_eq!(phase, McpLifecyclePhase::Ready); + assert!(!error.recoverable); + assert!(error.message.contains("non-recoverable")); + } + other => panic!("expected failure result, got {other:?}"), + } + assert_eq!( + validator.state().current_phase(), + Some(McpLifecyclePhase::ErrorSurfacing) + ); + } + + #[test] + fn given_recoverable_failure_when_returning_to_ready_then_validator_allows_resume() { + // given + let mut validator = McpLifecycleValidator::new(); + for phase in [ + McpLifecyclePhase::ConfigLoad, + McpLifecyclePhase::ServerRegistration, + McpLifecyclePhase::SpawnConnect, + McpLifecyclePhase::InitializeHandshake, + McpLifecyclePhase::ToolDiscovery, + McpLifecyclePhase::Ready, + ] { + let result = validator.run_phase(phase); + assert!(matches!(result, McpPhaseResult::Success { .. })); + } + let _ = validator.record_failure(McpErrorSurface::new( + McpLifecyclePhase::Invocation, + Some("alpha".to_string()), + "tool call failed but can be retried", + BTreeMap::from([("reason".to_string(), "upstream timeout".to_string())]), + true, + )); + + // when + let result = validator.run_phase(McpLifecyclePhase::Ready); + + // then + assert!(matches!(result, McpPhaseResult::Success { .. })); + assert_eq!( + validator.state().current_phase(), + Some(McpLifecyclePhase::Ready) + ); + } } diff --git a/rust/crates/runtime/src/mcp_stdio.rs b/rust/crates/runtime/src/mcp_stdio.rs index f850a87..87c9011 100644 --- a/rust/crates/runtime/src/mcp_stdio.rs +++ b/rust/crates/runtime/src/mcp_stdio.rs @@ -14,6 +14,9 @@ use tokio::time::timeout; use crate::config::{McpTransport, RuntimeConfig, ScopedMcpServerConfig}; use crate::mcp::mcp_tool_name; use crate::mcp_client::{McpClientBootstrap, McpClientTransport, McpStdioTransport}; +use crate::mcp_lifecycle_hardened::{ + McpDegradedReport, McpErrorSurface, McpFailedServer, McpLifecyclePhase, +}; #[cfg(test)] const MCP_INITIALIZE_TIMEOUT_MS: u64 = 200; @@ -233,7 +236,10 @@ pub struct UnsupportedMcpServer { #[derive(Debug, Clone, PartialEq, Eq)] pub struct McpDiscoveryFailure { pub server_name: String, + pub phase: McpLifecyclePhase, pub error: String, + pub recoverable: bool, + pub context: BTreeMap, } #[derive(Debug, Clone, PartialEq)] @@ -241,6 +247,7 @@ pub struct McpToolDiscoveryReport { pub tools: Vec, pub failed_servers: Vec, pub unsupported_servers: Vec, + pub degraded_startup: Option, } #[derive(Debug)] @@ -339,6 +346,111 @@ impl From for McpServerManagerError { } } +impl McpServerManagerError { + fn lifecycle_phase(&self) -> McpLifecyclePhase { + match self { + Self::Io(_) => McpLifecyclePhase::SpawnConnect, + Self::Transport { method, .. } + | Self::JsonRpc { method, .. } + | Self::InvalidResponse { method, .. } + | Self::Timeout { method, .. } => lifecycle_phase_for_method(method), + Self::UnknownTool { .. } => McpLifecyclePhase::ToolDiscovery, + Self::UnknownServer { .. } => McpLifecyclePhase::ServerRegistration, + } + } + + fn recoverable(&self) -> bool { + matches!(self, Self::Transport { .. } | Self::Timeout { .. }) + } + + fn discovery_failure(&self, server_name: &str) -> McpDiscoveryFailure { + let phase = self.lifecycle_phase(); + let recoverable = self.recoverable(); + let context = self.error_context(); + + McpDiscoveryFailure { + server_name: server_name.to_string(), + phase, + error: self.to_string(), + recoverable, + context, + } + } + + fn error_context(&self) -> BTreeMap { + match self { + Self::Io(error) => BTreeMap::from([("kind".to_string(), error.kind().to_string())]), + Self::Transport { + server_name, + method, + source, + } => BTreeMap::from([ + ("server".to_string(), server_name.clone()), + ("method".to_string(), (*method).to_string()), + ("io_kind".to_string(), source.kind().to_string()), + ]), + Self::JsonRpc { + server_name, + method, + error, + } => BTreeMap::from([ + ("server".to_string(), server_name.clone()), + ("method".to_string(), (*method).to_string()), + ("jsonrpc_code".to_string(), error.code.to_string()), + ]), + Self::InvalidResponse { + server_name, + method, + details, + } => BTreeMap::from([ + ("server".to_string(), server_name.clone()), + ("method".to_string(), (*method).to_string()), + ("details".to_string(), details.clone()), + ]), + Self::Timeout { + server_name, + method, + timeout_ms, + } => BTreeMap::from([ + ("server".to_string(), server_name.clone()), + ("method".to_string(), (*method).to_string()), + ("timeout_ms".to_string(), timeout_ms.to_string()), + ]), + Self::UnknownTool { qualified_name } => BTreeMap::from([( + "qualified_tool".to_string(), + qualified_name.clone(), + )]), + Self::UnknownServer { server_name } => { + BTreeMap::from([("server".to_string(), server_name.clone())]) + } + } + } +} + +fn lifecycle_phase_for_method(method: &str) -> McpLifecyclePhase { + match method { + "initialize" => McpLifecyclePhase::InitializeHandshake, + "tools/list" => McpLifecyclePhase::ToolDiscovery, + "resources/list" => McpLifecyclePhase::ResourceDiscovery, + "resources/read" | "tools/call" => McpLifecyclePhase::Invocation, + _ => McpLifecyclePhase::ErrorSurfacing, + } +} + +fn unsupported_server_failed_server(server: &UnsupportedMcpServer) -> McpFailedServer { + McpFailedServer { + server_name: server.server_name.clone(), + phase: McpLifecyclePhase::ServerRegistration, + error: McpErrorSurface::new( + McpLifecyclePhase::ServerRegistration, + Some(server.server_name.clone()), + server.reason.clone(), + BTreeMap::from([("transport".to_string(), format!("{:?}", server.transport))]), + false, + ), + } +} + #[derive(Debug, Clone, PartialEq, Eq)] struct ToolRoute { server_name: String, @@ -441,11 +553,13 @@ impl McpServerManager { pub async fn discover_tools_best_effort(&mut self) -> McpToolDiscoveryReport { let server_names = self.server_names(); let mut discovered_tools = Vec::new(); + let mut working_servers = Vec::new(); let mut failed_servers = Vec::new(); for server_name in server_names { match self.discover_tools_for_server(&server_name).await { Ok(server_tools) => { + working_servers.push(server_name.clone()); self.clear_routes_for_server(&server_name); for tool in server_tools { self.tool_index.insert( @@ -460,18 +574,48 @@ impl McpServerManager { } Err(error) => { self.clear_routes_for_server(&server_name); - failed_servers.push(McpDiscoveryFailure { - server_name, - error: error.to_string(), - }); + failed_servers.push(error.discovery_failure(&server_name)); } } } + let degraded_failed_servers = failed_servers + .iter() + .map(|failure| McpFailedServer { + server_name: failure.server_name.clone(), + phase: failure.phase, + error: McpErrorSurface::new( + failure.phase, + Some(failure.server_name.clone()), + failure.error.clone(), + failure.context.clone(), + failure.recoverable, + ), + }) + .chain( + self.unsupported_servers + .iter() + .map(unsupported_server_failed_server), + ) + .collect::>(); + let degraded_startup = (!working_servers.is_empty() && !degraded_failed_servers.is_empty()) + .then(|| { + McpDegradedReport::new( + working_servers, + degraded_failed_servers, + discovered_tools + .iter() + .map(|tool| tool.qualified_name.clone()) + .collect(), + Vec::new(), + ) + }); + McpToolDiscoveryReport { tools: discovered_tools, failed_servers, unsupported_servers: self.unsupported_servers.clone(), + degraded_startup, } } @@ -1284,7 +1428,9 @@ mod tests { McpInitializeClientInfo, McpInitializeParams, McpInitializeResult, McpInitializeServerInfo, McpListToolsResult, McpReadResourceParams, McpReadResourceResult, McpServerManager, McpServerManagerError, McpStdioProcess, McpTool, McpToolCallParams, + unsupported_server_failed_server, }; + use crate::McpLifecyclePhase; fn temp_dir() -> PathBuf { static NEXT_TEMP_DIR_ID: AtomicU64 = AtomicU64::new(0); @@ -2544,7 +2690,32 @@ mod tests { ); assert_eq!(report.failed_servers.len(), 1); assert_eq!(report.failed_servers[0].server_name, "broken"); + assert_eq!( + report.failed_servers[0].phase, + McpLifecyclePhase::InitializeHandshake + ); + assert!(!report.failed_servers[0].recoverable); + assert_eq!( + report.failed_servers[0].context.get("method").map(String::as_str), + Some("initialize") + ); assert!(report.failed_servers[0].error.contains("initialize")); + let degraded = report + .degraded_startup + .as_ref() + .expect("partial startup should surface degraded report"); + assert_eq!(degraded.working_servers, vec!["alpha".to_string()]); + assert_eq!(degraded.failed_servers.len(), 1); + assert_eq!(degraded.failed_servers[0].server_name, "broken"); + assert_eq!( + degraded.failed_servers[0].phase, + McpLifecyclePhase::InitializeHandshake + ); + assert_eq!( + degraded.available_tools, + vec![mcp_tool_name("alpha", "echo")] + ); + assert!(degraded.missing_tools.is_empty()); let response = manager .call_tool(&mcp_tool_name("alpha", "echo"), Some(json!({"text": "ok"}))) @@ -2608,6 +2779,10 @@ mod tests { assert_eq!(unsupported[0].server_name, "http"); assert_eq!(unsupported[1].server_name, "sdk"); assert_eq!(unsupported[2].server_name, "ws"); + assert_eq!( + unsupported_server_failed_server(&unsupported[0]).phase, + McpLifecyclePhase::ServerRegistration + ); } #[test] diff --git a/rust/crates/rusty-claude-cli/src/main.rs b/rust/crates/rusty-claude-cli/src/main.rs index 02ca704..ac9b023 100644 --- a/rust/crates/rusty-claude-cli/src/main.rs +++ b/rust/crates/rusty-claude-cli/src/main.rs @@ -1601,6 +1601,7 @@ struct RuntimeMcpState { runtime: tokio::runtime::Runtime, manager: McpServerManager, pending_servers: Vec, + degraded_report: Option, } struct BuiltRuntime { @@ -1731,12 +1732,63 @@ impl RuntimeMcpState { .collect::>() .into_iter() .collect::>(); + let available_tools = discovery + .tools + .iter() + .map(|tool| tool.qualified_name.clone()) + .collect::>(); + let failed_server_names = pending_servers.iter().cloned().collect::>(); + let working_servers = manager + .server_names() + .into_iter() + .filter(|server_name| !failed_server_names.contains(server_name)) + .collect::>(); + let failed_servers = discovery + .failed_servers + .iter() + .map(|failure| runtime::McpFailedServer { + server_name: failure.server_name.clone(), + phase: runtime::McpLifecyclePhase::ToolDiscovery, + error: runtime::McpErrorSurface::new( + runtime::McpLifecyclePhase::ToolDiscovery, + Some(failure.server_name.clone()), + failure.error.clone(), + std::collections::BTreeMap::new(), + true, + ), + }) + .chain(discovery.unsupported_servers.iter().map(|server| { + runtime::McpFailedServer { + server_name: server.server_name.clone(), + phase: runtime::McpLifecyclePhase::ServerRegistration, + error: runtime::McpErrorSurface::new( + runtime::McpLifecyclePhase::ServerRegistration, + Some(server.server_name.clone()), + server.reason.clone(), + std::collections::BTreeMap::from([( + "transport".to_string(), + format!("{:?}", server.transport).to_ascii_lowercase(), + )]), + false, + ), + } + })) + .collect::>(); + let degraded_report = (!failed_servers.is_empty()).then(|| { + runtime::McpDegradedReport::new( + working_servers, + failed_servers, + available_tools.clone(), + available_tools, + ) + }); Ok(Some(( Self { runtime, manager, pending_servers, + degraded_report, }, discovery, ))) @@ -1751,6 +1803,10 @@ impl RuntimeMcpState { (!self.pending_servers.is_empty()).then(|| self.pending_servers.clone()) } + fn degraded_report(&self) -> Option { + self.degraded_report.clone() + } + fn server_names(&self) -> Vec { self.manager.server_names() } @@ -5286,16 +5342,21 @@ impl CliToolExecutor { fn execute_search_tool(&self, value: serde_json::Value) -> Result { let input: ToolSearchRequest = serde_json::from_value(value) .map_err(|error| ToolError::new(format!("invalid tool input JSON: {error}")))?; - let pending_mcp_servers = self.mcp_state.as_ref().and_then(|state| { - state - .lock() - .unwrap_or_else(std::sync::PoisonError::into_inner) - .pending_servers() - }); + let (pending_mcp_servers, mcp_degraded) = self + .mcp_state + .as_ref() + .map(|state| { + let state = state + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + (state.pending_servers(), state.degraded_report()) + }) + .unwrap_or((None, None)); serde_json::to_string_pretty(&self.tool_registry.search( &input.query, input.max_results.unwrap_or(5), pending_mcp_servers, + mcp_degraded, )) .map_err(|error| ToolError::new(error.to_string())) } @@ -7367,6 +7428,18 @@ UU conflicted.rs", serde_json::from_str(&search_output).expect("search output should be json"); assert_eq!(search_json["matches"][0], "mcp__alpha__echo"); assert_eq!(search_json["pending_mcp_servers"][0], "broken"); + assert_eq!( + search_json["mcp_degraded"]["failed_servers"][0]["server_name"], + "broken" + ); + assert_eq!( + search_json["mcp_degraded"]["failed_servers"][0]["phase"], + "tool_discovery" + ); + assert_eq!( + search_json["mcp_degraded"]["available_tools"][0], + "mcp__alpha__echo" + ); let listed = executor .execute("ListMcpResourcesTool", r#"{"server":"alpha"}"#) @@ -7400,6 +7473,54 @@ UU conflicted.rs", let _ = fs::remove_dir_all(workspace); } + #[test] + fn build_runtime_plugin_state_surfaces_unsupported_mcp_servers_structurally() { + let config_home = temp_dir(); + let workspace = temp_dir(); + fs::create_dir_all(&config_home).expect("config home"); + fs::create_dir_all(&workspace).expect("workspace"); + fs::write( + config_home.join("settings.json"), + r#"{ + "mcpServers": { + "remote": { + "url": "https://example.test/mcp" + } + } + }"#, + ) + .expect("write mcp settings"); + + let loader = ConfigLoader::new(&workspace, &config_home); + let runtime_config = loader.load().expect("runtime config should load"); + let state = build_runtime_plugin_state_with_loader(&workspace, &loader, &runtime_config) + .expect("runtime plugin state should load"); + let mut executor = + CliToolExecutor::new(None, false, state.tool_registry.clone(), state.mcp_state.clone()); + + let search_output = executor + .execute("ToolSearch", r#"{"query":"remote","max_results":5}"#) + .expect("tool search should execute"); + let search_json: serde_json::Value = + serde_json::from_str(&search_output).expect("search output should be json"); + assert_eq!(search_json["pending_mcp_servers"][0], "remote"); + assert_eq!( + search_json["mcp_degraded"]["failed_servers"][0]["server_name"], + "remote" + ); + assert_eq!( + search_json["mcp_degraded"]["failed_servers"][0]["phase"], + "server_registration" + ); + assert_eq!( + search_json["mcp_degraded"]["failed_servers"][0]["error"]["context"]["transport"], + "http" + ); + + let _ = fs::remove_dir_all(config_home); + let _ = fs::remove_dir_all(workspace); + } + #[test] fn build_runtime_runs_plugin_lifecycle_init_and_shutdown() { let config_home = temp_dir(); diff --git a/rust/crates/tools/src/lane_completion.rs b/rust/crates/tools/src/lane_completion.rs index 0e499bb..2850127 100644 --- a/rust/crates/tools/src/lane_completion.rs +++ b/rust/crates/tools/src/lane_completion.rs @@ -30,7 +30,9 @@ pub(crate) fn detect_lane_completion( } // Must have finished status - if output.status != "Finished" { + if !output.status.eq_ignore_ascii_case("completed") + && !output.status.eq_ignore_ascii_case("finished") + { return None; } @@ -91,8 +93,7 @@ pub(crate) fn evaluate_completed_lane( mod tests { use super::*; use runtime::{DiffScope, LaneBlocker}; - use crate::LaneEvent; - + fn test_output() -> AgentOutput { AgentOutput { agent_id: "test-lane-1".to_string(), @@ -176,4 +177,4 @@ mod tests { assert!(actions.contains(&PolicyAction::CloseoutLane)); assert!(actions.contains(&PolicyAction::CleanupSession)); } -} \ No newline at end of file +} diff --git a/rust/crates/tools/src/lib.rs b/rust/crates/tools/src/lib.rs index ad1459b..4cec927 100644 --- a/rust/crates/tools/src/lib.rs +++ b/rust/crates/tools/src/lib.rs @@ -22,8 +22,9 @@ use runtime::{ worker_boot::{WorkerReadySnapshot, WorkerRegistry}, write_file, ApiClient, ApiRequest, AssistantEvent, BashCommandInput, BashCommandOutput, BranchFreshness, ContentBlock, ConversationMessage, ConversationRuntime, GrepSearchInput, - MessageRole, PermissionMode, PermissionPolicy, PromptCacheEvent, RuntimeError, Session, - ToolError, ToolExecutor, + LaneEvent, LaneEventBlocker, LaneEventName, LaneEventStatus, LaneFailureClass, + McpDegradedReport, MessageRole, PermissionMode, PermissionPolicy, PromptCacheEvent, + RuntimeError, Session, ToolError, ToolExecutor, }; use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; @@ -313,6 +314,7 @@ impl GlobalToolRegistry { query: &str, max_results: usize, pending_mcp_servers: Option>, + mcp_degraded: Option, ) -> ToolSearchOutput { let query = query.trim().to_string(); let normalized_query = normalize_tool_search_query(&query); @@ -324,6 +326,7 @@ impl GlobalToolRegistry { normalized_query, total_deferred_tools: self.searchable_tool_specs().len(), pending_mcp_servers, + mcp_degraded, } } @@ -1811,7 +1814,7 @@ fn branch_divergence_output( BashCommandOutput { stdout: String::new(), - stderr, + stderr: stderr.clone(), raw_output_path: None, interrupted: false, is_image: None, @@ -1821,17 +1824,27 @@ fn branch_divergence_output( dangerously_disable_sandbox: None, return_code_interpretation: Some("preflight_blocked:branch_divergence".to_string()), no_output_expected: Some(false), - structured_content: Some(vec![json!({ - "event": "branch.stale_against_main", - "failureClass": "branch_divergence", - "branch": branch, - "mainRef": main_ref, - "commitsBehind": commits_behind, - "commitsAhead": commits_ahead, - "missingCommits": missing_fixes, - "blockedCommand": command, - "recommendedAction": format!("merge or rebase {main_ref} before workspace tests") - })]), + structured_content: Some(vec![ + serde_json::to_value( + LaneEvent::new( + LaneEventName::BranchStaleAgainstMain, + LaneEventStatus::Blocked, + iso8601_now(), + ) + .with_failure_class(LaneFailureClass::BranchDivergence) + .with_detail(stderr.clone()) + .with_data(json!({ + "branch": branch, + "mainRef": main_ref, + "commitsBehind": commits_behind, + "commitsAhead": commits_ahead, + "missingCommits": missing_fixes, + "blockedCommand": command, + "recommendedAction": format!("merge or rebase {main_ref} before workspace tests") + })), + ) + .expect("lane event should serialize"), + ]), persisted_output_path: None, persisted_output_size: None, sandbox_status: None, @@ -2277,58 +2290,6 @@ struct SkillOutput { prompt: String, } -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] -enum LaneEventName { - #[serde(rename = "lane.started")] - Started, - #[serde(rename = "lane.blocked")] - Blocked, - #[serde(rename = "lane.finished")] - Finished, - #[serde(rename = "lane.failed")] - Failed, - #[serde(rename = "lane.reconciled")] - Reconciled, - #[serde(rename = "lane.merged")] - Merged, - #[serde(rename = "lane.superseded")] - Superseded, - #[serde(rename = "lane.closed")] - Closed, -} - -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] -#[serde(rename_all = "snake_case")] -enum LaneFailureClass { - PromptDelivery, - TrustGate, - BranchDivergence, - Compile, - Test, - PluginStartup, - McpStartup, - Infra, -} - -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] -struct LaneBlocker { - #[serde(rename = "failureClass")] - failure_class: LaneFailureClass, - detail: String, -} - -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] -struct LaneEvent { - event: LaneEventName, - status: String, - #[serde(rename = "emittedAt")] - emitted_at: String, - #[serde(rename = "failureClass", skip_serializing_if = "Option::is_none")] - failure_class: Option, - #[serde(skip_serializing_if = "Option::is_none")] - detail: Option, -} - #[derive(Debug, Clone, Serialize, Deserialize)] struct AgentOutput { #[serde(rename = "agentId")] @@ -2352,7 +2313,7 @@ struct AgentOutput { #[serde(rename = "laneEvents", default, skip_serializing_if = "Vec::is_empty")] lane_events: Vec, #[serde(rename = "currentBlocker", skip_serializing_if = "Option::is_none")] - current_blocker: Option, + current_blocker: Option, #[serde(skip_serializing_if = "Option::is_none")] error: Option, } @@ -2374,6 +2335,8 @@ pub struct ToolSearchOutput { total_deferred_tools: usize, #[serde(rename = "pending_mcp_servers")] pending_mcp_servers: Option>, + #[serde(rename = "mcp_degraded", skip_serializing_if = "Option::is_none")] + mcp_degraded: Option, } #[derive(Debug, Serialize)] @@ -3064,13 +3027,7 @@ where created_at: created_at.clone(), started_at: Some(created_at), completed_at: None, - lane_events: vec![LaneEvent { - event: LaneEventName::Started, - status: String::from("running"), - emitted_at: iso8601_now(), - failure_class: None, - detail: None, - }], + lane_events: vec![LaneEvent::started(iso8601_now())], current_blocker: None, error: None, }; @@ -3286,32 +3243,20 @@ fn persist_agent_terminal_state( next_manifest.current_blocker = blocker.clone(); next_manifest.error = error; if let Some(blocker) = blocker { - next_manifest.lane_events.push(LaneEvent { - event: LaneEventName::Blocked, - status: status.to_string(), - emitted_at: iso8601_now(), - failure_class: Some(blocker.failure_class.clone()), - detail: Some(blocker.detail.clone()), - }); - next_manifest.lane_events.push(LaneEvent { - event: LaneEventName::Failed, - status: status.to_string(), - emitted_at: iso8601_now(), - failure_class: Some(blocker.failure_class), - detail: Some(blocker.detail), - }); + next_manifest.lane_events.push( + LaneEvent::blocked(iso8601_now(), &blocker), + ); + next_manifest.lane_events.push( + LaneEvent::failed(iso8601_now(), &blocker), + ); } else { next_manifest.current_blocker = None; let compressed_detail = result .filter(|value| !value.trim().is_empty()) .map(|value| compress_summary_text(value.trim())); - next_manifest.lane_events.push(LaneEvent { - event: LaneEventName::Finished, - status: status.to_string(), - emitted_at: iso8601_now(), - failure_class: None, - detail: compressed_detail, - }); + next_manifest + .lane_events + .push(LaneEvent::finished(iso8601_now(), compressed_detail)); } write_agent_manifest(&next_manifest) } @@ -3330,7 +3275,7 @@ fn append_agent_output(path: &str, suffix: &str) -> Result<(), String> { fn format_agent_terminal_output( status: &str, result: Option<&str>, - blocker: Option<&LaneBlocker>, + blocker: Option<&LaneEventBlocker>, error: Option<&str>, ) -> String { let mut sections = vec![format!("\n## Result\n\n- status: {status}\n")]; @@ -3352,9 +3297,9 @@ fn format_agent_terminal_output( sections.join("") } -fn classify_lane_blocker(error: &str) -> LaneBlocker { +fn classify_lane_blocker(error: &str) -> LaneEventBlocker { let detail = error.trim().to_string(); - LaneBlocker { + LaneEventBlocker { failure_class: classify_lane_failure(error), detail, } @@ -3371,6 +3316,8 @@ fn classify_lane_failure(error: &str) -> LaneFailureClass { && (normalized.contains("stale") || normalized.contains("diverg")) { LaneFailureClass::BranchDivergence + } else if normalized.contains("gateway") || normalized.contains("routing") { + LaneFailureClass::GatewayRouting } else if normalized.contains("compile") || normalized.contains("build failed") || normalized.contains("cargo check") @@ -3378,8 +3325,15 @@ fn classify_lane_failure(error: &str) -> LaneFailureClass { LaneFailureClass::Compile } else if normalized.contains("test") { LaneFailureClass::Test + } else if normalized.contains("tool failed") + || normalized.contains("runtime tool") + || normalized.contains("tool runtime") + { + LaneFailureClass::ToolRuntime } else if normalized.contains("plugin") { LaneFailureClass::PluginStartup + } else if normalized.contains("mcp") && normalized.contains("handshake") { + LaneFailureClass::McpHandshake } else if normalized.contains("mcp") { LaneFailureClass::McpStartup } else { @@ -3688,7 +3642,7 @@ fn final_assistant_text(summary: &runtime::TurnSummary) -> String { #[allow(clippy::needless_pass_by_value)] fn execute_tool_search(input: ToolSearchInput) -> ToolSearchOutput { - GlobalToolRegistry::builtin().search(&input.query, input.max_results.unwrap_or(5), None) + GlobalToolRegistry::builtin().search(&input.query, input.max_results.unwrap_or(5), None, None) } fn deferred_tool_specs() -> Vec { @@ -4944,7 +4898,7 @@ mod tests { agent_permission_policy, allowed_tools_for_subagent, classify_lane_failure, execute_agent_with_spawn, execute_tool, final_assistant_text, mvp_tool_specs, permission_mode_from_plugin, persist_agent_terminal_state, push_output_block, AgentInput, - AgentJob, GlobalToolRegistry, LaneFailureClass, SubagentToolExecutor, + AgentJob, GlobalToolRegistry, LaneEventName, LaneFailureClass, SubagentToolExecutor, }; use api::OutputContentBlock; use runtime::{ @@ -5264,10 +5218,34 @@ mod tests { )] ); - let search = registry.search("demo echo", 5, Some(vec!["pending-server".to_string()])); + let search = registry.search( + "demo echo", + 5, + Some(vec!["pending-server".to_string()]), + Some(runtime::McpDegradedReport::new( + vec!["demo".to_string()], + vec![runtime::McpFailedServer { + server_name: "pending-server".to_string(), + phase: runtime::McpLifecyclePhase::ToolDiscovery, + error: runtime::McpErrorSurface::new( + runtime::McpLifecyclePhase::ToolDiscovery, + Some("pending-server".to_string()), + "tool discovery failed", + BTreeMap::new(), + true, + ), + }], + vec!["mcp__demo__echo".to_string()], + vec!["mcp__demo__echo".to_string()], + )), + ); let output = serde_json::to_value(search).expect("search output should serialize"); assert_eq!(output["matches"][0], "mcp__demo__echo"); assert_eq!(output["pending_mcp_servers"][0], "pending-server"); + assert_eq!( + output["mcp_degraded"]["failed_servers"][0]["phase"], + "tool_discovery" + ); } #[test] @@ -5917,16 +5895,16 @@ mod tests { ), ("targeted tests failed", LaneFailureClass::Test), ("plugin bootstrap failed", LaneFailureClass::PluginStartup), - ("mcp handshake timed out", LaneFailureClass::McpStartup), + ("mcp handshake timed out", LaneFailureClass::McpHandshake), ( "mcp startup failed before listing tools", LaneFailureClass::McpStartup, ), ( "gateway routing rejected the request", - LaneFailureClass::Infra, + LaneFailureClass::GatewayRouting, ), - ("denied tool execution from hook", LaneFailureClass::Infra), + ("tool failed: denied tool execution from hook", LaneFailureClass::ToolRuntime), ("thread creation failed", LaneFailureClass::Infra), ]; @@ -5935,6 +5913,28 @@ mod tests { } } + #[test] + fn lane_event_schema_serializes_to_canonical_names() { + let cases = [ + (LaneEventName::Started, "lane.started"), + (LaneEventName::Ready, "lane.ready"), + (LaneEventName::PromptMisdelivery, "lane.prompt_misdelivery"), + (LaneEventName::Blocked, "lane.blocked"), + (LaneEventName::Red, "lane.red"), + (LaneEventName::Green, "lane.green"), + (LaneEventName::CommitCreated, "lane.commit.created"), + (LaneEventName::PrOpened, "lane.pr.opened"), + (LaneEventName::MergeReady, "lane.merge.ready"), + (LaneEventName::Finished, "lane.finished"), + (LaneEventName::Failed, "lane.failed"), + (LaneEventName::BranchStaleAgainstMain, "branch.stale_against_main"), + ]; + + for (event, expected) in cases { + assert_eq!(serde_json::to_value(event).expect("serialize lane event"), json!(expected)); + } + } + #[test] fn agent_tool_subset_mapping_is_expected() { let general = allowed_tools_for_subagent("general-purpose"); @@ -6258,7 +6258,7 @@ mod tests { "branch_divergence" ); assert_eq!( - output_json["structuredContent"][0]["missingCommits"][0], + output_json["structuredContent"][0]["data"]["missingCommits"][0], "fix: unblock workspace tests" );