From 99bf603570db94c78542b78fdca818831a4e2819 Mon Sep 17 00:00:00 2001 From: Ivan Barba Date: Mon, 18 May 2026 05:27:10 +0000 Subject: [PATCH 1/4] Add support for Swarming preprocess queue and task scheduling --- .../_internal/base/feature_flags.py | 2 + .../_internal/base/tasks/__init__.py | 5 + .../_internal/cron/schedule_fuzz.py | 132 ++++++++++-------- .../handlers/cron/schedule_fuzz_test.py | 20 +-- 4 files changed, 91 insertions(+), 68 deletions(-) diff --git a/src/clusterfuzz/_internal/base/feature_flags.py b/src/clusterfuzz/_internal/base/feature_flags.py index 9bea8c41f97..0b0e24750e6 100644 --- a/src/clusterfuzz/_internal/base/feature_flags.py +++ b/src/clusterfuzz/_internal/base/feature_flags.py @@ -36,6 +36,8 @@ class FeatureFlags(Enum): PREPROCESS_QUEUE_SIZE_LIMIT = 'preprocess_queue_size_limit' SWARMING_REMOTE_EXECUTION = 'swarming_remote_execution' + # TODO(ibarba): Set this value based off dev & stage metrics and tests. + SWARMING_PREPROCESS_QUEUE_SIZE_LIMIT = 'swarming_preprocess_queue_size_limit' @property def flag(self): diff --git a/src/clusterfuzz/_internal/base/tasks/__init__.py b/src/clusterfuzz/_internal/base/tasks/__init__.py index ae0c4e9ed02..156f57e6d7f 100644 --- a/src/clusterfuzz/_internal/base/tasks/__init__.py +++ b/src/clusterfuzz/_internal/base/tasks/__init__.py @@ -101,6 +101,11 @@ def get_task_duration(command): UTASK_MAIN_QUEUE = 'utask_main' PREPROCESS_QUEUE = 'preprocess' +SWARMING_QUEUES = { + PREPROCESS_QUEUE: 'preprocess_swarming', + UTASK_MAIN_QUEUE: 'utask_main_swarming', +} + # See https://github.com/google/clusterfuzz/issues/3347 for usage SUBQUEUE_IDENTIFIER = ':' diff --git a/src/clusterfuzz/_internal/cron/schedule_fuzz.py b/src/clusterfuzz/_internal/cron/schedule_fuzz.py index e032923bad8..89708489062 100644 --- a/src/clusterfuzz/_internal/cron/schedule_fuzz.py +++ b/src/clusterfuzz/_internal/cron/schedule_fuzz.py @@ -13,22 +13,25 @@ # limitations under the License. 
"""Cron job to schedule fuzz tasks that run on batch.""" +from abc import ABC +from abc import abstractmethod import collections import random import time from google.cloud import monitoring_v3 -from clusterfuzz._internal.base import feature_flags from clusterfuzz._internal.base import memoize from clusterfuzz._internal.base import tasks from clusterfuzz._internal.base import utils +from clusterfuzz._internal.base.feature_flags import FeatureFlags from clusterfuzz._internal.datastore import data_types from clusterfuzz._internal.datastore import ndb_utils from clusterfuzz._internal.google_cloud_utils import credentials from clusterfuzz._internal.metrics import logs PREPROCESS_TARGET_SIZE_DEFAULT = 10000 +SWARMING_PREPROCESS_TARGET_SIZE_DEFAULT = 5 @memoize.wrap(memoize.InMemory(60)) @@ -62,15 +65,55 @@ def get_queue_size(creds, project_id, subscription_id): return 0 -class BaseFuzzTaskScheduler: +class BaseFuzzTaskScheduler(ABC): """Base fuzz task scheduler for any deployment of ClusterFuzz.""" - def __init__(self, num_tasks): - self.num_tasks = num_tasks - - def get_fuzz_tasks(self): + @abstractmethod + def get_fuzz_tasks(self, num_tasks: int) -> list[tasks.Task]: raise NotImplementedError('Child class must implement.') + def schedule_fuzz_tasks(self) -> bool: + """Schedules fuzz tasks.""" + return self._schedule_fuzz_tasks() + + def _schedule_fuzz_tasks( + self, + queue: str = tasks.PREPROCESS_QUEUE, + default_target_size: int = PREPROCESS_TARGET_SIZE_DEFAULT, + target_size_flag: FeatureFlags = FeatureFlags.PREPROCESS_QUEUE_SIZE_LIMIT + ) -> bool: + """Internal method to schedule fuzz tasks.""" + project = utils.get_application_id() + start = time.time() + creds = credentials.get_default()[0] + preprocess_queue_size = get_queue_size(creds, project, queue) + + target_size = default_target_size + if target_size_flag.enabled and target_size_flag.content: + target_size = int(target_size_flag.content) + + num_tasks = target_size - preprocess_queue_size + 
logs.info(f'Queue {queue} size: {preprocess_queue_size}. ' + f'Target: {target_size}. Needed: {num_tasks}.') + + if num_tasks <= 0: + logs.info('Queue size met or exceeded. Not scheduling tasks.') + return False + + fuzz_tasks = self.get_fuzz_tasks(num_tasks) + if not fuzz_tasks: + logs.error('No fuzz tasks found to schedule.') + return False + + logs.info(f'Adding {len(fuzz_tasks)} tasks to queue {queue}.') + tasks.bulk_add_tasks(fuzz_tasks, queue=queue, eta_now=True) + logs.info(f'Scheduled {len(fuzz_tasks)} tasks on queue {queue}.') + + end = time.time() + total = end - start + logs.info(f'Task scheduling took {total} seconds.') + return True + class FuzzTaskCandidate: """Data class that holds more info about FuzzerJobs than the ndb.Models do. @@ -101,7 +144,7 @@ def copy(self): class OssfuzzFuzzTaskScheduler(BaseFuzzTaskScheduler): """Fuzz task scheduler for OSS-Fuzz.""" - def get_fuzz_tasks(self) -> list[tasks.Task]: + def get_fuzz_tasks(self, num_tasks: int) -> list[tasks.Task]: # TODO(metzman): Handle high end. # A job's weight is determined by its own weight and the weight of the # project is a part of. First get project weights. 
@@ -164,11 +207,9 @@ def get_fuzz_tasks(self) -> list[tasks.Task]: for fuzz_task_candidate in fuzz_task_candidates: weights.append(fuzz_task_candidate.weight) - fuzz_tasks_count = self.num_tasks - logs.info(f'Scheduling {fuzz_tasks_count} fuzz tasks for OSS-Fuzz.') + logs.info(f'Scheduling {num_tasks} fuzz tasks for OSS-Fuzz.') - choices = random.choices( - fuzz_task_candidates, weights=weights, k=fuzz_tasks_count) + choices = random.choices(fuzz_task_candidates, weights=weights, k=num_tasks) fuzz_tasks = [ tasks.Task( 'fuzz', @@ -186,7 +227,7 @@ def get_fuzz_tasks(self) -> list[tasks.Task]: class ChromeFuzzTaskScheduler(BaseFuzzTaskScheduler): """Fuzz task scheduler for Chrome.""" - def get_fuzz_tasks(self) -> list[tasks.Task]: + def get_fuzz_tasks(self, num_tasks: int) -> list[tasks.Task]: """Returns fuzz tasks for chrome, weighted by job weight.""" logs.info('Getting jobs for Chrome.') @@ -214,14 +255,12 @@ def get_fuzz_tasks(self) -> list[tasks.Task]: fuzz_task_candidates.append(fuzz_task_candidate) weights = [candidate.weight for candidate in fuzz_task_candidates] - fuzz_tasks_count = self.num_tasks - logs.info(f'Scheduling {fuzz_tasks_count} fuzz tasks for Chrome.') + logs.info(f'Scheduling {num_tasks} fuzz tasks for Chrome.') if not fuzz_task_candidates: return [] - choices = random.choices( - fuzz_task_candidates, weights=weights, k=fuzz_tasks_count) + choices = random.choices(fuzz_task_candidates, weights=weights, k=num_tasks) fuzz_tasks = [ tasks.Task( 'fuzz', @@ -232,51 +271,28 @@ def get_fuzz_tasks(self) -> list[tasks.Task]: ] return fuzz_tasks + def _schedule_swarming_fuzz_tasks(self) -> bool: + if not FeatureFlags.SWARMING_REMOTE_EXECUTION.enabled: + return False -def get_fuzz_tasks(num_tasks: int) -> list[tasks.Task]: - if utils.is_oss_fuzz(): - scheduler = OssfuzzFuzzTaskScheduler(num_tasks) - else: - scheduler = ChromeFuzzTaskScheduler(num_tasks) - fuzz_tasks = scheduler.get_fuzz_tasks() - return fuzz_tasks - - -def schedule_fuzz_tasks() -> bool: - 
"""Schedules fuzz tasks.""" - - project = utils.get_application_id() - start = time.time() - creds = credentials.get_default()[0] - preprocess_queue_size = get_queue_size(creds, project, tasks.PREPROCESS_QUEUE) + swarming_preprocess_queue = tasks.SWARMING_QUEUES[tasks.PREPROCESS_QUEUE] + return self._schedule_fuzz_tasks( + queue=swarming_preprocess_queue, + default_target_size=SWARMING_PREPROCESS_TARGET_SIZE_DEFAULT, + target_size_flag=FeatureFlags.SWARMING_PREPROCESS_QUEUE_SIZE_LIMIT) - target_size = PREPROCESS_TARGET_SIZE_DEFAULT - target_size_flag = feature_flags.FeatureFlags.PREPROCESS_QUEUE_SIZE_LIMIT - if target_size_flag.enabled and target_size_flag.content: - target_size = int(target_size_flag.content) + def _schedule_batch_fuzz_tasks(self) -> bool: + return self._schedule_fuzz_tasks() - num_tasks = target_size - preprocess_queue_size - logs.info(f'Preprocess queue size: {preprocess_queue_size}. ' - f'Target: {target_size}. Needed: {num_tasks}.') - - if num_tasks <= 0: - logs.info('Queue size met or exceeded. 
Not scheduling tasks.') - return False - - fuzz_tasks = get_fuzz_tasks(num_tasks) - if not fuzz_tasks: - logs.error('No fuzz tasks found to schedule.') - return False - - logs.info(f'Adding {fuzz_tasks} to preprocess queue.') - tasks.bulk_add_tasks(fuzz_tasks, queue=tasks.PREPROCESS_QUEUE, eta_now=True) - logs.info(f'Scheduled {len(fuzz_tasks)} fuzz tasks.') - - end = time.time() - total = end - start - logs.info(f'Task scheduling took {total} seconds.') - return True + def schedule_fuzz_tasks(self) -> bool: + self._schedule_swarming_fuzz_tasks() + self._schedule_batch_fuzz_tasks() + return True def main(): - return schedule_fuzz_tasks() + if utils.is_oss_fuzz(): + scheduler = OssfuzzFuzzTaskScheduler() + else: + scheduler = ChromeFuzzTaskScheduler() + return scheduler.schedule_fuzz_tasks() diff --git a/src/clusterfuzz/_internal/tests/appengine/handlers/cron/schedule_fuzz_test.py b/src/clusterfuzz/_internal/tests/appengine/handlers/cron/schedule_fuzz_test.py index ae23d78fd07..a007004a082 100644 --- a/src/clusterfuzz/_internal/tests/appengine/handlers/cron/schedule_fuzz_test.py +++ b/src/clusterfuzz/_internal/tests/appengine/handlers/cron/schedule_fuzz_test.py @@ -67,8 +67,8 @@ def test_get_fuzz_tasks(self): data_types.OssFuzzProject(name='dead_project', cpu_weight=0.0).put() num_tasks = 5 - scheduler = schedule_fuzz.OssfuzzFuzzTaskScheduler(num_tasks) - tasks = scheduler.get_fuzz_tasks() + scheduler = schedule_fuzz.OssfuzzFuzzTaskScheduler() + tasks = scheduler.get_fuzz_tasks(num_tasks) comparable_results = [] for task in tasks: comparable_results.append((task.command, task.argument, task.job)) @@ -108,8 +108,8 @@ def test_os_version_precedence_project_over_job(self): name=project_name, base_os_version='project-version').put() data_types.OssFuzzProject(name='dead_project', cpu_weight=0.0).put() - scheduler = schedule_fuzz.OssfuzzFuzzTaskScheduler(num_tasks=1) - tasks = scheduler.get_fuzz_tasks() + scheduler = schedule_fuzz.OssfuzzFuzzTaskScheduler() + tasks = 
scheduler.get_fuzz_tasks(num_tasks=1) self.assertEqual(len(tasks), 1) task = tasks[0] @@ -147,8 +147,8 @@ def test_os_version_fallback_to_job(self): data_types.OssFuzzProject(name=project_name).put() data_types.OssFuzzProject(name='dead_project', cpu_weight=0.0).put() - scheduler = schedule_fuzz.OssfuzzFuzzTaskScheduler(num_tasks=1) - tasks = scheduler.get_fuzz_tasks() + scheduler = schedule_fuzz.OssfuzzFuzzTaskScheduler() + tasks = scheduler.get_fuzz_tasks(num_tasks=1) self.assertEqual(len(tasks), 1) task = tasks[0] @@ -186,8 +186,8 @@ def test_os_version_no_version(self): data_types.OssFuzzProject(name=project_name).put() data_types.OssFuzzProject(name='dead_project', cpu_weight=0.0).put() - scheduler = schedule_fuzz.OssfuzzFuzzTaskScheduler(num_tasks=1) - tasks = scheduler.get_fuzz_tasks() + scheduler = schedule_fuzz.OssfuzzFuzzTaskScheduler() + tasks = scheduler.get_fuzz_tasks(num_tasks=1) self.assertEqual(len(tasks), 1) task = tasks[0] @@ -216,8 +216,8 @@ def _setup_chrome_entities(self, job_os_version=None): def _run_and_get_task(self): """Runs the scheduler and returns the single task created.""" - scheduler = schedule_fuzz.ChromeFuzzTaskScheduler(num_tasks=1) - tasks = scheduler.get_fuzz_tasks() + scheduler = schedule_fuzz.ChromeFuzzTaskScheduler() + tasks = scheduler.get_fuzz_tasks(num_tasks=1) self.assertEqual(len(tasks), 1) return tasks[0] From 07bae1828b1418bd618c60ced70ab1025943636b Mon Sep 17 00:00:00 2001 From: Ivan Barba Date: Mon, 18 May 2026 23:09:55 +0000 Subject: [PATCH 2/4] Update queue name to use correct one --- src/clusterfuzz/_internal/base/tasks/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/clusterfuzz/_internal/base/tasks/__init__.py b/src/clusterfuzz/_internal/base/tasks/__init__.py index 156f57e6d7f..7b054d2ee96 100644 --- a/src/clusterfuzz/_internal/base/tasks/__init__.py +++ b/src/clusterfuzz/_internal/base/tasks/__init__.py @@ -102,8 +102,8 @@ def get_task_duration(command): PREPROCESS_QUEUE = 
'preprocess' SWARMING_QUEUES = { - PREPROCESS_QUEUE: 'preprocess_swarming', - UTASK_MAIN_QUEUE: 'utask_main_swarming', + PREPROCESS_QUEUE: 'preprocess-swarming', + UTASK_MAIN_QUEUE: 'utask_main-swarming', } # See https://github.com/google/clusterfuzz/issues/3347 for usage From ebbb9f4c378be71fc8dd2a2eea9131d8e1888292 Mon Sep 17 00:00:00 2001 From: Ivan Barba Date: Mon, 18 May 2026 23:20:40 +0000 Subject: [PATCH 3/4] Pull and push utask_main tasks to Swarming queues --- .../_internal/base/tasks/__init__.py | 30 ++++++++++-- .../tests/core/base/tasks/tasks_test.py | 47 +++++++++++++++++++ .../tests/core/bot/startup/run_bot_test.py | 42 ++++++++++++++++- src/python/bot/startup/run_bot.py | 9 +++- 4 files changed, 121 insertions(+), 7 deletions(-) diff --git a/src/clusterfuzz/_internal/base/tasks/__init__.py b/src/clusterfuzz/_internal/base/tasks/__init__.py index 7b054d2ee96..95feb76a380 100644 --- a/src/clusterfuzz/_internal/base/tasks/__init__.py +++ b/src/clusterfuzz/_internal/base/tasks/__init__.py @@ -686,10 +686,9 @@ def get_task_from_message(message, queue=None, can_defer=True, return task -def get_utask_mains() -> List[PubSubTask]: +def get_utask_mains(queue_name: str = UTASK_MAIN_QUEUE) -> List[PubSubTask]: # pylint: disable=dangerous-default-value """Returns a list of tasks for preprocessing many utasks on this bot and then running the uworker_mains in the same batch job.""" - queue_name = UTASK_MAIN_QUEUE base_os_version = environment.get_value('BASE_OS_VERSION') if base_os_version: queue_name = f'{queue_name}-{base_os_version}' @@ -855,17 +854,40 @@ def run(self): logs.error('Leaser thread failed.') -def add_utask_main(command, input_url, job_type, wait_time=None): +def get_target_runtime(job_type: str) -> str: + """Returns the target runtime for a job type.""" + target_runtime = environment.UtaskMainRuntime.BATCH.value + + if job_type != 'none': + job = data_types.Job.query(data_types.Job.name == job_type).get() + if job: + job_environment = 
job.get_environment() + if utils.string_is_true(job_environment.get('IS_SWARMING_JOB')): + target_runtime = environment.UtaskMainRuntime.SWARMING.value + elif utils.string_is_true(job_environment.get('IS_K8S_ENV')): + target_runtime = environment.UtaskMainRuntime.KATA_CONTAINER.value + + return target_runtime + + +def add_utask_main(command: str, + input_url: str, + job_type: str, + wait_time: Optional[int] = None) -> None: """Adds the utask_main portion of a utask to the utasks queue for scheduling on batch. This should only be done after preprocessing.""" initial_command = environment.get_value('TASK_PAYLOAD') + target_runtime = get_target_runtime(job_type) add_task( command, input_url, job_type, queue=UTASK_MAIN_QUEUE, wait_time=wait_time, - extra_info={'initial_command': initial_command}) + extra_info={ + 'initial_command': initial_command, + 'target_runtime': target_runtime, + }) def bulk_add_tasks(tasks, queue=None, eta_now=False): diff --git a/src/clusterfuzz/_internal/tests/core/base/tasks/tasks_test.py b/src/clusterfuzz/_internal/tests/core/base/tasks/tasks_test.py index 68b4ebe5154..22e619dbd06 100644 --- a/src/clusterfuzz/_internal/tests/core/base/tasks/tasks_test.py +++ b/src/clusterfuzz/_internal/tests/core/base/tasks/tasks_test.py @@ -493,6 +493,21 @@ def test_get_utask_mains_with_os_version(self, mock_env_get, mock_puller): tasks.get_utask_mains() mock_puller.assert_called_with('utask_main-ubuntu-24-04') + def test_get_utask_mains_with_custom_queue(self, mock_env_get, mock_puller): + """Tests that get_utask_mains selects the custom queue.""" + mock_puller.return_value.get_messages_time_limited.return_value = [] + mock_env_get.return_value = None + tasks.get_utask_mains(queue_name='custom_queue') + mock_puller.assert_called_with('custom_queue') + + def test_get_utask_mains_with_custom_queue_and_os_version( + self, mock_env_get, mock_puller): + """Tests that get_utask_mains selects the custom queue with OS suffix.""" + 
mock_puller.return_value.get_messages_time_limited.return_value = [] + mock_env_get.return_value = 'ubuntu-24-04' + tasks.get_utask_mains(queue_name='custom_queue') + mock_puller.assert_called_with('custom_queue-ubuntu-24-04') + @mock.patch('clusterfuzz._internal.system.environment.get_value') @mock.patch('clusterfuzz._internal.system.environment.platform') @@ -535,3 +550,35 @@ def test_default_queue_suffix_mac_with_os_version(self, mock_platform, }.get(key, default) mock_platform.return_value = 'MAC' self.assertEqual(tasks.default_queue_suffix(), '-mac') + + +@test_utils.with_cloud_emulators('datastore') +class GetTargetRuntimeTest(unittest.TestCase): + """Tests for get_target_runtime.""" + + def test_job_type_none(self): + """Test that job_type='none' returns BATCH.""" + self.assertEqual(tasks.get_target_runtime('none'), 'batch') + + def test_job_not_found(self): + """Test that a non-existent job returns BATCH.""" + self.assertEqual(tasks.get_target_runtime('nonexistent_job'), 'batch') + + def test_swarming_job(self): + """Test that a job with IS_SWARMING_JOB=True returns SWARMING.""" + job = data_types.Job( + name='swarming_job', environment_string='IS_SWARMING_JOB = True') + job.put() + self.assertEqual(tasks.get_target_runtime('swarming_job'), 'swarming') + + def test_k8s_job(self): + """Test that a job with IS_K8S_ENV=True returns KATA_CONTAINER.""" + job = data_types.Job(name='k8s_job', environment_string='IS_K8S_ENV = True') + job.put() + self.assertEqual(tasks.get_target_runtime('k8s_job'), 'kata_container') + + def test_default_batch_job(self): + """Test that a job without special env vars returns BATCH.""" + job = data_types.Job(name='default_job', environment_string='FOO = BAR') + job.put() + self.assertEqual(tasks.get_target_runtime('default_job'), 'batch') diff --git a/src/clusterfuzz/_internal/tests/core/bot/startup/run_bot_test.py b/src/clusterfuzz/_internal/tests/core/bot/startup/run_bot_test.py index 585ec248c32..9d700900a6d 100644 --- 
a/src/clusterfuzz/_internal/tests/core/bot/startup/run_bot_test.py +++ b/src/clusterfuzz/_internal/tests/core/bot/startup/run_bot_test.py @@ -21,6 +21,7 @@ from clusterfuzz._internal.metrics import monitor from clusterfuzz._internal.metrics import monitoring_metrics from clusterfuzz._internal.tests.test_libs import helpers +from clusterfuzz._internal.tests.test_libs import test_utils from python.bot.startup import run_bot @@ -128,6 +129,7 @@ def test_lease_all_tasks_on_pubsubtasks(self): lease.assert_called_with() +@test_utils.with_cloud_emulators('datastore') class ScheduleUtaskMainsTest(unittest.TestCase): """Tests for schedule_utask_mains.""" @@ -136,6 +138,12 @@ def setUp(self): 'clusterfuzz._internal.base.tasks.get_utask_mains', 'clusterfuzz._internal.remote_task.remote_task_gate.RemoteTaskGate', ]) + patcher = mock.patch( + 'clusterfuzz._internal.base.feature_flags.FeatureFlags.enabled', + new_callable=mock.PropertyMock) + self.mock_swarming_enabled = patcher.start() + self.mock_swarming_enabled.return_value = False + self.addCleanup(patcher.stop) def test_schedule_tasks_requeue_uncreated(self): """Test that uncreated tasks are not acked.""" @@ -146,7 +154,7 @@ def test_schedule_tasks_requeue_uncreated(self): mock_task.lease.return_value.__enter__.return_value = None mock_task.lease.return_value.__exit__.return_value = None - self.mock.get_utask_mains.return_value = [mock_task] + self.mock.get_utask_mains.side_effect = [[mock_task], []] # Simulate that the tasks were not created and returned back. 
self.mock.RemoteTaskGate.return_value.create_utask_main_jobs.side_effect = lambda tasks: tasks @@ -168,10 +176,40 @@ def test_schedule_tasks_success(self): mock_task.lease.return_value.__enter__.return_value = None mock_task.lease.return_value.__exit__.return_value = None - self.mock.get_utask_mains.return_value = [mock_task] + self.mock.get_utask_mains.side_effect = [[mock_task], []] self.mock.RemoteTaskGate.return_value.create_utask_main_jobs.return_value = [] run_bot.schedule_utask_mains() self.mock.RemoteTaskGate.return_value.create_utask_main_jobs.assert_called_once( ) + + def test_schedule_tasks_both_queues(self): + """Test scheduling tasks from both queues.""" + mock_task1 = mock.MagicMock() + mock_task1.command = 'command1' + mock_task1.job = 'job1' + mock_task1.argument = 'argument1' + mock_task1.lease.return_value.__enter__.return_value = None + mock_task1.lease.return_value.__exit__.return_value = None + + mock_task2 = mock.MagicMock() + mock_task2.command = 'command2' + mock_task2.job = 'job2' + mock_task2.argument = 'argument2' + mock_task2.lease.return_value.__enter__.return_value = None + mock_task2.lease.return_value.__exit__.return_value = None + + self.mock.get_utask_mains.side_effect = [[mock_task1], [mock_task2]] + self.mock.RemoteTaskGate.return_value.create_utask_main_jobs.return_value = [] + + self.mock_swarming_enabled.return_value = True + run_bot.schedule_utask_mains() + + self.mock.RemoteTaskGate.return_value.create_utask_main_jobs.assert_called_once( + ) + call_args = self.mock.RemoteTaskGate.return_value.create_utask_main_jobs.call_args[ + 0][0] + self.assertEqual(len(call_args), 2) + self.assertEqual(call_args[0].pubsub_task, mock_task1) + self.assertEqual(call_args[1].pubsub_task, mock_task2) diff --git a/src/python/bot/startup/run_bot.py b/src/python/bot/startup/run_bot.py index 6825a28e149..142383a0548 100644 --- a/src/python/bot/startup/run_bot.py +++ b/src/python/bot/startup/run_bot.py @@ -31,6 +31,7 @@ from 
clusterfuzz._internal.base import dates from clusterfuzz._internal.base import errors +from clusterfuzz._internal.base import feature_flags from clusterfuzz._internal.base import tasks from clusterfuzz._internal.base import untrusted from clusterfuzz._internal.base import utils @@ -92,7 +93,13 @@ def schedule_utask_mains(): """Schedules utask_mains from preprocessed utasks on remote backends.""" logs.info('Attempting to combine batch tasks.') - utask_mains = tasks.get_utask_mains() + utask_mains = tasks.get_utask_mains(tasks.UTASK_MAIN_QUEUE) + + if feature_flags.FeatureFlags.SWARMING_REMOTE_EXECUTION.enabled: + swarming_queue = tasks.SWARMING_QUEUES[tasks.UTASK_MAIN_QUEUE] + swarming_utask_mains = tasks.get_utask_mains(swarming_queue) + utask_mains.extend(swarming_utask_mains) + if not utask_mains: logs.info('No utask mains.') return From 0d1afda5c15850a1076c86a4189f14ba7423b936 Mon Sep 17 00:00:00 2001 From: Ivan Barba Date: Wed, 20 May 2026 05:46:52 +0000 Subject: [PATCH 4/4] Changes target queue to stop using filtered subscriptions --- .../_internal/base/tasks/__init__.py | 27 +++------------- .../_internal/bot/tasks/task_types.py | 11 +++++-- .../tests/core/base/tasks/tasks_test.py | 32 ------------------- 3 files changed, 13 insertions(+), 57 deletions(-) diff --git a/src/clusterfuzz/_internal/base/tasks/__init__.py b/src/clusterfuzz/_internal/base/tasks/__init__.py index 95feb76a380..7e9aa2cbe5b 100644 --- a/src/clusterfuzz/_internal/base/tasks/__init__.py +++ b/src/clusterfuzz/_internal/base/tasks/__init__.py @@ -854,40 +854,21 @@ def run(self): logs.error('Leaser thread failed.') -def get_target_runtime(job_type: str) -> str: - """Returns the target runtime for a job type.""" - target_runtime = environment.UtaskMainRuntime.BATCH.value - - if job_type != 'none': - job = data_types.Job.query(data_types.Job.name == job_type).get() - if job: - job_environment = job.get_environment() - if utils.string_is_true(job_environment.get('IS_SWARMING_JOB')): - 
target_runtime = environment.UtaskMainRuntime.SWARMING.value - elif utils.string_is_true(job_environment.get('IS_K8S_ENV')): - target_runtime = environment.UtaskMainRuntime.KATA_CONTAINER.value - - return target_runtime - - def add_utask_main(command: str, input_url: str, job_type: str, - wait_time: Optional[int] = None) -> None: + wait_time: Optional[int] = None, + queue_name: str = UTASK_MAIN_QUEUE) -> None: """Adds the utask_main portion of a utask to the utasks queue for scheduling on batch. This should only be done after preprocessing.""" initial_command = environment.get_value('TASK_PAYLOAD') - target_runtime = get_target_runtime(job_type) add_task( command, input_url, job_type, - queue=UTASK_MAIN_QUEUE, + queue=queue_name, wait_time=wait_time, - extra_info={ - 'initial_command': initial_command, - 'target_runtime': target_runtime, - }) + extra_info={'initial_command': initial_command}) def bulk_add_tasks(tasks, queue=None, eta_now=False): diff --git a/src/clusterfuzz/_internal/bot/tasks/task_types.py b/src/clusterfuzz/_internal/bot/tasks/task_types.py index 7f568516871..bb9f8b3fad9 100644 --- a/src/clusterfuzz/_internal/bot/tasks/task_types.py +++ b/src/clusterfuzz/_internal/bot/tasks/task_types.py @@ -177,9 +177,16 @@ def execute(self, task_argument, job_type, uworker_env): if download_url is None: return - logs.info('Queueing utask for remote execution.', download_url=download_url) assert batch_service.is_remote_task(command, job_type) - tasks.add_utask_main(command, download_url, job_type) + queue_name = tasks.UTASK_MAIN_QUEUE + if swarming.is_swarming_task(job_type): + queue_name = tasks.SWARMING_QUEUES[tasks.UTASK_MAIN_QUEUE] + + logs.info( + 'Queueing utask for remote execution.', + queue_name=queue_name, + download_url=download_url) + tasks.add_utask_main(command, download_url, job_type, queue_name=queue_name) @logs.task_stage_context(logs.Stage.PREPROCESS) def preprocess(self, task_argument, job_type, uworker_env): diff --git 
a/src/clusterfuzz/_internal/tests/core/base/tasks/tasks_test.py b/src/clusterfuzz/_internal/tests/core/base/tasks/tasks_test.py index 22e619dbd06..5d7d8d8360c 100644 --- a/src/clusterfuzz/_internal/tests/core/base/tasks/tasks_test.py +++ b/src/clusterfuzz/_internal/tests/core/base/tasks/tasks_test.py @@ -550,35 +550,3 @@ def test_default_queue_suffix_mac_with_os_version(self, mock_platform, }.get(key, default) mock_platform.return_value = 'MAC' self.assertEqual(tasks.default_queue_suffix(), '-mac') - - -@test_utils.with_cloud_emulators('datastore') -class GetTargetRuntimeTest(unittest.TestCase): - """Tests for get_target_runtime.""" - - def test_job_type_none(self): - """Test that job_type='none' returns BATCH.""" - self.assertEqual(tasks.get_target_runtime('none'), 'batch') - - def test_job_not_found(self): - """Test that a non-existent job returns BATCH.""" - self.assertEqual(tasks.get_target_runtime('nonexistent_job'), 'batch') - - def test_swarming_job(self): - """Test that a job with IS_SWARMING_JOB=True returns SWARMING.""" - job = data_types.Job( - name='swarming_job', environment_string='IS_SWARMING_JOB = True') - job.put() - self.assertEqual(tasks.get_target_runtime('swarming_job'), 'swarming') - - def test_k8s_job(self): - """Test that a job with IS_K8S_ENV=True returns KATA_CONTAINER.""" - job = data_types.Job(name='k8s_job', environment_string='IS_K8S_ENV = True') - job.put() - self.assertEqual(tasks.get_target_runtime('k8s_job'), 'kata_container') - - def test_default_batch_job(self): - """Test that a job without special env vars returns BATCH.""" - job = data_types.Job(name='default_job', environment_string='FOO = BAR') - job.put() - self.assertEqual(tasks.get_target_runtime('default_job'), 'batch')