From d78c77141b4f534fc0cc135e305413bac7fa4e9a Mon Sep 17 00:00:00 2001 From: "nick.yi" Date: Wed, 5 Mar 2025 21:48:06 +0800 Subject: [PATCH] 1.hotfix timestamp,mark and interruption handling 2.Hotfix message persistence --- .../BotSharp.Core/Realtime/RealtimeHub.cs | 19 +- .../Services/Stream/TwilioStreamMiddleware.cs | 210 ++++++++++++++---- 2 files changed, 171 insertions(+), 58 deletions(-) diff --git a/src/Infrastructure/BotSharp.Core/Realtime/RealtimeHub.cs b/src/Infrastructure/BotSharp.Core/Realtime/RealtimeHub.cs index 36397b0ac..17c5162a5 100644 --- a/src/Infrastructure/BotSharp.Core/Realtime/RealtimeHub.cs +++ b/src/Infrastructure/BotSharp.Core/Realtime/RealtimeHub.cs @@ -160,19 +160,16 @@ await completer.Connect(conn, await completer.TriggerModelInference("Reply based on the function's output."); } } - else - { - // append output audio transcript to conversation - storage.Append(conn.ConversationId, message); - dialogs.Add(message); + // append output audio transcript to conversation + storage.Append(conn.ConversationId, message); + dialogs.Add(message); - foreach (var hook in hookProvider.HooksOrderByPriority) - { - hook.SetAgent(agent) - .SetConversation(conversation); + foreach (var hook in hookProvider.HooksOrderByPriority) + { + hook.SetAgent(agent) + .SetConversation(conversation); - await hook.OnResponseGenerated(message); - } + await hook.OnResponseGenerated(message); } } }, diff --git a/src/Plugins/BotSharp.Plugin.Twilio/Services/Stream/TwilioStreamMiddleware.cs b/src/Plugins/BotSharp.Plugin.Twilio/Services/Stream/TwilioStreamMiddleware.cs index 492b5dcf7..089663969 100644 --- a/src/Plugins/BotSharp.Plugin.Twilio/Services/Stream/TwilioStreamMiddleware.cs +++ b/src/Plugins/BotSharp.Plugin.Twilio/Services/Stream/TwilioStreamMiddleware.cs @@ -5,20 +5,25 @@ using BotSharp.Plugin.Twilio.Models.Stream; using Microsoft.AspNetCore.Http; using System.Net.WebSockets; +using System.Text.Json; +using System.Collections.Concurrent; +using System.Text; using Task = System.Threading.Tasks.Task; namespace BotSharp.Plugin.Twilio.Services.Stream; /// -/// Refrence to https://github.com/twilio-samples/speech-assistant-openai-realtime-api-node/blob/main/index.js +/// Reference to https://github.com/twilio-samples/speech-assistant-openai-realtime-api-node/blob/main/index.js /// public class TwilioStreamMiddleware { private readonly RequestDelegate _next; + private readonly ILogger _logger; - public TwilioStreamMiddleware(RequestDelegate next) + public TwilioStreamMiddleware(RequestDelegate next, ILogger logger) { _next = next; + _logger = logger; } public async Task Invoke(HttpContext httpContext) @@ -43,15 +48,23 @@ public async Task Invoke(HttpContext httpContext) private async Task HandleWebSocket(IServiceProvider services, string conversationId, WebSocket webSocket) { var hub = services.GetRequiredService(); - + var convService = services.GetRequiredService(); + + // Session state var conn = new RealtimeHubConnection { ConversationId = conversationId }; - // load conversation and state - var convService = services.GetRequiredService(); - convService.SetConversationId(conversationId, []); + // Variables for timestamp and interruption handling + string streamSid = null; + long latestMediaTimestamp = 0; + string lastAssistantItem = null; + var markQueue = new ConcurrentQueue(); + long? responseStartTimestampTwilio = null; + + // Load session and state + convService.SetConversationId(conversationId, new List()); var hooks = services.GetServices(); foreach (var hook in hooks) { @@ -59,56 +72,159 @@ private async Task HandleWebSocket(IServiceProvider services, string conversatio } convService.States.Save(); - await hub.Listen(webSocket, (receivedText) => + // Set up event handlers + conn.OnModelMessageReceived = message => { - var response = JsonSerializer.Deserialize(receivedText); - conn.StreamId = response.StreamSid; - conn.Event = response.Event switch + // Record last assistant item ID for interruption handling + if (!string.IsNullOrEmpty(conn.StreamId)) { - "start" => "user_connected", - "media" => "user_data_received", - "stop" => "user_disconnected", - _ => response.Event - }; + lastAssistantItem = conn.StreamId; + } - if (string.IsNullOrEmpty(conn.Event)) + // If this is the first delta of a new response, set the start timestamp + if (!responseStartTimestampTwilio.HasValue) { - return conn; + responseStartTimestampTwilio = latestMediaTimestamp; + _logger.LogDebug($"Setting start timestamp for new response: {responseStartTimestampTwilio}ms"); } - conn.OnModelMessageReceived = message => - new - { - @event = "media", - streamSid = response.StreamSid, - media = new { payload = message } - }; - conn.OnModelAudioResponseDone = () => - new - { - @event = "mark", - streamSid = response.StreamSid, - mark = new { name = "responsePart" } - }; - conn.OnModelUserInterrupted = () => - new - { - @event = "clear", - streamSid = response.StreamSid - }; + // Add mark to queue + markQueue.Enqueue("responsePart"); - if (response.Event == "start") + return new { - var startResponse = JsonSerializer.Deserialize(receivedText); - conn.Data = JsonSerializer.Serialize(startResponse.Body.CustomParameters); - } - else if (response.Event == "media") + @event = "media", + streamSid = conn.StreamId, + media = new { payload = message } + }; + }; + + conn.OnModelAudioResponseDone = () => + { + return new { - var mediaResponse = JsonSerializer.Deserialize(receivedText); - conn.Data = mediaResponse.Body.Payload; - } + @event = "mark", + streamSid = conn.StreamId, + mark = new { name = "responsePart" } + }; + }; + + conn.OnModelUserInterrupted = () => + { + // Reset states + markQueue.Clear(); + lastAssistantItem = null; + responseStartTimestampTwilio = null; + + return new + { + @event = "clear", + streamSid = conn.StreamId + }; + }; + + try + { + await hub.Listen(webSocket, receivedText => + { + var response = JsonSerializer.Deserialize(receivedText); + if (response == null) + { + _logger.LogWarning("Failed to parse received WebSocket message"); + return conn; + } + + conn.StreamId = response.StreamSid; + + switch (response.Event) + { + case "start": + conn.Event = "user_connected"; + streamSid = response.StreamSid; + _logger.LogInformation($"Incoming stream started: {streamSid}"); + + // Reset start and media timestamps + responseStartTimestampTwilio = null; + latestMediaTimestamp = 0; + + var startResponse = JsonSerializer.Deserialize(receivedText); + if (startResponse?.Body?.CustomParameters != null) + { + conn.Data = JsonSerializer.Serialize(startResponse.Body.CustomParameters); + } + break; + + case "media": + conn.Event = "user_data_received"; + var mediaResponse = JsonSerializer.Deserialize(receivedText); + if (mediaResponse?.Body != null) + { + conn.Data = mediaResponse.Body.Payload; + + // Update latest media timestamp + if (long.TryParse(mediaResponse.Body.Timestamp, out latestMediaTimestamp)) + { + _logger.LogDebug($"Received media message with timestamp: {latestMediaTimestamp}ms"); + } + + // Check if user started speaking (interruption handling) + if (markQueue.Count > 0 && responseStartTimestampTwilio.HasValue && + !string.IsNullOrEmpty(lastAssistantItem)) + { + // Detect voice activity - more complex logic can be added here + // e.g., check audio energy levels or use VAD (Voice Activity Detection) + + // If voice activity detected, handle interruption + if (ShouldHandleInterruption(mediaResponse.Body.Payload)) + { + conn.Event = "user_interrupted"; + long elapsedTime = latestMediaTimestamp - responseStartTimestampTwilio.Value; + _logger.LogDebug($"Calculating elapsed time for truncation: {latestMediaTimestamp} - {responseStartTimestampTwilio} = {elapsedTime}ms"); + } + } + } + break; + + case "mark": + // Handle mark event + if (markQueue.TryDequeue(out _)) + { + _logger.LogDebug("Processing mark event, removing one mark from queue"); + } + break; - return conn; - }); + case "stop": + conn.Event = "user_disconnected"; + break; + + default: + _logger.LogInformation($"Received non-media event: {response.Event}"); + break; + } + + return conn; + }); + } + catch (Exception ex) + { + _logger.LogError(ex, "Error in WebSocket communication"); + } + } + + // Simple interruption detection logic - can be extended as needed + private bool ShouldHandleInterruption(string audioPayload) + { + // Here should implement actual voice activity detection logic + // e.g., analyze audio energy levels or use VAD algorithm + + // Simple example - should be replaced with real detection logic in production + if (!string.IsNullOrEmpty(audioPayload)) + { + // Check if audio payload contains sufficient energy + // This is just a placeholder - needs actual VAD implementation + return false; // Default to false to avoid false interruptions + } + + return false; } }