
Implement or document how to "hard cancel" a task #29

@ctrueden

Description

It would be nice if task.cancel could force a cancelation, i.e. "kill" a task more thoroughly. Generally speaking, there are three levels of task interruption:

  1. Ask the task nicely to cancel itself (the currently implemented approach). A cancelation request is passed to the worker, and the running task must notice it by polling and respect it. This is gentle and avoids leaving dangling resources. But some worker tasks cannot be interrupted easily: if your task invokes a long-running model inference, for example, the cancelation cannot take effect until that long-running function call returns.

  2. Force-stop the running task thread somehow:

    • In Java, this was Thread.stop(), but that method has long been deprecated as "inherently unsafe", and since Java 20 it throws UnsupportedOperationException. So it's not really an option for the Groovy worker.
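    • In Python, ctypes.pythonapi.PyThreadState_SetAsyncExc might be useful:
      import ctypes
      
      def raise_in_thread(thread_id: int, exc_type: type) -> int:
          """Asynchronously raise exc_type (a class) in the thread with this ident."""
          n = ctypes.pythonapi.PyThreadState_SetAsyncExc(
              ctypes.c_ulong(thread_id),
              ctypes.py_object(exc_type)
          )
          if n > 1:
              # Defensive: undo the request if it somehow hit several thread states.
              ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_ulong(thread_id), None)
              raise SystemError("PyThreadState_SetAsyncExc affected multiple threads")
          return n  # 0 means no thread with that ident exists
      We would call it from the worker's _process_input loop when a CANCEL request arrives, injecting SystemExit or a custom exception into the task's thread (task._thread.ident). But the exception is only raised when that thread next executes Python bytecode. Cellpose, for example, spends most of its time inside NumPy/PyTorch C extension calls; while such a call runs, the thread executes no bytecode (whether or not it holds the GIL), so the exception waits until the call returns. That is not enough to kill the thread promptly.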
  3. Run the difficult-to-cancel task in a dedicated subprocess of the worker, which can be process-killed if a cancelation request comes in:

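    import multiprocessing
    import queue
    import time
    
    # Must live in a module the child can import by name (e.g. a .py file on the
    # worker's sys.path), not in the task script itself: spawn re-imports it.
    def _cellpose_worker(image_shm_name, shape, dtype, result_queue):
        """Runs in an isolated child process, so it is safe to hard-kill."""
        import os
        # On POSIX the child still shares the worker's stdout descriptor; send it
        # to stderr before any import can print, so the IPC channel stays clean.
        os.dup2(2, 1)
    
        import numpy as np
        from multiprocessing.shared_memory import SharedMemory
        from cellpose import models
    
        shm = SharedMemory(name=image_shm_name)
        try:
            image = np.ndarray(shape, dtype=dtype, buffer=shm.buf)
            model = models.Cellpose(gpu=True)
            masks, *_ = model.eval(image)
            del image  # release the view first, or shm.close() raises BufferError
        finally:
            shm.close()
        result_queue.put(masks)
    
    # --- In the task script (shm_name, shape and dtype describe the input image): ---
    ctx = multiprocessing.get_context("spawn")  # CRITICAL: see below
    q = ctx.Queue()
    p = ctx.Process(target=_cellpose_worker, args=(shm_name, shape, dtype, q))
    p.start()
    
    # Wait on the queue, not on p.is_alive(): a child that has put a large
    # result on a Queue does not exit until the parent reads it.
    while True:
        if task.cancel_requested:
            p.kill()
            p.join()
            task.cancel()
            break
        try:
            task.outputs["masks"] = q.get(timeout=0.05)
        except queue.Empty:
            if not p.is_alive() and q.empty():
                raise RuntimeError(f"Cellpose process exited with code {p.exitcode}")
        else:
            p.join()
            break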

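    The spawn start method is critical. With fork, the child is a copy of the whole worker process: it inherits every open file descriptor, including the stdin/stdout pipes that carry the Appose IPC channel (where an accidental print() would corrupt the JSON protocol); forking a multithreaded process such as the worker can leave locks held by other threads permanently locked in the child; and CUDA, as used by PyTorch, cannot be used in a forked child once the parent has initialized it. spawn starts a fresh interpreter instead. It does not fully detach the standard streams, though: on POSIX the spawned child still shares file descriptors 0-2 with the worker (multiprocessing only replaces sys.stdin with a handle on os.devnull), so the child should redirect its stdout, for example with os.dup2(2, 1), before anything in it might print. The downside of spawn is startup overhead: the child must re-import Python and your libraries (numpy, torch, cellpose, ...) from scratch each time. For a one-shot, long Cellpose run this is acceptable; for many short calls it isn't.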

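    • task.cancel_requested → p.kill() → guaranteed termination, regardless of which C extension is running
    • The above recipe should work without changes to appose-python, provided _cellpose_worker lives in a module the child can import: spawn pickles the target function by module and name, so a function defined inline in a task script will typically fail to pickle or to load in the child
    • The polling loop is already idiomatic for Appose tasks
    • The child has its own interpreter and memory; killing it doesn't disturb the worker's state or exports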
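To illustrate why options 1 and 2 fall short, here is a self-contained sketch (CPython only) that injects an exception with the same ctypes call into two threads: one running a pure-Python loop, and one blocked in time.sleep(1.0), which stands in for a long C-level call such as model inference. The names TaskCanceled and seconds_to_stop are hypothetical. The first thread should stop within milliseconds; the second only after the sleep returns. A cooperative cancel_requested check (option 1) is stuck in the same way, since it too can only run between calls.

```python
import ctypes
import threading
import time


class TaskCanceled(Exception):
    """Hypothetical exception injected to abort a task thread."""


def raise_in_thread(thread_id: int, exc_type: type) -> int:
    """Ask CPython to raise exc_type in the thread with this ident."""
    n = ctypes.pythonapi.PyThreadState_SetAsyncExc(
        ctypes.c_ulong(thread_id), ctypes.py_object(exc_type)
    )
    if n > 1:  # defensive: undo if several thread states were affected
        ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_ulong(thread_id), None)
        raise SystemError("PyThreadState_SetAsyncExc affected multiple threads")
    return n  # 0 means no thread with that ident exists


def seconds_to_stop(body, delay=0.1, timeout=5.0):
    """Run body() in a thread, inject TaskCanceled after `delay` seconds, and
    return how long the thread took to stop afterwards (None if it never did)."""
    stopped = threading.Event()

    def runner():
        try:
            body()
        except TaskCanceled:
            stopped.set()

    t = threading.Thread(target=runner, daemon=True)
    t.start()
    time.sleep(delay)  # let the thread get inside body()
    injected_at = time.monotonic()
    if raise_in_thread(t.ident, TaskCanceled) != 1:
        raise RuntimeError("task thread not found")
    t.join(timeout)
    return time.monotonic() - injected_at if stopped.is_set() else None


def python_loop():
    while True:  # pure bytecode: the interpreter notices the exception almost at once
        pass


def long_c_call():
    time.sleep(1.0)  # stand-in for a long C-level call such as model inference
    while True:  # once sleep returns, bytecode runs again and the exception can fire
        pass


fast = seconds_to_stop(python_loop)
slow = seconds_to_stop(long_c_call)
print("pure-Python loop stopped after", fast, "s; C call stopped after", slow, "s")
```

The gap between the two timings is the whole argument for option 3: only a separate process can be stopped while it is deep inside a C call.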

To reduce the verbosity of approach (3), we could add a helper such as task.run_killable(fn, *args) to the Appose worker implementations.
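As a rough sketch of the supervision loop such a helper might wrap, the function below (hypothetical name run_killable_command, not part of appose-python) runs a child process and kills it when a cancelation check turns true. It deliberately swaps multiprocessing for subprocess.Popen running a separate command: this avoids pickling a target function and re-importing the worker's main module, at the cost of passing data through files or shared memory rather than a Queue. Popen children inherit the parent's stdin and stdout by default, so both are redirected away from the Appose pipes.

```python
import subprocess
import sys
import time
from typing import Callable, Optional, Sequence


def run_killable_command(
    argv: Sequence[str],
    cancel_requested: Callable[[], bool],
    poll_interval: float = 0.05,
) -> Optional[int]:
    """Run argv as a child process, killing it if cancel_requested() turns true.

    Returns the child's exit code, or None if it was killed because of a
    cancelation request. (Hypothetical helper, not part of appose-python.)
    """
    proc = subprocess.Popen(
        argv,
        stdin=subprocess.DEVNULL,   # by default the child would inherit the IPC stdin
        stdout=subprocess.DEVNULL,  # ...and the IPC stdout; use a log file if output matters
    )
    try:
        while True:
            try:
                return proc.wait(timeout=poll_interval)
            except subprocess.TimeoutExpired:
                if cancel_requested():
                    proc.kill()  # SIGKILL on POSIX, TerminateProcess on Windows
                    proc.wait()  # reap the child so it does not linger as a zombie
                    return None
    except BaseException:
        # If the supervising thread itself is interrupted, do not orphan the child.
        proc.kill()
        proc.wait()
        raise


# Example: a 30-second child canceled after about 0.3 s, then one that finishes normally.
deadline = time.monotonic() + 0.3
start = time.monotonic()
canceled = run_killable_command(
    [sys.executable, "-c", "import time; time.sleep(30)"],
    cancel_requested=lambda: time.monotonic() > deadline,
)
elapsed = time.monotonic() - start
finished = run_killable_command(
    [sys.executable, "-c", "print('done')"],
    cancel_requested=lambda: False,
)
print(canceled, finished)  # prints: None 0
```

In a task script the check would be `lambda: task.cancel_requested`, with `task.cancel()` called when the helper returns None. The command could, for instance, run a small script that reads its input from the shared-memory block and writes masks back to another one; that data plumbing is left out of this sketch.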

At minimum, the above code should be added to docs.apposed.org as a recipe for making long-running tasks more aggressively cancelable.

Metadata

Assignees: No one assigned

Labels: documentation (Improvements or additions to documentation), enhancement (New feature or request)

Projects: No projects

Relationships: None yet

Development: No branches or pull requests
