From 11046ff3a90bd00fcf4458c3420f4e22756b840b Mon Sep 17 00:00:00 2001 From: emranemran Date: Mon, 13 Apr 2026 20:26:22 -0700 Subject: [PATCH] feat: add Kafka error events to Livepeer cloud mode Publish connection error events to Kafka in LivepeerConnection, matching the existing CloudConnectionManager behavior. Uses the same error_type values (cloud_connection_failed, cloud_disconnect_error) and metadata shape (app_id) so Kafka consumers require no changes. Signed-off-by: emranemran Signed-off-by: emranemran --- src/scope/server/livepeer.py | 48 ++++++++++++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/src/scope/server/livepeer.py b/src/scope/server/livepeer.py index 696d216db..713a45b5d 100644 --- a/src/scope/server/livepeer.py +++ b/src/scope/server/livepeer.py @@ -16,6 +16,7 @@ import numpy as np from av import AudioFrame, VideoFrame +from .kafka_publisher import publish_event # noqa: F401 from .livepeer_client import LivepeerClient logger = logging.getLogger(__name__) @@ -74,6 +75,33 @@ def webrtc_connected(self) -> bool: """Whether an active Livepeer job is running.""" return self._client is not None and self._client.media_connected + def _publish_cloud_error( + self, + error_message: str, + exception_type: str, + error_type: str, + extra_error_fields: dict | None = None, + ) -> None: + """Publish a cloud connection error event (fire-and-forget).""" + error = { + "error_type": error_type, + "message": error_message, + "exception_type": exception_type, + "recoverable": True, + } + if extra_error_fields: + error.update(extra_error_fields) + + publish_event( + event_type="error", + connection_id=( + self._client.connection_id if self._client is not None else None + ), + user_id=self._user_id, + error=error, + metadata={"app_id": "livepeer"}, + ) + def configure(self) -> None: """Enable Livepeer backend mode.""" self._configured = True @@ -133,6 +161,11 @@ async def connect( self._connect_error = str(e) self._last_close_reason = str(e) logger.error(f"Failed to connect job: {e}") + self._publish_cloud_error( + str(e), + type(e).__name__, + error_type="cloud_connection_failed", + ) try: await client.disconnect() except Exception: @@ -167,6 +200,11 @@ async def _do_connect(): self._connect_error = str(e) self._last_close_reason = str(e) logger.exception(f"Background connect failed: {e}") + self._publish_cloud_error( + str(e), + type(e).__name__, + error_type="cloud_connection_failed", + ) self._connecting = True self._connect_error = None @@ -206,8 +244,18 @@ async def disconnect(self) -> None: logger.warning( "Livepeer client disconnect timed out after 15s, forcing cleanup" ) + self._publish_cloud_error( + "Livepeer client disconnect timed out after 15s", + "TimeoutError", + error_type="cloud_disconnect_error", + ) except Exception as e: logger.warning(f"Error during Livepeer client disconnect: {e}") + self._publish_cloud_error( + str(e), + type(e).__name__, + error_type="cloud_disconnect_error", + ) self._client = None self._configured = False self._connecting = False