diff --git a/src/clusterfuzz/_internal/bot/tasks/utasks/__init__.py b/src/clusterfuzz/_internal/bot/tasks/utasks/__init__.py index 107ea5d907a..e7b5f7938e2 100644 --- a/src/clusterfuzz/_internal/bot/tasks/utasks/__init__.py +++ b/src/clusterfuzz/_internal/bot/tasks/utasks/__init__.py @@ -454,7 +454,7 @@ def uworker_main(input_download_url) -> None: _start_web_server_if_needed(uworker_input.job_type) utask_module = get_utask_module(uworker_input.module_name) - execution_mode = Mode.SWARMING if environment.is_swarming_bot( + execution_mode = Mode.SWARMING if environment.is_running_on_swarming( ) else Mode.BATCH recorder.set_task_details( utask_module, uworker_input.job_type, execution_mode, diff --git a/src/clusterfuzz/_internal/metrics/logs.py b/src/clusterfuzz/_internal/metrics/logs.py index 8fa6c026795..f5d00c2d551 100644 --- a/src/clusterfuzz/_internal/metrics/logs.py +++ b/src/clusterfuzz/_internal/metrics/logs.py @@ -32,6 +32,8 @@ from typing import NamedTuple from typing import TYPE_CHECKING +from clusterfuzz._internal.system import environment + # This is needed to avoid circular import if TYPE_CHECKING: from clusterfuzz._internal.cron.grouper import TestcaseAttributes @@ -93,26 +95,17 @@ def _console_logging_enabled(): # TODO(pmeuleman) Revert the changeset that added these once # https://github.com/google/clusterfuzz/pull/3422 lands. -def _file_logging_enabled(): +def _file_logging_enabled() -> bool: """Return bool True when logging to files (bot/logs/*.log) is enabled. - This is enabled by default. - This is disabled if we are running in app engine or kubernetes as these have - their dedicated loggers, see configure_appengine() and configure_k8s(). - """ - return bool(os.getenv( - 'LOG_TO_FILE', - 'True')) and not _is_running_on_app_engine() and not _is_running_on_k8s() + This is enabled by default.""" + return environment.get_value('LOG_TO_FILE', True) -def _cloud_logging_enabled(): +def _cloud_logging_enabled() -> bool: """Return bool True where Google Cloud Logging is enabled. - This is enabled by default. - This is disabled for local development and if we are running in a app engine - or kubernetes as these have their dedicated loggers, see - configure_appengine() and configure_k8s().""" - return (bool(os.getenv('LOG_TO_GCP', 'True')) and - not os.getenv("PY_UNITTESTS") and not _is_local() and - not _is_running_on_app_engine() and not _is_running_on_k8s()) + This is enabled by default but disabled for local development.""" + return (environment.get_value('LOG_TO_GCP', True) and + not environment.is_running_unit_tests() and not _is_local()) def suppress_unwanted_warnings(): @@ -418,7 +411,7 @@ def configure_appengine(): """Configure logging for App Engine.""" logging.getLogger().setLevel(logging.INFO) - if os.getenv('LOCAL_DEVELOPMENT') or os.getenv('PY_UNITTESTS'): + if os.getenv('LOCAL_DEVELOPMENT') or environment.is_running_unit_tests(): return import google.cloud.logging @@ -554,12 +547,38 @@ def cloud_label_filter(record): logging.getLogger().addHandler(handler) +def configure_swarming(name: str, extras: dict[str, str] | None = None) -> None: + """Configure logging for swarming bots.""" + if extras is None: + extras = {} + extras['task_id'] = os.getenv('TASK_ID') + extras['instance_id'] = os.getenv('BOT_NAME') + extras['platform'] = 'swarming' + + global _default_extras + _default_extras = extras + + logging.basicConfig(level=logging.INFO) + if _cloud_logging_enabled(): + configure_cloud_logging() + + logger = logging.getLogger(name) + logger.setLevel(logging.INFO) + set_logger(logger) + + sys.excepthook = uncaught_exception_handler + + def configure(name, extras=None): """Set logger. See the list of loggers in bot/config/logging.yaml. Also configures the process to log any uncaught exceptions as an error. |extras| will be included by emit() in log messages.""" suppress_unwanted_warnings() + if environment.is_running_on_swarming(): + configure_swarming(name, extras) + return + if _is_running_on_k8s(): configure_k8s() return @@ -792,7 +811,6 @@ def get_common_log_context() -> dict[str, str]: """Return common context to be propagated by logs.""" # Avoid circular imports on the top level. from clusterfuzz._internal.base import utils - from clusterfuzz._internal.system import environment try: os_type = environment.platform() diff --git a/src/clusterfuzz/_internal/system/environment.py b/src/clusterfuzz/_internal/system/environment.py index 2e6dd86f5e6..1292b5fa946 100644 --- a/src/clusterfuzz/_internal/system/environment.py +++ b/src/clusterfuzz/_internal/system/environment.py @@ -23,11 +23,9 @@ import sys import uuid -import requests import yaml from clusterfuzz._internal import fuzzing -from clusterfuzz._internal.metrics import logs # Tools supporting customization of options via ADDITIONAL_{TOOL_NAME}_OPTIONS. # FIXME: Support ADDITIONAL_UBSAN_OPTIONS and ADDITIONAL_LSAN_OPTIONS in an @@ -749,9 +747,9 @@ def get_runtime() -> UtaskMainRuntime: return UtaskMainRuntime.INSTANCE_GROUP -def is_swarming_bot(): +def is_running_on_swarming() -> bool: """Return whether or not the current bot is a swarming bot.""" - return get_value('SWARMING_BOT') + return get_value('SWARMING_BOT') is True def is_running_on_app_engine(): @@ -1240,42 +1238,6 @@ def is_tworker(): return get_value('TWORKER', False) -def update_task_enabled() -> bool: - """ It uses the GCE VM metadata server `update_task_enabled` flag. - - This flag will be used to rollout the update_task deprecation - by disabling it progressively for each instance group through - the instance template metadata - """ - metadata_url = ("http://metadata.google.internal/computeMetadata/v1/" + - "instance/attributes/") - metadata_header = {"Metadata-Flavor": "Google"} - metadata_key = "update_task_enabled" - - running_on_batch = bool(is_uworker()) - - try: - # Construct the full URL for your specific metadata key - response = requests.get( - f"{metadata_url}{metadata_key}", headers=metadata_header, timeout=10) - - # Raise an exception for bad status codes (4xx or 5xx) - response.raise_for_status() - - # The metadata value is in the response text - metadata_value = response.text - logs.info(f"The value for '{metadata_key}' is: {metadata_value}") - is_update_task_enabled = metadata_value.lower() != 'false' - - # The flag is_uworker is true for Batch environment - # The update task should run if it's not a Batch environment - # and the flag is enabled on the VM template metadata - return not running_on_batch and is_update_task_enabled - - except requests.exceptions.HTTPError as http_error: - logs.warning(f"Http error fetching metadata: {http_error}") - - except Exception as ex: - logs.error(f"Unknown exception fetching metadata: {ex}") - - return not running_on_batch +def is_running_unit_tests() -> bool: + """Returns whether or not we're running unit tests.""" + return get_value('PY_UNITTESTS', False) diff --git a/src/clusterfuzz/_internal/tests/core/bot/tasks/utasks/utasks_test.py b/src/clusterfuzz/_internal/tests/core/bot/tasks/utasks/utasks_test.py index 0c16e120d6a..38db68e09ee 100644 --- a/src/clusterfuzz/_internal/tests/core/bot/tasks/utasks/utasks_test.py +++ b/src/clusterfuzz/_internal/tests/core/bot/tasks/utasks/utasks_test.py @@ -199,7 +199,7 @@ def setUp(self): 'clusterfuzz._internal.bot.tasks.utasks.uworker_io.download_and_deserialize_uworker_input', 'clusterfuzz._internal.bot.tasks.utasks.uworker_io.serialize_and_upload_uworker_output', 'clusterfuzz._internal.bot.tasks.utasks.get_utask_module', - 'clusterfuzz._internal.system.environment.is_swarming_bot', + 'clusterfuzz._internal.system.environment.is_running_on_swarming', 'clusterfuzz._internal.metrics.events.emit', ]) self.module = mock.MagicMock(__name__='tasks.analyze_task') @@ -210,10 +210,8 @@ def test_uworker_main(self, execution_mode: utasks.Mode): """Tests that uworker_main works as intended.""" start_time_ns = time.time_ns() - if execution_mode == utasks.Mode.SWARMING: - self.mock.is_swarming_bot.return_value = True # pylint: disable=protected-access - else: - self.mock.is_swarming_bot.return_value = False + self.mock.is_running_on_swarming.return_value = ( + execution_mode == utasks.Mode.SWARMING) preprocess_start_time_ns = start_time_ns - 42 * 10**9 # In the past. preprocess_start_timestamp = timestamp_pb2.Timestamp() diff --git a/src/clusterfuzz/_internal/tests/core/metrics/logs_test.py b/src/clusterfuzz/_internal/tests/core/metrics/logs_test.py index e9dc16b6176..68ad9153a10 100644 --- a/src/clusterfuzz/_internal/tests/core/metrics/logs_test.py +++ b/src/clusterfuzz/_internal/tests/core/metrics/logs_test.py @@ -521,6 +521,26 @@ def test_configure_appengine(self): logs.configure('test') self.assertEqual(0, self.mock.dictConfig.call_count) + def test_configure_swarming(self): + """Test configure for swarming bot.""" + # pylint: disable=protected-access + os.environ['SWARMING_BOT'] = 'True' + os.environ['TASK_ID'] = 'task-123' + os.environ['BOT_NAME'] = 'bot-123' + + helpers.patch( + self, + ['clusterfuzz._internal.system.environment.is_running_on_swarming']) + + logger = mock.MagicMock() + self.mock.getLogger.return_value = logger + + logs.configure('test') + + self.assertEqual(logs._default_extras['task_id'], 'task-123') + self.assertEqual(logs._default_extras['instance_id'], 'bot-123') + self.assertEqual(logs._default_extras['platform'], 'swarming') + @test_utils.with_cloud_emulators('datastore') class EmitTest(unittest.TestCase): diff --git a/src/python/bot/startup/run_bot.py b/src/python/bot/startup/run_bot.py index de7fcaf849e..6825a28e149 100644 --- a/src/python/bot/startup/run_bot.py +++ b/src/python/bot/startup/run_bot.py @@ -27,6 +27,8 @@ import time import traceback +import requests + from clusterfuzz._internal.base import dates from clusterfuzz._internal.base import errors from clusterfuzz._internal.base import tasks @@ -125,7 +127,7 @@ def task_loop(): # This caches the current environment on first run. Don't move this. environment.reset_environment() try: - if environment.update_task_enabled(): + if update_task_enabled(): logs.info("Running update task.") # Run regular updates. # TODO(metzman): Move this after utask_main execution @@ -193,6 +195,47 @@ def task_loop(): return stacktrace, clean_exit, task_payload +def update_task_enabled() -> bool: + """ It uses the GCE VM metadata server `update_task_enabled` flag. + + This flag will be used to rollout the update_task deprecation + by disabling it progressively for each instance group through + the instance template metadata + """ + metadata_url = ("http://metadata.google.internal/computeMetadata/v1/" + + "instance/attributes/") + metadata_header = {"Metadata-Flavor": "Google"} + metadata_key = "update_task_enabled" + + running_on_batch = bool(environment.is_uworker()) + + try: + # Construct the full URL for your specific metadata key + response = requests.get( + f"{metadata_url}{metadata_key}", headers=metadata_header, timeout=10) + + # Raise an exception for bad status codes (4xx or 5xx) + response.raise_for_status() + + # The metadata value is in the response text + metadata_value = response.text + logs.info(f"The value for '{metadata_key}' is: {metadata_value}") + is_update_task_enabled = metadata_value.lower() != 'false' + + # The flag is_uworker is true for Batch environment + # The update task should run if it's not a Batch environment + # and the flag is enabled on the VM template metadata + return not running_on_batch and is_update_task_enabled + + except requests.exceptions.HTTPError as http_error: + logs.warning(f"Http error fetching metadata: {http_error}") + + except Exception as ex: + logs.error(f"Unknown exception fetching metadata: {ex}") + + return not running_on_batch + + def main(): """Prepare the configuration options and start requesting tasks.""" logs.configure('run_bot')