diff --git a/perfkitbenchmarker/providers/aws/elastic_kubernetes_service.py b/perfkitbenchmarker/providers/aws/elastic_kubernetes_service.py
index a57a43057a..f4ed4910a4 100644
--- a/perfkitbenchmarker/providers/aws/elastic_kubernetes_service.py
+++ b/perfkitbenchmarker/providers/aws/elastic_kubernetes_service.py
@@ -22,7 +22,9 @@
 
 from collections import abc
 import copy
+import functools
 import json
+import time
 import logging
 import math
 import re
@@ -273,7 +275,7 @@ def GetNodePoolFromNodeName(
         raise_on_failure=False,
     )
     if code:
-      logging.warning(
+      logging.info(
           'Got error when trying to get nodepool name for node %s: %s',
           err,
           node_name,
@@ -285,7 +287,7 @@ def GetNodePoolFromNodeName(
       nodepool = self.default_nodepool
     else:
       if nodepool_name not in self.nodepools:
-        logging.warning(
+        logging.info(
             'Nodepool %s not found in nodepools %s',
             nodepool_name,
             self.nodepools,
@@ -370,8 +372,9 @@ def GetNodePoolNames(self) -> list[str]:
     ]
     stdout, stderr, retcode = vm_util.IssueCommand(cmd)
     if retcode:
-      logging.warning('Failed to get nodegroups: %s, error: %s', stdout, stderr)
-      return []
+      raise errors.Resource.GetError(
+          f'Failed to get nodegroups: {stdout}, error: {stderr}'
+      )
     nodegroups = json.loads(stdout)
     return [ng['Name'] for ng in nodegroups]
 
@@ -549,6 +552,708 @@ def ResizeNodePool(
     ]
     vm_util.IssueCommand(cmd)
 
+  @functools.lru_cache(maxsize=None)
+  def _DiscoverSubnets(self) -> list[str]:
+    """Returns the EKS cluster's subnet IDs (cached after first call)."""
+    out, _, _ = vm_util.IssueCommand(
+        util.AWS_PREFIX
+        + [
+            'eks',
+            'describe-cluster',
+            '--name',
+            self.name,
+            '--region',
+            self.region,
+        ]
+    )
+    info = json.loads(out)
+    return info['cluster']['resourcesVpcConfig']['subnetIds']
+
+  @functools.lru_cache(maxsize=None)
+  def _DiscoverSubnetsPerAZ(self) -> dict[str, str]:
+    """Returns a mapping of {AZ: subnet_id} for the cluster's subnets.
+
+    Used by CreateNodePoolAsync to distribute nodegroups round-robin across
+    AZs, avoiding per-AZ EC2 capacity limits when creating many pools.
+    Every AZ that has a cluster subnet is included: the result is not
+    filtered by control_plane_zones. Within an AZ a public subnet is
+    preferred over a private one.
+
+    Cached after first call (via functools.lru_cache); this includes the
+    empty mapping returned when describe-subnets fails.
+
+    Returns:
+      {availability zone: subnet id}; {} if the cluster has no subnets or
+      they could not be described.
+    """
+    subnet_ids = self._DiscoverSubnets()
+    if not subnet_ids:
+      return {}
+
+    # Describe subnets to get their AZ mapping
+    out, _, rc = vm_util.IssueCommand(
+        util.AWS_PREFIX
+        + [
+            'ec2',
+            'describe-subnets',
+            '--region',
+            self.region,
+            '--subnet-ids',
+            *subnet_ids,
+            '--query',
+            (
+                'Subnets[*].{SubnetId:SubnetId,AZ:AvailabilityZone,'
+                'Public:MapPublicIpOnLaunch}'
+            ),
+            '--output',
+            'json',
+        ],
+        raise_on_failure=False,
+    )
+    if rc:
+      logging.info(
+          '[EKS] Could not describe subnets for AZ mapping —'
+          ' falling back to all subnets'
+      )
+      return {}
+
+    subnets = json.loads(out)
+
+    # Do NOT filter by control_plane_zones — PKB truncates it to 2 AZs.
+    # Accept all subnets the VPC has across all AZs.
+    # Build AZ map — always prefer public subnets (MapPublicIpOnLaunch=True)
+    # which have an internet gateway route. Private subnets lack IGW routes
+    # and nodes launched there cannot reach the EKS API server to join.
+    az_map: dict[str, str] = {}
+    az_map_private: dict[str, str] = {}
+    for s in subnets:
+      az = s['AZ']
+      if s.get('Public'):
+        az_map[az] = s['SubnetId']
+        logging.info('[EKS] AZ %s → public subnet %s', az, s['SubnetId'])
+      elif az not in az_map:
+        az_map_private[az] = s['SubnetId']
+    for az, sid in az_map_private.items():
+      if az not in az_map:
+        logging.info(
+            '[EKS] AZ %s has no public subnet — using private %s', az, sid
+        )
+        az_map[az] = sid
+
+    logging.info(
+        '[EKS] Subnet-per-AZ mapping: %s (from %d total subnets)',
+        az_map,
+        len(subnet_ids),
+    )
+    return az_map
+
+  @functools.lru_cache(maxsize=None)
+  def _ResolveReleaseVersion(self, minor: str) -> str:
+    """Returns the EKS-optimized AMI release version (e.g. '1.33.10-20260124').
+
+    Used to populate `releaseVersion` in the create-nodegroup payload so the
+    benchmark can pin specific K8s minors. Results are memoized per minor by
+    functools.lru_cache. The cache itself is safe to use from several threads,
+    but it does not coalesce in-flight calls: workers that ask for the same
+    minor before the first SSM lookup has returned each perform their own
+    lookup. Failed lookups raise and are never cached.
+
+    Args:
+      minor: Kubernetes minor version, e.g. '1.33'.
+
+    Returns:
+      The recommended release version string for that minor.
+
+    Raises:
+      errors.Resource.CreationError: if the SSM parameter cannot be read.
+    """
+    cmd = util.AWS_PREFIX + [
+        'ssm',
+        'get-parameter',
+        '--name',
+        (
+            f'/aws/service/eks/optimized-ami/{minor}/amazon-linux-2023/'
+            'x86_64/standard/recommended/release_version'
+        ),
+        '--region',
+        self.region,
+        '--query',
+        'Parameter.Value',
+        '--output',
+        'text',
+    ]
+    out, err, rc = vm_util.IssueCommand(cmd, raise_on_failure=False)
+    if rc:
+      raise errors.Resource.CreationError(
+          f'Failed to resolve EKS release version for minor {minor!r}: {err}'
+      )
+    return out.strip()
+
+  @functools.lru_cache(maxsize=None)
+  def _DiscoverNodeRoleArn(self) -> str:
+    """Returns a node IAM role ARN by inspecting an existing nodegroup.
+
+    Raises:
+      errors.Resource.CreationError: if no existing nodegroup reports a
+        nodeRole.
+    """
+    out, _, _ = vm_util.IssueCommand(
+        util.AWS_PREFIX
+        + [
+            'eks',
+            'list-nodegroups',
+            '--cluster-name',
+            self.name,
+            '--region',
+            self.region,
+        ]
+    )
+    for ng_name in json.loads(out).get('nodegroups', []):
+      ng_out, _, _ = vm_util.IssueCommand(
+          util.AWS_PREFIX
+          + [
+              'eks',
+              'describe-nodegroup',
+              '--cluster-name',
+              self.name,
+              '--nodegroup-name',
+              ng_name,
+              '--region',
+              self.region,
+          ]
+      )
+      role = json.loads(ng_out)['nodegroup'].get('nodeRole')
+      if role:
+        return role
+    raise errors.Resource.CreationError(
+        'No existing nodegroup found to discover node role for '
+        f'cluster {self.name}.'
+    )
+
+  @staticmethod
+  def _NodePoolAzIndex(name: str) -> int:
+    """Returns the round-robin AZ index encoded in a nodepool name.
+
+    Pool names look like pkbma000, pkbma001, ...; only the trailing digits
+    form the index, so digits elsewhere in the name are ignored. A name with
+    no numeric suffix (pkbmb, Scenario B) maps to index 1 so that it does not
+    compete with the default nodegroup in the first AZ.
+    """
+    match = re.search(r'\d+$', name)
+    return int(match.group()) if match else 1
+
+  def _FindReservedLaunchTemplate(self, az: str) -> tuple[str, str, str]:
+    """Looks up the launch template that targets an AZ's capacity reservation.
+
+    CreateNodePoolAsync and UpgradeNodePoolAsync must agree on whether a
+    nodegroup uses a launch template, so both go through this helper.
+
+    Args:
+      az: Availability zone the nodegroup is assigned to.
+
+    Returns:
+      (template name, template id, reservation id). The template id is ''
+      when no reservation is recorded for the AZ or no template named
+      pkb-eks-lt-<az> is found.
+    """
+    lt_name = f'pkb-eks-lt-{az}'
+    # _capacity_reservation_ids is populated outside this method; it may not
+    # exist at all, which is treated as "no reservations".
+    res_id = getattr(self, '_capacity_reservation_ids', {}).get(az, '')
+    if not res_id:
+      return lt_name, '', ''
+    out, _, rc = vm_util.IssueCommand(
+        util.AWS_PREFIX
+        + [
+            'ec2',
+            'describe-launch-templates',
+            '--region',
+            self.region,
+            '--filters',
+            f'Name=launch-template-name,Values={lt_name}',
+            '--query',
+            'LaunchTemplates[0].LaunchTemplateId',
+            '--output',
+            'text',
+        ],
+        raise_on_failure=False,
+    )
+    lt_id = out.strip()
+    # An empty result or the literal 'None'/'null' means nothing matched.
+    if rc or lt_id in ('', 'None', 'null'):
+      lt_id = ''
+    return lt_name, lt_id, res_id
+
+  def CreateNodePoolAsync(
+      self,
+      nodepool_config: container.BaseNodePoolConfig,
+      node_version: str | None = None,
+  ) -> str:
+    """Initiates EKS nodegroup create; returns ng_active handle.
+
+    Args:
+      nodepool_config: Name, machine type and node count of the nodegroup.
+      node_version: Kubernetes minor (e.g. '1.33') to pin the nodegroup to.
+        Not sent when the nodegroup is created from a launch template.
+
+    Returns:
+      'ng_active:<nodegroup name>', to be passed to WaitForOperation.
+
+    Raises:
+      errors.Resource.CreationError: if create-nodegroup fails for a reason
+        other than throttling, or is still throttled on the last attempt.
+    """
+    # Pass the full request via --cli-input-json so that we can specify both
+    # `version` (e.g. "1.33") and `releaseVersion` (e.g. "1.33.11-...") in
+    # the same call. Two reasons this matters:
+    #   1. AWS CLI v1 has a bug where the top-level --version flag swallows
+    #      the subcommand --version, printing the CLI banner and exiting.
+    #      cli-input-json sidesteps CLI argument parsing entirely.
+    #   2. EKS rejects a releaseVersion that doesn't match the request's
+    #      `version`; if `version` is omitted EKS defaults it to the
+    #      cluster's version, which (for the N-1 -> N benchmark path)
+    #      produces a "release version X is not valid for kubernetes
+    #      version Y" error.
+
+    # ── AZ distribution ────────────────────────────────────────────────────
+    # When multiple zones are specified (e.g. us-east-1a,1b,1c), distribute
+    # nodegroups round-robin across AZs to avoid per-AZ EC2 capacity limits.
+    # Without this, EKS places all nodegroups in a single AZ causing timeouts.
+    name = nodepool_config.name
+    az_subnets = self._DiscoverSubnetsPerAZ()
+    if len(az_subnets) > 1:
+      idx = self._NodePoolAzIndex(name)
+      zones = sorted(az_subnets)
+      az = zones[idx % len(zones)]
+      subnets = [az_subnets[az]]
+      logging.info(
+          '[EKS] CreateNodePool %s -> AZ=%s subnet=%s (round-robin idx=%d)',
+          name,
+          az,
+          subnets[0],
+          idx,
+      )
+    else:
+      # Zero or one AZ known: hand EKS every cluster subnet.
+      az = f'{self.region}a'
+      subnets = self._DiscoverSubnets()
+      logging.info(
+          '[EKS] CreateNodePool %s -> using all subnets (single AZ)',
+          name,
+      )
+
+    payload: dict[str, Any] = {
+        'clusterName': self.name,
+        'nodegroupName': name,
+        'scalingConfig': {
+            'minSize': nodepool_config.num_nodes,
+            'maxSize': nodepool_config.num_nodes,
+            'desiredSize': nodepool_config.num_nodes,
+        },
+        'subnets': subnets,
+        'instanceTypes': [nodepool_config.machine_type],
+        'amiType': 'AL2023_x86_64_STANDARD',
+        'nodeRole': self._DiscoverNodeRoleArn(),
+        'labels': {'pkb_nodepool': name},
+        'tags': util.MakeDefaultTags(),
+        # Target open capacity reservations first before falling back to
+        # regular on-demand. Ensures EC2 capacity reservations created
+        # before the benchmark are actually used by EKS nodegroups.
+        # NOTE(review): verify against the CreateNodegroup API that this key
+        # is accepted at the top level of the request.
+        'capacityReservationSpecification': {
+            'capacityReservationPreference': 'open',
+        },
+    }
+    # Only look up launch templates and capacity reservations when
+    # --eks_reserve_capacity_per_az=true. Other benchmarks skip this entirely.
+    if FLAGS.eks_reserve_capacity_per_az:
+      lt_name, lt_id, res_id = self._FindReservedLaunchTemplate(az)
+      if lt_id:
+        payload['launchTemplate'] = {'id': lt_id, 'version': '$Latest'}
+        # When launch template specifies an ImageId, EKS rejects these fields:
+        #   - instanceTypes: must come from launch template only
+        #   - amiType: conflicts with AMI
+        # version/releaseVersion conflict too; they are never added below.
+        payload.pop('instanceTypes', None)
+        payload.pop('amiType', None)
+        logging.info(
+            # pylint: disable-next=implicit-str-concat
+            '[EKS] Nodegroup %s using launch template %s targeting reservation'
+            ' %s in AZ %s',
+            name,
+            lt_name,
+            res_id,
+            az,
+        )
+      else:
+        logging.info(
+            '[EKS] No reservation/template for AZ %s — using on-demand', az
+        )
+
+    # EKS rejects both 'version' and 'releaseVersion' when a launch template
+    # with ImageId is specified — skip both when launchTemplate is in use.
+    if node_version and 'launchTemplate' not in payload:
+      payload['version'] = node_version
+      payload['releaseVersion'] = self._ResolveReleaseVersion(node_version)
+    filename = self._WriteJsonToFile(payload)
+    cmd = util.AWS_PREFIX + [
+        'eks',
+        'create-nodegroup',
+        '--region',
+        self.region,
+        '--cli-input-json',
+        f'file://{filename}',
+    ]
+    for attempt in range(5):
+      _, stderr, retcode = vm_util.IssueCommand(
+          cmd, timeout=300, raise_on_failure=False
+      )
+      if retcode == 0:
+        break
+      throttled = (
+          'Request limit exceeded' in stderr or 'ThrottlingException' in stderr
+      )
+      if throttled and attempt < 4:
+        logging.info(
+            '[EKS] CreateNodegroup throttled — retry %d/5', attempt + 1
+        )
+        # Exponential backoff: 10s, 20s, 40s, 80s.
+        time.sleep(10 * (2**attempt))
+        continue
+      raise errors.Resource.CreationError(stderr)
+    return f'ng_active:{name}'
+
+  def UpgradeNodePoolAsync(self, name: str, target_version: str) -> str:
+    """Initiates EKS nodegroup upgrade; returns ng_active handle.
+
+    Args:
+      name: Nodegroup to upgrade.
+      target_version: Kubernetes version to upgrade to. Not sent when the
+        nodegroup is upgraded through its launch template.
+
+    Returns:
+      'ng_active:<nodegroup name>', to be passed to WaitForOperation.
+
+    Raises:
+      errors.Resource.CreationError: if update-nodegroup-version fails.
+    """
+    cmd = util.AWS_PREFIX + [
+        'eks',
+        'update-nodegroup-version',
+        '--cluster-name',
+        self.name,
+        '--nodegroup-name',
+        name,
+        '--region',
+        self.region,
+    ]
+    # Only look up launch template when capacity reservations are enabled.
+    # For other benchmarks, always use standard kubernetes-version upgrade.
+    lt_id = ''
+    lt_name = ''
+    az = f'{self.region}a'
+    if FLAGS.eks_reserve_capacity_per_az:
+      # Recompute the AZ exactly as CreateNodePoolAsync assigned it so the
+      # same launch template (and reservation check) applies.
+      az_subnets = self._DiscoverSubnetsPerAZ()
+      if len(az_subnets) > 1:
+        zones = sorted(az_subnets)
+        az = zones[self._NodePoolAzIndex(name) % len(zones)]
+      lt_name, lt_id, _ = self._FindReservedLaunchTemplate(az)
+
+    if lt_id:
+      # Custom AMI nodegroups cannot use --kubernetes-version;
+      # use launch template only.
+      cmd += ['--launch-template', f'id={lt_id},version=$Latest']
+      logging.info(
+          '[EKS] Upgrading %s with launch template %s in AZ %s',
+          name,
+          lt_name,
+          az,
+      )
+    else:
+      cmd += ['--kubernetes-version', target_version]
+    _, stderr, retcode = vm_util.IssueCommand(
+        cmd, timeout=300, raise_on_failure=False
+    )
+    if retcode:
+      raise errors.Resource.CreationError(stderr)
+    return f'ng_active:{name}'
+
+  def DeleteNodePoolAsync(self, name: str) -> str:
+    """Initiates EKS nodegroup delete; returns ng_gone handle.
+
+    Raises:
+      errors.Resource.CreationError: if delete-nodegroup fails (the creation
+        error type is what this method raises for delete failures).
+    """
+    cmd = util.AWS_PREFIX + [
+        'eks',
+        'delete-nodegroup',
+        '--cluster-name',
+        self.name,
+        '--nodegroup-name',
+        name,
+        '--region',
+        self.region,
+    ]
+    _, stderr, retcode = vm_util.IssueCommand(
+        cmd, timeout=300, raise_on_failure=False
+    )
+    if retcode:
+      raise errors.Resource.CreationError(stderr)
+    return f'ng_gone:{name}'
+
+  def UpdateClusterAsync(self) -> str:
+    """Fires a CloudWatch logging toggle; returns 'cluster_update:<update_id>'.
+
+    Returns a handle carrying the specific update id so WaitForOperation
+    can poll *that* update's status (Successful / Failed) rather than the
+    cluster's top-level status, which can still read ACTIVE right after the
+    update is accepted and would make the wait return instantly.
+
+    Raises:
+      errors.Resource.CreationError: if update-cluster-config fails for a
+        reason other than ResourceInUseException, or on the last attempt.
+    """
+    log_types = [
+        'api',
+        'audit',
+        'authenticator',
+        'controllerManager',
+        'scheduler',
+    ]
+    describe = util.AWS_PREFIX + [
+        'eks',
+        'describe-cluster',
+        '--name',
+        self.name,
+        '--region',
+        self.region,
+    ]
+    out, _, _ = vm_util.IssueCommand(describe)
+    current = (
+        json.loads(out)['cluster'].get('logging', {}).get('clusterLogging', [])
+    )
+    any_enabled = any(e.get('enabled', False) for e in current)
+    payload = json.dumps(
+        {'clusterLogging': [{'types': log_types, 'enabled': not any_enabled}]}
+    )
+    upd = util.AWS_PREFIX + [
+        'eks',
+        'update-cluster-config',
+        '--name',
+        self.name,
+        '--region',
+        self.region,
+        '--logging',
+        payload,
+    ]
+    # Wait for cluster ACTIVE before firing update — at 99-pool scale
+    # Scenario A leaves the cluster UPDATING causing ResourceInUseException.
+    logging.info('[EKS] Waiting for cluster ACTIVE before ClusterUpdate...')
+    for _ in range(60):
+      status_out, _, status_rc = vm_util.IssueCommand(
+          util.AWS_PREFIX
+          + [
+              'eks',
+              'describe-cluster',
+              '--name',
+              self.name,
+              '--region',
+              self.region,
+              '--query',
+              'cluster.status',
+              '--output',
+              'text',
+          ],
+          raise_on_failure=False,
+      )
+      if status_rc == 0 and status_out.strip() == 'ACTIVE':
+        logging.info('[EKS] Cluster is ACTIVE — proceeding with ClusterUpdate')
+        break
+      logging.info(
+          '[EKS] Cluster status=%s — waiting 5s...', status_out.strip()
+      )
+      time.sleep(5)
+    stdout = ''
+    for attempt in range(10):
+      stdout, stderr, retcode = vm_util.IssueCommand(
+          upd, timeout=300, raise_on_failure=False
+      )
+      if retcode == 0:
+        break
+      if 'ResourceInUseException' in stderr and attempt < 9:
+        logging.info(
+            '[EKS] UpdateClusterConfig ResourceInUseException — retry %d/10',
+            attempt + 1,
+        )
+        time.sleep(30)
+        continue
+      raise errors.Resource.CreationError(stderr)
+    update_id = json.loads(stdout)['update']['id']
+    return f'cluster_update:{update_id}'
+
+  def ResolveNodePoolVersions(self) -> tuple[str, str]:
+    """Returns (initial, target) EKS nodegroup versions.
+
+    Uses cluster_version (already set from FLAGS/describe-cluster) rather than
+    querying kubectl, which is faster and avoids a kubectl round-trip.
+    initial = N-1 (adjacent minor below cluster version)
+    target  = N   (cluster version = latest)
+    """
+    cluster_ver = self.cluster_version or self.k8s_version
+    # Strip any patch suffix e.g. '1.34.7' -> '1.34'
+    parts = cluster_ver.lstrip('v').split('.')
+    major, minor = int(parts[0]), int(parts[1])
+    target = f'{major}.{minor}'
+    initial = f'{major}.{minor - 1}'
+    logging.info(
+        '[EKS] ResolveNodePoolVersions: cluster=%s initial=%s target=%s',
+        cluster_ver,
+        initial,
+        target,
+    )
+    return initial, target
+
+  def WaitForOperation(self, op_handle: str) -> None:
+    """Polls EKS resources until the expected terminal state is observed.
+
+    Args:
+      op_handle: '<kind>:<name>' as returned by the *Async methods; kind is
+        ng_active, ng_gone or cluster_update.
+
+    Raises:
+      ValueError: if the handle kind is not recognized.
+      errors.Resource.CreationError: if the nodegroup or cluster update ends
+        in a failed state.
+    """
+    kind, _, name = op_handle.partition(':')
+
+    @vm_util.Retry(
+        poll_interval=5,
+        fuzz=0,
+        timeout=3600,
+        retryable_exceptions=(errors.Resource.RetryableCreationError,),
+    )
+    def _wait_ng_active():
+      out, err, rc = vm_util.IssueCommand(
+          util.AWS_PREFIX
+          + [
+              'eks',
+              'describe-nodegroup',
+              '--cluster-name',
+              self.name,
+              '--nodegroup-name',
+              name,
+              '--region',
+              self.region,
+          ],
+          raise_on_failure=False,
+      )
+      if rc:
+        raise errors.Resource.RetryableCreationError(err)
+      status = json.loads(out)['nodegroup']['status']
+      if status in ('ACTIVE',):
+        return
+      if status in ('CREATE_FAILED', 'DELETE_FAILED', 'DEGRADED'):
+        raise errors.Resource.CreationError(
+            f'nodegroup {name} ended in {status}'
+        )
+      raise errors.Resource.RetryableCreationError(
+          f'nodegroup {name} status={status}'
+      )
+
+    @vm_util.Retry(
+        poll_interval=5,
+        fuzz=0,
+        timeout=3600,
+        retryable_exceptions=(errors.Resource.RetryableDeletionError,),
+    )
+    def _wait_ng_gone():
+      _, err, rc = vm_util.IssueCommand(
+          util.AWS_PREFIX
+          + [
+              'eks',
+              'describe-nodegroup',
+              '--cluster-name',
+              self.name,
+              '--nodegroup-name',
+              name,
+              '--region',
+              self.region,
+          ],
+          raise_on_failure=False,
+      )
+      if rc and 'ResourceNotFoundException' in (err or ''):
+        return
+      if rc:
+        raise errors.Resource.RetryableDeletionError(err)
+      raise errors.Resource.RetryableDeletionError(
+          f'nodegroup {name} still present'
+      )
+
+    @vm_util.Retry(
+        poll_interval=5,
+        fuzz=0,
+        timeout=3600,
+        retryable_exceptions=(errors.Resource.RetryableCreationError,),
+    )
+    def _wait_cluster_update():
+      out, err, rc = vm_util.IssueCommand(
+          util.AWS_PREFIX
+          + [
+              'eks',
+              'describe-update',
+              '--name',
+              self.name,
+              '--update-id',
+              name,
+              '--region',
+              self.region,
+              '--query',
+              'update.status',
+              '--output',
+              'text',
+          ],
+          raise_on_failure=False,
+      )
+      if rc:
+        raise errors.Resource.RetryableCreationError(err)
+      status = out.strip()
+      if status == 'Successful':
+        return
+      if status in ('Failed', 'Cancelled'):
+        raise errors.Resource.CreationError(
+            f'cluster update {name} ended in {status}'
+        )
+      raise errors.Resource.RetryableCreationError(
+          f'cluster update {name} status={status}'
+      )
+
+    if kind == 'ng_active':
+      _wait_ng_active()
+    elif kind == 'ng_gone':
+      _wait_ng_gone()
+    elif kind == 'cluster_update':
+      _wait_cluster_update()
+    else:
+      raise ValueError(f'Unknown EKS op handle: {op_handle!r}')
+
+  def UpdateCluster(self) -> None:
+    """Real cluster-level update via a CloudWatch logging toggle.
+
+    Flips the cluster logging state (enable->disable or vice versa) through
+    UpdateClusterAsync and blocks until that specific update finishes.
+    Enabling all five log types is a 5-10 minute control-plane op, giving a
+    meaningful overlap window for Scenario B.
+
+    Completion is tracked by update id rather than by cluster.status, which
+    can still read ACTIVE right after the update request is accepted.
+    """
+    self.WaitForOperation(self.UpdateClusterAsync())
+
 
 class EksAutoCluster(BaseEksCluster):
   """Class representing an Elastic Kubernetes Service cluster with auto mode.
diff --git a/tests/providers/aws/elastic_kubernetes_service_test.py b/tests/providers/aws/elastic_kubernetes_service_test.py
index 42450bd63c..f5cc9c5069 100644
--- a/tests/providers/aws/elastic_kubernetes_service_test.py
+++ b/tests/providers/aws/elastic_kubernetes_service_test.py
@@ -327,6 +327,43 @@ def testEksClusterGetMachineTypeFromNodeName(self):
 
     self.assertEqual(machine_type, 'm6i.xlarge')
 
+  def testDiscoverSubnetsPerAZPrefersPublicSubnets(self):
+    """A public subnet wins within an AZ; a private one is the fallback."""
+    cluster = elastic_kubernetes_service.EksCluster(EKS_SPEC)
+    # Each row is (subnet id, availability zone, is public).
+    subnet_rows = [
+        ('subnet-pub-a', 'us-east-1a', True),
+        ('subnet-priv-a', 'us-east-1a', False),
+        ('subnet-priv-b', 'us-east-1b', False),
+    ]
+    cluster_json = json.dumps({
+        'cluster': {
+            'resourcesVpcConfig': {
+                'subnetIds': [subnet_id for subnet_id, _, _ in subnet_rows]
+            }
+        }
+    })
+    # The mocked describe-subnets stdout uses the keys SubnetId/AZ/Public
+    # (not AvailabilityZone/MapPublicIpOnLaunch) because the --query JMESPath
+    # expression in the code under test renames them in the real output.
+    subnets_json = json.dumps([
+        {'SubnetId': subnet_id, 'AZ': zone, 'Public': is_public}
+        for subnet_id, zone, is_public in subnet_rows
+    ])
+    self.MockIssueCommand({
+        'describe-cluster': [(cluster_json, '', 0)],
+        'describe-subnets': [(subnets_json, '', 0)],
+    })
+
+    az_to_subnet = cluster._DiscoverSubnetsPerAZ()
+
+    expected = {
+        'us-east-1a': 'subnet-pub-a',  # public preferred in same AZ
+        'us-east-1b': 'subnet-priv-b',  # falls back to private
+    }
+    self.assertEqual(az_to_subnet, expected)
+
+
 class EksAutoClusterTest(BaseEksTest):
 
   def testInitEksClusterWorks(self):