From ab2defc7410863e88dd3af301cd6b626baacec5b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?p=C3=BDrus?= Date: Thu, 26 Feb 2026 13:12:14 +0100 Subject: [PATCH] core: refactor lbaas util functions --- pkg/ingress/controller/openstack/octavia.go | 2 +- pkg/openstack/loadbalancer.go | 2 +- pkg/util/openstack/loadbalancer.go | 755 +++++++++----------- pkg/util/openstack/loadbalancer_serial.go | 2 +- 4 files changed, 341 insertions(+), 420 deletions(-) diff --git a/pkg/ingress/controller/openstack/octavia.go b/pkg/ingress/controller/openstack/octavia.go index 50b0d34b6d..686159af72 100644 --- a/pkg/ingress/controller/openstack/octavia.go +++ b/pkg/ingress/controller/openstack/octavia.go @@ -529,7 +529,7 @@ func (os *OpenStack) UpdateLoadbalancerMembers(ctx context.Context, lbID string, for _, pool := range lbPools { log.WithFields(log.Fields{"poolID": pool.ID}).Debug("Starting to update pool members") - members, err := openstackutil.GetMembersbyPool(ctx, os.Octavia, pool.ID) + members, err := openstackutil.GetPoolMembers(ctx, os.Octavia, pool.ID) if err != nil { log.WithFields(log.Fields{"poolID": pool.ID}).Errorf("Failed to get pool members: %v", err) continue diff --git a/pkg/openstack/loadbalancer.go b/pkg/openstack/loadbalancer.go index c67ba1f1b4..b11212ff4e 100644 --- a/pkg/openstack/loadbalancer.go +++ b/pkg/openstack/loadbalancer.go @@ -946,7 +946,7 @@ func (lbaas *LbaasV2) ensureOctaviaPool(ctx context.Context, lbID string, name s } curMembers := sets.New[string]() - poolMembers, err := openstackutil.GetMembersbyPool(ctx, lbaas.lb, pool.ID) + poolMembers, err := openstackutil.GetPoolMembers(ctx, lbaas.lb, pool.ID) if err != nil { klog.Errorf("failed to get members in the pool %s: %v", pool.ID, err) } diff --git a/pkg/util/openstack/loadbalancer.go b/pkg/util/openstack/loadbalancer.go index 6d9288a1a9..348103d7a5 100644 --- a/pkg/util/openstack/loadbalancer.go +++ b/pkg/util/openstack/loadbalancer.go @@ -40,20 +40,23 @@ import ( ) const ( - OctaviaFeatureTags = 0 - OctaviaFeatureVIPACL = 1 - OctaviaFeatureFlavors = 2 - OctaviaFeatureTimeout = 3 - OctaviaFeatureAvailabilityZones = 4 - OctaviaFeatureHTTPMonitorsOnUDP = 5 - - waitLoadbalancerInitDelay = 1 * time.Second - waitLoadbalancerFactor = 1.2 - waitLoadbalancerActiveSteps = 23 - waitLoadbalancerDeleteSteps = 24 - - activeStatus = "ACTIVE" - errorStatus = "ERROR" + // Octavia feature flags for version checking + OctaviaFeatureTags = 0 // Tag support (v2.5+) + OctaviaFeatureVIPACL = 1 // VIP ACL support (v2.12+, not supported on OVN) + OctaviaFeatureFlavors = 2 // Flavor support (v2.6+, not supported on OVN) + OctaviaFeatureTimeout = 3 // Timeout support (v2.1+, not supported on OVN) + OctaviaFeatureAvailabilityZones = 4 // Availability zone support (v2.14+, not supported on OVN) + OctaviaFeatureHTTPMonitorsOnUDP = 5 // HTTP health monitors on UDP pools (v2.16+, not supported on OVN) + + // Wait configuration for load balancer operations + waitLoadbalancerInitDelay = 1 * time.Second // Initial delay before first check + waitLoadbalancerFactor = 1.2 // Exponential backoff multiplier + waitLoadbalancerActiveSteps = 23 // Max steps for ACTIVE status (~5 min total) + waitLoadbalancerDeleteSteps = 24 // Max steps for DELETE (~2 min total) + + // Load balancer provisioning states + activeStatus = "ACTIVE" // Load balancer is operational + errorStatus = "ERROR" // Load balancer is in error state ) var ( @@ -73,7 +76,7 @@ func getOctaviaVersion(ctx context.Context, client *gophercloud.ServiceClient) ( return defaultVer, err } versions, err := apiversions.ExtractAPIVersions(allPages) - if err != nil { + if mc.ObserveRequest(err) != nil { return defaultVer, err } if len(versions) == 0 { @@ -89,7 +92,17 @@ func getOctaviaVersion(ctx context.Context, client *gophercloud.ServiceClient) ( return octaviaVersion, nil } -// IsOctaviaFeatureSupported returns true if the given feature is supported in the deployed Octavia version. +// IsOctaviaFeatureSupported checks if a specific Octavia feature is available +// based on the deployed API version and load balancer provider. +// +// The function compares the current Octavia version against the minimum required +// version for each feature. Some features are not supported on certain providers +// (e.g., OVN provider does not support VIP ACL, flavors, timeouts, etc.). +// +// Returns false if: +// - The Octavia version cannot be determined +// - The feature is not supported by the provider +// - The version is below the minimum required version func IsOctaviaFeatureSupported(ctx context.Context, client *gophercloud.ServiceClient, feature int, lbProvider string) bool { octaviaVer, err := getOctaviaVersion(ctx, client) if err != nil { @@ -152,6 +165,127 @@ func IsOctaviaFeatureSupported(ctx context.Context, client *gophercloud.ServiceC return false } +// ============================================================================== +// Internal Helper Functions +// ============================================================================== + +// executeAndWaitActive executes an operation with metrics tracking and waits for +// the load balancer to return to ACTIVE state. Used for operations that don't +// return a result (delete, update without returning updated object). +func executeAndWaitActive(ctx context.Context, client *gophercloud.ServiceClient, + lbID, resourceType, operation string, fn func() error) error { + + _, err := executeExtractAndWaitActive(ctx, client, lbID, resourceType, operation, + func() (*struct{}, error) { + if err := fn(); err != nil { + return nil, err + } + return &struct{}{}, nil + }) + return err +} + +// executeExtractAndWaitActive executes an operation with metrics tracking, extracts +// the result, and waits for the load balancer to return to ACTIVE state. +// Used for create operations that return the created resource. +// For delete operations, NotFound errors are logged but not returned as errors. +func executeExtractAndWaitActive[T any](ctx context.Context, client *gophercloud.ServiceClient, + lbID, resourceType, operation string, fn func() (T, error)) (T, error) { + + var zero T + mc := metrics.NewMetricContext(resourceType, operation) + result, err := fn() + + // For delete operations, treat NotFound as success (already deleted) + if err != nil && operation == "delete" && cpoerrors.IsNotFound(err) { + klog.V(2).Infof("%s was already deleted", resourceType) + _ = mc.ObserveRequest(nil) + // Still wait for load balancer to be active for consistency + if _, waitErr := WaitActiveAndGetLoadBalancer(ctx, client, lbID); waitErr != nil { + return zero, fmt.Errorf("failed to wait for load balancer %s ACTIVE after %s %s: %v", + lbID, operation, resourceType, waitErr) + } + return zero, nil + } + + // Observe the request result (error or success) + if mc.ObserveRequest(err) != nil { + return zero, fmt.Errorf("failed to %s %s: %v", operation, resourceType, err) + } + + if _, waitErr := WaitActiveAndGetLoadBalancer(ctx, client, lbID); waitErr != nil { + return zero, fmt.Errorf("failed to wait for load balancer %s ACTIVE after %s %s: %v", + lbID, operation, resourceType, waitErr) + } + return result, nil +} + +// list performs pagination and returns all results with metrics tracking. +func list[T any](ctx context.Context, resourceType, operation string, + pager pagination.Pager, extractFn func(pagination.Page) ([]T, error)) ([]T, error) { + + mc := metrics.NewMetricContext(resourceType, operation) + allPages, err := pager.AllPages(ctx) + if mc.ObserveRequest(err) != nil { + return nil, err + } + + results, err := extractFn(allPages) + if err != nil { + return nil, err + } + + return results, nil +} + +// listWithUniqueResult performs pagination and expects exactly one result. +// Returns ErrNotFound if no results, ErrMultipleResults if more than one. +// Stops pagination early if multiple results are detected. +func listWithUniqueResult[T any](ctx context.Context, resourceType, operation string, + pager pagination.Pager, extractFn func(pagination.Page) ([]T, error)) (*T, error) { + + mc := metrics.NewMetricContext(resourceType, operation) + var results []T + err := pager.EachPage(ctx, func(_ context.Context, page pagination.Page) (bool, error) { + items, err := extractFn(page) + if err != nil { + return false, err + } + results = append(results, items...) + // Stop early if we found more than one result + if len(results) > 1 { + return false, cpoerrors.ErrMultipleResults + } + return true, nil + }) + + if mc.ObserveRequest(err) != nil { + if cpoerrors.IsNotFound(err) { + return nil, cpoerrors.ErrNotFound + } + return nil, err + } + + if len(results) == 0 { + return nil, cpoerrors.ErrNotFound + } + return &results[0], nil +} + +// getSingleResource executes a single resource retrieval operation with metrics tracking. +// Used for Get operations that retrieve a resource by ID. +func getSingleResource[T any](ctx context.Context, resourceType, operation string, + fn func() (T, error)) (T, error) { + + mc := metrics.NewMetricContext(resourceType, operation) + result, err := fn() + if mc.ObserveRequest(err) != nil { + var zero T + return zero, fmt.Errorf("failed to %s %s: %v", operation, resourceType, err) + } + return result, nil +} + func getTimeoutSteps(name string, steps int) int { if v := os.Getenv(name); v != "" { s, err := strconv.Atoi(v) @@ -162,7 +296,20 @@ func getTimeoutSteps(name string, steps int) int { return steps } -// WaitActiveAndGetLoadBalancer wait for LB active then return the LB object for further usage +// ============================================================================== +// Load Balancer Operations +// ============================================================================== + +// WaitActiveAndGetLoadBalancer waits for a load balancer to reach ACTIVE status +// and returns the updated load balancer object. It uses exponential backoff and +// will timeout after the configured number of steps (default 23, approximately 5 minutes). +// +// Returns an error if: +// - The load balancer enters ERROR state +// - The timeout is exceeded +// - API calls fail for reasons other than transient status checks +// +// The timeout can be customized using the OCCM_WAIT_LB_ACTIVE_STEPS environment variable. func WaitActiveAndGetLoadBalancer(ctx context.Context, client *gophercloud.ServiceClient, loadbalancerID string) (*loadbalancers.LoadBalancer, error) { klog.InfoS("Waiting for load balancer ACTIVE", "lbID", loadbalancerID) steps := getTimeoutSteps("OCCM_WAIT_LB_ACTIVE_STEPS", waitLoadbalancerActiveSteps) @@ -200,58 +347,28 @@ func WaitActiveAndGetLoadBalancer(ctx context.Context, client *gophercloud.Servi return loadbalancer, err } -// GetLoadBalancers returns all the filtered load balancer. +// GetLoadBalancers retrieves all load balancers matching the provided filters. +// Use ListOpts to filter by name, project ID, tags, or other attributes. func GetLoadBalancers(ctx context.Context, client *gophercloud.ServiceClient, opts loadbalancers.ListOpts) ([]loadbalancers.LoadBalancer, error) { - mc := metrics.NewMetricContext("loadbalancer", "list") - allPages, err := loadbalancers.List(client, opts).AllPages(ctx) - if mc.ObserveRequest(err) != nil { - return nil, err - } - allLoadbalancers, err := loadbalancers.ExtractLoadBalancers(allPages) - if err != nil { - return nil, err - } - - return allLoadbalancers, nil + pager := loadbalancers.List(client, opts) + return list(ctx, "loadbalancer", "list", pager, loadbalancers.ExtractLoadBalancers) } -// GetLoadbalancerByID retrieves loadbalancer object +// GetLoadbalancerByID retrieves a load balancer by ID. func GetLoadbalancerByID(ctx context.Context, client *gophercloud.ServiceClient, lbID string) (*loadbalancers.LoadBalancer, error) { - mc := metrics.NewMetricContext("loadbalancer", "get") - lb, err := loadbalancers.Get(ctx, client, lbID).Extract() - if mc.ObserveRequest(err) != nil { - return nil, err - } - - return lb, nil + return getSingleResource(ctx, "loadbalancer", "get", + func() (*loadbalancers.LoadBalancer, error) { + return loadbalancers.Get(ctx, client, lbID).Extract() + }) } -// GetLoadbalancerByName retrieves loadbalancer object +// GetLoadbalancerByName retrieves a load balancer by name. func GetLoadbalancerByName(ctx context.Context, client *gophercloud.ServiceClient, name string) (*loadbalancers.LoadBalancer, error) { - opts := loadbalancers.ListOpts{ - Name: name, - } - mc := metrics.NewMetricContext("loadbalancer", "list") - allPages, err := loadbalancers.List(client, opts).AllPages(ctx) - if mc.ObserveRequest(err) != nil { - return nil, err - } - loadbalancerList, err := loadbalancers.ExtractLoadBalancers(allPages) - if err != nil { - return nil, err - } - - if len(loadbalancerList) > 1 { - return nil, cpoerrors.ErrMultipleResults - } - if len(loadbalancerList) == 0 { - return nil, cpoerrors.ErrNotFound - } - - return &loadbalancerList[0], nil + pager := loadbalancers.List(client, loadbalancers.ListOpts{Name: name}) + return listWithUniqueResult(ctx, "loadbalancer", "list", pager, loadbalancers.ExtractLoadBalancers) } -// UpdateLoadBalancerTags updates tags for the load balancer +// UpdateLoadBalancerTags updates tags for a load balancer and waits for it to become active. func UpdateLoadBalancerTags(ctx context.Context, client *gophercloud.ServiceClient, lbID string, tags []string) error { updateOpts := loadbalancers.UpdateOpts{ Tags: &tags, @@ -262,23 +379,15 @@ func UpdateLoadBalancerTags(ctx context.Context, client *gophercloud.ServiceClie return err } -// UpdateLoadBalancer updates the load balancer +// UpdateLoadBalancer updates a load balancer and waits for it to become active. func UpdateLoadBalancer(ctx context.Context, client *gophercloud.ServiceClient, lbID string, updateOpts loadbalancers.UpdateOpts) (*loadbalancers.LoadBalancer, error) { - mc := metrics.NewMetricContext("loadbalancer", "update") - _, err := loadbalancers.Update(ctx, client, lbID, updateOpts).Extract() - if mc.ObserveRequest(err) != nil { - return nil, err - } - - lb, err := WaitActiveAndGetLoadBalancer(ctx, client, lbID) - if err != nil { - return nil, fmt.Errorf("failed to wait for load balancer %s ACTIVE after updating: %v", lbID, err) - } - - return lb, nil + return executeExtractAndWaitActive(ctx, client, lbID, "loadbalancer", "update", + func() (*loadbalancers.LoadBalancer, error) { + return loadbalancers.Update(ctx, client, lbID, updateOpts).Extract() + }) } -func waitLoadbalancerDeleted(ctx context.Context, client *gophercloud.ServiceClient, loadbalancerID string) error { +func waitLoadBalancerDeleted(ctx context.Context, client *gophercloud.ServiceClient, loadbalancerID string) error { klog.V(4).InfoS("Waiting for load balancer deleted", "lbID", loadbalancerID) backoff := wait.Backoff{ Duration: waitLoadbalancerInitDelay, @@ -305,7 +414,7 @@ func waitLoadbalancerDeleted(ctx context.Context, client *gophercloud.ServiceCli return err } -// DeleteLoadbalancer deletes a loadbalancer and wait for it's gone. +// DeleteLoadbalancer deletes a load balancer and waits for it to be fully deleted. func DeleteLoadbalancer(ctx context.Context, client *gophercloud.ServiceClient, lbID string, cascade bool) error { opts := loadbalancers.DeleteOpts{} if cascade { @@ -320,421 +429,233 @@ func DeleteLoadbalancer(ctx context.Context, client *gophercloud.ServiceClient, } _ = mc.ObserveRequest(nil) - if err := waitLoadbalancerDeleted(ctx, client, lbID); err != nil { + if err := waitLoadBalancerDeleted(ctx, client, lbID); err != nil { return err } return nil } -// UpdateListener updates a listener and wait for the lb active -func UpdateListener(ctx context.Context, client *gophercloud.ServiceClient, lbID string, listenerID string, opts listeners.UpdateOpts) error { - mc := metrics.NewMetricContext("loadbalancer_listener", "update") - _, err := listeners.Update(ctx, client, listenerID, opts).Extract() - if mc.ObserveRequest(err) != nil { - return err - } - - if _, err := WaitActiveAndGetLoadBalancer(ctx, client, lbID); err != nil { - return fmt.Errorf("failed to wait for load balancer %s ACTIVE after updating listener: %v", lbID, err) - } +// ============================================================================== +// Listener Operations +// ============================================================================== - return nil +// UpdateListener updates a listener and waits for the load balancer to become active. +func UpdateListener(ctx context.Context, client *gophercloud.ServiceClient, lbID string, listenerID string, opts listeners.UpdateOpts) error { + return executeAndWaitActive(ctx, client, lbID, "loadbalancer_listener", "update", + func() error { + _, err := listeners.Update(ctx, client, listenerID, opts).Extract() + return err + }) } -// CreateListener creates a new listener +// CreateListener creates a listener and waits for the load balancer to become active. func CreateListener(ctx context.Context, client *gophercloud.ServiceClient, lbID string, opts listeners.CreateOpts) (*listeners.Listener, error) { - mc := metrics.NewMetricContext("loadbalancer_listener", "create") - listener, err := listeners.Create(ctx, client, opts).Extract() - if mc.ObserveRequest(err) != nil { - return nil, err - } - - if _, err := WaitActiveAndGetLoadBalancer(ctx, client, lbID); err != nil { - return nil, fmt.Errorf("failed to wait for load balancer %s ACTIVE after creating listener: %v", lbID, err) - } - - return listener, nil + return executeExtractAndWaitActive(ctx, client, lbID, "loadbalancer_listener", "create", + func() (*listeners.Listener, error) { + return listeners.Create(ctx, client, opts).Extract() + }) } -// DeleteListener deletes a listener. +// DeleteListener deletes a listener and waits for the load balancer to become active. func DeleteListener(ctx context.Context, client *gophercloud.ServiceClient, listenerID string, lbID string) error { - mc := metrics.NewMetricContext("loadbalancer_listener", "delete") - if err := listeners.Delete(ctx, client, listenerID).ExtractErr(); mc.ObserveRequest(err) != nil { - if cpoerrors.IsNotFound(err) { - klog.V(2).Infof("Listener %s for load balancer %s was already deleted: %v", listenerID, lbID, err) - } else { - _ = mc.ObserveRequest(err) - return fmt.Errorf("error deleting listener %s for load balancer %s: %v", listenerID, lbID, err) - } - } - - if _, err := WaitActiveAndGetLoadBalancer(ctx, client, lbID); err != nil { - return fmt.Errorf("failed to wait for load balancer %s ACTIVE after deleting listener: %v", lbID, err) - } - - return nil + return executeAndWaitActive(ctx, client, lbID, "loadbalancer_listener", "delete", + func() error { + return listeners.Delete(ctx, client, listenerID).ExtractErr() + }) } -// GetListenerByName gets a listener by its name, raise error if not found or get multiple ones. +// GetListenerByName retrieves a listener by name within a specific load balancer. +// Returns ErrNotFound if no listener is found, or ErrMultipleResults if multiple listeners match. func GetListenerByName(ctx context.Context, client *gophercloud.ServiceClient, name string, lbID string) (*listeners.Listener, error) { - opts := listeners.ListOpts{ - Name: name, - LoadbalancerID: lbID, - } - mc := metrics.NewMetricContext("loadbalancer_listener", "list") - pager := listeners.List(client, opts) - var listenerList []listeners.Listener - - err := pager.EachPage(ctx, func(_ context.Context, page pagination.Page) (bool, error) { - v, err := listeners.ExtractListeners(page) - if err != nil { - return false, err - } - listenerList = append(listenerList, v...) - if len(listenerList) > 1 { - return false, cpoerrors.ErrMultipleResults - } - return true, nil - }) - if mc.ObserveRequest(err) != nil { - if cpoerrors.IsNotFound(err) { - return nil, cpoerrors.ErrNotFound - } - return nil, err - } - - if len(listenerList) == 0 { - return nil, cpoerrors.ErrNotFound - } - - return &listenerList[0], nil + pager := listeners.List(client, listeners.ListOpts{Name: name, LoadbalancerID: lbID}) + return listWithUniqueResult(ctx, "loadbalancer_listener", "list", pager, listeners.ExtractListeners) } -// GetListenersByLoadBalancerID returns listener list +// GetListenersByLoadBalancerID retrieves all listeners for a specific load balancer. +// Returns an empty slice if no listeners are found. func GetListenersByLoadBalancerID(ctx context.Context, client *gophercloud.ServiceClient, lbID string) ([]listeners.Listener, error) { - mc := metrics.NewMetricContext("loadbalancer_listener", "list") - var lbListeners []listeners.Listener - - allPages, err := listeners.List(client, listeners.ListOpts{LoadbalancerID: lbID}).AllPages(ctx) - if mc.ObserveRequest(err) != nil { - return nil, err - } - lbListeners, err = listeners.ExtractListeners(allPages) - if err != nil { - return nil, err - } - - return lbListeners, nil + pager := listeners.List(client, listeners.ListOpts{LoadbalancerID: lbID}) + return list(ctx, "loadbalancer_listener", "list", pager, listeners.ExtractListeners) } -// CreatePool creates a new pool. -func CreatePool(ctx context.Context, client *gophercloud.ServiceClient, opts pools.CreateOptsBuilder, lbID string) (*pools.Pool, error) { - mc := metrics.NewMetricContext("loadbalancer_pool", "create") - pool, err := pools.Create(ctx, client, opts).Extract() - if mc.ObserveRequest(err) != nil { - return nil, err - } - - if _, err = WaitActiveAndGetLoadBalancer(ctx, client, lbID); err != nil { - return nil, fmt.Errorf("failed to wait for load balancer ACTIVE after creating pool: %v", err) - } +// ============================================================================== +// Pool Operations +// ============================================================================== - return pool, nil +// CreatePool creates a pool and waits for the load balancer to become active. +func CreatePool(ctx context.Context, client *gophercloud.ServiceClient, opts pools.CreateOptsBuilder, lbID string) (*pools.Pool, error) { + return executeExtractAndWaitActive(ctx, client, lbID, "loadbalancer_pool", "create", + func() (*pools.Pool, error) { + return pools.Create(ctx, client, opts).Extract() + }) } -// GetPoolByName gets a pool by its name, raise error if not found or get multiple ones. +// GetPoolByName retrieves a pool by name within a specific load balancer. +// Returns ErrNotFound if no pool is found, or ErrMultipleResults if multiple pools match. func GetPoolByName(ctx context.Context, client *gophercloud.ServiceClient, name string, lbID string) (*pools.Pool, error) { - var listenerPools []pools.Pool - - opts := pools.ListOpts{ - Name: name, - LoadbalancerID: lbID, - } - mc := metrics.NewMetricContext("loadbalancer_pool", "list") - err := pools.List(client, opts).EachPage(ctx, func(_ context.Context, page pagination.Page) (bool, error) { - v, err := pools.ExtractPools(page) - if err != nil { - return false, err - } - listenerPools = append(listenerPools, v...) - if len(listenerPools) > 1 { - return false, cpoerrors.ErrMultipleResults - } - return true, nil - }) - if mc.ObserveRequest(err) != nil { - if cpoerrors.IsNotFound(err) { - return nil, cpoerrors.ErrNotFound - } - return nil, err - } - - if len(listenerPools) == 0 { - return nil, cpoerrors.ErrNotFound - } else if len(listenerPools) > 1 { - return nil, cpoerrors.ErrMultipleResults - } - - return &listenerPools[0], nil + pager := pools.List(client, pools.ListOpts{Name: name, LoadbalancerID: lbID}) + return listWithUniqueResult(ctx, "loadbalancer_pool", "list", pager, pools.ExtractPools) } -// GetPoolByListener finds pool for a listener. -// A listener always has exactly one pool. +// GetPoolByListener retrieves the pool associated with a specific listener. +// It first queries the listener to get its default pool ID, then fetches the pool directly. +// Falls back to using the listener's Pools field if no default pool is set. +// Returns ErrNotFound if no pool is found, or ErrMultipleResults if multiple pools match. func GetPoolByListener(ctx context.Context, client *gophercloud.ServiceClient, lbID, listenerID string) (*pools.Pool, error) { - listenerPools := make([]pools.Pool, 0, 1) - mc := metrics.NewMetricContext("loadbalancer_pool", "list") - err := pools.List(client, pools.ListOpts{LoadbalancerID: lbID}).EachPage(ctx, func(_ context.Context, page pagination.Page) (bool, error) { - poolsList, err := pools.ExtractPools(page) - if err != nil { - return false, err - } - for _, p := range poolsList { - for _, l := range p.Listeners { - if l.ID == listenerID { - listenerPools = append(listenerPools, p) - } - } - } - if len(listenerPools) > 1 { - return false, cpoerrors.ErrMultipleResults - } - return true, nil - }) - if mc.ObserveRequest(err) != nil { - if cpoerrors.IsNotFound(err) { - return nil, cpoerrors.ErrNotFound - } + // Get the listener by ID to retrieve its default pool ID + listener, err := listWithUniqueResult(ctx, "loadbalancer_listener", "list", + listeners.List(client, listeners.ListOpts{ + LoadbalancerID: lbID, + ID: listenerID, + }), + listeners.ExtractListeners) + if err != nil { return nil, err } - if len(listenerPools) == 0 { - return nil, cpoerrors.ErrNotFound + // If listener has a default pool, get it directly + if listener.DefaultPoolID != "" { + return getSingleResource(ctx, "loadbalancer_pool", "get", + func() (*pools.Pool, error) { + return pools.Get(ctx, client, listener.DefaultPoolID).Extract() + }) } - return &listenerPools[0], nil -} - -// GetPools retrieves the pools belong to the loadbalancer. -func GetPools(ctx context.Context, client *gophercloud.ServiceClient, lbID string) ([]pools.Pool, error) { - var lbPools []pools.Pool - - opts := pools.ListOpts{ - LoadbalancerID: lbID, - } - allPages, err := pools.List(client, opts).AllPages(ctx) - if err != nil { - return nil, err + // Fallback: use listener's Pools field if no default pool is set + if len(listener.Pools) == 0 { + return nil, cpoerrors.ErrNotFound } - - lbPools, err = pools.ExtractPools(allPages) - if err != nil { - return nil, err + if len(listener.Pools) > 1 { + return nil, cpoerrors.ErrMultipleResults } - return lbPools, nil + // Get the pool by ID from the listener's Pools field + return getSingleResource(ctx, "loadbalancer_pool", "get", + func() (*pools.Pool, error) { + return pools.Get(ctx, client, listener.Pools[0].ID).Extract() + }) } -// GetMembersbyPool get all the members in the pool. -func GetMembersbyPool(ctx context.Context, client *gophercloud.ServiceClient, poolID string) ([]pools.Member, error) { - var members []pools.Member - - mc := metrics.NewMetricContext("loadbalancer_member", "list") - err := pools.ListMembers(client, poolID, pools.ListMembersOpts{}).EachPage(ctx, func(_ context.Context, page pagination.Page) (bool, error) { - membersList, err := pools.ExtractMembers(page) - if err != nil { - return false, err - } - members = append(members, membersList...) - - return true, nil - }) - if mc.ObserveRequest(err) != nil { - return nil, err - } - - return members, nil +// GetPools retrieves all pools for a specific load balancer. +// Returns an empty slice if no pools are found. +func GetPools(ctx context.Context, client *gophercloud.ServiceClient, lbID string) ([]pools.Pool, error) { + pager := pools.List(client, pools.ListOpts{LoadbalancerID: lbID}) + return list(ctx, "loadbalancer_pool", "list", pager, pools.ExtractPools) } -// UpdatePool updates a pool and wait for the lb active +// UpdatePool updates a pool and waits for the load balancer to become active. func UpdatePool(ctx context.Context, client *gophercloud.ServiceClient, lbID string, poolID string, opts pools.UpdateOpts) error { - mc := metrics.NewMetricContext("loadbalancer_pool", "update") - _, err := pools.Update(ctx, client, poolID, opts).Extract() - if mc.ObserveRequest(err) != nil { - return err - } - - if _, err := WaitActiveAndGetLoadBalancer(ctx, client, lbID); err != nil { - return fmt.Errorf("failed to wait for load balancer %s ACTIVE after updating pool: %v", lbID, err) - } - - return nil + return executeAndWaitActive(ctx, client, lbID, "loadbalancer_pool", "update", + func() error { + _, err := pools.Update(ctx, client, poolID, opts).Extract() + return err + }) } -// DeletePool deletes a pool. +// DeletePool deletes a pool and waits for the load balancer to become active. func DeletePool(ctx context.Context, client *gophercloud.ServiceClient, poolID string, lbID string) error { - mc := metrics.NewMetricContext("loadbalancer_pool", "delete") - if err := pools.Delete(ctx, client, poolID).ExtractErr(); mc.ObserveRequest(err) != nil { - if cpoerrors.IsNotFound(err) { - klog.V(2).Infof("Pool %s for load balancer %s was already deleted: %v", poolID, lbID, err) - } else { - return fmt.Errorf("error deleting pool %s for load balancer %s: %v", poolID, lbID, err) - } - } - if _, err := WaitActiveAndGetLoadBalancer(ctx, client, lbID); err != nil { - return fmt.Errorf("failed to wait for load balancer %s ACTIVE after deleting pool: %v", lbID, err) - } - - return nil + return executeAndWaitActive(ctx, client, lbID, "loadbalancer_pool", "delete", + func() error { + return pools.Delete(ctx, client, poolID).ExtractErr() + }) } -// BatchUpdatePoolMembers updates pool members in batch. -func BatchUpdatePoolMembers(ctx context.Context, client *gophercloud.ServiceClient, lbID string, poolID string, opts []pools.BatchUpdateMemberOpts) error { - mc := metrics.NewMetricContext("loadbalancer_members", "update") - err := pools.BatchUpdateMembers(ctx, client, poolID, opts).ExtractErr() - if mc.ObserveRequest(err) != nil { - return err - } +// ============================================================================== +// Pool Member Operations +// ============================================================================== - if _, err := WaitActiveAndGetLoadBalancer(ctx, client, lbID); err != nil { - return fmt.Errorf("failed to wait for load balancer %s ACTIVE after updating pool members for %s: %v", lbID, poolID, err) - } +// GetPoolMembers retrieves all members in a specific pool. +// Returns an empty slice if no members are found. +func GetPoolMembers(ctx context.Context, client *gophercloud.ServiceClient, poolID string) ([]pools.Member, error) { + pager := pools.ListMembers(client, poolID, pools.ListMembersOpts{}) + return list(ctx, "loadbalancer_member", "list", pager, pools.ExtractMembers) +} - return nil +// BatchUpdatePoolMembers updates pool members in batch and waits for the load balancer to become active. +func BatchUpdatePoolMembers(ctx context.Context, client *gophercloud.ServiceClient, lbID string, poolID string, opts []pools.BatchUpdateMemberOpts) error { + return executeAndWaitActive(ctx, client, lbID, "loadbalancer_members", "update", + func() error { + return pools.BatchUpdateMembers(ctx, client, poolID, opts).ExtractErr() + }) } -// GetL7policies retrieves all l7 policies for the given listener. -func GetL7policies(ctx context.Context, client *gophercloud.ServiceClient, listenerID string) ([]l7policies.L7Policy, error) { - var policies []l7policies.L7Policy - opts := l7policies.ListOpts{ - ListenerID: listenerID, - } - err := l7policies.List(client, opts).EachPage(ctx, func(_ context.Context, page pagination.Page) (bool, error) { - v, err := l7policies.ExtractL7Policies(page) - if err != nil { - return false, err - } - policies = append(policies, v...) - return true, nil - }) - if err != nil { - return nil, err - } +// ============================================================================== +// L7 Policy and Rule Operations +// ============================================================================== - return policies, nil +// GetL7policies retrieves all L7 policies for a specific listener. +// Returns an empty slice if no policies are found. +func GetL7policies(ctx context.Context, client *gophercloud.ServiceClient, listenerID string) ([]l7policies.L7Policy, error) { + pager := l7policies.List(client, l7policies.ListOpts{ListenerID: listenerID}) + return list(ctx, "loadbalancer_l7policy", "list", pager, l7policies.ExtractL7Policies) } -// CreateL7Policy creates a l7 policy. +// CreateL7Policy creates an L7 policy and waits for the load balancer to become active. func CreateL7Policy(ctx context.Context, client *gophercloud.ServiceClient, opts l7policies.CreateOpts, lbID string) (*l7policies.L7Policy, error) { - mc := metrics.NewMetricContext("loadbalancer_l7policy", "create") - policy, err := l7policies.Create(ctx, client, opts).Extract() - if mc.ObserveRequest(err) != nil { - return nil, err - } - - if _, err = WaitActiveAndGetLoadBalancer(ctx, client, lbID); err != nil { - return nil, fmt.Errorf("failed to wait for load balancer ACTIVE after creating l7policy: %v", err) - } - - return policy, nil + return executeExtractAndWaitActive(ctx, client, lbID, "loadbalancer_l7policy", "create", + func() (*l7policies.L7Policy, error) { + return l7policies.Create(ctx, client, opts).Extract() + }) } -// DeleteL7policy deletes a l7 policy. +// DeleteL7policy deletes an L7 policy and waits for the load balancer to become active. func DeleteL7policy(ctx context.Context, client *gophercloud.ServiceClient, policyID string, lbID string) error { - mc := metrics.NewMetricContext("loadbalancer_l7policy", "delete") - if err := l7policies.Delete(ctx, client, policyID).ExtractErr(); mc.ObserveRequest(err) != nil { - return err - } - - if _, err := WaitActiveAndGetLoadBalancer(ctx, client, lbID); err != nil { - return fmt.Errorf("failed to wait for load balancer %s ACTIVE after deleting l7policy: %v", lbID, err) - } - - return nil + return executeAndWaitActive(ctx, client, lbID, "loadbalancer_l7policy", "delete", + func() error { + return l7policies.Delete(ctx, client, policyID).ExtractErr() + }) } -// GetL7Rules gets all the rules for a l7 policy +// GetL7Rules retrieves all L7 rules for a specific L7 policy. +// Returns an empty slice if no rules are found. func GetL7Rules(ctx context.Context, client *gophercloud.ServiceClient, policyID string) ([]l7policies.Rule, error) { - listOpts := l7policies.ListRulesOpts{} - allPages, err := l7policies.ListRules(client, policyID, listOpts).AllPages(ctx) - if err != nil { - return nil, err - } - allRules, err := l7policies.ExtractRules(allPages) - if err != nil { - return nil, err - } - - return allRules, nil + pager := l7policies.ListRules(client, policyID, l7policies.ListRulesOpts{}) + return list(ctx, "loadbalancer_l7rule", "list", pager, l7policies.ExtractRules) } -// CreateL7Rule creates a l7 rule. +// CreateL7Rule creates an L7 rule and waits for the load balancer to become active. func CreateL7Rule(ctx context.Context, client *gophercloud.ServiceClient, policyID string, opts l7policies.CreateRuleOpts, lbID string) error { - mc := metrics.NewMetricContext("loadbalancer_l7rule", "create") - _, err := l7policies.CreateRule(ctx, client, policyID, opts).Extract() - if mc.ObserveRequest(err) != nil { - return err - } - - if _, err = WaitActiveAndGetLoadBalancer(ctx, client, lbID); err != nil { - return fmt.Errorf("failed to wait for load balancer ACTIVE after creating l7policy rule: %v", err) - } - - return nil + return executeAndWaitActive(ctx, client, lbID, "loadbalancer_l7rule", "create", + func() error { + _, err := l7policies.CreateRule(ctx, client, policyID, opts).Extract() + return err + }) } -// UpdateHealthMonitor updates a health monitor. -func UpdateHealthMonitor(ctx context.Context, client *gophercloud.ServiceClient, monitorID string, opts monitors.UpdateOpts, lbID string) error { - mc := metrics.NewMetricContext("loadbalancer_healthmonitor", "update") - _, err := monitors.Update(ctx, client, monitorID, opts).Extract() - if mc.ObserveRequest(err) != nil { - return fmt.Errorf("failed to update healthmonitor: %v", err) - } - - if _, err := WaitActiveAndGetLoadBalancer(ctx, client, lbID); err != nil { - return fmt.Errorf("failed to wait for load balancer %s ACTIVE after updating healthmonitor: %v", lbID, err) - } +// ============================================================================== +// Health Monitor Operations +// ============================================================================== - return nil +// UpdateHealthMonitor updates a health monitor and waits for the load balancer to become active. +func UpdateHealthMonitor(ctx context.Context, client *gophercloud.ServiceClient, monitorID string, opts monitors.UpdateOpts, lbID string) error { + return executeAndWaitActive(ctx, client, lbID, "loadbalancer_healthmonitor", "update", + func() error { + _, err := monitors.Update(ctx, client, monitorID, opts).Extract() + return err + }) } -// DeleteHealthMonitor deletes a health monitor. +// DeleteHealthMonitor deletes a health monitor and waits for the load balancer to become active. func DeleteHealthMonitor(ctx context.Context, client *gophercloud.ServiceClient, monitorID string, lbID string) error { - mc := metrics.NewMetricContext("loadbalancer_healthmonitor", "delete") - err := monitors.Delete(ctx, client, monitorID).ExtractErr() - if err != nil && !cpoerrors.IsNotFound(err) { - return mc.ObserveRequest(err) - } - _ = mc.ObserveRequest(nil) - if _, err := WaitActiveAndGetLoadBalancer(ctx, client, lbID); err != nil { - return fmt.Errorf("failed to wait for load balancer %s ACTIVE after deleting healthmonitor: %v", lbID, err) - } - - return nil + return executeAndWaitActive(ctx, client, lbID, "loadbalancer_healthmonitor", "delete", + func() error { + return monitors.Delete(ctx, client, monitorID).ExtractErr() + }) } -// CreateHealthMonitor creates a health monitor in a pool. +// CreateHealthMonitor creates a health monitor and waits for the load balancer to become active. func CreateHealthMonitor(ctx context.Context, client *gophercloud.ServiceClient, opts monitors.CreateOpts, lbID string) (*monitors.Monitor, error) { - mc := metrics.NewMetricContext("loadbalancer_healthmonitor", "create") - monitor, err := monitors.Create(ctx, client, opts).Extract() - if mc.ObserveRequest(err) != nil { - return nil, fmt.Errorf("failed to create healthmonitor: %v", err) - } - - if _, err := WaitActiveAndGetLoadBalancer(ctx, client, lbID); err != nil { - return nil, fmt.Errorf("failed to wait for load balancer %s ACTIVE after creating healthmonitor: %v", lbID, err) - } - - return monitor, nil + return executeExtractAndWaitActive(ctx, client, lbID, "loadbalancer_healthmonitor", "create", + func() (*monitors.Monitor, error) { + return monitors.Create(ctx, client, opts).Extract() + }) } -// GetHealthMonitor gets details about loadbalancer health monitor. +// GetHealthMonitor retrieves a health monitor by ID. func GetHealthMonitor(ctx context.Context, client *gophercloud.ServiceClient, monitorID string) (*monitors.Monitor, error) { - mc := metrics.NewMetricContext("loadbalancer_healthmonitor", "get") - monitor, err := monitors.Get(ctx, client, monitorID).Extract() - if mc.ObserveRequest(err) != nil { - return nil, fmt.Errorf("failed to get healthmonitor: %v", err) - } - - return monitor, nil + return getSingleResource(ctx, "loadbalancer_healthmonitor", "get", + func() (*monitors.Monitor, error) { + return monitors.Get(ctx, client, monitorID).Extract() + }) } diff --git a/pkg/util/openstack/loadbalancer_serial.go b/pkg/util/openstack/loadbalancer_serial.go index a609ce77b0..12db0facd5 100644 --- a/pkg/util/openstack/loadbalancer_serial.go +++ b/pkg/util/openstack/loadbalancer_serial.go @@ -51,7 +51,7 @@ func getNodeAddressForLB(node *apiv1.Node) (string, error) { func SeriallyReconcilePoolMembers(ctx context.Context, client *gophercloud.ServiceClient, pool *pools.Pool, nodePort int, lbID string, nodes []*apiv1.Node) error { - members, err := GetMembersbyPool(ctx, client, pool.ID) + members, err := GetPoolMembers(ctx, client, pool.ID) if err != nil && !cpoerrors.IsNotFound(err) { return fmt.Errorf("error getting pool members %s: %v", pool.ID, err) }