Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
a1ff02c
feat: switch core StatefulSet to in-place rolling updates
keynslug Mar 10, 2026
f4db418
refactor(api): make EMQX status and CRD surface slimmer
keynslug Mar 11, 2026
117c47d
chore(ds): return early when DS is inactive
keynslug Mar 11, 2026
6db23f0
refactor(api): remove `Replicas` from EMQX CR status
keynslug Mar 11, 2026
1dc2345
refactor(api): rename `nodeEvacuationsStatus` to `nodeEvacuations`
keynslug Mar 11, 2026
cdfec98
chore: remove PDB compatibility shims for Kubernetes 1.21
keynslug Mar 11, 2026
f96a391
refactor: replace onServing with direct Evacuation API readiness probe
keynslug Mar 11, 2026
2c9e2d2
fix: unblock core rolling update if DS replication site
keynslug Mar 11, 2026
f022f71
refactor: streamline EMQX status conditions
keynslug Mar 25, 2026
277a4d4
fix: drop core set leave-cluster preStop hook
keynslug Mar 25, 2026
907c322
test: rework `sync_pods_suite_test`
keynslug Mar 25, 2026
a876bcc
fix(api): preserve zero-valued status counters in EMQX CR
keynslug Mar 26, 2026
4fa3752
fix: require rest of cores are available before sync progress
keynslug Mar 26, 2026
84b8671
fix: skip evacuation for single-node clusters
keynslug Mar 26, 2026
9c8d9a7
test(e2e): update "EMQX Cluster" scenario
keynslug Mar 26, 2026
6d16635
test(e2e): update "EMQX Cluster / Botched Rolling Updates" scenario
keynslug Mar 26, 2026
c0909f4
feat(api): forbid single core when replicants are configured
keynslug Mar 26, 2026
9421723
fix: ensure at least 1 new & 1 old core during replicant rollout
keynslug Mar 27, 2026
f4162ea
fix: correct core set scale-down behavior
keynslug Mar 27, 2026
9a3ad81
fix: ensure outdated core pod list is empty if no updates
keynslug Mar 30, 2026
923e881
fix: force requeue after core/replicant set added or updated
keynslug Mar 30, 2026
9d0abf0
fix: run dsCleanupSites earlier in the reconciler chain
keynslug Mar 30, 2026
e8113d1
fix: rely on DS cluster state only to compute target replica sets
keynslug Mar 30, 2026
b91efbe
feat: specify PVC for core pods by default
keynslug Mar 30, 2026
b3c64ae
test(e2e): test both scale-up and scale-down
keynslug Mar 30, 2026
66c90fd
test(e2e): enrich diagnostic output on failures
keynslug Mar 30, 2026
16918c8
fix: ensure syncCoreSet stops evacuation explicitly on pod update
keynslug Mar 30, 2026
e17900c
feat: add sync cluster membership reconciler
keynslug Mar 30, 2026
f10e483
test(e2e): verify EMQX cluster view during tests
keynslug Mar 30, 2026
431b20c
test(e2e): update "EMQX Core-Replicant DS-Enabled Cluster" scenario
keynslug Mar 31, 2026
53c35ab
test: split `sync_pods_suite_test` into separate core + replicant suites
keynslug Mar 31, 2026
087b17a
refactor(api): move minReadySeconds to core + replicant templates
keynslug Mar 31, 2026
5b7eb01
fix: allow reconcilers to ask for short-timeout requeue
keynslug Mar 31, 2026
9582a67
fix: simplify `addService` + refactor test suite
keynslug Mar 31, 2026
c956d5a
feat(controller): trigger reconcile on EMQX CR spec changes only
keynslug Mar 31, 2026
5e0a504
fix: mark EMQX not ready when DS replication is unstable
keynslug Mar 31, 2026
9239813
fix: use safer listeners Service selectors for pod targeting
keynslug Mar 31, 2026
967838d
fix: stop propagating core template label to PVC templates
keynslug Apr 1, 2026
1e91ed5
chore(handler): drop dead code
keynslug Apr 1, 2026
8e61017
fix: stop using core template labels for selectors
keynslug Apr 1, 2026
ba3f3ac
fix: prefer EMQX 6.x to load DS cluster view
keynslug Apr 7, 2026
1d43716
test(e2e): update "EMQX Runtime-enabled DS Replication" scenario
keynslug Apr 1, 2026
e58f4ad
feat: promote EMQX CRD to v3alpha1 + turn Rebalance CRD off
keynslug Apr 2, 2026
34071b7
chore: define shared constants in `api` package
keynslug Apr 2, 2026
5b2726b
chore: drop dead code
keynslug Apr 2, 2026
8a234d6
chore(helm): switch to 3.0.0 and drop compat measures
keynslug Apr 7, 2026
1fa3208
fix: sort outdated pods by ordinal numerically
keynslug Apr 7, 2026
8b55e6d
fix: tolerate undefined "current" replicant set
keynslug Apr 7, 2026
ae46b62
chore: leave comment on API conflict handling
keynslug Apr 8, 2026
888e597
fix: avoid deleting `nil` core pod candidate during scale down
keynslug Apr 8, 2026
1f32248
chore: leave comment about `syncCoreSet` responsibilities
keynslug Apr 8, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 2 additions & 11 deletions PROJECT
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,6 @@ resources:
domain: emqx.io
group: apps
kind: EMQX
path: github.com/emqx/emqx-operator/api/v2
version: v2
- api:
crdVersion: v1
namespaced: true
controller: true
domain: emqx.io
group: apps
kind: Rebalance
path: github.com/emqx/emqx-operator/api/v2beta1
version: v2beta1
path: github.com/emqx/emqx-operator/api/v3alpha1
version: v3alpha1
version: "3"
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ This operator is compatible with the following EMQX releases:
- EMQX 5.10
- EMQX 6.x

Requires Kubernetes >= 1.27. PVC auto-cleanup for core nodes (on scale-down and CR deletion) relies on the `StatefulSetAutoDeletePVC` feature gate, which is stable and enabled by default since Kubernetes 1.32.

## Installation

Here's the simplest way to install the operator.
Expand Down Expand Up @@ -74,8 +76,8 @@ kubectl logs -l "control-plane=controller-manager" --tail=-1 --namespace emqx-op
### Prerequisites
- go version v1.22.0+
- docker version 17.03+.
- kubectl version v1.24+.
- Access to a Kubernetes v1.24+ cluster.
- kubectl version v1.27+.
- Access to a Kubernetes v1.27+ cluster.

### To Deploy on the cluster
**Build and push your image to the location specified by `OPERATOR_IMAGE`:**
Expand Down
7 changes: 6 additions & 1 deletion api/v2beta1/groupversion_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,12 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

// package v2beta1 contains API Schema definitions for the apps v2beta1 API group.
// Package v2beta1 contains Rebalance API Schema definitions for the apps.emqx.io v2beta1 API group.
//
// NOTE: The Rebalance CRD is currently NOT FUNCTIONAL.
// Type definitions are retained for future use but the CRD is not registered
// with the scheme, not installed in the cluster, and no controller reconciles it.
//
// +kubebuilder:object:generate=true
// +groupName=apps.emqx.io
package v2beta1
Expand Down
15 changes: 12 additions & 3 deletions api/v2beta1/rebalance_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,14 @@ limitations under the License.

package v2beta1

// NOTE: The Rebalance CRD is currently NOT FUNCTIONAL.
// The type definitions are retained for future use but the CRD is not registered
// with the scheme and no controller reconciles it. Do not create Rebalance custom
// resources — they will be ignored by the operator.
//
// To re-enable, uncomment the init() function at the bottom of this file and
// wire up a corresponding controller in cmd/main.go.

import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
Expand Down Expand Up @@ -44,6 +52,7 @@ type RebalanceList struct {
Items []Rebalance `json:"items"`
}

func init() {
SchemeBuilder.Register(&Rebalance{}, &RebalanceList{})
}
// NOT FUNCTIONAL — Rebalance CRD is disabled. Uncomment to re-enable.
// func init() {
// SchemeBuilder.Register(&Rebalance{}, &RebalanceList{})
// }
4 changes: 1 addition & 3 deletions api/v2/const.go → api/v3alpha1/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package v2
package v3alpha1

import corev1 "k8s.io/api/core/v1"

Expand All @@ -36,6 +36,4 @@ const (
const (
// Whether the pod is responsible for DS replication
DSReplicationSite corev1.PodConditionType = "apps.emqx.io/ds-replication-site"
// https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/#pod-readiness-gate
PodOnServing corev1.PodConditionType = "apps.emqx.io/on-serving"
)
2 changes: 1 addition & 1 deletion api/v2/emqx_types.go → api/v3alpha1/emqx_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package v2
package v3alpha1

import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down
44 changes: 31 additions & 13 deletions api/v2/emqx_types_spec.go → api/v3alpha1/emqx_types_spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package v2
package v3alpha1

import (
corev1 "k8s.io/api/core/v1"
Expand All @@ -23,6 +23,7 @@ import (
)

// EMQXSpec defines the desired state of EMQX.
// +kubebuilder:validation:XValidation:rule="!has(self.replicantTemplate) || !has(self.replicantTemplate.spec.replicas) || self.replicantTemplate.spec.replicas == 0 || self.coreTemplate.spec.replicas >= 2",message="Core-replicant clusters require at least 2 core replicas for rolling updates."
type EMQXSpec struct {
// EMQX container image.
// More info: https://kubernetes.io/docs/concepts/containers/images
Expand Down Expand Up @@ -58,11 +59,11 @@ type EMQXSpec struct {
RevisionHistoryLimit int32 `json:"revisionHistoryLimit,omitempty"`

// Cluster upgrade strategy settings.
// +kubebuilder:default={type:Recreate}
// +kubebuilder:default={type:RollingUpdate}
UpdateStrategy UpdateStrategy `json:"updateStrategy,omitempty"`

// Template for Pods running EMQX core nodes.
// +kubebuilder:default={spec:{replicas:2}}
// +kubebuilder:default={spec:{replicas:2,persistentVolumeClaimSpec:{accessModes:{"ReadWriteOnce"},resources:{requests:{storage:"500Mi"}}}}}
CoreTemplate EMQXCoreTemplate `json:"coreTemplate,omitempty"`

// Template for Pods running EMQX replicant nodes.
Expand Down Expand Up @@ -118,14 +119,10 @@ type Config struct {

type UpdateStrategy struct {
// Determines how cluster upgrade is performed.
// * `Recreate`: Perform blue-green upgrade.
// +kubebuilder:validation:Enum=Recreate
// +kubebuilder:default=Recreate
// * `RollingUpdate`: Perform a rolling upgrade, updating pods one at a time.
// +kubebuilder:validation:Enum=RollingUpdate
// +kubebuilder:default=RollingUpdate
Type string `json:"type,omitempty"`
// Number of seconds before connection evacuation starts.
// +kubebuilder:validation:Minimum=0
// +kubebuilder:default=10
InitialDelaySeconds int32 `json:"initialDelaySeconds,omitempty"`
// Evacuation strategy settings.
EvacuationStrategy EvacuationStrategy `json:"evacuationStrategy,omitempty"`
}
Expand Down Expand Up @@ -177,8 +174,7 @@ type EMQXCoreTemplateSpec struct {
EMQXReplicantTemplateSpec `json:",inline"`

// PVC specification for a core node data storage.
// Note: this field named inconsistently, it is actually just a `PersistentVolumeClaimSpec`.
VolumeClaimTemplates corev1.PersistentVolumeClaimSpec `json:"volumeClaimTemplates,omitempty"`
PersistentVolumeClaimSpec corev1.PersistentVolumeClaimSpec `json:"persistentVolumeClaimSpec,omitempty"`
}

type EMQXReplicantTemplateSpec struct {
Expand Down Expand Up @@ -215,6 +211,12 @@ type EMQXReplicantTemplateSpec struct {
// by specifying 0. This is a mutually exclusive setting with "minAvailable".
// +kubebuilder:validation:XIntOrString
MaxUnavailable *intstr.IntOrString `json:"maxUnavailable,omitempty"`
// MinReadySeconds is the minimum time (seconds) a pod must be Ready before it counts as available.
// For core nodes this is applied to the StatefulSet (mirrors apps/v1 StatefulSetSpec.minReadySeconds);
// for replicants, to the ReplicaSet (mirrors apps/v1 ReplicaSetSpec.minReadySeconds).
// Omitted or zero matches the apps/v1 default (0).
// +kubebuilder:validation:Minimum=0
MinReadySeconds int32 `json:"minReadySeconds,omitempty"`

// Entrypoint array. Not executed within a shell.
// The container image's ENTRYPOINT is used if this is not provided.
Expand Down Expand Up @@ -285,8 +287,10 @@ type EMQXReplicantTemplateSpec struct {
LivenessProbe *corev1.Probe `json:"livenessProbe,omitempty"`
// Periodic probe of container service readiness.
// Container will be removed from service endpoints if the probe fails.
// Strongly advised to keep the current default: it takes into account ongoing node evacuations managed
// by the Operator as part of scaling operations and rolling updates.
// More info: https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle#container-probes
// +kubebuilder:default={initialDelaySeconds:10,periodSeconds:5,failureThreshold:12,httpGet: {path:/status, port:"dashboard"}}
// +kubebuilder:default={initialDelaySeconds:10,periodSeconds:5,timeoutSeconds:3,failureThreshold:1,httpGet:{path:"/api/v5/load_rebalance/availability_check", port:"dashboard"}}
ReadinessProbe *corev1.Probe `json:"readinessProbe,omitempty"`
// StartupProbe indicates that the Pod has successfully initialized.
// If specified, no other probes are executed until this completes successfully.
Expand Down Expand Up @@ -318,3 +322,17 @@ func (spec *EMQXSpec) HasReplicants() bool {
func (s *ServiceTemplate) IsEnabled() bool {
return s.Enabled != nil && *s.Enabled
}

func (spec *EMQXSpec) NumCoreReplicas() int32 {
if spec.CoreTemplate.Spec.Replicas != nil {
return *spec.CoreTemplate.Spec.Replicas
}
return 1
}

func (spec *EMQXSpec) NumReplicantReplicas() int32 {
if spec.ReplicantTemplate != nil && spec.ReplicantTemplate.Spec.Replicas != nil {
return *spec.ReplicantTemplate.Spec.Replicas
}
return 0
}
100 changes: 41 additions & 59 deletions api/v2/emqx_types_status.go → api/v3alpha1/emqx_types_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package v2
package v3alpha1

import (
"slices"
Expand All @@ -32,15 +32,15 @@ type EMQXStatus struct {
// Status of each core node in the cluster.
CoreNodes []EMQXNode `json:"coreNodes,omitempty"`
// Summary status of the set of core nodes.
CoreNodesStatus EMQXNodesStatus `json:"coreNodesStatus,omitempty"`
CoreNodesStatus CoreNodesStatus `json:"coreNodesStatus,omitempty"`

// Status of each replicant node in the cluster.
ReplicantNodes []EMQXNode `json:"replicantNodes,omitempty"`
// Summary status of the set of replicant nodes.
ReplicantNodesStatus EMQXNodesStatus `json:"replicantNodesStatus,omitempty"`
ReplicantNodesStatus ReplicantNodesStatus `json:"replicantNodesStatus,omitempty"`

// Status of active node evacuations in the cluster.
NodeEvacuationsStatus []NodeEvacuationStatus `json:"nodeEvacuationsStatus,omitempty"`
NodeEvacuations []NodeEvacuationStatus `json:"nodeEvacuations,omitempty"`
// Status of EMQX Durable Storage replication.
DSReplication DSReplicationStatus `json:"dsReplication,omitempty"`
}
Expand All @@ -65,30 +65,40 @@ type NodeEvacuationStatus struct {
InitialConnections int32 `json:"initialConnections,omitempty"`
}

type EMQXNodesStatus struct {
// Total number of replicas.
Replicas int32 `json:"replicas,omitempty"`
// CoreNodesStatus is the summary status of core nodes managed by a single StatefulSet.
type CoreNodesStatus struct {
// Number of ready replicas.
ReadyReplicas int32 `json:"readyReplicas,omitempty"`
// Current revision of the respective core or replicant set.
ReadyReplicas int32 `json:"readyReplicas"`
// Number of replicas already updated to the desired pod template.
UpdatedReplicas int32 `json:"updatedReplicas"`
// Number of replicas still running the previous pod template.
CurrentReplicas int32 `json:"currentReplicas"`
}

// ReplicantNodesStatus is the summary status of the set of replicant nodes.
// The multi-ReplicaSet pattern requires revision tracking at the CR level.
type ReplicantNodesStatus struct {
// Number of ready replicas.
ReadyReplicas int32 `json:"readyReplicas"`
// Current revision of the replicant set.
CurrentRevision string `json:"currentRevision,omitempty"`
// Number of replicas running current revision.
CurrentReplicas int32 `json:"currentReplicas,omitempty"`
// Update revision of the respective core or replicant set.
CurrentReplicas int32 `json:"currentReplicas"`
// Update revision of the replicant set.
// When different from the current revision, the set is being updated.
UpdateRevision string `json:"updateRevision,omitempty"`
// Number of replicas running update revision.
UpdateReplicas int32 `json:"updateReplicas,omitempty"`
UpdateReplicas int32 `json:"updateReplicas"`

CollisionCount *int32 `json:"collisionCount,omitempty"`
}

type EMQXNode struct {
// Node name
// +kubebuilder:example="emqx@emqx-core-557c8b7684-0.emqx-headless.default.svc.cluster.local"
// +kubebuilder:example="emqx@emqx-core-0.emqx-headless.default.svc.cluster.local"
Name string `json:"name,omitempty"`
// Corresponding pod name
// +kubebuilder:example="emqx-core-557c8b7684-0"
// +kubebuilder:example="emqx-core-0"
PodName string `json:"podName,omitempty"`
// Node status
// +kubebuilder:example=running
Expand All @@ -103,9 +113,9 @@ type EMQXNode struct {
// +kubebuilder:example=core
Role string `json:"role,omitempty"`
// Number of MQTT sessions
Sessions int64 `json:"sessions,omitempty"`
Sessions int64 `json:"sessions"`
// Number of connected MQTT clients
Connections int64 `json:"connections,omitempty"`
Connections int64 `json:"connections"`
}

func (s EMQXStatus) FindNode(node string) *EMQXNode {
Expand Down Expand Up @@ -162,57 +172,29 @@ type DSDBReplicationStatus struct {
}

const (
Initialized string = "Initialized"
CoreNodesProgressing string = "CoreNodesProgressing"
CoreNodesReady string = "CoreNodesReady"
ReplicantNodesProgressing string = "ReplicantNodesProgressing"
ReplicantNodesReady string = "ReplicantNodesReady"
Available string = "Available"
Ready string = "Ready"
)

func (s *EMQXStatus) ResetConditions(reason string) {
conditionTypes := []string{}
for _, c := range s.Conditions {
if c.Type != Initialized && c.Status == metav1.ConditionTrue {
conditionTypes = append(conditionTypes, c.Type)
}
func (s *EMQXStatus) SetCondition(ty string, status metav1.ConditionStatus, reason, message string) {
_, existing := s.GetCondition(ty)
if existing != nil &&
existing.Status == status &&
existing.Reason == reason &&
existing.Message == message {
return
}
for _, conditionType := range conditionTypes {
s.SetFalseCondition(conditionType, reason)
s.RemoveCondition(ty)
c := metav1.Condition{
Type: ty,
Status: status,
Reason: reason,
Message: message,
LastTransitionTime: metav1.Now(),
}
}

func (s *EMQXStatus) SetCondition(c metav1.Condition) {
s.RemoveCondition(c.Type)
c.LastTransitionTime = metav1.Now()
s.Conditions = slices.Insert(s.Conditions, 0, c)
}

func (s *EMQXStatus) SetTrueCondition(conditionType string) {
s.SetCondition(metav1.Condition{
Type: conditionType,
Status: metav1.ConditionTrue,
Reason: conditionType,
})
}

func (s *EMQXStatus) SetFalseCondition(conditionType string, reason string) {
s.SetCondition(metav1.Condition{
Type: conditionType,
Status: metav1.ConditionFalse,
Reason: reason,
})
}

func (s *EMQXStatus) GetLastTrueCondition() *metav1.Condition {
for i := range s.Conditions {
c := s.Conditions[i]
if c.Status == metav1.ConditionTrue {
return &c
}
}
return nil
s.Conditions = append(s.Conditions, c)
}

func (s *EMQXStatus) GetCondition(conditionType string) (int, *metav1.Condition) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

// package v2 contains API Schema definitions for the apps v2 API group.
// Package v3alpha1 contains API Schema definitions for the apps.emqx.io v3alpha1 API group.
// +kubebuilder:object:generate=true
// +groupName=apps.emqx.io
package v2
package v3alpha1

import (
"k8s.io/apimachinery/pkg/runtime/schema"
Expand All @@ -26,7 +26,7 @@ import (

var (
// GroupVersion is group version used to register these objects.
GroupVersion = schema.GroupVersion{Group: "apps.emqx.io", Version: "v2"}
GroupVersion = schema.GroupVersion{Group: "apps.emqx.io", Version: "v3alpha1"}

// SchemeBuilder is used to add go types to the GroupVersionKind scheme.
SchemeBuilder = &scheme.Builder{GroupVersion: GroupVersion}
Expand Down
2 changes: 1 addition & 1 deletion api/v2/labels.go → api/v3alpha1/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package v2
package v3alpha1

import (
"maps"
Expand Down
Loading
Loading