def _IssueAsync(
    self,
    cmd: util.GcloudCommand,
    fallback_op_type: str | None = None,
    fallback_target: str = '',
    timeout: int = 600,
) -> str:
  """Issues a gcloud --async command and returns the operation name.

  Most async commands (node-pool create/delete) print the operation name to
  stdout. A few (`clusters upgrade --node-pool`, `clusters update`) return
  success with empty stdout. For those, pass fallback_op_type (and usually
  fallback_target): the name is then recovered by _GetLatestOperationName,
  which is handed to cmd.IssueAsync as its get_latest_op_fn callback.

  Args:
    cmd: The gcloud command to issue. `--async` and the output format are
      added by cmd.IssueAsync, not here.
    fallback_op_type: GKE operationType (e.g. 'UPGRADE_NODES',
      'UPDATE_CLUSTER') to look up when stdout is empty. None disables the
      fallback, so no callback is passed to cmd.IssueAsync.
    fallback_target: targetLink substring for the fallback lookup (node-pool
      or cluster name). Passed through as `target_name`.
    timeout: Forwarded unchanged as the `timeout` keyword of cmd.IssueAsync.
      Defaults to 600, the value that used to be hard-coded here.

  Returns:
    The operation name returned by cmd.IssueAsync.
  """
  # Recorded before issuing so the fallback's startTime>= guard can include
  # fast ops that may already be DONE before the operations-list query runs.
  op_start_time = time.time()

  def _RecoverOperationName() -> str:
    # Closure (rather than a lambda bound to a name) so it shows up with a
    # meaningful name in tracebacks.
    return self._GetLatestOperationName(
        operation_type=fallback_op_type,
        target_name=fallback_target,
        op_start_time=op_start_time,
    )

  get_latest_op_fn = (
      _RecoverOperationName if fallback_op_type is not None else None
  )
  return cmd.IssueAsync(get_latest_op_fn=get_latest_op_fn, timeout=timeout)
def _GetLatestOperationName(
    self,
    operation_type: str = 'UPGRADE_NODES',
    target_name: str = '',
    max_attempts: int = 5,
    retry_delay: int = 3,
    op_start_time: float = 0.0,
) -> str:
  """Looks up the newest operation matching a type/target filter.

  Runs `gcloud container operations list` sorted by descending startTime
  with a limit of 1 and returns the single name printed. The filter accepts
  RUNNING, PENDING and DONE operations, because the async command can
  return while the operation is still PENDING and fast operations may
  already be DONE; a startTime lower bound keeps older operations out.

  NOTE(review): the targetLink clause only contains target_name (or
  self.name); nothing here visibly scopes a node-pool name to this cluster --
  confirm that same-named pools in other clusters cannot match.

  Args:
    operation_type: GKE operationType to filter on, e.g. 'UPGRADE_NODES' or
      'UPDATE_CLUSTER'.
    target_name: Pattern matched against targetLink with `~`. When empty,
      self.name is used instead.
    max_attempts: Passed to vm_util.Retry as max_retries.
    retry_delay: Passed to vm_util.Retry as poll_interval.
    op_start_time: Unix timestamp taken just before the async command was
      issued. A falsy value means "now". 30 is subtracted from it to build
      the startTime lower bound.

  Returns:
    The operation name printed by the list command, stripped.

  Raises:
    errors.Resource.GetError: when the list command prints nothing; this is
      also the exception type vm_util.Retry is told to retry on.
  """
  link_target = target_name if target_name else self.name
  issued_at = op_start_time if op_start_time else time.time()
  # 30-second buffer absorbs clock skew between client and control plane.
  earliest_start = time.strftime(
      '%Y-%m-%dT%H:%M:%SZ', time.gmtime(issued_at - 30)
  )
  op_filter = ' AND '.join((
      f'operationType={operation_type}',
      '(status=RUNNING OR status=PENDING OR status=DONE)',
      f'targetLink ~ {link_target}',
      f'startTime>="{earliest_start}"',
  ))
  list_flags = (
      ('filter', op_filter),
      ('sort-by', '~startTime'),
      ('limit', 1),
      ('format', 'value(name)'),
  )

  def _QueryOnce() -> str:
    query = self._GcloudCommand('container', 'operations', 'list')
    for flag_name, flag_value in list_flags:
      query.flags[flag_name] = flag_value
    stdout, stderr, _ = query.Issue(raise_on_failure=False)
    found = stdout.strip()
    if found:
      logging.info(
          'GetLatestOp: found %s type=%s target=%s',
          found,
          operation_type,
          link_target,
      )
      return found
    # Empty output is reported as GetError so the retry wrapper polls again.
    raise errors.Resource.GetError(
        f'_GetLatestOperationName: no {operation_type} op found '
        f'for target={link_target}. stderr={stderr}'
    )

  with_retries = vm_util.Retry(
      poll_interval=retry_delay,
      max_retries=max_attempts,
      retryable_exceptions=(errors.Resource.GetError,),
      log_errors=False,
  )
  return with_retries(_QueryOnce)()


def CreateNodePoolAsync(
    self,
    nodepool_config: container.BaseNodePoolConfig,
    node_version: str | None = None,
) -> str:
  """Starts a node-pool creation and returns its operation name.

  Does not wait for the operation to finish.

  Args:
    nodepool_config: Node pool to create; its `name` becomes the positional
      argument and the object is passed to self._AddNodeParamsToCmd.
    node_version: When truthy, set as the `node-version` flag.

  Returns:
    The operation name returned by self._IssueAsync.
  """
  create_args = (
      'container',
      'node-pools',
      'create',
      nodepool_config.name,
      '--cluster',
      self.name,
  )
  create_cmd = self._GcloudCommand(*create_args)
  self._AddNodeParamsToCmd(nodepool_config, create_cmd)
  if node_version:
    create_cmd.flags['node-version'] = node_version
  # --async is incompatible with the long --timeout flag in some gcloud
  # builds; remove it so the CLI just hands back the op name immediately.
  create_cmd.flags.pop('timeout', None)
  return self._IssueAsync(create_cmd)
def UpgradeNodePoolAsync(self, name: str, target_version: str) -> str:
  """Starts a node-pool upgrade and returns its operation name.

  Does not wait for the operation to finish. `clusters upgrade --node-pool
  --async` returns success with empty stdout, so the operation name is
  recovered through _IssueAsync's fallback lookup for an 'UPGRADE_NODES'
  operation targeting `name`.

  Args:
    name: Node pool to upgrade.
    target_version: Value passed as `--cluster-version`.

  Returns:
    The operation name returned by self._IssueAsync.
  """
  cmd = self._GcloudCommand(
      'container',
      'clusters',
      'upgrade',
      self.name,
      '--node-pool',
      name,
      '--cluster-version',
      target_version,
  )
  return self._IssueAsync(
      cmd, fallback_op_type='UPGRADE_NODES', fallback_target=name
  )


def DeleteNodePoolAsync(self, name: str) -> str:
  """Starts a node-pool deletion and returns its operation name.

  Does not wait for the operation to finish. No fallback lookup is
  configured, so the operation name must come from the command's stdout.

  Args:
    name: Node pool to delete from this cluster.

  Returns:
    The operation name returned by self._IssueAsync.
  """
  cmd = self._GcloudCommand(
      'container',
      'node-pools',
      'delete',
      name,
      '--cluster',
      self.name,
  )
  # Deletion prompts for confirmation, so --quiet is required. It is set
  # through the `quiet` flag entry (the same key UpdateClusterAsync pops)
  # instead of a raw argument, so the flag is emitted once whether or not
  # GcloudCommand already enabled it by default.
  cmd.flags['quiet'] = True
  return self._IssueAsync(cmd)


def UpdateClusterAsync(self) -> str:
  """Starts a cluster update and returns its operation name.

  Does not wait for the operation to finish. The update sets the
  `k8s-mgmt-ts` label to the current Unix time in whole seconds, which is a
  non-destructive change. Like `clusters upgrade`, `clusters update --async`
  returns success with empty stdout, so the operation name is recovered
  through _IssueAsync's fallback lookup for an 'UPDATE_CLUSTER' operation
  targeting this cluster. The label update completes in seconds, so the
  fallback may find it already DONE.

  Returns:
    The operation name returned by self._IssueAsync.
  """
  cmd = self._GcloudCommand('container', 'clusters', 'update', self.name)
  cmd.flags['update-labels'] = f'k8s-mgmt-ts={int(time.time())}'
  # GcloudCommand sets --quiet by default; the label update is
  # non-interactive so it's safe to drop (matches how this was validated).
  cmd.flags.pop('quiet', None)
  return self._IssueAsync(
      cmd, fallback_op_type='UPDATE_CLUSTER', fallback_target=self.name
  )
def ResolveNodePoolVersions(self) -> tuple[str, str]:
  """Returns (initial, target) GKE node versions: initial=N-1, target=N.

  GKE requires fully-qualified node versions (e.g. '1.34.4-gke.1234'), so
  this queries `gcloud container get-server-config` and picks the newest
  entry of `validNodeVersions` as the target and the newest entry of the
  previous minor as the initial version.

  Versions are ordered by every numeric component, including the build
  number after `-gke.`, so two builds of the same patch release are ranked
  by build rather than by the order in which the server listed them.

  Returns:
    A (initial_version, target_version) tuple of fully-qualified versions.

  Raises:
    errors.Resource.GetError: if the gcloud command fails, its output is not
      valid JSON, no valid node versions are listed, or no version exists
      for the minor preceding the target's.
  """
  cmd = self._GcloudCommand('container', 'get-server-config')
  cmd.flags['format'] = 'json'
  stdout, stderr, retcode = cmd.Issue(raise_on_failure=False)
  if retcode:
    raise errors.Resource.GetError(
        f'gcloud get-server-config failed: {stderr}'
    )
  try:
    config = json.loads(stdout)
  except ValueError as e:  # json.JSONDecodeError subclasses ValueError.
    raise errors.Resource.GetError(
        f'gcloud get-server-config returned unparsable JSON: {e}'
    ) from e
  valid = list(config.get('validNodeVersions', []))
  if not valid:
    raise errors.Resource.GetError(
        'GKE get-server-config returned no validNodeVersions'
    )

  def _VersionKey(version: str) -> tuple[int, ...]:
    # '1.34.4-gke.1234' -> (1, 34, 4, 1234): major, minor, patch, build.
    return tuple(int(number) for number in re.findall(r'\d+', version))

  def _MinorOf(version: str) -> str:
    # '1.34.4-gke.1234' -> '1.34'.
    return '.'.join(version.split('-', 1)[0].split('.')[:2])

  valid.sort(key=_VersionKey, reverse=True)
  target = valid[0]
  target_major, target_minor = _MinorOf(target).split('.')
  initial_minor = f'{target_major}.{int(target_minor) - 1}'
  # `valid` is sorted newest-first, so the first hit is the newest build of
  # the previous minor.
  for version in valid:
    if _MinorOf(version) == initial_minor:
      return version, target
  raise errors.Resource.GetError(
      f'No GKE node version found for minor {initial_minor!r}; '
      f'available top 5: {valid[:5]}'
  )


def WaitForOperation(self, op_handle: str) -> None:
  """Polls a GKE operation until it is terminal; raises on failure.

  `gcloud container operations describe` is re-run through vm_util.Retry
  (poll_interval=5, fuzz=0, timeout=ONE_HOUR) for as long as the poll raises
  RetryableCreationError.

  Args:
    op_handle: Operation name, as returned by the *Async methods.

  Raises:
    errors.Resource.CreationError: if the operation reports ABORTING or
      ABORTED, or reaches DONE with a non-empty `error` field.
    errors.Resource.RetryableCreationError: raised internally for a failed
      describe call or a non-terminal status; whether it finally propagates
      depends on vm_util.Retry.
  """

  @vm_util.Retry(
      poll_interval=5,
      fuzz=0,
      timeout=ONE_HOUR,
      retryable_exceptions=(errors.Resource.RetryableCreationError,),
  )
  def _poll():
    describe = self._GcloudCommand(
        'container',
        'operations',
        'describe',
        op_handle,
    )
    describe.flags['format'] = 'json'
    out, err, rc = describe.Issue(raise_on_failure=False)
    if rc:
      raise errors.Resource.RetryableCreationError(
          f'describe op failed: {err}'
      )
    try:
      operation = json.loads(out)
    except ValueError:  # json.JSONDecodeError subclasses ValueError.
      operation = None
    if isinstance(operation, dict):
      status = operation.get('status')
      op_error = operation.get('error')
    else:
      # Not a JSON object: fall back to treating the raw output as the
      # status string.
      status = out.strip()
      op_error = None
    if status == 'DONE':
      # A finished operation can still have failed; the operation payload
      # carries the failure in its `error` field.
      if op_error:
        raise errors.Resource.CreationError(
            f'op {op_handle} failed: {op_error}'
        )
      return
    if status in ('ABORTING', 'ABORTED'):
      raise errors.Resource.CreationError(f'op {op_handle} aborted')
    raise errors.Resource.RetryableCreationError(
        f'op {op_handle} status={status}'
    )

  _poll()
def IssueAsync(
    self,
    get_latest_op_fn: Callable[[], str] | None = None,
    **kwargs,
) -> str:
  """Issues this command with --async and returns the operation name.

  Most async gcloud commands print an operation name to stdout. Some
  subcommands (e.g. GKE's `clusters upgrade --node-pool`, `clusters
  update`) return success with empty stdout instead. For those, pass
  get_latest_op_fn -- a zero-arg callable that looks up the operation name
  by some resource-specific means when stdout carries no name.

  The operation name is taken from the last non-blank line of stdout.
  Output that is empty or whitespace-only counts as "no name".

  Args:
    get_latest_op_fn: Optional callable invoked when stdout has no
      operation name. If None, missing output raises an error.
    **kwargs: Forwarded to Issue().

  Returns:
    The operation name.

  Raises:
    errors.VmUtil.IssueCommandError: if stdout has no operation name and no
      get_latest_op_fn is supplied. Failures of the command itself surface
      as whatever Issue() raises.
  """
  # Appending unconditionally would duplicate the flag if the same command
  # object were issued more than once.
  if '--async' not in self.args:
    self.args.append('--async')
  self.flags['format'] = 'value(name)'
  # No explicit retcode check here: per the original note, Issue() raises on
  # failure by default; callers passing raise_on_failure=False through
  # **kwargs handle the empty stdout/stderr themselves.
  stdout, stderr, _ = self.Issue(**kwargs)
  # strip() before splitlines() so trailing blank lines are ignored; the
  # emptiness check is on the resulting list because whitespace-only output
  # is truthy but yields no lines.
  output_lines = (stdout or '').strip().splitlines()
  op_name = output_lines[-1].strip() if output_lines else ''
  if op_name:
    return op_name
  if get_latest_op_fn is None:
    raise errors.VmUtil.IssueCommandError(
        f'Async gcloud command returned no operation name; stderr={stderr}'
    )
  return get_latest_op_fn()
class GoogleKubernetesEngineAsyncOpsTestCase(PatchedObjectsTestCase):
  """Tests async management-plane ops and the op-name fallback.

  create/delete print the operation name to stdout. upgrade/update return
  success with empty stdout (gcloud does not print the name for those
  subcommands), so _IssueAsync recovers it from `operations list`.

  Test methods use the testCamelCase naming of the surrounding test classes.
  """

  @staticmethod
  def _spec():
    """Returns a minimal GCP ContainerClusterSpec with two VMs."""
    return container_spec.ContainerClusterSpec(
        'NAME',
        **{
            'cloud': 'GCP',
            'vm_spec': {
                'GCP': {
                    'machine_type': 'fake-machine-type',
                    'zone': 'us-central1-a',
                },
            },
            'vm_count': 2,
        },
    )

  def testCreateReturnsOpNameDirectly(self):
    """create prints an op name on stdout - no fallback needed."""
    spec = self._spec()
    op = 'operation-create-1779870000000-abcd'
    with self.patch_critical_objects(stdout=op + '\n') as issue_command:
      cluster = google_kubernetes_engine.GkeCluster(spec)
      pool = mock.Mock(name='pool-cfg')
      pool.name = 'pkbma000'
      pool.num_nodes = 2
      pool.machine_type = 'fake-machine-type'
      # Node parameter translation is patched out of this test.
      self.enter_context(mock.patch.object(cluster, '_AddNodeParamsToCmd'))
      handle = cluster.CreateNodePoolAsync(pool)
      self.assertEqual(op, handle)
      self.assertIn(
          'gcloud container node-pools create', issue_command.all_commands
      )
      # The name came from stdout, so no operations-list lookup may happen.
      self.assertNotIn(
          'gcloud container operations list', issue_command.all_commands
      )

  def testUpgradeFallsBackToOpsList(self):
    """upgrade returns empty stdout; op name recovered from operations list."""
    spec = self._spec()
    found_op = 'operation-1779870514692-250d3b27-upgrade'
    with self.patch_critical_objects():
      cluster = google_kubernetes_engine.GkeCluster(spec)
      issue = self.MockIssueCommand({
          'clusters upgrade': [('', '', 0)],
          'operations list': [(found_op + '\n', '', 0)],
      })
      handle = cluster.UpgradeNodePoolAsync('pkbma000', '1.34')
      self.assertEqual(found_op, handle)
      self.assertIn('gcloud container clusters upgrade', issue.all_commands)
      self.assertIn('gcloud container operations list', issue.all_commands)
      self.assertIn('operationType=UPGRADE_NODES', issue.all_commands)

  def testUpdateClusterFallsBackToOpsList(self):
    """update returns empty stdout; op name recovered from operations list."""
    spec = self._spec()
    found_op = 'operation-1779873580306-efa66f70-update'
    with self.patch_critical_objects():
      cluster = google_kubernetes_engine.GkeCluster(spec)
      issue = self.MockIssueCommand({
          'clusters update': [('', '', 0)],
          'operations list': [(found_op + '\n', '', 0)],
      })
      handle = cluster.UpdateClusterAsync()
      self.assertEqual(found_op, handle)
      self.assertIn('operationType=UPDATE_CLUSTER', issue.all_commands)
      # Fast updates may already be DONE, so the filter must accept DONE and
      # carry the startTime lower bound.
      self.assertIn('status=DONE', issue.all_commands)
      self.assertIn('startTime>=', issue.all_commands)

  def testIssueAsyncRaisesWhenNoOpNameAndNoFallback(self):
    """Empty stdout with no fallback configured is a hard error."""
    spec = self._spec()
    with self.patch_critical_objects():
      cluster = google_kubernetes_engine.GkeCluster(spec)
      self.MockIssueCommand({'': [('', '', 0)]})
      # pylint: disable=protected-access
      cmd = cluster._GcloudCommand('container', 'node-pools', 'delete', 'x')
      with self.assertRaises(errors.VmUtil.IssueCommandError):
        cluster._IssueAsync(cmd)
      # pylint: enable=protected-access
class GcloudCommandIssueAsyncTestCase(unittest.TestCase):
  """Tests for GcloudCommand.IssueAsync (generic async-issue + fallback).

  Every test replaces the command's Issue method with a mock, so no gcloud
  process is started.

  NOTE(review): in the patch this class is appended after the module's
  `if __name__ == '__main__': unittest.main()` guard. Running the file
  directly would reach unittest.main() before the class is defined --
  consider moving the class above the guard.
  """

  def testIssueAsyncReturnsOpNameDirectly(self):
    command = util.GcloudCommand(None, 'fake', 'create')
    issue_result = ('op-123\n', '', 0)
    with mock.patch.object(command, 'Issue', return_value=issue_result):
      op_name = command.IssueAsync()
    self.assertEqual(op_name, 'op-123')

  def testIssueAsyncFallsBackWhenEmptyStdout(self):
    command = util.GcloudCommand(None, 'fake', 'update')

    def _recover():
      return 'recovered-op'

    with mock.patch.object(command, 'Issue', return_value=('', '', 0)):
      op_name = command.IssueAsync(get_latest_op_fn=_recover)
    self.assertEqual(op_name, 'recovered-op')

  def testIssueAsyncRaisesWhenEmptyStdoutAndNoFallback(self):
    command = util.GcloudCommand(None, 'fake', 'create')
    issue_result = ('', 'some error', 0)
    patched_issue = mock.patch.object(
        command, 'Issue', return_value=issue_result
    )
    with patched_issue, self.assertRaises(errors.VmUtil.IssueCommandError):
      command.IssueAsync()

  def testIssueAsyncRaisesOnCommandFailure(self):
    command = util.GcloudCommand(None, 'fake', 'create')
    # A real failure surfaces as an exception raised by Issue() itself, not
    # as a returned (stdout, stderr, rc) tuple; IssueAsync is expected to let
    # that exception through.
    failure = errors.VmUtil.IssueCommandError('boom')
    patched_issue = mock.patch.object(command, 'Issue', side_effect=failure)
    with patched_issue, self.assertRaises(errors.VmUtil.IssueCommandError):
      command.IssueAsync()

  def testIssueAsyncSetsAsyncFlagAndFormat(self):
    command = util.GcloudCommand(None, 'fake', 'create')
    issue_result = ('op-456\n', '', 0)
    with mock.patch.object(
        command, 'Issue', return_value=issue_result
    ) as issue_mock:
      command.IssueAsync()
    self.assertIn('--async', command.args)
    self.assertEqual(command.flags['format'], 'value(name)')
    issue_mock.assert_called_once()