diff --git a/api-reference/server/utilities/turn-management/user-mute-strategies.mdx b/api-reference/server/utilities/turn-management/user-mute-strategies.mdx index 3ab66ffd..5cf8bebc 100644 --- a/api-reference/server/utilities/turn-management/user-mute-strategies.mdx +++ b/api-reference/server/utilities/turn-management/user-mute-strategies.mdx @@ -190,6 +190,249 @@ user_aggregator, assistant_aggregator = LLMContextAggregatorPair( ) ``` +## Building Custom Strategies + +Subclass `BaseUserMuteStrategy` when none of the built-in strategies fit. A strategy only needs to answer one question per frame: should the user be muted right now? + +### The base class + +`BaseUserMuteStrategy` (in `pipecat.turns.user_mute`) exposes the following interface. The signatures below are a simplified view of what your subclass can override: + +```python +# Simplified interface; see the source for the full definition. +class BaseUserMuteStrategy: + async def setup(self, task_manager): ... + async def cleanup(self): ... + async def reset(self): ... + + async def process_frame(self, frame: Frame) -> bool: + """Return True if the user should be muted after this frame.""" + return False +``` + +Override `process_frame` to update internal state and return the current mute decision. Override `reset` if your strategy tracks turn-based state that should clear between conversations. + +### Which frames reach a strategy + +Each strategy's `process_frame` is called for every frame that passes through the user aggregator, **except** `StartFrame`, `EndFrame`, and `CancelFrame`. This includes: + +- User-direction frames from the input transport and STT: `TranscriptionFrame`, `InterimTranscriptionFrame`, `UserStartedSpeakingFrame`, `UserStoppedSpeakingFrame`, `VADUserStartedSpeakingFrame`, `VADUserStoppedSpeakingFrame`, `InputAudioRawFrame`, `InterruptionFrame` +- `SystemFrame` broadcasts from elsewhere in the pipeline: `BotStartedSpeakingFrame`, `BotStoppedSpeakingFrame`, `FunctionCallsStartedFrame`, `FunctionCallResultFrame`, `FunctionCallCancelFrame` + + + Frames that don't naturally reach the user aggregator (for example + `LLMTextFrame` or `TTSTextFrame`, which flow downstream from the LLM or TTS) + won't be seen by a strategy directly. To react to those signals, place a + companion `FrameProcessor` where the frames do flow and have it toggle state + on your strategy. See [Toggling a strategy at + runtime](#toggling-a-strategy-at-runtime) below. + + +### Which frames get suppressed when muted + +Returning `True` from your strategy sets the aggregator's mute state. While muted, only these frame types are actually dropped: + +- `InterruptionFrame` +- `VADUserStartedSpeakingFrame`, `VADUserStoppedSpeakingFrame` +- `UserStartedSpeakingFrame`, `UserStoppedSpeakingFrame` +- `InputAudioRawFrame` +- `InterimTranscriptionFrame`, `TranscriptionFrame` + +All other frames continue to flow so the rest of the pipeline keeps functioning. + +### Example: a simple custom strategy + +Mute the user whenever the bot is speaking, but only after a specific number of bot turns: + +```python +from pipecat.frames.frames import BotStartedSpeakingFrame, BotStoppedSpeakingFrame, Frame +from pipecat.turns.user_mute import BaseUserMuteStrategy + + +class AfterNTurnsUserMuteStrategy(BaseUserMuteStrategy): + def __init__(self, mute_after_turn: int = 3): + super().__init__() + self._mute_after_turn = mute_after_turn + self._bot_turns = 0 + self._bot_speaking = False + + async def reset(self): + self._bot_turns = 0 + self._bot_speaking = False + + async def process_frame(self, frame: Frame) -> bool: + await super().process_frame(frame) + + if isinstance(frame, BotStartedSpeakingFrame): + self._bot_speaking = True + elif isinstance(frame, BotStoppedSpeakingFrame): + self._bot_speaking = False + self._bot_turns += 1 + + return self._bot_speaking and self._bot_turns >= self._mute_after_turn +``` + +### Toggling a strategy at runtime + +Strategies are plain Python objects. Anything that holds a reference to one can flip its state between frames, which means a companion processor placed elsewhere in the pipeline can drive the mute decision based on signals the strategy can't observe directly (LLM text, tool results, external events). + +This example strategy adds its own `enable`/`disable` methods (not part of the base contract) and returns their state from `process_frame`: + +```python +from pipecat.frames.frames import Frame +from pipecat.turns.user_mute import BaseUserMuteStrategy + + +class ToggleableUserMuteStrategy(BaseUserMuteStrategy): + def __init__(self): + super().__init__() + self._muted = False + + def enable(self): + self._muted = True + + def disable(self): + self._muted = False + + async def reset(self): + self._muted = False + + async def process_frame(self, frame: Frame) -> bool: + await super().process_frame(frame) + return self._muted +``` + +A companion processor watches for the trigger and toggles the strategy: + +```python +from pipecat.frames.frames import ( + BotStartedSpeakingFrame, + BotStoppedSpeakingFrame, + Frame, + LLMTextFrame, +) +from pipecat.processors.frame_processor import FrameDirection, FrameProcessor + + +class DisclaimerGuardProcessor(FrameProcessor): + def __init__(self, strategy: ToggleableUserMuteStrategy, trigger_phrase: str, **kwargs): + super().__init__(**kwargs) + self._strategy = strategy + self._trigger = trigger_phrase + # Keep a small sliding window so cross-frame matches work without + # the buffer growing unbounded if the trigger never appears. + self._max_buffer = max(len(trigger_phrase) * 4, 512) + self._buffer = "" + self._active = False + + async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + + if isinstance(frame, BotStartedSpeakingFrame): + # Start each bot turn with a fresh buffer. + self._buffer = "" + elif isinstance(frame, LLMTextFrame) and direction == FrameDirection.DOWNSTREAM: + self._buffer = (self._buffer + frame.text)[-self._max_buffer :] + if not self._active and self._trigger in self._buffer: + self._active = True + self._strategy.enable() + elif isinstance(frame, BotStoppedSpeakingFrame) and self._active: + self._active = False + self._buffer = "" + self._strategy.disable() + + await self.push_frame(frame, direction) +``` + +Wire them together by passing the same strategy instance to both the aggregator and the processor: + +```python +mute_strategy = ToggleableUserMuteStrategy() + +user_aggregator, assistant_aggregator = LLMContextAggregatorPair( + context, + user_params=LLMUserAggregatorParams(user_mute_strategies=[mute_strategy]), +) + +disclaimer_guard = DisclaimerGuardProcessor( + strategy=mute_strategy, + trigger_phrase="Please read the following disclosure", +) + +pipeline = Pipeline([ + transport.input(), + stt, + user_aggregator, + llm, + disclaimer_guard, # positioned where LLMTextFrame flows downstream + tts, + transport.output(), + assistant_aggregator, +]) +``` + +### Example: mute for the first N words of the bot speaking + +Count words as the LLM streams text and keep the user muted until the threshold is reached. The strategy owns the counter and resets it each turn; a companion processor feeds it text: + +```python +from pipecat.frames.frames import BotStartedSpeakingFrame, BotStoppedSpeakingFrame, Frame +from pipecat.turns.user_mute import BaseUserMuteStrategy + + +class FirstNWordsUserMuteStrategy(BaseUserMuteStrategy): + def __init__(self, word_count: int = 10): + super().__init__() + self._threshold = word_count + self._words_seen = 0 + self._bot_speaking = False + + def add_words(self, text: str): + self._words_seen += len(text.split()) + + async def reset(self): + self._words_seen = 0 + self._bot_speaking = False + + async def process_frame(self, frame: Frame) -> bool: + await super().process_frame(frame) + + if isinstance(frame, BotStartedSpeakingFrame): + self._bot_speaking = True + self._words_seen = 0 + elif isinstance(frame, BotStoppedSpeakingFrame): + self._bot_speaking = False + + return self._bot_speaking and self._words_seen < self._threshold +``` + +```python +from pipecat.frames.frames import Frame, LLMTextFrame +from pipecat.processors.frame_processor import FrameDirection, FrameProcessor + + +class LLMTextWordCounter(FrameProcessor): + def __init__(self, strategy: FirstNWordsUserMuteStrategy, **kwargs): + super().__init__(**kwargs) + self._strategy = strategy + + async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + + if isinstance(frame, LLMTextFrame) and direction == FrameDirection.DOWNSTREAM: + self._strategy.add_words(frame.text) + + await self.push_frame(frame, direction) +``` + + + Because mute decisions are only re-evaluated when frames pass through the + aggregator, the unmute point here aligns with the next user or bot frame after + the threshold is crossed, not the exact word boundary. For tighter control, + drop the word count and gate on a sentinel phrase the LLM emits at the end of + the protected section, as shown in the disclaimer example above. + + ## Event Handlers You can register event handlers to be notified when user muting starts or stops. This is useful for observability or providing feedback to users.