Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 100 additions & 8 deletions rig-integrations/rig-bedrock/src/streaming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,30 @@ struct ReasoningState {
signature: Option<String>,
}

/// Convert an accumulated [`ReasoningState`] into a streaming reasoning chunk.
///
/// Adaptive-thinking blocks from Bedrock can arrive as signature-only — i.e. a
/// `Signature` delta with no preceding non-empty `Text` delta. Dropping such
/// blocks loses the signature on the way back to the consumer, which then
/// fails on the next turn with `messages.N.content.0.thinking.signature:
/// Field required` when the conversation is replayed to Bedrock. We must emit
/// whenever either the content or the signature is present; both-empty is
/// still skipped.
fn finalize_reasoning(
state: ReasoningState,
) -> Option<RawStreamingChoice<BedrockStreamingResponse>> {
if state.content.is_empty() && state.signature.is_none() {
return None;
}
Some(RawStreamingChoice::Reasoning {
id: None,
content: ReasoningContent::Text {
text: state.content,
signature: state.signature,
},
})
}

impl CompletionModel {
pub(crate) async fn stream(
&self,
Expand Down Expand Up @@ -168,14 +192,8 @@ impl CompletionModel {
},
aws_bedrock::ConverseStreamOutput::ContentBlockStop(_event) => {
if let Some(reasoning_state) = current_reasoning.take()
&& !reasoning_state.content.is_empty() {
yield Ok(RawStreamingChoice::Reasoning {
id: None,
content: ReasoningContent::Text {
text: reasoning_state.content,
signature: reasoning_state.signature,
},
})
&& let Some(choice) = finalize_reasoning(reasoning_state) {
yield Ok(choice)
}
},
aws_bedrock::ConverseStreamOutput::MessageStop(message_stop_event) => {
Expand Down Expand Up @@ -528,4 +546,78 @@ mod tests {
assert_eq!(state.content, "Reasoning content here");
assert_eq!(state.signature, Some("sig_part1_part2".to_string()));
}

#[test]
fn finalize_reasoning_with_content_and_signature_emits_text_block() {
let state = ReasoningState {
content: "I am thinking".to_string(),
signature: Some("sig-abc".to_string()),
};

let choice = finalize_reasoning(state).expect("should emit reasoning");
match choice {
RawStreamingChoice::Reasoning { id, content } => {
assert!(id.is_none());
match content {
ReasoningContent::Text { text, signature } => {
assert_eq!(text, "I am thinking");
assert_eq!(signature.as_deref(), Some("sig-abc"));
}
other => panic!("expected ReasoningContent::Text, got {:?}", other),
}
}
_ => panic!("expected RawStreamingChoice::Reasoning"),
}
}

#[test]
fn finalize_reasoning_signature_only_still_emits_block() {
// Adaptive-thinking on Bedrock can produce a Signature delta with no
// accompanying non-empty Text delta. Previously this was silently
// dropped, losing the signature and breaking next-turn replay.
let state = ReasoningState {
content: String::new(),
signature: Some("sig-only".to_string()),
};

let choice =
finalize_reasoning(state).expect("should emit reasoning for signature-only state");
match choice {
RawStreamingChoice::Reasoning { content, .. } => match content {
ReasoningContent::Text { text, signature } => {
assert!(text.is_empty());
assert_eq!(signature.as_deref(), Some("sig-only"));
}
other => panic!("expected ReasoningContent::Text, got {:?}", other),
},
_ => panic!("expected RawStreamingChoice::Reasoning"),
}
}

#[test]
fn finalize_reasoning_content_only_still_emits_block() {
let state = ReasoningState {
content: "thoughts without sig".to_string(),
signature: None,
};

let choice =
finalize_reasoning(state).expect("should emit reasoning for content-only state");
match choice {
RawStreamingChoice::Reasoning { content, .. } => match content {
ReasoningContent::Text { text, signature } => {
assert_eq!(text, "thoughts without sig");
assert!(signature.is_none());
}
other => panic!("expected ReasoningContent::Text, got {:?}", other),
},
_ => panic!("expected RawStreamingChoice::Reasoning"),
}
}

#[test]
fn finalize_reasoning_both_empty_emits_nothing() {
let state = ReasoningState::default();
assert!(finalize_reasoning(state).is_none());
}
}
Loading