Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/clusterfuzz/_internal/base/feature_flags.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ class FeatureFlags(Enum):
PREPROCESS_QUEUE_SIZE_LIMIT = 'preprocess_queue_size_limit'

SWARMING_REMOTE_EXECUTION = 'swarming_remote_execution'
# TODO(ibarba): Set this value based off dev & stage metrics and tests.
SWARMING_PREPROCESS_QUEUE_SIZE_LIMIT = 'swarming_preprocess_queue_size_limit'

@property
def flag(self):
Expand Down
16 changes: 12 additions & 4 deletions src/clusterfuzz/_internal/base/tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,11 @@ def get_task_duration(command):
UTASK_MAIN_QUEUE = 'utask_main'
PREPROCESS_QUEUE = 'preprocess'

SWARMING_QUEUES = {
PREPROCESS_QUEUE: 'preprocess-swarming',
UTASK_MAIN_QUEUE: 'utask_main-swarming',
}

# See https://github.com/google/clusterfuzz/issues/3347 for usage
SUBQUEUE_IDENTIFIER = ':'

Expand Down Expand Up @@ -681,10 +686,9 @@ def get_task_from_message(message, queue=None, can_defer=True,
return task


def get_utask_mains() -> List[PubSubTask]:
def get_utask_mains(queue_name: str = UTASK_MAIN_QUEUE) -> List[PubSubTask]: # pylint: disable=dangerous-default-value
"""Returns a list of tasks for preprocessing many utasks on this bot and then
running the uworker_mains in the same batch job."""
queue_name = UTASK_MAIN_QUEUE
base_os_version = environment.get_value('BASE_OS_VERSION')
if base_os_version:
queue_name = f'{queue_name}-{base_os_version}'
Expand Down Expand Up @@ -850,15 +854,19 @@ def run(self):
logs.error('Leaser thread failed.')


def add_utask_main(command, input_url, job_type, wait_time=None):
def add_utask_main(command: str,
input_url: str,
job_type: str,
wait_time: Optional[int] = None,
queue_name: str = UTASK_MAIN_QUEUE) -> None:
"""Adds the utask_main portion of a utask to the utasks queue for scheduling
on batch. This should only be done after preprocessing."""
initial_command = environment.get_value('TASK_PAYLOAD')
add_task(
command,
input_url,
job_type,
queue=UTASK_MAIN_QUEUE,
queue=queue_name,
wait_time=wait_time,
extra_info={'initial_command': initial_command})

Expand Down
11 changes: 9 additions & 2 deletions src/clusterfuzz/_internal/bot/tasks/task_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,9 +177,16 @@ def execute(self, task_argument, job_type, uworker_env):
if download_url is None:
return

logs.info('Queueing utask for remote execution.', download_url=download_url)
assert batch_service.is_remote_task(command, job_type)
tasks.add_utask_main(command, download_url, job_type)
queue_name = tasks.UTASK_MAIN_QUEUE
if swarming.is_swarming_task(job_type):
queue_name = tasks.SWARMING_QUEUES[tasks.UTASK_MAIN_QUEUE]

logs.info(
'Queueing utask for remote execution.',
queue_name=queue_name,
download_url=download_url)
tasks.add_utask_main(command, download_url, job_type, queue_name)

@logs.task_stage_context(logs.Stage.PREPROCESS)
def preprocess(self, task_argument, job_type, uworker_env):
Expand Down
132 changes: 74 additions & 58 deletions src/clusterfuzz/_internal/cron/schedule_fuzz.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,25 @@
# limitations under the License.
"""Cron job to schedule fuzz tasks that run on batch."""

from abc import ABC
from abc import abstractmethod
import collections
import random
import time

from google.cloud import monitoring_v3

from clusterfuzz._internal.base import feature_flags
from clusterfuzz._internal.base import memoize
from clusterfuzz._internal.base import tasks
from clusterfuzz._internal.base import utils
from clusterfuzz._internal.base.feature_flags import FeatureFlags
from clusterfuzz._internal.datastore import data_types
from clusterfuzz._internal.datastore import ndb_utils
from clusterfuzz._internal.google_cloud_utils import credentials
from clusterfuzz._internal.metrics import logs

PREPROCESS_TARGET_SIZE_DEFAULT = 10000
SWARMING_PREPROCESS_TARGET_SIZE_DEFAULT = 5


@memoize.wrap(memoize.InMemory(60))
Expand Down Expand Up @@ -62,15 +65,55 @@ def get_queue_size(creds, project_id, subscription_id):
return 0


class BaseFuzzTaskScheduler:
class BaseFuzzTaskScheduler(ABC):
"""Base fuzz task scheduler for any deployment of ClusterFuzz."""

def __init__(self, num_tasks):
self.num_tasks = num_tasks

def get_fuzz_tasks(self):
@abstractmethod
def get_fuzz_tasks(self, num_tasks: int) -> list[tasks.Task]:
raise NotImplementedError('Child class must implement.')

def schedule_fuzz_tasks(self) -> bool:
"""Schedules fuzz tasks."""
return self._schedule_fuzz_tasks()

def _schedule_fuzz_tasks(
self,
queue: str = tasks.PREPROCESS_QUEUE,
default_target_size: int = PREPROCESS_TARGET_SIZE_DEFAULT,
target_size_flag: FeatureFlags = FeatureFlags.PREPROCESS_QUEUE_SIZE_LIMIT
) -> bool:
"""Internal method to schedule fuzz tasks."""
project = utils.get_application_id()
start = time.time()
creds = credentials.get_default()[0]
preprocess_queue_size = get_queue_size(creds, project, queue)

target_size = default_target_size
if target_size_flag.enabled and target_size_flag.content:
target_size = int(target_size_flag.content)

num_tasks = target_size - preprocess_queue_size
logs.info(f'Queue {queue} size: {preprocess_queue_size}. '
f'Target: {target_size}. Needed: {num_tasks}.')

if num_tasks <= 0:
logs.info('Queue size met or exceeded. Not scheduling tasks.')
return False

fuzz_tasks = self.get_fuzz_tasks(num_tasks)
if not fuzz_tasks:
logs.error('No fuzz tasks found to schedule.')
return False

logs.info(f'Adding {len(fuzz_tasks)} tasks to queue {queue}.')
tasks.bulk_add_tasks(fuzz_tasks, queue=queue, eta_now=True)
logs.info(f'Scheduled {len(fuzz_tasks)} tasks on queue {queue}.')

end = time.time()
total = end - start
logs.info(f'Task scheduling took {total} seconds.')
return True


class FuzzTaskCandidate:
"""Data class that holds more info about FuzzerJobs than the ndb.Models do.
Expand Down Expand Up @@ -101,7 +144,7 @@ def copy(self):
class OssfuzzFuzzTaskScheduler(BaseFuzzTaskScheduler):
"""Fuzz task scheduler for OSS-Fuzz."""

def get_fuzz_tasks(self) -> list[tasks.Task]:
def get_fuzz_tasks(self, num_tasks: int) -> list[tasks.Task]:
# TODO(metzman): Handle high end.
# A job's weight is determined by its own weight and the weight of the
# project is a part of. First get project weights.
Expand Down Expand Up @@ -164,11 +207,9 @@ def get_fuzz_tasks(self) -> list[tasks.Task]:
for fuzz_task_candidate in fuzz_task_candidates:
weights.append(fuzz_task_candidate.weight)

fuzz_tasks_count = self.num_tasks
logs.info(f'Scheduling {fuzz_tasks_count} fuzz tasks for OSS-Fuzz.')
logs.info(f'Scheduling {num_tasks} fuzz tasks for OSS-Fuzz.')

choices = random.choices(
fuzz_task_candidates, weights=weights, k=fuzz_tasks_count)
choices = random.choices(fuzz_task_candidates, weights=weights, k=num_tasks)
fuzz_tasks = [
tasks.Task(
'fuzz',
Expand All @@ -186,7 +227,7 @@ def get_fuzz_tasks(self) -> list[tasks.Task]:
class ChromeFuzzTaskScheduler(BaseFuzzTaskScheduler):
"""Fuzz task scheduler for Chrome."""

def get_fuzz_tasks(self) -> list[tasks.Task]:
def get_fuzz_tasks(self, num_tasks: int) -> list[tasks.Task]:
"""Returns fuzz tasks for chrome, weighted by job weight."""
logs.info('Getting jobs for Chrome.')

Expand Down Expand Up @@ -214,14 +255,12 @@ def get_fuzz_tasks(self) -> list[tasks.Task]:
fuzz_task_candidates.append(fuzz_task_candidate)

weights = [candidate.weight for candidate in fuzz_task_candidates]
fuzz_tasks_count = self.num_tasks
logs.info(f'Scheduling {fuzz_tasks_count} fuzz tasks for Chrome.')
logs.info(f'Scheduling {num_tasks} fuzz tasks for Chrome.')

if not fuzz_task_candidates:
return []

choices = random.choices(
fuzz_task_candidates, weights=weights, k=fuzz_tasks_count)
choices = random.choices(fuzz_task_candidates, weights=weights, k=num_tasks)
fuzz_tasks = [
tasks.Task(
'fuzz',
Expand All @@ -232,51 +271,28 @@ def get_fuzz_tasks(self) -> list[tasks.Task]:
]
return fuzz_tasks

def _schedule_swarming_fuzz_tasks(self) -> bool:
if not FeatureFlags.SWARMING_REMOTE_EXECUTION.enabled:
return False

def get_fuzz_tasks(num_tasks: int) -> list[tasks.Task]:
if utils.is_oss_fuzz():
scheduler = OssfuzzFuzzTaskScheduler(num_tasks)
else:
scheduler = ChromeFuzzTaskScheduler(num_tasks)
fuzz_tasks = scheduler.get_fuzz_tasks()
return fuzz_tasks


def schedule_fuzz_tasks() -> bool:
"""Schedules fuzz tasks."""

project = utils.get_application_id()
start = time.time()
creds = credentials.get_default()[0]
preprocess_queue_size = get_queue_size(creds, project, tasks.PREPROCESS_QUEUE)
swarming_preprocess_queue = tasks.SWARMING_QUEUES[tasks.PREPROCESS_QUEUE]
return self._schedule_fuzz_tasks(
queue=swarming_preprocess_queue,
default_target_size=SWARMING_PREPROCESS_TARGET_SIZE_DEFAULT,
target_size_flag=FeatureFlags.SWARMING_PREPROCESS_QUEUE_SIZE_LIMIT)

target_size = PREPROCESS_TARGET_SIZE_DEFAULT
target_size_flag = feature_flags.FeatureFlags.PREPROCESS_QUEUE_SIZE_LIMIT
if target_size_flag.enabled and target_size_flag.content:
target_size = int(target_size_flag.content)
def _schedule_batch_fuzz_tasks(self) -> bool:
return self._schedule_fuzz_tasks()

num_tasks = target_size - preprocess_queue_size
logs.info(f'Preprocess queue size: {preprocess_queue_size}. '
f'Target: {target_size}. Needed: {num_tasks}.')

if num_tasks <= 0:
logs.info('Queue size met or exceeded. Not scheduling tasks.')
return False

fuzz_tasks = get_fuzz_tasks(num_tasks)
if not fuzz_tasks:
logs.error('No fuzz tasks found to schedule.')
return False

logs.info(f'Adding {fuzz_tasks} to preprocess queue.')
tasks.bulk_add_tasks(fuzz_tasks, queue=tasks.PREPROCESS_QUEUE, eta_now=True)
logs.info(f'Scheduled {len(fuzz_tasks)} fuzz tasks.')

end = time.time()
total = end - start
logs.info(f'Task scheduling took {total} seconds.')
return True
def schedule_fuzz_tasks(self) -> bool:
self._schedule_swarming_fuzz_tasks()
self._schedule_batch_fuzz_tasks()
return True


def main():
return schedule_fuzz_tasks()
if utils.is_oss_fuzz():
scheduler = OssfuzzFuzzTaskScheduler()
else:
scheduler = ChromeFuzzTaskScheduler()
return scheduler.schedule_fuzz_tasks()
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ def test_get_fuzz_tasks(self):
data_types.OssFuzzProject(name='dead_project', cpu_weight=0.0).put()

num_tasks = 5
scheduler = schedule_fuzz.OssfuzzFuzzTaskScheduler(num_tasks)
tasks = scheduler.get_fuzz_tasks()
scheduler = schedule_fuzz.OssfuzzFuzzTaskScheduler()
tasks = scheduler.get_fuzz_tasks(num_tasks)
comparable_results = []
for task in tasks:
comparable_results.append((task.command, task.argument, task.job))
Expand Down Expand Up @@ -108,8 +108,8 @@ def test_os_version_precedence_project_over_job(self):
name=project_name, base_os_version='project-version').put()
data_types.OssFuzzProject(name='dead_project', cpu_weight=0.0).put()

scheduler = schedule_fuzz.OssfuzzFuzzTaskScheduler(num_tasks=1)
tasks = scheduler.get_fuzz_tasks()
scheduler = schedule_fuzz.OssfuzzFuzzTaskScheduler()
tasks = scheduler.get_fuzz_tasks(num_tasks=1)
self.assertEqual(len(tasks), 1)
task = tasks[0]

Expand Down Expand Up @@ -147,8 +147,8 @@ def test_os_version_fallback_to_job(self):
data_types.OssFuzzProject(name=project_name).put()
data_types.OssFuzzProject(name='dead_project', cpu_weight=0.0).put()

scheduler = schedule_fuzz.OssfuzzFuzzTaskScheduler(num_tasks=1)
tasks = scheduler.get_fuzz_tasks()
scheduler = schedule_fuzz.OssfuzzFuzzTaskScheduler()
tasks = scheduler.get_fuzz_tasks(num_tasks=1)
self.assertEqual(len(tasks), 1)
task = tasks[0]

Expand Down Expand Up @@ -186,8 +186,8 @@ def test_os_version_no_version(self):
data_types.OssFuzzProject(name=project_name).put()
data_types.OssFuzzProject(name='dead_project', cpu_weight=0.0).put()

scheduler = schedule_fuzz.OssfuzzFuzzTaskScheduler(num_tasks=1)
tasks = scheduler.get_fuzz_tasks()
scheduler = schedule_fuzz.OssfuzzFuzzTaskScheduler()
tasks = scheduler.get_fuzz_tasks(num_tasks=1)
self.assertEqual(len(tasks), 1)
task = tasks[0]

Expand Down Expand Up @@ -216,8 +216,8 @@ def _setup_chrome_entities(self, job_os_version=None):

def _run_and_get_task(self):
"""Runs the scheduler and returns the single task created."""
scheduler = schedule_fuzz.ChromeFuzzTaskScheduler(num_tasks=1)
tasks = scheduler.get_fuzz_tasks()
scheduler = schedule_fuzz.ChromeFuzzTaskScheduler()
tasks = scheduler.get_fuzz_tasks(num_tasks=1)
self.assertEqual(len(tasks), 1)
return tasks[0]

Expand Down
15 changes: 15 additions & 0 deletions src/clusterfuzz/_internal/tests/core/base/tasks/tasks_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,21 @@ def test_get_utask_mains_with_os_version(self, mock_env_get, mock_puller):
tasks.get_utask_mains()
mock_puller.assert_called_with('utask_main-ubuntu-24-04')

def test_get_utask_mains_with_custom_queue(self, mock_env_get, mock_puller):
"""Tests that get_utask_mains selects the custom queue."""
mock_puller.return_value.get_messages_time_limited.return_value = []
mock_env_get.return_value = None
tasks.get_utask_mains(queue_name='custom_queue')
mock_puller.assert_called_with('custom_queue')

def test_get_utask_mains_with_custom_queue_and_os_version(
self, mock_env_get, mock_puller):
"""Tests that get_utask_mains selects the custom queue with OS suffix."""
mock_puller.return_value.get_messages_time_limited.return_value = []
mock_env_get.return_value = 'ubuntu-24-04'
tasks.get_utask_mains(queue_name='custom_queue')
mock_puller.assert_called_with('custom_queue-ubuntu-24-04')


@mock.patch('clusterfuzz._internal.system.environment.get_value')
@mock.patch('clusterfuzz._internal.system.environment.platform')
Expand Down
Loading
Loading