diff --git a/operator/charts/templates/_helpers.tpl b/operator/charts/templates/_helpers.tpl index dc5f3a64b..4ae1fbd53 100644 --- a/operator/charts/templates/_helpers.tpl +++ b/operator/charts/templates/_helpers.tpl @@ -39,7 +39,7 @@ config.yaml: | {{- range .Values.config.scheduler.profiles }} - name: {{ .name }} {{- if hasKey . "config" }} - config: {{ toYaml .config | nindent 4 }} + config: {{ toYaml .config | nindent 8 }} {{- end }} {{- end }} {{- end }} diff --git a/operator/charts/templates/clusterrole.yaml b/operator/charts/templates/clusterrole.yaml index 8ed60a40d..d34672e13 100644 --- a/operator/charts/templates/clusterrole.yaml +++ b/operator/charts/templates/clusterrole.yaml @@ -142,6 +142,18 @@ rules: - patch - update - delete +- apiGroups: + - scheduling.k8s.io + resources: + - workloads + verbs: + - create + - get + - list + - watch + - patch + - update + - delete {{- if .Values.config.network.autoMNNVLEnabled }} # MNNVL (Multi-Node NVLink) support requires permissions for ComputeDomain and ResourceClaimTemplate resources. # Note: Kubernetes allows RBAC rules for resources that don't exist yet. If the ComputeDomain CRD is not installed, diff --git a/operator/go.mod b/operator/go.mod index 6358dd004..f7000486d 100644 --- a/operator/go.mod +++ b/operator/go.mod @@ -17,11 +17,11 @@ require ( go.uber.org/zap v1.27.0 gopkg.in/yaml.v3 v3.0.1 helm.sh/helm/v3 v3.19.2 - k8s.io/api v0.34.3 + k8s.io/api v0.35.0-beta.0 k8s.io/apiextensions-apiserver v0.34.3 - k8s.io/apimachinery v0.34.3 + k8s.io/apimachinery v0.35.0-beta.0 k8s.io/cli-runtime v0.34.3 - k8s.io/client-go v0.34.3 + k8s.io/client-go v0.35.0-beta.0 k8s.io/klog/v2 v2.130.1 k8s.io/utils v0.0.0-20260108192941-914a6e750570 sigs.k8s.io/controller-runtime v0.22.4 @@ -150,7 +150,7 @@ require ( gotest.tools/v3 v3.5.2 // indirect k8s.io/apiserver v0.34.3 // indirect k8s.io/component-base v0.34.3 // indirect - k8s.io/kube-openapi v0.0.0-20250814151709-d7b6acb124c3 // indirect + k8s.io/kube-openapi v0.0.0-20250910181357-589584f1c912 // indirect k8s.io/kubectl v0.34.1 // indirect oras.land/oras-go/v2 v2.6.0 // indirect sigs.k8s.io/json v0.0.0-20250730193827-2d320260d730 // indirect diff --git a/operator/go.sum b/operator/go.sum index a2a83fcb7..dfc2c380c 100644 --- a/operator/go.sum +++ b/operator/go.sum @@ -234,8 +234,8 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f h1:y5//uYreIhSUg3J1GEMiLbxo1LJaP8RfCpH6pymGZus= github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw= -github.com/onsi/ginkgo/v2 v2.25.3 h1:Ty8+Yi/ayDAGtk4XxmmfUy4GabvM+MegeB4cDLRi6nw= -github.com/onsi/ginkgo/v2 v2.25.3/go.mod h1:43uiyQC4Ed2tkOzLsEYm7hnrb7UJTWHYNsuy3bG/snE= +github.com/onsi/ginkgo/v2 v2.27.2 h1:LzwLj0b89qtIy6SSASkzlNvX6WktqurSHwkk2ipF/Ns= +github.com/onsi/ginkgo/v2 v2.27.2/go.mod h1:ArE1D/XhNXBXCBkKOLkbsb2c81dQHCRcF5zwn/ykDRo= github.com/onsi/gomega v1.38.2 h1:eZCjf2xjZAqe+LeWvKb5weQ+NcPwX84kqJ0cZNxok2A= github.com/onsi/gomega v1.38.2/go.mod h1:W2MJcYxRGV63b418Ai34Ud0hEdTVXq9NW9+Sx6uXf3k= github.com/open-policy-agent/cert-controller v0.14.0 h1:TPc19BOHOs4tARruTT5o4bzir7Ed6FF+j3EXP/nmZBs= @@ -271,8 +271,8 @@ github.com/redis/go-redis/extra/redisotel/v9 v9.0.5 h1:EfpWLLCyXw8PSM2/XNJLjI3Pb github.com/redis/go-redis/extra/redisotel/v9 v9.0.5/go.mod 
h1:WZjPDy7VNzn77AAfnAfVjZNvfJTYfPetfZk5yoSTLaQ= github.com/redis/go-redis/v9 v9.7.3 h1:YpPyAayJV+XErNsatSElgRZZVCwXX9QzkKYNvO7x0wM= github.com/redis/go-redis/v9 v9.7.3/go.mod h1:bGUrSggJ9X9GUmZpZNEOQKaANxSGgOEBRltRTZHSvrA= -github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= -github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= +github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= +github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= github.com/rubenv/sql-migrate v1.8.0 h1:dXnYiJk9k3wetp7GfQbKJcPHjVJL6YK19tKj8t2Ns0o= github.com/rubenv/sql-migrate v1.8.0/go.mod h1:F2bGFBwCU+pnmbtNYDeKvSuvL6lBVtXDXUUv5t+u1qw= github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk= @@ -357,8 +357,6 @@ go.opentelemetry.io/proto/otlp v1.5.0 h1:xJvq7gMzB31/d406fB8U5CBdyQGw4P399D1aQWU go.opentelemetry.io/proto/otlp v1.5.0/go.mod h1:keN8WnHxOy8PG0rQZjJJ5A2ebUoafqWp0eVQ4yIXvJ4= go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= -go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs= -go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= @@ -445,26 +443,26 @@ gotest.tools/v3 v3.5.2 h1:7koQfIKdy+I8UTetycgUqXWSDwpgv193Ka+qRsmBY8Q= gotest.tools/v3 v3.5.2/go.mod h1:LtdLGcnqToBH83WByAAi/wiwSFCArdFIUV/xxN4pcjA= helm.sh/helm/v3 v3.19.2 h1:psQjaM8aIWrSVEly6PgYtLu/y6MRSmok4ERiGhZmtUY= helm.sh/helm/v3 v3.19.2/go.mod h1:gX10tB5ErM+8fr7bglUUS/UfTOO8UUTYWIBH1IYNnpE= -k8s.io/api v0.34.3 h1:D12sTP257/jSH2vHV2EDYrb16bS7ULlHpdNdNhEw2S4= -k8s.io/api v0.34.3/go.mod h1:PyVQBF886Q5RSQZOim7DybQjAbVs8g7gwJNhGtY5MBk= +k8s.io/api v0.35.0-beta.0 h1:eqAAVeSatXNnsPjaeFrFGqSl5ihtPY4e8Txy2nYPOnw= +k8s.io/api v0.35.0-beta.0/go.mod h1:UXuvkssy8lHPSP381eqqBOW4BvRTicVpRjv7k2sjo4Y= k8s.io/apiextensions-apiserver v0.34.3 h1:p10fGlkDY09eWKOTeUSioxwLukJnm+KuDZdrW71y40g= k8s.io/apiextensions-apiserver v0.34.3/go.mod h1:aujxvqGFRdb/cmXYfcRTeppN7S2XV/t7WMEc64zB5A0= -k8s.io/apimachinery v0.34.3 h1:/TB+SFEiQvN9HPldtlWOTp0hWbJ+fjU+wkxysf/aQnE= -k8s.io/apimachinery v0.34.3/go.mod h1:/GwIlEcWuTX9zKIg2mbw0LRFIsXwrfoVxn+ef0X13lw= +k8s.io/apimachinery v0.35.0-beta.0 h1:vVoDiASLwUEv5yZceZCBRPXBc1f9wUOZs7ZbEbGr5sY= +k8s.io/apimachinery v0.35.0-beta.0/go.mod h1:dR9KPaf5L0t2p9jZg/wCGB4b3ma2sXZ2zdNqILs+Sak= k8s.io/apiserver v0.34.3 h1:uGH1qpDvSiYG4HVFqc6A3L4CKiX+aBWDrrsxHYK0Bdo= k8s.io/apiserver v0.34.3/go.mod h1:QPnnahMO5C2m3lm6fPW3+JmyQbvHZQ8uudAu/493P2w= k8s.io/cli-runtime v0.34.3 h1:YRyMhiwX0dT9lmG0AtZDaeG33Nkxgt9OlCTZhRXj9SI= k8s.io/cli-runtime v0.34.3/go.mod h1:GVwL1L5uaGEgM7eGeKjaTG2j3u134JgG4dAI6jQKhMc= -k8s.io/client-go v0.34.3 h1:wtYtpzy/OPNYf7WyNBTj3iUA0XaBHVqhv4Iv3tbrF5A= -k8s.io/client-go v0.34.3/go.mod h1:OxxeYagaP9Kdf78UrKLa3YZixMCfP6bgPwPwNBQBzpM= +k8s.io/client-go v0.35.0-beta.0 h1:4APvMU7+XwWF+XoqAv+gbtSmwjPCXXXo4XVcY89Rde0= +k8s.io/client-go v0.35.0-beta.0/go.mod h1:+XxnPEoaCIB5G0zpwXRh3AnT+CvgS5lA+AFr9EtHUcA= k8s.io/component-base v0.34.3 h1:zsEgw6ELqK0XncCQomgO9DpUIzlrYuZYA0Cgo+JWpVk= k8s.io/component-base v0.34.3/go.mod h1:5iIlD8wPfWE/xSHTRfbjuvUul2WZbI2nOUK65XL0E/c= 
k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk= k8s.io/klog/v2 v2.130.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE= k8s.io/kube-aggregator v0.34.1 h1:WNLV0dVNoFKmuyvdWLd92iDSyD/TSTjqwaPj0U9XAEU= k8s.io/kube-aggregator v0.34.1/go.mod h1:RU8j+5ERfp0h+gIvWtxRPfsa5nK7rboDm8RST8BJfYQ= -k8s.io/kube-openapi v0.0.0-20250814151709-d7b6acb124c3 h1:liMHz39T5dJO1aOKHLvwaCjDbf07wVh6yaUlTpunnkE= -k8s.io/kube-openapi v0.0.0-20250814151709-d7b6acb124c3/go.mod h1:UZ2yyWbFTpuhSbFhv24aGNOdoRdJZgsIObGBUaYVsts= +k8s.io/kube-openapi v0.0.0-20250910181357-589584f1c912 h1:Y3gxNAuB0OBLImH611+UDZcmKS3g6CthxToOb37KgwE= +k8s.io/kube-openapi v0.0.0-20250910181357-589584f1c912/go.mod h1:kdmbQkyfwUagLfXIad1y2TdrjPFWp2Q89B3qkRwf/pQ= k8s.io/kubectl v0.34.1 h1:1qP1oqT5Xc93K+H8J7ecpBjaz511gan89KO9Vbsh/OI= k8s.io/kubectl v0.34.1/go.mod h1:JRYlhJpGPyk3dEmJ+BuBiOB9/dAvnrALJEiY/C5qa6A= k8s.io/utils v0.0.0-20260108192941-914a6e750570 h1:JT4W8lsdrGENg9W+YwwdLJxklIuKWdRm+BC+xt33FOY= diff --git a/operator/internal/scheduler/kube/backend.go b/operator/internal/scheduler/kube/backend.go index a36b218f7..0da89e538 100644 --- a/operator/internal/scheduler/kube/backend.go +++ b/operator/internal/scheduler/kube/backend.go @@ -18,26 +18,36 @@ package kube import ( "context" + "encoding/json" + "fmt" + "reflect" + apicommon "github.com/ai-dynamo/grove/operator/api/common" configv1alpha1 "github.com/ai-dynamo/grove/operator/api/config/v1alpha1" grovecorev1alpha1 "github.com/ai-dynamo/grove/operator/api/core/v1alpha1" "github.com/ai-dynamo/grove/operator/internal/scheduler" groveschedulerv1alpha1 "github.com/ai-dynamo/grove/scheduler/api/core/v1alpha1" + "github.com/samber/lo" corev1 "k8s.io/api/core/v1" + schedulingv1alpha1 "k8s.io/api/scheduling/v1alpha1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/tools/record" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "sigs.k8s.io/controller-runtime/pkg/log" ) // schedulerBackend implements the scheduler backend interface (Backend in scheduler package) for Kubernetes default scheduler. -// This backend does minimal work - just sets the scheduler name on pods type schedulerBackend struct { client client.Client scheme *runtime.Scheme name string eventRecorder record.EventRecorder profile configv1alpha1.SchedulerProfile + config configv1alpha1.KubeSchedulerConfig } var _ scheduler.Backend = (*schedulerBackend)(nil) @@ -45,12 +55,18 @@ var _ scheduler.Backend = (*schedulerBackend)(nil) // New creates a new Kube backend instance. profile is the scheduler profile for default-scheduler; // schedulerBackend uses profile.Name and may unmarshal profile.Config into KubeSchedulerConfig. func New(cl client.Client, scheme *runtime.Scheme, eventRecorder record.EventRecorder, profile configv1alpha1.SchedulerProfile) scheduler.Backend { + var cfg configv1alpha1.KubeSchedulerConfig + if profile.Config != nil { + // Best-effort unmarshal; invalid config fields are silently ignored. + _ = json.Unmarshal(profile.Config.Raw, &cfg) + } return &schedulerBackend{ client: cl, scheme: scheme, name: string(configv1alpha1.SchedulerNameKube), eventRecorder: eventRecorder, profile: profile, + config: cfg, } } @@ -59,32 +75,115 @@ func (b *schedulerBackend) Name() string { return b.name } -// Init initializes the Kube backend -// For Kube backend, no special initialization is needed +// Init initializes the Kube backend. 
+// For Kube backend, no special initialization is needed. func (b *schedulerBackend) Init() error { return nil } -// SyncPodGang synchronizes PodGang resources -// For default kube scheduler, no additional resources are needed -func (b *schedulerBackend) SyncPodGang(_ context.Context, _ *groveschedulerv1alpha1.PodGang) error { - // No-op: default kube scheduler doesn't need any custom resources +// SyncPodGang creates or reconciles a scheduling.k8s.io/v1alpha1 Workload resource for the +// given PodGang when GangScheduling is enabled. Each PodGroup in the PodGang is mapped to a +// PodGroup in the Workload with GangSchedulingPolicy.MinCount set to PodGroup.MinReplicas. +// The Workload is owned by the PodGang and is garbage-collected on PodGang deletion. +// +// When GangScheduling is disabled this is a no-op. +func (b *schedulerBackend) SyncPodGang(ctx context.Context, podGang *groveschedulerv1alpha1.PodGang) error { + if !b.config.GangScheduling { + return nil + } + logger := log.FromContext(ctx) + + desired, err := b.buildWorkload(podGang) + if err != nil { + return fmt.Errorf("failed to build Workload for PodGang %s/%s: %w", podGang.Namespace, podGang.Name, err) + } + + existing := &schedulingv1alpha1.Workload{} + err = b.client.Get(ctx, client.ObjectKeyFromObject(desired), existing) + if err != nil { + if !apierrors.IsNotFound(err) { + return fmt.Errorf("failed to get Workload %s/%s: %w", desired.Namespace, desired.Name, err) + } + // Workload does not exist yet — create it. + if err = b.client.Create(ctx, desired); err != nil { + return fmt.Errorf("failed to create Workload %s/%s: %w", desired.Namespace, desired.Name, err) + } + logger.Info("Created Workload for PodGang", "workload", client.ObjectKeyFromObject(desired)) + return nil + } + + // Workload exists. Since Workload.Spec.PodGroups is immutable, we must delete and recreate + // if the desired PodGroups differ from the existing ones (e.g. MinReplicas changed). + if !reflect.DeepEqual(existing.Spec.PodGroups, desired.Spec.PodGroups) { + if err = b.client.Delete(ctx, existing); err != nil { + return fmt.Errorf("failed to delete stale Workload %s/%s for recreation: %w", existing.Namespace, existing.Name, err) + } + if err = b.client.Create(ctx, desired); err != nil { + return fmt.Errorf("failed to recreate Workload %s/%s: %w", desired.Namespace, desired.Name, err) + } + logger.Info("Recreated Workload for PodGang (PodGroups changed)", "workload", client.ObjectKeyFromObject(desired)) + } return nil } -// OnPodGangDelete handles PodGang deletion -// For default kube scheduler, no cleanup is needed +// OnPodGangDelete is a no-op: the owner reference set on the Workload by SyncPodGang ensures +// the Workload is garbage-collected automatically when the PodGang is deleted. func (b *schedulerBackend) OnPodGangDelete(_ context.Context, _ *groveschedulerv1alpha1.PodGang) error { - // No-op: default kube scheduler doesn't have any resources to clean up return nil } -// PreparePod prepares the Pod by setting the relevant schedulerName field with the chosen scheduler backend. +// PreparePod sets the scheduler name and, when GangScheduling is enabled, sets the WorkloadRef +// on the Pod so the kube-scheduler can apply workload-aware gang scheduling semantics. +// The WorkloadRef.Name is the PodGang name (from pod label grove.io/podgang) and +// WorkloadRef.PodGroup is the PodClique name (from pod label grove.io/podclique), which +// matches the PodGroup name inside the Workload created by SyncPodGang. 
func (b *schedulerBackend) PreparePod(pod *corev1.Pod) { pod.Spec.SchedulerName = b.name + if !b.config.GangScheduling { + return + } + podGangName := pod.Labels[apicommon.LabelPodGang] + podCliqueName := pod.Labels[apicommon.LabelPodClique] + if podGangName == "" || podCliqueName == "" { + return + } + pod.Spec.WorkloadRef = &corev1.WorkloadReference{ + Name: podGangName, + PodGroup: podCliqueName, + } } // ValidatePodCliqueSet runs default-scheduler-specific validations on the PodCliqueSet. func (b *schedulerBackend) ValidatePodCliqueSet(_ context.Context, _ *grovecorev1alpha1.PodCliqueSet) error { return nil } + +// buildWorkload constructs the desired scheduling.k8s.io/v1alpha1 Workload for the given PodGang. +// The Workload name matches the PodGang name and lives in the same namespace. +func (b *schedulerBackend) buildWorkload(podGang *groveschedulerv1alpha1.PodGang) (*schedulingv1alpha1.Workload, error) { + podGroups := lo.Map(podGang.Spec.PodGroups, func(pg groveschedulerv1alpha1.PodGroup, _ int) schedulingv1alpha1.PodGroup { + return schedulingv1alpha1.PodGroup{ + Name: pg.Name, + Policy: schedulingv1alpha1.PodGroupPolicy{ + Gang: &schedulingv1alpha1.GangSchedulingPolicy{ + MinCount: pg.MinReplicas, + }, + }, + } + }) + + workload := &schedulingv1alpha1.Workload{ + ObjectMeta: metav1.ObjectMeta{ + Name: podGang.Name, + Namespace: podGang.Namespace, + }, + Spec: schedulingv1alpha1.WorkloadSpec{ + PodGroups: podGroups, + }, + } + + if err := controllerutil.SetOwnerReference(podGang, workload, b.scheme); err != nil { + return nil, fmt.Errorf("failed to set owner reference on Workload: %w", err) + } + return workload, nil +} diff --git a/operator/internal/scheduler/kube/backend_test.go b/operator/internal/scheduler/kube/backend_test.go index 4e1819538..1f9395395 100644 --- a/operator/internal/scheduler/kube/backend_test.go +++ b/operator/internal/scheduler/kube/backend_test.go @@ -17,24 +17,163 @@ package kube import ( + "context" + "encoding/json" "testing" + apicommon "github.com/ai-dynamo/grove/operator/api/common" configv1alpha1 "github.com/ai-dynamo/grove/operator/api/config/v1alpha1" testutils "github.com/ai-dynamo/grove/operator/test/utils" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + schedulingv1alpha1 "k8s.io/api/scheduling/v1alpha1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/tools/record" + "sigs.k8s.io/controller-runtime/pkg/client" ) -func TestBackend_PreparePod(t *testing.T) { +// newGangSchedulingProfile returns a SchedulerProfile with GangScheduling enabled. 
+func newGangSchedulingProfile(t *testing.T) configv1alpha1.SchedulerProfile { + t.Helper() + cfgBytes, err := json.Marshal(configv1alpha1.KubeSchedulerConfig{GangScheduling: true}) + require.NoError(t, err) + return configv1alpha1.SchedulerProfile{ + Name: configv1alpha1.SchedulerNameKube, + Config: &runtime.RawExtension{Raw: cfgBytes}, + } +} + +func TestBackend_PreparePod_GangSchedulingDisabled(t *testing.T) { cl := testutils.CreateDefaultFakeClient(nil) recorder := record.NewFakeRecorder(10) profile := configv1alpha1.SchedulerProfile{Name: configv1alpha1.SchedulerNameKube} b := New(cl, cl.Scheme(), recorder, profile) pod := testutils.NewPodBuilder("test-pod", "default").Build() + b.PreparePod(pod) + + assert.Equal(t, string(configv1alpha1.SchedulerNameKube), pod.Spec.SchedulerName) + assert.Nil(t, pod.Spec.WorkloadRef, "WorkloadRef should not be set when GangScheduling is disabled") +} + +func TestBackend_PreparePod_GangSchedulingEnabled(t *testing.T) { + cl := testutils.CreateDefaultFakeClient(nil) + recorder := record.NewFakeRecorder(10) + b := New(cl, cl.Scheme(), recorder, newGangSchedulingProfile(t)) + + pod := testutils.NewPodBuilder("test-pod", "default"). + WithLabels(map[string]string{ + apicommon.LabelPodGang: "my-podgang", + apicommon.LabelPodClique: "my-podclique", + }). + Build() + b.PreparePod(pod) + + assert.Equal(t, string(configv1alpha1.SchedulerNameKube), pod.Spec.SchedulerName) + require.NotNil(t, pod.Spec.WorkloadRef, "WorkloadRef should be set when GangScheduling is enabled") + assert.Equal(t, "my-podgang", pod.Spec.WorkloadRef.Name) + assert.Equal(t, "my-podclique", pod.Spec.WorkloadRef.PodGroup) +} + +func TestBackend_PreparePod_GangSchedulingEnabled_MissingLabels(t *testing.T) { + cl := testutils.CreateDefaultFakeClient(nil) + recorder := record.NewFakeRecorder(10) + b := New(cl, cl.Scheme(), recorder, newGangSchedulingProfile(t)) + // Pod with no podgang/podclique labels — WorkloadRef must not be set. + pod := testutils.NewPodBuilder("test-pod", "default").Build() b.PreparePod(pod) assert.Equal(t, string(configv1alpha1.SchedulerNameKube), pod.Spec.SchedulerName) + assert.Nil(t, pod.Spec.WorkloadRef, "WorkloadRef should not be set when pod labels are missing") +} + +func TestBackend_SyncPodGang_GangSchedulingDisabled(t *testing.T) { + cl := testutils.CreateDefaultFakeClient(nil) + recorder := record.NewFakeRecorder(10) + profile := configv1alpha1.SchedulerProfile{Name: configv1alpha1.SchedulerNameKube} + b := New(cl, cl.Scheme(), recorder, profile) + + podGang := testutils.NewPodGangBuilder("my-podgang", "default"). + WithPodGroup("clique-a", 2). + Build() + + err := b.SyncPodGang(context.Background(), podGang) + require.NoError(t, err) + + // No Workload should have been created. + workloadList := &schedulingv1alpha1.WorkloadList{} + require.NoError(t, cl.List(context.Background(), workloadList, client.InNamespace("default"))) + assert.Empty(t, workloadList.Items, "no Workload should be created when GangScheduling is disabled") +} + +func TestBackend_SyncPodGang_GangSchedulingEnabled_Create(t *testing.T) { + cl := testutils.CreateDefaultFakeClient(nil) + recorder := record.NewFakeRecorder(10) + b := New(cl, cl.Scheme(), recorder, newGangSchedulingProfile(t)) + + podGang := testutils.NewPodGangBuilder("my-podgang", "default"). + WithPodGroup("clique-a", 2). + WithPodGroup("clique-b", 3). + Build() + + err := b.SyncPodGang(context.Background(), podGang) + require.NoError(t, err) + + // Workload should exist with the same name and namespace as the PodGang. 
+ workload := &schedulingv1alpha1.Workload{} + require.NoError(t, cl.Get(context.Background(), client.ObjectKey{Namespace: "default", Name: "my-podgang"}, workload)) + + require.Len(t, workload.Spec.PodGroups, 2) + + assert.Equal(t, "clique-a", workload.Spec.PodGroups[0].Name) + require.NotNil(t, workload.Spec.PodGroups[0].Policy.Gang) + assert.Equal(t, int32(2), workload.Spec.PodGroups[0].Policy.Gang.MinCount) + + assert.Equal(t, "clique-b", workload.Spec.PodGroups[1].Name) + require.NotNil(t, workload.Spec.PodGroups[1].Policy.Gang) + assert.Equal(t, int32(3), workload.Spec.PodGroups[1].Policy.Gang.MinCount) +} + +func TestBackend_SyncPodGang_GangSchedulingEnabled_RecreateOnChange(t *testing.T) { + // Pre-create a Workload with MinCount=2 for clique-a. + podGang := testutils.NewPodGangBuilder("my-podgang", "default"). + WithPodGroup("clique-a", 2). + Build() + + cl := testutils.CreateDefaultFakeClient(nil) + recorder := record.NewFakeRecorder(10) + b := New(cl, cl.Scheme(), recorder, newGangSchedulingProfile(t)) + + require.NoError(t, b.SyncPodGang(context.Background(), podGang)) + + // Now sync again with MinCount=5 — Workload.Spec.PodGroups is immutable so the backend + // must delete and recreate the Workload. + podGang.Spec.PodGroups[0].MinReplicas = 5 + require.NoError(t, b.SyncPodGang(context.Background(), podGang)) + + workload := &schedulingv1alpha1.Workload{} + require.NoError(t, cl.Get(context.Background(), client.ObjectKey{Namespace: "default", Name: "my-podgang"}, workload)) + require.NotNil(t, workload.Spec.PodGroups[0].Policy.Gang) + assert.Equal(t, int32(5), workload.Spec.PodGroups[0].Policy.Gang.MinCount) +} + +func TestBackend_SyncPodGang_GangSchedulingEnabled_NoOpWhenUnchanged(t *testing.T) { + cl := testutils.CreateDefaultFakeClient(nil) + recorder := record.NewFakeRecorder(10) + b := New(cl, cl.Scheme(), recorder, newGangSchedulingProfile(t)) + + podGang := testutils.NewPodGangBuilder("my-podgang", "default"). + WithPodGroup("clique-a", 2). + Build() + + // First sync creates the Workload. + require.NoError(t, b.SyncPodGang(context.Background(), podGang)) + // Second sync with identical spec must not fail. + require.NoError(t, b.SyncPodGang(context.Background(), podGang)) + + workload := &schedulingv1alpha1.Workload{} + require.NoError(t, cl.Get(context.Background(), client.ObjectKey{Namespace: "default", Name: "my-podgang"}, workload)) + assert.Equal(t, int32(2), workload.Spec.PodGroups[0].Policy.Gang.MinCount) }
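
For reviewers, a minimal sketch of how this feature would be switched on through the Helm values that _helpers.tpl renders into config.yaml. The profile name and the serialized field name below are assumptions: the actual string value of configv1alpha1.SchedulerNameKube and the json tag on KubeSchedulerConfig.GangScheduling are not shown in this diff.

config:
  scheduler:
    profiles:
      - name: default-scheduler   # assumed value of SchedulerNameKube
        config:
          gangScheduling: true    # assumed serialized name of KubeSchedulerConfig.GangScheduling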
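Likewise, a sketch of the objects the backend produces for a PodGang named my-podgang with a single PodGroup clique-a and minReplicas 2. The names are illustrative, and the YAML field casing is assumed from the usual lowerCamelCase serialization of the k8s.io/api types referenced above.

apiVersion: scheduling.k8s.io/v1alpha1
kind: Workload
metadata:
  name: my-podgang        # SyncPodGang reuses the PodGang name and namespace
  namespace: default      # an owner reference to the PodGang handles garbage collection
spec:
  podGroups:
    - name: clique-a
      policy:
        gang:
          minCount: 2     # copied from PodGroup.MinReplicas

Pods prepared by PreparePod then point back at that Workload:

spec:
  schedulerName: default-scheduler   # assumed value of SchedulerNameKube
  workloadRef:
    name: my-podgang                 # from label grove.io/podgang
    podGroup: clique-a               # from label grove.io/podclique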