diff --git a/openevolve/config.py b/openevolve/config.py
index bef193da21..0cc9c3f4fe 100644
--- a/openevolve/config.py
+++ b/openevolve/config.py
@@ -429,6 +429,13 @@ class Config:
     # Parallel controller settings
     max_tasks_per_child: Optional[int] = None
 
+    # Hard-timeout watchdog for stuck process futures.
+    # If None, defaults to evaluator.timeout + stuck_future_timeout_buffer.
+    stuck_future_timeout: Optional[float] = None
+    # Additional buffer seconds used when stuck_future_timeout is not explicitly set.
+    stuck_future_timeout_buffer: int = 30
+    # Number of retry attempts when the watchdog times out a stuck future.
+    stuck_future_max_retries: int = 1
 
     @classmethod
     def from_yaml(cls, path: Union[str, Path]) -> "Config":
diff --git a/openevolve/process_parallel.py b/openevolve/process_parallel.py
index a2fd6592a9..b26a3da9fb 100644
--- a/openevolve/process_parallel.py
+++ b/openevolve/process_parallel.py
@@ -490,20 +490,70 @@ async def run_evolution(
         # Track pending futures by island to maintain distribution
         pending_futures: Dict[int, Future] = {}
         island_pending: Dict[int, List[int]] = {i: [] for i in range(self.num_islands)}
+        future_start_times: Dict[int, float] = {}
+        future_retry_counts: Dict[int, int] = {}
+        future_island_map: Dict[int, int] = {}
 
         batch_size = min(self.num_workers * 2, max_iterations)
 
         # Submit initial batch - distribute across islands
         batch_per_island = max(1, batch_size // self.num_islands) if batch_size > 0 else 0
         current_iteration = start_iteration
 
+        # Hard-timeout watchdog for futures that never transition to done().
+        stuck_future_timeout = self.config.stuck_future_timeout
+        if stuck_future_timeout is None:
+            stuck_future_timeout = (
+                self.config.evaluator.timeout + self.config.stuck_future_timeout_buffer
+            )
+        if stuck_future_timeout is not None and stuck_future_timeout <= 0:
+            stuck_future_timeout = None
+        stuck_future_max_retries = max(0, self.config.stuck_future_max_retries)
+        if stuck_future_timeout is not None:
+            logger.info(
+                f"Hard-timeout watchdog enabled: timeout={stuck_future_timeout}s, "
+                f"max_retries={stuck_future_max_retries}"
+            )
+        else:
+            logger.info("Hard-timeout watchdog disabled")
+
+        def remove_from_island_pending(iteration: int) -> None:
+            for island_id, iteration_list in island_pending.items():
+                if iteration in iteration_list:
+                    iteration_list.remove(iteration)
+                    break
+
+        def submit_iteration_to_island(
+            iteration: int, island_id: int, retry_count: int = 0
+        ) -> bool:
+            future = self._submit_iteration(iteration, island_id)
+            if not future:
+                return False
+
+            pending_futures[iteration] = future
+            island_pending[island_id].append(iteration)
+            future_start_times[iteration] = time.monotonic()
+            future_retry_counts[iteration] = retry_count
+            future_island_map[iteration] = island_id
+            return True
+
+        def submit_next_iteration_if_needed() -> None:
+            nonlocal next_iteration
+            for island_id in range(self.num_islands):
+                if (
+                    len(island_pending[island_id]) < batch_per_island
+                    and next_iteration < total_iterations
+                    and not self.shutdown_event.is_set()
+                ):
+                    if submit_iteration_to_island(next_iteration, island_id, retry_count=0):
+                        next_iteration += 1
+                    break  # Keep per-island balance by submitting one per completion.
+
+        # Round-robin distribution across islands
         for island_id in range(self.num_islands):
             for _ in range(batch_per_island):
                 if current_iteration < total_iterations:
-                    future = self._submit_iteration(current_iteration, island_id)
-                    if future:
-                        pending_futures[current_iteration] = future
-                        island_pending[island_id].append(current_iteration)
+                    submit_iteration_to_island(current_iteration, island_id, retry_count=0)
                     current_iteration += 1
 
         next_iteration = current_iteration
@@ -536,10 +586,21 @@ async def run_evolution(
         ):
             # Find completed futures
             completed_iteration = None
+            completed_by_watchdog = False
+            watchdog_elapsed = 0.0
             for iteration, future in list(pending_futures.items()):
                 if future.done():
                     completed_iteration = iteration
                     break
+                if stuck_future_timeout is not None:
+                    started_at = future_start_times.get(iteration)
+                    if started_at is not None:
+                        elapsed = time.monotonic() - started_at
+                        if elapsed > stuck_future_timeout:
+                            completed_iteration = iteration
+                            completed_by_watchdog = True
+                            watchdog_elapsed = elapsed
+                            break
 
             if completed_iteration is None:
                 await asyncio.sleep(0.01)
@@ -547,6 +608,52 @@ async def run_evolution(
 
             # Process completed result
             future = pending_futures.pop(completed_iteration)
+            island_id = future_island_map.get(completed_iteration)
+
+            if completed_by_watchdog:
+                retry_count = future_retry_counts.get(completed_iteration, 0)
+                future.cancel()
+                remove_from_island_pending(completed_iteration)
+
+                can_retry = (
+                    retry_count < stuck_future_max_retries
+                    and island_id is not None
+                    and not self.shutdown_event.is_set()
+                )
+                if can_retry:
+                    next_retry = retry_count + 1
+                    logger.warning(
+                        f"⏰ Hard-timeout watchdog: iteration {completed_iteration} exceeded "
+                        f"{stuck_future_timeout}s (elapsed {watchdog_elapsed:.2f}s). "
+                        f"Requeueing attempt {next_retry}/{stuck_future_max_retries}."
+                    )
+                    if submit_iteration_to_island(
+                        completed_iteration, island_id, retry_count=next_retry
+                    ):
+                        continue
+                    logger.error(
+                        f"Failed to requeue timed out iteration {completed_iteration}; "
+                        f"counting it as failed."
+                    )
+                else:
+                    logger.error(
+                        f"⏰ Hard-timeout watchdog: iteration {completed_iteration} exceeded "
+                        f"{stuck_future_timeout}s (elapsed {watchdog_elapsed:.2f}s). "
+                        f"Retries exhausted ({retry_count}/{stuck_future_max_retries}); "
+                        f"counting as failed."
+                    )
+
+                future_start_times.pop(completed_iteration, None)
+                future_retry_counts.pop(completed_iteration, None)
+                future_island_map.pop(completed_iteration, None)
+                completed_iterations += 1
+                submit_next_iteration_if_needed()
+                continue
+
+            # Completed normally (or failed with result/error): clean watchdog state.
+            future_start_times.pop(completed_iteration, None)
+            future_retry_counts.pop(completed_iteration, None)
+            future_island_map.pop(completed_iteration, None)
 
             try:
                 # Use evaluator timeout + buffer to gracefully handle stuck processes
@@ -758,24 +865,10 @@ async def run_evolution(
             completed_iterations += 1
 
             # Remove completed iteration from island tracking
-            for island_id, iteration_list in island_pending.items():
-                if completed_iteration in iteration_list:
-                    iteration_list.remove(completed_iteration)
-                    break
+            remove_from_island_pending(completed_iteration)
 
-            # Submit next iterations maintaining island balance
-            for island_id in range(self.num_islands):
-                if (
-                    len(island_pending[island_id]) < batch_per_island
-                    and next_iteration < total_iterations
-                    and not self.shutdown_event.is_set()
-                ):
-                    future = self._submit_iteration(next_iteration, island_id)
-                    if future:
-                        pending_futures[next_iteration] = future
-                        island_pending[island_id].append(next_iteration)
-                    next_iteration += 1
-                    break  # Only submit one iteration per completion to maintain balance
+            # Submit next iteration while keeping island distribution balanced.
+            submit_next_iteration_if_needed()
 
             # Handle shutdown
             if self.shutdown_event.is_set():
diff --git a/tests/test_process_parallel.py b/tests/test_process_parallel.py
index 8cdd525b33..05a85cfaa6 100644
--- a/tests/test_process_parallel.py
+++ b/tests/test_process_parallel.py
@@ -163,6 +163,74 @@ def test_request_shutdown(self):
         # Verify shutdown event is set
         self.assertTrue(controller.shutdown_event.is_set())
 
+    def test_hard_timeout_watchdog_requeues_stuck_future(self):
+        """Test that watchdog cancels and requeues a stuck future."""
+
+        async def run_test():
+            controller = ProcessParallelController(self.config, self.eval_file, self.database)
+            controller.executor = Mock()  # run_evolution requires a started executor
+
+            self.config.stuck_future_timeout = 0.01
+            self.config.stuck_future_max_retries = 1
+
+            stuck_future = MagicMock(spec=Future)
+            stuck_future.done.return_value = False
+            stuck_future.cancel.return_value = True
+
+            retry_future = MagicMock(spec=Future)
+            retry_future.done.return_value = True
+            retry_future.result.return_value = SerializableResult(
+                child_program_dict={
+                    "id": "child_retry_success",
+                    "code": "def evolved(): return 42",
+                    "language": "python",
+                    "parent_id": "test_0",
+                    "generation": 1,
+                    "metrics": {"score": 0.8, "performance": 0.9},
+                    "iteration_found": 1,
+                    "metadata": {"changes": "watchdog retry", "island": 0},
+                },
+                parent_id="test_0",
+                iteration_time=0.1,
+                iteration=1,
+                target_island=0,
+            )
+
+            with patch.object(
+                controller, "_submit_iteration", side_effect=[stuck_future, retry_future]
+            ) as mock_submit:
+                await controller.run_evolution(start_iteration=1, max_iterations=1, target_score=None)
+
+            self.assertEqual(mock_submit.call_count, 2)
+            mock_submit.assert_any_call(1, 0)
+            self.assertTrue(stuck_future.cancel.called)
+            self.assertIn("child_retry_success", self.database.programs)
+
+        asyncio.run(run_test())
+
+    def test_hard_timeout_watchdog_respects_retry_limit(self):
+        """Test that watchdog stops requeueing after max retries are exhausted."""
+
+        async def run_test():
+            controller = ProcessParallelController(self.config, self.eval_file, self.database)
+            controller.executor = Mock()  # run_evolution requires a started executor
+
+            self.config.stuck_future_timeout = 0.01
+            self.config.stuck_future_max_retries = 0
+
+            stuck_future = MagicMock(spec=Future)
+            stuck_future.done.return_value = False
+            stuck_future.cancel.return_value = True
+
+            with patch.object(controller, "_submit_iteration", return_value=stuck_future) as mock_submit:
+                await controller.run_evolution(start_iteration=1, max_iterations=1, target_score=None)
+
+            self.assertEqual(mock_submit.call_count, 1)
+            self.assertTrue(stuck_future.cancel.called)
+            self.assertNotIn("child_retry_success", self.database.programs)
+
+        asyncio.run(run_test())
+
     def test_serializable_result(self):
         """Test SerializableResult dataclass"""
         result = SerializableResult(
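A minimal usage sketch for the new settings, assuming they load like any other top-level Config field via Config.from_yaml; the config path and the values below are hypothetical:

    from openevolve.config import Config

    config = Config.from_yaml("config.yaml")  # hypothetical path
    # An explicit value overrides the evaluator.timeout + stuck_future_timeout_buffer default.
    config.stuck_future_timeout = 600.0
    # Allow two requeue attempts before a timed-out iteration counts as failed.
    config.stuck_future_max_retries = 2

Per the watchdog setup in run_evolution, setting stuck_future_timeout to zero or a negative value disables the watchdog entirely.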