From 93ec14e5e0c019d0bb804fdd42960f34c44eea1f Mon Sep 17 00:00:00 2001 From: Javan Lacerda Date: Fri, 27 Mar 2026 15:44:19 +0000 Subject: [PATCH] move swarming module to the swarming service Signed-off-by: Javan Lacerda --- .../_internal/swarming/__init__.py | 197 --------- src/clusterfuzz/_internal/swarming/service.py | 236 ++++++++++- .../tests/core/swarming/service_test.py | 365 ++++++++++++++++- .../tests/core/swarming/swarming_test.py | 378 ------------------ 4 files changed, 593 insertions(+), 583 deletions(-) delete mode 100644 src/clusterfuzz/_internal/swarming/__init__.py delete mode 100644 src/clusterfuzz/_internal/tests/core/swarming/swarming_test.py diff --git a/src/clusterfuzz/_internal/swarming/__init__.py b/src/clusterfuzz/_internal/swarming/__init__.py deleted file mode 100644 index 4e03ab3bf6..0000000000 --- a/src/clusterfuzz/_internal/swarming/__init__.py +++ /dev/null @@ -1,197 +0,0 @@ -# Copyright 2024 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Swarming helpers.""" - -import base64 -import json -import uuid - -from google.auth.transport import requests -from google.protobuf import json_format - -from clusterfuzz._internal.base import utils -from clusterfuzz._internal.base.feature_flags import FeatureFlags -from clusterfuzz._internal.config import local_config -from clusterfuzz._internal.datastore import data_types -from clusterfuzz._internal.google_cloud_utils import credentials -from clusterfuzz._internal.protos import swarming_pb2 -from clusterfuzz._internal.system import environment - -_SWARMING_SCOPES = [ - 'https://www.googleapis.com/auth/cloud-platform', - 'https://www.googleapis.com/auth/userinfo.email' -] - - -def is_swarming_task(command: str, job_name: str): - """Returns True if the task is supposed to run on swarming.""" - if not FeatureFlags.SWARMING_REMOTE_EXECUTION.enabled: - return False - job = data_types.Job.query(data_types.Job.name == job_name).get() - if not job: - return False - - job_environment = job.get_environment() - if not utils.string_is_true(job_environment.get('IS_SWARMING_JOB')): - return False - - try: - _get_new_task_spec(command, job_name, '') - return True - except ValueError: - return False - - -def _get_task_name(): - return 't-' + str(uuid.uuid4()).lower() - - -def _get_swarming_config(): - """Returns the swarming config.""" - return local_config.SwarmingConfig() - - -def _get_task_dimensions(job: data_types.Job, platform_specific_dimensions: list - ) -> list[swarming_pb2.StringPair]: # pylint: disable=no-member - """ Gets all swarming dimensions for a task. - Job dimensions have more precedence than static dimensions""" - unique_dimensions = {} - unique_dimensions['os'] = job.platform - unique_dimensions['pool'] = _get_swarming_config().get('swarming_pool') - - for dimension in platform_specific_dimensions: - unique_dimensions[dimension['key'].lower()] = dimension['value'] - - swarming_dimensions = environment.get_value('SWARMING_DIMENSIONS') - if isinstance(swarming_dimensions, dict): - for key, value in swarming_dimensions.items(): - unique_dimensions[key.lower()] = value - - task_dimensions = [] - for dimension, value in unique_dimensions.items(): - task_dimensions.append( - swarming_pb2.StringPair( # pylint: disable=no-member - key=dimension, value=value)) - return task_dimensions - - -def _env_vars_to_json( - env_vars: list[swarming_pb2.StringPair]) -> swarming_pb2.StringPair: # pylint: disable=no-member - """ - Compresses all env variables into a single JSON string , which will be used - to set up the env variables in swarming bots that launch clusterfuzz - using a docker container. - """ - env_vars_dict = {pair.key: pair.value for pair in env_vars} - return swarming_pb2.StringPair( # pylint: disable=no-member - key='DOCKER_ENV_VARS', - value=json.dumps(env_vars_dict)) - - -def _get_new_task_spec(command: str, job_name: str, - download_url: str) -> swarming_pb2.NewTaskRequest: # pylint: disable=no-member - """Gets the configured specifications for a swarming task.""" - job = data_types.Job.query(data_types.Job.name == job_name).get() - config_name = job.platform - swarming_config = _get_swarming_config() - instance_spec = swarming_config.get('mapping').get(config_name, None) - if instance_spec is None: - raise ValueError(f'No mapping for {config_name}') - swarming_realm = swarming_config.get('swarming_realm') - logs_project_id = swarming_config.get('logs_project_id') - priority = instance_spec['priority'] - startup_command = instance_spec['command'] - # The service account that the task runs as. - service_account = instance_spec['service_account_email'] - # If this task request slice is not scheduled after waiting this long, - # the task state will be set to EXPIRED. - expiration_secs = instance_spec['expiration_secs'] - # Maximum number of seconds the task can run before its process is - # forcibly terminated and the task results in TIMED_OUT. - execution_timeout_secs = instance_spec['execution_timeout_secs'] - if command == 'fuzz': - execution_timeout_secs = swarming_config.get('fuzz_task_duration') - - # The cipd_input contains the cipd_packages that need to be installed - # before running the task (if any). - cipd_input = instance_spec.get('cipd_input', {}) - # env_prefixes allows the modification of existing environment variables by - # adding the values as prefixes to the env variable. - env_prefixes = instance_spec.get('env_prefixes', {}) - default_task_environment = [ - swarming_pb2.StringPair(key='UWORKER', value='True'), # pylint: disable=no-member - swarming_pb2.StringPair(key='SWARMING_BOT', value='True'), # pylint: disable=no-member - swarming_pb2.StringPair(key='LOG_TO_GCP', value='True'), # pylint: disable=no-member - swarming_pb2.StringPair( # pylint: disable=no-member - key='LOGGING_CLOUD_PROJECT_ID', - value=logs_project_id), - ] - - platform_specific_env = instance_spec.get('env', []) - swarming_bot_environment = [] - swarming_bot_environment.append( - swarming_pb2.StringPair( # pylint: disable=no-member - key='DOCKER_IMAGE', - value=instance_spec.get('docker_image', ''))) - for var in platform_specific_env: - swarming_bot_environment.append( - swarming_pb2.StringPair(key=var['key'], value=var['value'])) # pylint: disable=no-member - swarming_bot_environment.append(_env_vars_to_json(default_task_environment)) - swarming_bot_environment.extend(default_task_environment) - dimensions = instance_spec.get('dimensions', []) - cas_input_root = instance_spec.get('cas_input_root', {}) - - new_task_request = swarming_pb2.NewTaskRequest( # pylint: disable=no-member - name=_get_task_name(), - priority=priority, - realm=swarming_realm, - service_account=service_account, - task_slices=[ - swarming_pb2.TaskSlice( # pylint: disable=no-member - expiration_secs=expiration_secs, - properties=swarming_pb2.TaskProperties( # pylint: disable=no-member - command=startup_command, - dimensions=_get_task_dimensions(job, dimensions), - cipd_input=cipd_input, - cas_input_root=cas_input_root, - execution_timeout_secs=execution_timeout_secs, - env=swarming_bot_environment, - env_prefixes=env_prefixes, - secret_bytes=base64.b64encode(download_url.encode('utf-8')))) - ]) - - return new_task_request - - -def push_swarming_task(command, download_url, job_type): - """Schedules a task on swarming.""" - job = data_types.Job.query(data_types.Job.name == job_type).get() - if not job: - raise ValueError('invalid job_name') - - task_spec = _get_new_task_spec(command, job_type, download_url) - creds, _ = credentials.get_default(_SWARMING_SCOPES) - - if not creds.token: - creds.refresh(requests.Request()) - - headers = { - 'Accept': 'application/json', - 'Content-Type': 'application/json', - 'Authorization': f'Bearer {creds.token}' - } - swarming_server = _get_swarming_config().get('swarming_server') - url = f'https://{swarming_server}/prpc/swarming.v2.Tasks/NewTask' - utils.post_url( - url=url, data=json_format.MessageToJson(task_spec), headers=headers) diff --git a/src/clusterfuzz/_internal/swarming/service.py b/src/clusterfuzz/_internal/swarming/service.py index 88c1169d28..35f9e6ecec 100644 --- a/src/clusterfuzz/_internal/swarming/service.py +++ b/src/clusterfuzz/_internal/swarming/service.py @@ -13,10 +13,236 @@ # limitations under the License. """Swarming service.""" -from clusterfuzz._internal import swarming +import base64 +import json +import uuid + +from google.auth.transport import requests +from google.protobuf import json_format + +from clusterfuzz._internal.base import utils +from clusterfuzz._internal.base.feature_flags import FeatureFlags from clusterfuzz._internal.base.tasks import task_utils +from clusterfuzz._internal.config import local_config +from clusterfuzz._internal.datastore import data_types +from clusterfuzz._internal.google_cloud_utils import credentials from clusterfuzz._internal.metrics import logs +from clusterfuzz._internal.protos import swarming_pb2 from clusterfuzz._internal.remote_task import remote_task_types +from clusterfuzz._internal.system import environment + +_SWARMING_SCOPES = [ + 'https://www.googleapis.com/auth/cloud-platform', + 'https://www.googleapis.com/auth/userinfo.email' +] + + +def is_swarming_task(command: str, job_name: str) -> bool: + """Returns True if the task is supposed to run on swarming. + + Args: + command: The command to run (e.g. 'fuzz'). + job_name: The name of the job. + + Returns: + True if the task should run on swarming, False otherwise. + """ + if not FeatureFlags.SWARMING_REMOTE_EXECUTION.enabled: + return False + job = data_types.Job.query(data_types.Job.name == job_name).get() + if not job: + return False + + job_environment = job.get_environment() + if not utils.string_is_true(job_environment.get('IS_SWARMING_JOB')): + return False + + try: + _get_new_task_spec(command, job_name, '') + return True + except ValueError: + return False + + +def _get_task_name() -> str: + """Returns a unique task name.""" + return 't-' + str(uuid.uuid4()).lower() + + +def _get_swarming_config(): + """Returns the swarming configuration.""" + return local_config.SwarmingConfig() + + +def _get_task_dimensions(job: data_types.Job, platform_specific_dimensions: list + ) -> list[swarming_pb2.StringPair]: # pylint: disable=no-member + """Gets all swarming dimensions for a task. + + Job dimensions have more precedence than static dimensions. + + Args: + job: The Job entity. + platform_specific_dimensions: A list of platform-specific dimensions. + + Returns: + A list of swarming_pb2.StringPair dimensions. + """ + unique_dimensions = {} + unique_dimensions['os'] = job.platform + unique_dimensions['pool'] = _get_swarming_config().get('swarming_pool') + + for dimension in platform_specific_dimensions: + unique_dimensions[dimension['key'].lower()] = dimension['value'] + + swarming_dimensions = environment.get_value('SWARMING_DIMENSIONS') + if isinstance(swarming_dimensions, dict): + for key, value in swarming_dimensions.items(): + unique_dimensions[key.lower()] = value + + task_dimensions = [] + for dimension, value in unique_dimensions.items(): + task_dimensions.append( + swarming_pb2.StringPair( # pylint: disable=no-member + key=dimension, value=value)) + return task_dimensions + + +def _env_vars_to_json( + env_vars: list[swarming_pb2.StringPair]) -> swarming_pb2.StringPair: # pylint: disable=no-member + """Compresses environment variables into a single JSON string. + + This JSON string will be used to set up the environment variables in + swarming bots that launch ClusterFuzz using a docker container. + + Args: + env_vars: A list of swarming_pb2.StringPair environment variables. + + Returns: + A swarming_pb2.StringPair containing the JSON-encoded environment variables. + """ + env_vars_dict = {pair.key: pair.value for pair in env_vars} + return swarming_pb2.StringPair( # pylint: disable=no-member + key='DOCKER_ENV_VARS', + value=json.dumps(env_vars_dict)) + + +def _get_new_task_spec(command: str, job_name: str, + download_url: str) -> swarming_pb2.NewTaskRequest: # pylint: disable=no-member + """Gets the configured specifications for a swarming task. + + Args: + command: The command to run. + job_name: The name of the job. + download_url: The URL to download the task input. + + Returns: + A swarming_pb2.NewTaskRequest containing the task specification. + + Raises: + ValueError: If no mapping is found for the job's platform. + """ + job = data_types.Job.query(data_types.Job.name == job_name).get() + config_name = job.platform + swarming_config = _get_swarming_config() + instance_spec = swarming_config.get('mapping').get(config_name, None) + if instance_spec is None: + raise ValueError(f'No mapping for {config_name}') + swarming_realm = swarming_config.get('swarming_realm') + logs_project_id = swarming_config.get('logs_project_id') + priority = instance_spec['priority'] + startup_command = instance_spec['command'] + # The service account that the task runs as. + service_account = instance_spec['service_account_email'] + # If this task request slice is not scheduled after waiting this long, + # the task state will be set to EXPIRED. + expiration_secs = instance_spec['expiration_secs'] + # Maximum number of seconds the task can run before its process is + # forcibly terminated and the task results in TIMED_OUT. + execution_timeout_secs = instance_spec['execution_timeout_secs'] + if command == 'fuzz': + execution_timeout_secs = swarming_config.get('fuzz_task_duration') + + # The cipd_input contains the cipd_packages that need to be installed + # before running the task (if any). + cipd_input = instance_spec.get('cipd_input', {}) + # env_prefixes allows the modification of existing environment variables by + # adding the values as prefixes to the env variable. + env_prefixes = instance_spec.get('env_prefixes', {}) + default_task_environment = [ + swarming_pb2.StringPair(key='UWORKER', value='True'), # pylint: disable=no-member + swarming_pb2.StringPair(key='SWARMING_BOT', value='True'), # pylint: disable=no-member + swarming_pb2.StringPair(key='LOG_TO_GCP', value='True'), # pylint: disable=no-member + swarming_pb2.StringPair( # pylint: disable=no-member + key='LOGGING_CLOUD_PROJECT_ID', + value=logs_project_id), + ] + + platform_specific_env = instance_spec.get('env', []) + swarming_bot_environment = [] + swarming_bot_environment.append( + swarming_pb2.StringPair( # pylint: disable=no-member + key='DOCKER_IMAGE', + value=instance_spec.get('docker_image', ''))) + for var in platform_specific_env: + swarming_bot_environment.append( + swarming_pb2.StringPair(key=var['key'], value=var['value'])) # pylint: disable=no-member + swarming_bot_environment.append(_env_vars_to_json(default_task_environment)) + swarming_bot_environment.extend(default_task_environment) + dimensions = instance_spec.get('dimensions', []) + cas_input_root = instance_spec.get('cas_input_root', {}) + + new_task_request = swarming_pb2.NewTaskRequest( # pylint: disable=no-member + name=_get_task_name(), + priority=priority, + realm=swarming_realm, + service_account=service_account, + task_slices=[ + swarming_pb2.TaskSlice( # pylint: disable=no-member + expiration_secs=expiration_secs, + properties=swarming_pb2.TaskProperties( # pylint: disable=no-member + command=startup_command, + dimensions=_get_task_dimensions(job, dimensions), + cipd_input=cipd_input, + cas_input_root=cas_input_root, + execution_timeout_secs=execution_timeout_secs, + env=swarming_bot_environment, + env_prefixes=env_prefixes, + secret_bytes=base64.b64encode(download_url.encode('utf-8')))) + ]) + + return new_task_request + + +def push_swarming_task(command: str, download_url: str, job_type: str): + """Schedules a task on swarming. + + Args: + command: The command to run. + download_url: The URL to download the task input. + job_type: The name of the job. + + Raises: + ValueError: If the job_type is invalid. + """ + job = data_types.Job.query(data_types.Job.name == job_type).get() + if not job: + raise ValueError('invalid job_name') + + task_spec = _get_new_task_spec(command, job_type, download_url) + creds, _ = credentials.get_default(_SWARMING_SCOPES) + + if not creds.token: + creds.refresh(requests.Request()) + + headers = { + 'Accept': 'application/json', + 'Content-Type': 'application/json', + 'Authorization': f'Bearer {creds.token}' + } + swarming_server = _get_swarming_config().get('swarming_server') + url = f'https://{swarming_server}/prpc/swarming.v2.Tasks/NewTask' + utils.post_url( + url=url, data=json_format.MessageToJson(task_spec), headers=headers) class SwarmingService(remote_task_types.RemoteTaskInterface): @@ -39,17 +265,17 @@ def create_utask_main_jobs(self, remote_tasks: list[remote_task_types.RemoteTask] ) -> list[remote_task_types.RemoteTask]: """Creates many remote tasks for uworker main tasks. - Returns the tasks that couldn't be created. + + Returns the tasks that couldn't be created. """ unscheduled_tasks = [] for task in remote_tasks: try: - if not swarming.is_swarming_task(task.command, task.job_type): + if not is_swarming_task(task.command, task.job_type): unscheduled_tasks.append(task) continue - swarming.push_swarming_task(task.command, task.input_download_url, - task.job_type) + push_swarming_task(task.command, task.input_download_url, task.job_type) except Exception: # pylint: disable=broad-except logs.error( f'Failed to push task to Swarming: {task.command}, {task.job_type}.' diff --git a/src/clusterfuzz/_internal/tests/core/swarming/service_test.py b/src/clusterfuzz/_internal/tests/core/swarming/service_test.py index 09c485c426..155caade49 100644 --- a/src/clusterfuzz/_internal/tests/core/swarming/service_test.py +++ b/src/clusterfuzz/_internal/tests/core/swarming/service_test.py @@ -11,14 +11,373 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -"""Tests for SwarmingService.""" +"""Tests for swarming service.""" +import base64 import unittest from unittest import mock +from google.protobuf import json_format + +from clusterfuzz._internal.datastore import data_types +from clusterfuzz._internal.protos import swarming_pb2 from clusterfuzz._internal.remote_task import remote_task_types from clusterfuzz._internal.swarming import service +from clusterfuzz._internal.system import environment from clusterfuzz._internal.tests.test_libs import helpers +from clusterfuzz._internal.tests.test_libs import test_utils + + +@test_utils.with_cloud_emulators('datastore') +class SwarmingTest(unittest.TestCase): + """Tests for swarming utility functions.""" + + def setUp(self): + helpers.patch(self, [ + 'clusterfuzz._internal.base.utils.post_url', + 'clusterfuzz._internal.swarming.service._get_task_name', + 'clusterfuzz._internal.google_cloud_utils.credentials.get_default', + 'google.auth.transport.requests.Request', + 'clusterfuzz._internal.swarming.service.FeatureFlags', + ]) + helpers.patch_environ(self) + self.mock._get_task_name.return_value = 'task_name' # pylint: disable=protected-access + self.mock.FeatureFlags.SWARMING_REMOTE_EXECUTION.enabled = True + self.maxDiff = None + + def test_get_spec_from_config_with_docker_image(self): + """Tests that _get_new_task_spec works as expected.""" + job = data_types.Job(name='libfuzzer_chrome_asan', platform='LINUX') + job.put() + spec = service._get_new_task_spec( # pylint: disable=protected-access + 'corpus_pruning', job.name, 'https://download_url') + expected_spec = swarming_pb2.NewTaskRequest( + name='task_name', + priority=1, + realm='realm-name', + service_account='test-clusterfuzz-service-account-email', + task_slices=[ + swarming_pb2.TaskSlice( + expiration_secs=86400, + properties=swarming_pb2.TaskProperties( + command=[ + 'luci-auth', 'context', '--', './linux_entry_point.sh' + ], + dimensions=[ + swarming_pb2.StringPair(key='os', value=job.platform), + swarming_pb2.StringPair(key='pool', value='pool-name') + ], + cipd_input=swarming_pb2.CipdInput(), # pylint: disable=no-member + cas_input_root=swarming_pb2.CASReference( + cas_instance= + 'projects/server-name/instances/instance_name', + digest=swarming_pb2.Digest( + hash='linux_entry_point_archive_hash', + size_bytes=1234)), + execution_timeout_secs=86400, + env=[ + swarming_pb2.StringPair( + key='DOCKER_IMAGE', + value= + 'gcr.io/clusterfuzz-images/base:a2f4dd6-202202070654' + ), + swarming_pb2.StringPair( + key='DOCKER_ENV_VARS', + value= + '{"UWORKER": "True", "SWARMING_BOT": "True", "LOG_TO_GCP": "True", "LOGGING_CLOUD_PROJECT_ID": "project_id"}' + ), + swarming_pb2.StringPair(key='UWORKER', value='True'), + swarming_pb2.StringPair( + key='SWARMING_BOT', value='True'), + swarming_pb2.StringPair(key='LOG_TO_GCP', value='True'), + swarming_pb2.StringPair( + key='LOGGING_CLOUD_PROJECT_ID', value='project_id'), + ], + secret_bytes=base64.b64encode( + 'https://download_url'.encode('utf-8')))) + ]) + + self.assertEqual(spec, expected_spec) + + def test_get_spec_from_config_raises_error_on_unknown_config(self): + """Tests that _get_new_task_spec raises error when there's no mapping for + the config.""" + job = data_types.Job(name='some_job_name', platform='UNKNOWN-PLATFORM') + job.put() + with self.assertRaises(ValueError): + service._get_new_task_spec( # pylint: disable=protected-access + 'corpus_pruning', job.name, 'https://download_url') + + def test_get_spec_from_config_without_docker_image(self): + """Tests that _get_new_task_spec works as expected (without a docker + image).""" + job = data_types.Job(name='libfuzzer_chrome_asan', platform='MAC') + job.put() + spec = service._get_new_task_spec( # pylint: disable=protected-access + 'corpus_pruning', job.name, 'https://download_url') + expected_spec = swarming_pb2.NewTaskRequest( + name='task_name', + priority=1, + realm='realm-name', + service_account='test-clusterfuzz-service-account-email', + task_slices=[ + swarming_pb2.TaskSlice( + expiration_secs=86400, + properties=swarming_pb2.TaskProperties( + command=[ + 'luci-auth', 'context', '--', './mac_entry_point.sh' + ], + dimensions=[ + swarming_pb2.StringPair(key='os', value=job.platform), + swarming_pb2.StringPair(key='pool', value='pool-name'), + swarming_pb2.StringPair(key='key1', value='value1'), + swarming_pb2.StringPair(key='key2', value='value2'), + ], + cipd_input=swarming_pb2.CipdInput(packages=[ + swarming_pb2.CipdPackage( + package_name='package1_name', + version='package1_version', + path='package_install_path'), + swarming_pb2.CipdPackage( + package_name='package2_name', + version='package2_version', + path='package_install_path'), + ]), + cas_input_root=swarming_pb2.CASReference( + cas_instance= + 'projects/server-name/instances/instance_name', + digest=swarming_pb2.Digest( + hash='mac_entry_point_archive_hash', + size_bytes=456)), + execution_timeout_secs=86400, + env=[ + swarming_pb2.StringPair(key='DOCKER_IMAGE', value=''), + swarming_pb2.StringPair(key='ENV_VAR1', value='VALUE1'), + swarming_pb2.StringPair(key='ENV_VAR2', value='VALUE2'), + swarming_pb2.StringPair( + key='DOCKER_ENV_VARS', + value= + '{"UWORKER": "True", "SWARMING_BOT": "True", "LOG_TO_GCP": "True", "LOGGING_CLOUD_PROJECT_ID": "project_id"}' + ), + swarming_pb2.StringPair(key='UWORKER', value='True'), + swarming_pb2.StringPair( + key='SWARMING_BOT', value='True'), + swarming_pb2.StringPair(key='LOG_TO_GCP', value='True'), + swarming_pb2.StringPair( + key='LOGGING_CLOUD_PROJECT_ID', value='project_id'), + ], + env_prefixes=[ + swarming_pb2.StringListPair( + key='PATH', + value=[ + 'package_install_path', + 'package_install_path/bin' + ]) + ], + secret_bytes=base64.b64encode( + 'https://download_url'.encode('utf-8')))) + ]) + self.assertEqual(spec, expected_spec) + + def test_get_spec_from_config_for_fuzz_task(self): + """Tests that _get_new_task_spec works as expected for fuzz commands.""" + job = data_types.Job(name='libfuzzer_chrome_asan', platform='LINUX') + job.put() + spec = service._get_new_task_spec( # pylint: disable=protected-access + 'fuzz', job.name, 'https://download_url') + expected_spec = swarming_pb2.NewTaskRequest( + name='task_name', + priority=1, + realm='realm-name', + service_account='test-clusterfuzz-service-account-email', + task_slices=[ + swarming_pb2.TaskSlice( + expiration_secs=86400, + properties=swarming_pb2.TaskProperties( + command=[ + 'luci-auth', 'context', '--', './linux_entry_point.sh' + ], + dimensions=[ + swarming_pb2.StringPair(key='os', value=job.platform), + swarming_pb2.StringPair(key='pool', value='pool-name') + ], + cipd_input=swarming_pb2.CipdInput(), # pylint: disable=no-member + cas_input_root=swarming_pb2.CASReference( + cas_instance= + 'projects/server-name/instances/instance_name', + digest=swarming_pb2.Digest( + hash='linux_entry_point_archive_hash', + size_bytes=1234)), + execution_timeout_secs=12345, + env=[ + swarming_pb2.StringPair( + key='DOCKER_IMAGE', + value= + 'gcr.io/clusterfuzz-images/base:a2f4dd6-202202070654' + ), + swarming_pb2.StringPair( + key='DOCKER_ENV_VARS', + value= + '{"UWORKER": "True", "SWARMING_BOT": "True", "LOG_TO_GCP": "True", "LOGGING_CLOUD_PROJECT_ID": "project_id"}' + ), + swarming_pb2.StringPair(key='UWORKER', value='True'), + swarming_pb2.StringPair( + key='SWARMING_BOT', value='True'), + swarming_pb2.StringPair(key='LOG_TO_GCP', value='True'), + swarming_pb2.StringPair( + key='LOGGING_CLOUD_PROJECT_ID', value='project_id'), + ], + secret_bytes=base64.b64encode( + 'https://download_url'.encode('utf-8')))) + ]) + self.assertEqual(spec, expected_spec) + + def test_push_swarming_task(self): + """Tests that push_swarming_task works as expected.""" + mock_creds = mock.MagicMock() + mock_creds.token = 'fake_token' + self.mock.get_default.return_value = (mock_creds, None) + + job = data_types.Job(name='libfuzzer_chrome_asan', platform='LINUX') + job.put() + service.push_swarming_task('fuzz', 'https://download_url', job.name) + + expected_new_task_request = swarming_pb2.NewTaskRequest( + name='task_name', + priority=1, + realm='realm-name', + service_account='test-clusterfuzz-service-account-email', + task_slices=[ + swarming_pb2.TaskSlice( + expiration_secs=86400, + properties=swarming_pb2.TaskProperties( + command=[ + 'luci-auth', 'context', '--', './linux_entry_point.sh' + ], + dimensions=[ + swarming_pb2.StringPair(key='os', value=job.platform), + swarming_pb2.StringPair(key='pool', value='pool-name') + ], + cipd_input=swarming_pb2.CipdInput(), # pylint: disable=no-member + cas_input_root=swarming_pb2.CASReference( + cas_instance= + 'projects/server-name/instances/instance_name', + digest=swarming_pb2.Digest( + hash='linux_entry_point_archive_hash', + size_bytes=1234)), + execution_timeout_secs=12345, + env=[ + swarming_pb2.StringPair( + key='DOCKER_IMAGE', + value= + 'gcr.io/clusterfuzz-images/base:a2f4dd6-202202070654' + ), + swarming_pb2.StringPair( + key='DOCKER_ENV_VARS', + value= + '{"UWORKER": "True", "SWARMING_BOT": "True", "LOG_TO_GCP": "True", "LOGGING_CLOUD_PROJECT_ID": "project_id"}' + ), + swarming_pb2.StringPair(key='UWORKER', value='True'), + swarming_pb2.StringPair( + key='SWARMING_BOT', value='True'), + swarming_pb2.StringPair(key='LOG_TO_GCP', value='True'), + swarming_pb2.StringPair( + key='LOGGING_CLOUD_PROJECT_ID', value='project_id'), + ], + secret_bytes=base64.b64encode( + 'https://download_url'.encode('utf-8')))) + ]) + + self.mock.get_default.assert_called_with(service._SWARMING_SCOPES) # pylint: disable=protected-access + expected_headers = { + 'Accept': 'application/json', + 'Content-Type': 'application/json', + 'Authorization': 'Bearer fake_token' + } + expected_url = 'https://server-name/prpc/swarming.v2.Tasks/NewTask' + self.mock.post_url.assert_called_with( + url=expected_url, + data=json_format.MessageToJson(expected_new_task_request), + headers=expected_headers) + + def test_push_swarming_task_with_refresh(self): + """Tests that push_swarming_task refreshes credentials if token is missing.""" + mock_creds = mock.MagicMock() + mock_creds.token = None + self.mock.get_default.return_value = (mock_creds, None) + + def refresh_side_effect(_): + mock_creds.token = 'refreshed_token' + + mock_creds.refresh.side_effect = refresh_side_effect + + job = data_types.Job(name='libfuzzer_chrome_asan', platform='LINUX') + job.put() + service.push_swarming_task('fuzz', 'https://download_url', job.name) + + mock_creds.refresh.assert_called_with(self.mock.Request.return_value) + expected_headers = { + 'Accept': 'application/json', + 'Content-Type': 'application/json', + 'Authorization': 'Bearer refreshed_token' + } + self.assertEqual(self.mock.post_url.call_args[1]['headers'], + expected_headers) + + def test_is_swarming_task(self): + """Tests that is_swarming_task works as expected.""" + job = data_types.Job( + name='libfuzzer_chrome_asan', + platform='LINUX', + environment_string='IS_SWARMING_JOB = True') + job.put() + self.assertTrue(service.is_swarming_task('fuzz', job.name)) + + job.environment_string = 'IS_SWARMING_JOB = False' + job.put() + self.assertFalse(service.is_swarming_task('fuzz', job.name)) + + job.environment_string = '' + job.put() + self.assertFalse(service.is_swarming_task('fuzz', job.name)) + + def test_get_task_dimensions_with_env_var(self): + """Tests that _get_task_dimensions handles SWARMING_DIMENSIONS env var.""" + environment.set_value('SWARMING_DIMENSIONS', { + 'cpu': 'x86', + 'os': 'windows' + }) + job = data_types.Job(name='libfuzzer_chrome_asan', platform='LINUX') + dimensions = service._get_task_dimensions(job, []) # pylint: disable=protected-access + + expected_dimensions = [ + swarming_pb2.StringPair(key='os', value='windows'), + swarming_pb2.StringPair(key='pool', value='pool-name'), + swarming_pb2.StringPair(key='cpu', value='x86'), + ] + self.assertCountEqual(dimensions, expected_dimensions) + + def test_get_task_dimensions_job_precedence(self): + """Tests that job swarming dimensions have more precedence than platform ones.""" + # Use 'MAC' platform which has static dimensions (key1, key2) in swarming.yaml. + job = data_types.Job(name='mac_job', platform='MAC') + job.put() + + # Platform dimensions for MAC are: key1: value1, key2: value2. + # We set SWARMING_DIMENSIONS in the environment to override key1. + environment.set_value('SWARMING_DIMENSIONS', {'key1': 'job_value1'}) + + spec = service._get_new_task_spec( # pylint: disable=protected-access + 'fuzz', job.name, 'https://download_url') + dimensions = spec.task_slices[0].properties.dimensions + + expected_dimensions = [ + swarming_pb2.StringPair(key='os', value='MAC'), + swarming_pb2.StringPair(key='pool', value='pool-name'), + swarming_pb2.StringPair(key='key1', value='job_value1'), + swarming_pb2.StringPair(key='key2', value='value2'), + ] + self.assertCountEqual(dimensions, expected_dimensions) class SwarmingServiceTest(unittest.TestCase): @@ -26,8 +385,8 @@ class SwarmingServiceTest(unittest.TestCase): def setUp(self): helpers.patch(self, [ - 'clusterfuzz._internal.swarming.is_swarming_task', - 'clusterfuzz._internal.swarming.push_swarming_task', + 'clusterfuzz._internal.swarming.service.is_swarming_task', + 'clusterfuzz._internal.swarming.service.push_swarming_task', 'clusterfuzz._internal.base.tasks.task_utils.get_command_from_module', 'clusterfuzz._internal.metrics.logs.error', ]) diff --git a/src/clusterfuzz/_internal/tests/core/swarming/swarming_test.py b/src/clusterfuzz/_internal/tests/core/swarming/swarming_test.py deleted file mode 100644 index 016a41f13b..0000000000 --- a/src/clusterfuzz/_internal/tests/core/swarming/swarming_test.py +++ /dev/null @@ -1,378 +0,0 @@ -# Copyright 2023 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Swarming tests.""" -import base64 -import unittest -from unittest import mock - -from google.protobuf import json_format - -from clusterfuzz._internal import swarming -from clusterfuzz._internal.datastore import data_types -from clusterfuzz._internal.protos import swarming_pb2 -from clusterfuzz._internal.system import environment -from clusterfuzz._internal.tests.test_libs import helpers -from clusterfuzz._internal.tests.test_libs import test_utils - - -@test_utils.with_cloud_emulators('datastore') -class SwarmingTest(unittest.TestCase): - """Tests for swarming utils.""" - - def setUp(self): - helpers.patch(self, [ - 'clusterfuzz._internal.base.utils.post_url', - 'clusterfuzz._internal.swarming._get_task_name', - 'clusterfuzz._internal.google_cloud_utils.credentials.get_default', - 'google.auth.transport.requests.Request', - 'clusterfuzz._internal.swarming.FeatureFlags', - ]) - helpers.patch_environ(self) - self.mock._get_task_name.return_value = 'task_name' # pylint: disable=protected-access - self.mock.FeatureFlags.SWARMING_REMOTE_EXECUTION.enabled = True - self.maxDiff = None - - def test_get_spec_from_config_with_docker_image(self): - """Tests that _get_new_task_spec works as expected.""" - job = data_types.Job(name='libfuzzer_chrome_asan', platform='LINUX') - job.put() - spec = swarming._get_new_task_spec( # pylint: disable=protected-access - 'corpus_pruning', job.name, 'https://download_url') - expected_spec = swarming_pb2.NewTaskRequest( - name='task_name', - priority=1, - realm='realm-name', - service_account='test-clusterfuzz-service-account-email', - task_slices=[ - swarming_pb2.TaskSlice( - expiration_secs=86400, - properties=swarming_pb2.TaskProperties( - command=[ - 'luci-auth', 'context', '--', './linux_entry_point.sh' - ], - dimensions=[ - swarming_pb2.StringPair(key='os', value=job.platform), - swarming_pb2.StringPair(key='pool', value='pool-name') - ], - cipd_input=swarming_pb2.CipdInput(), # pylint: disable=no-member - cas_input_root=swarming_pb2.CASReference( - cas_instance= - 'projects/server-name/instances/instance_name', - digest=swarming_pb2.Digest( - hash='linux_entry_point_archive_hash', - size_bytes=1234)), - execution_timeout_secs=86400, - env=[ - swarming_pb2.StringPair( - key='DOCKER_IMAGE', - value= - 'gcr.io/clusterfuzz-images/base:a2f4dd6-202202070654' - ), - swarming_pb2.StringPair( - key='DOCKER_ENV_VARS', - value= - '{"UWORKER": "True", "SWARMING_BOT": "True", "LOG_TO_GCP": "True", "LOGGING_CLOUD_PROJECT_ID": "project_id"}' - ), - swarming_pb2.StringPair(key='UWORKER', value='True'), - swarming_pb2.StringPair( - key='SWARMING_BOT', value='True'), - swarming_pb2.StringPair(key='LOG_TO_GCP', value='True'), - swarming_pb2.StringPair( - key='LOGGING_CLOUD_PROJECT_ID', value='project_id'), - ], - secret_bytes=base64.b64encode( - 'https://download_url'.encode('utf-8')))) - ]) - - self.assertEqual(spec, expected_spec) - - def test_get_spec_from_config_raises_error_on_unknown_config(self): - """Tests that _get_new_task_spec raises error when there's no mapping for - the config.""" - job = data_types.Job(name='some_job_name', platform='UNKNOWN-PLATFORM') - job.put() - with self.assertRaises(ValueError): - swarming._get_new_task_spec( # pylint: disable=protected-access - 'corpus_pruning', job.name, 'https://download_url') - - def test_get_spec_from_config_without_docker_image(self): - """Tests that _get_new_task_spec works as expected (without a docker - image).""" - job = data_types.Job(name='libfuzzer_chrome_asan', platform='MAC') - job.put() - spec = swarming._get_new_task_spec( # pylint: disable=protected-access - 'corpus_pruning', job.name, 'https://download_url') - expected_spec = swarming_pb2.NewTaskRequest( - name='task_name', - priority=1, - realm='realm-name', - service_account='test-clusterfuzz-service-account-email', - task_slices=[ - swarming_pb2.TaskSlice( - expiration_secs=86400, - properties=swarming_pb2.TaskProperties( - command=[ - 'luci-auth', 'context', '--', './mac_entry_point.sh' - ], - dimensions=[ - swarming_pb2.StringPair(key='os', value=job.platform), - swarming_pb2.StringPair(key='pool', value='pool-name'), - swarming_pb2.StringPair(key='key1', value='value1'), - swarming_pb2.StringPair(key='key2', value='value2'), - ], - cipd_input=swarming_pb2.CipdInput(packages=[ - swarming_pb2.CipdPackage( - package_name='package1_name', - version='package1_version', - path='package_install_path'), - swarming_pb2.CipdPackage( - package_name='package2_name', - version='package2_version', - path='package_install_path'), - ]), - cas_input_root=swarming_pb2.CASReference( - cas_instance= - 'projects/server-name/instances/instance_name', - digest=swarming_pb2.Digest( - hash='mac_entry_point_archive_hash', - size_bytes=456)), - execution_timeout_secs=86400, - env=[ - swarming_pb2.StringPair(key='DOCKER_IMAGE', value=''), - swarming_pb2.StringPair(key='ENV_VAR1', value='VALUE1'), - swarming_pb2.StringPair(key='ENV_VAR2', value='VALUE2'), - swarming_pb2.StringPair( - key='DOCKER_ENV_VARS', - value= - '{"UWORKER": "True", "SWARMING_BOT": "True", "LOG_TO_GCP": "True", "LOGGING_CLOUD_PROJECT_ID": "project_id"}' - ), - swarming_pb2.StringPair(key='UWORKER', value='True'), - swarming_pb2.StringPair( - key='SWARMING_BOT', value='True'), - swarming_pb2.StringPair(key='LOG_TO_GCP', value='True'), - swarming_pb2.StringPair( - key='LOGGING_CLOUD_PROJECT_ID', value='project_id'), - ], - env_prefixes=[ - swarming_pb2.StringListPair( - key='PATH', - value=[ - 'package_install_path', - 'package_install_path/bin' - ]) - ], - secret_bytes=base64.b64encode( - 'https://download_url'.encode('utf-8')))) - ]) - self.assertEqual(spec, expected_spec) - - def test_get_spec_from_config_for_fuzz_task(self): - """Tests that _get_new_task_spec works as expected for fuzz commands.""" - job = data_types.Job(name='libfuzzer_chrome_asan', platform='LINUX') - job.put() - spec = swarming._get_new_task_spec( # pylint: disable=protected-access - 'fuzz', job.name, 'https://download_url') - expected_spec = swarming_pb2.NewTaskRequest( - name='task_name', - priority=1, - realm='realm-name', - service_account='test-clusterfuzz-service-account-email', - task_slices=[ - swarming_pb2.TaskSlice( - expiration_secs=86400, - properties=swarming_pb2.TaskProperties( - command=[ - 'luci-auth', 'context', '--', './linux_entry_point.sh' - ], - dimensions=[ - swarming_pb2.StringPair(key='os', value=job.platform), - swarming_pb2.StringPair(key='pool', value='pool-name') - ], - cipd_input=swarming_pb2.CipdInput(), # pylint: disable=no-member - cas_input_root=swarming_pb2.CASReference( - cas_instance= - 'projects/server-name/instances/instance_name', - digest=swarming_pb2.Digest( - hash='linux_entry_point_archive_hash', - size_bytes=1234)), - execution_timeout_secs=12345, - env=[ - swarming_pb2.StringPair( - key='DOCKER_IMAGE', - value= - 'gcr.io/clusterfuzz-images/base:a2f4dd6-202202070654' - ), - swarming_pb2.StringPair( - key='DOCKER_ENV_VARS', - value= - '{"UWORKER": "True", "SWARMING_BOT": "True", "LOG_TO_GCP": "True", "LOGGING_CLOUD_PROJECT_ID": "project_id"}' - ), - swarming_pb2.StringPair(key='UWORKER', value='True'), - swarming_pb2.StringPair( - key='SWARMING_BOT', value='True'), - swarming_pb2.StringPair(key='LOG_TO_GCP', value='True'), - swarming_pb2.StringPair( - key='LOGGING_CLOUD_PROJECT_ID', value='project_id'), - ], - secret_bytes=base64.b64encode( - 'https://download_url'.encode('utf-8')))) - ]) - self.assertEqual(spec, expected_spec) - - def test_push_swarming_task(self): - """Tests that push_swarming_task works as expected.""" - mock_creds = mock.MagicMock() - mock_creds.token = 'fake_token' - self.mock.get_default.return_value = (mock_creds, None) - - job = data_types.Job(name='libfuzzer_chrome_asan', platform='LINUX') - job.put() - swarming.push_swarming_task('fuzz', 'https://download_url', job.name) - - expected_new_task_request = swarming_pb2.NewTaskRequest( - name='task_name', - priority=1, - realm='realm-name', - service_account='test-clusterfuzz-service-account-email', - task_slices=[ - swarming_pb2.TaskSlice( - expiration_secs=86400, - properties=swarming_pb2.TaskProperties( - command=[ - 'luci-auth', 'context', '--', './linux_entry_point.sh' - ], - dimensions=[ - swarming_pb2.StringPair(key='os', value=job.platform), - swarming_pb2.StringPair(key='pool', value='pool-name') - ], - cipd_input=swarming_pb2.CipdInput(), # pylint: disable=no-member - cas_input_root=swarming_pb2.CASReference( - cas_instance= - 'projects/server-name/instances/instance_name', - digest=swarming_pb2.Digest( - hash='linux_entry_point_archive_hash', - size_bytes=1234)), - execution_timeout_secs=12345, - env=[ - swarming_pb2.StringPair( - key='DOCKER_IMAGE', - value= - 'gcr.io/clusterfuzz-images/base:a2f4dd6-202202070654' - ), - swarming_pb2.StringPair( - key='DOCKER_ENV_VARS', - value= - '{"UWORKER": "True", "SWARMING_BOT": "True", "LOG_TO_GCP": "True", "LOGGING_CLOUD_PROJECT_ID": "project_id"}' - ), - swarming_pb2.StringPair(key='UWORKER', value='True'), - swarming_pb2.StringPair( - key='SWARMING_BOT', value='True'), - swarming_pb2.StringPair(key='LOG_TO_GCP', value='True'), - swarming_pb2.StringPair( - key='LOGGING_CLOUD_PROJECT_ID', value='project_id'), - ], - secret_bytes=base64.b64encode( - 'https://download_url'.encode('utf-8')))) - ]) - - self.mock.get_default.assert_called_with(swarming._SWARMING_SCOPES) # pylint: disable=protected-access - expected_headers = { - 'Accept': 'application/json', - 'Content-Type': 'application/json', - 'Authorization': 'Bearer fake_token' - } - expected_url = 'https://server-name/prpc/swarming.v2.Tasks/NewTask' - self.mock.post_url.assert_called_with( - url=expected_url, - data=json_format.MessageToJson(expected_new_task_request), - headers=expected_headers) - - def test_push_swarming_task_with_refresh(self): - """Tests that push_swarming_task refreshes credentials if token is missing.""" - mock_creds = mock.MagicMock() - mock_creds.token = None - self.mock.get_default.return_value = (mock_creds, None) - - def refresh_side_effect(_): - mock_creds.token = 'refreshed_token' - - mock_creds.refresh.side_effect = refresh_side_effect - - job = data_types.Job(name='libfuzzer_chrome_asan', platform='LINUX') - job.put() - swarming.push_swarming_task('fuzz', 'https://download_url', job.name) - - mock_creds.refresh.assert_called_with(self.mock.Request.return_value) - expected_headers = { - 'Accept': 'application/json', - 'Content-Type': 'application/json', - 'Authorization': 'Bearer refreshed_token' - } - self.assertEqual(self.mock.post_url.call_args[1]['headers'], - expected_headers) - - def test_is_swarming_task(self): - """Tests that is_swarming_task works as expected.""" - job = data_types.Job( - name='libfuzzer_chrome_asan', - platform='LINUX', - environment_string='IS_SWARMING_JOB = True') - job.put() - self.assertTrue(swarming.is_swarming_task('fuzz', job.name)) - - job.environment_string = 'IS_SWARMING_JOB = False' - job.put() - self.assertFalse(swarming.is_swarming_task('fuzz', job.name)) - - job.environment_string = '' - job.put() - self.assertFalse(swarming.is_swarming_task('fuzz', job.name)) - - def test_get_task_dimensions_with_env_var(self): - """Tests that _get_task_dimensions handles SWARMING_DIMENSIONS env var.""" - environment.set_value('SWARMING_DIMENSIONS', { - 'cpu': 'x86', - 'os': 'windows' - }) - job = data_types.Job(name='libfuzzer_chrome_asan', platform='LINUX') - dimensions = swarming._get_task_dimensions(job, []) # pylint: disable=protected-access - - expected_dimensions = [ - swarming_pb2.StringPair(key='os', value='windows'), - swarming_pb2.StringPair(key='pool', value='pool-name'), - swarming_pb2.StringPair(key='cpu', value='x86'), - ] - self.assertCountEqual(dimensions, expected_dimensions) - - def test_get_task_dimensions_job_precedence(self): - """Tests that job swarming dimensions have more precedence than platform ones.""" - # Use 'MAC' platform which has static dimensions (key1, key2) in swarming.yaml. - job = data_types.Job(name='mac_job', platform='MAC') - job.put() - - # Platform dimensions for MAC are: key1: value1, key2: value2. - # We set SWARMING_DIMENSIONS in the environment to override key1. - environment.set_value('SWARMING_DIMENSIONS', {'key1': 'job_value1'}) - - spec = swarming._get_new_task_spec( # pylint: disable=protected-access - 'fuzz', job.name, 'https://download_url') - dimensions = spec.task_slices[0].properties.dimensions - - expected_dimensions = [ - swarming_pb2.StringPair(key='os', value='MAC'), - swarming_pb2.StringPair(key='pool', value='pool-name'), - swarming_pb2.StringPair(key='key1', value='job_value1'), - swarming_pb2.StringPair(key='key2', value='value2'), - ] - self.assertCountEqual(dimensions, expected_dimensions)