-
Notifications
You must be signed in to change notification settings - Fork 612
[Swarming] Push & pull preprocess tasks to swarming queue #5282
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
99bf603
56e6692
07bae18
de19568
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -13,22 +13,25 @@ | |
| # limitations under the License. | ||
| """Cron job to schedule fuzz tasks that run on batch.""" | ||
|
|
||
| from abc import ABC | ||
| from abc import abstractmethod | ||
| import collections | ||
| import random | ||
| import time | ||
|
|
||
| from google.cloud import monitoring_v3 | ||
|
|
||
| from clusterfuzz._internal.base import feature_flags | ||
| from clusterfuzz._internal.base import memoize | ||
| from clusterfuzz._internal.base import tasks | ||
| from clusterfuzz._internal.base import utils | ||
| from clusterfuzz._internal.base.feature_flags import FeatureFlags | ||
| from clusterfuzz._internal.datastore import data_types | ||
| from clusterfuzz._internal.datastore import ndb_utils | ||
| from clusterfuzz._internal.google_cloud_utils import credentials | ||
| from clusterfuzz._internal.metrics import logs | ||
|
|
||
| PREPROCESS_TARGET_SIZE_DEFAULT = 10000 | ||
| SWARMING_PREPROCESS_TARGET_SIZE_DEFAULT = 5 | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The Swarming pool has a hard limit of 25 (LINUX) bots running 1 task each.
So, injecting 5 tasks every 10 minutes matches the expected Swarming rate, preventing an infinitely growing backlog of stale tasks. This is still the default value; the real value is managed through a feature flag. We will later tweak this feature flag based on metrics and how Swarming handles this workload, so that we have a more accurate value. |
||
|
|
||
|
|
||
| @memoize.wrap(memoize.InMemory(60)) | ||
|
|
@@ -62,15 +65,55 @@ def get_queue_size(creds, project_id, subscription_id): | |
| return 0 | ||
|
|
||
|
|
||
| class BaseFuzzTaskScheduler: | ||
| class BaseFuzzTaskScheduler(ABC): | ||
| """Base fuzz task scheduler for any deployment of ClusterFuzz.""" | ||
|
|
||
| def __init__(self, num_tasks): | ||
| self.num_tasks = num_tasks | ||
|
|
||
| def get_fuzz_tasks(self): | ||
| @abstractmethod | ||
| def get_fuzz_tasks(self, num_tasks: int) -> list[tasks.Task]: | ||
| raise NotImplementedError('Child class must implement.') | ||
|
|
||
| def schedule_fuzz_tasks(self) -> bool: | ||
| """Schedules fuzz tasks.""" | ||
| return self._schedule_fuzz_tasks() | ||
|
|
||
| def _schedule_fuzz_tasks( | ||
| self, | ||
| queue: str = tasks.PREPROCESS_QUEUE, | ||
| default_target_size: int = PREPROCESS_TARGET_SIZE_DEFAULT, | ||
| target_size_flag: FeatureFlags = FeatureFlags.PREPROCESS_QUEUE_SIZE_LIMIT | ||
| ) -> bool: | ||
| """Internal method to schedule fuzz tasks.""" | ||
| project = utils.get_application_id() | ||
| start = time.time() | ||
| creds = credentials.get_default()[0] | ||
| preprocess_queue_size = get_queue_size(creds, project, queue) | ||
|
|
||
| target_size = default_target_size | ||
| if target_size_flag.enabled and target_size_flag.content: | ||
| target_size = int(target_size_flag.content) | ||
|
|
||
| num_tasks = target_size - preprocess_queue_size | ||
| logs.info(f'Queue {queue} size: {preprocess_queue_size}. ' | ||
| f'Target: {target_size}. Needed: {num_tasks}.') | ||
|
|
||
| if num_tasks <= 0: | ||
| logs.info('Queue size met or exceeded. Not scheduling tasks.') | ||
| return False | ||
|
|
||
| fuzz_tasks = self.get_fuzz_tasks(num_tasks) | ||
| if not fuzz_tasks: | ||
| logs.error('No fuzz tasks found to schedule.') | ||
| return False | ||
|
|
||
| logs.info(f'Adding {len(fuzz_tasks)} tasks to queue {queue}.') | ||
| tasks.bulk_add_tasks(fuzz_tasks, queue=queue, eta_now=True) | ||
| logs.info(f'Scheduled {len(fuzz_tasks)} tasks on queue {queue}.') | ||
|
|
||
| end = time.time() | ||
| total = end - start | ||
| logs.info(f'Task scheduling took {total} seconds.') | ||
| return True | ||
|
|
||
|
|
||
| class FuzzTaskCandidate: | ||
| """Data class that holds more info about FuzzerJobs than the ndb.Models do. | ||
|
|
@@ -101,7 +144,7 @@ def copy(self): | |
| class OssfuzzFuzzTaskScheduler(BaseFuzzTaskScheduler): | ||
| """Fuzz task scheduler for OSS-Fuzz.""" | ||
|
|
||
| def get_fuzz_tasks(self) -> list[tasks.Task]: | ||
| def get_fuzz_tasks(self, num_tasks: int) -> list[tasks.Task]: | ||
| # TODO(metzman): Handle high end. | ||
| # A job's weight is determined by its own weight and the weight of the | ||
| # project is a part of. First get project weights. | ||
|
|
@@ -164,11 +207,9 @@ def get_fuzz_tasks(self) -> list[tasks.Task]: | |
| for fuzz_task_candidate in fuzz_task_candidates: | ||
| weights.append(fuzz_task_candidate.weight) | ||
|
|
||
| fuzz_tasks_count = self.num_tasks | ||
| logs.info(f'Scheduling {fuzz_tasks_count} fuzz tasks for OSS-Fuzz.') | ||
| logs.info(f'Scheduling {num_tasks} fuzz tasks for OSS-Fuzz.') | ||
|
|
||
| choices = random.choices( | ||
| fuzz_task_candidates, weights=weights, k=fuzz_tasks_count) | ||
| choices = random.choices(fuzz_task_candidates, weights=weights, k=num_tasks) | ||
| fuzz_tasks = [ | ||
| tasks.Task( | ||
| 'fuzz', | ||
|
|
@@ -186,13 +227,14 @@ def get_fuzz_tasks(self) -> list[tasks.Task]: | |
| class ChromeFuzzTaskScheduler(BaseFuzzTaskScheduler): | ||
| """Fuzz task scheduler for Chrome.""" | ||
|
|
||
| def get_fuzz_tasks(self) -> list[tasks.Task]: | ||
| def get_fuzz_tasks(self, num_tasks: int) -> list[tasks.Task]: | ||
| """Returns fuzz tasks for chrome, weighted by job weight.""" | ||
| logs.info('Getting jobs for Chrome.') | ||
|
|
||
| candidates_by_job = {} | ||
| # Only consider linux jobs for chrome fuzzing. | ||
| job_query = data_types.Job.query(data_types.Job.platform == 'LINUX') | ||
| # Only consider LINUX or ANDROID jobs | ||
| job_query = data_types.Job.query( | ||
| data_types.Job.platform.IN(['LINUX', 'ANDROID'])) | ||
| for job in ndb_utils.get_all_from_query(job_query): | ||
| base_os_version = None | ||
| if job.base_os_version: | ||
|
|
@@ -214,14 +256,12 @@ def get_fuzz_tasks(self) -> list[tasks.Task]: | |
| fuzz_task_candidates.append(fuzz_task_candidate) | ||
|
|
||
| weights = [candidate.weight for candidate in fuzz_task_candidates] | ||
| fuzz_tasks_count = self.num_tasks | ||
| logs.info(f'Scheduling {fuzz_tasks_count} fuzz tasks for Chrome.') | ||
| logs.info(f'Scheduling {num_tasks} fuzz tasks for Chrome.') | ||
|
|
||
| if not fuzz_task_candidates: | ||
| return [] | ||
|
|
||
| choices = random.choices( | ||
| fuzz_task_candidates, weights=weights, k=fuzz_tasks_count) | ||
| choices = random.choices(fuzz_task_candidates, weights=weights, k=num_tasks) | ||
| fuzz_tasks = [ | ||
| tasks.Task( | ||
| 'fuzz', | ||
|
|
@@ -232,51 +272,28 @@ def get_fuzz_tasks(self) -> list[tasks.Task]: | |
| ] | ||
| return fuzz_tasks | ||
|
|
||
| def _schedule_swarming_fuzz_tasks(self) -> bool: | ||
| if not FeatureFlags.SWARMING_REMOTE_EXECUTION.enabled: | ||
| return False | ||
|
|
||
| def get_fuzz_tasks(num_tasks: int) -> list[tasks.Task]: | ||
| if utils.is_oss_fuzz(): | ||
| scheduler = OssfuzzFuzzTaskScheduler(num_tasks) | ||
| else: | ||
| scheduler = ChromeFuzzTaskScheduler(num_tasks) | ||
| fuzz_tasks = scheduler.get_fuzz_tasks() | ||
| return fuzz_tasks | ||
|
|
||
|
|
||
| def schedule_fuzz_tasks() -> bool: | ||
| """Schedules fuzz tasks.""" | ||
|
|
||
| project = utils.get_application_id() | ||
| start = time.time() | ||
| creds = credentials.get_default()[0] | ||
| preprocess_queue_size = get_queue_size(creds, project, tasks.PREPROCESS_QUEUE) | ||
| swarming_preprocess_queue = tasks.SWARMING_QUEUES[tasks.PREPROCESS_QUEUE] | ||
| return self._schedule_fuzz_tasks( | ||
| queue=swarming_preprocess_queue, | ||
| default_target_size=SWARMING_PREPROCESS_TARGET_SIZE_DEFAULT, | ||
| target_size_flag=FeatureFlags.SWARMING_PREPROCESS_QUEUE_SIZE_LIMIT) | ||
|
|
||
| target_size = PREPROCESS_TARGET_SIZE_DEFAULT | ||
| target_size_flag = feature_flags.FeatureFlags.PREPROCESS_QUEUE_SIZE_LIMIT | ||
| if target_size_flag.enabled and target_size_flag.content: | ||
| target_size = int(target_size_flag.content) | ||
| def _schedule_batch_fuzz_tasks(self) -> bool: | ||
| return self._schedule_fuzz_tasks() | ||
|
|
||
| num_tasks = target_size - preprocess_queue_size | ||
| logs.info(f'Preprocess queue size: {preprocess_queue_size}. ' | ||
| f'Target: {target_size}. Needed: {num_tasks}.') | ||
|
|
||
| if num_tasks <= 0: | ||
| logs.info('Queue size met or exceeded. Not scheduling tasks.') | ||
| return False | ||
|
|
||
| fuzz_tasks = get_fuzz_tasks(num_tasks) | ||
| if not fuzz_tasks: | ||
| logs.error('No fuzz tasks found to schedule.') | ||
| return False | ||
|
|
||
| logs.info(f'Adding {fuzz_tasks} to preprocess queue.') | ||
| tasks.bulk_add_tasks(fuzz_tasks, queue=tasks.PREPROCESS_QUEUE, eta_now=True) | ||
| logs.info(f'Scheduled {len(fuzz_tasks)} fuzz tasks.') | ||
|
|
||
| end = time.time() | ||
| total = end - start | ||
| logs.info(f'Task scheduling took {total} seconds.') | ||
| return True | ||
| def schedule_fuzz_tasks(self) -> bool: | ||
| self._schedule_swarming_fuzz_tasks() | ||
| self._schedule_batch_fuzz_tasks() | ||
| return True | ||
|
|
||
|
|
||
| def main(): | ||
| return schedule_fuzz_tasks() | ||
| if utils.is_oss_fuzz(): | ||
| scheduler = OssfuzzFuzzTaskScheduler() | ||
| else: | ||
| scheduler = ChromeFuzzTaskScheduler() | ||
| return scheduler.schedule_fuzz_tasks() | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this still true? If this is going to master, is it then also going to stage and prod?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, this is still true. Right now no Swarming-related code executes outside of dev; we have a ton of feature flags in place for this reason. So when these changes reach stage/prod, they are going to be safe.