Skip to content
261 changes: 261 additions & 0 deletions perfkitbenchmarker/providers/gcp/google_kubernetes_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@
import json
import logging
import math
import time
import os
import re
import typing
from typing import Any

from absl import flags
from perfkitbenchmarker import errors
from perfkitbenchmarker import vm_util
from perfkitbenchmarker import provider_info
from perfkitbenchmarker import virtual_machine_spec
from perfkitbenchmarker.configs import container_spec as container_spec_lib
Expand Down Expand Up @@ -630,6 +632,265 @@ def ResizeNodePool(
cmd.flags['node-pool'] = node_pool
cmd.Issue()

def _IssueAsync(
self,
cmd: util.GcloudCommand,
fallback_op_type: str | None = None,
fallback_target: str = '',
) -> str:
"""Issues a gcloud --async command and returns the operation name.

Most async commands (node-pool create/delete) print the operation name to
stdout. A few (`clusters upgrade --node-pool`, `clusters update`) reliably
return success with empty stdout -- gcloud simply does not emit the name
for those subcommands. For those, pass fallback_op_type/fallback_target
and the operation name is recovered from `gcloud container operations
list` via GcloudCommand.IssueAsync's get_latest_op_fn fallback.

Args:
cmd: the gcloud command to issue (--async and format are added by
GcloudCommand.IssueAsync).
fallback_op_type: GKE operationType (e.g. 'UPGRADE_NODES',
'UPDATE_CLUSTER') to look up if stdout is empty. None disables the
fallback (create/delete, which always print the name).
fallback_target: targetLink substring for the fallback lookup (node-pool
or cluster name).

Returns:
The operation name.
"""
# Recorded before issuing so the fallback's startTime>= guard can include
# fast ops that may already be DONE before the operations-list query runs.
op_start_time = time.time()
get_latest_op_fn = None
if fallback_op_type is not None:
get_latest_op_fn = lambda: self._GetLatestOperationName(
operation_type=fallback_op_type,
target_name=fallback_target,
op_start_time=op_start_time,
)
return cmd.IssueAsync(get_latest_op_fn=get_latest_op_fn, timeout=600)

def _GetLatestOperationName(
    self,
    operation_type: str = 'UPGRADE_NODES',
    target_name: str = '',
    max_attempts: int = 5,
    retry_delay: int = 3,
    op_start_time: float = 0.0,
) -> str:
  """Returns the name of the most recent matching operation for a target.

  Used to recover an operation name for async commands that don't print one
  (upgrade/update). The async gcloud command may return before the control
  plane has moved the operation out of PENDING, and fast operations (e.g.
  label updates) may already be DONE when this runs, so the status filter
  always includes RUNNING/PENDING/DONE. A startTime lower bound
  (op_start_time minus a 30s clock-skew buffer) keeps older completed
  operations out of the result.

  Args:
    operation_type: GKE operationType to filter on, e.g. 'UPGRADE_NODES' for
      node pool upgrades or 'UPDATE_CLUSTER' for cluster-level updates.
    target_name: Name of the operation's target resource (node-pool name for
      UPGRADE_NODES, cluster name for UPDATE_CLUSTER). It must match a whole
      path segment of the operation's targetLink. If empty, self.name is
      used.
    max_attempts: Passed to vm_util.Retry as max_retries for the list query.
    retry_delay: Seconds between attempts (vm_util.Retry poll_interval).
    op_start_time: Unix timestamp recorded just before the async command was
      issued. Falsy values mean "now".

  Returns:
    Operation name string.

  Raises:
    errors.Resource.GetError: raised by each attempt that finds no matching
      operation; vm_util.Retry decides what surfaces once attempts run out.
  """
  link_target = target_name or self.name
  # 30-second buffer absorbs clock skew between client and control plane.
  from_time = time.strftime(
      '%Y-%m-%dT%H:%M:%SZ', time.gmtime((op_start_time or time.time()) - 30)
  )
  # `~` is a regex search, so a bare name would also match any longer name
  # that contains it ('pool-1' matches '.../nodePools/pool-10'). With several
  # lookups running concurrently that can hand back another pool's
  # operation. Require the name to be a complete path segment instead.
  filter_str = (
      f'operationType={operation_type} AND '
      '(status=RUNNING OR status=PENDING OR status=DONE) AND '
      f'targetLink ~ "/{link_target}(/|$)" AND '
      f'startTime>="{from_time}"'
  )

  @vm_util.Retry(
      poll_interval=retry_delay,
      max_retries=max_attempts,
      retryable_exceptions=(errors.Resource.GetError,),
      log_errors=False,
  )
  def _QueryOnce() -> str:
    list_cmd = self._GcloudCommand('container', 'operations', 'list')
    list_cmd.flags['filter'] = filter_str
    # Newest first, and only the newest: that is the operation just issued.
    list_cmd.flags['sort-by'] = '~startTime'
    list_cmd.flags['limit'] = 1
    list_cmd.flags['format'] = 'value(name)'
    # A failed list call is treated like "nothing found yet"; its stderr is
    # carried in the error so the cause is visible if retries run out.
    stdout, stderr, _ = list_cmd.Issue(raise_on_failure=False)
    op_name = stdout.strip()
    if not op_name:
      raise errors.Resource.GetError(
          f'_GetLatestOperationName: no {operation_type} op found '
          f'for target={link_target}. stderr={stderr}'
      )
    logging.info(
        'GetLatestOp: found %s type=%s target=%s',
        op_name,
        operation_type,
        link_target,
    )
    return op_name

  return _QueryOnce()

def CreateNodePoolAsync(
    self,
    nodepool_config: container.BaseNodePoolConfig,
    node_version: str | None = None,
) -> str:
  """Starts a node pool creation and returns immediately.

  Args:
    nodepool_config: Configuration of the node pool to create; its name is
      the pool name and its settings are applied via _AddNodeParamsToCmd.
    node_version: Optional node version to pin with --node-version.

  Returns:
    The operation name, to be polled by the caller. This method does not
    wait for the operation.
  """
  create_args = (
      'container',
      'node-pools',
      'create',
      nodepool_config.name,
      '--cluster',
      self.name,
  )
  cmd = self._GcloudCommand(*create_args)
  self._AddNodeParamsToCmd(nodepool_config, cmd)
  if node_version:
    cmd.flags['node-version'] = node_version
  # Some gcloud builds reject the long --timeout flag together with --async;
  # drop it so the CLI simply returns the operation name right away.
  cmd.flags.pop('timeout', None)
  op_name = self._IssueAsync(cmd)
  return op_name

def UpgradeNodePoolAsync(self, name: str, target_version: str) -> str:
  """Starts a node pool upgrade and returns immediately.

  `clusters upgrade --node-pool --async` succeeds with empty stdout (gcloud
  does not print the operation name for this subcommand), so the name is
  recovered from the operations list through _IssueAsync's fallback.

  Several of these are expected to run in parallel, one per node pool. Each
  call passes its own node-pool name as fallback_target, so the fallback
  lookup is filtered to that pool (together with the start-time guard)
  rather than resolving to whichever operation happens to be newest.

  Args:
    name: Name of the node pool to upgrade.
    target_version: Version passed to --cluster-version.

  Returns:
    The operation name. This method does not wait for the operation.
  """
  upgrade_args = (
      'container',
      'clusters',
      'upgrade',
      self.name,
      '--node-pool',
      name,
      '--cluster-version',
      target_version,
  )
  cmd = self._GcloudCommand(*upgrade_args)
  op_name = self._IssueAsync(
      cmd, fallback_op_type='UPGRADE_NODES', fallback_target=name
  )
  return op_name

def DeleteNodePoolAsync(self, name: str) -> str:
  """Starts a node pool deletion and returns immediately.

  Args:
    name: Name of the node pool to delete.

  Returns:
    The operation name. This method does not wait for the operation. No
    fallback lookup is configured for this command.
  """
  delete_args = (
      'container',
      'node-pools',
      'delete',
      name,
      '--cluster',
      self.name,
  )
  cmd = self._GcloudCommand(*delete_args)
  # Deletion asks for confirmation; --quiet is passed explicitly as an arg.
  cmd.args.append('--quiet')
  op_name = self._IssueAsync(cmd)
  return op_name

def UpdateClusterAsync(self) -> str:
  """Starts a cluster update and returns immediately.

  The update is a non-destructive label change: the `k8s-mgmt-ts` label is
  set to the current Unix time. Like `clusters upgrade`,
  `clusters update --async` succeeds with empty stdout, so the operation
  name is recovered through _IssueAsync's fallback. The label update
  finishes within seconds, so the fallback may find the operation already
  DONE; _GetLatestOperationName's start-time-guarded filter covers that.

  Returns:
    The operation name. This method does not wait for the operation.
  """
  cmd = self._GcloudCommand('container', 'clusters', 'update', self.name)
  timestamp_label = f'k8s-mgmt-ts={int(time.time())}'
  cmd.flags['update-labels'] = timestamp_label
  # GcloudCommand sets --quiet by default; the label update is
  # non-interactive, so dropping it is safe (and matches how this was
  # validated).
  # NOTE(review): popping a default flag relies on GcloudCommand internals;
  # reviewers asked for the async handling to live in util.py instead.
  cmd.flags.pop('quiet', None)
  op_name = self._IssueAsync(
      cmd, fallback_op_type='UPDATE_CLUSTER', fallback_target=self.name
  )
  return op_name

def ResolveNodePoolVersions(self) -> tuple[str, str]:
"""Returns (initial, target) GKE node versions: initial=N-1, target=N.

GKE requires fully-qualified node versions (e.g. '1.34.4-gke.1234'),
so we query `gcloud container get-server-config` and pick the newest
valid version per minor.
"""
cmd = self._GcloudCommand('container', 'get-server-config')
cmd.flags['format'] = 'json'
stdout, stderr, retcode = cmd.Issue(raise_on_failure=False)
if retcode:
raise errors.Resource.GetError(
f'gcloud get-server-config failed: {stderr}'
)
config = json.loads(stdout)
valid = list(config.get('validNodeVersions', []))
if not valid:
raise errors.Resource.GetError(
'GKE get-server-config returned no validNodeVersions'
)

def _version_tuple(v):
return tuple(int(x) for x in v.split('-', 1)[0].split('.'))

valid.sort(key=_version_tuple, reverse=True)
target = valid[0]
target_parts = target.split('-', 1)[0].split('.')
initial_minor = f'{target_parts[0]}.{int(target_parts[1]) - 1}'
for v in valid:
v_bare = '.'.join(v.split('-', 1)[0].split('.')[:2])
if v_bare == initial_minor:
return v, target
raise errors.Resource.GetError(
f'No GKE node version found for minor {initial_minor!r}; '
f'available top 5: {valid[:5]}'
)

def WaitForOperation(self, op_handle: str) -> None:
  """Polls a GKE operation until terminal; raises on failure.

  The operation is described every 5 seconds for up to ONE_HOUR. An
  operation that reaches DONE but carries an `error` (or the older
  `statusMessage`) field is treated as failed rather than successful.

  Args:
    op_handle: Operation name, as returned by the *Async methods.

  Raises:
    errors.Resource.CreationError: if the operation is aborted, or finishes
      with an error attached.
  """

  @vm_util.Retry(
      poll_interval=5,
      fuzz=0,
      timeout=ONE_HOUR,
      retryable_exceptions=(errors.Resource.RetryableCreationError,),
  )
  def _poll():
    describe = self._GcloudCommand(
        'container',
        'operations',
        'describe',
        op_handle,
    )
    describe.flags['format'] = 'json'
    out, err, rc = describe.Issue(raise_on_failure=False)
    if rc:
      # Transient describe failures are retried like "not finished yet".
      raise errors.Resource.RetryableCreationError(
          f'describe op failed: {err}'
      )
    try:
      operation = json.loads(out)
    except ValueError:  # json.JSONDecodeError is a ValueError subclass.
      operation = None
    if isinstance(operation, dict):
      status = operation.get('status')
      failure = operation.get('error') or operation.get('statusMessage')
    else:
      # Not a JSON object: fall back to treating the raw text as the status.
      status = out.strip()
      failure = None
    if status == 'DONE':
      # DONE only means "finished"; a failed operation is also DONE, with
      # the failure details attached.
      if failure:
        raise errors.Resource.CreationError(
            f'op {op_handle} failed: {failure}'
        )
      return
    if status in ('ABORTING', 'ABORTED'):
      raise errors.Resource.CreationError(f'op {op_handle} aborted')
    raise errors.Resource.RetryableCreationError(
        f'op {op_handle} status={status}'
    )

  _poll()


class GkeAutopilotCluster(BaseGkeCluster):
"""Class representing an Autopilot GKE cluster, which has no nodepools."""
Expand Down
46 changes: 45 additions & 1 deletion perfkitbenchmarker/providers/gcp/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import json
import logging
import re
from typing import Any, Set, cast
from typing import Any, Callable, Set, cast

from absl import flags
from google.cloud import monitoring_v3
Expand Down Expand Up @@ -399,6 +399,50 @@ def Issue(self, **kwargs: Any):

return stdout, stderr, retcode

def IssueAsync(
self,
get_latest_op_fn: Callable[[], str] | None = None,
**kwargs,
) -> str:
"""Issues this command with --async and returns the operation name.

Most async gcloud commands print an operation name to stdout. Some
subcommands (e.g. GKE's `clusters upgrade --node-pool`, `clusters
update`) reliably return success with empty stdout instead. For those,
pass get_latest_op_fn -- a zero-arg callable that looks up the
operation name by some resource-specific means (e.g. listing
operations and filtering) when stdout is empty.

Args:
get_latest_op_fn: Optional callable invoked when stdout is empty
after issuing the async command. Resource-specific -- e.g. for GKE
this queries `gcloud container operations list` with a
type/target filter. If None, empty stdout raises an error (the
default for resources where async commands always print the
operation name, e.g. create/delete).
**kwargs: Forwarded to Issue().

Returns:
The operation name.

Raises:
errors.Resource.CreationError: if stdout is empty and no
get_latest_op_fn is supplied, or if the command itself fails.
"""
self.args.append('--async')
self.flags['format'] = 'value(name)'
stdout, stderr, retcode = self.Issue(raise_on_failure=False, **kwargs)

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Apologies if I asked this already, but why raise_on_failure=False ? That's a big problem for general use - maybe it should be an argument passed from google_kubernetes_engine. Especially given the "if retcode" below it just looks like you're changing the exception type returned.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done - removed the hardcoded override. IssueAsync now just calls self.Issue(**kwargs), so it inherits Issue()'s own default (raise_on_failure=True) unless a caller explicitly opts out via **kwargs. The manual if retcode: raise is gone too — it was just reimplementing what the default already does.

if retcode:
raise errors.Resource.CreationError(stderr)

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Said exception type also doesn't really make sense in a more general location - this IssueAsync command doesn't have to be called in a _Create context.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed — removed errors.Resource.CreationError from this path entirely. Since raise_on_failure now defaults to True, a failing command raises via Issue()/IssueCommand's own exception (errors.VmUtil.IssueCommandError) before we'd ever reach a retcode check here.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed — removed errors.Resource.CreationError from this path entirely. Since raise_on_failure now defaults to True, a failing command raises via Issue()/IssueCommand's own exception (errors.VmUtil.IssueCommandError) before we'd ever reach a retcode check here.

op_name = stdout.strip().splitlines()[-1].strip() if stdout else ''
if op_name:
return op_name
if get_latest_op_fn is None:
raise errors.Resource.CreationError(

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

raise probably errors.VmUtil.IssueCommandError instead.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done — switched the "no op name and no fallback" raise to errors.VmUtil.IssueCommandError, matching what the command-failure path now raises by default too.

f'Async gcloud command returned no operation name; stderr={stderr}'
)
return get_latest_op_fn()

def _RaiseRateLimitedException(self, error):
"""Raise rate limited exception based on the retry_on_rate_limited flag.

Expand Down
Loading