diff --git a/src/scope/cloud/livepeer_fal_app.py b/src/scope/cloud/livepeer_fal_app.py
index 3a48ed49d..d0a57ee4b 100644
--- a/src/scope/cloud/livepeer_fal_app.py
+++ b/src/scope/cloud/livepeer_fal_app.py
@@ -28,6 +28,121 @@
 RUNNER_FAILURE_WINDOW_SECONDS = 60.0
 ASSETS_DIR_PATH = "/tmp/.daydream-scope/assets"
 
+
+# ---------------------------------------------------------------------------
+# Kafka publisher — matches fal_app.py KafkaPublisher for event parity
+# ---------------------------------------------------------------------------
+
+kafka_publisher: "KafkaPublisher | None" = None  # process-wide singleton; lazily created by websocket_handler
+
+
+class KafkaPublisher:
+    """Async Kafka event publisher for fal.ai websocket events."""
+
+    def __init__(self):
+        self._producer = None  # AIOKafkaProducer instance; created in start()
+        self._started = False
+        self._topic = None  # resolved from KAFKA_TOPIC in start()
+
+    async def start(self) -> bool:
+        """Start the Kafka producer."""
+        import json as _json  # noqa: F811  # local alias captured by the serializer lambdas below
+
+        bootstrap_servers = os.getenv("KAFKA_BOOTSTRAP_SERVERS")
+        self._topic = os.getenv("KAFKA_TOPIC", "network_events")
+        sasl_username = os.getenv("KAFKA_SASL_USERNAME")
+        sasl_password = os.getenv("KAFKA_SASL_PASSWORD")
+
+        print(
+            f"[Kafka] Starting publisher (KAFKA_BOOTSTRAP_SERVERS={bootstrap_servers})"
+        )
+        if not bootstrap_servers:  # Kafka is opt-in: no bootstrap servers disables publishing
+            print("[Kafka] Not configured, event publishing disabled")
+            return False
+
+        try:
+            from aiokafka import AIOKafkaProducer
+
+            config = {
+                "bootstrap_servers": bootstrap_servers,
+                "value_serializer": lambda v: _json.dumps(v).encode("utf-8"),
+                "key_serializer": lambda k: k.encode("utf-8") if k else None,
+            }
+
+            if sasl_username and sasl_password:  # SASL_SSL applied only when both creds present
+                import ssl
+
+                ssl_context = ssl.create_default_context()
+                config.update(
+                    {
+                        "security_protocol": "SASL_SSL",
+                        "sasl_mechanism": "PLAIN",
+                        "sasl_plain_username": sasl_username,
+                        "sasl_plain_password": sasl_password,
+                        "ssl_context": ssl_context,
+                    }
+                )
+
+            self._producer = AIOKafkaProducer(**config)
+            await self._producer.start()
+            self._started = True
+            print(f"[Kafka] Publisher started, topic: {self._topic}")
+            return True
+
+        except ImportError:
+            print("[Kafka] aiokafka not installed, Kafka disabled")
+            return False
+        except Exception as e:  # broad on purpose: Kafka stays best-effort, never fatal
+            print(f"[Kafka] Failed to start producer: {e}")
+            return False
+
+    async def stop(self):
+        """Stop the Kafka producer."""
+        if self._producer and self._started:
+            try:
+                await self._producer.stop()
+                print("[Kafka] Publisher stopped")
+            except Exception as e:
+                print(f"[Kafka] Error stopping producer: {e}")
+            finally:  # always reset state so a later start() can retry cleanly
+                self._started = False
+                self._producer = None
+
+    async def publish(self, event_type: str, data: dict) -> bool:
+        """Publish an event to Kafka."""
+        import uuid as _uuid
+
+        if not self._started or not self._producer:  # silently a no-op when Kafka is off
+            return False
+
+        event_id = str(_uuid.uuid4())
+        timestamp_ms = str(int(time.time() * 1000))  # epoch milliseconds, serialized as a string
+
+        event = {
+            "id": event_id,
+            "type": "stream_trace",
+            "timestamp": timestamp_ms,
+            "data": {
+                "type": event_type,
+                "client_source": "scope",
+                "timestamp": timestamp_ms,
+                **data,  # caller-supplied fields may override the envelope keys above
+            },
+        }
+
+        try:
+            await self._producer.send_and_wait(self._topic, value=event, key=event_id)
+            print(f"[Kafka] Published event: {event_type}")
+            return True
+        except Exception as e:  # best-effort: a failed publish never propagates
+            print(f"[Kafka] Failed to publish event {event_type}: {e}")
+            return False
+
+    @property
+    def is_running(self) -> bool:
+        return self._started  # True between a successful start() and stop()
+
+
 # Gates startup cleanup so only one cleanup run executes at a time.
 _cleanup_event: asyncio.Event | None = None
@@ -148,6 +263,8 @@ def _build_runner_command() -> list[str]:
         "run",
         "--extra",
         "livepeer",
+        "--extra",
+        "kafka",
         "livepeer-runner",
         "--host",
         RUNNER_HOST,
@@ -162,6 +279,7 @@ async def _proxy_ws(client_ws: WebSocket) -> None:
 
     Raises WebSocketDisconnect if the client disconnects.
    Returns normally if the runner connection drops.
    """
+
     import websockets
     from websockets.exceptions import ConnectionClosed
 
@@ -228,6 +346,7 @@ class LivepeerScopeApp(fal.App, keep_alive=300):
     requirements = [
         "websockets",
         "httpx",
+        "aiokafka",
     ]
 
     def setup(self):
@@ -266,6 +385,11 @@ def setup(self):
             "DAYDREAM_SCOPE_BUNDLED_PLUGINS_FILE",
             "LIVEPEER_DEBUG",
             "UV_CACHE_DIR",
+            # Kafka (for scope.server.kafka_publisher in the runner subprocess)
+            "KAFKA_BOOTSTRAP_SERVERS",
+            "KAFKA_TOPIC",
+            "KAFKA_SASL_USERNAME",
+            "KAFKA_SASL_PASSWORD",
         ]
         runner_env = {k: os.environ[k] for k in env_allowlist if k in os.environ}
         runner_env.setdefault("UV_CACHE_DIR", "/tmp/uv-cache")
@@ -317,6 +441,46 @@ async def websocket_handler(self, client_ws: WebSocket) -> None:
 
         await client_ws.accept()
 
+        # Initialize Kafka publisher (lazy, once per process).
+        global kafka_publisher
+        if kafka_publisher is None:  # NOTE(review): unguarded check — two concurrent first connections could each create a publisher; confirm handler concurrency
+            kafka_publisher = KafkaPublisher()
+            await kafka_publisher.start()
+
+        connection_start_time = time.time()  # NOTE(review): taken before the cleanup wait below, so duration_ms includes that wait — confirm intended
+        metadata: dict = {}
+        manifest_id = client_ws.headers.get("manifest-id")
+        user_id = client_ws.headers.get("daydream-user-id")
+        metadata["manifest_id"] = manifest_id
+        metadata["user_id"] = user_id
+
+        import json
+
+        fal_log_labels_raw = os.getenv("FAL_LOG_LABELS", "unknown")
+        try:
+            fal_log_labels = json.loads(fal_log_labels_raw)
+        except (json.JSONDecodeError, TypeError):
+            fal_log_labels = fal_log_labels_raw  # keep the raw string when it is not valid JSON
+
+        connection_info = {
+            "gpu_type": LivepeerScopeApp.machine_type,
+            "fal_region": os.getenv("NOMAD_DC", "unknown"),
+            "fal_runner_id": os.getenv(
+                "FAL_JOB_ID", os.getenv("FAL_RUNNER_ID", "unknown")
+            ),  # prefer FAL_JOB_ID; fall back to FAL_RUNNER_ID, then "unknown"
+            "fal_log_labels": fal_log_labels,
+        }
+        metadata["connection_info"] = connection_info
+        if kafka_publisher is not None and kafka_publisher.is_running:
+            await kafka_publisher.publish(
+                "websocket_connected",
+                {
+                    "user_id": user_id,
+                    "connection_id": manifest_id,
+                    "connection_info": connection_info,
+                },
+            )
+
         # Ensure any previous session data is cleaned up
         event = _get_cleanup_event()
         await event.wait()
@@ -358,6 +522,21 @@ async def websocket_handler(self, client_ws: WebSocket) -> None:
         except Exception as exc:
             print(f"Livepeer fal ws proxy error: {type(exc).__name__}: {exc}")
         finally:
+            if kafka_publisher and kafka_publisher.is_running:  # mirror of the connect event above
+                end_time = time.time()
+                elapsed_ms = int((end_time - connection_start_time) * 1000)
+                await kafka_publisher.publish(
+                    "websocket_disconnected",
+                    {
+                        "user_id": metadata.get("user_id"),
+                        "connection_id": metadata.get("manifest_id"),
+                        "connection_info": connection_info,
+                        "duration_ms": elapsed_ms,
+                        "session_start_time_ms": int(connection_start_time * 1000),
+                        "session_end_time_ms": int(end_time * 1000),
+                    },
+                )
+
             await run_cleanup()
             with suppress(Exception):
                 await client_ws.close()