Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
repos:
- repo: https://github.com/pycqa/isort
rev: "7.0.0"
rev: "8.0.1"
hooks:
- id: isort
args: ["--profile", "black"]

- repo: https://github.com/psf/black
rev: "25.12.0"
rev: "26.3.1"
hooks:
- id: black
164 changes: 164 additions & 0 deletions examples/moe_lb/archives/loongflow_best_common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
"""
best solution for general solver for moe load balance
"""

from typing import Tuple

import numpy as np


def solve_lplb_policy(
    initial_workloads: np.ndarray, physical_expert_placement: np.ndarray
) -> Tuple[np.ndarray, float]:
    """
    Optimized load balancing solver using enhanced dynamic threshold with prioritized processing.

    Key Improvements:
    1. Weighted initialization (70/30 split) for better starting point
    2. Adaptive threshold with progressive tightening
    3. Proportional transfer limits for stability
    4. Optimized processing order
    5. Early termination conditions

    Args:
        initial_workloads: Shape (N,), initial token counts for N logical experts
        physical_expert_placement: Shape (ep_size, n_slots_per_rank), defines processing unit topology

    Returns:
        allocation_matrix: Shape (N, M), token allocation from experts to processing units
        minimized_max_load: Float, the minimized maximum load across all processing units
    """
    # Problem dimensions and flattened placement
    n_logical_experts = initial_workloads.shape[0]
    ep_size, n_slots_per_rank = physical_expert_placement.shape
    n_total_slots = ep_size * n_slots_per_rank
    flat_placement = physical_expert_placement.flatten()

    # Helper functions
    def calculate_gpu_loads(alloc_matrix):
        # Per-GPU load = sum of the loads of that GPU's slots.
        slot_loads = np.sum(alloc_matrix, axis=0)
        return slot_loads.reshape((ep_size, n_slots_per_rank)).sum(axis=1)

    def get_gpu_slots(gpu_idx):
        # Slot indices belonging to GPU gpu_idx in the flattened layout.
        return range(gpu_idx * n_slots_per_rank, (gpu_idx + 1) * n_slots_per_rank)

    def calculate_safe_transfer(alloc_matrix, expert, src_slot, dst_slot):
        # Maximum amount of `expert`'s load that can safely move src_slot -> dst_slot.
        src_load = np.sum(alloc_matrix[:, src_slot])
        dst_load = np.sum(alloc_matrix[:, dst_slot])
        # Compute GPU loads once (previously recomputed twice per call).
        gpu_loads = calculate_gpu_loads(alloc_matrix)
        mean_load = np.mean(gpu_loads)

        src_gpu = src_slot // n_slots_per_rank
        dst_gpu = dst_slot // n_slots_per_rank

        # Calculate maximum safe transfer amount.
        transfer = min(
            # BUG FIX: never move more than this expert actually holds on the
            # source slot — otherwise the allocation entry goes negative and
            # the non-negativity constraint is violated.
            alloc_matrix[expert, src_slot],
            src_load * 0.3,  # Max 30% of source slot's load
            (gpu_loads[src_gpu] - gpu_loads[dst_gpu]) * 0.4,  # Proportional to imbalance
            mean_load - dst_load,  # Don't overfill target
        )
        return max(transfer, 0.0)

    def execute_transfer(alloc_matrix, expert, src_slot, dst_slot, amount):
        # In-place move of `amount` tokens of `expert` between two of its replicas.
        alloc_matrix[expert, src_slot] -= amount
        alloc_matrix[expert, dst_slot] += amount

    # Enhanced Initialization Phase (70/30 weighted distribution)
    allocation_matrix = np.zeros((n_logical_experts, n_total_slots))

    for expert_id in range(n_logical_experts):
        replica_indices = np.where(flat_placement == expert_id)[0]
        if len(replica_indices) > 0:
            # Get current loads of replicas
            replica_loads = np.sum(allocation_matrix[:, replica_indices], axis=0)

            # 70% to least loaded replica, 30% distributed among others
            primary_replica = replica_indices[np.argmin(replica_loads)]
            allocation_matrix[expert_id, primary_replica] = initial_workloads[expert_id] * 0.7
            remaining = initial_workloads[expert_id] * 0.3

            if len(replica_indices) > 1:
                allocation_matrix[expert_id, replica_indices] += remaining / len(replica_indices)
            else:
                allocation_matrix[expert_id, primary_replica] += remaining

    # Track best solution
    best_allocation = allocation_matrix.copy()
    best_Z = np.max(calculate_gpu_loads(best_allocation))

    # Adaptive Iterative Refinement
    max_iterations = 100
    for iteration in range(max_iterations):
        # Calculate current loads
        slot_loads = np.sum(allocation_matrix, axis=0)
        gpu_loads = calculate_gpu_loads(allocation_matrix)
        current_Z = np.max(gpu_loads)

        # Update best solution
        if current_Z < best_Z:
            best_Z = current_Z
            best_allocation = allocation_matrix.copy()

        # Early termination if balanced
        if np.max(gpu_loads) - np.min(gpu_loads) < 1e-6:
            break

        # Adaptive threshold calculation: starts loose (mean + 0.8*std) and
        # tightens progressively as iterations advance.
        mean_load = np.mean(gpu_loads)
        std_load = np.std(gpu_loads)
        threshold = mean_load + (0.8 - 0.7 * (iteration / max_iterations)) * std_load

        # Process overloaded GPUs in descending order
        overloaded_gpus = np.where(gpu_loads > threshold)[0]
        for gpu in sorted(overloaded_gpus, key=lambda x: -gpu_loads[x]):
            # Process slots in this GPU by descending load
            slots = get_gpu_slots(gpu)
            for slot in sorted(slots, key=lambda x: -slot_loads[x]):
                # Get contributing experts sorted by contribution
                contributing_experts = np.where(allocation_matrix[:, slot] > 1e-6)[0]
                for expert in sorted(
                    contributing_experts, key=lambda x: -allocation_matrix[x, slot]
                ):
                    # Get all valid target replicas sorted by load
                    replica_indices = np.where(flat_placement == expert)[0]
                    target_replicas = sorted(
                        replica_indices, key=lambda x: np.sum(allocation_matrix[:, x])
                    )

                    for target in target_replicas:
                        if target == slot:
                            continue

                        transfer = calculate_safe_transfer(
                            allocation_matrix, expert, slot, target
                        )
                        if transfer > 1e-6:
                            execute_transfer(allocation_matrix, expert, slot, target, transfer)

                            # Update slot and GPU loads incrementally
                            slot_loads[slot] -= transfer
                            slot_loads[target] += transfer
                            src_gpu = slot // n_slots_per_rank
                            dst_gpu = target // n_slots_per_rank
                            gpu_loads[src_gpu] -= transfer
                            gpu_loads[dst_gpu] += transfer

                        # Early exit if we've balanced this slot
                        if allocation_matrix[expert, slot] < 1e-6:
                            break

    # Final validation and cleanup
    allocation_matrix = best_allocation

    # Fix floating point discrepancies in flow conservation.
    for expert_id in range(n_logical_experts):
        replica_indices = np.where(flat_placement == expert_id)[0]
        if len(replica_indices) == 0:
            continue
        total = np.sum(allocation_matrix[expert_id])
        if not np.isclose(total, initial_workloads[expert_id]):
            diff = initial_workloads[expert_id] - total
            # BUG FIX: apply the residual to the expert's most-loaded replica
            # (not blindly to replica_indices[0]) so a negative residual
            # cannot drive a small entry below zero.
            target = replica_indices[
                np.argmax(allocation_matrix[expert_id, replica_indices])
            ]
            allocation_matrix[expert_id, target] += diff

    # Calculate final maximum load
    final_gpu_loads = calculate_gpu_loads(allocation_matrix)
    minimized_max_load = np.max(final_gpu_loads) if final_gpu_loads.size > 0 else 0.0

    return allocation_matrix, float(minimized_max_load)
99 changes: 99 additions & 0 deletions examples/moe_lb/archives/loongflow_best_lp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
"""
best solution for liner programming solver for moe load balance
"""

from typing import Tuple

import numpy as np
from scipy.optimize import linprog


def solve_lplb_policy(
    initial_workloads: np.ndarray, physical_expert_placement: np.ndarray
) -> Tuple[np.ndarray, float]:
    """
    Solve the min-max load balancing problem with linear programming.

    Minimizes Z, the maximum total load over all GPUs, subject to flow
    conservation (each expert's allocations sum to its initial workload),
    non-negativity, and the placement constraint (an expert may only send
    tokens to its own replica slots).

    Args:
        initial_workloads: Array of shape (N,) with workload for each logical expert.
        physical_expert_placement: Array of shape (K, S) mapping experts to processing units.

    Returns:
        Tuple containing:
        - allocation_matrix: Array of shape (N, M) with optimal token allocations
        - minimized_max_load: The minimized maximum GPU load

    Raises:
        RuntimeError: If the LP solver does not find a feasible optimum.
    """
    n_experts = initial_workloads.shape[0]
    n_gpus, slots_per_gpu = physical_expert_placement.shape
    n_units = n_gpus * slots_per_gpu

    # Processing-unit view of the placement: flat index j hosts expert flat_placement[j].
    flat_placement = physical_expert_placement.flatten()

    # One LP variable per valid (expert, replica-slot) pair; the placement
    # constraint is enforced by never creating variables for invalid pairs.
    # Pairs are enumerated expert-major, slot-ascending.
    pairs = [
        (expert, int(slot))
        for expert in range(n_experts)
        for slot in np.where(flat_placement == expert)[0]
    ]
    n_x = len(pairs)

    # Objective vector: minimize Z, the auxiliary max-load variable stored
    # in the last column after all x_ij variables.
    c = np.zeros(n_x + 1)
    c[-1] = 1.0

    # Inequality rows (one per GPU):   sum_{j on GPU k} x_ij - Z <= 0
    # Equality rows  (one per expert): sum_j x_ij = l_i
    A_ub = np.zeros((n_gpus, n_x + 1))
    A_ub[:, -1] = -1.0  # the -Z term in every GPU-load row
    A_eq = np.zeros((n_experts, n_x + 1))
    for col, (expert, slot) in enumerate(pairs):
        A_ub[slot // slots_per_gpu, col] = 1.0
        A_eq[expert, col] = 1.0
    b_ub = np.zeros(n_gpus)
    b_eq = initial_workloads.copy()

    # x_ij >= 0; Z is free (the constraints pin it from below).
    bounds = [(0, None)] * n_x + [(None, None)]

    result = linprog(
        c, A_ub=A_ub, b_ub=b_ub, A_eq=A_eq, b_eq=b_eq, bounds=bounds, method="highs"
    )
    if not result.success:
        raise RuntimeError(f"LP solver failed: {result.message}")

    # Scatter the flat variable vector back into the (N, M) allocation matrix.
    allocation_matrix = np.zeros((n_experts, n_units))
    for col, (expert, slot) in enumerate(pairs):
        allocation_matrix[expert, slot] = result.x[col]

    return allocation_matrix, float(result.x[-1])
75 changes: 75 additions & 0 deletions examples/moe_lb/archives/prompt_common.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
Act as an expert in operations research and algorithmic optimization, with a specialization in load balancing for distributed computing systems.

#### **Objective**

Your primary goal is to develop a Python function that solves a dynamic load balancing problem for Mixture-of-Experts (MoE) models. The task is to optimally redistribute computational workloads (tokens) from a set of `N` "logical experts" to `M` available "processing units" (slots), which are grouped across `K` physical GPUs.

The core objective is to **minimize the maximum total load** on any single Graphics Processing Unit (GPU), a classic min-max optimization problem.

#### **Formal Problem Definition**

Your solution must find an allocation that adheres to the following mathematical model:

* **Inputs**:
* An initial workload vector $L = [l_1, l_2, \dots, l_N]^T$, where $l_i$ is the number of tokens assigned to logical expert $e_i$.
* A hardware topology defined by the `physical_expert_placement` matrix of shape `(K, S)`, where `K` is the number of GPUs and `S` is the number of processing units (slots) per GPU. The total number of processing units is $M = K \times S$.

* **Decision Variables**:
* $x_{ij}$: The number of tokens transferred from logical expert $e_i$ to processing unit $p_j$ (where $j$ is an index from $0$ to $M-1$).

* **Objective**:
* Find an allocation $x_{ij}$ that minimizes the value of $Z$, where $Z$ is an auxiliary variable representing the maximum load across all GPUs.
* **Minimize:** $Z$

* **Constraints**:
1. **GPU Load Definition Constraint**: The final load on any GPU $G_k$, which is the sum of loads on all its constituent processing units, must not exceed $Z$. Let $\mathcal{S}(k)$ be the set of indices of processing units belonging to GPU $k$.
* $\sum_{j \in \mathcal{S}(k)} \sum_{i=1}^{N} x_{ij} - Z \le 0, \quad \forall k \in 1, \dots, K$
2. **Flow Conservation Constraint**: The total number of tokens transferred out of a logical expert $e_i$ must equal its initial workload $l_i$.
* $\sum_{j=1}^{M} x_{ij} = l_i, \quad \forall i \in 1, \dots, N$
3. **Non-negativity Constraint**: The number of transferred tokens cannot be negative.
* $x_{ij} \ge 0, \quad \forall i, j$
4. **Placement Constraint**: The workload from a logical expert $e_i$ can only be allocated to processing units that are its designated replicas. Let $\mathcal{P}(i)$ be the set of indices of processing units that are replicas of logical expert $e_i$. The allocation must satisfy:
* $x_{ij} = 0, \quad \forall i, \forall j \notin \mathcal{P}(i)$

#### **Code Structure & API Contract**

Your code will be evaluated through a fixed entry point. You **must** implement a function with the following exact signature in your Python script:

```python
import numpy as np
from typing import Tuple

def solve_lplb_policy(
initial_workloads: np.ndarray,
physical_expert_placement: np.ndarray
) -> Tuple[np.ndarray, float]:
```

This function must return:

1. `allocation_matrix`: A `numpy.ndarray` of shape `(N, M)`, representing the optimal allocation $x_{ij}$. `N` is the number of logical experts, and `M` is the total number of processing units.
2. `minimized_max_load`: A `float` representing the minimized maximum load $Z$ from your optimal solution.

The `physical_expert_placement` matrix is crucial: it defines both the valid targets for allocation and the grouping of processing units into GPUs.
The evaluation script will handle the scoring based on this function's output. **Do not change the function signature.**

#### **Constraints**

The returned solution **MUST** adhere to these rules:

1. **Flow Conservation**: The sum of allocated tokens for each logical expert must exactly match its initial workload. (`np.sum(allocation_matrix, axis=1)` must equal `initial_workloads`).
2. **Non-negativity**: All elements in the `allocation_matrix` must be greater than or equal to zero.
3. **Performance**: The entire process, for a moderately sized problem, must complete within a reasonable time limit (e.g., 60 seconds). The evaluation will be run in a sandboxed environment with a timeout.
4. **Placement Correctness**: The workload from a logical expert `e_i` can **only** be allocated to processing units that are replicas of that same expert `e_i`. You cannot send tokens from expert `e_1` to a replica of expert `e_2`. Your solution must respect the mapping provided by `physical_expert_placement`.

#### **Strategic Guidance for Optimization**

The goal is to find an optimal (or near-optimal) allocation. You are encouraged to explore a wide range of algorithmic strategies to find a correct and efficient solution. Consider the following diverse approaches:

* **Greedy & Heuristic Algorithms**: Can you design a fast, simple algorithm that produces good results? For example, you could iteratively assign tokens from each expert to its currently least-loaded replica.
* **Iterative Refinement**: Start with a simple valid allocation (like the baseline "even split" strategy) and iteratively improve it. For instance, you could devise a method to move load from the most-loaded processing units to less-loaded ones while respecting all constraints.
* **Network Flow Models**: This problem can be modeled as a variant of a network flow problem. Exploring algorithms from this domain, such as min-cost max-flow, might yield powerful solutions.
* **Established Optimization Solvers**: While not required, using a generic solver is a valid strategy. The problem can be formally modeled for libraries handling Linear Programming or other forms of convex optimization. This can guarantee optimality but may have performance trade-offs.
* **Metaheuristics**: For very large-scale problems where finding the exact optimum is too slow, metaheuristic approaches like Simulated Annealing or Particle Swarm Optimization could find high-quality approximate solutions quickly.

The most successful solution will be one that is both correct (obeys all constraints) and computationally efficient, finding the best possible balance of load in the shortest time. Good luck!
Loading
Loading