diff --git a/source/_remoteClient/localMachine.py b/source/_remoteClient/localMachine.py index 7e1f8274989..34a095920e6 100644 --- a/source/_remoteClient/localMachine.py +++ b/source/_remoteClient/localMachine.py @@ -27,7 +27,6 @@ import api import braille from config.registry import RegistryKey -import inputCore import nvwave import speech import tones @@ -126,9 +125,6 @@ def __init__(self) -> None: self.receivingBraille = False - self._cachedSizes: list[int] | None = None - """Cached braille display sizes from remote machines""" - self._showingLocalUiMessage: bool = False """Whether we're currently showing a `ui.message` while showing remote braille.""" @@ -257,41 +253,7 @@ def brailleInput(self, **kwargs: dict[str, Any]) -> None: :param kwargs: Gesture parameters passed to BrailleInputGesture :note: Silently ignores gestures that have no associated action. """ - try: - inputCore.manager.executeGesture(input.BrailleInputGesture(**kwargs)) - except inputCore.NoInputGestureAction: - pass - - def setBrailleDisplaySize(self, sizes: list[int]) -> None: - """Cache remote braille display sizes for size negotiation. - - :param sizes: List of display sizes (cells) from remote machines - """ - self._cachedSizes = sizes - - def _handleFilterDisplayDimensions(self, value: braille.DisplayDimensions) -> braille.DisplayDimensions: - """Filter the local display dimensions based on remote display dimensions. - - Determines the optimal display dimensions when sharing braille output by - finding the smallest positive width among local and remote displays. - - .. note:: - We can currently only support a single line of braille, - as sending display dimensions would require changing the Remote Access protocol. - - :param value: Local display dimensions - :return: The negotiated display dimensions to use. - """ - if not self._cachedSizes: - # We cannot support multiline displays without breaking the Remote Access protocol, - # so always force numRows to 1. - return value._replace(numRows=1) - # There is no point storing the number of rows if we are always going to set it to 1. - sizes = self._cachedSizes + [value.numCols] - try: - return braille.DisplayDimensions(numRows=1, numCols=min(i for i in sizes if i > 0)) - except ValueError: - return value._replace(numRows=1) + braille.injectGesture(input.BrailleInputGesture(**kwargs)) def handleDecideEnabled(self) -> bool: """Determine if the local braille display should be enabled. diff --git a/source/_remoteClient/session.py b/source/_remoteClient/session.py index d068b30a8c3..d73dfc6be6a 100644 --- a/source/_remoteClient/session.py +++ b/source/_remoteClient/session.py @@ -270,6 +270,26 @@ def connectedClientsCount(self) -> int: return self.connectedLeadersCount + self.connectedFollowersCount +class _FollowerBrailleMirror(braille.BrailleMirror): + """BrailleMirror that forwards display updates to connected leader machines. + + Registered while at least one leader with a braille display is connected. + :meth:`numCells` returns the smallest positive leader display size so NVDA + negotiates a compatible display width. + """ + + def __init__(self, session: "FollowerSession") -> None: + self._session = session + + def display(self, cells: list[int]) -> None: + if self._session.hasBrailleLeaders(): + self._session.transport.send(type=RemoteMessageType.DISPLAY, cells=cells) + + def numCells(self) -> int: + sizes = [s for s in self._session.leaderDisplaySizes if s > 0] + return min(sizes) if sizes else 0 + + class FollowerSession(RemoteSession): """Session that runs on the controlled (follower) NVDA instance. @@ -302,6 +322,9 @@ def __init__( self.leaders = defaultdict(dict) self.leaderDisplaySizes = [] self.followers = set() + self._brailleMirror = _FollowerBrailleMirror(self) + # The remote protocol only supports single-row braille; force numRows to 1. + braille.filter_displayDimensions.register(self._filterNumRowsToOne) self.transport.transportClosing.register(self.handleTransportClosing) self.transport.registerInbound( RemoteMessageType.CHANNEL_JOINED, @@ -315,9 +338,6 @@ def __init__( RemoteMessageType.SET_DISPLAY_SIZE, self.setDisplaySize, ) - braille.filter_displayDimensions.register( - self.localMachine._handleFilterDisplayDimensions, - ) self.transport.registerInbound( RemoteMessageType.BRAILLE_INPUT, self.localMachine.brailleInput, @@ -340,7 +360,7 @@ def registerCallbacks(self) -> None: ) self.transport.registerOutbound(decide_playWaveFile, RemoteMessageType.WAVE) self.transport.registerOutbound(post_speechPaused, RemoteMessageType.PAUSE_SPEECH) - braille.pre_writeCells.register(self.display) + braille.registerMirror(self._brailleMirror) pre_speechQueued.register(self.sendSpeech) self.callbacksAdded = True @@ -351,7 +371,7 @@ def unregisterCallbacks(self) -> None: self.transport.unregisterOutbound(RemoteMessageType.CANCEL) self.transport.unregisterOutbound(RemoteMessageType.WAVE) self.transport.unregisterOutbound(RemoteMessageType.PAUSE_SPEECH) - braille.pre_writeCells.unregister(self.display) + braille.unregisterMirror(self._brailleMirror) pre_speechQueued.unregister(self.sendSpeech) self.callbacksAdded = False @@ -382,6 +402,7 @@ def handleTransportClosing(self) -> None: to ensure clean shutdown of remote features. """ self.unregisterCallbacks() + braille.filter_displayDimensions.unregister(self._filterNumRowsToOne) def handleTransportDisconnected(self) -> None: """Handle disconnection from the transport layer. @@ -408,7 +429,14 @@ def setDisplaySize(self, sizes: list[int] | None = None) -> None: sizes if sizes else [info.get("braille_numCells", 0) for info in self.leaders.values()] ) log.debug(f"Setting follower display size to: {self.leaderDisplaySizes!r}") - self.localMachine.setBrailleDisplaySize(self.leaderDisplaySizes) + + def _filterNumRowsToOne(self, value: braille.DisplayDimensions) -> braille.DisplayDimensions: + """Force single-row braille output. + + The remote protocol does not support multi-row displays, + so we always constrain numRows to 1. + """ + return value._replace(numRows=1) def handleBrailleInfo( self, @@ -451,15 +479,6 @@ def pauseSpeech(self, switch: bool) -> None: """Toggle speech pause state on leader instances.""" self.transport.send(type=RemoteMessageType.PAUSE_SPEECH, switch=switch) - def display(self, cells: list[int]) -> None: - """Forward braille display content to leader instances. - - Only sends braille data if there are connected leaders with braille displays. - """ - # Only send braille data when there are controlling machines with a braille display - if self.hasBrailleLeaders(): - self.transport.send(type=RemoteMessageType.DISPLAY, cells=cells) - def hasBrailleLeaders(self) -> bool: """Check if any connected leaders have braille displays. diff --git a/source/braille.py b/source/braille.py index c1c0abd4d8c..2e8ed075934 100644 --- a/source/braille.py +++ b/source/braille.py @@ -1,8 +1,8 @@ # A part of NonVisual Desktop Access (NVDA) # This file is covered by the GNU General Public License. # See the file COPYING for more details. -# Copyright (C) 2008-2025 NV Access Limited, Joseph Lee, Babbage B.V., Davy Kager, Bram Duvigneau, -# Leonard de Ruijter, Burman's Computer and Education Ltd., Julien Cochuyt +# Copyright (C) 2008-2026 NV Access Limited, Joseph Lee, Babbage B.V., Davy Kager, Bram Duvigneau, +# Leonard de Ruijter, Burman's Computer and Education Ltd., Julien Cochuyt, Pneuma Solutions from enum import StrEnum import itertools @@ -67,6 +67,7 @@ import hwPortUtils import bdDetect import queueHandler +import winUser import brailleViewer from autoSettingsUtils.driverSetting import BooleanDriverSetting, NumericDriverSetting from utils.security import objectBelowLockScreenAndWindowsIsLocked, post_sessionLockStateChanged @@ -3434,6 +3435,9 @@ def initialize(): handler = BrailleHandler() handler.handlePostConfigProfileSwitch() config.post_configProfileSwitch.register(handler.handlePostConfigProfileSwitch) + import braillePipeServer + + braillePipeServer.initialize() def pumpAll(): @@ -3443,6 +3447,9 @@ def pumpAll(): def terminate(): global handler + import braillePipeServer + + braillePipeServer.terminate() handler.terminate() handler = None @@ -3969,6 +3976,177 @@ def getDisplayTextForIdentifier(cls, identifier): inputCore.registerGestureSource("br", BrailleDisplayGesture) +class BrailleMirror: + """Abstract base class for a braille mirror. + + A mirror intercepts every braille display update and can optionally influence the negotiated display width. + Both the physical display and all registered mirrors receive the same cells simultaneously; the mirror does **not** suppress the local display. + Register an instance with :func:`registerMirror` and remove it with :func:`unregisterMirror`. To inject a gesture back into NVDA (e.g. a routing key received over a remote channel) use :func:`injectGesture`. + """ + + def display(self, cells: List[int]) -> None: + """Called with the full cell array on every display update. + + :param cells: The braille cells written to the display. + """ + + def numCells(self) -> int: + """Return the number of cells this mirror can show. + + Return 0 (the default) to have no effect on the negotiated display + width. A positive value caps the display width used by + :data:`filter_displayDimensions` to the smallest value across all registered mirrors and the physical display. + """ + return 0 + + +_registeredMirrors: List["BrailleMirror"] = [] + + +def _mirrorPreWriteCells(cells: List[int], **kwargs) -> None: + for mirror in _registeredMirrors: + mirror.display(cells) + + +def _mirrorFilterDisplayDimensions(value: DisplayDimensions) -> DisplayDimensions: + sizes = [m.numCells() for m in _registeredMirrors if m.numCells() > 0] + if not sizes: + return value + cap = min(sizes) + if cap >= value.numCols: + return value + return value._replace(numCols=cap) + + +def registerMirror(mirror: BrailleMirror) -> None: + """Register *mirror* to receive braille display updates. + + :meth:`BrailleMirror.display` will be called on the main thread for every subsequent :meth:`BrailleHandler._writeCells` call. If *mirror* returns a positive value from :meth:`BrailleMirror.numCells`, it will also participate in display-width negotiation via :data:`filter_displayDimensions`. + """ + if not _registeredMirrors: + pre_writeCells.register(_mirrorPreWriteCells) + filter_displayDimensions.register(_mirrorFilterDisplayDimensions) + _registeredMirrors.append(mirror) + if handler: + handler._refreshEnabled(block=True) + + +def unregisterMirror(mirror: BrailleMirror) -> None: + """Remove a previously registered mirror. + + Safe to call even if *mirror* is not currently registered. + """ + try: + _registeredMirrors.remove(mirror) + except ValueError: + return + if not _registeredMirrors: + pre_writeCells.unregister(_mirrorPreWriteCells) + filter_displayDimensions.unregister(_mirrorFilterDisplayDimensions) + if handler: + handler._refreshEnabled(block=True) + + +def injectGesture(gesture: BrailleDisplayGesture) -> None: + """Inject *gesture* into NVDA's input pipeline. + + This is a thin wrapper around :func:`inputCore.manager.executeGesture` that silently swallows :class:`inputCore.NoInputGestureAction` so callers do not need to handle the common case where no script is bound. + + Thread safety: must be called on the main thread, or scheduled via ``wx.CallAfter`` from a background thread. + """ + try: + inputCore.manager.executeGesture(gesture) + except inputCore.NoInputGestureAction: + pass + + +class DirectBrailleWindow: + """Take over braille output and input while a specific window has focus. + + When the window identified by *hwnd* is the foreground window, NVDA's own braille rendering is suppressed. The application drives what appears on the physical display by calling :meth:`display`, and all braille gestures are forwarded to :meth:`onGesture` instead of being processed by NVDA. + When the window loses focus, normal NVDA rendering resumes automatically. + + :param hwnd: The HWND of the window that triggers direct braille mode. + :param numCells: Advertised display width; 0 means use whatever NVDA provides. A positive value caps the negotiated display width via :data:`filter_displayDimensions`. + """ + + def __init__(self, hwnd: int, numCells: int = 0) -> None: + self._hwnd = hwnd + self._numCells = numCells + self._active = False + + def _isForeground(self) -> bool: + """Return True if our registered window is currently in the foreground.""" + fg = winUser.getForegroundWindow() + return fg == self._hwnd or winUser.isDescendantWindow(self._hwnd, fg) + + def display(self, cells: List[int]) -> None: + """Push *cells* to the physical braille display. + + Has no effect if the registered window is not currently foreground or if no braille display is connected. + + Thread safety: safe to call from any thread; the actual write is dispatched to the main thread via ``wx.CallAfter``. + """ + if handler and self._isForeground(): + wx.CallAfter(handler._writeCells, cells) + + def onGesture(self, gesture: BrailleDisplayGesture) -> None: + """Called when a braille gesture arrives while this window is active. + + The default implementation does nothing, so gestures are suppressed. Subclass and override to handle them. + + :param gesture: The braille gesture that was intercepted. + """ + + def _handleDecideEnabled(self) -> bool: + return not self._isForeground() + + def _handleDecideExecuteGesture(self, gesture: inputCore.InputGesture) -> bool: + if not self._isForeground(): + return True + if isinstance(gesture, BrailleDisplayGesture): + self.onGesture(gesture) + return False + return True + + def _handleFilterDisplayDimensions(self, value: DisplayDimensions) -> DisplayDimensions: + if self._numCells <= 0 or not self._isForeground(): + return value + if self._numCells >= value.numCols: + return value + return value._replace(numCols=self._numCells) + + def activate(self) -> None: + """Start intercepting braille output and input for the registered window. + + Safe to call even if already active (subsequent calls are no-ops). + """ + if self._active: + return + self._active = True + decide_enabled.register(self._handleDecideEnabled) + inputCore.decide_executeGesture.register(self._handleDecideExecuteGesture) + if self._numCells > 0: + filter_displayDimensions.register(self._handleFilterDisplayDimensions) + if handler: + handler._refreshEnabled(block=True) + + def deactivate(self) -> None: + """Stop intercepting braille, restoring NVDA's normal rendering. + + Safe to call even if not currently active. + """ + if not self._active: + return + self._active = False + decide_enabled.unregister(self._handleDecideEnabled) + inputCore.decide_executeGesture.unregister(self._handleDecideExecuteGesture) + if self._numCells > 0: + filter_displayDimensions.unregister(self._handleFilterDisplayDimensions) + if handler: + handler._refreshEnabled(block=True) + + def getSerialPorts(filterFunc=None) -> typing.Iterator[typing.Tuple[str, str]]: """Get available serial ports in a format suitable for L{BrailleDisplayDriver.getManualPorts}. @param filterFunc: a function executed on every dictionary retrieved using L{hwPortUtils.listComPorts}. diff --git a/source/braillePipeServer.py b/source/braillePipeServer.py new file mode 100644 index 00000000000..3cbf5f240ef --- /dev/null +++ b/source/braillePipeServer.py @@ -0,0 +1,480 @@ +# A part of NonVisual Desktop Access (NVDA) +# This file is covered by the GNU General Public License. +# See the file COPYING for more details. +# Copyright (C) 2026 Pneuma Solutions + +"""Named-pipe IPC server that exposes the Braille Mirror and Direct Braille Window APIs to external processes. + +Two pipe names are used: + +* ``\\\\.\\pipe\\screen_reader_braille`` – normal desktop instance +* ``\\\\.\\pipe\\screen_reader_braille_secure`` – secure desktop instance + +Wire protocol: +Every message is length-prefixed: 4 bytes little-endian uint32 body length, followed by a UTF-8 JSON object. + +One connection handles one role (mirror or direct braille). +""" + +import ctypes +import ctypes.wintypes +import json +import queue +import struct +import threading +from typing import Optional + +import wx + +import braille +from logHandler import log +from utils.security import isRunningOnSecureDesktop + +_kernel32 = ctypes.windll.kernel32 # type: ignore[attr-defined] +_advapi32 = ctypes.windll.advapi32 # type: ignore[attr-defined] + +INVALID_HANDLE_VALUE = ctypes.wintypes.HANDLE(-1).value +ERROR_PIPE_CONNECTED = 535 +ERROR_BROKEN_PIPE = 109 +ERROR_IO_PENDING = 997 +PIPE_ACCESS_DUPLEX = 0x00000003 +PIPE_TYPE_BYTE = 0x00000000 +PIPE_READMODE_BYTE = 0x00000000 +PIPE_WAIT = 0x00000000 +PIPE_UNLIMITED_INSTANCES = 255 +NMPWAIT_USE_DEFAULT_WAIT = 0 +FILE_FLAG_OVERLAPPED = 0x40000000 + +SDDL_REVISION_1 = 1 + +# SDDL for the normal-desktop pipe. +# The normal-desktop NVDA instance runs as the logged-in user. Its default token DACL grants that user and local Administrators access, but does NOT reliably include NT AUTHORITY\SYSTEM, which is the identity used by RIM's elevated service component. +_NORMAL_PIPE_SDDL = "D:(A;;GA;;;OW)(A;;GRGW;;;SY)" + + +class _SECURITY_ATTRIBUTES(ctypes.Structure): + _fields_ = [ + ("nLength", ctypes.wintypes.DWORD), + ("lpSecurityDescriptor", ctypes.c_void_p), + ("bInheritHandle", ctypes.wintypes.BOOL), + ] + + def __init__(self, **kwargs): + super().__init__(nLength=ctypes.sizeof(self), **kwargs) + + +def _buildSystemAccessSA() -> tuple["_SECURITY_ATTRIBUTES", ctypes.c_void_p]: + """Build a SECURITY_ATTRIBUTES granting SYSTEM read/write on the normal-desktop pipe. + + Returns ``(sa, sd)`` where *sd* is the LocalAlloc'd security descriptor that must be freed with ``_kernel32.LocalFree(sd)`` when the pipe server stops. + """ + sd = ctypes.c_void_p() + ok = _advapi32.ConvertStringSecurityDescriptorToSecurityDescriptorW( + _NORMAL_PIPE_SDDL, + SDDL_REVISION_1, + ctypes.byref(sd), + None, + ) + if not ok: + raise ctypes.WinError() + sa = _SECURITY_ATTRIBUTES(lpSecurityDescriptor=sd) + return sa, sd + + +def _createPipeInstance( + name: str, + sa: Optional["_SECURITY_ATTRIBUTES"] = None, +) -> ctypes.wintypes.HANDLE: + """Create a single named-pipe instance and return its handle.""" + handle = _kernel32.CreateNamedPipeW( + name, + PIPE_ACCESS_DUPLEX, + PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT, + PIPE_UNLIMITED_INSTANCES, + 65536, + 65536, + NMPWAIT_USE_DEFAULT_WAIT, + ctypes.byref(sa) if sa is not None else None, + ) + if handle == INVALID_HANDLE_VALUE: + raise ctypes.WinError() + return handle + + +def _connectClient(handle: ctypes.wintypes.HANDLE) -> bool: + """Block until a client connects. Return False if the pipe is broken.""" + result = _kernel32.ConnectNamedPipe(handle, None) + if result: + return True + err = ctypes.GetLastError() + if err == ERROR_PIPE_CONNECTED: + return True + return False + + +def _readExact(handle: ctypes.wintypes.HANDLE, n: int) -> Optional[bytes]: + """Read exactly *n* bytes from *handle*. Return None on pipe break.""" + buf = (ctypes.c_char * n)() + total = 0 + while total < n: + read = ctypes.wintypes.DWORD(0) + ok = _kernel32.ReadFile(handle, ctypes.byref(buf, total), n - total, ctypes.byref(read), None) + if not ok or read.value == 0: + return None + total += read.value + return bytes(buf) + + +def _writeAll(handle: ctypes.wintypes.HANDLE, data: bytes) -> bool: + """Write all of *data* to *handle*. Return False on pipe break.""" + offset = 0 + while offset < len(data): + written = ctypes.wintypes.DWORD(0) + ok = _kernel32.WriteFile( + handle, + ctypes.c_char_p(data[offset:]), + len(data) - offset, + ctypes.byref(written), + None, + ) + if not ok or written.value == 0: + return False + offset += written.value + return True + + +def _frameMessage(obj: dict) -> bytes: + body = json.dumps(obj).encode("utf-8") + return struct.pack(" bool: + return _writeAll(handle, _frameMessage(obj)) + + +def _recvMessage(handle: ctypes.wintypes.HANDLE) -> Optional[dict]: + header = _readExact(handle, 4) + if header is None: + return None + (length,) = struct.unpack(" None: + super().__init__() + self._source = source + self._model = model or None + self._id = id_ + self.routingIndex = routingIndex + self.dots = dots + self.space = space + + def _get_source(self) -> str: + return self._source + + def _get_model(self) -> str: + return self._model + + def _get_id(self) -> str: + return self._id + + +class _AsyncWriter: + """Background writer thread that drains a queue to a pipe handle. + + All sends from NVDA's main-thread callbacks (display updates, gesture + notifications) go through here so pipe I/O never blocks the core loop. + Sending ``None`` to the queue is the stop sentinel. + """ + + def __init__(self, handle: ctypes.wintypes.HANDLE) -> None: + self._handle = handle + self._queue: queue.SimpleQueue = queue.SimpleQueue() + self._thread = threading.Thread(target=self._loop, daemon=True, name="braillePipeWriter") + self._thread.start() + + def send(self, obj: dict) -> None: + """Enqueue *obj* for asynchronous delivery to the pipe client.""" + self._queue.put(_frameMessage(obj)) + + def stop(self) -> None: + """Signal the writer thread to stop after draining remaining items.""" + self._queue.put(None) + + def _loop(self) -> None: + while True: + frame = self._queue.get() + if frame is None: + break + if not _writeAll(self._handle, frame): + break + + +class _MirrorSession(braille.BrailleMirror): + """BrailleMirror that forwards display updates over the pipe.""" + + def __init__(self, handle: ctypes.wintypes.HANDLE, numCells: int) -> None: + self._numCells = numCells + self._writer = _AsyncWriter(handle) + braille.registerMirror(self) + braille.displaySizeChanged.register(self._handleDisplaySizeChanged) + if braille.handler: + self._writer.send({"type": "display_size", "numCols": braille.handler.displayDimensions.numCols}) + + def numCells(self) -> int: + return self._numCells + + def display(self, cells: list) -> None: + self._writer.send({"type": "display", "cells": cells}) + + def _handleDisplaySizeChanged(self, displaySize: int, numRows: int, numCols: int) -> None: + self._writer.send({"type": "display_size", "numCols": numCols}) + + def handleMessage(self, msg: dict) -> None: + mtype = msg.get("type") + if mtype == "inject_gesture": + gesture = _PipeBrailleGesture( + source=msg.get("source", ""), + model=msg.get("model", ""), + id_=msg.get("id", ""), + routingIndex=msg.get("routingIndex"), + dots=msg.get("dots", 0), + space=msg.get("space", False), + ) + wx.CallAfter(braille.injectGesture, gesture) + else: + log.debug(f"braillePipeServer: unexpected mirror message type {mtype!r}") + + def close(self) -> None: + braille.displaySizeChanged.unregister(self._handleDisplaySizeChanged) + braille.unregisterMirror(self) + self._writer.stop() + + +class _DirectSession(braille.DirectBrailleWindow): + """DirectBrailleWindow that forwards gestures over the pipe.""" + + def __init__(self, handle: ctypes.wintypes.HANDLE, hwnd: int, numCells: int) -> None: + super().__init__(hwnd=hwnd, numCells=numCells) + self._writer = _AsyncWriter(handle) + self.activate() + + def onGesture(self, gesture: braille.BrailleDisplayGesture) -> None: + self._writer.send( + { + "type": "gesture", + "source": getattr(gesture, "source", ""), + "model": getattr(gesture, "model", "") or "", + "id": getattr(gesture, "id", ""), + "routingIndex": getattr(gesture, "routingIndex", None), + "dots": getattr(gesture, "dots", 0), + "space": getattr(gesture, "space", False), + }, + ) + + def handleMessage(self, msg: dict) -> None: + mtype = msg.get("type") + if mtype == "display": + cells = msg.get("cells", []) + self.display(cells) + else: + log.debug(f"braillePipeServer: unexpected direct message type {mtype!r}") + + def close(self) -> None: + self.deactivate() + self._writer.stop() + + +def _handleConnection(handle: ctypes.wintypes.HANDLE, pending_q: "queue.Queue") -> None: + """Thread that drives one client connection from registration to close.""" + session = None + try: + # First message must be a registration. + msg = _recvMessage(handle) + if msg is None: + return + mtype = msg.get("type") + if mtype == "register_mirror": + numCells = int(msg.get("numCells", 0)) + # If braille is not yet initialised, queue registration. + if braille.handler is None: + pending_q.put(("mirror", handle, numCells)) + return + session = _MirrorSession(handle, numCells) + elif mtype == "register_direct_braille": + hwnd = int(msg.get("hwnd", 0)) + numCells = int(msg.get("numCells", 0)) + if braille.handler is None: + pending_q.put(("direct", handle, hwnd, numCells)) + return + session = _DirectSession(handle, hwnd, numCells) + else: + log.warning(f"braillePipeServer: unexpected first message type {mtype!r}") + return + while True: + msg = _recvMessage(handle) + if msg is None: + break + session.handleMessage(msg) + except Exception: + log.exception("braillePipeServer: error in connection handler") + finally: + if session is not None: + session.close() + _kernel32.CloseHandle(handle) + + +class _PipeServer: + """Listens on a named pipe and spawns per-connection threads.""" + + def __init__(self, pipeName: str, useSystemDacl: bool = False) -> None: + self._pipeName = pipeName + self._useSystemDacl = useSystemDacl + self._stop = threading.Event() + self._thread: Optional[threading.Thread] = None + self._sa: Optional[_SECURITY_ATTRIBUTES] = None + self._sd: Optional[ctypes.c_void_p] = None + # Queue for registrations that arrive before braille.handler is ready. + self._pending: queue.Queue = queue.Queue() + + def start(self) -> None: + if self._useSystemDacl: + try: + self._sa, self._sd = _buildSystemAccessSA() + except OSError: + log.exception( + "braillePipeServer: failed to build SYSTEM-access security descriptor;" + " falling back to default DACL (SYSTEM clients may be unable to connect)", + ) + self._thread = threading.Thread(target=self._loop, name="braillePipeServer", daemon=True) + self._thread.start() + + def stop(self) -> None: + self._stop.set() + # Unblock the accept loop by opening a dummy connection. + try: + dummy = _kernel32.CreateFileW( + self._pipeName, + 0, # GENERIC_READ | GENERIC_WRITE not needed, just unblock + 0, + None, + 3, # OPEN_EXISTING + 0, + None, + ) + if dummy != INVALID_HANDLE_VALUE: + _kernel32.CloseHandle(dummy) + except Exception: + pass + if self._thread: + self._thread.join(timeout=2.0) + if self._sd is not None: + _kernel32.LocalFree(self._sd) + self._sd = None + self._sa = None + + def processPending(self) -> None: + """Process any registrations that arrived before braille was ready.""" + while not self._pending.empty(): + try: + item = self._pending.get_nowait() + except queue.Empty: + break + role = item[0] + if role == "mirror": + _, handle, numCells = item + try: + session = _MirrorSession(handle, numCells) + threading.Thread( + target=_driveSession, + args=(handle, session), + daemon=True, + ).start() + except Exception: + log.exception("braillePipeServer: error creating pending mirror session") + _kernel32.CloseHandle(handle) + elif role == "direct": + _, handle, hwnd, numCells = item + try: + session = _DirectSession(handle, hwnd, numCells) + threading.Thread( + target=_driveSession, + args=(handle, session), + daemon=True, + ).start() + except Exception: + log.exception("braillePipeServer: error creating pending direct session") + _kernel32.CloseHandle(handle) + + def _loop(self) -> None: + while not self._stop.is_set(): + try: + handle = _createPipeInstance(self._pipeName, self._sa) + except OSError: + log.exception("braillePipeServer: failed to create pipe instance") + break + connected = _connectClient(handle) + if self._stop.is_set(): + _kernel32.CloseHandle(handle) + break + if not connected: + _kernel32.CloseHandle(handle) + continue + threading.Thread( + target=_handleConnection, + args=(handle, self._pending), + daemon=True, + ).start() + + +def _driveSession(handle: ctypes.wintypes.HANDLE, session) -> None: + """Message loop for a session created from a pending registration.""" + try: + while True: + msg = _recvMessage(handle) + if msg is None: + break + session.handleMessage(msg) + except Exception: + log.exception("braillePipeServer: error in deferred session") + finally: + session.close() + _kernel32.CloseHandle(handle) + + +_server: Optional[_PipeServer] = None + + +def initialize() -> None: + """Start the named pipe server.""" + global _server + if _server is not None: + return + isSecure = isRunningOnSecureDesktop() + pipeName = r"\\.\pipe\screen_reader_braille_secure" if isSecure else r"\\.\pipe\screen_reader_braille" + # The secure-desktop instance runs as SYSTEM, so its default DACL already permits SYSTEM connections. Only the normal-desktop instance needs the explicit SYSTEM ACE. + _server = _PipeServer(pipeName, useSystemDacl=not isSecure) + _server.start() + log.info(f"braillePipeServer: listening on {pipeName}") + # Resolve any registrations that beat us to it (shouldn't normally happen since we start early, but be defensive). + _server.processPending() + + +def terminate() -> None: + """Stop the named pipe server.""" + global _server + if _server is None: + return + _server.stop() + _server = None + log.info("braillePipeServer: stopped") diff --git a/tests/unit/test_braille/test_mirrorAndDirectWindow.py b/tests/unit/test_braille/test_mirrorAndDirectWindow.py new file mode 100644 index 00000000000..d1e92a9a191 --- /dev/null +++ b/tests/unit/test_braille/test_mirrorAndDirectWindow.py @@ -0,0 +1,280 @@ +# A part of NonVisual Desktop Access (NVDA) +# This file is covered by the GNU General Public License. +# See the file COPYING for more details. +# Copyright (C) 2026 Pneuma Solutions + +"""Unit tests for braille.BrailleMirror, braille.DirectBrailleWindow, braille.registerMirror, braille.unregisterMirror, and braille.injectGesture.""" + +import unittest +from unittest.mock import MagicMock, patch + +import braille +import inputCore + + +class _CountingMirror(braille.BrailleMirror): + """A mirror that records every display() call.""" + + def __init__(self, numCells_: int = 0) -> None: + self._numCells_ = numCells_ + self.received: list[list[int]] = [] + + def display(self, cells: list[int]) -> None: + self.received.append(list(cells)) + + def numCells(self) -> int: + return self._numCells_ + + +class TestBrailleMirrorRegistration(unittest.TestCase): + """Test register/unregisterMirror lifecycle.""" + + def tearDown(self) -> None: + # Ensure no mirrors leak between tests. + for m in list(braille._registeredMirrors): + braille.unregisterMirror(m) + + def test_registerAddsToList(self): + m = _CountingMirror() + braille.registerMirror(m) + self.assertIn(m, braille._registeredMirrors) + + def test_unregisterRemovesFromList(self): + m = _CountingMirror() + braille.registerMirror(m) + braille.unregisterMirror(m) + self.assertNotIn(m, braille._registeredMirrors) + + def test_unregisterNonRegisteredIsSafe(self): + m = _CountingMirror() + # Should not raise. + braille.unregisterMirror(m) + + def test_extensionPointsHookedOnFirstRegister(self): + self.assertFalse(braille._registeredMirrors) + m = _CountingMirror() + braille.registerMirror(m) + # The shared handler should now be registered. + self.assertIn(braille._mirrorPreWriteCells, list(braille.pre_writeCells.handlers)) + + def test_extensionPointsUnhookedAfterLastUnregister(self): + m = _CountingMirror() + braille.registerMirror(m) + braille.unregisterMirror(m) + self.assertNotIn(braille._mirrorPreWriteCells, list(braille.pre_writeCells.handlers)) + + def test_multipleRegistrationsKeepHandlerOnce(self): + m1 = _CountingMirror() + m2 = _CountingMirror() + braille.registerMirror(m1) + braille.registerMirror(m2) + # The shared pre_writeCells handler should be registered exactly once. + self.assertEqual(list(braille.pre_writeCells.handlers).count(braille._mirrorPreWriteCells), 1) + braille.unregisterMirror(m1) + braille.unregisterMirror(m2) + + +class TestBrailleMirrorDisplay(unittest.TestCase): + def tearDown(self) -> None: + for m in list(braille._registeredMirrors): + braille.unregisterMirror(m) + + def test_mirrorReceivesCellsOnWriteCells(self): + m = _CountingMirror() + braille.registerMirror(m) + cells = [0] * braille.handler.displaySize + braille.handler._writeCells(cells) + self.assertEqual(len(m.received), 1) + self.assertEqual(m.received[0], cells) + + def test_multipleMirrorsAllReceiveCells(self): + m1 = _CountingMirror() + m2 = _CountingMirror() + braille.registerMirror(m1) + braille.registerMirror(m2) + cells = [1] * braille.handler.displaySize + braille.handler._writeCells(cells) + self.assertEqual(len(m1.received), 1) + self.assertEqual(len(m2.received), 1) + + def test_unregisteredMirrorReceivesNothing(self): + m = _CountingMirror() + braille.registerMirror(m) + braille.unregisterMirror(m) + braille.handler._writeCells([0] * braille.handler.displaySize) + self.assertEqual(m.received, []) + + +class TestBrailleMirrorNumCells(unittest.TestCase): + def tearDown(self) -> None: + for m in list(braille._registeredMirrors): + braille.unregisterMirror(m) + + def test_numCellsZeroHasNoEffect(self): + """A mirror with numCells()==0 should not shrink the display width.""" + m = _CountingMirror(numCells_=0) + braille.registerMirror(m) + original = braille.handler.displayDimensions + # Apply the mirror filter manually. + result = braille._mirrorFilterDisplayDimensions(original) + self.assertEqual(result, original) + + def test_numCellsShrinksCap(self): + """A mirror reporting fewer cells should cap numCols.""" + displayCols = braille.handler.displayDimensions.numCols + cap = max(1, displayCols - 4) + m = _CountingMirror(numCells_=cap) + braille.registerMirror(m) + original = braille.handler.displayDimensions + result = braille._mirrorFilterDisplayDimensions(original) + self.assertEqual(result.numCols, cap) + + def test_numCellsLargerThanDisplayHasNoEffect(self): + """A mirror reporting more cells than the display should not widen it.""" + displayCols = braille.handler.displayDimensions.numCols + m = _CountingMirror(numCells_=displayCols + 100) + braille.registerMirror(m) + original = braille.handler.displayDimensions + result = braille._mirrorFilterDisplayDimensions(original) + self.assertEqual(result.numCols, displayCols) + + def test_smallestMirrorWins(self): + """When multiple mirrors are registered, the smallest positive numCells wins.""" + displayCols = braille.handler.displayDimensions.numCols + small = max(1, displayCols - 10) + big = max(1, displayCols - 5) + m1 = _CountingMirror(numCells_=big) + m2 = _CountingMirror(numCells_=small) + braille.registerMirror(m1) + braille.registerMirror(m2) + original = braille.handler.displayDimensions + result = braille._mirrorFilterDisplayDimensions(original) + self.assertEqual(result.numCols, small) + + +class TestInjectGesture(unittest.TestCase): + def test_injectGestureCallsExecuteGesture(self): + gesture = MagicMock(spec=braille.BrailleDisplayGesture) + mock_manager = MagicMock() + with patch.object(inputCore, "manager", mock_manager): + braille.injectGesture(gesture) + mock_manager.executeGesture.assert_called_once_with(gesture) + + def test_injectGestureSwallowsNoInputGestureAction(self): + gesture = MagicMock(spec=braille.BrailleDisplayGesture) + mock_manager = MagicMock() + mock_manager.executeGesture.side_effect = inputCore.NoInputGestureAction + with patch.object(inputCore, "manager", mock_manager): + # Should not raise. + braille.injectGesture(gesture) + + +class TestDirectBrailleWindow(unittest.TestCase): + """Tests for DirectBrailleWindow using a mock foreground-window check.""" + + FAKE_HWND = 0xDEADBEEF + + def setUp(self) -> None: + self._win = braille.DirectBrailleWindow(hwnd=self.FAKE_HWND, numCells=0) + + def tearDown(self) -> None: + self._win.deactivate() + + def _patch_foreground(self, is_fg: bool): + return patch.object(self._win, "_isForeground", return_value=is_fg) + + def test_activateRegistersDecideEnabled(self): + self._win.activate() + self.assertIn(self._win._handleDecideEnabled, list(braille.decide_enabled.handlers)) + + def test_deactivateUnregistersDecideEnabled(self): + self._win.activate() + self._win.deactivate() + self.assertNotIn(self._win._handleDecideEnabled, list(braille.decide_enabled.handlers)) + + def test_doubleActivateIsNoop(self): + self._win.activate() + self._win.activate() + count = list(braille.decide_enabled.handlers).count(self._win._handleDecideEnabled) + self.assertEqual(count, 1) + self._win.deactivate() + + def test_doubleDeactivateIsNoop(self): + self._win.activate() + self._win.deactivate() + # Second deactivate should not raise. + self._win.deactivate() + + def test_handleDecideEnabledReturnsFalseWhenForeground(self): + with self._patch_foreground(True): + self.assertFalse(self._win._handleDecideEnabled()) + + def test_handleDecideEnabledReturnsTrueWhenNotForeground(self): + with self._patch_foreground(False): + self.assertTrue(self._win._handleDecideEnabled()) + + def test_gestureInterceptedWhenForeground(self): + gesture = MagicMock(spec=braille.BrailleDisplayGesture) + self._win.onGesture = MagicMock() + with self._patch_foreground(True): + result = self._win._handleDecideExecuteGesture(gesture) + self.assertFalse(result) + self._win.onGesture.assert_called_once_with(gesture) + + def test_gesturePassedThroughWhenNotForeground(self): + gesture = MagicMock(spec=braille.BrailleDisplayGesture) + self._win.onGesture = MagicMock() + with self._patch_foreground(False): + result = self._win._handleDecideExecuteGesture(gesture) + self.assertTrue(result) + self._win.onGesture.assert_not_called() + + def test_nonBrailleGestureAlwaysPassedThrough(self): + gesture = MagicMock() # Not a BrailleDisplayGesture + with self._patch_foreground(True): + result = self._win._handleDecideExecuteGesture(gesture) + self.assertTrue(result) + + def test_numCellsZeroSkipsFilter(self): + win = braille.DirectBrailleWindow(hwnd=self.FAKE_HWND, numCells=0) + dims = braille.handler.displayDimensions + with patch.object(win, "_isForeground", return_value=True): + result = win._handleFilterDisplayDimensions(dims) + self.assertEqual(result, dims) + + def test_numCellsCapsWhenForeground(self): + displayCols = braille.handler.displayDimensions.numCols + cap = max(1, displayCols - 4) + win = braille.DirectBrailleWindow(hwnd=self.FAKE_HWND, numCells=cap) + dims = braille.handler.displayDimensions + with patch.object(win, "_isForeground", return_value=True): + result = win._handleFilterDisplayDimensions(dims) + self.assertEqual(result.numCols, cap) + + def test_numCellsIgnoredWhenNotForeground(self): + displayCols = braille.handler.displayDimensions.numCols + win = braille.DirectBrailleWindow(hwnd=self.FAKE_HWND, numCells=max(1, displayCols - 4)) + dims = braille.handler.displayDimensions + with patch.object(win, "_isForeground", return_value=False): + result = win._handleFilterDisplayDimensions(dims) + self.assertEqual(result, dims) + + def test_numCellsLargerThanDisplayHasNoEffect(self): + displayCols = braille.handler.displayDimensions.numCols + win = braille.DirectBrailleWindow(hwnd=self.FAKE_HWND, numCells=displayCols + 100) + dims = braille.handler.displayDimensions + with patch.object(win, "_isForeground", return_value=True): + result = win._handleFilterDisplayDimensions(dims) + self.assertEqual(result.numCols, displayCols) + + def test_filterRegisteredWhenNumCellsPositive(self): + win = braille.DirectBrailleWindow(hwnd=self.FAKE_HWND, numCells=20) + win.activate() + self.assertIn(win._handleFilterDisplayDimensions, list(braille.filter_displayDimensions.handlers)) + win.deactivate() + + def test_filterNotRegisteredWhenNumCellsZero(self): + win = braille.DirectBrailleWindow(hwnd=self.FAKE_HWND, numCells=0) + win.activate() + self.assertNotIn(win._handleFilterDisplayDimensions, list(braille.filter_displayDimensions.handlers)) + win.deactivate()