From 165486f8e394e8e9c508e898264bc4509c7d6b34 Mon Sep 17 00:00:00 2001 From: Ashish Suneja Date: Fri, 5 Jun 2026 14:11:37 +0000 Subject: [PATCH 1/9] google_kubernetes_engine: add management plane operations - _IssueAsync: issues gcloud --async commands, returns op name - CreateNodePoolAsync: creates nodepool, returns op handle - DeleteNodePoolAsync: deletes nodepool, returns op handle - UpgradeNodePoolAsync: upgrades nodepool, returns op handle - UpdateClusterAsync: toggles label for non-destructive update - WaitForOperation: polls until DONE/ABORTING - ResolveNodePoolVersions: auto-detects initial/target versions - _GetLatestOperationName: fallback op lookup by type+target - GetNodePoolNames: lists current node pools Tested: - Existing GKE test suite passing - pyink + lint-diffs clean --- .../providers/gcp/google_kubernetes_engine.py | 284 ++++++++++++++++++ 1 file changed, 284 insertions(+) diff --git a/perfkitbenchmarker/providers/gcp/google_kubernetes_engine.py b/perfkitbenchmarker/providers/gcp/google_kubernetes_engine.py index f943a53ff1..4e74ad62c7 100644 --- a/perfkitbenchmarker/providers/gcp/google_kubernetes_engine.py +++ b/perfkitbenchmarker/providers/gcp/google_kubernetes_engine.py @@ -16,6 +16,7 @@ import json import logging import math +import time import os import re import typing @@ -23,6 +24,7 @@ from absl import flags from perfkitbenchmarker import errors +from perfkitbenchmarker import vm_util from perfkitbenchmarker import provider_info from perfkitbenchmarker import virtual_machine_spec from perfkitbenchmarker.configs import container_spec as container_spec_lib @@ -630,6 +632,288 @@ def ResizeNodePool( cmd.flags['node-pool'] = node_pool cmd.Issue() + def _IssueAsync(self, cmd: util.GcloudCommand) -> str: + """Issues a gcloud command with --async, returns the operation name.""" + cmd.args.append('--async') + cmd.flags['format'] = 'value(name)' + stdout, stderr, retcode = cmd.Issue(timeout=600, raise_on_failure=False) + if retcode: + raise errors.Resource.CreationError(stderr) + op_name = stdout.strip().splitlines()[-1].strip() if stdout else '' + if not op_name: + raise errors.Resource.CreationError( + f'GKE async command returned no operation name; stderr={stderr}' + ) + return op_name + + def _GetLatestOperationName( + self, + operation_type: str = 'UPGRADE_NODES', + target_name: str = '', + max_attempts: int = 5, + retry_delay: int = 3, + op_start_time: float = 0.0, + ) -> str: + """Returns the name of the most recent matching operation for this cluster. + + The async gcloud command may return before the GKE control plane has + transitioned the operation from PENDING to RUNNING. For fast operations + (e.g. label updates) the operation may already be DONE by the time this + method is called. Passing op_start_time handles both cases. + + Args: + operation_type: GKE operationType to filter on, e.g. 'UPGRADE_NODES' + for node pool upgrades or 'UPDATE_CLUSTER' for cluster-level + updates via 'gcloud container clusters update'. + target_name: Substring to match against targetLink (e.g. nodepool name + for UPGRADE_NODES, or cluster name for UPDATE_CLUSTER). If empty, + falls back to self.name (the cluster name). + max_attempts: Number of query attempts before giving up. + retry_delay: Seconds to wait between attempts. + op_start_time: Unix timestamp recorded just before the async gcloud + command was issued. When provided, the status filter is broadened + to include DONE (so fast-completing operations are found) and a + startTime >= guard is added to avoid matching old operations. + + Returns: + Operation name string, or empty string if none found. + """ + link_target = target_name or self.name + if op_start_time: + # Fast operations (e.g. --update-labels) may be DONE before we query. + # Broaden the status filter and add a startTime guard (with a 30-second + # buffer for clock skew) to avoid picking up older completed operations. + from_time = time.strftime( + '%Y-%m-%dT%H:%M:%SZ', time.gmtime(op_start_time - 30) + ) + status_filter = '(status=RUNNING OR status=PENDING OR status=DONE)' + time_filter = f' AND startTime>="{from_time}"' + else: + # Slow operations (e.g. node pool upgrades): only look for active ops. + status_filter = '(status=RUNNING OR status=PENDING)' + time_filter = '' + + filter_str = ( + f'operationType={operation_type} AND ' + f'{status_filter} AND ' + f'targetLink ~ {link_target}' + f'{time_filter}' + ) + for attempt in range(1, max_attempts + 1): + list_cmd = self._GcloudCommand('container', 'operations', 'list') + list_cmd.flags['filter'] = filter_str + list_cmd.flags['sort-by'] = '~startTime' + list_cmd.flags['limit'] = 1 + list_cmd.flags['format'] = 'value(name)' + stdout, stderr, _ = list_cmd.Issue(raise_on_failure=False) + op_name = stdout.strip() + if op_name: + logging.info( + 'GetLatestOp: found %s type=%s target=%s attempt=%d/%d', + op_name, + operation_type, + link_target, + attempt, + max_attempts, + ) + return op_name + logging.info( + 'GetLatestOp: no %s op for %s attempt=%d/%d retry in %ds.', + operation_type, + link_target, + attempt, + max_attempts, + retry_delay, + ) + time.sleep(retry_delay) + raise errors.Resource.GetError( + f'_GetLatestOperationName: no {operation_type} op found ' + f'for target={link_target} after {max_attempts} attempts. ' + f'stderr={stderr}' + ) + + # cmd.flags['zone'] = self.zone + # cmd.flags['filter'] = 'operationType=UPGRADE_NODES AND status=RUNNING' + # cmd.flags['sort-by'] = '~startTime' + # cmd.flags['limit'] = 1 + # cmd.flags['format'] = 'value(name)' + + def CreateNodePoolAsync( + self, + nodepool_config: container.BaseNodePoolConfig, + node_version: str | None = None, + ) -> str: + """Initiates node pool create; returns op handle. Does NOT wait.""" + cmd = self._GcloudCommand( + 'container', + 'node-pools', + 'create', + nodepool_config.name, + '--cluster', + self.name, + ) + self._AddNodeParamsToCmd(nodepool_config, cmd) + if node_version: + cmd.flags['node-version'] = node_version + # --async is incompatible with the long --timeout flag in some gcloud + # builds; remove it so the CLI just hands back the op name immediately. + cmd.flags.pop('timeout', None) + return self._IssueAsync(cmd) + + def UpgradeNodePoolAsync(self, name: str, target_version: str) -> str: + """Initiates node pool upgrade; returns op handle. Does NOT wait.""" + cmd = self._GcloudCommand( + 'container', + 'clusters', + 'upgrade', + self.name, + '--node-pool', + name, + '--cluster-version', + target_version, + ) + try: + return self._IssueAsync(cmd) + except errors.Resource.CreationError as e: + if 'returned no operation name' not in str(e): + raise + # Fallback: gcloud succeeded but printed nothing. Query the operations + # list scoped to this specific nodepool to find the operation name. + logging.info( + 'UpgradeNodePoolAsync: ops list fallback for %s: %s', + name, + e, + ) + op_name = self._GetLatestOperationName( + operation_type='UPGRADE_NODES', target_name=name + ) + if not op_name: + raise + return op_name + + def DeleteNodePoolAsync(self, name: str) -> str: + cmd = self._GcloudCommand( + 'container', + 'node-pools', + 'delete', + name, + '--cluster', + self.name, + ) + cmd.args.append('--quiet') + return self._IssueAsync(cmd) + + def UpdateClusterAsync(self) -> str: + """Initiates cluster update; returns op handle. Does NOT wait.""" + cmd = self._GcloudCommand('container', 'clusters', 'update', self.name) + cmd.flags['update-labels'] = f'k8s-mgmt-ts={int(time.time())}' + # 'gcloud container clusters update --async' suppresses stdout when + # --quiet is active (same behaviour as 'clusters upgrade'), so the + # operation name is never printed. Remove --quiet here; the label-update + # is non-interactive so no confirmation prompt is needed. + cmd.flags.pop('quiet', None) + # Record start time BEFORE issuing. The label-update operation completes + # in seconds, so it may already be DONE by the time the fallback queries + # the operations list. The timestamp lets us safely include DONE ops + # without matching older completed operations from previous runs. + op_start_time = time.time() + try: + return self._IssueAsync(cmd) + except errors.Resource.CreationError as e: + if 'returned no operation name' not in str(e): + raise + # Fallback: gcloud returned retcode=0 but empty stdout. Query the + # operations list including DONE status (fast label-update ops complete + # before we query) guarded by op_start_time to avoid stale matches. + logging.info( + 'UpdateClusterAsync: ops list fallback for %s: %s', + self.name, + e, + ) + op_name = self._GetLatestOperationName( + operation_type='UPDATE_CLUSTER', + target_name=self.name, + op_start_time=op_start_time, + ) + if not op_name: + raise + return op_name + + def ResolveNodePoolVersions(self) -> tuple[str, str]: + """Returns (initial, target) GKE node versions: initial=N-1, target=N. + + GKE requires fully-qualified node versions (e.g. '1.34.4-gke.1234'), + so we query `gcloud container get-server-config` and pick the newest + valid version per minor. + """ + cmd = self._GcloudCommand('container', 'get-server-config') + cmd.flags['format'] = 'json' + stdout, stderr, retcode = cmd.Issue(raise_on_failure=False) + if retcode: + raise errors.Resource.GetError( + f'gcloud get-server-config failed: {stderr}' + ) + config = json.loads(stdout) + valid = list(config.get('validNodeVersions', [])) + if not valid: + raise errors.Resource.GetError( + 'GKE get-server-config returned no validNodeVersions' + ) + + def _version_tuple(v): + return tuple(int(x) for x in v.split('-', 1)[0].split('.')) + + valid.sort(key=_version_tuple, reverse=True) + target = valid[0] + target_parts = target.split('-', 1)[0].split('.') + initial_minor = f'{target_parts[0]}.{int(target_parts[1]) - 1}' + for v in valid: + v_bare = '.'.join(v.split('-', 1)[0].split('.')[:2]) + if v_bare == initial_minor: + return v, target + raise errors.Resource.GetError( + f'No GKE node version found for minor {initial_minor!r}; ' + f'available top 5: {valid[:5]}' + ) + + def WaitForOperation(self, op_handle: str) -> None: + """Polls a GKE operation until terminal; raises on failure.""" + + @vm_util.Retry( + poll_interval=5, + fuzz=0, + timeout=ONE_HOUR, + retryable_exceptions=(errors.Resource.RetryableCreationError,), + ) + def _poll(): + describe = self._GcloudCommand( + 'container', + 'operations', + 'describe', + op_handle, + ) + # describe.flags['format'] = 'value(status)' + describe.flags['format'] = 'json' + out, err, rc = describe.Issue(raise_on_failure=False) + if rc: + raise errors.Resource.RetryableCreationError( + f'describe op failed: {err}' + ) + # status = out.strip() + try: + status = json.loads(out).get('status') + except (json.JSONDecodeError, ValueError): + status = out.strip() + if status == 'DONE': + return + if status in ('ABORTING', 'ABORTED'): + raise errors.Resource.CreationError(f'op {op_handle} aborted') + raise errors.Resource.RetryableCreationError( + f'op {op_handle} status={status}' + ) + + _poll() + class GkeAutopilotCluster(BaseGkeCluster): """Class representing an Autopilot GKE cluster, which has no nodepools.""" From 9b8ec25f9b49a384aab514dd3e0bc275ff52aa0a Mon Sep 17 00:00:00 2001 From: Ashish Suneja Date: Tue, 9 Jun 2026 10:53:17 +0000 Subject: [PATCH 2/9] google_kubernetes_engine: remove leftover commented-out line --- perfkitbenchmarker/providers/gcp/google_kubernetes_engine.py | 1 - 1 file changed, 1 deletion(-) diff --git a/perfkitbenchmarker/providers/gcp/google_kubernetes_engine.py b/perfkitbenchmarker/providers/gcp/google_kubernetes_engine.py index 4e74ad62c7..49f089610d 100644 --- a/perfkitbenchmarker/providers/gcp/google_kubernetes_engine.py +++ b/perfkitbenchmarker/providers/gcp/google_kubernetes_engine.py @@ -899,7 +899,6 @@ def _poll(): raise errors.Resource.RetryableCreationError( f'describe op failed: {err}' ) - # status = out.strip() try: status = json.loads(out).get('status') except (json.JSONDecodeError, ValueError): From b78f357616fcc8abd8803634dcd1fe776f3b352c Mon Sep 17 00:00:00 2001 From: Ashish Suneja Date: Tue, 9 Jun 2026 11:00:03 +0000 Subject: [PATCH 3/9] google_kubernetes_engine: remove leftover commented-out code --- .../providers/gcp/google_kubernetes_engine.py | 7 ------- 1 file changed, 7 deletions(-) diff --git a/perfkitbenchmarker/providers/gcp/google_kubernetes_engine.py b/perfkitbenchmarker/providers/gcp/google_kubernetes_engine.py index 49f089610d..dda1d55031 100644 --- a/perfkitbenchmarker/providers/gcp/google_kubernetes_engine.py +++ b/perfkitbenchmarker/providers/gcp/google_kubernetes_engine.py @@ -732,12 +732,6 @@ def _GetLatestOperationName( f'stderr={stderr}' ) - # cmd.flags['zone'] = self.zone - # cmd.flags['filter'] = 'operationType=UPGRADE_NODES AND status=RUNNING' - # cmd.flags['sort-by'] = '~startTime' - # cmd.flags['limit'] = 1 - # cmd.flags['format'] = 'value(name)' - def CreateNodePoolAsync( self, nodepool_config: container.BaseNodePoolConfig, @@ -892,7 +886,6 @@ def _poll(): 'describe', op_handle, ) - # describe.flags['format'] = 'value(status)' describe.flags['format'] = 'json' out, err, rc = describe.Issue(raise_on_failure=False) if rc: From d3eec15ac267eb80ac88532c6d7aff4d59bf7a11 Mon Sep 17 00:00:00 2001 From: Ashish Suneja Date: Mon, 15 Jun 2026 10:54:05 +0000 Subject: [PATCH 4/9] google_kubernetes_engine: consolidate async op-name fallback into _IssueAsync; add tests --- .../providers/gcp/google_kubernetes_engine.py | 171 +++++++++--------- .../gcp/google_kubernetes_engine_test.py | 93 +++++++++- 2 files changed, 177 insertions(+), 87 deletions(-) diff --git a/perfkitbenchmarker/providers/gcp/google_kubernetes_engine.py b/perfkitbenchmarker/providers/gcp/google_kubernetes_engine.py index dda1d55031..8e83730668 100644 --- a/perfkitbenchmarker/providers/gcp/google_kubernetes_engine.py +++ b/perfkitbenchmarker/providers/gcp/google_kubernetes_engine.py @@ -632,19 +632,58 @@ def ResizeNodePool( cmd.flags['node-pool'] = node_pool cmd.Issue() - def _IssueAsync(self, cmd: util.GcloudCommand) -> str: - """Issues a gcloud command with --async, returns the operation name.""" + def _IssueAsync( + self, + cmd: util.GcloudCommand, + fallback_op_type: str | None = None, + fallback_target: str = '', + ) -> str: + """Issues a gcloud --async command and returns the operation name. + + Most async commands (node-pool create/delete) print the operation name to + stdout. A few (`clusters upgrade --node-pool`, `clusters update`) reliably + return success with empty stdout — gcloud simply does not emit the name for + those subcommands. For those, pass fallback_op_type/fallback_target and the + operation name is recovered from `gcloud container operations list`. + + Args: + cmd: the gcloud command to issue (--async and format are added here). + fallback_op_type: GKE operationType (e.g. 'UPGRADE_NODES', + 'UPDATE_CLUSTER') to look up if stdout is empty. None disables the + fallback (create/delete, which always print the name). + fallback_target: targetLink substring for the fallback lookup (node-pool + or cluster name). + + Returns: + The operation name. + """ cmd.args.append('--async') cmd.flags['format'] = 'value(name)' + # Recorded before issuing so the fallback's startTime>= guard can include + # fast ops that may already be DONE before the operations-list query runs. + op_start_time = time.time() stdout, stderr, retcode = cmd.Issue(timeout=600, raise_on_failure=False) if retcode: raise errors.Resource.CreationError(stderr) op_name = stdout.strip().splitlines()[-1].strip() if stdout else '' - if not op_name: + if op_name: + return op_name + # Empty stdout. For commands that print the name this is a real failure; + # for upgrade/update it is expected, so recover via the operations list. + if fallback_op_type is None: raise errors.Resource.CreationError( f'GKE async command returned no operation name; stderr={stderr}' ) - return op_name + logging.info( + '_IssueAsync: no op name printed; ops-list fallback type=%s target=%s', + fallback_op_type, + fallback_target, + ) + return self._GetLatestOperationName( + operation_type=fallback_op_type, + target_name=fallback_target, + op_start_time=op_start_time, + ) def _GetLatestOperationName( self, @@ -656,48 +695,40 @@ def _GetLatestOperationName( ) -> str: """Returns the name of the most recent matching operation for this cluster. - The async gcloud command may return before the GKE control plane has - transitioned the operation from PENDING to RUNNING. For fast operations - (e.g. label updates) the operation may already be DONE by the time this - method is called. Passing op_start_time handles both cases. + Used to recover an operation name for async commands that don't print one + (upgrade/update). The async gcloud command may return before the control + plane has transitioned the operation out of PENDING, and fast operations + (e.g. label updates) may already be DONE by the time this runs — so the + status filter always includes RUNNING/PENDING/DONE, with a startTime guard + (op_start_time minus a 30s clock-skew buffer) to avoid matching older + completed operations. Args: operation_type: GKE operationType to filter on, e.g. 'UPGRADE_NODES' for node pool upgrades or 'UPDATE_CLUSTER' for cluster-level - updates via 'gcloud container clusters update'. - target_name: Substring to match against targetLink (e.g. nodepool name - for UPGRADE_NODES, or cluster name for UPDATE_CLUSTER). If empty, - falls back to self.name (the cluster name). + updates. + target_name: Substring to match against targetLink (node-pool name for + UPGRADE_NODES, cluster name for UPDATE_CLUSTER). If empty, falls + back to self.name. max_attempts: Number of query attempts before giving up. retry_delay: Seconds to wait between attempts. - op_start_time: Unix timestamp recorded just before the async gcloud - command was issued. When provided, the status filter is broadened - to include DONE (so fast-completing operations are found) and a - startTime >= guard is added to avoid matching old operations. + op_start_time: Unix timestamp recorded just before the async command + was issued; used for the startTime>= guard. Defaults to now minus + the buffer if not supplied. Returns: - Operation name string, or empty string if none found. + Operation name string. """ link_target = target_name or self.name - if op_start_time: - # Fast operations (e.g. --update-labels) may be DONE before we query. - # Broaden the status filter and add a startTime guard (with a 30-second - # buffer for clock skew) to avoid picking up older completed operations. - from_time = time.strftime( - '%Y-%m-%dT%H:%M:%SZ', time.gmtime(op_start_time - 30) - ) - status_filter = '(status=RUNNING OR status=PENDING OR status=DONE)' - time_filter = f' AND startTime>="{from_time}"' - else: - # Slow operations (e.g. node pool upgrades): only look for active ops. - status_filter = '(status=RUNNING OR status=PENDING)' - time_filter = '' - + # 30-second buffer absorbs clock skew between client and control plane. + from_time = time.strftime( + '%Y-%m-%dT%H:%M:%SZ', time.gmtime((op_start_time or time.time()) - 30) + ) filter_str = ( f'operationType={operation_type} AND ' - f'{status_filter} AND ' - f'targetLink ~ {link_target}' - f'{time_filter}' + '(status=RUNNING OR status=PENDING OR status=DONE) AND ' + f'targetLink ~ {link_target} AND ' + f'startTime>="{from_time}"' ) for attempt in range(1, max_attempts + 1): list_cmd = self._GcloudCommand('container', 'operations', 'list') @@ -755,7 +786,12 @@ def CreateNodePoolAsync( return self._IssueAsync(cmd) def UpgradeNodePoolAsync(self, name: str, target_version: str) -> str: - """Initiates node pool upgrade; returns op handle. Does NOT wait.""" + """Initiates node pool upgrade; returns op handle. Does NOT wait. + + `clusters upgrade --node-pool --async` returns success with empty stdout + (gcloud doesn't print the op name for this subcommand), so the operation + name is recovered from the operations list via _IssueAsync's fallback. + """ cmd = self._GcloudCommand( 'container', 'clusters', @@ -766,24 +802,9 @@ def UpgradeNodePoolAsync(self, name: str, target_version: str) -> str: '--cluster-version', target_version, ) - try: - return self._IssueAsync(cmd) - except errors.Resource.CreationError as e: - if 'returned no operation name' not in str(e): - raise - # Fallback: gcloud succeeded but printed nothing. Query the operations - # list scoped to this specific nodepool to find the operation name. - logging.info( - 'UpgradeNodePoolAsync: ops list fallback for %s: %s', - name, - e, - ) - op_name = self._GetLatestOperationName( - operation_type='UPGRADE_NODES', target_name=name - ) - if not op_name: - raise - return op_name + return self._IssueAsync( + cmd, fallback_op_type='UPGRADE_NODES', fallback_target=name + ) def DeleteNodePoolAsync(self, name: str) -> str: cmd = self._GcloudCommand( @@ -798,40 +819,22 @@ def DeleteNodePoolAsync(self, name: str) -> str: return self._IssueAsync(cmd) def UpdateClusterAsync(self) -> str: - """Initiates cluster update; returns op handle. Does NOT wait.""" + """Initiates cluster update; returns op handle. Does NOT wait. + + Toggles a label for a non-destructive cluster update. Like + `clusters upgrade`, `clusters update --async` returns success with empty + stdout, so the op name is recovered via _IssueAsync's fallback. The + label-update completes in seconds, so the fallback may find it already + DONE — handled by _GetLatestOperationName's startTime-guarded filter. + """ cmd = self._GcloudCommand('container', 'clusters', 'update', self.name) cmd.flags['update-labels'] = f'k8s-mgmt-ts={int(time.time())}' - # 'gcloud container clusters update --async' suppresses stdout when - # --quiet is active (same behaviour as 'clusters upgrade'), so the - # operation name is never printed. Remove --quiet here; the label-update - # is non-interactive so no confirmation prompt is needed. + # GcloudCommand sets --quiet by default; the label update is + # non-interactive so it's safe to drop (matches how this was validated). cmd.flags.pop('quiet', None) - # Record start time BEFORE issuing. The label-update operation completes - # in seconds, so it may already be DONE by the time the fallback queries - # the operations list. The timestamp lets us safely include DONE ops - # without matching older completed operations from previous runs. - op_start_time = time.time() - try: - return self._IssueAsync(cmd) - except errors.Resource.CreationError as e: - if 'returned no operation name' not in str(e): - raise - # Fallback: gcloud returned retcode=0 but empty stdout. Query the - # operations list including DONE status (fast label-update ops complete - # before we query) guarded by op_start_time to avoid stale matches. - logging.info( - 'UpdateClusterAsync: ops list fallback for %s: %s', - self.name, - e, - ) - op_name = self._GetLatestOperationName( - operation_type='UPDATE_CLUSTER', - target_name=self.name, - op_start_time=op_start_time, - ) - if not op_name: - raise - return op_name + return self._IssueAsync( + cmd, fallback_op_type='UPDATE_CLUSTER', fallback_target=self.name + ) def ResolveNodePoolVersions(self) -> tuple[str, str]: """Returns (initial, target) GKE node versions: initial=N-1, target=N. diff --git a/tests/providers/gcp/google_kubernetes_engine_test.py b/tests/providers/gcp/google_kubernetes_engine_test.py index dbf8232f5e..1e31d4ab92 100644 --- a/tests/providers/gcp/google_kubernetes_engine_test.py +++ b/tests/providers/gcp/google_kubernetes_engine_test.py @@ -442,6 +442,95 @@ def testCreateRapidChannel(self): self.assertNotIn('--no-enable-autoupgrade', issue_command.all_commands) +class GoogleKubernetesEngineAsyncOpsTestCase(PatchedObjectsTestCase): + """Tests async management-plane ops and the op-name fallback. + + create/delete print the operation name to stdout. upgrade/update reliably + return success with empty stdout (gcloud does not print the name for those + subcommands), so _IssueAsync recovers it from `operations list`. + """ + + @staticmethod + def _spec(): + return container_spec.ContainerClusterSpec( + 'NAME', + **{ + 'cloud': 'GCP', + 'vm_spec': { + 'GCP': { + 'machine_type': 'fake-machine-type', + 'zone': 'us-central1-a', + }, + }, + 'vm_count': 2, + }, + ) + + def test_create_returns_op_name_directly(self): + """create prints an op name on stdout - no fallback needed.""" + spec = self._spec() + op = 'operation-create-1779870000000-abcd' + with self.patch_critical_objects(stdout=op + '\n') as issue_command: + cluster = google_kubernetes_engine.GkeCluster(spec) + pool = mock.Mock(name='pool-cfg') + pool.name = 'pkbma000' + pool.num_nodes = 2 + pool.machine_type = 'fake-machine-type' + self.enter_context(mock.patch.object(cluster, '_AddNodeParamsToCmd')) + handle = cluster.CreateNodePoolAsync(pool) + self.assertEqual(op, handle) + self.assertIn( + 'gcloud container node-pools create', issue_command.all_commands + ) + self.assertNotIn( + 'gcloud container operations list', issue_command.all_commands + ) + + def test_upgrade_falls_back_to_ops_list(self): + """upgrade returns empty stdout; op name recovered from operations list.""" + spec = self._spec() + found_op = 'operation-1779870514692-250d3b27-upgrade' + with self.patch_critical_objects(): + cluster = google_kubernetes_engine.GkeCluster(spec) + issue = self.MockIssueCommand({ + 'clusters upgrade': [('', '', 0)], + 'operations list': [(found_op + '\n', '', 0)], + }) + handle = cluster.UpgradeNodePoolAsync('pkbma000', '1.34') + self.assertEqual(found_op, handle) + self.assertIn('gcloud container clusters upgrade', issue.all_commands) + self.assertIn('gcloud container operations list', issue.all_commands) + self.assertIn('operationType=UPGRADE_NODES', issue.all_commands) + + def test_update_cluster_falls_back_to_ops_list(self): + """update returns empty stdout; op name recovered from operations list.""" + spec = self._spec() + found_op = 'operation-1779873580306-efa66f70-update' + with self.patch_critical_objects(): + cluster = google_kubernetes_engine.GkeCluster(spec) + issue = self.MockIssueCommand({ + 'clusters update': [('', '', 0)], + 'operations list': [(found_op + '\n', '', 0)], + }) + handle = cluster.UpdateClusterAsync() + self.assertEqual(found_op, handle) + self.assertIn('operationType=UPDATE_CLUSTER', issue.all_commands) + self.assertIn('status=DONE', issue.all_commands) + self.assertIn('startTime>=', issue.all_commands) + + def test_issue_async_raises_when_no_op_name_and_no_fallback(self): + """Empty stdout with no fallback configured is a hard error.""" + spec = self._spec() + with self.patch_critical_objects(): + cluster = google_kubernetes_engine.GkeCluster(spec) + self.MockIssueCommand({'': [('', '', 0)]}) + # pylint: disable=protected-access + cmd = cluster._GcloudCommand('container', 'node-pools', 'delete', 'x') + with self.assertRaises(errors.Resource.CreationError): + cluster._IssueAsync(cmd) + # pylint: enable=protected-access + + class GoogleKubernetesEngineGvnicFlagTestCase(PatchedObjectsTestCase): @staticmethod @@ -887,9 +976,7 @@ def testGetMachineTypeFromNodeName(self): spec = self.create_kubernetes_engine_spec() with self.patch_critical_objects(): cluster = google_kubernetes_engine.GkeAutopilotCluster(spec) - self.MockIssueCommand( - {'get node': [('ek-standard-16', '', 0)]} - ) + self.MockIssueCommand({'get node': [('ek-standard-16', '', 0)]}) self.assertEqual( cluster.GetMachineTypeFromNodeName( 'gke-pkb-cluster-default-pool-node-1' From b0768fca60b82c223aaf5857d39c98d1db77bcbc Mon Sep 17 00:00:00 2001 From: Ashish Suneja Date: Mon, 29 Jun 2026 11:41:45 +0000 Subject: [PATCH 5/9] google_kubernetes_engine: use vm_util.Retry in _GetLatestOperationName --- .../providers/gcp/google_kubernetes_engine.py | 36 +++++++++---------- 1 file changed, 16 insertions(+), 20 deletions(-) diff --git a/perfkitbenchmarker/providers/gcp/google_kubernetes_engine.py b/perfkitbenchmarker/providers/gcp/google_kubernetes_engine.py index 8e83730668..b154bc358c 100644 --- a/perfkitbenchmarker/providers/gcp/google_kubernetes_engine.py +++ b/perfkitbenchmarker/providers/gcp/google_kubernetes_engine.py @@ -730,7 +730,13 @@ def _GetLatestOperationName( f'targetLink ~ {link_target} AND ' f'startTime>="{from_time}"' ) - for attempt in range(1, max_attempts + 1): + @vm_util.Retry( + poll_interval=retry_delay, + max_retries=max_attempts, + retryable_exceptions=(errors.Resource.GetError,), + log_errors=False, + ) + def _QueryOnce() -> str: list_cmd = self._GcloudCommand('container', 'operations', 'list') list_cmd.flags['filter'] = filter_str list_cmd.flags['sort-by'] = '~startTime' @@ -738,30 +744,20 @@ def _GetLatestOperationName( list_cmd.flags['format'] = 'value(name)' stdout, stderr, _ = list_cmd.Issue(raise_on_failure=False) op_name = stdout.strip() - if op_name: - logging.info( - 'GetLatestOp: found %s type=%s target=%s attempt=%d/%d', - op_name, - operation_type, - link_target, - attempt, - max_attempts, + if not op_name: + raise errors.Resource.GetError( + f'_GetLatestOperationName: no {operation_type} op found ' + f'for target={link_target}. stderr={stderr}' ) - return op_name logging.info( - 'GetLatestOp: no %s op for %s attempt=%d/%d retry in %ds.', + 'GetLatestOp: found %s type=%s target=%s', + op_name, operation_type, link_target, - attempt, - max_attempts, - retry_delay, ) - time.sleep(retry_delay) - raise errors.Resource.GetError( - f'_GetLatestOperationName: no {operation_type} op found ' - f'for target={link_target} after {max_attempts} attempts. ' - f'stderr={stderr}' - ) + return op_name + + return _QueryOnce() def CreateNodePoolAsync( self, From 82d52b4bbc946061fab845f338a3ce06bf2e9b6c Mon Sep 17 00:00:00 2001 From: Ashish Suneja Date: Mon, 29 Jun 2026 12:09:47 +0000 Subject: [PATCH 6/9] providers/gcp: generalize async op handling into GcloudCommand.IssueAsync Moves the --async issue + 'no op name printed, fall back to listing operations' pattern out of GKE-specific _IssueAsync into a reusable GcloudCommand.IssueAsync method, with the resource-specific lookup (_GetLatestOperationName for GKE) passed in as a callable. Per hubatish's review on PR #6747. --- .../providers/gcp/google_kubernetes_engine.py | 40 ++++++---------- perfkitbenchmarker/providers/gcp/util.py | 46 ++++++++++++++++++- 2 files changed, 58 insertions(+), 28 deletions(-) diff --git a/perfkitbenchmarker/providers/gcp/google_kubernetes_engine.py b/perfkitbenchmarker/providers/gcp/google_kubernetes_engine.py index b154bc358c..7b729016a4 100644 --- a/perfkitbenchmarker/providers/gcp/google_kubernetes_engine.py +++ b/perfkitbenchmarker/providers/gcp/google_kubernetes_engine.py @@ -642,12 +642,14 @@ def _IssueAsync( Most async commands (node-pool create/delete) print the operation name to stdout. A few (`clusters upgrade --node-pool`, `clusters update`) reliably - return success with empty stdout — gcloud simply does not emit the name for - those subcommands. For those, pass fallback_op_type/fallback_target and the - operation name is recovered from `gcloud container operations list`. + return success with empty stdout -- gcloud simply does not emit the name + for those subcommands. For those, pass fallback_op_type/fallback_target + and the operation name is recovered from `gcloud container operations + list` via GcloudCommand.IssueAsync's get_latest_op_fn fallback. Args: - cmd: the gcloud command to issue (--async and format are added here). + cmd: the gcloud command to issue (--async and format are added by + GcloudCommand.IssueAsync). fallback_op_type: GKE operationType (e.g. 'UPGRADE_NODES', 'UPDATE_CLUSTER') to look up if stdout is empty. None disables the fallback (create/delete, which always print the name). @@ -657,33 +659,17 @@ def _IssueAsync( Returns: The operation name. """ - cmd.args.append('--async') - cmd.flags['format'] = 'value(name)' # Recorded before issuing so the fallback's startTime>= guard can include # fast ops that may already be DONE before the operations-list query runs. op_start_time = time.time() - stdout, stderr, retcode = cmd.Issue(timeout=600, raise_on_failure=False) - if retcode: - raise errors.Resource.CreationError(stderr) - op_name = stdout.strip().splitlines()[-1].strip() if stdout else '' - if op_name: - return op_name - # Empty stdout. For commands that print the name this is a real failure; - # for upgrade/update it is expected, so recover via the operations list. - if fallback_op_type is None: - raise errors.Resource.CreationError( - f'GKE async command returned no operation name; stderr={stderr}' + get_latest_op_fn = None + if fallback_op_type is not None: + get_latest_op_fn = lambda: self._GetLatestOperationName( + operation_type=fallback_op_type, + target_name=fallback_target, + op_start_time=op_start_time, ) - logging.info( - '_IssueAsync: no op name printed; ops-list fallback type=%s target=%s', - fallback_op_type, - fallback_target, - ) - return self._GetLatestOperationName( - operation_type=fallback_op_type, - target_name=fallback_target, - op_start_time=op_start_time, - ) + return cmd.IssueAsync(get_latest_op_fn=get_latest_op_fn, timeout=600) def _GetLatestOperationName( self, diff --git a/perfkitbenchmarker/providers/gcp/util.py b/perfkitbenchmarker/providers/gcp/util.py index bfe124957f..b43f1ac67e 100644 --- a/perfkitbenchmarker/providers/gcp/util.py +++ b/perfkitbenchmarker/providers/gcp/util.py @@ -20,7 +20,7 @@ import json import logging import re -from typing import Any, Set, cast +from typing import Any, Callable, Set, cast from absl import flags from google.cloud import monitoring_v3 @@ -399,6 +399,50 @@ def Issue(self, **kwargs: Any): return stdout, stderr, retcode + def IssueAsync( + self, + get_latest_op_fn: Callable[[], str] | None = None, + **kwargs, + ) -> str: + """Issues this command with --async and returns the operation name. + + Most async gcloud commands print an operation name to stdout. Some + subcommands (e.g. GKE's `clusters upgrade --node-pool`, `clusters + update`) reliably return success with empty stdout instead. For those, + pass get_latest_op_fn -- a zero-arg callable that looks up the + operation name by some resource-specific means (e.g. listing + operations and filtering) when stdout is empty. + + Args: + get_latest_op_fn: Optional callable invoked when stdout is empty + after issuing the async command. Resource-specific -- e.g. for GKE + this queries `gcloud container operations list` with a + type/target filter. If None, empty stdout raises an error (the + default for resources where async commands always print the + operation name, e.g. create/delete). + **kwargs: Forwarded to Issue(). + + Returns: + The operation name. + + Raises: + errors.Resource.CreationError: if stdout is empty and no + get_latest_op_fn is supplied, or if the command itself fails. + """ + self.args.append('--async') + self.flags['format'] = 'value(name)' + stdout, stderr, retcode = self.Issue(raise_on_failure=False, **kwargs) + if retcode: + raise errors.Resource.CreationError(stderr) + op_name = stdout.strip().splitlines()[-1].strip() if stdout else '' + if op_name: + return op_name + if get_latest_op_fn is None: + raise errors.Resource.CreationError( + f'Async gcloud command returned no operation name; stderr={stderr}' + ) + return get_latest_op_fn() + def _RaiseRateLimitedException(self, error): """Raise rate limited exception based on the retry_on_rate_limited flag. From 1cba1e7ac5fae94ce50eb8dbe65beb73f4676804 Mon Sep 17 00:00:00 2001 From: Ashish Suneja Date: Mon, 29 Jun 2026 12:13:54 +0000 Subject: [PATCH 7/9] providers/gcp: add unit tests for GcloudCommand.IssueAsync --- tests/providers/gcp/util_test.py | 36 ++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/tests/providers/gcp/util_test.py b/tests/providers/gcp/util_test.py index cf72c230fe..978e2dc0fe 100644 --- a/tests/providers/gcp/util_test.py +++ b/tests/providers/gcp/util_test.py @@ -526,3 +526,39 @@ def testFormatAndSplitAreInverses(self): if __name__ == '__main__': unittest.main() +class GcloudCommandIssueAsyncTestCase(unittest.TestCase): + """Tests for GcloudCommand.IssueAsync (generic async-issue + fallback).""" + + def testIssueAsyncReturnsOpNameDirectly(self): + cmd = util.GcloudCommand(None, 'fake', 'create') + with mock.patch.object(cmd, 'Issue', return_value=('op-123\n', '', 0)): + result = cmd.IssueAsync() + self.assertEqual(result, 'op-123') + + def testIssueAsyncFallsBackWhenEmptyStdout(self): + cmd = util.GcloudCommand(None, 'fake', 'update') + with mock.patch.object(cmd, 'Issue', return_value=('', '', 0)): + result = cmd.IssueAsync(get_latest_op_fn=lambda: 'recovered-op') + self.assertEqual(result, 'recovered-op') + + def testIssueAsyncRaisesWhenEmptyStdoutAndNoFallback(self): + cmd = util.GcloudCommand(None, 'fake', 'create') + with mock.patch.object(cmd, 'Issue', return_value=('', 'some error', 0)): + with self.assertRaises(errors.Resource.CreationError): + cmd.IssueAsync() + + def testIssueAsyncRaisesOnCommandFailure(self): + cmd = util.GcloudCommand(None, 'fake', 'create') + with mock.patch.object(cmd, 'Issue', return_value=('', 'boom', 1)): + with self.assertRaises(errors.Resource.CreationError): + cmd.IssueAsync() + + def testIssueAsyncSetsAsyncFlagAndFormat(self): + cmd = util.GcloudCommand(None, 'fake', 'create') + with mock.patch.object( + cmd, 'Issue', return_value=('op-456\n', '', 0) + ) as mock_issue: + cmd.IssueAsync() + self.assertIn('--async', cmd.args) + self.assertEqual(cmd.flags['format'], 'value(name)') + mock_issue.assert_called_once() From 94ae63626f3f11a21e34650c80d6653c7c962a7e Mon Sep 17 00:00:00 2001 From: Ashish Suneja Date: Mon, 29 Jun 2026 12:15:02 +0000 Subject: [PATCH 8/9] providers/gcp: fix blank-line spacing in util_test.py --- tests/providers/gcp/util_test.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/providers/gcp/util_test.py b/tests/providers/gcp/util_test.py index 978e2dc0fe..bcfcba0f41 100644 --- a/tests/providers/gcp/util_test.py +++ b/tests/providers/gcp/util_test.py @@ -526,6 +526,8 @@ def testFormatAndSplitAreInverses(self): if __name__ == '__main__': unittest.main() + + class GcloudCommandIssueAsyncTestCase(unittest.TestCase): """Tests for GcloudCommand.IssueAsync (generic async-issue + fallback).""" From f3e8f24205648371ba6ed9935b1b01e048a552ac Mon Sep 17 00:00:00 2001 From: Ashish Suneja Date: Mon, 29 Jun 2026 18:28:11 +0000 Subject: [PATCH 9/9] providers/gcp: don't hardcode raise_on_failure in IssueAsync; use IssueCommandError Per hubatish's review: - Removed hardcoded raise_on_failure=False -- IssueAsync now inherits Issue()'s own default (True), so callers control this via **kwargs if needed, and the manual retcode check (which just reimplemented the default) is gone. - Switched the remaining 'no op name, no fallback' exception from errors.Resource.CreationError to errors.VmUtil.IssueCommandError, since IssueAsync isn't always called in a _Create context. Updated 3 tests to match: 2 needed the new exception type, and testIssueAsyncRaisesOnCommandFailure now mocks Issue() raising (side_effect) rather than returning a failure tuple, matching real behavior now that retcode isn't checked manually. --- perfkitbenchmarker/providers/gcp/util.py | 18 ++++++++++++------ .../gcp/google_kubernetes_engine_test.py | 2 +- tests/providers/gcp/util_test.py | 11 ++++++++--- 3 files changed, 21 insertions(+), 10 deletions(-) diff --git a/perfkitbenchmarker/providers/gcp/util.py b/perfkitbenchmarker/providers/gcp/util.py index b43f1ac67e..2cadd60cb3 100644 --- a/perfkitbenchmarker/providers/gcp/util.py +++ b/perfkitbenchmarker/providers/gcp/util.py @@ -426,19 +426,25 @@ def IssueAsync( The operation name. Raises: - errors.Resource.CreationError: if stdout is empty and no - get_latest_op_fn is supplied, or if the command itself fails. + errors.VmUtil.IssueCommandError: if stdout is empty and no + get_latest_op_fn is supplied. If the command itself fails, the + exception raised is whatever Issue()/IssueCommand raises by + default (also errors.VmUtil.IssueCommandError), unless the caller + passes raise_on_failure=False via **kwargs. """ self.args.append('--async') self.flags['format'] = 'value(name)' - stdout, stderr, retcode = self.Issue(raise_on_failure=False, **kwargs) - if retcode: - raise errors.Resource.CreationError(stderr) + # No explicit retcode check here: Issue() defaults raise_on_failure=True + # (forwarded via **kwargs), so a failing command already raises before + # we get here. Callers that want raise_on_failure=False behavior can + # pass it explicitly through **kwargs and handle the empty stdout/ + # stderr themselves. + stdout, stderr, _ = self.Issue(**kwargs) op_name = stdout.strip().splitlines()[-1].strip() if stdout else '' if op_name: return op_name if get_latest_op_fn is None: - raise errors.Resource.CreationError( + raise errors.VmUtil.IssueCommandError( f'Async gcloud command returned no operation name; stderr={stderr}' ) return get_latest_op_fn() diff --git a/tests/providers/gcp/google_kubernetes_engine_test.py b/tests/providers/gcp/google_kubernetes_engine_test.py index 1e31d4ab92..b145cab88f 100644 --- a/tests/providers/gcp/google_kubernetes_engine_test.py +++ b/tests/providers/gcp/google_kubernetes_engine_test.py @@ -526,7 +526,7 @@ def test_issue_async_raises_when_no_op_name_and_no_fallback(self): self.MockIssueCommand({'': [('', '', 0)]}) # pylint: disable=protected-access cmd = cluster._GcloudCommand('container', 'node-pools', 'delete', 'x') - with self.assertRaises(errors.Resource.CreationError): + with self.assertRaises(errors.VmUtil.IssueCommandError): cluster._IssueAsync(cmd) # pylint: enable=protected-access diff --git a/tests/providers/gcp/util_test.py b/tests/providers/gcp/util_test.py index bcfcba0f41..dfd29028eb 100644 --- a/tests/providers/gcp/util_test.py +++ b/tests/providers/gcp/util_test.py @@ -546,13 +546,18 @@ def testIssueAsyncFallsBackWhenEmptyStdout(self): def testIssueAsyncRaisesWhenEmptyStdoutAndNoFallback(self): cmd = util.GcloudCommand(None, 'fake', 'create') with mock.patch.object(cmd, 'Issue', return_value=('', 'some error', 0)): - with self.assertRaises(errors.Resource.CreationError): + with self.assertRaises(errors.VmUtil.IssueCommandError): cmd.IssueAsync() def testIssueAsyncRaisesOnCommandFailure(self): cmd = util.GcloudCommand(None, 'fake', 'create') - with mock.patch.object(cmd, 'Issue', return_value=('', 'boom', 1)): - with self.assertRaises(errors.Resource.CreationError): + # Issue() defaults raise_on_failure=True, so a real failure surfaces as + # an exception from Issue() itself, not a returned (stdout, stderr, rc) + # failure tuple -- IssueAsync no longer checks retcode. + with mock.patch.object( + cmd, 'Issue', side_effect=errors.VmUtil.IssueCommandError('boom') + ): + with self.assertRaises(errors.VmUtil.IssueCommandError): cmd.IssueAsync() def testIssueAsyncSetsAsyncFlagAndFormat(self):