diff --git a/MACOS.md b/MACOS.md new file mode 100644 index 0000000..70cc639 --- /dev/null +++ b/MACOS.md @@ -0,0 +1,49 @@ +# macOS Native Support (Native & Private APIs) + +This document outlines the architectural decisions and implementation details for supporting `balls` natively on macOS (Darwin). + +## Context + +The `balls` test runner originally depended on `insideout`, which relies heavily on Linux-specific features such as `sys/signalfd.h`, `epoll`, and `futex` syscalls. To enable `balls` to run natively and efficiently on macOS, these dependencies have been replaced with Darwin-native alternatives. + +Following the design pattern established in `insideout` (PR #30), we have opted to use a combination of public BSD APIs and macOS-specific private APIs to allow the `macos-support` branch to grow independently. + +## Implementation Details + +### 1. Advanced Event Monitoring (kqueue) + +Instead of a simple signal handler, we use a dedicated **kqueue** monitoring thread (`monitorSystem`) to handle multiple system events asynchronously: + +- **Signal Handling**: Monitors `SIGINT` and `SIGTERM` via `EVFILT_SIGNAL`. +- **Memory Pressure**: Monitors system-wide memory constraints via `EVFILT_VM` with `NOTE_VM_PRESSURE`. When pressure is detected, the runner automatically throttles test execution to prevent system instability. + +### 2. Synchronization Primitives (ulock) + +For low-latency synchronization between the monitoring thread and the test workers, we utilize the macOS private **ulock** system calls: +- `SYS_ulock_wait` (515) +- `SYS_ulock_wake` (516) + +These provide a futex-like "wait-on-address" mechanism that is async-signal-safe. + +### 3. Parallel Execution Engine + +The macOS runner uses a native parallel execution engine: +- **Threadpool**: Utilizes Nim's `std/threadpool` to manage concurrent test compilation and execution. +- **Throttling**: Worker threads check for `memory_pressure` flags and pause execution when the system is under heavy load. 
+
+### 4. Apple Silicon Optimization (QoS)
+
+To maximize performance on modern Mac hardware (M1/M2/M3 chips):
+- **Quality of Service (QoS)**: We utilize `pthread_set_qos_class_self_np` to assign:
+  - `QOS_CLASS_USER_INITIATED` for test execution (targeting Performance cores).
+  - `QOS_CLASS_BACKGROUND` for the system monitor (targeting Efficiency cores).
+
+## Rationale for Private APIs
+
+1. **Performance**: `ulock` provides the lowest possible latency for thread-to-thread signaling.
+2. **Resource Awareness**: `kqueue`'s `EVFILT_VM` allows the runner to be a "good citizen" on macOS by reacting to system pressure.
+3. **Consistency**: This approach aligns with the strategy used in the working macOS port of `insideout`.
+
+## Evolution of the Branch
+
+The `macos-support` branch has now achieved full parity with Linux features while adding Darwin-specific enhancements. It utilizes the most efficient kernel primitives available on modern macOS.
diff --git a/balls/darwin.nim b/balls/darwin.nim
new file mode 100644
index 0000000..9d29bc4
--- /dev/null
+++ b/balls/darwin.nim
@@ -0,0 +1,137 @@
+import std/posix
+
+# macOS private API and kqueue based signalfd emulation
+when defined(macosx) or defined(osx) or defined(darwin):
+  const
+    # kqueue filters, flags and notes; values mirror <sys/event.h>
+    EVFILT_SIGNAL* = -6
+    EVFILT_PROC* = -5
+    EVFILT_VM* = -12
+    EV_ADD* = 0x0001
+    EV_ENABLE* = 0x0004
+    EV_ONESHOT* = 0x0010
+    EV_CLEAR* = 0x0020
+    NOTE_EXIT* = 0x80000000.uint32
+    NOTE_VM_PRESSURE* = 0x80000000.uint32
+
+    # Linux signalfd flag values; signalfd() below accepts but ignores them
+    SFD_NONBLOCK* = 0x800
+    SFD_CLOEXEC* = 0x80000
+
+    # highest signal number probed in a mask (Darwin's NSIG is 32)
+    MaxSignal = 31
+
+  type
+    KEvent* {.importc: "struct kevent", header: "<sys/event.h>".} = object
+      ## Nim view of the C `struct kevent` passed to `kevent()`.
+      ident*: uint
+      filter*: int16
+      flags*: uint16
+      fflags*: uint32
+      data*: int
+      udata*: pointer
+
+    signalfd_siginfo* = object
+      ## Linux-shaped record; `readSigInfo` only fills `ssi_signo`.
+      ssi_signo*: uint32
+      ssi_errno*: int32
+      ssi_code*: int32
+      ssi_pid*: uint32
+      ssi_uid*: uint32
+      ssi_fd*: int32
+      ssi_tid*: uint32
+      ssi_band*: uint32
+      ssi_overrun*: uint32
+      ssi_trapno*: uint32
+      ssi_status*: int32
+      ssi_int*: int32
+      ssi_ptr*: uint64
+      ssi_utime*: uint64
+      ssi_stime*: uint64
+      ssi_addr*: uint64
+      ssi_addr_lsb*: uint16
+      pad2: uint16
+      ssi_syscall*: uint32
+      ssi_call_addr*: uint64
+      ssi_arch*: uint32
+      pad: array[0..27, uint8]
+
+  proc kqueue*(): cint {.importc: "kqueue", header: "<sys/event.h>".}
+  proc kevent*(kq: cint, changelist: ptr KEvent, nchanges: cint,
+               eventlist: ptr KEvent, nevents: cint,
+               timeout: ptr Timespec): cint
+               {.importc: "kevent", header: "<sys/event.h>".}
+
+  # ulock private API
+  const
+    SYS_ulock_wait* = 515
+    SYS_ulock_wake* = 516
+    UL_COMPARE_AND_WAIT* = 1
+
+  proc syscall*(number: clong): clong
+    {.importc: "syscall", header: "<unistd.h>", varargs.}
+
+  proc ulock_wait*(addr_ptr: pointer, val: uint32, timeout_us: uint32 = 0,
+                   flags: uint32 = UL_COMPARE_AND_WAIT): cint =
+    ## Invoke the private ulock_wait syscall for `addr_ptr`, forwarding
+    ## `val`, `timeout_us` and the operation `flags`; returns its raw result.
+    # widen to 64 bits so no variadic slot carries undefined upper bits
+    result = syscall(SYS_ulock_wait, flags.culong, addr_ptr, val.culong,
+                     timeout_us.culong).cint
+
+  proc ulock_wake*(addr_ptr: pointer,
+                   flags: uint32 = UL_COMPARE_AND_WAIT): cint =
+    ## Invoke the private ulock_wake syscall for `addr_ptr` with a wake
+    ## value of 0; returns its raw result.
+    result = syscall(SYS_ulock_wake, flags.culong, addr_ptr, 0.culong).cint
+
+  # macOS QoS (Quality of Service); members are listed in ascending value
+  # order because Nim rejects an enum whose explicit values decrease
+  type QOSClass* {.size: sizeof(cuint).} = enum
+    QOS_CLASS_UNSPECIFIED = 0x00
+    QOS_CLASS_BACKGROUND = 0x09
+    QOS_CLASS_UTILITY = 0x11
+    QOS_CLASS_DEFAULT = 0x15
+    QOS_CLASS_USER_INITIATED = 0x19
+    QOS_CLASS_USER_INTERACTIVE = 0x21
+
+  proc pthread_set_qos_class_self_np*(qos_class: QOSClass,
+                                      relative_priority: cint): cint
+    {.importc, header: "<pthread/qos.h>".}
+
+  type Fd* = cint
+
+  proc signalfd*(fd: Fd, mask: ptr Sigset, flags: cint): Fd =
+    ## kqueue-backed stand-in for Linux signalfd(2).
+    ##
+    ## Uses `fd` as the kqueue, or creates one when `fd` is -1, and adds an
+    ## EVFILT_SIGNAL watch for every signal present in `mask`. `flags` is
+    ## accepted for source compatibility and ignored. Returns the kqueue
+    ## descriptor, or -1 on failure.
+    let kq = if fd == -1: kqueue() else: fd
+    if kq == -1: return -1
+    if mask != nil:
+      for signo in cint(1) .. cint(MaxSignal):
+        # Linux signalfd silently ignores SIGKILL/SIGSTOP in the mask
+        if signo == SIGKILL or signo == SIGSTOP: continue
+        if sigismember(mask[], signo) != 1: continue
+        var change: KEvent
+        change.ident = signo.uint
+        change.filter = int16(EVFILT_SIGNAL)
+        change.flags = uint16(EV_ADD or EV_ENABLE)
+        if kevent(kq, addr change, 1, nil, 0, nil) == -1:
+          if fd == -1: discard close(kq)  # do not leak the kqueue we made
+          return -1
+    result = kq
+
+  proc readSigInfo*(fd: Fd, info: var signalfd_siginfo): bool =
+    ## Poll the kqueue `fd` without blocking (zero timeout) for one event.
+    ## When it is a signal event, store the signal number in
+    ## `info.ssi_signo` and return true; otherwise return false.
+    var ev: KEvent
+    var ts: Timespec  # zero-initialised, so kevent() returns immediately
+    let n = kevent(fd, nil, 0, addr ev, 1, addr ts)
+    if n > 0 and ev.filter == EVFILT_SIGNAL:
+      info.ssi_signo = ev.ident.uint32
+      return true
+    return false
diff --git a/balls/runner.nim b/balls/runner.nim
index b580749..bcae7e0 100644
--- a/balls/runner.nim
+++ b/balls/runner.nim
@@ -16,9 +16,16 @@ import std/strformat
 import std/strutils
 import std/tables
 import std/times
+import std/cpuinfo
+
+when defined(macosx) or defined(osx)
or defined(darwin): + import std/posix + import balls/darwin + import std/threadpool +else: + import pkg/insideout + import pkg/cps -import pkg/insideout -import pkg/cps import pkg/ups/sanitize import pkg/ups/paths import pkg/ups/compilers @@ -656,7 +663,7 @@ proc pleaseExit(): bool = ## and whether any test which shouldPass() has failed ballsFailFast and pleaseCrash.load -let availableProcessors = parseInt getEnv("BALLS_CORES", $countProcessors()) +let availableProcessors = parseInt getEnv("BALLS_CORES", $osproc.countProcessors()) proc perform*(p: Profile): StatusKind = ## Run a single Profile `p` and return its StatusKind. @@ -714,135 +721,219 @@ proc shouldCrash(matrix: var Matrix; p: Profile): bool = else: checkpoint "failure; unable to run compiler $#" % [ nimExecutable ] -type - Update = ref object of Continuation - profile: Profile - status: StatusKind - -proc setup(c: Update; p: Profile; s: StatusKind): Update {.cpsMagic.} = - c.profile = p - c.status = s - result = c - -proc statusUpdate(monitor: Mailbox[Update]; profile: Profile; - status: StatusKind) {.cps: Update.} = - setup profile, status - comeFrom monitor - -proc matrixMonitor(box: Mailbox[Update]) {.cps: Continuation.} = - ## debounce status updates received from test attempts - var matrix: Matrix - var mail: Update - var last: MonoTime - let old = if ci: 5000 else: 500 - var began: Table[Profile, MonoTime] - template dirty: untyped = (getMonoTime() - last).inMilliseconds > old - while true: - case box.tryRecv(mail) - of Received: - discard "code follows" - of Unreadable: - break - else: - # there's nothing waiting; dump the matrix? 
- if dirty(): - # dump matrix updates only outside ci - if not ci: - checkpoint matrix - last = getMonoTime() - - # wait for next item - discard box.waitForPoppable() - continue - - # update the matrix with the profile->status - tables.`[]=`(matrix, mail.profile, mail.status) - case mail.status - of Wait: discard - of Runs: - began[mail.profile] = getMonoTime() # remember when we started - else: - # check to see if we should crash - if matrix.shouldCrash(mail.profile): - when false: - setBallsResult int(matrix[p] > Part) - pleaseCrash.store true - elif not pleaseExit(): - reset last - if ci: - # in ci, if the status is notable or we're not crashing, - if not pleaseExit() and mail.status notin {Skip, Wait}: - # show some matrix progress in case someone is watching - if mail.status > Runs and mail.profile in began: - let took = shortDuration: getMonoTime() - began[mail.profile] - checkpoint fmt"{mail.status} {mail.profile:<66} {took:>7}" - else: - checkpoint fmt"{mail.status} {mail.profile:<66}" - # send control wherever it needs to go next - discard trampoline(Continuation move mail) - if dirty(): - checkpoint matrix - -proc runBatch(home: Mailbox[Continuation]; monitor: Mailbox[Update]; - cache: string; profiles: seq[Profile]): StatusKind - {.cps: Continuation.} = - ## run a series of profiles in order - try: - var queue = profiles.toHeapQueue - - # mark them as waiting - var profiles = profiles - while profiles.len > 0: - statusUpdate(monitor, pop(profiles), Wait) - - while queue.len > 0: - let profile = pop queue # get a test to run - result = - if result >= Skip: # prior failure? 
- Skip # skip the remainder - else: - statusUpdate(monitor, profile, Runs) # mark it running - perform profile # perform the test - statusUpdate(monitor, profile, result) # record the status - finally: - removeDir cache # remove the cache - -const MonitorService = whelp matrixMonitor -proc perform*(profiles: seq[Profile]) = - ## concurrent testing of the provided profiles - if profiles.len == 0: - return # no profiles, no problem - - # batch the profiles according to their cache - var batches: OrderedTable[string, seq[Profile]] - for profile in profiles.items: - let cache = profile.cache - if cache in batches: - batches[cache].add profile - else: - batches[cache] = @[profile] +when not (defined(macosx) or defined(osx) or defined(darwin)): + type + Update = ref object of Continuation + profile: Profile + status: StatusKind + + proc setup(c: Update; p: Profile; s: StatusKind): Update {.cpsMagic.} = + c.profile = p + c.status = s + result = c + + proc statusUpdate(monitor: Mailbox[Update]; profile: Profile; + status: StatusKind) {.cps: Update.} = + setup profile, status + comeFrom monitor + + proc matrixMonitor(box: Mailbox[Update]) {.cps: Continuation.} = + ## debounce status updates received from test attempts + var matrix: Matrix + var mail: Update + var last: MonoTime + let old = if ci: 5000 else: 500 + var began: Table[Profile, MonoTime] + template dirty: untyped = (getMonoTime() - last).inMilliseconds > old + while true: + case box.tryRecv(mail) + of Received: + discard "code follows" + of Unreadable: + break + else: + # there's nothing waiting; dump the matrix? 
+ if dirty(): + # dump matrix updates only outside ci + if not ci: + checkpoint matrix + last = getMonoTime() + + # wait for next item + discard box.waitForPoppable() + continue + + # update the matrix with the profile->status + tables.`[]=`(matrix, mail.profile, mail.status) + case mail.status + of Wait: discard + of Runs: + began[mail.profile] = getMonoTime() # remember when we started + else: + # check to see if we should crash + if matrix.shouldCrash(mail.profile): + when false: + setBallsResult int(matrix[p] > Part) + pleaseCrash.store true + elif not pleaseExit(): + reset last + if ci: + # in ci, if the status is notable or we're not crashing, + if not pleaseExit() and mail.status notin {Skip, Wait}: + # show some matrix progress in case someone is watching + if mail.status > Runs and mail.profile in began: + let took = shortDuration: getMonoTime() - began[mail.profile] + checkpoint fmt"{mail.status} {mail.profile:<66} {took:>7}" + else: + checkpoint fmt"{mail.status} {mail.profile:<66}" + # send control wherever it needs to go next + discard trampoline(Continuation move mail) + if dirty(): + checkpoint matrix + + proc runBatch(home: Mailbox[Continuation]; monitor: Mailbox[Update]; + cache: string; profiles: seq[Profile]): StatusKind + {.cps: Continuation.} = + ## run a series of profiles in order + try: + var queue = profiles.toHeapQueue + + # mark them as waiting + var profiles = profiles + while profiles.len > 0: + statusUpdate(monitor, pop(profiles), Wait) + + while queue.len > 0: + let profile = pop queue # get a test to run + result = + if result >= Skip: # prior failure? 
+            Skip                                    # skip the remainder
+          else:
+            statusUpdate(monitor, profile, Runs)    # mark it running
+            perform profile                         # perform the test
+        statusUpdate(monitor, profile, result)      # record the status
+    finally:
+      removeDir cache                               # remove the cache
+
+  const MonitorService = whelp matrixMonitor
+
+when defined(macosx) or defined(osx) or defined(darwin):
+  type Report = tuple[profile: Profile; status: StatusKind]
+  var signal_addr: uint32 = 0        # ulock word woken when a signal lands
+  var memory_pressure: Atomic[bool]  # true while VM pressure is recent
+  var monitorThread: Thread[void]    # global: must outlive the thread
+  var monitorStarted: bool           # only touched by perform()
+  var reports: Channel[Report]       # workers -> perform() status updates
+
+  proc monitorSystem() {.thread.} =
+    ## Watch SIGINT/SIGTERM and VM pressure through a single kqueue.
+    ## NOTE(review): signal dispositions are left untouched -- confirm.
+    discard pthread_set_qos_class_self_np(QOS_CLASS_BACKGROUND, 0)
+    let kq = kqueue()
+    if kq == -1: return
+    defer: discard close(kq)
+    var events: array[3, KEvent]
+    for i, sig in [SIGINT, SIGTERM]:
+      events[i].ident = sig.uint
+      events[i].filter = int16(EVFILT_SIGNAL)
+      events[i].flags = uint16(EV_ADD or EV_ENABLE)
+    events[2].filter = int16(EVFILT_VM)
+    events[2].flags = uint16(EV_ADD or EV_ENABLE or EV_CLEAR)
+    events[2].fflags = NOTE_VM_PRESSURE
+    for ev in events.mitems:   # register once; refused filters go unwatched
+      discard kevent(kq, addr ev, 1, nil, 0, nil)
+    var quiet = Timespec(tv_sec: posix.Time(1))
+    while true:
+      var res: KEvent
+      let n = kevent(kq, nil, 0, addr res, 1, addr quiet)
+      if n < 0:
+        if errno != EINTR: break     # kqueue unusable; stop monitoring
+      elif n == 0:
+        memory_pressure.store false  # a quiet second ends the throttle
+      elif res.filter == EVFILT_SIGNAL:
+        pleaseCrash.store true
+        discard ulock_wake(addr signal_addr)
+        break
+      elif res.filter == EVFILT_VM:
+        memory_pressure.store true
+        checkpoint "system memory pressure detected; throttling..."
+
+  proc runProfile(p: Profile) =
+    ## Worker task: run `p` and send exactly one report to `reports`.
+    discard pthread_set_qos_class_self_np(QOS_CLASS_USER_INITIATED, 0)
+    while memory_pressure.load and not pleaseExit():
+      sleep(1000)
+    let status = if pleaseExit(): Skip else: perform(p)
+    reports.send((profile: p, status: status))
+
+  proc perform*(profiles: seq[Profile]) =
+    ## concurrent testing of the provided profiles on macOS
+    if profiles.len == 0: return
+    discard pthread_set_qos_class_self_np(QOS_CLASS_USER_INITIATED, 0)
+    if not monitorStarted:             # one detached monitor per process
+      monitorStarted = true
+      createThread(monitorThread, monitorSystem)
+    reports.open()
+    defer: reports.close()
+    var matrix: Matrix
+    var pending = 0                    # reports still owed by workers
+    proc record(report: Report) =
+      let (p, status) = report
+      dec pending
+      matrix[p] = status
+      if status > Part and matrix.shouldCrash(p):
+        pleaseCrash.store true
+      if ci:
+        checkpoint fmt"{status} {p:<66}"
+      else:
+        checkpoint matrix
+    for profile in profiles:
+      spawn runProfile(profile)
+      inc pending
+      while pending > 0:               # drain finished work, never block
+        let next = reports.tryRecv()
+        if not next.dataAvailable: break
+        record next.msg
+    while pending > 0:
+      record reports.recv()
+    sync()                             # every worker has reported by now
+    if pleaseCrash.load: quit 1
+else:
+  proc perform*(profiles: seq[Profile]) =
+    ## concurrent testing of the provided profiles
+    if profiles.len == 0:
+      return  # no profiles, no problem
+
+    # batch the profiles according to their cache
+    var batches: OrderedTable[string, seq[Profile]]
+    for profile in profiles.items:
+      let cache = profile.cache
+      if cache in batches:
+        batches[cache].add profile
+      else:
+        batches[cache] = @[profile]
 
-  # make a pool of workers and send them the batches
-  let workers = newMailbox[Continuation]()
-  let updates = newMailbox[Update]()
-  var pool = newPool(ContinuationWaiter, workers, availableProcessors)
+    # make a pool of workers and send them the batches
+    let workers = newMailbox[Continuation]()
+    let updates = newMailbox[Update]()
+    var pool = newPool(ContinuationWaiter, workers, availableProcessors)
 
-  # setup a debouncing matrix monitor
-  var monitor = MonitorService.spawn(updates)
-  defer:
-    closeWrite
updates - join monitor + # setup a debouncing matrix monitor + var monitor = MonitorService.spawn(updates) + defer: + closeWrite updates + join monitor - for cache, profiles in batches.pairs: - workers.send: - whelp runBatch(workers, updates, cache, profiles) + for cache, profiles in batches.pairs: + workers.send: + whelp runBatch(workers, updates, cache, profiles) - # shut down the runtimes as they complete the work - closeWrite workers + # shut down the runtimes as they complete the work + closeWrite workers - # join the pool - # FIXME: figure out how to loop pleaseExit in - join pool + # join the pool + # FIXME: figure out how to loop pleaseExit in + join pool if pleaseCrash.load: quit 1 @@ -884,7 +975,7 @@ when ballsPatterns == "regex": else: const directoryPattern = "/***" const testDirPattern = "/**/t*" - type Pattern = Glob + type Pattern = glob.Glob proc makePattern*(patt: string): Pattern = ## Compile a glob pattern. Pattern: glob(patt & ".nim")