Close the MCP lifecycle gap from config to runtime tool execution

This wires configured MCP servers into the CLI/runtime path so discovered
MCP tools, resource wrappers, search visibility, shutdown handling, and
best-effort discovery all work together instead of living as isolated
runtime primitives.

Constraint: Keep non-MCP startup flows working without new required config
Constraint: Preserve partial availability when one configured MCP server fails discovery
Rejected: Fail runtime startup on any MCP discovery error | too brittle for mixed healthy/broken server configs
Rejected: Keep MCP support runtime-only without registry wiring | left discovery and invocation unreachable from the CLI tool lane
Confidence: high
Scope-risk: moderate
Reversibility: clean
Directive: Runtime MCP tools are registry-backed but executed through CliToolExecutor state; keep future tool-registry changes aligned with that split
Tested: cargo test -p runtime mcp -- --nocapture; cargo test -p tools -- --nocapture; cargo test -p rusty-claude-cli -- --nocapture; cargo test --workspace -- --nocapture
Not-tested: Live remote MCP transports (http/sse/ws/sdk) remain unsupported in the CLI execution path
This commit is contained in:
Yeachan-Heo
2026-04-03 14:31:25 +00:00
parent 8805386bea
commit b3fe057559
6 changed files with 1155 additions and 105 deletions

View File

@@ -55,11 +55,12 @@ pub use mcp_client::{
};
pub use mcp_stdio::{
spawn_mcp_stdio_process, JsonRpcError, JsonRpcId, JsonRpcRequest, JsonRpcResponse,
ManagedMcpTool, McpInitializeClientInfo, McpInitializeParams, McpInitializeResult,
McpInitializeServerInfo, McpListResourcesParams, McpListResourcesResult, McpListToolsParams,
McpListToolsResult, McpReadResourceParams, McpReadResourceResult, McpResource,
McpResourceContents, McpServerManager, McpServerManagerError, McpStdioProcess, McpTool,
McpToolCallContent, McpToolCallParams, McpToolCallResult, UnsupportedMcpServer,
ManagedMcpTool, McpDiscoveryFailure, McpInitializeClientInfo, McpInitializeParams,
McpInitializeResult, McpInitializeServerInfo, McpListResourcesParams, McpListResourcesResult,
McpListToolsParams, McpListToolsResult, McpReadResourceParams, McpReadResourceResult,
McpResource, McpResourceContents, McpServerManager, McpServerManagerError, McpStdioProcess,
McpTool, McpToolCallContent, McpToolCallParams, McpToolCallResult, McpToolDiscoveryReport,
UnsupportedMcpServer,
};
pub use oauth::{
clear_oauth_credentials, code_challenge_s256, credentials_path, generate_pkce_pair,

View File

@@ -230,6 +230,19 @@ pub struct UnsupportedMcpServer {
pub reason: String,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct McpDiscoveryFailure {
pub server_name: String,
pub error: String,
}
#[derive(Debug, Clone, PartialEq)]
pub struct McpToolDiscoveryReport {
pub tools: Vec<ManagedMcpTool>,
pub failed_servers: Vec<McpDiscoveryFailure>,
pub unsupported_servers: Vec<UnsupportedMcpServer>,
}
#[derive(Debug)]
pub enum McpServerManagerError {
Io(io::Error),
@@ -397,6 +410,11 @@ impl McpServerManager {
&self.unsupported_servers
}
#[must_use]
pub fn server_names(&self) -> Vec<String> {
self.servers.keys().cloned().collect()
}
pub async fn discover_tools(&mut self) -> Result<Vec<ManagedMcpTool>, McpServerManagerError> {
let server_names = self.servers.keys().cloned().collect::<Vec<_>>();
let mut discovered_tools = Vec::new();
@@ -420,6 +438,43 @@ impl McpServerManager {
Ok(discovered_tools)
}
pub async fn discover_tools_best_effort(&mut self) -> McpToolDiscoveryReport {
let server_names = self.server_names();
let mut discovered_tools = Vec::new();
let mut failed_servers = Vec::new();
for server_name in server_names {
match self.discover_tools_for_server(&server_name).await {
Ok(server_tools) => {
self.clear_routes_for_server(&server_name);
for tool in server_tools {
self.tool_index.insert(
tool.qualified_name.clone(),
ToolRoute {
server_name: tool.server_name.clone(),
raw_name: tool.raw_name.clone(),
},
);
discovered_tools.push(tool);
}
}
Err(error) => {
self.clear_routes_for_server(&server_name);
failed_servers.push(McpDiscoveryFailure {
server_name,
error: error.to_string(),
});
}
}
}
McpToolDiscoveryReport {
tools: discovered_tools,
failed_servers,
unsupported_servers: self.unsupported_servers.clone(),
}
}
pub async fn call_tool(
&mut self,
qualified_tool_name: &str,
@@ -437,30 +492,31 @@ impl McpServerManager {
self.ensure_server_ready(&route.server_name).await?;
let request_id = self.take_request_id();
let response = {
let server = self.server_mut(&route.server_name)?;
let process = server.process.as_mut().ok_or_else(|| {
McpServerManagerError::InvalidResponse {
server_name: route.server_name.clone(),
method: "tools/call",
details: "server process missing after initialization".to_string(),
}
})?;
Self::run_process_request(
&route.server_name,
"tools/call",
timeout_ms,
process.call_tool(
request_id,
McpToolCallParams {
name: route.raw_name,
arguments,
meta: None,
},
),
)
.await
};
let response =
{
let server = self.server_mut(&route.server_name)?;
let process = server.process.as_mut().ok_or_else(|| {
McpServerManagerError::InvalidResponse {
server_name: route.server_name.clone(),
method: "tools/call",
details: "server process missing after initialization".to_string(),
}
})?;
Self::run_process_request(
&route.server_name,
"tools/call",
timeout_ms,
process.call_tool(
request_id,
McpToolCallParams {
name: route.raw_name,
arguments,
meta: None,
},
),
)
.await
};
if let Err(error) = &response {
if Self::should_reset_server(error) {
@@ -471,6 +527,53 @@ impl McpServerManager {
response
}
pub async fn list_resources(
&mut self,
server_name: &str,
) -> Result<McpListResourcesResult, McpServerManagerError> {
let mut attempts = 0;
loop {
match self.list_resources_once(server_name).await {
Ok(resources) => return Ok(resources),
Err(error) if attempts == 0 && Self::is_retryable_error(&error) => {
self.reset_server(server_name).await?;
attempts += 1;
}
Err(error) => {
if Self::should_reset_server(&error) {
self.reset_server(server_name).await?;
}
return Err(error);
}
}
}
}
pub async fn read_resource(
&mut self,
server_name: &str,
uri: &str,
) -> Result<McpReadResourceResult, McpServerManagerError> {
let mut attempts = 0;
loop {
match self.read_resource_once(server_name, uri).await {
Ok(resource) => return Ok(resource),
Err(error) if attempts == 0 && Self::is_retryable_error(&error) => {
self.reset_server(server_name).await?;
attempts += 1;
}
Err(error) => {
if Self::should_reset_server(&error) {
self.reset_server(server_name).await?;
}
return Err(error);
}
}
}
}
pub async fn shutdown(&mut self) -> Result<(), McpServerManagerError> {
let server_names = self.servers.keys().cloned().collect::<Vec<_>>();
for server_name in server_names {
@@ -507,12 +610,12 @@ impl McpServerManager {
}
fn tool_call_timeout_ms(&self, server_name: &str) -> Result<u64, McpServerManagerError> {
let server = self
.servers
.get(server_name)
.ok_or_else(|| McpServerManagerError::UnknownServer {
server_name: server_name.to_string(),
})?;
let server =
self.servers
.get(server_name)
.ok_or_else(|| McpServerManagerError::UnknownServer {
server_name: server_name.to_string(),
})?;
match &server.bootstrap.transport {
McpClientTransport::Stdio(transport) => Ok(transport.resolved_tool_call_timeout_ms()),
other => Err(McpServerManagerError::InvalidResponse {
@@ -595,14 +698,13 @@ impl McpServerManager {
});
}
let result =
response
.result
.ok_or_else(|| McpServerManagerError::InvalidResponse {
server_name: server_name.to_string(),
method: "tools/list",
details: "missing result payload".to_string(),
})?;
let result = response
.result
.ok_or_else(|| McpServerManagerError::InvalidResponse {
server_name: server_name.to_string(),
method: "tools/list",
details: "missing result payload".to_string(),
})?;
for tool in result.tools {
let qualified_name = mcp_tool_name(server_name, &tool.name);
@@ -623,6 +725,118 @@ impl McpServerManager {
Ok(discovered_tools)
}
async fn list_resources_once(
&mut self,
server_name: &str,
) -> Result<McpListResourcesResult, McpServerManagerError> {
self.ensure_server_ready(server_name).await?;
let mut resources = Vec::new();
let mut cursor = None;
loop {
let request_id = self.take_request_id();
let response = {
let server = self.server_mut(server_name)?;
let process = server.process.as_mut().ok_or_else(|| {
McpServerManagerError::InvalidResponse {
server_name: server_name.to_string(),
method: "resources/list",
details: "server process missing after initialization".to_string(),
}
})?;
Self::run_process_request(
server_name,
"resources/list",
MCP_LIST_TOOLS_TIMEOUT_MS,
process.list_resources(
request_id,
Some(McpListResourcesParams {
cursor: cursor.clone(),
}),
),
)
.await?
};
if let Some(error) = response.error {
return Err(McpServerManagerError::JsonRpc {
server_name: server_name.to_string(),
method: "resources/list",
error,
});
}
let result = response
.result
.ok_or_else(|| McpServerManagerError::InvalidResponse {
server_name: server_name.to_string(),
method: "resources/list",
details: "missing result payload".to_string(),
})?;
resources.extend(result.resources);
match result.next_cursor {
Some(next_cursor) => cursor = Some(next_cursor),
None => break,
}
}
Ok(McpListResourcesResult {
resources,
next_cursor: None,
})
}
async fn read_resource_once(
&mut self,
server_name: &str,
uri: &str,
) -> Result<McpReadResourceResult, McpServerManagerError> {
self.ensure_server_ready(server_name).await?;
let request_id = self.take_request_id();
let response =
{
let server = self.server_mut(server_name)?;
let process = server.process.as_mut().ok_or_else(|| {
McpServerManagerError::InvalidResponse {
server_name: server_name.to_string(),
method: "resources/read",
details: "server process missing after initialization".to_string(),
}
})?;
Self::run_process_request(
server_name,
"resources/read",
MCP_LIST_TOOLS_TIMEOUT_MS,
process.read_resource(
request_id,
McpReadResourceParams {
uri: uri.to_string(),
},
),
)
.await?
};
if let Some(error) = response.error {
return Err(McpServerManagerError::JsonRpc {
server_name: server_name.to_string(),
method: "resources/read",
error,
});
}
response
.result
.ok_or_else(|| McpServerManagerError::InvalidResponse {
server_name: server_name.to_string(),
method: "resources/read",
details: "missing result payload".to_string(),
})
}
async fn reset_server(&mut self, server_name: &str) -> Result<(), McpServerManagerError> {
let mut process = {
let server = self.server_mut(server_name)?;
@@ -1614,10 +1828,7 @@ mod tests {
let script_path = write_jsonrpc_script();
let transport = script_transport_with_env(
&script_path,
BTreeMap::from([(
"MCP_LOWERCASE_CONTENT_LENGTH".to_string(),
"1".to_string(),
)]),
BTreeMap::from([("MCP_LOWERCASE_CONTENT_LENGTH".to_string(), "1".to_string())]),
);
let mut process = McpStdioProcess::spawn(&transport).expect("spawn transport directly");
@@ -1657,10 +1868,7 @@ mod tests {
let script_path = write_jsonrpc_script();
let transport = script_transport_with_env(
&script_path,
BTreeMap::from([(
"MCP_MISMATCHED_RESPONSE_ID".to_string(),
"1".to_string(),
)]),
BTreeMap::from([("MCP_MISMATCHED_RESPONSE_ID".to_string(), "1".to_string())]),
);
let mut process = McpStdioProcess::spawn(&transport).expect("spawn transport directly");
@@ -1971,7 +2179,10 @@ mod tests {
manager.discover_tools().await.expect("discover tools");
let error = manager
.call_tool(&mcp_tool_name("slow", "echo"), Some(json!({"text": "slow"})))
.call_tool(
&mcp_tool_name("slow", "echo"),
Some(json!({"text": "slow"})),
)
.await
.expect_err("slow tool call should time out");
@@ -2036,7 +2247,9 @@ mod tests {
} => {
assert_eq!(server_name, "broken");
assert_eq!(method, "tools/call");
assert!(details.contains("expected ident") || details.contains("expected value"));
assert!(
details.contains("expected ident") || details.contains("expected value")
);
}
other => panic!("expected invalid response error, got {other:?}"),
}
@@ -2047,7 +2260,8 @@ mod tests {
}
#[test]
fn given_child_exits_after_discovery_when_calling_twice_then_second_call_succeeds_after_reset() {
fn given_child_exits_after_discovery_when_calling_twice_then_second_call_succeeds_after_reset()
{
let runtime = Builder::new_current_thread()
.enable_all()
.build()
@@ -2062,10 +2276,7 @@ mod tests {
&script_path,
"alpha",
&log_path,
BTreeMap::from([(
"MCP_EXIT_AFTER_TOOLS_LIST".to_string(),
"1".to_string(),
)]),
BTreeMap::from([("MCP_EXIT_AFTER_TOOLS_LIST".to_string(), "1".to_string())]),
),
)]);
let mut manager = McpServerManager::from_servers(&servers);
@@ -2150,7 +2361,10 @@ mod tests {
)]);
let mut manager = McpServerManager::from_servers(&servers);
let tools = manager.discover_tools().await.expect("discover tools after retry");
let tools = manager
.discover_tools()
.await
.expect("discover tools after retry");
assert_eq!(tools.len(), 1);
assert_eq!(tools[0].qualified_name, mcp_tool_name("alpha", "echo"));
@@ -2166,7 +2380,8 @@ mod tests {
}
#[test]
fn given_tool_call_disconnects_once_when_calling_twice_then_manager_resets_and_next_call_succeeds() {
fn given_tool_call_disconnects_once_when_calling_twice_then_manager_resets_and_next_call_succeeds(
) {
let runtime = Builder::new_current_thread()
.enable_all()
.build()
@@ -2198,7 +2413,10 @@ mod tests {
manager.discover_tools().await.expect("discover tools");
let first_error = manager
.call_tool(&mcp_tool_name("alpha", "echo"), Some(json!({"text": "first"})))
.call_tool(
&mcp_tool_name("alpha", "echo"),
Some(json!({"text": "first"})),
)
.await
.expect_err("first tool call should fail when transport drops");
@@ -2216,7 +2434,10 @@ mod tests {
}
let response = manager
.call_tool(&mcp_tool_name("alpha", "echo"), Some(json!({"text": "second"})))
.call_tool(
&mcp_tool_name("alpha", "echo"),
Some(json!({"text": "second"})),
)
.await
.expect("second tool call should succeed after reset");
@@ -2246,6 +2467,103 @@ mod tests {
});
}
#[test]
fn manager_lists_and_reads_resources_from_stdio_servers() {
let runtime = Builder::new_current_thread()
.enable_all()
.build()
.expect("runtime");
runtime.block_on(async {
let script_path = write_mcp_server_script();
let root = script_path.parent().expect("script parent");
let log_path = root.join("resources.log");
let servers = BTreeMap::from([(
"alpha".to_string(),
manager_server_config(&script_path, "alpha", &log_path),
)]);
let mut manager = McpServerManager::from_servers(&servers);
let listed = manager
.list_resources("alpha")
.await
.expect("list resources");
assert_eq!(listed.resources.len(), 1);
assert_eq!(listed.resources[0].uri, "file://guide.txt");
let read = manager
.read_resource("alpha", "file://guide.txt")
.await
.expect("read resource");
assert_eq!(read.contents.len(), 1);
assert_eq!(
read.contents[0].text.as_deref(),
Some("contents for file://guide.txt")
);
manager.shutdown().await.expect("shutdown");
cleanup_script(&script_path);
});
}
#[test]
fn manager_discovery_report_keeps_healthy_servers_when_one_server_fails() {
let runtime = Builder::new_current_thread()
.enable_all()
.build()
.expect("runtime");
runtime.block_on(async {
let script_path = write_manager_mcp_server_script();
let root = script_path.parent().expect("script parent");
let alpha_log = root.join("alpha.log");
let servers = BTreeMap::from([
(
"alpha".to_string(),
manager_server_config(&script_path, "alpha", &alpha_log),
),
(
"broken".to_string(),
ScopedMcpServerConfig {
scope: ConfigSource::Local,
config: McpServerConfig::Stdio(McpStdioServerConfig {
command: "python3".to_string(),
args: vec!["-c".to_string(), "import sys; sys.exit(0)".to_string()],
env: BTreeMap::new(),
tool_call_timeout_ms: None,
}),
},
),
]);
let mut manager = McpServerManager::from_servers(&servers);
let report = manager.discover_tools_best_effort().await;
assert_eq!(report.tools.len(), 1);
assert_eq!(
report.tools[0].qualified_name,
mcp_tool_name("alpha", "echo")
);
assert_eq!(report.failed_servers.len(), 1);
assert_eq!(report.failed_servers[0].server_name, "broken");
assert!(report.failed_servers[0].error.contains("initialize"));
let response = manager
.call_tool(&mcp_tool_name("alpha", "echo"), Some(json!({"text": "ok"})))
.await
.expect("healthy server should remain callable");
assert_eq!(
response
.result
.as_ref()
.and_then(|result| result.structured_content.as_ref())
.and_then(|value| value.get("echoed")),
Some(&json!("ok"))
);
manager.shutdown().await.expect("shutdown");
cleanup_script(&script_path);
});
}
#[test]
fn manager_records_unsupported_non_stdio_servers_without_panicking() {
let servers = BTreeMap::from([