From 99bf603570db94c78542b78fdca818831a4e2819 Mon Sep 17 00:00:00 2001 From: Ivan Barba Date: Mon, 18 May 2026 05:27:10 +0000 Subject: [PATCH 1/3] Add support for Swarming preprocess queue and task scheduling --- .../_internal/base/feature_flags.py | 2 + .../_internal/base/tasks/__init__.py | 5 + .../_internal/cron/schedule_fuzz.py | 132 ++++++++++-------- .../handlers/cron/schedule_fuzz_test.py | 20 +-- 4 files changed, 91 insertions(+), 68 deletions(-) diff --git a/src/clusterfuzz/_internal/base/feature_flags.py b/src/clusterfuzz/_internal/base/feature_flags.py index 9bea8c41f97..0b0e24750e6 100644 --- a/src/clusterfuzz/_internal/base/feature_flags.py +++ b/src/clusterfuzz/_internal/base/feature_flags.py @@ -36,6 +36,8 @@ class FeatureFlags(Enum): PREPROCESS_QUEUE_SIZE_LIMIT = 'preprocess_queue_size_limit' SWARMING_REMOTE_EXECUTION = 'swarming_remote_execution' + # TODO(ibarba): Set this value based off dev & stage metrics and tests. + SWARMING_PREPROCESS_QUEUE_SIZE_LIMIT = 'swarming_preprocess_queue_size_limit' @property def flag(self): diff --git a/src/clusterfuzz/_internal/base/tasks/__init__.py b/src/clusterfuzz/_internal/base/tasks/__init__.py index ae0c4e9ed02..156f57e6d7f 100644 --- a/src/clusterfuzz/_internal/base/tasks/__init__.py +++ b/src/clusterfuzz/_internal/base/tasks/__init__.py @@ -101,6 +101,11 @@ def get_task_duration(command): UTASK_MAIN_QUEUE = 'utask_main' PREPROCESS_QUEUE = 'preprocess' +SWARMING_QUEUES = { + PREPROCESS_QUEUE: 'preprocess_swarming', + UTASK_MAIN_QUEUE: 'utask_main_swarming', +} + # See https://github.com/google/clusterfuzz/issues/3347 for usage SUBQUEUE_IDENTIFIER = ':' diff --git a/src/clusterfuzz/_internal/cron/schedule_fuzz.py b/src/clusterfuzz/_internal/cron/schedule_fuzz.py index e032923bad8..89708489062 100644 --- a/src/clusterfuzz/_internal/cron/schedule_fuzz.py +++ b/src/clusterfuzz/_internal/cron/schedule_fuzz.py @@ -13,22 +13,25 @@ # limitations under the License. """Cron job to schedule fuzz tasks that run on batch.""" +from abc import ABC +from abc import abstractmethod import collections import random import time from google.cloud import monitoring_v3 -from clusterfuzz._internal.base import feature_flags from clusterfuzz._internal.base import memoize from clusterfuzz._internal.base import tasks from clusterfuzz._internal.base import utils +from clusterfuzz._internal.base.feature_flags import FeatureFlags from clusterfuzz._internal.datastore import data_types from clusterfuzz._internal.datastore import ndb_utils from clusterfuzz._internal.google_cloud_utils import credentials from clusterfuzz._internal.metrics import logs PREPROCESS_TARGET_SIZE_DEFAULT = 10000 +SWARMING_PREPROCESS_TARGET_SIZE_DEFAULT = 5 @memoize.wrap(memoize.InMemory(60)) @@ -62,15 +65,55 @@ def get_queue_size(creds, project_id, subscription_id): return 0 -class BaseFuzzTaskScheduler: +class BaseFuzzTaskScheduler(ABC): """Base fuzz task scheduler for any deployment of ClusterFuzz.""" - def __init__(self, num_tasks): - self.num_tasks = num_tasks - - def get_fuzz_tasks(self): + @abstractmethod + def get_fuzz_tasks(self, num_tasks: int) -> list[tasks.Task]: raise NotImplementedError('Child class must implement.') + def schedule_fuzz_tasks(self) -> bool: + """Schedules fuzz tasks.""" + return self._schedule_fuzz_tasks() + + def _schedule_fuzz_tasks( + self, + queue: str = tasks.PREPROCESS_QUEUE, + default_target_size: int = PREPROCESS_TARGET_SIZE_DEFAULT, + target_size_flag: FeatureFlags = FeatureFlags.PREPROCESS_QUEUE_SIZE_LIMIT + ) -> bool: + """Internal method to schedule fuzz tasks.""" + project = utils.get_application_id() + start = time.time() + creds = credentials.get_default()[0] + preprocess_queue_size = get_queue_size(creds, project, queue) + + target_size = default_target_size + if target_size_flag.enabled and target_size_flag.content: + target_size = int(target_size_flag.content) + + num_tasks = target_size - preprocess_queue_size + logs.info(f'Queue {queue} size: {preprocess_queue_size}. ' + f'Target: {target_size}. Needed: {num_tasks}.') + + if num_tasks <= 0: + logs.info('Queue size met or exceeded. Not scheduling tasks.') + return False + + fuzz_tasks = self.get_fuzz_tasks(num_tasks) + if not fuzz_tasks: + logs.error('No fuzz tasks found to schedule.') + return False + + logs.info(f'Adding {len(fuzz_tasks)} tasks to queue {queue}.') + tasks.bulk_add_tasks(fuzz_tasks, queue=queue, eta_now=True) + logs.info(f'Scheduled {len(fuzz_tasks)} tasks on queue {queue}.') + + end = time.time() + total = end - start + logs.info(f'Task scheduling took {total} seconds.') + return True + class FuzzTaskCandidate: """Data class that holds more info about FuzzerJobs than the ndb.Models do. @@ -101,7 +144,7 @@ def copy(self): class OssfuzzFuzzTaskScheduler(BaseFuzzTaskScheduler): """Fuzz task scheduler for OSS-Fuzz.""" - def get_fuzz_tasks(self) -> list[tasks.Task]: + def get_fuzz_tasks(self, num_tasks: int) -> list[tasks.Task]: # TODO(metzman): Handle high end. # A job's weight is determined by its own weight and the weight of the # project is a part of. First get project weights. @@ -164,11 +207,9 @@ def get_fuzz_tasks(self) -> list[tasks.Task]: for fuzz_task_candidate in fuzz_task_candidates: weights.append(fuzz_task_candidate.weight) - fuzz_tasks_count = self.num_tasks - logs.info(f'Scheduling {fuzz_tasks_count} fuzz tasks for OSS-Fuzz.') + logs.info(f'Scheduling {num_tasks} fuzz tasks for OSS-Fuzz.') - choices = random.choices( - fuzz_task_candidates, weights=weights, k=fuzz_tasks_count) + choices = random.choices(fuzz_task_candidates, weights=weights, k=num_tasks) fuzz_tasks = [ tasks.Task( 'fuzz', @@ -186,7 +227,7 @@ def get_fuzz_tasks(self) -> list[tasks.Task]: class ChromeFuzzTaskScheduler(BaseFuzzTaskScheduler): """Fuzz task scheduler for Chrome.""" - def get_fuzz_tasks(self) -> list[tasks.Task]: + def get_fuzz_tasks(self, num_tasks: int) -> list[tasks.Task]: """Returns fuzz tasks for chrome, weighted by job weight.""" logs.info('Getting jobs for Chrome.') @@ -214,14 +255,12 @@ def get_fuzz_tasks(self) -> list[tasks.Task]: fuzz_task_candidates.append(fuzz_task_candidate) weights = [candidate.weight for candidate in fuzz_task_candidates] - fuzz_tasks_count = self.num_tasks - logs.info(f'Scheduling {fuzz_tasks_count} fuzz tasks for Chrome.') + logs.info(f'Scheduling {num_tasks} fuzz tasks for Chrome.') if not fuzz_task_candidates: return [] - choices = random.choices( - fuzz_task_candidates, weights=weights, k=fuzz_tasks_count) + choices = random.choices(fuzz_task_candidates, weights=weights, k=num_tasks) fuzz_tasks = [ tasks.Task( 'fuzz', @@ -232,51 +271,28 @@ def get_fuzz_tasks(self) -> list[tasks.Task]: ] return fuzz_tasks + def _schedule_swarming_fuzz_tasks(self) -> bool: + if not FeatureFlags.SWARMING_REMOTE_EXECUTION.enabled: + return False -def get_fuzz_tasks(num_tasks: int) -> list[tasks.Task]: - if utils.is_oss_fuzz(): - scheduler = OssfuzzFuzzTaskScheduler(num_tasks) - else: - scheduler = ChromeFuzzTaskScheduler(num_tasks) - fuzz_tasks = scheduler.get_fuzz_tasks() - return fuzz_tasks - - -def schedule_fuzz_tasks() -> bool: - """Schedules fuzz tasks.""" - - project = utils.get_application_id() - start = time.time() - creds = credentials.get_default()[0] - preprocess_queue_size = get_queue_size(creds, project, tasks.PREPROCESS_QUEUE) + swarming_preprocess_queue = tasks.SWARMING_QUEUES[tasks.PREPROCESS_QUEUE] + return self._schedule_fuzz_tasks( + queue=swarming_preprocess_queue, + default_target_size=SWARMING_PREPROCESS_TARGET_SIZE_DEFAULT, + target_size_flag=FeatureFlags.SWARMING_PREPROCESS_QUEUE_SIZE_LIMIT) - target_size = PREPROCESS_TARGET_SIZE_DEFAULT - target_size_flag = feature_flags.FeatureFlags.PREPROCESS_QUEUE_SIZE_LIMIT - if target_size_flag.enabled and target_size_flag.content: - target_size = int(target_size_flag.content) + def _schedule_batch_fuzz_tasks(self) -> bool: + return self._schedule_fuzz_tasks() - num_tasks = target_size - preprocess_queue_size - logs.info(f'Preprocess queue size: {preprocess_queue_size}. ' - f'Target: {target_size}. Needed: {num_tasks}.') - - if num_tasks <= 0: - logs.info('Queue size met or exceeded. Not scheduling tasks.') - return False - - fuzz_tasks = get_fuzz_tasks(num_tasks) - if not fuzz_tasks: - logs.error('No fuzz tasks found to schedule.') - return False - - logs.info(f'Adding {fuzz_tasks} to preprocess queue.') - tasks.bulk_add_tasks(fuzz_tasks, queue=tasks.PREPROCESS_QUEUE, eta_now=True) - logs.info(f'Scheduled {len(fuzz_tasks)} fuzz tasks.') - - end = time.time() - total = end - start - logs.info(f'Task scheduling took {total} seconds.') - return True + def schedule_fuzz_tasks(self) -> bool: + self._schedule_swarming_fuzz_tasks() + self._schedule_batch_fuzz_tasks() + return True def main(): - return schedule_fuzz_tasks() + if utils.is_oss_fuzz(): + scheduler = OssfuzzFuzzTaskScheduler() + else: + scheduler = ChromeFuzzTaskScheduler() + return scheduler.schedule_fuzz_tasks() diff --git a/src/clusterfuzz/_internal/tests/appengine/handlers/cron/schedule_fuzz_test.py b/src/clusterfuzz/_internal/tests/appengine/handlers/cron/schedule_fuzz_test.py index ae23d78fd07..a007004a082 100644 --- a/src/clusterfuzz/_internal/tests/appengine/handlers/cron/schedule_fuzz_test.py +++ b/src/clusterfuzz/_internal/tests/appengine/handlers/cron/schedule_fuzz_test.py @@ -67,8 +67,8 @@ def test_get_fuzz_tasks(self): data_types.OssFuzzProject(name='dead_project', cpu_weight=0.0).put() num_tasks = 5 - scheduler = schedule_fuzz.OssfuzzFuzzTaskScheduler(num_tasks) - tasks = scheduler.get_fuzz_tasks() + scheduler = schedule_fuzz.OssfuzzFuzzTaskScheduler() + tasks = scheduler.get_fuzz_tasks(num_tasks) comparable_results = [] for task in tasks: comparable_results.append((task.command, task.argument, task.job)) @@ -108,8 +108,8 @@ def test_os_version_precedence_project_over_job(self): name=project_name, base_os_version='project-version').put() data_types.OssFuzzProject(name='dead_project', cpu_weight=0.0).put() - scheduler = schedule_fuzz.OssfuzzFuzzTaskScheduler(num_tasks=1) - tasks = scheduler.get_fuzz_tasks() + scheduler = schedule_fuzz.OssfuzzFuzzTaskScheduler() + tasks = scheduler.get_fuzz_tasks(num_tasks=1) self.assertEqual(len(tasks), 1) task = tasks[0] @@ -147,8 +147,8 @@ def test_os_version_fallback_to_job(self): data_types.OssFuzzProject(name=project_name).put() data_types.OssFuzzProject(name='dead_project', cpu_weight=0.0).put() - scheduler = schedule_fuzz.OssfuzzFuzzTaskScheduler(num_tasks=1) - tasks = scheduler.get_fuzz_tasks() + scheduler = schedule_fuzz.OssfuzzFuzzTaskScheduler() + tasks = scheduler.get_fuzz_tasks(num_tasks=1) self.assertEqual(len(tasks), 1) task = tasks[0] @@ -186,8 +186,8 @@ def test_os_version_no_version(self): data_types.OssFuzzProject(name=project_name).put() data_types.OssFuzzProject(name='dead_project', cpu_weight=0.0).put() - scheduler = schedule_fuzz.OssfuzzFuzzTaskScheduler(num_tasks=1) - tasks = scheduler.get_fuzz_tasks() + scheduler = schedule_fuzz.OssfuzzFuzzTaskScheduler() + tasks = scheduler.get_fuzz_tasks(num_tasks=1) self.assertEqual(len(tasks), 1) task = tasks[0] @@ -216,8 +216,8 @@ def _setup_chrome_entities(self, job_os_version=None): def _run_and_get_task(self): """Runs the scheduler and returns the single task created.""" - scheduler = schedule_fuzz.ChromeFuzzTaskScheduler(num_tasks=1) - tasks = scheduler.get_fuzz_tasks() + scheduler = schedule_fuzz.ChromeFuzzTaskScheduler() + tasks = scheduler.get_fuzz_tasks(num_tasks=1) self.assertEqual(len(tasks), 1) return tasks[0] From 07bae1828b1418bd618c60ced70ab1025943636b Mon Sep 17 00:00:00 2001 From: Ivan Barba Date: Mon, 18 May 2026 23:09:55 +0000 Subject: [PATCH 2/3] Update queue name to use correct one --- src/clusterfuzz/_internal/base/tasks/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/clusterfuzz/_internal/base/tasks/__init__.py b/src/clusterfuzz/_internal/base/tasks/__init__.py index 156f57e6d7f..7b054d2ee96 100644 --- a/src/clusterfuzz/_internal/base/tasks/__init__.py +++ b/src/clusterfuzz/_internal/base/tasks/__init__.py @@ -102,8 +102,8 @@ def get_task_duration(command): PREPROCESS_QUEUE = 'preprocess' SWARMING_QUEUES = { - PREPROCESS_QUEUE: 'preprocess_swarming', - UTASK_MAIN_QUEUE: 'utask_main_swarming', + PREPROCESS_QUEUE: 'preprocess-swarming', + UTASK_MAIN_QUEUE: 'utask_main-swarming', } # See https://github.com/google/clusterfuzz/issues/3347 for usage From de1956810f85424d001eab237bd4937458d129bd Mon Sep 17 00:00:00 2001 From: Ivan Barba Date: Wed, 20 May 2026 05:23:13 +0000 Subject: [PATCH 3/3] Add Android as a requested platform for chrome jobs --- src/clusterfuzz/_internal/cron/schedule_fuzz.py | 5 +++-- .../handlers/cron/schedule_fuzz_test.py | 16 ++++++++++++++++ 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/src/clusterfuzz/_internal/cron/schedule_fuzz.py b/src/clusterfuzz/_internal/cron/schedule_fuzz.py index 89708489062..89abcbdce58 100644 --- a/src/clusterfuzz/_internal/cron/schedule_fuzz.py +++ b/src/clusterfuzz/_internal/cron/schedule_fuzz.py @@ -232,8 +232,9 @@ def get_fuzz_tasks(self, num_tasks: int) -> list[tasks.Task]: logs.info('Getting jobs for Chrome.') candidates_by_job = {} - # Only consider linux jobs for chrome fuzzing. - job_query = data_types.Job.query(data_types.Job.platform == 'LINUX') + # Only consider LINUX or ANDROID jobs + job_query = data_types.Job.query( + data_types.Job.platform.IN(['LINUX', 'ANDROID'])) for job in ndb_utils.get_all_from_query(job_query): base_os_version = None if job.base_os_version: diff --git a/src/clusterfuzz/_internal/tests/appengine/handlers/cron/schedule_fuzz_test.py b/src/clusterfuzz/_internal/tests/appengine/handlers/cron/schedule_fuzz_test.py index a007004a082..9caa08c7aba 100644 --- a/src/clusterfuzz/_internal/tests/appengine/handlers/cron/schedule_fuzz_test.py +++ b/src/clusterfuzz/_internal/tests/appengine/handlers/cron/schedule_fuzz_test.py @@ -232,3 +232,19 @@ def test_os_version_job_without_version(self): self._setup_chrome_entities() task = self._run_and_get_task() self.assertIsNone(task.extra_info.get('base_os_version')) + + def test_job_filtering(self): + """Tests that jobs are filtered by platform""" + # Test wrong platform. + data_types.Job( + name='windows_job', + project='chrome', + platform='WINDOWS', + environment_string='IS_SWARMING_JOB = True').put() + data_types.FuzzerJob( + job='windows_job', platform='WINDOWS', fuzzer='libFuzzer', + weight=1.0).put() + + scheduler = schedule_fuzz.ChromeFuzzTaskScheduler() + tasks = scheduler.get_fuzz_tasks(num_tasks=1) + self.assertEqual(len(tasks), 0)