Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/clusterfuzz/_internal/bot/tasks/utasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,7 @@ def uworker_main(input_download_url) -> None:
_start_web_server_if_needed(uworker_input.job_type)

utask_module = get_utask_module(uworker_input.module_name)
execution_mode = Mode.SWARMING if environment.is_swarming_bot(
execution_mode = Mode.SWARMING if environment.is_running_on_swarming(
) else Mode.BATCH
recorder.set_task_details(
utask_module, uworker_input.job_type, execution_mode,
Expand Down
53 changes: 36 additions & 17 deletions src/clusterfuzz/_internal/metrics/logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
from typing import NamedTuple
from typing import TYPE_CHECKING

from clusterfuzz._internal.system import environment

# This is needed to avoid circular import
if TYPE_CHECKING:
from clusterfuzz._internal.cron.grouper import TestcaseAttributes
Expand Down Expand Up @@ -93,26 +95,18 @@ def _console_logging_enabled():

# TODO(pmeuleman) Revert the changeset that added these once
# https://github.com/google/clusterfuzz/pull/3422 lands.
def _file_logging_enabled():
def _file_logging_enabled() -> bool:
"""Return bool True when logging to files (bot/logs/*.log) is enabled.
This is enabled by default.
This is disabled if we are running in app engine or kubernetes as these have
their dedicated loggers, see configure_appengine() and configure_k8s().
"""
return bool(os.getenv(
'LOG_TO_FILE',
'True')) and not _is_running_on_app_engine() and not _is_running_on_k8s()
This is enabled by default."""
return environment.get_value('LOG_TO_FILE', True)
Comment thread
IvanBM18 marked this conversation as resolved.


def _cloud_logging_enabled():
def _cloud_logging_enabled() -> bool:
"""Return bool True where Google Cloud Logging is enabled.
This is enabled by default.
This is disabled for local development and if we are running in a app engine
or kubernetes as these have their dedicated loggers, see
configure_appengine() and configure_k8s()."""
return (bool(os.getenv('LOG_TO_GCP', 'True')) and
not os.getenv("PY_UNITTESTS") and not _is_local() and
not _is_running_on_app_engine() and not _is_running_on_k8s())
This is enabled by default but disabled for local development."""
return environment.get_value('LOG_TO_GCP',
True) and (not os.getenv('PY_UNITTESTS') and
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Optional: I would move this PY_UNITTESTS check to its own helper method in environment.py: is_running_unit_tests().

not _is_local())


def suppress_unwanted_warnings():
Expand Down Expand Up @@ -554,12 +548,38 @@ def cloud_label_filter(record):
logging.getLogger().addHandler(handler)


def configure_swarming(name: str, extras: dict[str, str] = None) -> None:
Comment thread
IvanBM18 marked this conversation as resolved.
Outdated
"""Configure logging for swarming bots."""
if extras is None:
extras = {}
Comment thread
letitz marked this conversation as resolved.
extras['task_id'] = os.getenv('TASK_ID')
extras['instance_id'] = os.getenv('BOT_NAME')
extras['platform'] = 'swarming'

global _default_extras
_default_extras = extras

logging.basicConfig(level=logging.INFO)
if _cloud_logging_enabled():
configure_cloud_logging()

logger = logging.getLogger(name)
logger.setLevel(logging.INFO)
set_logger(logger)

sys.excepthook = uncaught_exception_handler


def configure(name, extras=None):
"""Set logger. See the list of loggers in bot/config/logging.yaml.
Also configures the process to log any uncaught exceptions as an error.
|extras| will be included by emit() in log messages."""
suppress_unwanted_warnings()

if environment.is_running_on_swarming():
configure_swarming(name, extras)
return

if _is_running_on_k8s():
configure_k8s()
return
Expand Down Expand Up @@ -792,7 +812,6 @@ def get_common_log_context() -> dict[str, str]:
"""Return common context to be propagated by logs."""
# Avoid circular imports on the top level.
from clusterfuzz._internal.base import utils
from clusterfuzz._internal.system import environment

try:
os_type = environment.platform()
Expand Down
47 changes: 2 additions & 45 deletions src/clusterfuzz/_internal/system/environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,9 @@
import sys
import uuid

import requests
import yaml

from clusterfuzz._internal import fuzzing
from clusterfuzz._internal.metrics import logs

# Tools supporting customization of options via ADDITIONAL_{TOOL_NAME}_OPTIONS.
# FIXME: Support ADDITIONAL_UBSAN_OPTIONS and ADDITIONAL_LSAN_OPTIONS in an
Expand Down Expand Up @@ -749,9 +747,9 @@ def get_runtime() -> UtaskMainRuntime:
return UtaskMainRuntime.INSTANCE_GROUP


def is_swarming_bot():
def is_running_on_swarming() -> bool:
"""Return whether or not the current bot is a swarming bot."""
return get_value('SWARMING_BOT')
return get_value('SWARMING_BOT') is True


def is_running_on_app_engine():
Expand Down Expand Up @@ -1238,44 +1236,3 @@ def can_testcase_run_on_platform(testcase_platform_id, current_platform_id):

def is_tworker():
return get_value('TWORKER', False)

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note for reviews:
To remove a circular dependency between logs.py and environment.py we moved this method into run_bot.py, this method required the logs module hence causing a circular dependency. Its safe to remove it from here as this was really only used at run_bot.py


def update_task_enabled() -> bool:
""" It uses the GCE VM metadata server `update_task_enabled` flag.

This flag will be used to rollout the update_task deprecation
by disabling it progressively for each instance group through
the instance template metadata
"""
metadata_url = ("http://metadata.google.internal/computeMetadata/v1/" +
"instance/attributes/")
metadata_header = {"Metadata-Flavor": "Google"}
metadata_key = "update_task_enabled"

running_on_batch = bool(is_uworker())

try:
# Construct the full URL for your specific metadata key
response = requests.get(
f"{metadata_url}{metadata_key}", headers=metadata_header, timeout=10)

# Raise an exception for bad status codes (4xx or 5xx)
response.raise_for_status()

# The metadata value is in the response text
metadata_value = response.text
logs.info(f"The value for '{metadata_key}' is: {metadata_value}")
is_update_task_enabled = metadata_value.lower() != 'false'

# The flag is_uworker is true for Batch environment
# The update task should run if it's not a Batch environment
# and the flag is enabled on the VM template metadata
return not running_on_batch and is_update_task_enabled

except requests.exceptions.HTTPError as http_error:
logs.warning(f"Http error fetching metadata: {http_error}")

except Exception as ex:
logs.error(f"Unknown exception fetching metadata: {ex}")

return not running_on_batch
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ def setUp(self):
'clusterfuzz._internal.bot.tasks.utasks.uworker_io.download_and_deserialize_uworker_input',
'clusterfuzz._internal.bot.tasks.utasks.uworker_io.serialize_and_upload_uworker_output',
'clusterfuzz._internal.bot.tasks.utasks.get_utask_module',
'clusterfuzz._internal.system.environment.is_swarming_bot',
'clusterfuzz._internal.system.environment.is_running_on_swarming',
'clusterfuzz._internal.metrics.events.emit',
])
self.module = mock.MagicMock(__name__='tasks.analyze_task')
Expand All @@ -210,10 +210,8 @@ def test_uworker_main(self, execution_mode: utasks.Mode):
"""Tests that uworker_main works as intended."""
start_time_ns = time.time_ns()

if execution_mode == utasks.Mode.SWARMING:
self.mock.is_swarming_bot.return_value = True # pylint: disable=protected-access
else:
self.mock.is_swarming_bot.return_value = False
self.mock.is_running_on_swarming.return_value = (
execution_mode == utasks.Mode.SWARMING)

preprocess_start_time_ns = start_time_ns - 42 * 10**9 # In the past.
preprocess_start_timestamp = timestamp_pb2.Timestamp()
Expand Down
20 changes: 20 additions & 0 deletions src/clusterfuzz/_internal/tests/core/metrics/logs_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,26 @@ def test_configure_appengine(self):
logs.configure('test')
self.assertEqual(0, self.mock.dictConfig.call_count)

def test_configure_swarming(self):
"""Test configure for swarming bot."""
# pylint: disable=protected-access
os.environ['SWARMING_BOT'] = 'True'
os.environ['TASK_ID'] = 'task-123'
os.environ['BOT_NAME'] = 'bot-123'

helpers.patch(
self,
['clusterfuzz._internal.system.environment.is_running_on_swarming'])

logger = mock.MagicMock()
self.mock.getLogger.return_value = logger

logs.configure('test')

self.assertEqual(logs._default_extras['task_id'], 'task-123')
self.assertEqual(logs._default_extras['instance_id'], 'bot-123')
self.assertEqual(logs._default_extras['platform'], 'swarming')


@test_utils.with_cloud_emulators('datastore')
class EmitTest(unittest.TestCase):
Expand Down
45 changes: 44 additions & 1 deletion src/python/bot/startup/run_bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import time
import traceback

import requests

from clusterfuzz._internal.base import dates
from clusterfuzz._internal.base import errors
from clusterfuzz._internal.base import tasks
Expand Down Expand Up @@ -125,7 +127,7 @@ def task_loop():
# This caches the current environment on first run. Don't move this.
environment.reset_environment()
try:
if environment.update_task_enabled():
if update_task_enabled():
logs.info("Running update task.")
# Run regular updates.
# TODO(metzman): Move this after utask_main execution
Expand Down Expand Up @@ -193,6 +195,47 @@ def task_loop():
return stacktrace, clean_exit, task_payload


def update_task_enabled() -> bool:
Copy link
Copy Markdown
Collaborator Author

@IvanBM18 IvanBM18 Apr 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See context at context for this change at this comment and this other comment

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should mention this change in the PR description (which will become the commit message).

""" It uses the GCE VM metadata server `update_task_enabled` flag.

This flag will be used to rollout the update_task deprecation
by disabling it progressively for each instance group through
the instance template metadata
"""
metadata_url = ("http://metadata.google.internal/computeMetadata/v1/" +
"instance/attributes/")
metadata_header = {"Metadata-Flavor": "Google"}
metadata_key = "update_task_enabled"

running_on_batch = bool(environment.is_uworker())

try:
# Construct the full URL for your specific metadata key
response = requests.get(
f"{metadata_url}{metadata_key}", headers=metadata_header, timeout=10)

# Raise an exception for bad status codes (4xx or 5xx)
response.raise_for_status()

# The metadata value is in the response text
metadata_value = response.text
logs.info(f"The value for '{metadata_key}' is: {metadata_value}")
is_update_task_enabled = metadata_value.lower() != 'false'

# The flag is_uworker is true for Batch environment
# The update task should run if it's not a Batch environment
# and the flag is enabled on the VM template metadata
return not running_on_batch and is_update_task_enabled

except requests.exceptions.HTTPError as http_error:
logs.warning(f"Http error fetching metadata: {http_error}")

except Exception as ex:
logs.error(f"Unknown exception fetching metadata: {ex}")

return not running_on_batch


def main():
"""Prepare the configuration options and start requesting tasks."""
logs.configure('run_bot')
Expand Down
Loading