Preserve plugin and hook semantics while finishing the hook-pipeline merge

The merge now keeps plugin lifecycle management, plugin tool permissions,
hook abort/progress handling, permission-rule config, and shared slash-command
help aligned across runtime and CLI codepaths.

Constraint: Merge had to retain both plugin runtime behavior and hook-pipeline permission/abort features
Rejected: Drop plugin-aware runtime paths during merge | would regress installed plugin hooks and lifecycle handling
Rejected: Prefer hook-pipeline tool permissions over the global tool registry | would lose plugin tool permission mapping
Confidence: high
Scope-risk: moderate
Reversibility: clean
Directive: Keep runtime hook flow, permission policy wiring, and slash-command surfaces synchronized across crates during future merges
Tested: cargo test; cargo fmt --all --check; git diff --check
Not-tested: Live networked ANTHROPIC_API_KEY smoke path
This commit is contained in:
Yeachan-Heo
2026-04-01 09:00:55 +00:00
9 changed files with 1515 additions and 186 deletions

View File

@@ -20,7 +20,7 @@ runtime = { path = "../runtime" }
plugins = { path = "../plugins" }
serde_json = "1"
syntect = "5"
tokio = { version = "1", features = ["rt-multi-thread", "time"] }
tokio = { version = "1", features = ["rt-multi-thread", "signal", "time"] }
tools = { path = "../tools" }
[lints]

View File

@@ -10,9 +10,9 @@ use std::io::{self, Read, Write};
use std::net::TcpListener;
use std::path::{Path, PathBuf};
use std::process::Command;
use std::sync::mpsc::{self, RecvTimeoutError};
use std::sync::mpsc::{self, Receiver, RecvTimeoutError, Sender};
use std::sync::{Arc, Mutex};
use std::thread;
use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use api::{
@@ -972,6 +972,61 @@ struct LiveCli {
session: SessionHandle,
}
struct HookAbortMonitor {
stop_tx: Option<Sender<()>>,
join_handle: Option<JoinHandle<()>>,
}
impl HookAbortMonitor {
fn spawn(abort_signal: runtime::HookAbortSignal) -> Self {
Self::spawn_with_waiter(abort_signal, move |stop_rx, abort_signal| {
let Ok(runtime) = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
else {
return;
};
runtime.block_on(async move {
let wait_for_stop = tokio::task::spawn_blocking(move || {
let _ = stop_rx.recv();
});
tokio::select! {
result = tokio::signal::ctrl_c() => {
if result.is_ok() {
abort_signal.abort();
}
}
_ = wait_for_stop => {}
}
});
})
}
fn spawn_with_waiter<F>(abort_signal: runtime::HookAbortSignal, wait_for_interrupt: F) -> Self
where
F: FnOnce(Receiver<()>, runtime::HookAbortSignal) + Send + 'static,
{
let (stop_tx, stop_rx) = mpsc::channel();
let join_handle = thread::spawn(move || wait_for_interrupt(stop_rx, abort_signal));
Self {
stop_tx: Some(stop_tx),
join_handle: Some(join_handle),
}
}
fn stop(mut self) {
if let Some(stop_tx) = self.stop_tx.take() {
let _ = stop_tx.send(());
}
if let Some(join_handle) = self.join_handle.take() {
let _ = join_handle.join();
}
}
}
impl LiveCli {
fn new(
model: String,
@@ -1028,7 +1083,35 @@ impl LiveCli {
)
}
fn prepare_turn_runtime(
&self,
emit_output: bool,
) -> Result<
(
ConversationRuntime<AnthropicRuntimeClient, CliToolExecutor>,
HookAbortMonitor,
),
Box<dyn std::error::Error>,
> {
let hook_abort_signal = runtime::HookAbortSignal::new();
let runtime = build_runtime(
self.runtime.session().clone(),
self.model.clone(),
self.system_prompt.clone(),
true,
emit_output,
self.allowed_tools.clone(),
self.permission_mode,
None,
)?
.with_hook_abort_signal(hook_abort_signal.clone());
let hook_abort_monitor = HookAbortMonitor::spawn(hook_abort_signal);
Ok((runtime, hook_abort_monitor))
}
fn run_turn(&mut self, input: &str) -> Result<(), Box<dyn std::error::Error>> {
let (mut runtime, hook_abort_monitor) = self.prepare_turn_runtime(true)?;
let mut spinner = Spinner::new();
let mut stdout = io::stdout();
spinner.tick(
@@ -1037,7 +1120,9 @@ impl LiveCli {
&mut stdout,
)?;
let mut permission_prompter = CliPermissionPrompter::new(self.permission_mode);
let result = self.runtime.run_turn(input, Some(&mut permission_prompter));
let result = runtime.run_turn(input, Some(&mut permission_prompter));
hook_abort_monitor.stop();
self.runtime = runtime;
match result {
Ok(summary) => {
spinner.finish(
@@ -1078,19 +1163,11 @@ impl LiveCli {
}
fn run_prompt_json(&mut self, input: &str) -> Result<(), Box<dyn std::error::Error>> {
let session = self.runtime.session().clone();
let mut runtime = build_runtime(
session,
self.model.clone(),
self.system_prompt.clone(),
true,
false,
self.allowed_tools.clone(),
self.permission_mode,
None,
)?;
let (mut runtime, hook_abort_monitor) = self.prepare_turn_runtime(false)?;
let mut permission_prompter = CliPermissionPrompter::new(self.permission_mode);
let summary = runtime.run_turn(input, Some(&mut permission_prompter))?;
let result = runtime.run_turn(input, Some(&mut permission_prompter));
hook_abort_monitor.stop();
let summary = result?;
self.runtime = runtime;
self.persist_session()?;
println!(
@@ -2756,7 +2833,7 @@ fn build_runtime(
) -> Result<ConversationRuntime<AnthropicRuntimeClient, CliToolExecutor>, Box<dyn std::error::Error>>
{
let (feature_config, plugin_registry, tool_registry) = build_runtime_plugin_state()?;
Ok(ConversationRuntime::new_with_plugins(
let mut runtime = ConversationRuntime::new_with_plugins(
session,
AnthropicRuntimeClient::new(
model,
@@ -2767,11 +2844,48 @@ fn build_runtime(
progress_reporter,
)?,
CliToolExecutor::new(allowed_tools.clone(), emit_output, tool_registry.clone()),
permission_policy(permission_mode, &tool_registry),
permission_policy(permission_mode, &feature_config, &tool_registry),
system_prompt,
feature_config,
plugin_registry,
)?)
)?;
if emit_output {
runtime = runtime.with_hook_progress_reporter(Box::new(CliHookProgressReporter));
}
Ok(runtime)
}
struct CliHookProgressReporter;
impl runtime::HookProgressReporter for CliHookProgressReporter {
fn on_event(&mut self, event: &runtime::HookProgressEvent) {
match event {
runtime::HookProgressEvent::Started {
event,
tool_name,
command,
} => eprintln!(
"[hook {event_name}] {tool_name}: {command}",
event_name = event.as_str()
),
runtime::HookProgressEvent::Completed {
event,
tool_name,
command,
} => eprintln!(
"[hook done {event_name}] {tool_name}: {command}",
event_name = event.as_str()
),
runtime::HookProgressEvent::Cancelled {
event,
tool_name,
command,
} => eprintln!(
"[hook cancelled {event_name}] {tool_name}: {command}",
event_name = event.as_str()
),
}
}
}
struct CliPermissionPrompter {
@@ -3621,9 +3735,13 @@ impl ToolExecutor for CliToolExecutor {
}
}
fn permission_policy(mode: PermissionMode, tool_registry: &GlobalToolRegistry) -> PermissionPolicy {
fn permission_policy(
mode: PermissionMode,
feature_config: &runtime::RuntimeFeatureConfig,
tool_registry: &GlobalToolRegistry,
) -> PermissionPolicy {
tool_registry.permission_specs(None).into_iter().fold(
PermissionPolicy::new(mode),
PermissionPolicy::new(mode).with_permission_rules(feature_config.permission_rules()),
|policy, (name, required_permission)| {
policy.with_tool_requirement(name, required_permission)
},
@@ -3773,14 +3891,18 @@ mod tests {
normalize_permission_mode, parse_args, parse_git_status_metadata, permission_policy,
print_help_to, push_output_block, render_config_report, render_memory_report,
render_repl_help, resolve_model_alias, response_to_events, resume_supported_slash_commands,
status_context, CliAction, CliOutputFormat, InternalPromptProgressEvent,
status_context, CliAction, CliOutputFormat, HookAbortMonitor, InternalPromptProgressEvent,
InternalPromptProgressState, SlashCommand, StatusUsage, DEFAULT_MODEL,
};
use api::{MessageResponse, OutputContentBlock, Usage};
use plugins::{PluginTool, PluginToolDefinition, PluginToolPermission};
use runtime::{AssistantEvent, ContentBlock, ConversationMessage, MessageRole, PermissionMode};
use runtime::{
AssistantEvent, ContentBlock, ConversationMessage, HookAbortSignal, MessageRole,
PermissionMode,
};
use serde_json::json;
use std::path::PathBuf;
use std::sync::mpsc;
use std::time::Duration;
use tools::GlobalToolRegistry;
@@ -4041,7 +4163,11 @@ mod tests {
#[test]
fn permission_policy_uses_plugin_tool_permissions() {
let policy = permission_policy(PermissionMode::ReadOnly, &registry_with_plugin_tool());
let policy = permission_policy(
PermissionMode::ReadOnly,
&runtime::RuntimeFeatureConfig::default(),
&registry_with_plugin_tool(),
);
let required = policy.required_mode_for("plugin_echo");
assert_eq!(required, PermissionMode::WorkspaceWrite);
}
@@ -4678,4 +4804,43 @@ mod tests {
));
assert!(!String::from_utf8(out).expect("utf8").contains("step 1"));
}
#[test]
fn hook_abort_monitor_stops_without_aborting() {
let abort_signal = HookAbortSignal::new();
let (ready_tx, ready_rx) = mpsc::channel();
let monitor = HookAbortMonitor::spawn_with_waiter(
abort_signal.clone(),
move |stop_rx, abort_signal| {
ready_tx.send(()).expect("ready signal");
let _ = stop_rx.recv();
assert!(!abort_signal.is_aborted());
},
);
ready_rx.recv().expect("waiter should be ready");
monitor.stop();
assert!(!abort_signal.is_aborted());
}
#[test]
fn hook_abort_monitor_propagates_interrupt() {
let abort_signal = HookAbortSignal::new();
let (done_tx, done_rx) = mpsc::channel();
let monitor = HookAbortMonitor::spawn_with_waiter(
abort_signal.clone(),
move |_stop_rx, abort_signal| {
abort_signal.abort();
done_tx.send(()).expect("done signal");
},
);
done_rx
.recv_timeout(Duration::from_secs(1))
.expect("interrupt should complete");
monitor.stop();
assert!(abort_signal.is_aborted());
}
}

View File

@@ -286,7 +286,7 @@ impl TerminalRenderer {
) {
match event {
Event::Start(Tag::Heading { level, .. }) => {
self.start_heading(state, level as u8, output);
Self::start_heading(state, level as u8, output);
}
Event::End(TagEnd::Paragraph) => output.push_str("\n\n"),
Event::Start(Tag::BlockQuote(..)) => self.start_quote(state, output),
@@ -426,8 +426,7 @@ impl TerminalRenderer {
}
}
#[allow(clippy::unused_self)]
fn start_heading(&self, state: &mut RenderState, level: u8, output: &mut String) {
fn start_heading(state: &mut RenderState, level: u8, output: &mut String) {
state.heading_level = Some(level);
if !output.is_empty() {
output.push('\n');