Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions src/workflow/CommandExecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,19 @@
import subprocess
import threading
from pathlib import Path
from typing import Callable, Optional
from .Logger import Logger
from .ParameterManager import ParameterManager
import sys
import importlib.util
import json


class WorkflowCancelled(Exception):
"""Raised when a workflow is cancelled by the user."""
pass


class CommandExecutor:
"""
Manages the execution of external shell commands such as OpenMS TOPP tools within a Streamlit application.
Expand All @@ -24,6 +31,27 @@ def __init__(self, workflow_dir: Path, logger: Logger, parameter_manager: Parame
self.pid_dir = Path(workflow_dir, "pids")
self.logger = logger
self.parameter_manager = parameter_manager
self._should_stop: Optional[Callable[[], bool]] = None

def set_cancellation_check(self, should_stop_func: Callable[[], bool]) -> None:
"""
Set a function that checks if the workflow should be cancelled.

Args:
should_stop_func: A callable that returns True if the workflow should stop.
"""
self._should_stop = should_stop_func

def _check_cancellation(self) -> None:
"""
Check if the workflow was cancelled and raise WorkflowCancelled if so.

This is called before each command execution to allow stopping workflows
between commands when running in Redis queue mode.
"""
if self._should_stop and self._should_stop():
self.logger.log("Workflow cancelled by user")
raise WorkflowCancelled("Workflow was cancelled")

def run_multiple_commands(
self, commands: list[str]
Expand Down Expand Up @@ -81,8 +109,12 @@ def run_command(self, command: list[str]) -> bool:
command (list[str]): The shell command to execute, provided as a list of strings.

Raises:
WorkflowCancelled: If the workflow was cancelled by the user.
Exception: If the command execution results in any errors.
"""
# Check for cancellation before starting the command
self._check_cancellation()

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Propagate WorkflowCancelled out of parallel execution

When run_command() raises inside run_multiple_commands() threads, the exception is swallowed and all(results) can still return True, so cancellations can be ignored. Consider capturing thread exceptions and re-raising after joins.

Suggested fix
-        results = []
+        results = []
+        exceptions = []
         lock = threading.Lock()
 
         def run_and_track(cmd):
-            success = self.run_command(cmd)
-            with lock:
-                results.append(success)
+            try:
+                success = self.run_command(cmd)
+                with lock:
+                    results.append(success)
+            except Exception as e:
+                with lock:
+                    exceptions.append(e)
@@
         for thread in threads:
             thread.join()
 
+        if exceptions:
+            for e in exceptions:
+                if isinstance(e, WorkflowCancelled):
+                    raise e
+            raise exceptions[0]
+
-        return all(results)
+        return len(results) == len(commands) and all(results)
🤖 Prompt for AI Agents
In `@src/workflow/CommandExecutor.py` around lines 112 - 116,
run_multiple_commands() currently swallows exceptions raised by run_command()
running in threads, so WorkflowCancelled may be ignored; modify
run_multiple_commands() to collect exceptions from each thread (e.g., append
exception objects to a shared list or set via a thread-safe structure) and also
store per-task success flags, then after joining all threads check the collected
exceptions and if any contain a WorkflowCancelled instance re-raise that
exception (or raise the first WorkflowCancelled) before returning; ensure
run_command(), run_multiple_commands(), and any usage of _check_cancellation()
reflect that exceptions propagate out of the parallel join logic rather than
being masked by all(results).


# Ensure all command parts are strings
command = [str(c) for c in command]

Expand Down
38 changes: 38 additions & 0 deletions src/workflow/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import traceback
from pathlib import Path

from .CommandExecutor import WorkflowCancelled


def execute_workflow(
workflow_dir: str,
Expand Down Expand Up @@ -74,6 +76,16 @@ def execute_workflow(
executor = CommandExecutor(workflow_path, logger, parameter_manager)
executor.pid_dir.mkdir(parents=True, exist_ok=True)

# Set up cancellation check for Redis queue mode
if job is not None:
def should_stop():
try:
job.refresh() # Get latest status from Redis
return job.is_stopped
except Exception:
return False
executor.set_cancellation_check(should_stop)

_update_progress(job, 0.1, "Starting workflow execution...")

# Create workflow instance
Expand Down Expand Up @@ -118,6 +130,32 @@ def execute_workflow(
"message": "Workflow completed successfully"
}

except WorkflowCancelled:
# Handle user-initiated cancellation cleanly
try:
log_dir = workflow_path / "logs"
log_dir.mkdir(parents=True, exist_ok=True)
for log_name in ["minimal.log", "commands-and-run-times.log", "all.log"]:
log_file = log_dir / log_name
with open(log_file, "a") as f:
f.write("\n\nWorkflow cancelled by user\n")
except Exception:
pass

# Clean up pid directory
try:
pid_dir = workflow_path / "pids"
shutil.rmtree(pid_dir, ignore_errors=True)
except Exception:
pass

return {
"success": False,
"workflow_dir": str(workflow_path),
"cancelled": True,
"error": "Workflow cancelled by user"
}

except Exception as e:
error_msg = f"Workflow failed: {str(e)}\n{traceback.format_exc()}"

Expand Down
Loading