diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 6e5671abd8..b74e91ac76 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -1,11 +1,11 @@
 repos:
   - repo: https://github.com/pycqa/isort
-    rev: "7.0.0"
+    rev: "8.0.1"
     hooks:
       - id: isort
         args: ["--profile", "black"]
   - repo: https://github.com/psf/black
-    rev: "25.12.0"
+    rev: "26.3.1"
     hooks:
       - id: black
diff --git a/examples/moe_lb/archives/loongflow_best_common.py b/examples/moe_lb/archives/loongflow_best_common.py
new file mode 100644
index 0000000000..7503c6f244
--- /dev/null
+++ b/examples/moe_lb/archives/loongflow_best_common.py
@@ -0,0 +1,164 @@
+"""
+Best solution found for the general (heuristic) MoE load-balancing solver.
+"""
+
+from typing import Tuple
+
+import numpy as np
+
+
+def solve_lplb_policy(
+    initial_workloads: np.ndarray, physical_expert_placement: np.ndarray
+) -> Tuple[np.ndarray, float]:
+    """
+    Optimized load-balancing solver using an enhanced dynamic threshold with prioritized processing.
+
+    Key improvements:
+    1. Weighted initialization (70/30 split) for a better starting point
+    2. Adaptive threshold with progressive tightening
+    3. Proportional transfer limits for stability
+    4. Optimized processing order
+    5. Early termination conditions
+
+    Args:
+        initial_workloads: Shape (N,), initial token counts for N logical experts
+        physical_expert_placement: Shape (ep_size, n_slots_per_rank), defines processing unit topology
+
+    Returns:
+        allocation_matrix: Shape (N, M), token allocation from experts to processing units
+        minimized_max_load: Float, the minimized maximum load across all GPUs
+    """
+    # Problem dimensions and flattened placement
+    n_logical_experts = initial_workloads.shape[0]
+    ep_size, n_slots_per_rank = physical_expert_placement.shape
+    n_total_slots = ep_size * n_slots_per_rank
+    flat_placement = physical_expert_placement.flatten()
+
+    # Helper functions
+    def calculate_gpu_loads(alloc_matrix):
+        slot_loads = np.sum(alloc_matrix, axis=0)
+        return slot_loads.reshape((ep_size, n_slots_per_rank)).sum(axis=1)
+
+    def get_gpu_slots(gpu_idx):
+        return range(gpu_idx * n_slots_per_rank, (gpu_idx + 1) * n_slots_per_rank)
+
+    def calculate_safe_transfer(alloc_matrix, src_slot, dst_slot):
+        src_load = np.sum(alloc_matrix[:, src_slot])
+        dst_load = np.sum(alloc_matrix[:, dst_slot])
+        gpu_loads = calculate_gpu_loads(alloc_matrix)
+        mean_load = np.mean(gpu_loads)
+
+        src_gpu = src_slot // n_slots_per_rank
+        dst_gpu = dst_slot // n_slots_per_rank
+
+        # Calculate maximum safe transfer amount
+        transfer = min(
+            src_load * 0.3,  # Max 30% of source slot's load
+            (gpu_loads[src_gpu] - gpu_loads[dst_gpu]) * 0.4,  # Proportional to imbalance
+            mean_load - dst_load,  # Don't overfill target
+        )
+        return max(transfer, 0)
+
+    def execute_transfer(alloc_matrix, expert, src_slot, dst_slot, amount):
+        alloc_matrix[expert, src_slot] -= amount
+        alloc_matrix[expert, dst_slot] += amount
+
+    # Enhanced initialization phase (70/30 weighted distribution)
+    allocation_matrix = np.zeros((n_logical_experts, n_total_slots))
+
+    for expert_id in range(n_logical_experts):
+        replica_indices = np.where(flat_placement == expert_id)[0]
+        if len(replica_indices) > 0:
+            # Get current loads of replicas
+            replica_loads = np.sum(allocation_matrix[:, replica_indices], axis=0)
+
+            # 70% to the least-loaded replica, 30% spread evenly across all replicas
+            primary_replica = replica_indices[np.argmin(replica_loads)]
+            allocation_matrix[expert_id, primary_replica] = initial_workloads[expert_id] * 0.7
+            remaining = initial_workloads[expert_id] * 0.3
+
+            if len(replica_indices) > 1:
+                allocation_matrix[expert_id, replica_indices] += remaining / len(replica_indices)
+            else:
+                allocation_matrix[expert_id, primary_replica] += remaining
+
+    # Track best solution
+    best_allocation = allocation_matrix.copy()
+    best_Z = np.max(calculate_gpu_loads(best_allocation))
+
+    # Adaptive iterative refinement
+    max_iterations = 100
+    for iteration in range(max_iterations):
+        # Calculate current loads
+        slot_loads = np.sum(allocation_matrix, axis=0)
+        gpu_loads = calculate_gpu_loads(allocation_matrix)
+        current_Z = np.max(gpu_loads)
+
+        # Update best solution
+        if current_Z < best_Z:
+            best_Z = current_Z
+            best_allocation = allocation_matrix.copy()
+
+        # Early termination if balanced
+        if np.max(gpu_loads) - np.min(gpu_loads) < 1e-6:
+            break
+
+        # Adaptive threshold calculation
+        mean_load = np.mean(gpu_loads)
+        std_load = np.std(gpu_loads)
+        threshold = mean_load + (0.8 - 0.7 * (iteration / max_iterations)) * std_load
+
+        # Process overloaded GPUs in descending order of load
+        overloaded_gpus = np.where(gpu_loads > threshold)[0]
+        for gpu in sorted(overloaded_gpus, key=lambda x: -gpu_loads[x]):
+            # Process slots in this GPU by descending load
+            slots = get_gpu_slots(gpu)
+            for slot in sorted(slots, key=lambda x: -slot_loads[x]):
+                # Get contributing experts sorted by contribution
+                contributing_experts = np.where(allocation_matrix[:, slot] > 1e-6)[0]
+                for expert in sorted(
+                    contributing_experts, key=lambda x: -allocation_matrix[x, slot]
+                ):
+                    # Get all valid target replicas sorted by load
+                    replica_indices = np.where(flat_placement == expert)[0]
+                    target_replicas = sorted(
+                        replica_indices, key=lambda x: np.sum(allocation_matrix[:, x])
+                    )
+
+                    for target in target_replicas:
+                        if target == slot:
+                            continue
+
+                        transfer = calculate_safe_transfer(allocation_matrix, slot, target)
+                        if transfer > 1e-6:
+                            execute_transfer(allocation_matrix, expert, slot, target, transfer)
+
+                            # Update slot and GPU loads
+                            slot_loads[slot] -= transfer
+                            slot_loads[target] += transfer
+                            src_gpu = slot // n_slots_per_rank
+                            dst_gpu = target // n_slots_per_rank
+                            gpu_loads[src_gpu] -= transfer
+                            gpu_loads[dst_gpu] += transfer
+
+                        # Early exit if we've balanced this slot
+                        if allocation_matrix[expert, slot] < 1e-6:
+                            break
+
+    # Final validation and cleanup
+    allocation_matrix = best_allocation
+
+    # Fix floating point discrepancies
+    for expert_id in range(n_logical_experts):
+        total = np.sum(allocation_matrix[expert_id])
+        if not np.isclose(total, initial_workloads[expert_id]):
+            diff = initial_workloads[expert_id] - total
+            replica_indices = np.where(flat_placement == expert_id)[0]
+            if len(replica_indices) > 0:
+                allocation_matrix[expert_id, replica_indices[0]] += diff
+
+    # Calculate final maximum load
+    final_gpu_loads = calculate_gpu_loads(allocation_matrix)
+    minimized_max_load = np.max(final_gpu_loads) if final_gpu_loads.size > 0 else 0.0
+
+    return allocation_matrix, float(minimized_max_load)
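+
+
+if __name__ == "__main__":
+    # Minimal smoke test on a toy instance (values are illustrative only):
+    # 4 logical experts on 2 GPUs x 3 slots, where the third slot of each
+    # GPU replicates an expert that is primary on the other GPU.
+    placement = np.array([[0, 1, 3], [2, 3, 0]])
+    workloads = np.array([100.0, 10.0, 10.0, 100.0])
+    alloc, max_load = solve_lplb_policy(workloads, placement)
+    assert np.allclose(alloc.sum(axis=1), workloads)  # flow conservation
+    print("heuristic max GPU load:", max_load)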
diff --git a/examples/moe_lb/archives/loongflow_best_lp.py b/examples/moe_lb/archives/loongflow_best_lp.py
new file mode 100644
index 0000000000..37c6ba1943
--- /dev/null
+++ b/examples/moe_lb/archives/loongflow_best_lp.py
@@ -0,0 +1,99 @@
+"""
+Best solution found for the linear-programming MoE load-balancing solver.
+"""
+
+from typing import Tuple
+
+import numpy as np
+from scipy.optimize import linprog
+
+
+def solve_lplb_policy(
+    initial_workloads: np.ndarray, physical_expert_placement: np.ndarray
+) -> Tuple[np.ndarray, float]:
+    """
+    Solves the load balancing problem using linear programming to minimize the maximum GPU load.
+
+    Args:
+        initial_workloads: Array of shape (N,) with the workload for each logical expert.
+        physical_expert_placement: Array of shape (K, S) mapping experts to processing units.
+
+    Returns:
+        Tuple containing:
+        - allocation_matrix: Array of shape (N, M) with optimal token allocations
+        - minimized_max_load: The minimized maximum GPU load
+    """
+    N = initial_workloads.shape[0]  # Number of logical experts
+    K, S = physical_expert_placement.shape  # K GPUs, S slots per GPU
+    M = K * S  # Total processing units
+
+    # Flatten the placement matrix to get processing unit assignments
+    flat_placement = physical_expert_placement.flatten()
+
+    # Precompute valid (i, j) pairs where expert i can be assigned to unit j
+    valid_pairs = []
+    var_indices = {}  # Maps (i, j) to variable index
+    var_count = 0
+
+    for i in range(N):
+        for j in range(M):
+            if flat_placement[j] == i:
+                valid_pairs.append((i, j))
+                var_indices[(i, j)] = var_count
+                var_count += 1
+
+    # Number of variables is the number of valid (i, j) pairs
+    num_vars = len(valid_pairs)
+
+    # Construct the LP problem:
+    # Objective: minimize Z (the maximum load)
+    # We represent Z as the last variable (index num_vars)
+    c = np.zeros(num_vars + 1)
+    c[-1] = 1  # Minimize Z
+
+    # Constraints:
+    # 1. GPU load constraints (Z >= sum of loads on each GPU)
+    # 2. Flow conservation (sum of allocations for each expert = initial workload)
+    # 3. Non-negativity is handled by bounds
+
+    # GPU load constraints: one per GPU
+    A_ub = np.zeros((K, num_vars + 1))
+    b_ub = np.zeros(K)
+
+    for k in range(K):
+        # Get all slots in this GPU
+        slots_in_gpu = range(k * S, (k + 1) * S)
+        # Find all variables that contribute to this GPU's load
+        for (i, j), var_idx in var_indices.items():
+            if j in slots_in_gpu:
+                A_ub[k, var_idx] = 1
+        A_ub[k, -1] = -1  # -Z term
+
+    # Flow conservation constraints: one per expert
+    A_eq = np.zeros((N, num_vars + 1))
+    b_eq = initial_workloads.copy()
+
+    for i in range(N):
+        for (expert_i, j), var_idx in var_indices.items():
+            if expert_i == i:
+                A_eq[i, var_idx] = 1
+
+    # Bounds: all x_ij >= 0, Z is unbounded
+    bounds = [(0, None) for _ in range(num_vars)] + [(None, None)]
+
+    # Solve the LP problem
+    result = linprog(c, A_ub=A_ub, b_ub=b_ub, A_eq=A_eq, b_eq=b_eq, bounds=bounds, method="highs")
+
+    if not result.success:
+        raise RuntimeError(f"LP solver failed: {result.message}")
+
+    # Extract solution
+    solution = result.x
+    minimized_max_load = solution[-1]
+
+    # Build the allocation matrix
+    allocation_matrix = np.zeros((N, M))
+    for (i, j), var_idx in var_indices.items():
+        allocation_matrix[i, j] = solution[var_idx]
+
+    return allocation_matrix, float(minimized_max_load)
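+
+
+if __name__ == "__main__":
+    # Illustrative check on a toy instance (values are arbitrary): the LP
+    # solution must conserve each expert's workload, and z_opt is the
+    # minimized maximum GPU load reported by the solver.
+    placement = np.array([[0, 1, 3], [2, 3, 0]])
+    workloads = np.array([100.0, 10.0, 10.0, 100.0])
+    alloc, z_opt = solve_lplb_policy(workloads, placement)
+    assert np.allclose(alloc.sum(axis=1), workloads)  # flow conservation
+    print("optimal max GPU load:", z_opt)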
diff --git a/examples/moe_lb/archives/prompt_common.md b/examples/moe_lb/archives/prompt_common.md
new file mode 100644
index 0000000000..98ad05d853
--- /dev/null
+++ b/examples/moe_lb/archives/prompt_common.md
@@ -0,0 +1,75 @@
+Act as an expert in operations research and algorithmic optimization, with a specialization in load balancing for distributed computing systems.
+
+#### **Objective**
+
+Your primary goal is to develop a Python function that solves a dynamic load balancing problem for Mixture-of-Experts (MoE) models. The task is to optimally redistribute computational workloads (tokens) from a set of `N` "logical experts" to `M` available "processing units" (slots), which are grouped across `K` physical GPUs.
+
+The core objective is to **minimize the maximum total load** on any single Graphics Processing Unit (GPU), a classic min-max optimization problem.
+
+#### **Formal Problem Definition**
+
+Your solution must find an allocation that adheres to the following mathematical model (a small worked instance follows the constraint list):
+
+* **Inputs**:
+    * An initial workload vector $L = [l_1, l_2, \dots, l_N]^T$, where $l_i$ is the number of tokens assigned to logical expert $e_i$.
+    * A hardware topology defined by the `physical_expert_placement` matrix of shape `(K, S)`, where `K` is the number of GPUs and `S` is the number of processing units (slots) per GPU. The total number of processing units is $M = K \times S$.
+
+* **Decision Variables**:
+    * $x_{ij}$: The number of tokens transferred from logical expert $e_i$ to processing unit $p_j$ (where $j$ is an index from $0$ to $M-1$).
+
+* **Objective**:
+    * Find an allocation $x_{ij}$ that minimizes the value of $Z$, where $Z$ is an auxiliary variable representing the maximum load across all GPUs.
+    * **Minimize:** $Z$
+
+* **Constraints**:
+    1. **GPU Load Definition Constraint**: The final load on any GPU $G_k$, which is the sum of loads on all its constituent processing units, must not exceed $Z$. Let $\mathcal{S}(k)$ be the set of indices of processing units belonging to GPU $k$.
+        * $\sum_{j \in \mathcal{S}(k)} \sum_{i=1}^{N} x_{ij} - Z \le 0, \quad \forall k \in 1, \dots, K$
+    2. **Flow Conservation Constraint**: The total number of tokens transferred out of a logical expert $e_i$ must equal its initial workload $l_i$.
+        * $\sum_{j=1}^{M} x_{ij} = l_i, \quad \forall i \in 1, \dots, N$
+    3. **Non-negativity Constraint**: The number of transferred tokens cannot be negative.
+        * $x_{ij} \ge 0, \quad \forall i, j$
+    4. **Placement Constraint**: The workload from a logical expert $e_i$ can only be allocated to processing units that are its designated replicas. Let $\mathcal{P}(i)$ be the set of indices of processing units that are replicas of logical expert $e_i$. The allocation must satisfy:
+        * $x_{ij} = 0, \quad \forall i, \forall j \notin \mathcal{P}(i)$
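+
+For intuition, consider a minimal worked instance (illustrative numbers only): let $N = 2$, $K = 2$, $S = 2$, `physical_expert_placement = [[0, 1], [1, 0]]` (each expert has a replica on both GPUs), and $L = [30, 10]$. Splitting each expert evenly across its two replicas yields a load of $15 + 5 = 20$ on both GPUs, so $Z = 20 = (l_1 + l_2) / K$, which is optimal because the total workload must be shared by $K$ GPUs. With an asymmetric placement such as `[[0, 0], [0, 1]]`, expert $e_2$ can only run on the second GPU, so an optimizer must push most of expert $e_1$'s tokens onto the first GPU to compensate.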
+#### **Code Structure & API Contract**
+
+Your code will be evaluated through a fixed entry point. You **must** implement a function with the following exact signature in your Python script:
+
+```python
+import numpy as np
+from typing import Tuple
+
+def solve_lplb_policy(
+    initial_workloads: np.ndarray,
+    physical_expert_placement: np.ndarray
+) -> Tuple[np.ndarray, float]:
+```
+
+This function must return:
+
+1. `allocation_matrix`: A `numpy.ndarray` of shape `(N, M)`, representing the optimal allocation $x_{ij}$. `N` is the number of logical experts, and `M` is the total number of processing units.
+2. `minimized_max_load`: A `float` representing the minimized maximum load $Z$ from your optimal solution.
+
+The `physical_expert_placement` matrix is crucial for defining both the valid targets for allocation and the grouping of processing units into GPUs.
+The evaluation script will handle the scoring based on this function's output. **Do not change the function signature.**
+
+#### **Constraints**
+
+The returned solution **MUST** adhere to these rules:
+
+1. **Flow Conservation**: The sum of allocated tokens for each logical expert must exactly match its initial workload. (`np.sum(allocation_matrix, axis=1)` must equal `initial_workloads`.)
+2. **Non-negativity**: All elements in the `allocation_matrix` must be greater than or equal to zero.
+3. **Performance**: The entire process, for a moderately sized problem, must complete within a reasonable time limit (e.g., 60 seconds). The evaluation will be run in a sandboxed environment with a timeout.
+4. **Placement Correctness**: The workload from a logical expert `e_i` can **only** be allocated to processing units that are replicas of that same expert `e_i`. You cannot send tokens from expert `e_1` to a replica of expert `e_2`. Your solution must respect the mapping provided by `physical_expert_placement`.
+
+#### **Strategic Guidance for Optimization**
+
+The goal is to find an optimal (or near-optimal) allocation. You are encouraged to explore a wide range of algorithmic strategies to find a correct and efficient solution. Consider the following diverse approaches:
+
+* **Greedy & Heuristic Algorithms**: Can you design a fast, simple algorithm that produces good results? For example, you could iteratively assign tokens from each expert to its currently least-loaded replica.
+* **Iterative Refinement**: Start with a simple valid allocation (like the baseline "even split" strategy) and iteratively improve it. For instance, you could devise a method to move load from the most-loaded processing units to less-loaded ones while respecting all constraints.
+* **Network Flow Models**: This problem can be modeled as a variant of a network flow problem. Exploring algorithms from this domain, such as min-cost max-flow, might yield powerful solutions.
+* **Established Optimization Solvers**: While not required, using a generic solver is a valid strategy. The problem can be formally modeled for libraries handling Linear Programming or other forms of convex optimization. This can guarantee optimality but may have performance trade-offs.
+* **Metaheuristics**: For very large-scale problems where finding the exact optimum is too slow, metaheuristic approaches like Simulated Annealing or Particle Swarm Optimization could find high-quality approximate solutions quickly.
+
+The most successful solution will be one that is both correct (obeys all constraints) and computationally efficient, finding the best possible balance of load in the shortest time. Good luck!
\ No newline at end of file
diff --git a/examples/moe_lb/archives/prompt_lp.md b/examples/moe_lb/archives/prompt_lp.md
new file mode 100644
index 0000000000..627dd01bf4
--- /dev/null
+++ b/examples/moe_lb/archives/prompt_lp.md
@@ -0,0 +1,76 @@
+Act as an expert in operations research and algorithmic optimization, with a specialization in load balancing for distributed computing systems.
+
+#### **Objective**
+
+Your primary goal is to develop a Python function that solves a dynamic load balancing problem for Mixture-of-Experts (MoE) models. The task is to optimally redistribute computational workloads (tokens) from a set of `N` "logical experts" to `M` available "processing units" (slots), which are grouped across `K` physical GPUs.
+
+The core objective is to **minimize the maximum total load** on any single Graphics Processing Unit (GPU), a classic min-max optimization problem.
+
+#### **Formal Mathematical Definition**
+
+You are solving a **Linear Programming (LP)** problem defined as follows:
+
+* **Inputs**:
+    * An initial workload vector $L = [l_1, l_2, \dots, l_N]^T$, where $l_i$ is the number of tokens assigned to logical expert $e_i$.
+    * A hardware topology defined by the `physical_expert_placement` matrix of shape `(K, S)`, where `K` is the number of GPUs and `S` is the number of processing units (slots) per GPU. The total number of processing units is $M = K \times S$.
+
+* **Decision Variables**:
+    * $x_{ij}$: The number of tokens transferred from logical expert $e_i$ to processing unit $p_j$ (where $j$ is an index from $0$ to $M-1$).
+
+* **Objective Function**:
+    * Minimize $Z$, where $Z$ is an auxiliary variable representing the maximum load across all GPUs.
+    * **Minimize:** $Z$
+
+* **Constraints**:
+    1. **GPU Load Definition Constraint**: The final load on any GPU $G_k$, which is the sum of loads on all its constituent processing units, must not exceed $Z$. Let $\mathcal{S}(k)$ be the set of indices of processing units belonging to GPU $k$.
+        * $\sum_{j \in \mathcal{S}(k)} \sum_{i=1}^{N} x_{ij} - Z \le 0, \quad \forall k \in 1, \dots, K$
+    2. **Flow Conservation Constraint**: The total number of tokens transferred out of a logical expert $e_i$ must equal its initial workload $l_i$.
+        * $\sum_{j=1}^{M} x_{ij} = l_i, \quad \forall i \in 1, \dots, N$
+    3. **Non-negativity Constraint**: The number of transferred tokens cannot be negative.
+        * $x_{ij} \ge 0, \quad \forall i, j$
+    4. **Placement Constraint**: The workload from a logical expert $e_i$ can only be allocated to processing units that are its designated replicas. Let $\mathcal{P}(i)$ be the set of indices of processing units that are replicas of logical expert $e_i$. The allocation must satisfy:
+        * $x_{ij} = 0, \quad \forall i, \forall j \notin \mathcal{P}(i)$
+
+#### **Code Structure & API Contract**
+
+Your code will be evaluated through a fixed entry point. You **must** implement a function with the following exact signature in your Python script:
+
+```python
+import numpy as np
+from typing import Tuple
+
+def solve_lplb_policy(
+    initial_workloads: np.ndarray,
+    physical_expert_placement: np.ndarray
+) -> Tuple[np.ndarray, float]:
+```
+
+This function must return:
+
+1. `allocation_matrix`: A `numpy.ndarray` of shape `(N, M)`, representing the optimal allocation $x_{ij}$. `N` is the number of logical experts, and `M` is the total number of processing units.
+2. `minimized_max_load`: A `float` representing the minimized maximum load $Z$ from your optimal solution.
+
+The `physical_expert_placement` matrix is crucial for defining both the valid targets for allocation and the grouping of processing units into GPUs.
+The evaluation script will handle the scoring based on this function's output. **Do not change the function signature.**
+
+#### **Constraints**
+
+The returned solution **MUST** adhere to these rules:
+
+1. **Flow Conservation**: The sum of allocated tokens for each logical expert must exactly match its initial workload. (`np.sum(allocation_matrix, axis=1)` must equal `initial_workloads`.)
+2. **Non-negativity**: All elements in the `allocation_matrix` must be greater than or equal to zero.
+3. **Performance**: The entire process, for a moderately sized problem, must complete within a reasonable time limit (e.g., 60 seconds). The evaluation will be run in a sandboxed environment with a timeout.
+4. **Placement Correctness**: The workload from a logical expert `e_i` can **only** be allocated to processing units that are replicas of that same expert `e_i`. You cannot send tokens from expert `e_1` to a replica of expert `e_2`. Your solution must respect the mapping provided by `physical_expert_placement`.
+
+#### **Strategic Guidance for Optimization**
+
+To achieve a high-performing and correct solution, consider these strategies:
+
+* **Problem Formulation**: Recognize that this problem is a classic **Linear Program**. The most direct path to an optimal solution is to formulate and solve it as such.
+* **Leverage Existing Solvers**: Instead of implementing an LP solver from scratch, you should use established, high-performance libraries. The Python ecosystem has excellent tools for this.
+    * A strong and common choice is the `scipy.optimize.linprog` function. It is well-documented and robust.
+    * Other powerful libraries like `cvxpy` or `PuLP` can also be used to model and solve this problem, often with a more intuitive syntax.
+* **Matrix Formulation**: To use solvers like `scipy.optimize.linprog` efficiently, you will need to construct the standard LP matrices: the cost vector `c`, the inequality constraint matrix `A_ub`, the inequality constraint vector `b_ub`, the equality constraint matrix `A_eq`, and the equality constraint vector `b_eq`. Carefully map the mathematical constraints above into these matrix forms. Your formulation must correctly handle the Placement Constraint. A robust approach is to only create decision variables for the valid `(i, j)` pairs where unit `j` is a replica of expert `i`. This drastically reduces the problem size compared to creating variables for all `N*M` possible pairs. A minimal construction sketch follows this list.
+* **Efficiency**: The way you construct these matrices can significantly impact performance. Use `numpy` vectorized operations where possible to build them quickly.
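+
+The sketch below illustrates one possible matrix construction for a `scipy.optimize.linprog` call; the function name is illustrative and this is a starting point, not a prescribed implementation. It appends $Z$ as the final variable and materializes only the valid `(i, j)` pairs (one per slot, since each slot hosts exactly one expert):
+
+```python
+import numpy as np
+from scipy.optimize import linprog
+
+def sketch_lp(workloads, placement):
+    K, S = placement.shape
+    flat = placement.flatten()                         # slot j hosts expert flat[j]
+    pairs = [(int(flat[j]), j) for j in range(K * S)]  # valid (expert, slot) pairs
+    n = len(pairs)                                     # one variable per pair, plus Z
+    c = np.zeros(n + 1)
+    c[-1] = 1.0                                        # minimize Z only
+    A_ub = np.zeros((K, n + 1))
+    A_ub[:, -1] = -1.0                                 # ... - Z <= 0 per GPU
+    A_eq = np.zeros((workloads.size, n + 1))
+    for v, (i, j) in enumerate(pairs):
+        A_ub[j // S, v] = 1.0                          # slot j belongs to GPU j // S
+        A_eq[i, v] = 1.0                               # pair carries expert i's tokens
+    bounds = [(0, None)] * n + [(None, None)]          # x >= 0, Z free
+    return linprog(c, A_ub=A_ub, b_ub=np.zeros(K), A_eq=A_eq,
+                   b_eq=workloads, bounds=bounds, method="highs")
+```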
+
+Focus on evolving the core logic within `solve_lplb_policy` to be a robust and accurate LP solver for this specific load balancing task. Good luck!
diff --git a/examples/moe_lb/archives/prompt_multi_obj.md b/examples/moe_lb/archives/prompt_multi_obj.md
new file mode 100644
index 0000000000..90d8dc5eff
--- /dev/null
+++ b/examples/moe_lb/archives/prompt_multi_obj.md
@@ -0,0 +1,110 @@
+Act as an expert in operations research and algorithmic optimization, with a specialization in load balancing for distributed computing systems.
+
+#### Objective
+
+Your primary goal is to develop a Python function that solves a dynamic load balancing problem for Mixture-of-Experts (MoE) models. The task is to optimally redistribute computational workloads (tokens) from a set of `N` "logical experts" to `M` available "processing units" (slots), which are grouped across `K` physical GPUs.
+
+The core objective is to **minimize the maximum total load** on any single Graphics Processing Unit (GPU), a classic min-max optimization problem.
+
+#### Evaluation & Optimization Criteria (Critical)
+
+Your solution will be evaluated using a multi-objective score: `combined_score = (mean_score, speedup)`
+
+This score follows a lexicographical ordering:
+
+1. **Primary objective** (`mean_score`):
+    - This reflects the **quality of your load balancing solution**, primarily driven by how well you minimize the maximum GPU load.
+    - Lower maximum load → better balance → higher mean_score.
+
+2. **Secondary objective** (`speedup`):
+    - This reflects the computational efficiency of your implementation.
+    - Faster execution (lower runtime) → higher speedup.
+
+3. **Lexicographical priority** (see the comparison sketch after this section):
+    - You must **prioritize improving `mean_score` first**.
+    - Treat `mean_score` as strictly dominant. **Do not trade off `mean_score` for `speedup`.**
+    - Only when two solutions have **effectively identical `mean_score` (within numerical tolerance)** does `speedup` become the deciding factor.
+
+4. **Validity is mandatory**:
+    - Validity overrides all objectives. Invalid solutions are strictly unacceptable.
+    - Any violation of constraints (flow conservation, placement, etc.) will result in `validity = 0`, making the solution unusable regardless of score.
+    - A correct but slower solution is strictly better than a fast but invalid one.
+
+Implications for Your Strategy:
+
+- Focus first on correct LP formulation and optimality.
+- Optimize for `speedup` only after achieving a near-optimal or optimal `mean_score`.
+- Avoid heuristics that sacrifice solution quality for speed.
+- Once correctness and optimality are ensured:
+    - Reduce problem size (e.g., eliminate invalid (i, j) pairs early).
+    - Use efficient matrix construction and solver configurations.
+- Prefer **deterministic, stable solutions** over aggressive approximations.
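+
+Concretely, lexicographical comparison behaves like Python's built-in tuple ordering; the snippet below is purely illustrative:
+
+```python
+# (mean_score, speedup) tuples compare element-wise, left to right
+assert (0.99, 1.0) > (0.95, 50.0)  # quality dominates, despite far lower speedup
+assert (0.99, 2.0) > (0.99, 1.0)   # speedup breaks ties on equal quality
+```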
+
+#### Formal Mathematical Definition
+
+You are solving a **Linear Programming (LP)** problem defined as follows:
+
+* **Inputs**:
+    * An initial workload vector $L = [l_1, l_2, \dots, l_N]^T$, where $l_i$ is the number of tokens assigned to logical expert $e_i$.
+    * A hardware topology defined by the `physical_expert_placement` matrix of shape `(K, S)`, where `K` is the number of GPUs and `S` is the number of processing units (slots) per GPU. The total number of processing units is $M = K \times S$.
+
+* **Decision Variables**:
+    * $x_{ij}$: The number of tokens transferred from logical expert $e_i$ to processing unit $p_j$ (where $j$ is an index from $0$ to $M-1$).
+
+* **Objective Function**:
+    * Minimize $Z$, where $Z$ is an auxiliary variable representing the maximum load across all GPUs.
+    * **Minimize:** $Z$
+
+* **Constraints**:
+    1. **GPU Load Definition Constraint**: The final load on any GPU $G_k$, which is the sum of loads on all its constituent processing units, must not exceed $Z$. Let $\mathcal{S}(k)$ be the set of indices of processing units belonging to GPU $k$.
+        * $\sum_{j \in \mathcal{S}(k)} \sum_{i=1}^{N} x_{ij} - Z \le 0, \quad \forall k \in 1, \dots, K$
+    2. **Flow Conservation Constraint**: The total number of tokens transferred out of a logical expert $e_i$ must equal its initial workload $l_i$.
+        * $\sum_{j=1}^{M} x_{ij} = l_i, \quad \forall i \in 1, \dots, N$
+    3. **Non-negativity Constraint**: The number of transferred tokens cannot be negative.
+        * $x_{ij} \ge 0, \quad \forall i, j$
+    4. **Placement Constraint**: The workload from a logical expert $e_i$ can only be allocated to processing units that are its designated replicas. Let $\mathcal{P}(i)$ be the set of indices of processing units that are replicas of logical expert $e_i$. The allocation must satisfy:
+        * $x_{ij} = 0, \quad \forall i, \forall j \notin \mathcal{P}(i)$
+
+#### Code Structure & API Contract
+
+Your code will be evaluated through a fixed entry point. You **must** implement a function with the following exact signature in your Python script:
+
+```python
+import numpy as np
+from typing import Tuple
+
+def solve_lplb_policy(
+    initial_workloads: np.ndarray,
+    physical_expert_placement: np.ndarray
+) -> Tuple[np.ndarray, float]:
+```
+
+This function must return:
+
+1. `allocation_matrix`: A `numpy.ndarray` of shape `(N, M)`, representing the optimal allocation $x_{ij}$. `N` is the number of logical experts, and `M` is the total number of processing units.
+2. `minimized_max_load`: A `float` representing the minimized maximum load $Z$ from your optimal solution.
+
+The `physical_expert_placement` matrix is crucial for defining the valid targets for allocation and the grouping of processing units into GPUs.
+The evaluation script will handle the scoring based on this function's output. **Do not change the function signature.**
+
+#### Constraints
+
+The returned solution **MUST** adhere to these rules (a self-check sketch follows this list):
+
+1. **Flow Conservation**: The sum of allocated tokens for each logical expert must exactly match its initial workload. (`np.sum(allocation_matrix, axis=1)` must equal `initial_workloads`.)
+2. **Non-negativity**: All elements in the `allocation_matrix` must be greater than or equal to zero.
+3. **Performance**: The entire process, for a moderately sized problem, must complete within a reasonable time limit (e.g., 60 seconds). The evaluation will be run in a sandboxed environment with a timeout.
+4. **Placement Correctness**: The workload from a logical expert `e_i` can **only** be allocated to processing units that are replicas of that same expert `e_i`. You cannot send tokens from expert `e_1` to a replica of expert `e_2`. Your solution must respect the mapping provided by `physical_expert_placement`.
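+
+Before returning, it is cheap to self-check all four rules; the snippet below is an illustrative sketch (argument names assumed from the signature above) that mirrors the checks the evaluator performs:
+
+```python
+import numpy as np
+
+def check_solution(alloc, workloads, placement):
+    flat = placement.flatten()
+    assert np.allclose(alloc.sum(axis=1), workloads)  # flow conservation
+    assert (alloc >= -1e-9).all()                     # non-negativity
+    rows, cols = np.nonzero(alloc)
+    assert (rows == flat[cols]).all()                 # placement correctness
+```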
+
+#### Strategic Guidance for Optimization
+
+To achieve a high-performing and correct solution, consider these strategies:
+
+* **Problem Formulation**: Recognize that this problem is a classic **Linear Program**. The most direct path to an optimal solution is to formulate and solve it as such.
+* **Leverage Existing Solvers**: Instead of implementing an LP solver from scratch, you should use established, high-performance libraries. The Python ecosystem has excellent tools for this.
+    * A strong and common choice is the `scipy.optimize.linprog` function. It is well-documented and robust.
+    * Other powerful libraries like `cvxpy` or `PuLP` can also be used to model and solve this problem, often with a more intuitive syntax.
+* **Matrix Formulation**: To use solvers like `scipy.optimize.linprog` efficiently, you will need to construct the standard LP matrices: the cost vector `c`, the inequality constraint matrix `A_ub`, the inequality constraint vector `b_ub`, the equality constraint matrix `A_eq`, and the equality constraint vector `b_eq`. Carefully map the mathematical constraints above into these matrix forms. Your formulation must correctly handle the Placement Constraint. A robust approach is to only create decision variables for the valid `(i, j)` pairs where unit `j` is a replica of expert `i`. This drastically reduces the problem size compared to creating variables for all `N*M` possible pairs.
+* **Efficiency**: The way you construct these matrices can significantly impact performance. Use `numpy` vectorized operations where possible to build them quickly.
+
+Focus on evolving the core logic within `solve_lplb_policy` to be a robust and accurate LP solver for this specific load balancing task. Good luck!
diff --git a/examples/moe_lb/archives/task_config.yaml.bak b/examples/moe_lb/archives/task_config.yaml.bak
new file mode 100644
index 0000000000..9f0c67ee94
--- /dev/null
+++ b/examples/moe_lb/archives/task_config.yaml.bak
@@ -0,0 +1,159 @@
+# Original task configuration from baidu-baige/LoongFlow
+# Reference: https://github.com/baidu-baige/LoongFlow/tree/main/agents/math_agent/examples/moe_lb
+
+# Global directory configuration
+workspace_path: "./output"
+
+# Global LLM configuration (optional)
+# If the evaluator or other components have no llm_config of their own, this configuration is used
+llm_config:
+  url: "http://xxx/v1"
+  api_key: "xxx"
+  model: "deepseek-r1-250528"
+  temperature: 0.8
+  context_length: 128000
+  max_tokens: 32768
+  top_p: 1.0
+  timeout: 1200
+
+# ------------------------------------------------------------------------------
+# Define the available configurations for all components
+# ------------------------------------------------------------------------------
+
+# All available Planner configurations
+planners:
+  evolve_planner:
+    # Planner-specific configurations (e.g., prompt templates, LLM used, etc.)
+    react_max_steps: 10
+
+# All available Executor configurations
+executors:
+  evolve_executor_react:
+    # ExecutorReact-specific configurations
+    parallel_candidates: 3
+    max_rounds: 3
+    react_max_steps: 15
+
+  evolve_executor_chat:
+    # ExecutorChat-specific configurations
+    max_rounds: 3
+
+  evolve_executor_fuse:
+    # ExecutorFuse-specific configurations
+    max_rounds: 3
+    react_max_steps: 15
+    score_threshold: 0.8
+
+# All available Summarizer configurations
+summarizers:
+  evolve_summary:
+    # Summarizer-specific configurations
+    react_max_steps: 10
+
+# ------------------------------------------------------------------------------
+# Define configurations for the main evolution process
+# ------------------------------------------------------------------------------
+evolve:
+  # Task description: the core objective of the entire evolution process
+  task: |
+    Act as an expert in operations research and algorithmic optimization, with a specialization in load balancing for distributed computing systems.
+
+    #### **Objective**
+
+    Your primary goal is to develop a Python function that solves a dynamic load balancing problem for Mixture-of-Experts (MoE) models. The task is to optimally redistribute computational workloads (tokens) from a set of `N` "logical experts" to `M` available "processing units" (slots), which are grouped across `K` physical GPUs.
+
+    The core objective is to **minimize the maximum total load** on any single Graphics Processing Unit (GPU), a classic min-max optimization problem.
+
+    #### **Formal Mathematical Definition**
+
+    You are solving a **Linear Programming (LP)** problem defined as follows:
+
+    * **Inputs**:
+        * An initial workload vector $L = [l_1, l_2, \dots, l_N]^T$, where $l_i$ is the number of tokens assigned to logical expert $e_i$.
+        * A hardware topology defined by the `physical_expert_placement` matrix of shape `(K, S)`, where `K` is the number of GPUs and `S` is the number of processing units (slots) per GPU. The total number of processing units is $M = K \times S$.
+
+    * **Decision Variables**:
+        * $x_{ij}$: The number of tokens transferred from logical expert $e_i$ to processing unit $p_j$ (where $j$ is an index from $0$ to $M-1$).
+
+    * **Objective Function**:
+        * Minimize $Z$, where $Z$ is an auxiliary variable representing the maximum load across all GPUs.
+        * **Minimize:** $Z$
+
+    * **Constraints**:
+        1. **GPU Load Definition Constraint**: The final load on any GPU $G_k$, which is the sum of loads on all its constituent processing units, must not exceed $Z$. Let $\mathcal{S}(k)$ be the set of indices of processing units belonging to GPU $k$.
+            * $\sum_{j \in \mathcal{S}(k)} \sum_{i=1}^{N} x_{ij} - Z \le 0, \quad \forall k \in 1, \dots, K$
+        2. **Flow Conservation Constraint**: The total number of tokens transferred out of a logical expert $e_i$ must equal its initial workload $l_i$.
+            * $\sum_{j=1}^{M} x_{ij} = l_i, \quad \forall i \in 1, \dots, N$
+        3. **Non-negativity Constraint**: The number of transferred tokens cannot be negative.
+            * $x_{ij} \ge 0, \quad \forall i, j$
+        4. **Placement Constraint**: The workload from a logical expert $e_i$ can only be allocated to processing units that are its designated replicas. Let $\mathcal{P}(i)$ be the set of indices of processing units that are replicas of logical expert $e_i$. The allocation must satisfy:
+            * $x_{ij} = 0, \quad \forall i, \forall j \notin \mathcal{P}(i)$
+
+    #### **Code Structure & API Contract**
+
+    Your code will be evaluated through a fixed entry point. You **must** implement a function with the following exact signature in your Python script:
+
+    ```python
+    import numpy as np
+    from typing import Tuple
+
+    def solve_lplb_policy(
+        initial_workloads: np.ndarray,
+        physical_expert_placement: np.ndarray
+    ) -> Tuple[np.ndarray, float]:
+    ```
+
+    This function must return:
+
+    1. `allocation_matrix`: A `numpy.ndarray` of shape `(N, M)`, representing the optimal allocation $x_{ij}$. `N` is the number of logical experts, and `M` is the total number of processing units.
+    2. `minimized_max_load`: A `float` representing the minimized maximum load $Z$ from your optimal solution.
+
+    The `physical_expert_placement` matrix is crucial for defining both the valid targets for allocation and the grouping of processing units into GPUs.
+    The evaluation script will handle the scoring based on this function's output. **Do not change the function signature.**
+
+    #### **Constraints**
+
+    The returned solution **MUST** adhere to these rules:
+
+    1. **Flow Conservation**: The sum of allocated tokens for each logical expert must exactly match its initial workload. (`np.sum(allocation_matrix, axis=1)` must equal `initial_workloads`.)
+    2. **Non-negativity**: All elements in the `allocation_matrix` must be greater than or equal to zero.
+    3. **Performance**: The entire process, for a moderately sized problem, must complete within a reasonable time limit (e.g., 60 seconds). The evaluation will be run in a sandboxed environment with a timeout.
+    4. **Placement Correctness**: The workload from a logical expert `e_i` can **only** be allocated to processing units that are replicas of that same expert `e_i`. You cannot send tokens from expert `e_1` to a replica of expert `e_2`. Your solution must respect the mapping provided by `physical_expert_placement`.
+
+    #### **Strategic Guidance for Optimization**
+
+    To achieve a high-performing and correct solution, consider these strategies:
+
+    * **Problem Formulation**: Recognize that this problem is a classic **Linear Program**. The most direct path to an optimal solution is to formulate and solve it as such.
+    * **Leverage Existing Solvers**: Instead of implementing an LP solver from scratch, you should use established, high-performance libraries. The Python ecosystem has excellent tools for this.
+        * A strong and common choice is the `scipy.optimize.linprog` function. It is well-documented and robust.
+        * Other powerful libraries like `cvxpy` or `PuLP` can also be used to model and solve this problem, often with a more intuitive syntax.
+    * **Matrix Formulation**: To use solvers like `scipy.optimize.linprog` efficiently, you will need to construct the standard LP matrices: the cost vector `c`, the inequality constraint matrix `A_ub`, the inequality constraint vector `b_ub`, the equality constraint matrix `A_eq`, and the equality constraint vector `b_eq`. Carefully map the mathematical constraints above into these matrix forms. Your formulation must correctly handle the Placement Constraint. A robust approach is to only create decision variables for the valid `(i, j)` pairs where unit `j` is a replica of expert `i`. This drastically reduces the problem size compared to creating variables for all `N*M` possible pairs.
+    * **Efficiency**: The way you construct these matrices can significantly impact performance. Use `numpy` vectorized operations where possible to build them quickly.
+
+    Focus on evolving the core logic within `solve_lplb_policy` to be a robust and accurate LP solver for this specific load balancing task. Good luck!
+
+  # Name of the component selected for the current run
+  planner_name: "evolve_planner"
+  executor_name: "evolve_executor_fuse"
+  summary_name: "evolve_summary"
+
+  # Core parameters for the evolution process
+  max_iterations: 1000
+  target_score: 1.0
+  concurrency: 2
+
+  # Evaluator configuration
+  evaluator:
+    timeout: 1200
+
+  # Database configurations
+  database:
+    storage_type: "in_memory"
+    num_islands: 3
+    population_size: 50
+    checkpoint_interval: 5
+    # If sampling_weight_power < 1, sampling weight has a weak effect
+    sampling_weight_power: 5
diff --git a/examples/moe_lb/config.yaml b/examples/moe_lb/config.yaml
new file mode 100644
index 0000000000..fc8dda8cbb
--- /dev/null
+++ b/examples/moe_lb/config.yaml
@@ -0,0 +1,150 @@
+max_iterations: 25
+checkpoint_interval: 5
+log_level: "INFO"
+
+# LLM configuration for the MoE load-balancing example
+llm:
+  api_base: "http://101.6.160.66:11800/v1"
+  models:
+    - name: Qwen/Qwen3-32B
+      weight: 1.0
+  timeout: 1000000
+
+# Specialized prompt for the MoE load-balancing task
+prompt:
+  system_message: |
+    Act as an expert in operations research and algorithmic optimization, with a specialization in load balancing for distributed computing systems.
+
+    #### Objective
+
+    Your primary goal is to develop a Python function that solves a dynamic load balancing problem for Mixture-of-Experts (MoE) models. The task is to optimally redistribute computational workloads (tokens) from a set of `N` "logical experts" to `M` available "processing units" (slots), which are grouped across `K` physical GPUs.
+
+    The core objective is to **minimize the maximum total load** on any single Graphics Processing Unit (GPU), a classic min-max optimization problem.
+
+    #### Evaluation & Optimization Criteria (Critical)
+
+    Your solution will be evaluated using a multi-objective score: `combined_score = (mean_score, speedup)`
+
+    This score follows a lexicographical ordering:
+
+    1. **Primary objective** (`mean_score`):
+        - This reflects the **quality of your load balancing solution**, primarily driven by how well you minimize the maximum GPU load.
+        - Lower maximum load → better balance → higher mean_score.
+
+    2. **Secondary objective** (`speedup`):
+        - This reflects the computational efficiency of your implementation.
+        - Faster execution (lower runtime) → higher speedup.
+
+    3. **Lexicographical priority**:
+        - You must **prioritize improving `mean_score` first**.
+        - Treat `mean_score` as strictly dominant. **Do not trade off `mean_score` for `speedup`.**
+        - Only when two solutions have **effectively identical `mean_score` (within numerical tolerance)** does `speedup` become the deciding factor.
+
+    4. **Validity is mandatory**:
+        - Validity overrides all objectives. Invalid solutions are strictly unacceptable.
+        - Any violation of constraints (flow conservation, placement, etc.) will result in `validity = 0`, making the solution unusable regardless of score.
+        - A correct but slower solution is strictly better than a fast but invalid one.
+
+    Implications for Your Strategy:
+
+    - Focus first on correct LP formulation and optimality.
+    - Optimize for `speedup` only after achieving a near-optimal or optimal `mean_score`.
+    - Avoid heuristics that sacrifice solution quality for speed.
+    - Once correctness and optimality are ensured:
+        - Reduce problem size (e.g., eliminate invalid (i, j) pairs early).
+        - Use efficient matrix construction and solver configurations.
+    - Prefer **deterministic, stable solutions** over aggressive approximations.
+
+    #### Formal Mathematical Definition
+
+    You are solving a **Linear Programming (LP)** problem defined as follows:
+
+    * **Inputs**:
+        * An initial workload vector $L = [l_1, l_2, \dots, l_N]^T$, where $l_i$ is the number of tokens assigned to logical expert $e_i$.
+        * A hardware topology defined by the `physical_expert_placement` matrix of shape `(K, S)`, where `K` is the number of GPUs and `S` is the number of processing units (slots) per GPU. The total number of processing units is $M = K \times S$.
+
+    * **Decision Variables**:
+        * $x_{ij}$: The number of tokens transferred from logical expert $e_i$ to processing unit $p_j$ (where $j$ is an index from $0$ to $M-1$).
+
+    * **Objective Function**:
+        * Minimize $Z$, where $Z$ is an auxiliary variable representing the maximum load across all GPUs.
+        * **Minimize:** $Z$
+
+    * **Constraints**:
+        1. **GPU Load Definition Constraint**: The final load on any GPU $G_k$, which is the sum of loads on all its constituent processing units, must not exceed $Z$. Let $\mathcal{S}(k)$ be the set of indices of processing units belonging to GPU $k$.
+            * $\sum_{j \in \mathcal{S}(k)} \sum_{i=1}^{N} x_{ij} - Z \le 0, \quad \forall k \in 1, \dots, K$
+        2. **Flow Conservation Constraint**: The total number of tokens transferred out of a logical expert $e_i$ must equal its initial workload $l_i$.
+            * $\sum_{j=1}^{M} x_{ij} = l_i, \quad \forall i \in 1, \dots, N$
+        3. **Non-negativity Constraint**: The number of transferred tokens cannot be negative.
+            * $x_{ij} \ge 0, \quad \forall i, j$
+        4. **Placement Constraint**: The workload from a logical expert $e_i$ can only be allocated to processing units that are its designated replicas. Let $\mathcal{P}(i)$ be the set of indices of processing units that are replicas of logical expert $e_i$. The allocation must satisfy:
+            * $x_{ij} = 0, \quad \forall i, \forall j \notin \mathcal{P}(i)$
+
+    #### Code Structure & API Contract
+
+    Your code will be evaluated through a fixed entry point. You **must** implement a function with the following exact signature in your Python script:
+
+    ```python
+    import numpy as np
+    from typing import Tuple
+
+    def solve_lplb_policy(
+        initial_workloads: np.ndarray,
+        physical_expert_placement: np.ndarray
+    ) -> Tuple[np.ndarray, float]:
+    ```
+
+    This function must return:
+
+    1. `allocation_matrix`: A `numpy.ndarray` of shape `(N, M)`, representing the optimal allocation $x_{ij}$. `N` is the number of logical experts, and `M` is the total number of processing units.
+    2. `minimized_max_load`: A `float` representing the minimized maximum load $Z$ from your optimal solution.
+
+    The `physical_expert_placement` matrix is crucial for defining the valid targets for allocation and the grouping of processing units into GPUs.
+    The evaluation script will handle the scoring based on this function's output. **Do not change the function signature.**
+
+    #### Constraints
+
+    The returned solution **MUST** adhere to these rules:
+
+    1. **Flow Conservation**: The sum of allocated tokens for each logical expert must exactly match its initial workload. (`np.sum(allocation_matrix, axis=1)` must equal `initial_workloads`.)
+    2. **Non-negativity**: All elements in the `allocation_matrix` must be greater than or equal to zero.
+    3. **Performance**: The entire process, for a moderately sized problem, must complete within a reasonable time limit (e.g., 60 seconds). The evaluation will be run in a sandboxed environment with a timeout.
+    4. **Placement Correctness**: The workload from a logical expert `e_i` can **only** be allocated to processing units that are replicas of that same expert `e_i`. You cannot send tokens from expert `e_1` to a replica of expert `e_2`. Your solution must respect the mapping provided by `physical_expert_placement`.
+
+    #### Strategic Guidance for Optimization
+
+    To achieve a high-performing and correct solution, consider these strategies:
+
+    * **Problem Formulation**: Recognize that this problem is a classic **Linear Program**. The most direct path to an optimal solution is to formulate and solve it as such.
+    * **Leverage Existing Solvers**: Instead of implementing an LP solver from scratch, you should use established, high-performance libraries. The Python ecosystem has excellent tools for this.
+        * A strong and common choice is the `scipy.optimize.linprog` function. It is well-documented and robust.
+        * Other powerful libraries like `cvxpy` or `PuLP` can also be used to model and solve this problem, often with a more intuitive syntax.
+    * **Matrix Formulation**: To use solvers like `scipy.optimize.linprog` efficiently, you will need to construct the standard LP matrices: the cost vector `c`, the inequality constraint matrix `A_ub`, the inequality constraint vector `b_ub`, the equality constraint matrix `A_eq`, and the equality constraint vector `b_eq`. Carefully map the mathematical constraints above into these matrix forms. Your formulation must correctly handle the Placement Constraint. A robust approach is to only create decision variables for the valid `(i, j)` pairs where unit `j` is a replica of expert `i`. This drastically reduces the problem size compared to creating variables for all `N*M` possible pairs.
+    * **Efficiency**: The way you construct these matrices can significantly impact performance. Use `numpy` vectorized operations where possible to build them quickly.
+
+    Focus on evolving the core logic within `solve_lplb_policy` to be a robust and accurate LP solver for this specific load balancing task. Good luck!
+
+  num_top_programs: 3
+  num_diverse_programs: 2
+
+# Database configuration
+database:
+  population_size: 25
+  archive_size: 12
+  num_islands: 3
+  elite_selection_ratio: 0.3
+  exploitation_ratio: 0.65
+  exploration_ratio: 0.35
+
+# Evaluator configuration
+evaluator:
+  timeout: 900  # 15 minutes per evaluation
+  parallel_evaluations: 1
+  cascade_evaluation: false
+
+# Evolution settings
+diff_based_evolution: false
+allow_full_rewrites: true
+max_code_length: 60000
+
+max_tasks_per_child: 1
diff --git a/examples/moe_lb/evaluator.py b/examples/moe_lb/evaluator.py
new file mode 100644
index 0000000000..48d692b3eb
--- /dev/null
+++ b/examples/moe_lb/evaluator.py
@@ -0,0 +1,342 @@
+"""
+Evaluator for the MoE load-balancing case.
+Adapted from LoongFlow's original implementation:
+https://github.com/baidu-baige/LoongFlow/tree/main/agents/math_agent/examples/moe_lb
+"""
+
+import inspect
+import json
+import os
+import pickle
+import subprocess
+import sys
+import time
+from dataclasses import dataclass, fields, is_dataclass
+from functools import wraps
+from typing import Any
+
+import numpy as np
+
+TARGET_IMBALANCE_RATIO = 1.01
+TIMEOUT_SECONDS = 60
+N_EVAL_EPOCHS = 100  # Run 100 epochs per evaluation run
+N_EVAL_RUNS = 5  # Repeat the entire process 5 times to get an average speedup
+
+
+class TimeoutError(Exception):
+    pass
+
+
+@dataclass
+class SingleRunResult:
+    score: float
+    imbalance_ratio: float
+    verified_max_load: float
+
+
+@dataclass
+class FailureArtifacts:
+    failure_stage: str
+    error_message: str
+
+
+@dataclass
+class SuccessArtifacts:
+    test_case_name: str
+
+
+@dataclass
+class Metrics:
+    mean: float
+    std: float
+    min: float | None = None
+    max: float | None = None
+
+
+@dataclass
+class EvalMetrics:
+    score: Metrics | None = None
+    imbalance_ratio: Metrics | None = None
+    runtime: Metrics | None = None
+    completed_runs: int = 0
+    completed_epochs: int = 0
+
+
+@dataclass
+class EvalResult:
+    validity: int
+    combined_score: tuple[float, float]  # (mean_score, speedup)
+    summary: str
+    metrics: EvalMetrics
+    artifacts: dict[str, Any]
+
+
+def to_dict_safe(obj):
+    if is_dataclass(obj):
+        return {f.name: to_dict_safe(getattr(obj, f.name)) for f in fields(obj)}
+    if isinstance(obj, np.ndarray):
+        return obj.tolist()
+    if isinstance(obj, np.generic):
+        return obj.item()
+    if isinstance(obj, dict):
+        return {k: to_dict_safe(v) for k, v in obj.items()}
+    if isinstance(obj, list):
+        return [to_dict_safe(v) for v in obj]
+    if isinstance(obj, tuple):
+        return tuple(to_dict_safe(v) for v in obj)
+    return obj
+
+
+def return_asdict(func):
+    @wraps(func)
+    def wrapper(*args, **kwargs):
+        res = func(*args, **kwargs)
+        if is_dataclass(res):
+            return to_dict_safe(res)
+        return res
+
+    return wrapper
+
+
+def redundant_policy(
+    n_logical_experts: int,
+    ep_size: int,
+    n_redundants_per_rank: int,
+    workload_history: np.ndarray,
+) -> np.ndarray:
+    """
+    Determines expert placement across GPUs.
+    Each GPU hosts its primary experts plus replicas of high-load experts from others.
+    """
+    n_original_per_rank = n_logical_experts // ep_size
+    n_slots_per_rank = n_original_per_rank + n_redundants_per_rank
+    placement = np.zeros((ep_size, n_slots_per_rank), dtype=int)
+
+    # 1. Map primary experts to their native ranks
+    primary_experts = np.arange(n_logical_experts).reshape(ep_size, n_original_per_rank)
+    placement[:, :n_original_per_rank] = primary_experts
+
+    # 2. Select the top-N "hot" experts per rank using partition (faster than a full sort)
+    rank_workloads = workload_history[primary_experts]
+    # np.argpartition puts the largest N elements at the end; we then sort only those
+    top_n_idx = np.argpartition(rank_workloads, -n_redundants_per_rank, axis=1)[
+        :, -n_redundants_per_rank:
+    ]
+    # Refine: sort the top-N slice to ensure deterministic replica ordering
+    top_w = np.take_along_axis(rank_workloads, top_n_idx, axis=1)
+    refined_order = np.argsort(-top_w, axis=1)
+    top_indices = np.take_along_axis(top_n_idx, refined_order, axis=1)
+
+    top_expert_ids = np.take_along_axis(primary_experts, top_indices, axis=1)
+
+    # 3. Spread replicas to other ranks using a ring-shift pattern
+    for i in range(n_redundants_per_rank):
+        target_ranks = (np.arange(ep_size) + i + 1) % ep_size
+        placement[target_ranks, n_original_per_rank + i] = top_expert_ids[:, i]
+
+    return placement
+
+
+def _sandbox_exec():
+    import importlib.util
+    import pickle
+    import sys
+    import traceback
+
+    try:
+        args = pickle.load(sys.stdin.buffer)
+        program_path = args["program_path"]
+        inputs = args["inputs"]
+
+        spec = importlib.util.spec_from_file_location("program", program_path)
+        program = importlib.util.module_from_spec(spec)
+        spec.loader.exec_module(program)
+
+        alloc, max_load = program.solve_lplb_policy(
+            inputs["initial_workloads"], inputs["physical_expert_placement"]
+        )
+
+        result = {"validity": 1, "allocation_matrix": alloc, "minimized_max_load": max_load}
+        sys.stdout.buffer.write(pickle.dumps(result))
+
+    except Exception as e:
+        error_result = {"validity": 0, "error": str(e), "traceback": traceback.format_exc()}
+        sys.stdout.buffer.write(pickle.dumps(error_result))
+
+    sys.stdout.buffer.flush()
+
+
+def run_in_sandbox(program_path: os.PathLike, input_data: dict) -> dict:
+    executor_source = inspect.getsource(_sandbox_exec)
+
+    full_script = f"{executor_source}\nif __name__ == '__main__': _sandbox_exec()"
+
+    process = subprocess.Popen(
+        [sys.executable, "-c", full_script],
+        stdin=subprocess.PIPE,
+        stdout=subprocess.PIPE,
+        stderr=subprocess.PIPE,
+    )
+
+    try:
+        payload = {"program_path": str(program_path), "inputs": input_data}
+        stdout_data, stderr_data = process.communicate(
+            input=pickle.dumps(payload), timeout=TIMEOUT_SECONDS
+        )
+
+        if process.returncode != 0:
+            raise RuntimeError(
+                f"Sandbox process crashed with exit code {process.returncode}. Stderr: {stderr_data.decode()}"
+            )
+        if not stdout_data:
+            raise RuntimeError("Sandbox produced no output. It might have been killed.")
+
+        result = pickle.loads(stdout_data)
+
+        if result["validity"] == 0:
+            raise RuntimeError(f"Sandbox Error: {result['error']}\n{result['traceback']}")
+
+        return result
+
+    except subprocess.TimeoutExpired:
+        process.kill()
+        process.communicate()
+        raise TimeoutError(f"User program timed out after {TIMEOUT_SECONDS}s")
+    except Exception as e:
+        if process.poll() is None:
+            process.kill()
+        raise e
+
+
+def run_single_evaluation(program_path: str) -> SingleRunResult:
+    # --- 1. Scenario synthesis ---
+    n_logical_experts, ep_size, n_redundants_per_rank = 256, 16, 4
+    workload_history = np.random.rand(n_logical_experts) * (2**20)
+    placement = redundant_policy(
+        n_logical_experts, ep_size, n_redundants_per_rank, workload_history
+    )
+    initial_workloads = np.random.rand(n_logical_experts) * (2**12)
+
+    # --- 2. Sandbox execution ---
+    results: dict[str, Any] = run_in_sandbox(
+        program_path,
+        {
+            "initial_workloads": initial_workloads,
+            "physical_expert_placement": placement,
+        },
+    )
+
+    alloc_matrix: np.ndarray = results["allocation_matrix"]
+
+    # --- 3. Verification & metrics ---
+    # Constraint A: Flow conservation (workload must be fully processed)
+    if not np.allclose(alloc_matrix.sum(axis=1), initial_workloads, atol=1e-7):
+        raise ValueError("Conservation failed: Load assigned does not match workload.")
+
+    # Constraint B: Non-negativity
+    if (alloc_matrix < -1e-9).any():
+        raise ValueError("Non-negativity failed: Found negative allocations.")
+
+    # Constraint C: Placement (the expert replica must exist on the GPU slot)
+    # Optimized: use a sparse-style check to avoid a massive O(N*M) mask creation
+    active_rows, active_cols = np.nonzero(alloc_matrix)
+    flat_placement = placement.flatten()
+    if not np.all(active_rows == flat_placement[active_cols]):
+        raise ValueError("Placement failed: Expert assigned to a GPU where it isn't stored.")
+
+    # --- 4. Performance scoring ---
+    slot_loads = alloc_matrix.sum(axis=0)
+    gpu_loads = slot_loads.reshape(ep_size, -1).sum(axis=1)
+    verified_max_load = np.max(gpu_loads)
+
+    avg_load = initial_workloads.sum() / ep_size
+    imbalance_ratio = verified_max_load / avg_load if avg_load > 0 else 0.0
+
+    # Score capped at 1.0, decreasing as imbalance exceeds the target
+    score = min(1.0, TARGET_IMBALANCE_RATIO / imbalance_ratio) if imbalance_ratio > 0 else 0.0
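+    # For example (illustrative numbers): a perfectly balanced allocation has
+    # imbalance_ratio = 1.0 and scores 1.0; a ratio of 1.05 scores
+    # 1.01 / 1.05 ≈ 0.962; any ratio at or below the 1.01 target scores 1.0.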
+
+    return SingleRunResult(
+        score=score, imbalance_ratio=imbalance_ratio, verified_max_load=verified_max_load
+    )
+
+
+def summarize(data_list: list[float], include_minmax: bool = True) -> Metrics:
+    return Metrics(
+        mean=float(np.mean(data_list)),
+        std=float(np.std(data_list)),
+        min=float(np.min(data_list)) if include_minmax else None,
+        max=float(np.max(data_list)) if include_minmax else None,
+    )
+
+
+@return_asdict
+def evaluate(path_user_py: str) -> dict[str, Any]:
+    score_list, imbalance_ratio_list, time_list = [], [], []
+
+    for i in range(N_EVAL_RUNS):
+        tstart = time.time()
+        for j in range(N_EVAL_EPOCHS):
+            try:
+                result = run_single_evaluation(path_user_py)
+
+                score_list.append(result.score)
+                imbalance_ratio_list.append(result.imbalance_ratio)
+
+            except (TimeoutError, RuntimeError, TypeError, ValueError) as e:
+                eval_time = time.time() - tstart
+                return EvalResult(
+                    validity=0,
+                    combined_score=(0.0, 0.0),
+                    summary=f"[Run {i + 1}] Evaluation failed on epoch {j + 1}/{N_EVAL_EPOCHS}: {str(e)}",
+                    metrics=EvalMetrics(
+                        runtime=Metrics(mean=eval_time, std=0.0, min=eval_time, max=eval_time),
+                        completed_runs=i,
+                    ),
+                    artifacts=FailureArtifacts(
+                        failure_stage="single_run_execution",
+                        error_message=str(e),
+                    ),
+                )
+        tend = time.time()
+        time_list.append(tend - tstart)
+
+    score = summarize(score_list)
+    imbalance_ratio = summarize(imbalance_ratio_list, False)
+    runtime = summarize(time_list)
+
+    speedup = round(TIMEOUT_SECONDS / runtime.mean, 3)
+
+    return EvalResult(
+        validity=1,
+        combined_score=(round(score.mean, 3), speedup),
+        summary=(
+            f"Evaluation successful across {N_EVAL_RUNS} * {N_EVAL_EPOCHS} executions, "
+            f"averaging {runtime.mean:.2f} seconds per run. "
" + f"Mean score: {score.mean:.4f}, " + f"Mean imbalance: {imbalance_ratio.mean:.4f}" + ), + metrics=EvalMetrics( + score=score, + imbalance_ratio=imbalance_ratio, + runtime=runtime, + completed_runs=N_EVAL_RUNS, + completed_epochs=N_EVAL_RUNS * N_EVAL_EPOCHS, + ), + artifacts=SuccessArtifacts(test_case_name=f"Dynamic_Avg_{N_EVAL_RUNS}_Runs"), + ) + + +if __name__ == "__main__": + program_file = "initial_program.py" + # program_file = "archives/loongflow_best_lp.py" + # program_file = "archives/loongflow_best_common.py" + if not os.path.exists(program_file): + print(f"Error: File not found at {program_file}") + else: + print( + f"--- Evaluating {program_file} ({N_EVAL_RUNS} runs, {N_EVAL_EPOCHS} epochs per run) ---" + ) + report = evaluate(program_file) + print(json.dumps(report, ensure_ascii=False, indent=2)) + combined_score: tuple[float, float] = report.get("combined_score", (0.0, 0.0)) + print(f"\nFinal Mean Score: {combined_score[0]:.3f}") + print(f"Speedup Factor: {combined_score[1]:.3f}x") diff --git a/examples/moe_lb/initial_program.py b/examples/moe_lb/initial_program.py new file mode 100644 index 0000000000..8cfdf2bde5 --- /dev/null +++ b/examples/moe_lb/initial_program.py @@ -0,0 +1,59 @@ +# EVOLVE-BLOCK-START +"""moe lb solver""" + +import numpy as np + + +def solve_lplb_policy( + initial_workloads: np.ndarray, physical_expert_placement: np.ndarray +) -> tuple[np.ndarray, float]: + """ + Calculate and return optimal load allocation matrix & minimized maximum load from initial load and expert deployment + + Args: + initial_workloads (np.ndarray): Shape (N,) 1D array, N is the number of logical experts. + Represent the initial load of each logical expert + physical_expert_placement (np.ndarray): Shape (ep_size, n_slots_per_rank) 2D array + Define the topology of M processing units + + Returns: + Tuple[np.ndarray, float]: + - allocation_matrix (np.ndarray): Shape(N, M) 2D array, M is the total number of processing units. + Represents the allocation amount x_ij from expert i to processing unit j. + - minimized_max_load (float): Minimized maximum load Z. + """ + + n_logical_experts = initial_workloads.shape[0] + ep_size, n_slots_per_rank = physical_expert_placement.shape + n_total_slots = ep_size * n_slots_per_rank # M, Total number of processing units + + # 1. Calculate the number of replicas for each logical expert + # np.bincount efficiently counts occurrences of each expert ID (0, 1, ..., N-1) + flat_placement = physical_expert_placement.flatten() + replica_counts = np.bincount(flat_placement, minlength=n_logical_experts) + + # 2. Calculate allocation matrix x_ij + allocation_matrix = np.zeros((n_logical_experts, n_total_slots)) + for expert_id in range(n_logical_experts): + workload = initial_workloads[expert_id] + num_replicas = replica_counts[expert_id] + + if num_replicas > 0: + workload_per_replica = workload / num_replicas + # Find the linear indices of all replicas of this expert + replica_indices = np.where(flat_placement == expert_id)[0] + # Fill the allocation matrix with the evenly distributed load + allocation_matrix[expert_id, replica_indices] = workload_per_replica + + # 3. 
diff --git a/examples/moe_lb/initial_program.py b/examples/moe_lb/initial_program.py
new file mode 100644
index 0000000000..8cfdf2bde5
--- /dev/null
+++ b/examples/moe_lb/initial_program.py
@@ -0,0 +1,59 @@
+# EVOLVE-BLOCK-START
+"""moe lb solver"""
+
+import numpy as np
+
+
+def solve_lplb_policy(
+    initial_workloads: np.ndarray, physical_expert_placement: np.ndarray
+) -> tuple[np.ndarray, float]:
+    """
+    Calculate and return the optimal load allocation matrix and minimized maximum load, given the initial load and expert deployment.
+
+    Args:
+        initial_workloads (np.ndarray): Shape (N,) 1D array, N is the number of logical experts.
+            Represents the initial load of each logical expert.
+        physical_expert_placement (np.ndarray): Shape (ep_size, n_slots_per_rank) 2D array.
+            Defines the topology of M processing units.
+
+    Returns:
+        Tuple[np.ndarray, float]:
+            - allocation_matrix (np.ndarray): Shape (N, M) 2D array, M is the total number of processing units.
+                Represents the allocation amount x_ij from expert i to processing unit j.
+            - minimized_max_load (float): Minimized maximum load Z.
+    """
+
+    n_logical_experts = initial_workloads.shape[0]
+    ep_size, n_slots_per_rank = physical_expert_placement.shape
+    n_total_slots = ep_size * n_slots_per_rank  # M, total number of processing units
+
+    # 1. Count the number of replicas for each logical expert
+    # np.bincount efficiently counts occurrences of each expert ID (0, 1, ..., N-1)
+    flat_placement = physical_expert_placement.flatten()
+    replica_counts = np.bincount(flat_placement, minlength=n_logical_experts)
+
+    # 2. Calculate allocation matrix x_ij
+    allocation_matrix = np.zeros((n_logical_experts, n_total_slots))
+    for expert_id in range(n_logical_experts):
+        workload = initial_workloads[expert_id]
+        num_replicas = replica_counts[expert_id]
+
+        if num_replicas > 0:
+            workload_per_replica = workload / num_replicas
+            # Find the linear indices of all replicas of this expert
+            replica_indices = np.where(flat_placement == expert_id)[0]
+            # Fill the allocation matrix with the evenly distributed load
+            allocation_matrix[expert_id, replica_indices] = workload_per_replica
+
+    # 3. Calculate final GPU loads and find the maximum load Z
+    # First, calculate the total load of each processing unit (slot)
+    final_slot_loads = np.sum(allocation_matrix, axis=0)
+    # Then, aggregate the slot loads by GPU
+    gpu_loads = final_slot_loads.reshape((ep_size, n_slots_per_rank)).sum(axis=1)
+    # Finally, find the maximum load among all GPUs
+    minimized_max_load = np.max(gpu_loads) if gpu_loads.size > 0 else 0.0
+
+    return allocation_matrix, float(minimized_max_load)
+
+
+# EVOLVE-BLOCK-END
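The baseline solver splits each expert's load evenly over its replicas and reads off the per-GPU maximum via the same reshape-and-sum used in the evaluator; a tiny worked example (values illustrative):

import numpy as np

placement = np.array([[0, 1], [1, 2]])   # 2 GPUs x 2 slots
workloads = np.array([8.0, 6.0, 4.0])    # experts 0..2
flat = placement.flatten()
counts = np.bincount(flat, minlength=3)  # replicas per expert: [1, 2, 1]

alloc = np.zeros((3, 4))
for e in range(3):
    alloc[e, np.where(flat == e)[0]] = workloads[e] / counts[e]

print(np.allclose(alloc.sum(axis=1), workloads))         # True: conservation holds
gpu_loads = alloc.sum(axis=0).reshape(2, 2).sum(axis=1)  # [8 + 3, 3 + 4]
print(gpu_loads)                                         # [11. 7.] -> max load Z = 11.0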
diff --git a/examples/moe_lb/requirements.txt b/examples/moe_lb/requirements.txt
new file mode 100644
index 0000000000..09ac99cfb9
--- /dev/null
+++ b/examples/moe_lb/requirements.txt
@@ -0,0 +1,5 @@
+numpy
+scipy
+numba
+matplotlib
+cvxpy
diff --git a/examples/moe_lb/run.sh b/examples/moe_lb/run.sh
new file mode 100644
index 0000000000..54f3050fcd
--- /dev/null
+++ b/examples/moe_lb/run.sh
@@ -0,0 +1,9 @@
+#!/bin/bash
+
+WORKLOAD_DIR="examples/moe_lb"
+
+python ./openevolve-run.py \
+    $WORKLOAD_DIR/initial_program.py \
+    $WORKLOAD_DIR/evaluator.py \
+    -c $WORKLOAD_DIR/config.yaml \
+    -o out/moe_lb
diff --git a/openevolve/api.py b/openevolve/api.py
index 65c3702064..0ea18a3e4a 100644
--- a/openevolve/api.py
+++ b/openevolve/api.py
@@ -3,17 +3,18 @@
 """
 
 import asyncio
-import tempfile
+import inspect
 import os
+import tempfile
 import uuid
-import inspect
-from typing import Union, Callable, Optional, List, Dict, Any, Tuple
 from dataclasses import dataclass
 from pathlib import Path
+from typing import Any, Callable, Dict, List, Optional, Tuple, Union
 
+from openevolve.config import Config, LLMModelConfig, load_config
 from openevolve.controller import OpenEvolve
-from openevolve.config import Config, load_config, LLMModelConfig
 from openevolve.database import Program
+from openevolve.utils.format_utils import format_score
 
 
 @dataclass
@@ -21,13 +22,13 @@ class EvolutionResult:
     """Result of an evolution run"""
 
     best_program: Optional[Program]
-    best_score: float
+    best_score: float | tuple[float, ...] | list[float]
     best_code: str
     metrics: Dict[str, Any]
     output_dir: Optional[str]
 
     def __repr__(self):
-        return f"EvolutionResult(best_score={self.best_score:.4f})"
+        return f"EvolutionResult(best_score={format_score(self.best_score)})"
 
 
 def run_evolution(
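With best_score widened to a union type, the repr defers to format_score (added in openevolve/utils/format_utils.py below); a rough sketch of the expected output (field values illustrative):

from openevolve.utils.format_utils import format_score

print(f"EvolutionResult(best_score={format_score(0.9231)})")
# EvolutionResult(best_score=0.9231)
print(f"EvolutionResult(best_score={format_score((0.923, 1.5))})")
# EvolutionResult(best_score=(0.9230, 1.5000))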
diff --git a/openevolve/database.py b/openevolve/database.py
index eca5eab0bb..06bb79a496 100644
--- a/openevolve/database.py
+++ b/openevolve/database.py
@@ -19,7 +19,8 @@
 
 from openevolve.config import DatabaseConfig
 from openevolve.utils.code_utils import calculate_edit_distance
-from openevolve.utils.metrics_utils import safe_numeric_average, get_fitness_score
+from openevolve.utils.format_utils import format_score
+from openevolve.utils.metrics_utils import get_fitness_score, safe_numeric_average
 
 logger = logging.getLogger(__name__)
 
@@ -47,7 +48,9 @@ class Program:
     # Program identification
     id: str
     code: str
-    changes_description: str = ""  # compact program changes description (via LLM) stored per program
+    changes_description: str = (
+        ""  # compact program changes description (via LLM) stored per program
+    )
     language: str = "python"
 
     # Evolution information
@@ -298,9 +301,7 @@ def add(
 
             if feature_key not in island_feature_map:
                 # New cell occupation in this island
-                logger.info(
-                    "New MAP-Elites cell occupied in island %d: %s", island_idx, coords_dict
-                )
+                logger.info(f"New MAP-Elites cell occupied in island {island_idx}: {coords_dict}")
                 # Check coverage milestone for this island
                 total_possible_cells = self.feature_bins ** len(self.config.feature_dimensions)
                 island_coverage = (len(island_feature_map) + 1) / total_possible_cells
@@ -321,12 +322,11 @@ def add(
                 existing_fitness = get_fitness_score(
                     existing_program.metrics, self.config.feature_dimensions
                 )
+                new_fitness_str = format_score(new_fitness)
+                existing_fitness_str = format_score(existing_fitness)
+
                 logger.info(
-                    "Island %d MAP-Elites cell improved: %s (fitness: %.3f -> %.3f)",
-                    island_idx,
-                    coords_dict,
-                    existing_fitness,
-                    new_fitness,
+                    f"Island {island_idx} MAP-Elites cell improved: {coords_dict} (fitness: {existing_fitness_str} -> {new_fitness_str})"
                 )
 
     # use MAP-Elites to manage archive
@@ -993,6 +993,7 @@ def _llm_judge_novelty(self, program: Program, similar_program: Program) -> bool
         Use LLM to judge if a program is novel compared to a similar existing program
         """
         import asyncio
+
         from openevolve.novelty_judge import NOVELTY_SYSTEM_MSG, NOVELTY_USER_MSG
 
         user_msg = NOVELTY_USER_MSG.format(
@@ -1081,9 +1082,7 @@ def _is_novel(self, program_id: int, island_idx: int) -> bool:
             other = self.programs[pid]
 
             if other.embedding is None:
-                logger.warning(
-                    f"Program {other.id} has no embedding, skipping similarity check"
-                )
+                logger.warning(f"Program {other.id} has no embedding, skipping similarity check")
                 continue
 
             similarity = self._cosine_similarity(embd, other.embedding)
@@ -1208,9 +1207,9 @@ def _update_best_program(self, program: Program) -> None:
             if "combined_score" in program.metrics and "combined_score" in current_best.metrics:
                 old_score = current_best.metrics["combined_score"]
                 new_score = program.metrics["combined_score"]
-                score_diff = new_score - old_score
+                score_diff = np.subtract(new_score, old_score)
                 logger.info(
-                    f"New best program {program.id} replaces {old_id} (combined_score: {old_score:.4f} → {new_score:.4f}, +{score_diff:.4f})"
+                    f"New best program {program.id} replaces {old_id} (combined_score: {format_score(old_score)} → {format_score(new_score)}, +{format_score(score_diff)})"
                 )
             else:
                 logger.info(f"New best program {program.id} replaces {old_id}")
@@ -1257,10 +1256,10 @@ def _update_island_best_program(self, program: Program, island_idx: int) -> None
         ):
             old_score = current_island_best.metrics["combined_score"]
             new_score = program.metrics["combined_score"]
-            score_diff = new_score - old_score
+            score_diff = np.subtract(new_score, old_score)
             logger.debug(
                 f"Island {island_idx}: New best program {program.id} replaces {old_id} "
-                f"(combined_score: {old_score:.4f} → {new_score:.4f}, +{score_diff:.4f})"
+                f"(combined_score: {format_score(old_score)} → {format_score(new_score)}, +{format_score(score_diff)})"
             )
         else:
             logger.debug(
@@ -1983,9 +1982,19 @@ def get_island_stats(self) -> List[dict]:
                     get_fitness_score(p.metrics, self.config.feature_dimensions)
                     for p in island_programs
                 ]
+                if scores:
+                    best_score = max(scores)
+                    scores_np = np.array(scores)
+                    avg_score = scores_np.mean(axis=0)
+
+                    if isinstance(best_score, (tuple, list)):
+                        avg_score = type(best_score)(avg_score.tolist())
+                    else:
+                        avg_score = float(avg_score)
+                else:
+                    best_score = 0.0
+                    avg_score = 0.0
 
-                best_score = max(scores) if scores else 0.0
-                avg_score = sum(scores) / len(scores) if scores else 0.0
                 diversity = self._calculate_island_diversity(island_programs)
             else:
                 best_score = avg_score = diversity = 0.0
@@ -2323,7 +2332,7 @@ def log_island_status(self) -> None:
             best_indicator = f" (best: {island_best_id})" if island_best_id else ""
             logger.info(
                 f"{current_marker} Island {stat['island']}: {stat['population_size']} programs, "
-                f"best={stat['best_score']:.4f}, avg={stat['average_score']:.4f}, "
+                f"best={format_score(stat['best_score'])}, avg={format_score(stat['average_score'])}, "
                 f"diversity={stat['diversity']:.2f}, gen={stat['generation']}{best_indicator}"
            )
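For tuple scores, get_island_stats now takes the best lexicographically via max() but averages element-wise via NumPy, mirroring the expectations in the new tests in tests/test_database.py below; the two reductions in isolation:

import numpy as np

scores = [(1.0, 2.0), (3.0, 0.0)]
best = max(scores)                                        # lexicographic: (3.0, 0.0)
avg = type(best)(np.array(scores).mean(axis=0).tolist())  # element-wise: (2.0, 1.0)
print(best, avg)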
diff --git a/openevolve/process_parallel.py b/openevolve/process_parallel.py
index a2fd6592a9..0cf8b6d872 100644
--- a/openevolve/process_parallel.py
+++ b/openevolve/process_parallel.py
@@ -14,9 +14,12 @@
 from pathlib import Path
 from typing import Any, Dict, List, Optional, Tuple
 
+import numpy as np
+
 from openevolve.config import Config
 from openevolve.database import Program, ProgramDatabase
-from openevolve.utils.metrics_utils import safe_numeric_average
+from openevolve.utils.format_utils import format_score
+from openevolve.utils.metrics_utils import broadcast_value, safe_numeric_average
 
 logger = logging.getLogger(__name__)
 
@@ -704,15 +707,27 @@ async def run_evolution(
                         )
 
                         current_score = safe_numeric_average(child_program.metrics)
-                        if current_score is not None and isinstance(current_score, (int, float)):
+                        if current_score is not None:
                             # Check for improvement
                             if self.config.early_stopping_patience > 0:
-                                improvement = current_score - best_score
-                                if improvement >= self.config.convergence_threshold:
+                                convergence_threshold = self.config.convergence_threshold
+                                convergence_threshold = broadcast_value(
+                                    convergence_threshold, current_score
+                                )
+
+                                # if the score is a list/tuple, we need to compare element-wise and check if all elements meet the threshold
+                                if best_score == float("-inf"):
+                                    best_score = broadcast_value(best_score, current_score)
+
+                                improvement = np.subtract(current_score, best_score)
+                                if isinstance(improvement, np.ndarray):
+                                    improvement = type(current_score)(improvement.tolist())
+
+                                if np.all(np.greater_equal(improvement, convergence_threshold)):
                                     best_score = current_score
                                     iterations_without_improvement = 0
                                     logger.debug(
-                                        f"New best score: {best_score:.4f} (improvement: {improvement:+.4f})"
+                                        f"New best score: {format_score(best_score)} (improvement: {format_score(improvement)})"
                                     )
                                 else:
                                     iterations_without_improvement += 1
@@ -729,17 +744,17 @@
                                     logger.info(
                                         f"🛑 Early stopping triggered at iteration {completed_iteration}: "
                                         f"No improvement for {iterations_without_improvement} iterations "
-                                        f"(best score: {best_score:.4f})"
+                                        f"(best score: {format_score(best_score)})"
                                     )
                                     break
                             else:
                                 # Event-based early stopping
-                                if current_score == self.config.convergence_threshold:
+                                if np.allclose(current_score, self.config.convergence_threshold):
                                     best_score = current_score
                                     logger.info(
                                         f"🛑 Early stopping (event-based) triggered at iteration {completed_iteration}: "
-                                        f"Task successfully solved with score {best_score:.4f}."
+                                        f"Task successfully solved with score {format_score(best_score)}."
                                     )
                                     self.early_stopping_triggered = True
                                     break
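The reworked early-stopping path requires every component of a tuple score to clear its threshold before counting an improvement; the same logic in isolation (broadcast_value is defined in openevolve/utils/metrics_utils.py further down; values illustrative):

import numpy as np

current_score = (0.95, 1.2)
best_score = (0.90, 1.3)
threshold = (0.01, 0.01)  # broadcast_value(0.01, current_score)

improvement = np.subtract(current_score, best_score)  # [0.05, -0.1]
print(bool(np.all(np.greater_equal(improvement, threshold))))
# False: the second component regressed, so this is not a new best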
diff --git a/openevolve/prompt/sampler.py b/openevolve/prompt/sampler.py
index 61a5b98ba0..4cb4d71c72 100644
--- a/openevolve/prompt/sampler.py
+++ b/openevolve/prompt/sampler.py
@@ -6,13 +6,16 @@
 import random
 from typing import Any, Dict, List, Optional, Tuple, Union
 
+import numpy as np
+
 from openevolve.config import PromptConfig
 from openevolve.prompt.templates import TemplateManager
-from openevolve.utils.format_utils import format_metrics_safe
+from openevolve.utils.format_utils import format_metrics_safe, format_score
 from openevolve.utils.metrics_utils import (
-    safe_numeric_average,
-    get_fitness_score,
+    broadcast_value,
     format_feature_coordinates,
+    get_fitness_score,
+    safe_numeric_average,
 )
 
 logger = logging.getLogger(__name__)
@@ -110,11 +113,17 @@ def build_prompt(
 
         if self.config.programs_as_changes_description:
             if self.config.system_message_changes_description:
-                system_message_changes_description = self.config.system_message_changes_description.strip()
+                system_message_changes_description = (
+                    self.config.system_message_changes_description.strip()
+                )
             else:
-                system_message_changes_description = self.template_manager.get_template("system_message_changes_description")
+                system_message_changes_description = self.template_manager.get_template(
+                    "system_message_changes_description"
+                )
 
-            system_message = self.template_manager.get_template("system_message_with_changes_description").format(
+            system_message = self.template_manager.get_template(
+                "system_message_with_changes_description"
+            ).format(
                 system_message=system_message,
                 system_message_changes_description=system_message_changes_description,
             )
@@ -149,7 +158,7 @@ def build_prompt(
         # Format the final user message
         user_message = user_template.format(
             metrics=metrics_str,
-            fitness_score=f"{fitness_score:.4f}",
+            fitness_score=format_score(fitness_score),
             feature_coords=feature_coords,
             feature_dimensions=", ".join(feature_dimensions) if feature_dimensions else "None",
             improvement_areas=improvement_areas,
@@ -161,7 +170,9 @@
         )
 
         if self.config.programs_as_changes_description:
-            user_message = self.template_manager.get_template("user_message_with_changes_description").format(
+            user_message = self.template_manager.get_template(
+                "user_message_with_changes_description"
+            ).format(
                 user_message=user_message,
                 changes_description=current_changes_description.rstrip(),
             )
@@ -206,19 +217,24 @@ def _identify_improvement_areas(
             prev_metrics = previous_programs[-1].get("metrics", {})
             prev_fitness = get_fitness_score(prev_metrics, feature_dimensions)
 
+            current_score_str = format_score(current_fitness)
+            prev_score_str = format_score(prev_fitness)
+
             if current_fitness > prev_fitness:
-                msg = self.template_manager.get_fragment(
-                    "fitness_improved", prev=prev_fitness, current=current_fitness
-                )
-                improvement_areas.append(msg)
+                key = "fitness_improved"
+                kwargs = dict(prev=prev_score_str, current=current_score_str)
             elif current_fitness < prev_fitness:
-                msg = self.template_manager.get_fragment(
-                    "fitness_declined", prev=prev_fitness, current=current_fitness
-                )
-                improvement_areas.append(msg)
-            elif abs(current_fitness - prev_fitness) < 1e-6:  # Essentially unchanged
-                msg = self.template_manager.get_fragment("fitness_stable", current=current_fitness)
-                improvement_areas.append(msg)
+                key = "fitness_declined"
+                kwargs = dict(prev=prev_score_str, current=current_score_str)
+            # Essentially unchanged
+            elif np.allclose(current_fitness, prev_fitness, atol=1e-6, equal_nan=True):
+                key = "fitness_stable"
+                kwargs = dict(current=current_score_str)
+            else:
+                key = None
+
+            if key:
+                improvement_areas.append(self.template_manager.get_fragment(key, **kwargs))
 
         # Note feature exploration (not good/bad, just informational)
         if feature_dimensions:
@@ -265,11 +281,8 @@ def _format_evolution_history(
         for i, program in enumerate(reversed(selected_previous)):
             attempt_number = len(previous_programs) - i
-            changes = (
-                program.get("changes_description")
-                or program.get("metadata", {}).get(
-                    "changes", self.template_manager.get_fragment("attempt_unknown_changes")
-                )
+            changes = program.get("changes_description") or program.get("metadata", {}).get(
+                "changes", self.template_manager.get_fragment("attempt_unknown_changes")
             )
 
             # Format performance metrics using safe formatting
@@ -334,9 +347,7 @@ def _format_evolution_history(
         for i, program in enumerate(selected_top):
             use_changes = self.config.programs_as_changes_description
             program_code = (
-                program.get("changes_description", "")
-                if use_changes
-                else program.get("code", "")
+                program.get("changes_description", "") if use_changes else program.get("code", "")
             )
             if not program_code:
                 program_code = "" if use_changes else ""
@@ -351,18 +362,27 @@ def _format_evolution_history(
             for name, value in program.get("metrics", {}).items():
                 if isinstance(value, (int, float)):
                     try:
-                        key_features.append(self.template_manager.get_fragment("top_program_metrics_prefix") + f" {name} ({value:.4f})")
+                        key_features.append(
+                            self.template_manager.get_fragment("top_program_metrics_prefix")
+                            + f" {name} ({value:.4f})"
+                        )
                     except (ValueError, TypeError):
-                        key_features.append(self.template_manager.get_fragment("top_program_metrics_prefix") + f" {name} ({value})")
+                        key_features.append(
+                            self.template_manager.get_fragment("top_program_metrics_prefix")
+                            + f" {name} ({value})"
+                        )
                 else:
-                    key_features.append(self.template_manager.get_fragment("top_program_metrics_prefix") + f" {name} ({value})")
+                    key_features.append(
+                        self.template_manager.get_fragment("top_program_metrics_prefix")
+                        + f" {name} ({value})"
+                    )
 
             key_features_str = ", ".join(key_features)
 
             top_programs_str += (
                 top_program_template.format(
                     program_number=i + 1,
-                    score=f"{score:.4f}",
+                    score=format_score(score),
                     language=("text" if self.config.programs_as_changes_description else language),
                     program_snippet=program_code,
                     key_features=key_features_str,
                 )
@@ -385,7 +405,11 @@ def _format_evolution_history(
             # Use random sampling to get diverse programs
             diverse_programs = random.sample(remaining_programs, num_diverse)
 
-            diverse_programs_str += "\n\n## " + self.template_manager.get_fragment("diverse_programs_title") + "\n\n"
+            diverse_programs_str += (
+                "\n\n## "
+                + self.template_manager.get_fragment("diverse_programs_title")
+                + "\n\n"
+            )
 
             for i, program in enumerate(diverse_programs):
                 use_changes = self.config.programs_as_changes_description
@@ -404,10 +428,10 @@ def _format_evolution_history(
                 key_features = program.get("key_features", [])
                 if not key_features:
                     key_features = [
-                        self.template_manager.get_fragment("diverse_program_metrics_prefix") + f" {name}"
-                        for name in list(program.get("metrics", {}).keys())[
-                            :2
-                        ]  # Just first 2 metrics
+                        self.template_manager.get_fragment("diverse_program_metrics_prefix")
+                        + f" {name}"
+                        # Just first 2 metrics
+                        for name in list(program.get("metrics", {}).keys())[:2]
                     ]
 
                 key_features_str = ", ".join(key_features)
@@ -415,8 +439,10 @@ def _format_evolution_history(
                 diverse_programs_str += (
                     top_program_template.format(
                         program_number=f"D{i + 1}",
-                        score=f"{score:.4f}",
-                        language=("text" if self.config.programs_as_changes_description else language),
+                        score=format_score(score),
+                        language=(
+                            "text" if self.config.programs_as_changes_description else language
+                        ),
                         program_snippet=program_code,
                         key_features=key_features_str,
                     )
@@ -466,9 +492,7 @@ def _format_inspirations_section(
         for i, program in enumerate(inspirations):
             use_changes = self.config.programs_as_changes_description
             program_code = (
-                program.get("changes_description", "")
-                if use_changes
-                else program.get("code", "")
+                program.get("changes_description", "") if use_changes else program.get("code", "")
             )
             if not program_code:
                 program_code = "" if use_changes else ""
@@ -485,7 +509,7 @@ def _format_inspirations_section(
             inspiration_programs_str += (
                 inspiration_program_template.format(
                     program_number=i + 1,
-                    score=f"{score:.4f}",
+                    score=format_score(score),
                     program_type=program_type,
                     language=("text" if self.config.programs_as_changes_description else language),
                     program_snippet=program_code,
@@ -520,12 +544,17 @@ def _determine_program_type(
             return self.template_manager.get_fragment("inspiration_type_migrant")
         if metadata.get("random", False):
             return self.template_manager.get_fragment("inspiration_type_random")
+
+        HIGH_SCORE = broadcast_value(0.8, score)
+        MED_SCORE = broadcast_value(0.6, score)
+        LOW_SCORE = broadcast_value(0.4, score)
+
         # Classify based on score ranges
-        if score >= 0.8:
+        if score >= HIGH_SCORE:
             return self.template_manager.get_fragment("inspiration_type_score_high_performer")
-        elif score >= 0.6:
+        elif score >= MED_SCORE:
             return self.template_manager.get_fragment("inspiration_type_score_alternative")
-        elif score >= 0.4:
+        elif score >= LOW_SCORE:
             return self.template_manager.get_fragment("inspiration_type_score_experimental")
         else:
             return self.template_manager.get_fragment("inspiration_type_score_exploratory")
@@ -551,16 +580,24 @@ def _extract_unique_features(self, program: Dict[str, Any]) -> str:
             and self.config.include_changes_under_chars
             and len(changes) < self.config.include_changes_under_chars
         ):
-            features.append(self.template_manager.get_fragment("inspiration_changes_prefix").format(changes=changes))
+            features.append(
+                self.template_manager.get_fragment("inspiration_changes_prefix").format(
+                    changes=changes
+                )
+            )
 
         # Analyze metrics for standout characteristics
         metrics = program.get("metrics", {})
         for metric_name, value in metrics.items():
             if isinstance(value, (int, float)):
                 if value >= 0.9:
-                    features.append(f"{self.template_manager.get_fragment('inspiration_metrics_excellent').format(metric_name=metric_name, value=value)}")
+                    features.append(
+                        f"{self.template_manager.get_fragment('inspiration_metrics_excellent').format(metric_name=metric_name, value=value)}"
+                    )
                 elif value <= 0.3:
-                    features.append(f"{self.template_manager.get_fragment('inspiration_metrics_alternative').format(metric_name=metric_name)}")
+                    features.append(
+                        f"{self.template_manager.get_fragment('inspiration_metrics_alternative').format(metric_name=metric_name)}"
+                    )
 
         # Code-based features (simple heuristics)
         code = program.get("code", "")
@@ -571,22 +608,32 @@ def _extract_unique_features(self, program: Dict[str, Any]) -> str:
         if "numpy" in code_lower or "np." in code_lower:
             features.append(self.template_manager.get_fragment("inspiration_code_with_numpy"))
         if "for" in code_lower and "while" in code_lower:
-            features.append(self.template_manager.get_fragment("inspiration_code_with_mixed_iteration"))
+            features.append(
+                self.template_manager.get_fragment("inspiration_code_with_mixed_iteration")
+            )
         if (
             self.config.concise_implementation_max_lines
             and len(code.split("\n")) <= self.config.concise_implementation_max_lines
         ):
-            features.append(self.template_manager.get_fragment("inspiration_code_with_concise_line"))
+            features.append(
+                self.template_manager.get_fragment("inspiration_code_with_concise_line")
+            )
         elif (
             self.config.comprehensive_implementation_min_lines
             and len(code.split("\n")) >= self.config.comprehensive_implementation_min_lines
         ):
-            features.append(self.template_manager.get_fragment("inspiration_code_with_comprehensive_line"))
+            features.append(
+                self.template_manager.get_fragment("inspiration_code_with_comprehensive_line")
+            )
 
         # Default if no specific features found
         if not features:
             program_type = self._determine_program_type(program)
-            features.append(self.template_manager.get_fragment("inspiration_no_features_postfix").format(program_type=program_type))
+            features.append(
+                self.template_manager.get_fragment("inspiration_no_features_postfix").format(
+                    program_type=program_type
+                )
+            )
 
         # Use num_top_programs as limit for features (similar to how we limit programs)
         feature_limit = self.config.num_top_programs
@@ -629,7 +676,12 @@ def _render_artifacts(self, artifacts: Dict[str, Union[str, bytes]]) -> str:
                 sections.append(f"### {key}\n```\n{content}\n```")
 
         if sections:
-            return "## " + self.template_manager.get_fragment("artifact_title") + "\n\n" + "\n\n".join(sections)
+            return (
+                "## "
+                + self.template_manager.get_fragment("artifact_title")
+                + "\n\n"
+                + "\n\n".join(sections)
+            )
         else:
             return ""
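Note that with broadcast thresholds, `score >= HIGH_SCORE` on tuples uses Python's lexicographic ordering rather than an element-wise test, so the first component dominates the classification; a quick reminder of the semantics _determine_program_type now relies on:

print((0.9, 0.1) >= (0.8, 0.8))  # True: 0.9 > 0.8 decides; the second element is never compared
print((0.8, 0.7) >= (0.8, 0.8))  # False: tie on the first element, then 0.7 < 0.8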
features.append(f"{self.template_manager.get_fragment('inspiration_metrics_alternative').format(metric_name=metric_name)}") + features.append( + f"{self.template_manager.get_fragment('inspiration_metrics_alternative').format(metric_name=metric_name)}" + ) # Code-based features (simple heuristics) code = program.get("code", "") @@ -571,22 +608,32 @@ def _extract_unique_features(self, program: Dict[str, Any]) -> str: if "numpy" in code_lower or "np." in code_lower: features.append(self.template_manager.get_fragment("inspiration_code_with_numpy")) if "for" in code_lower and "while" in code_lower: - features.append(self.template_manager.get_fragment("inspiration_code_with_mixed_iteration")) + features.append( + self.template_manager.get_fragment("inspiration_code_with_mixed_iteration") + ) if ( self.config.concise_implementation_max_lines and len(code.split("\n")) <= self.config.concise_implementation_max_lines ): - features.append(self.template_manager.get_fragment("inspiration_code_with_concise_line")) + features.append( + self.template_manager.get_fragment("inspiration_code_with_concise_line") + ) elif ( self.config.comprehensive_implementation_min_lines and len(code.split("\n")) >= self.config.comprehensive_implementation_min_lines ): - features.append(self.template_manager.get_fragment("inspiration_code_with_comprehensive_line")) + features.append( + self.template_manager.get_fragment("inspiration_code_with_comprehensive_line") + ) # Default if no specific features found if not features: program_type = self._determine_program_type(program) - features.append(self.template_manager.get_fragment("inspiration_no_features_postfix").format(program_type=program_type)) + features.append( + self.template_manager.get_fragment("inspiration_no_features_postfix").format( + program_type=program_type + ) + ) # Use num_top_programs as limit for features (similar to how we limit programs) feature_limit = self.config.num_top_programs @@ -629,7 +676,12 @@ def _render_artifacts(self, artifacts: Dict[str, Union[str, bytes]]) -> str: sections.append(f"### {key}\n```\n{content}\n```") if sections: - return "## " + self.template_manager.get_fragment("artifact_title") + "\n\n" + "\n\n".join(sections) + return ( + "## " + + self.template_manager.get_fragment("artifact_title") + + "\n\n" + + "\n\n".join(sections) + ) else: return "" diff --git a/openevolve/prompts/defaults/fragments.json b/openevolve/prompts/defaults/fragments.json index b59410405a..52a6462eaa 100644 --- a/openevolve/prompts/defaults/fragments.json +++ b/openevolve/prompts/defaults/fragments.json @@ -1,7 +1,7 @@ { - "fitness_improved": "Fitness improved: {prev:.4f} → {current:.4f}", - "fitness_declined": "Fitness declined: {prev:.4f} → {current:.4f}. Consider revising recent changes.", - "fitness_stable": "Fitness unchanged at {current:.4f}", + "fitness_improved": "Fitness improved: {prev} → {current}", + "fitness_declined": "Fitness declined: {prev} → {current}. 
diff --git a/openevolve/utils/metrics_utils.py b/openevolve/utils/metrics_utils.py
index 3efd18e25b..43caf6ed4e 100644
--- a/openevolve/utils/metrics_utils.py
+++ b/openevolve/utils/metrics_utils.py
@@ -68,7 +68,7 @@ def safe_numeric_sum(metrics: Dict[str, Any]) -> float:
 
 def get_fitness_score(
     metrics: Dict[str, Any], feature_dimensions: Optional[List[str]] = None
-) -> float:
+) -> float | list[float] | tuple[float, ...]:
     """
     Calculate fitness score, excluding MAP-Elites feature dimensions
 
@@ -88,7 +88,11 @@ def get_fitness_score(
     # Always prefer combined_score if available
     if "combined_score" in metrics:
         try:
-            return float(metrics["combined_score"])
+            v = metrics["combined_score"]
+            if isinstance(v, (int, float)):
+                return float(v)
+            elif isinstance(v, (list, tuple)):
+                return v
         except (ValueError, TypeError):
             pass
 
@@ -143,3 +147,10 @@ def format_feature_coordinates(metrics: Dict[str, Any], feature_dimensions: List
         return ""
 
     return ", ".join(feature_values)
+
+
+def broadcast_value(value: int | float, x: tuple[float, ...] | list[float] | float | int):
+    if isinstance(x, (tuple, list)):
+        return type(x)([value] * len(x))
+    else:
+        return value
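broadcast_value lifts a scalar threshold to the container type of the score it will be compared against, leaving plain floats untouched; for example:

from openevolve.utils.metrics_utils import broadcast_value

print(broadcast_value(0.8, 0.5))              # 0.8 (scalar comparand: unchanged)
print(broadcast_value(0.8, (0.5, 1.0)))       # (0.8, 0.8)
print(broadcast_value(0.8, [0.5, 1.0, 2.0]))  # [0.8, 0.8, 0.8]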
diff --git a/tests/test_database.py b/tests/test_database.py
index d9677dcb47..4293a54dff 100644
--- a/tests/test_database.py
+++ b/tests/test_database.py
@@ -4,12 +4,57 @@
 
 import unittest
 import uuid
+
 from openevolve.config import Config
 from openevolve.database import Program, ProgramDatabase
 
 
 class TestProgramDatabase(unittest.TestCase):
-    """Tests for program database"""
+    def test_get_island_stats_best_and_avg_score(self):
+        """Test get_island_stats best_score and average_score logic for float, list, tuple"""
+        from openevolve.database import Program
+
+        # single-island float
+        db = self.db
+        db.islands = [set()]
+        db.island_best_programs = [None]
+        db.current_island = 0
+        db.programs.clear()
+        p1 = Program(id="f1", code="a", metrics={"combined_score": 1.0})
+        p2 = Program(id="f2", code="b", metrics={"combined_score": 2.0})
+        db.add(p1)
+        db.add(p2)
+        stats = db.get_island_stats()[0]
+        self.assertEqual(stats["best_score"], 2.0)
+        self.assertAlmostEqual(stats["average_score"], 1.5)
+
+        # single-island list (lexicographic best, element-wise average)
+        db = self.db
+        db.islands = [set()]
+        db.island_best_programs = [None]
+        db.current_island = 0
+        db.programs.clear()
+        p3 = Program(id="l1", code="a", metrics={"combined_score": [1.0, 2.0]})
+        p4 = Program(id="l2", code="b", metrics={"combined_score": [3.0, 0.0]})
+        db.add(p3)
+        db.add(p4)
+        stats = db.get_island_stats()[0]
+        self.assertEqual(stats["best_score"], [3.0, 0.0])
+        self.assertEqual(stats["average_score"], [2.0, 1.0])
+
+        # single-island tuple (lexicographic best, element-wise average)
+        db = self.db
+        db.islands = [set()]
+        db.island_best_programs = [None]
+        db.current_island = 0
+        db.programs.clear()
+        p5 = Program(id="t1", code="a", metrics={"combined_score": (1.0, 2.0, 3.0)})
+        p6 = Program(id="t2", code="b", metrics={"combined_score": (3.0, 0.0, 4.0)})
+        db.add(p5)
+        db.add(p6)
+        stats = db.get_island_stats()[0]
+        self.assertEqual(stats["best_score"], (3.0, 0.0, 4.0))
+        self.assertEqual(stats["average_score"], (2.0, 1.0, 3.5))
 
     def setUp(self):
         """Set up test database"""
@@ -183,12 +228,16 @@ def test_feature_map_operations(self):
             all_feature_map_values.extend(island_map.values())
 
         # At least one of our test programs should be in some island's feature map
-        test_programs_in_map = [v for v in all_feature_map_values if v in ["map_test1", "map_test2"]]
+        test_programs_in_map = [
+            v for v in all_feature_map_values if v in ["map_test1", "map_test2"]
+        ]
         self.assertGreater(
-            len(test_programs_in_map), 0, "At least one test program should be in island feature maps"
+            len(test_programs_in_map),
+            0,
+            "At least one test program should be in island feature maps",
         )
 
-        # If both are in the same island's map with the same feature coordinates, 
+        # If both are in the same island's map with the same feature coordinates,
         # verify the better program is kept
         for island_map in self.db.island_feature_maps:
             if "map_test1" in island_map.values() and "map_test2" in island_map.values():
@@ -199,7 +248,7 @@ def test_feature_map_operations(self):
                     key1 = k
                 elif v == "map_test2":
                     key2 = k
-            
+
                 # If they have the same key, the better program should be kept
                 if key1 == key2:
                     self.assertEqual(island_map[key1], "map_test2")
@@ -505,11 +554,15 @@ def test_migration_prevents_re_migration(self):
         # With new implementation, no programs should have _migrant_ suffixes
         new_programs = set(multi_db.programs.keys())
         new_migrant_ids = [pid for pid in new_programs if "_migrant_" in pid]
-        self.assertEqual(len(new_migrant_ids), 0, "New implementation should not create _migrant suffix programs")
-
+        self.assertEqual(
+            len(new_migrant_ids), 0, "New implementation should not create _migrant suffix programs"
+        )
+
         # Verify that programs are still distributed across islands (migration occurred)
         total_programs_in_maps = sum(len(island_map) for island_map in multi_db.island_feature_maps)
-        self.assertGreaterEqual(total_programs_in_maps, 3, "Programs should be distributed in island feature maps")
+        self.assertGreaterEqual(
+            total_programs_in_maps, 3, "Programs should be distributed in island feature maps"
+        )
 
     def test_empty_island_initialization_creates_copies(self):
         """Test that empty islands are initialized with copies, not shared references"""
diff --git a/tests/test_tuple_score.py b/tests/test_tuple_score.py
new file mode 100644
index 0000000000..16771e1a62
--- /dev/null
+++ b/tests/test_tuple_score.py
@@ -0,0 +1,38 @@
+"""
+Test for tuple and list combined_score handling
+"""
+
+import unittest
+
+from openevolve.utils.metrics_utils import get_fitness_score
+
+
+class TestCombinedScoreTypes(unittest.TestCase):
+    def test_combined_score_float(self):
+        metrics = {"combined_score": 1.23}
+        result = get_fitness_score(metrics)
+        self.assertIsInstance(result, float)
+        self.assertAlmostEqual(result, 1.23)
+
+    def test_combined_score_list(self):
+        metrics = {"combined_score": [1.0, 2.0, 3.0]}
+        result = get_fitness_score(metrics)
+        self.assertIsInstance(result, list)
+        self.assertEqual(result, [1.0, 2.0, 3.0])
+
+    def test_combined_score_tuple(self):
+        metrics = {"combined_score": (4.0, 5.0, 6.0)}
+        result = get_fitness_score(metrics)
+        self.assertIsInstance(result, tuple)
+        self.assertEqual(result, (4.0, 5.0, 6.0))
+
+    def test_combined_score_invalid(self):
+        metrics = {"combined_score": "not_a_number"}
+        result = get_fitness_score(metrics)
+        # fallback to 0.0 for invalid types when no other valid metrics are present
+        self.assertIsInstance(result, float)
+        self.assertEqual(result, 0.0)
+
+
+if __name__ == "__main__":
+    unittest.main()
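Taken together, a tuple-valued combined_score — such as the (quality, speedup) pair produced by the MoE evaluator above — now flows through fitness extraction, comparison, and formatting unchanged; a minimal end-to-end sketch (values illustrative, assuming the patch is applied):

import numpy as np

from openevolve.utils.format_utils import format_score
from openevolve.utils.metrics_utils import broadcast_value, get_fitness_score

metrics = {"combined_score": (0.917, 4.2)}
fitness = get_fitness_score(metrics)  # tuple passes through: (0.917, 4.2)
print(format_score(fitness))          # (0.9170, 4.2000)

# Element-wise improvement test, as in the early-stopping path:
baseline = (0.9, 4.0)
threshold = broadcast_value(0.0, fitness)      # (0.0, 0.0)
improvement = np.subtract(fitness, baseline)   # [0.017, 0.2]
print(bool(np.all(np.greater_equal(improvement, threshold))))  # True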