From 13851d800f29a7f6121439054e52123ef339c66d Mon Sep 17 00:00:00 2001 From: Yeachan-Heo Date: Wed, 1 Apr 2026 08:06:10 +0000 Subject: [PATCH] Accept reasoning-style content blocks in the Rust API parser The Rust API layer rejected thinking-enabled responses because it only recognized text and tool_use content blocks. This commit extends the response and SSE parser types to accept reasoning-style content blocks and deltas, with regression coverage for both non-streaming and streaming responses. Constraint: Keep parsing compatible with existing text and tool-use message flows Rejected: Deserialize unknown content blocks into an untyped catch-all | would weaken protocol coverage and test precision Confidence: high Scope-risk: narrow Directive: Keep new protocol variants covered at the API boundary so downstream code can make explicit choices about preservation vs. ignoring Tested: cargo test -p api thinking -- --nocapture Not-tested: Live API traffic from a real thinking-enabled model --- rust/crates/api/src/sse.rs | 60 ++++++++++ rust/crates/api/src/types.rs | 11 ++ rust/crates/api/tests/client_integration.rs | 121 ++++++++++++++++++++ 3 files changed, 192 insertions(+) diff --git a/rust/crates/api/src/sse.rs b/rust/crates/api/src/sse.rs index d7334cd..5f54e50 100644 --- a/rust/crates/api/src/sse.rs +++ b/rust/crates/api/src/sse.rs @@ -216,4 +216,64 @@ mod tests { )) ); } + + #[test] + fn parses_thinking_content_block_start() { + let frame = concat!( + "event: content_block_start\n", + "data: {\"type\":\"content_block_start\",\"index\":0,\"content_block\":{\"type\":\"thinking\",\"thinking\":\"\",\"signature\":null}}\n\n" + ); + + let event = parse_frame(frame).expect("frame should parse"); + assert_eq!( + event, + Some(StreamEvent::ContentBlockStart( + crate::types::ContentBlockStartEvent { + index: 0, + content_block: OutputContentBlock::Thinking { + thinking: String::new(), + signature: None, + }, + }, + )) + ); + } + + #[test] + fn parses_thinking_related_deltas() { + let thinking = concat!( + "event: content_block_delta\n", + "data: {\"type\":\"content_block_delta\",\"index\":0,\"delta\":{\"type\":\"thinking_delta\",\"thinking\":\"step 1\"}}\n\n" + ); + let signature = concat!( + "event: content_block_delta\n", + "data: {\"type\":\"content_block_delta\",\"index\":0,\"delta\":{\"type\":\"signature_delta\",\"signature\":\"sig_123\"}}\n\n" + ); + + let thinking_event = parse_frame(thinking).expect("thinking delta should parse"); + let signature_event = parse_frame(signature).expect("signature delta should parse"); + + assert_eq!( + thinking_event, + Some(StreamEvent::ContentBlockDelta( + crate::types::ContentBlockDeltaEvent { + index: 0, + delta: ContentBlockDelta::ThinkingDelta { + thinking: "step 1".to_string(), + }, + } + )) + ); + assert_eq!( + signature_event, + Some(StreamEvent::ContentBlockDelta( + crate::types::ContentBlockDeltaEvent { + index: 0, + delta: ContentBlockDelta::SignatureDelta { + signature: "sig_123".to_string(), + }, + } + )) + ); + } } diff --git a/rust/crates/api/src/types.rs b/rust/crates/api/src/types.rs index 45d5c08..c060be6 100644 --- a/rust/crates/api/src/types.rs +++ b/rust/crates/api/src/types.rs @@ -135,6 +135,15 @@ pub enum OutputContentBlock { name: String, input: Value, }, + Thinking { + #[serde(default)] + thinking: String, + #[serde(default, skip_serializing_if = "Option::is_none")] + signature: Option, + }, + RedactedThinking { + data: Value, + }, } #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] @@ -190,6 +199,8 @@ pub struct ContentBlockDeltaEvent { pub enum ContentBlockDelta { TextDelta { text: String }, InputJsonDelta { partial_json: String }, + ThinkingDelta { thinking: String }, + SignatureDelta { signature: String }, } #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] diff --git a/rust/crates/api/tests/client_integration.rs b/rust/crates/api/tests/client_integration.rs index c37fa99..be4abca 100644 --- a/rust/crates/api/tests/client_integration.rs +++ b/rust/crates/api/tests/client_integration.rs @@ -75,6 +75,48 @@ async fn send_message_posts_json_and_parses_response() { assert_eq!(body["tool_choice"]["type"], json!("auto")); } +#[tokio::test] +async fn send_message_parses_response_with_thinking_blocks() { + let state = Arc::new(Mutex::new(Vec::::new())); + let body = concat!( + "{", + "\"id\":\"msg_thinking\",", + "\"type\":\"message\",", + "\"role\":\"assistant\",", + "\"content\":[", + "{\"type\":\"thinking\",\"thinking\":\"step 1\",\"signature\":\"sig_123\"},", + "{\"type\":\"text\",\"text\":\"Final answer\"}", + "],", + "\"model\":\"claude-3-7-sonnet-latest\",", + "\"stop_reason\":\"end_turn\",", + "\"stop_sequence\":null,", + "\"usage\":{\"input_tokens\":12,\"output_tokens\":4}", + "}" + ); + let server = spawn_server( + state, + vec![http_response("200 OK", "application/json", body)], + ) + .await; + + let client = AnthropicClient::new("test-key").with_base_url(server.base_url()); + let response = client + .send_message(&sample_request(false)) + .await + .expect("request should succeed"); + + assert_eq!(response.content.len(), 2); + assert!(matches!( + &response.content[0], + OutputContentBlock::Thinking { thinking, signature } + if thinking == "step 1" && signature.as_deref() == Some("sig_123") + )); + assert!(matches!( + &response.content[1], + OutputContentBlock::Text { text } if text == "Final answer" + )); +} + #[tokio::test] async fn stream_message_parses_sse_events_with_tool_use() { let state = Arc::new(Mutex::new(Vec::::new())); @@ -162,6 +204,85 @@ async fn stream_message_parses_sse_events_with_tool_use() { assert!(request.body.contains("\"stream\":true")); } +#[tokio::test] +async fn stream_message_parses_sse_events_with_thinking_blocks() { + let state = Arc::new(Mutex::new(Vec::::new())); + let sse = concat!( + "event: message_start\n", + "data: {\"type\":\"message_start\",\"message\":{\"id\":\"msg_stream_thinking\",\"type\":\"message\",\"role\":\"assistant\",\"content\":[],\"model\":\"claude-3-7-sonnet-latest\",\"stop_reason\":null,\"stop_sequence\":null,\"usage\":{\"input_tokens\":8,\"output_tokens\":0}}}\n\n", + "event: content_block_start\n", + "data: {\"type\":\"content_block_start\",\"index\":0,\"content_block\":{\"type\":\"thinking\",\"thinking\":\"\"}}\n\n", + "event: content_block_delta\n", + "data: {\"type\":\"content_block_delta\",\"index\":0,\"delta\":{\"type\":\"thinking_delta\",\"thinking\":\"step 1\"}}\n\n", + "event: content_block_delta\n", + "data: {\"type\":\"content_block_delta\",\"index\":0,\"delta\":{\"type\":\"signature_delta\",\"signature\":\"sig_123\"}}\n\n", + "event: content_block_stop\n", + "data: {\"type\":\"content_block_stop\",\"index\":0}\n\n", + "event: content_block_start\n", + "data: {\"type\":\"content_block_start\",\"index\":1,\"content_block\":{\"type\":\"text\",\"text\":\"Final answer\"}}\n\n", + "event: content_block_stop\n", + "data: {\"type\":\"content_block_stop\",\"index\":1}\n\n", + "event: message_delta\n", + "data: {\"type\":\"message_delta\",\"delta\":{\"stop_reason\":\"end_turn\",\"stop_sequence\":null},\"usage\":{\"input_tokens\":8,\"output_tokens\":1}}\n\n", + "event: message_stop\n", + "data: {\"type\":\"message_stop\"}\n\n", + "data: [DONE]\n\n" + ); + let server = spawn_server( + state, + vec![http_response("200 OK", "text/event-stream", sse)], + ) + .await; + + let client = AnthropicClient::new("test-key").with_base_url(server.base_url()); + let mut stream = client + .stream_message(&sample_request(false)) + .await + .expect("stream should start"); + + let mut events = Vec::new(); + while let Some(event) = stream + .next_event() + .await + .expect("stream event should parse") + { + events.push(event); + } + + assert_eq!(events.len(), 9); + assert!(matches!( + &events[1], + StreamEvent::ContentBlockStart(ContentBlockStartEvent { + content_block: OutputContentBlock::Thinking { thinking, signature }, + .. + }) if thinking.is_empty() && signature.is_none() + )); + assert!(matches!( + &events[2], + StreamEvent::ContentBlockDelta(ContentBlockDeltaEvent { + delta: ContentBlockDelta::ThinkingDelta { thinking }, + .. + }) if thinking == "step 1" + )); + assert!(matches!( + &events[3], + StreamEvent::ContentBlockDelta(ContentBlockDeltaEvent { + delta: ContentBlockDelta::SignatureDelta { signature }, + .. + }) if signature == "sig_123" + )); + assert!(matches!( + &events[5], + StreamEvent::ContentBlockStart(ContentBlockStartEvent { + content_block: OutputContentBlock::Text { text }, + .. + }) if text == "Final answer" + )); + assert!(matches!(events[6], StreamEvent::ContentBlockStop(_))); + assert!(matches!(events[7], StreamEvent::MessageDelta(_))); + assert!(matches!(events[8], StreamEvent::MessageStop(_))); +} + #[tokio::test] async fn retries_retryable_failures_before_succeeding() { let state = Arc::new(Mutex::new(Vec::::new()));