Skip to content
276 changes: 276 additions & 0 deletions perfkitbenchmarker/providers/gcp/google_kubernetes_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@
import json
import logging
import math
import time
import os
import re
import typing
from typing import Any

from absl import flags
from perfkitbenchmarker import errors
from perfkitbenchmarker import vm_util
from perfkitbenchmarker import provider_info
from perfkitbenchmarker import virtual_machine_spec
from perfkitbenchmarker.configs import container_spec as container_spec_lib
Expand Down Expand Up @@ -630,6 +632,280 @@ def ResizeNodePool(
cmd.flags['node-pool'] = node_pool
cmd.Issue()

def _IssueAsync(self, cmd: util.GcloudCommand) -> str:
"""Issues a gcloud command with --async, returns the operation name."""
cmd.args.append('--async')

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I was wondering what made these operations async. got it.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Acknowledged

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method overall looks useful enough & broad enough we should consider putting it in providers/gcp/utils.py's GcloudCommand. The --async feature is supported across many gcloud commands & should be broadly applicable right? Even if the fallback "get latest" behavior would vary - it could take like an optional "get_latest" lambda as an argument. (and given that said fallback is necessary).

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done — added GcloudCommand.IssueAsync(get_latest_op_fn=...) to util.py. GKE's _GetLatestOperationName is now passed in as the callable, so the GKE-specific filter logic stays in google_kubernetes_engine.py while the async-issue/fallback shape is reusable by any GCP resource. Purely additive — GcloudCommand.Issue() itself is unchanged, so none of the existing 194 call sites across GCP providers are affected. Verified with the full tests/providers/gcp/ suite.

cmd.flags['format'] = 'value(name)'
stdout, stderr, retcode = cmd.Issue(timeout=600, raise_on_failure=False)
if retcode:
raise errors.Resource.CreationError(stderr)
op_name = stdout.strip().splitlines()[-1].strip() if stdout else ''
if not op_name:
raise errors.Resource.CreationError(
f'GKE async command returned no operation name; stderr={stderr}'
)
return op_name

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

below you have _GetLatestOperationName command.. does this command here not output the operation name? Shouldn't it just be:

  • start operation, name is returned
  • wait for operation to finish. now done.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For create and delete, that's exactly the flow — _IssueAsync issues --async, gcloud prints the operation name, we wait on it. The exception is clusters upgrade --node-pool and clusters update: those reliably return success with empty stdout (gcloud just doesn't print the name for those two subcommands). Verified on a 100-pool GKE run — 99/99 upgrades and the cluster-update all came back with no operation name, so _GetLatestOperationName recovers it from the operations list. I didn't find a gcloud flag that makes upgrade/update print it; if one exists I'd happily drop the fallback.


def _GetLatestOperationName(
self,
operation_type: str = 'UPGRADE_NODES',
target_name: str = '',
max_attempts: int = 5,
retry_delay: int = 3,
op_start_time: float = 0.0,
) -> str:
"""Returns the name of the most recent matching operation for this cluster.

The async gcloud command may return before the GKE control plane has
transitioned the operation from PENDING to RUNNING. For fast operations
(e.g. label updates) the operation may already be DONE by the time this
method is called. Passing op_start_time handles both cases.

Args:
operation_type: GKE operationType to filter on, e.g. 'UPGRADE_NODES'
for node pool upgrades or 'UPDATE_CLUSTER' for cluster-level
updates via 'gcloud container clusters update'.
target_name: Substring to match against targetLink (e.g. nodepool name
for UPGRADE_NODES, or cluster name for UPDATE_CLUSTER). If empty,
falls back to self.name (the cluster name).
max_attempts: Number of query attempts before giving up.
retry_delay: Seconds to wait between attempts.
op_start_time: Unix timestamp recorded just before the async gcloud
command was issued. When provided, the status filter is broadened
to include DONE (so fast-completing operations are found) and a
startTime >= guard is added to avoid matching old operations.

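Returns:
Operation name string of the most recent matching operation.

Raises:
errors.Resource.GetError: If no matching operation is found after
max_attempts queries.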
"""
link_target = target_name or self.name
if op_start_time:

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is good logic; can we not just do it every time / require op_start_time? Additional conditions when not needed are just additional complexity.

@ashishsuneja ashishsuneja Jun 15, 2026

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done — collapsed it. _GetLatestOperationName now always uses the broadened filter (RUNNING/PENDING/DONE with a startTime>= guard) and always takes op_start_time. The guard already prevents matching stale completed ops, so the two-branch conditional was redundant.

# Fast operations (e.g. --update-labels) may be DONE before we query.
# Broaden the status filter and add a startTime guard (with a 30-second
# buffer for clock skew) to avoid picking up older completed operations.
from_time = time.strftime(
'%Y-%m-%dT%H:%M:%SZ', time.gmtime(op_start_time - 30)
)
status_filter = '(status=RUNNING OR status=PENDING OR status=DONE)'
time_filter = f' AND startTime>="{from_time}"'
else:
# Slow operations (e.g. node pool upgrades): only look for active ops.
status_filter = '(status=RUNNING OR status=PENDING)'
time_filter = ''

filter_str = (
f'operationType={operation_type} AND '
f'{status_filter} AND '
f'targetLink ~ {link_target}'
f'{time_filter}'
)
for attempt in range(1, max_attempts + 1):

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what are some examples where you try to call the operation but it doesn't get seen immediately? does it just take a minute for results to show up?

A unit test with semi-real output examples would be helpful.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added unit tests (GoogleKubernetesEngineAsyncOpsTestCase) with captured-style gcloud output: create returns the op name directly (no fallback), upgrade and update fall back and recover from the operations list, and the no-fallback-configured case raises. On the timing: the operation can take a moment to leave PENDING, and fast ops (the label-update) may already be DONE by the time we query — both handled by the broadened filter. In practice on the 100-pool run it resolved on the first query every time; the 5×3s retry is a safety margin for the PENDING-transition window.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

have you looked at vm_util.Retry? Used with @vm_util.Retry(poll_interval,max_retries,timeout). I suspect you're rewriting code rather than utilizing PKB standards/logic.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done — refactored to use vm_util.Retry instead of the manual for-loop/sleep. Since max_attempts/retry_delay are call-time parameters here (not fixed), I apply the decorator to a locally-defined inner function inside the method rather than statically on it — poll_interval/max_retries still fully respect the caller's values via closure. Scoped retryable_exceptions to errors.Resource.GetError so only the "not found yet" case retries, not unrelated failures.

list_cmd = self._GcloudCommand('container', 'operations', 'list')
list_cmd.flags['filter'] = filter_str
list_cmd.flags['sort-by'] = '~startTime'
list_cmd.flags['limit'] = 1
list_cmd.flags['format'] = 'value(name)'
stdout, stderr, _ = list_cmd.Issue(raise_on_failure=False)
op_name = stdout.strip()
if op_name:
logging.info(
'GetLatestOp: found %s type=%s target=%s attempt=%d/%d',
op_name,
operation_type,
link_target,
attempt,
max_attempts,
)
return op_name
logging.info(
'GetLatestOp: no %s op for %s attempt=%d/%d retry in %ds.',
operation_type,
link_target,
attempt,
max_attempts,
retry_delay,
)
time.sleep(retry_delay)
raise errors.Resource.GetError(
f'_GetLatestOperationName: no {operation_type} op found '
f'for target={link_target} after {max_attempts} attempts. '
f'stderr={stderr}'
)

def CreateNodePoolAsync(
    self,
    nodepool_config: container.BaseNodePoolConfig,
    node_version: str | None = None,
) -> str:
  """Starts a node pool creation and returns immediately (no waiting).

  Args:
    nodepool_config: Config of the node pool to create; its name becomes the
      pool name and its settings are applied via _AddNodeParamsToCmd.
    node_version: Optional node version, passed as --node-version when set.

  Returns:
    The operation handle produced by _IssueAsync.
  """
  create_args = (
      'container',
      'node-pools',
      'create',
      nodepool_config.name,
      '--cluster',
      self.name,
  )
  create_cmd = self._GcloudCommand(*create_args)
  self._AddNodeParamsToCmd(nodepool_config, create_cmd)
  if node_version:
    create_cmd.flags['node-version'] = node_version
  # Some gcloud builds reject --async together with the long --timeout flag,
  # so drop it and let the CLI hand back the op name right away.
  create_cmd.flags.pop('timeout', None)
  return self._IssueAsync(create_cmd)

def UpgradeNodePoolAsync(self, name: str, target_version: str) -> str:
"""Initiates node pool upgrade; returns op handle. Does NOT wait."""
cmd = self._GcloudCommand(
'container',
'clusters',
'upgrade',
self.name,
'--node-pool',
name,
'--cluster-version',
target_version,
)
try:
return self._IssueAsync(cmd)
except errors.Resource.CreationError as e:
if 'returned no operation name' not in str(e):
raise
# Fallback: gcloud succeeded but printed nothing. Query the operations
# list scoped to this specific nodepool to find the operation name.
logging.info(
'UpgradeNodePoolAsync: ops list fallback for %s: %s',

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why/when/how frequently does this happen?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Every time, not intermittently — clusters upgrade --node-pool --async returned no operation name on 99/99 upgrades in the 100-pool run. So the fallback runs on every upgrade/update.

name,
e,
)
op_name = self._GetLatestOperationName(
operation_type='UPGRADE_NODES', target_name=name
)
if not op_name:
raise
return op_name

def DeleteNodePoolAsync(self, name: str) -> str:
  """Starts a node pool deletion and returns immediately (no waiting).

  Args:
    name: Name of the node pool to delete from this cluster.

  Returns:
    The operation handle produced by _IssueAsync.
  """
  delete_cmd = self._GcloudCommand(
      'container', 'node-pools', 'delete', name, '--cluster', self.name
  )
  # --quiet is appended as a positional argument rather than a flags entry.
  delete_cmd.args.append('--quiet')
  return self._IssueAsync(delete_cmd)

def UpdateClusterAsync(self) -> str:
"""Initiates cluster update; returns op handle. Does NOT wait."""
cmd = self._GcloudCommand('container', 'clusters', 'update', self.name)
cmd.flags['update-labels'] = f'k8s-mgmt-ts={int(time.time())}'
# 'gcloud container clusters update --async' suppresses stdout when
# --quiet is active (same behaviour as 'clusters upgrade'), so the
# operation name is never printed. Remove --quiet here; the label-update
# is non-interactive so no confirmation prompt is needed.
cmd.flags.pop('quiet', None)

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah this pop logic is very GcloudCommand specific. Seconding the "add async as option in utils.py" suggestion as a refactor. Might move some unit tests over as well.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done — added GcloudCommandIssueAsyncTestCase to util_test.py, covering direct op-name return, fallback invocation, raise-when-no-fallback, raise-on-command-failure, and the --async/format flag setup.

# Record start time BEFORE issuing. The label-update operation completes
# in seconds, so it may already be DONE by the time the fallback queries
# the operations list. The timestamp lets us safely include DONE ops
# without matching older completed operations from previous runs.
op_start_time = time.time()
try:
return self._IssueAsync(cmd)
except errors.Resource.CreationError as e:
if 'returned no operation name' not in str(e):

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this logic is shared with UpgradeNodePoolAsync above. Refactor to only use in one location. Perhaps an optional parameter in _IssueAsync which can handle this "no operation name" case. If ofc this is necessary at all.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, exactly as suggested — moved the fallback into _IssueAsync behind optional fallback_op_type / fallback_target params, so the "no op name → query operations list" path lives in one place. UpgradeNodePoolAsync and UpdateClusterAsync are now one-liners.

raise
# Fallback: gcloud returned retcode=0 but empty stdout. Query the
# operations list including DONE status (fast label-update ops complete
# before we query) guarded by op_start_time to avoid stale matches.
logging.info(
'UpdateClusterAsync: ops list fallback for %s: %s',
self.name,
e,
)
op_name = self._GetLatestOperationName(
operation_type='UPDATE_CLUSTER',
target_name=self.name,
op_start_time=op_start_time,
)
if not op_name:
raise
return op_name

def ResolveNodePoolVersions(self) -> tuple[str, str]:
  """Returns (initial, target) GKE node versions: initial=N-1, target=N.

  GKE requires fully-qualified node versions (e.g. '1.34.4-gke.1234'),
  so we query `gcloud container get-server-config` and pick the newest
  valid version per minor. "Newest" compares the numeric X.Y.Z part first
  and then the numeric build suffix (the 1234 in '-gke.1234'), so the result
  does not depend on the order in which gcloud lists the versions.

  Returns:
    A (initial, target) tuple where target is the newest valid node version
    and initial is the newest valid version of the previous minor release.

  Raises:
    errors.Resource.GetError: If the gcloud command fails, reports no
      validNodeVersions, or lists no version for the previous minor.
  """
  cmd = self._GcloudCommand('container', 'get-server-config')
  cmd.flags['format'] = 'json'
  stdout, stderr, retcode = cmd.Issue(raise_on_failure=False)
  if retcode:
    raise errors.Resource.GetError(
        f'gcloud get-server-config failed: {stderr}'
    )
  config = json.loads(stdout)
  valid = list(config.get('validNodeVersions', []))
  if not valid:
    raise errors.Resource.GetError(
        'GKE get-server-config returned no validNodeVersions'
    )

  def _version_key(version: str) -> tuple[tuple[int, ...], tuple[int, ...]]:
    # '1.34.4-gke.1234' -> ((1, 34, 4), (1234,)). Including the build number
    # breaks ties between versions that share the same X.Y.Z.
    base, _, build = version.partition('-')
    return (
        tuple(int(part) for part in base.split('.')),
        tuple(int(num) for num in re.findall(r'\d+', build)),
    )

  valid.sort(key=_version_key, reverse=True)
  target = valid[0]
  target_parts = target.split('-', 1)[0].split('.')
  initial_minor = f'{target_parts[0]}.{int(target_parts[1]) - 1}'
  # valid is sorted newest-first, so the first hit is the newest N-1 version.
  for candidate in valid:
    candidate_minor = '.'.join(candidate.split('-', 1)[0].split('.')[:2])
    if candidate_minor == initial_minor:
      return candidate, target
  raise errors.Resource.GetError(
      f'No GKE node version found for minor {initial_minor!r}; '
      f'available top 5: {valid[:5]}'
  )

def WaitForOperation(self, op_handle: str) -> None:
  """Polls a GKE operation until terminal; raises on failure.

  Runs `gcloud container operations describe` every 5 seconds until the
  operation reports DONE, or until the retry window of ONE_HOUR is used up.

  Args:
    op_handle: Name of the operation to wait for.

  Raises:
    errors.Resource.CreationError: If the operation is ABORTING/ABORTED, or
      if it reaches DONE while reporting an error.
  """
  # NOTE(review): what vm_util.Retry raises once the timeout is exhausted is
  # not visible here — confirm against vm_util before relying on it.

  @vm_util.Retry(
      poll_interval=5,
      fuzz=0,
      timeout=ONE_HOUR,
      retryable_exceptions=(errors.Resource.RetryableCreationError,),
  )
  def _poll():
    describe = self._GcloudCommand(
        'container',
        'operations',
        'describe',
        op_handle,
    )
    describe.flags['format'] = 'json'
    out, err, rc = describe.Issue(raise_on_failure=False)
    if rc:
      # A failed describe call is retried rather than treated as fatal.
      raise errors.Resource.RetryableCreationError(
          f'describe op failed: {err}'
      )
    try:
      operation = json.loads(out)
      status = operation.get('status')
    except (json.JSONDecodeError, ValueError):
      # Output was not JSON; fall back to treating it as the bare status.
      operation = {}
      status = out.strip()
    if status == 'DONE':
      # DONE only means the operation stopped; a failed operation reports
      # the reason in 'error' (or the older 'statusMessage' field).
      failure = operation.get('error') or operation.get('statusMessage')
      if failure:
        raise errors.Resource.CreationError(
            f'op {op_handle} finished with error: {failure}'
        )
      return
    if status in ('ABORTING', 'ABORTED'):
      raise errors.Resource.CreationError(f'op {op_handle} aborted')
    # Any other status (e.g. still in progress) triggers another poll.
    raise errors.Resource.RetryableCreationError(
        f'op {op_handle} status={status}'
    )

  _poll()


class GkeAutopilotCluster(BaseGkeCluster):
"""Class representing an Autopilot GKE cluster, which has no nodepools."""
Expand Down