Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion operator/charts/templates/_helpers.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ config.yaml: |
{{- range .Values.config.scheduler.profiles }}
- name: {{ .name }}
{{- if hasKey . "config" }}
config: {{ toYaml .config | nindent 4 }}
config: {{ toYaml .config | nindent 8 }}
{{- end }}
{{- end }}
{{- end }}
Expand Down
12 changes: 12 additions & 0 deletions operator/charts/templates/clusterrole.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,18 @@ rules:
- patch
- update
- delete
- apiGroups:
- scheduling.k8s.io
resources:
- workloads
verbs:
- create
- get
- list
- watch
- patch
- update
- delete
{{- if .Values.config.network.autoMNNVLEnabled }}
# MNNVL (Multi-Node NVLink) support requires permissions for ComputeDomain and ResourceClaimTemplate resources.
# Note: Kubernetes allows RBAC rules for resources that don't exist yet. If the ComputeDomain CRD is not installed,
Expand Down
8 changes: 4 additions & 4 deletions operator/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ require (
go.uber.org/zap v1.27.0
gopkg.in/yaml.v3 v3.0.1
helm.sh/helm/v3 v3.19.2
k8s.io/api v0.34.3
k8s.io/api v0.35.0-beta.0
k8s.io/apiextensions-apiserver v0.34.3
k8s.io/apimachinery v0.34.3
k8s.io/apimachinery v0.35.0-beta.0
k8s.io/cli-runtime v0.34.3
k8s.io/client-go v0.34.3
k8s.io/client-go v0.35.0-beta.0
k8s.io/klog/v2 v2.130.1
k8s.io/utils v0.0.0-20260108192941-914a6e750570
sigs.k8s.io/controller-runtime v0.22.4
Expand Down Expand Up @@ -150,7 +150,7 @@ require (
gotest.tools/v3 v3.5.2 // indirect
k8s.io/apiserver v0.34.3 // indirect
k8s.io/component-base v0.34.3 // indirect
k8s.io/kube-openapi v0.0.0-20250814151709-d7b6acb124c3 // indirect
k8s.io/kube-openapi v0.0.0-20250910181357-589584f1c912 // indirect
k8s.io/kubectl v0.34.1 // indirect
oras.land/oras-go/v2 v2.6.0 // indirect
sigs.k8s.io/json v0.0.0-20250730193827-2d320260d730 // indirect
Expand Down
26 changes: 12 additions & 14 deletions operator/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -234,8 +234,8 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f h1:y5//uYreIhSUg3J1GEMiLbxo1LJaP8RfCpH6pymGZus=
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw=
github.com/onsi/ginkgo/v2 v2.25.3 h1:Ty8+Yi/ayDAGtk4XxmmfUy4GabvM+MegeB4cDLRi6nw=
github.com/onsi/ginkgo/v2 v2.25.3/go.mod h1:43uiyQC4Ed2tkOzLsEYm7hnrb7UJTWHYNsuy3bG/snE=
github.com/onsi/ginkgo/v2 v2.27.2 h1:LzwLj0b89qtIy6SSASkzlNvX6WktqurSHwkk2ipF/Ns=
github.com/onsi/ginkgo/v2 v2.27.2/go.mod h1:ArE1D/XhNXBXCBkKOLkbsb2c81dQHCRcF5zwn/ykDRo=
github.com/onsi/gomega v1.38.2 h1:eZCjf2xjZAqe+LeWvKb5weQ+NcPwX84kqJ0cZNxok2A=
github.com/onsi/gomega v1.38.2/go.mod h1:W2MJcYxRGV63b418Ai34Ud0hEdTVXq9NW9+Sx6uXf3k=
github.com/open-policy-agent/cert-controller v0.14.0 h1:TPc19BOHOs4tARruTT5o4bzir7Ed6FF+j3EXP/nmZBs=
Expand Down Expand Up @@ -271,8 +271,8 @@ github.com/redis/go-redis/extra/redisotel/v9 v9.0.5 h1:EfpWLLCyXw8PSM2/XNJLjI3Pb
github.com/redis/go-redis/extra/redisotel/v9 v9.0.5/go.mod h1:WZjPDy7VNzn77AAfnAfVjZNvfJTYfPetfZk5yoSTLaQ=
github.com/redis/go-redis/v9 v9.7.3 h1:YpPyAayJV+XErNsatSElgRZZVCwXX9QzkKYNvO7x0wM=
github.com/redis/go-redis/v9 v9.7.3/go.mod h1:bGUrSggJ9X9GUmZpZNEOQKaANxSGgOEBRltRTZHSvrA=
github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII=
github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o=
github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ=
github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc=
github.com/rubenv/sql-migrate v1.8.0 h1:dXnYiJk9k3wetp7GfQbKJcPHjVJL6YK19tKj8t2Ns0o=
github.com/rubenv/sql-migrate v1.8.0/go.mod h1:F2bGFBwCU+pnmbtNYDeKvSuvL6lBVtXDXUUv5t+u1qw=
github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk=
Expand Down Expand Up @@ -357,8 +357,6 @@ go.opentelemetry.io/proto/otlp v1.5.0 h1:xJvq7gMzB31/d406fB8U5CBdyQGw4P399D1aQWU
go.opentelemetry.io/proto/otlp v1.5.0/go.mod h1:keN8WnHxOy8PG0rQZjJJ5A2ebUoafqWp0eVQ4yIXvJ4=
go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs=
go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
Expand Down Expand Up @@ -445,26 +443,26 @@ gotest.tools/v3 v3.5.2 h1:7koQfIKdy+I8UTetycgUqXWSDwpgv193Ka+qRsmBY8Q=
gotest.tools/v3 v3.5.2/go.mod h1:LtdLGcnqToBH83WByAAi/wiwSFCArdFIUV/xxN4pcjA=
helm.sh/helm/v3 v3.19.2 h1:psQjaM8aIWrSVEly6PgYtLu/y6MRSmok4ERiGhZmtUY=
helm.sh/helm/v3 v3.19.2/go.mod h1:gX10tB5ErM+8fr7bglUUS/UfTOO8UUTYWIBH1IYNnpE=
k8s.io/api v0.34.3 h1:D12sTP257/jSH2vHV2EDYrb16bS7ULlHpdNdNhEw2S4=
k8s.io/api v0.34.3/go.mod h1:PyVQBF886Q5RSQZOim7DybQjAbVs8g7gwJNhGtY5MBk=
k8s.io/api v0.35.0-beta.0 h1:eqAAVeSatXNnsPjaeFrFGqSl5ihtPY4e8Txy2nYPOnw=
k8s.io/api v0.35.0-beta.0/go.mod h1:UXuvkssy8lHPSP381eqqBOW4BvRTicVpRjv7k2sjo4Y=
k8s.io/apiextensions-apiserver v0.34.3 h1:p10fGlkDY09eWKOTeUSioxwLukJnm+KuDZdrW71y40g=
k8s.io/apiextensions-apiserver v0.34.3/go.mod h1:aujxvqGFRdb/cmXYfcRTeppN7S2XV/t7WMEc64zB5A0=
k8s.io/apimachinery v0.34.3 h1:/TB+SFEiQvN9HPldtlWOTp0hWbJ+fjU+wkxysf/aQnE=
k8s.io/apimachinery v0.34.3/go.mod h1:/GwIlEcWuTX9zKIg2mbw0LRFIsXwrfoVxn+ef0X13lw=
k8s.io/apimachinery v0.35.0-beta.0 h1:vVoDiASLwUEv5yZceZCBRPXBc1f9wUOZs7ZbEbGr5sY=
k8s.io/apimachinery v0.35.0-beta.0/go.mod h1:dR9KPaf5L0t2p9jZg/wCGB4b3ma2sXZ2zdNqILs+Sak=
k8s.io/apiserver v0.34.3 h1:uGH1qpDvSiYG4HVFqc6A3L4CKiX+aBWDrrsxHYK0Bdo=
k8s.io/apiserver v0.34.3/go.mod h1:QPnnahMO5C2m3lm6fPW3+JmyQbvHZQ8uudAu/493P2w=
k8s.io/cli-runtime v0.34.3 h1:YRyMhiwX0dT9lmG0AtZDaeG33Nkxgt9OlCTZhRXj9SI=
k8s.io/cli-runtime v0.34.3/go.mod h1:GVwL1L5uaGEgM7eGeKjaTG2j3u134JgG4dAI6jQKhMc=
k8s.io/client-go v0.34.3 h1:wtYtpzy/OPNYf7WyNBTj3iUA0XaBHVqhv4Iv3tbrF5A=
k8s.io/client-go v0.34.3/go.mod h1:OxxeYagaP9Kdf78UrKLa3YZixMCfP6bgPwPwNBQBzpM=
k8s.io/client-go v0.35.0-beta.0 h1:4APvMU7+XwWF+XoqAv+gbtSmwjPCXXXo4XVcY89Rde0=
k8s.io/client-go v0.35.0-beta.0/go.mod h1:+XxnPEoaCIB5G0zpwXRh3AnT+CvgS5lA+AFr9EtHUcA=
k8s.io/component-base v0.34.3 h1:zsEgw6ELqK0XncCQomgO9DpUIzlrYuZYA0Cgo+JWpVk=
k8s.io/component-base v0.34.3/go.mod h1:5iIlD8wPfWE/xSHTRfbjuvUul2WZbI2nOUK65XL0E/c=
k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk=
k8s.io/klog/v2 v2.130.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE=
k8s.io/kube-aggregator v0.34.1 h1:WNLV0dVNoFKmuyvdWLd92iDSyD/TSTjqwaPj0U9XAEU=
k8s.io/kube-aggregator v0.34.1/go.mod h1:RU8j+5ERfp0h+gIvWtxRPfsa5nK7rboDm8RST8BJfYQ=
k8s.io/kube-openapi v0.0.0-20250814151709-d7b6acb124c3 h1:liMHz39T5dJO1aOKHLvwaCjDbf07wVh6yaUlTpunnkE=
k8s.io/kube-openapi v0.0.0-20250814151709-d7b6acb124c3/go.mod h1:UZ2yyWbFTpuhSbFhv24aGNOdoRdJZgsIObGBUaYVsts=
k8s.io/kube-openapi v0.0.0-20250910181357-589584f1c912 h1:Y3gxNAuB0OBLImH611+UDZcmKS3g6CthxToOb37KgwE=
k8s.io/kube-openapi v0.0.0-20250910181357-589584f1c912/go.mod h1:kdmbQkyfwUagLfXIad1y2TdrjPFWp2Q89B3qkRwf/pQ=
k8s.io/kubectl v0.34.1 h1:1qP1oqT5Xc93K+H8J7ecpBjaz511gan89KO9Vbsh/OI=
k8s.io/kubectl v0.34.1/go.mod h1:JRYlhJpGPyk3dEmJ+BuBiOB9/dAvnrALJEiY/C5qa6A=
k8s.io/utils v0.0.0-20260108192941-914a6e750570 h1:JT4W8lsdrGENg9W+YwwdLJxklIuKWdRm+BC+xt33FOY=
Expand Down
121 changes: 110 additions & 11 deletions operator/internal/scheduler/kube/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,39 +18,55 @@ package kube

import (
"context"
"encoding/json"
"fmt"
"reflect"

apicommon "github.com/ai-dynamo/grove/operator/api/common"
configv1alpha1 "github.com/ai-dynamo/grove/operator/api/config/v1alpha1"
grovecorev1alpha1 "github.com/ai-dynamo/grove/operator/api/core/v1alpha1"
"github.com/ai-dynamo/grove/operator/internal/scheduler"

groveschedulerv1alpha1 "github.com/ai-dynamo/grove/scheduler/api/core/v1alpha1"
"github.com/samber/lo"
corev1 "k8s.io/api/core/v1"
schedulingv1alpha1 "k8s.io/api/scheduling/v1alpha1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/log"
)

// schedulerBackend implements the scheduler backend interface (Backend in scheduler package) for Kubernetes default scheduler.
// This backend does minimal work - just sets the scheduler name on pods
type schedulerBackend struct {
client client.Client
scheme *runtime.Scheme
name string
eventRecorder record.EventRecorder
profile configv1alpha1.SchedulerProfile
config configv1alpha1.KubeSchedulerConfig
}

var _ scheduler.Backend = (*schedulerBackend)(nil)

// New creates a new Kube backend instance. profile is the scheduler profile for default-scheduler;
// schedulerBackend uses profile.Name and may unmarshal profile.Config into KubeSchedulerConfig.
func New(cl client.Client, scheme *runtime.Scheme, eventRecorder record.EventRecorder, profile configv1alpha1.SchedulerProfile) scheduler.Backend {
var cfg configv1alpha1.KubeSchedulerConfig
if profile.Config != nil {
// Best-effort unmarshal; invalid config fields are silently ignored.
_ = json.Unmarshal(profile.Config.Raw, &cfg)
}
return &schedulerBackend{
client: cl,
scheme: scheme,
name: string(configv1alpha1.SchedulerNameKube),
eventRecorder: eventRecorder,
profile: profile,
config: cfg,
}
}

Expand All @@ -59,32 +75,115 @@ func (b *schedulerBackend) Name() string {
return b.name
}

// Init initializes the Kube backend
// For Kube backend, no special initialization is needed
// Init initializes the Kube backend.
// For Kube backend, no special initialization is needed.
func (b *schedulerBackend) Init() error {
return nil
}

// SyncPodGang synchronizes PodGang resources
// For default kube scheduler, no additional resources are needed
func (b *schedulerBackend) SyncPodGang(_ context.Context, _ *groveschedulerv1alpha1.PodGang) error {
// No-op: default kube scheduler doesn't need any custom resources
// SyncPodGang creates or reconciles a scheduling.k8s.io/v1alpha1 Workload resource for the
// given PodGang when GangScheduling is enabled. Each PodGroup in the PodGang is mapped to a
// PodGroup in the Workload with GangSchedulingPolicy.MinCount set to PodGroup.MinReplicas.
// The Workload is owned by the PodGang and is garbage-collected on PodGang deletion.
//
// When GangScheduling is disabled this is a no-op.
func (b *schedulerBackend) SyncPodGang(ctx context.Context, podGang *groveschedulerv1alpha1.PodGang) error {
if !b.config.GangScheduling {
return nil
}
logger := log.FromContext(ctx)

desired, err := b.buildWorkload(podGang)
if err != nil {
return fmt.Errorf("failed to build Workload for PodGang %s/%s: %w", podGang.Namespace, podGang.Name, err)
}

existing := &schedulingv1alpha1.Workload{}
err = b.client.Get(ctx, client.ObjectKeyFromObject(desired), existing)
if err != nil {
if !apierrors.IsNotFound(err) {
return fmt.Errorf("failed to get Workload %s/%s: %w", desired.Namespace, desired.Name, err)
}
// Workload does not exist yet — create it.
if err = b.client.Create(ctx, desired); err != nil {
return fmt.Errorf("failed to create Workload %s/%s: %w", desired.Namespace, desired.Name, err)
}
logger.Info("Created Workload for PodGang", "workload", client.ObjectKeyFromObject(desired))
return nil
}

// Workload exists. Since Workload.Spec.PodGroups is immutable, we must delete and recreate
// if the desired PodGroups differ from the existing ones (e.g. MinReplicas changed).
if !reflect.DeepEqual(existing.Spec.PodGroups, desired.Spec.PodGroups) {
if err = b.client.Delete(ctx, existing); err != nil {
return fmt.Errorf("failed to delete stale Workload %s/%s for recreation: %w", existing.Namespace, existing.Name, err)
}
if err = b.client.Create(ctx, desired); err != nil {
return fmt.Errorf("failed to recreate Workload %s/%s: %w", desired.Namespace, desired.Name, err)
}
logger.Info("Recreated Workload for PodGang (PodGroups changed)", "workload", client.ObjectKeyFromObject(desired))
}
return nil
}

// OnPodGangDelete handles PodGang deletion
// For default kube scheduler, no cleanup is needed
// OnPodGangDelete is a no-op: the owner reference set on the Workload by SyncPodGang ensures
// the Workload is garbage-collected automatically when the PodGang is deleted.
func (b *schedulerBackend) OnPodGangDelete(_ context.Context, _ *groveschedulerv1alpha1.PodGang) error {
// No-op: default kube scheduler doesn't have any resources to clean up
return nil
}

// PreparePod prepares the Pod by setting the relevant schedulerName field with the chosen scheduler backend.
// PreparePod sets the scheduler name and, when GangScheduling is enabled, sets the WorkloadRef
// on the Pod so the kube-scheduler can apply workload-aware gang scheduling semantics.
// The WorkloadRef.Name is the PodGang name (from pod label grove.io/podgang) and
// WorkloadRef.PodGroup is the PodClique name (from pod label grove.io/podclique), which
// matches the PodGroup name inside the Workload created by SyncPodGang.
func (b *schedulerBackend) PreparePod(pod *corev1.Pod) {
pod.Spec.SchedulerName = b.name
if !b.config.GangScheduling {
return
}
podGangName := pod.Labels[apicommon.LabelPodGang]
podCliqueName := pod.Labels[apicommon.LabelPodClique]
if podGangName == "" || podCliqueName == "" {
return
}
pod.Spec.WorkloadRef = &corev1.WorkloadReference{
Name: podGangName,
PodGroup: podCliqueName,
}
}

// ValidatePodCliqueSet runs default-scheduler-specific validations on the PodCliqueSet.
func (b *schedulerBackend) ValidatePodCliqueSet(_ context.Context, _ *grovecorev1alpha1.PodCliqueSet) error {
return nil
}

// buildWorkload constructs the desired scheduling.k8s.io/v1alpha1 Workload for the given PodGang.
// The Workload name matches the PodGang name and lives in the same namespace.
func (b *schedulerBackend) buildWorkload(podGang *groveschedulerv1alpha1.PodGang) (*schedulingv1alpha1.Workload, error) {
podGroups := lo.Map(podGang.Spec.PodGroups, func(pg groveschedulerv1alpha1.PodGroup, _ int) schedulingv1alpha1.PodGroup {
return schedulingv1alpha1.PodGroup{
Name: pg.Name,
Policy: schedulingv1alpha1.PodGroupPolicy{
Gang: &schedulingv1alpha1.GangSchedulingPolicy{
MinCount: pg.MinReplicas,
},
},
}
})

workload := &schedulingv1alpha1.Workload{
ObjectMeta: metav1.ObjectMeta{
Name: podGang.Name,
Namespace: podGang.Namespace,
},
Spec: schedulingv1alpha1.WorkloadSpec{
PodGroups: podGroups,
},
}

if err := controllerutil.SetOwnerReference(podGang, workload, b.scheme); err != nil {
return nil, fmt.Errorf("failed to set owner reference on Workload: %w", err)
}
return workload, nil
}
Loading
Loading