7 changes: 7 additions & 0 deletions openevolve/config.py
@@ -429,6 +429,13 @@ class Config:

# Parallel controller settings
max_tasks_per_child: Optional[int] = None
# Hard-timeout watchdog for stuck process futures.
# If None, defaults to evaluator.timeout + stuck_future_timeout_buffer.
stuck_future_timeout: Optional[float] = None
# Additional buffer seconds used when stuck_future_timeout is not explicitly set.
stuck_future_timeout_buffer: int = 30
# Number of retry attempts when watchdog times out a stuck future.
stuck_future_max_retries: int = 1

@classmethod
def from_yaml(cls, path: Union[str, Path]) -> "Config":
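For reference, a hedged sketch of how the new watchdog settings could be set programmatically. The attribute names come from the diff above; constructing Config directly with defaults and the exact fallback behaviour are assumptions based on this PR, not verified against the rest of the codebase.

```python
from openevolve.config import Config

config = Config()  # assumes Config is constructible with its defaults

# Explicit hard timeout (seconds) for the stuck-future watchdog.
config.stuck_future_timeout = 600.0
# Allow one requeue of an iteration whose future never completes.
config.stuck_future_max_retries = 1

# Leaving stuck_future_timeout as None makes the controller derive it as
# evaluator.timeout + stuck_future_timeout_buffer (see process_parallel.py below).
```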
135 changes: 114 additions & 21 deletions openevolve/process_parallel.py
@@ -490,20 +490,70 @@ async def run_evolution(
# Track pending futures by island to maintain distribution
pending_futures: Dict[int, Future] = {}
island_pending: Dict[int, List[int]] = {i: [] for i in range(self.num_islands)}
future_start_times: Dict[int, float] = {}
future_retry_counts: Dict[int, int] = {}
future_island_map: Dict[int, int] = {}
batch_size = min(self.num_workers * 2, max_iterations)

# Submit initial batch - distribute across islands
batch_per_island = max(1, batch_size // self.num_islands) if batch_size > 0 else 0
current_iteration = start_iteration

# Hard-timeout watchdog for futures that never transition to done().
stuck_future_timeout = self.config.stuck_future_timeout
if stuck_future_timeout is None:
stuck_future_timeout = (
self.config.evaluator.timeout + self.config.stuck_future_timeout_buffer
)
if stuck_future_timeout is not None and stuck_future_timeout <= 0:
stuck_future_timeout = None
stuck_future_max_retries = max(0, self.config.stuck_future_max_retries)

if stuck_future_timeout is not None:
logger.info(
f"Hard-timeout watchdog enabled: timeout={stuck_future_timeout}s, "
f"max_retries={stuck_future_max_retries}"
)
else:
logger.info("Hard-timeout watchdog disabled")

def remove_from_island_pending(iteration: int) -> None:
for island_id, iteration_list in island_pending.items():
if iteration in iteration_list:
iteration_list.remove(iteration)
break

def submit_iteration_to_island(
iteration: int, island_id: int, retry_count: int = 0
) -> bool:
future = self._submit_iteration(iteration, island_id)
if not future:
return False

pending_futures[iteration] = future
island_pending[island_id].append(iteration)
future_start_times[iteration] = time.monotonic()
future_retry_counts[iteration] = retry_count
future_island_map[iteration] = island_id
return True

def submit_next_iteration_if_needed() -> None:
nonlocal next_iteration
for island_id in range(self.num_islands):
if (
len(island_pending[island_id]) < batch_per_island
and next_iteration < total_iterations
and not self.shutdown_event.is_set()
):
if submit_iteration_to_island(next_iteration, island_id, retry_count=0):
next_iteration += 1
break # Keep per-island balance by submitting one per completion.

# Round-robin distribution across islands
for island_id in range(self.num_islands):
for _ in range(batch_per_island):
if current_iteration < total_iterations:
future = self._submit_iteration(current_iteration, island_id)
if future:
pending_futures[current_iteration] = future
island_pending[island_id].append(current_iteration)
submit_iteration_to_island(current_iteration, island_id, retry_count=0)
current_iteration += 1

next_iteration = current_iteration
@@ -536,17 +586,74 @@ async def run_evolution(
):
# Find completed futures
completed_iteration = None
completed_by_watchdog = False
watchdog_elapsed = 0.0
for iteration, future in list(pending_futures.items()):
if future.done():
completed_iteration = iteration
break
if stuck_future_timeout is not None:
started_at = future_start_times.get(iteration)
if started_at is not None:
elapsed = time.monotonic() - started_at
if elapsed > stuck_future_timeout:
completed_iteration = iteration
completed_by_watchdog = True
watchdog_elapsed = elapsed
break

if completed_iteration is None:
await asyncio.sleep(0.01)
continue

# Process completed result
future = pending_futures.pop(completed_iteration)
island_id = future_island_map.get(completed_iteration)

if completed_by_watchdog:
retry_count = future_retry_counts.get(completed_iteration, 0)
future.cancel()
remove_from_island_pending(completed_iteration)

can_retry = (
retry_count < stuck_future_max_retries
and island_id is not None
and not self.shutdown_event.is_set()
)
if can_retry:
next_retry = retry_count + 1
logger.warning(
f"⏰ Hard-timeout watchdog: iteration {completed_iteration} exceeded "
f"{stuck_future_timeout}s (elapsed {watchdog_elapsed:.2f}s). "
f"Requeueing attempt {next_retry}/{stuck_future_max_retries}."
)
if submit_iteration_to_island(
completed_iteration, island_id, retry_count=next_retry
):
continue
logger.error(
f"Failed to requeue timed out iteration {completed_iteration}; "
f"counting it as failed."
)
else:
logger.error(
f"⏰ Hard-timeout watchdog: iteration {completed_iteration} exceeded "
f"{stuck_future_timeout}s (elapsed {watchdog_elapsed:.2f}s). "
f"Retries exhausted ({retry_count}/{stuck_future_max_retries}); "
f"counting as failed."
)

future_start_times.pop(completed_iteration, None)
future_retry_counts.pop(completed_iteration, None)
future_island_map.pop(completed_iteration, None)
completed_iterations += 1
submit_next_iteration_if_needed()
continue

# Completed normally (or failed with result/error): clean up watchdog state.
future_start_times.pop(completed_iteration, None)
future_retry_counts.pop(completed_iteration, None)
future_island_map.pop(completed_iteration, None)

try:
# Use evaluator timeout + buffer to gracefully handle stuck processes
@@ -758,24 +865,10 @@ async def run_evolution(
completed_iterations += 1

# Remove completed iteration from island tracking
for island_id, iteration_list in island_pending.items():
if completed_iteration in iteration_list:
iteration_list.remove(completed_iteration)
break
remove_from_island_pending(completed_iteration)

# Submit next iterations maintaining island balance
for island_id in range(self.num_islands):
if (
len(island_pending[island_id]) < batch_per_island
and next_iteration < total_iterations
and not self.shutdown_event.is_set()
):
future = self._submit_iteration(next_iteration, island_id)
if future:
pending_futures[next_iteration] = future
island_pending[island_id].append(next_iteration)
next_iteration += 1
break # Only submit one iteration per completion to maintain balance
# Submit next iteration while keeping island distribution balanced.
submit_next_iteration_if_needed()

# Handle shutdown
if self.shutdown_event.is_set():
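The core pattern this diff adds — record a start time per future, poll done(), and cancel-and-resubmit when the elapsed time exceeds a hard limit with a bounded retry budget — reduces to a small self-contained sketch. Everything below (run_task, TIMEOUT_S, MAX_RETRIES, watchdog_loop) is illustrative only and not part of the OpenEvolve codebase.

```python
import time
from concurrent.futures import Future, ProcessPoolExecutor
from typing import Dict


def run_task(task_id: int, seconds: float) -> int:
    """Stand-in workload; a genuinely stuck worker would simply never return."""
    time.sleep(seconds)
    return task_id


TIMEOUT_S = 0.5    # plays the role of stuck_future_timeout
MAX_RETRIES = 1    # plays the role of stuck_future_max_retries


def watchdog_loop(tasks: Dict[int, float]) -> None:
    with ProcessPoolExecutor(max_workers=2) as executor:
        pending: Dict[int, Future] = {}
        started: Dict[int, float] = {}
        retries: Dict[int, int] = {task_id: 0 for task_id in tasks}

        for task_id, seconds in tasks.items():
            pending[task_id] = executor.submit(run_task, task_id, seconds)
            started[task_id] = time.monotonic()

        while pending:
            for task_id, future in list(pending.items()):
                if future.done():
                    pending.pop(task_id)
                    print(f"task {task_id} finished: {future.result()}")
                elif time.monotonic() - started[task_id] > TIMEOUT_S:
                    # Hard timeout: cancel (a no-op if the worker is already
                    # running) and either requeue or count the task as failed.
                    future.cancel()
                    pending.pop(task_id)
                    if retries[task_id] < MAX_RETRIES:
                        retries[task_id] += 1
                        pending[task_id] = executor.submit(
                            run_task, task_id, tasks[task_id]
                        )
                        started[task_id] = time.monotonic()
                        print(f"task {task_id} timed out; requeued "
                              f"({retries[task_id]}/{MAX_RETRIES})")
                    else:
                        print(f"task {task_id} timed out; retries exhausted")
            time.sleep(0.05)


if __name__ == "__main__":
    # Task 1 completes quickly; task 2 exceeds the hard timeout, is requeued
    # once, times out again, and ends up counted as failed.
    watchdog_loop({1: 0.1, 2: 5.0})
```

Note that Future.cancel() does nothing once a worker has started executing, so this pattern on its own stops waiting for a wedged worker rather than killing it; reclaiming the worker process is a separate concern (for example via the existing max_tasks_per_child setting shown in the config diff).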
68 changes: 68 additions & 0 deletions tests/test_process_parallel.py
@@ -163,6 +163,74 @@ def test_request_shutdown(self):
# Verify shutdown event is set
self.assertTrue(controller.shutdown_event.is_set())

def test_hard_timeout_watchdog_requeues_stuck_future(self):
"""Test that watchdog cancels and requeues a stuck future."""

async def run_test():
controller = ProcessParallelController(self.config, self.eval_file, self.database)
controller.executor = Mock() # run_evolution requires a started executor

self.config.stuck_future_timeout = 0.01
self.config.stuck_future_max_retries = 1

stuck_future = MagicMock(spec=Future)
stuck_future.done.return_value = False
stuck_future.cancel.return_value = True

retry_future = MagicMock(spec=Future)
retry_future.done.return_value = True
retry_future.result.return_value = SerializableResult(
child_program_dict={
"id": "child_retry_success",
"code": "def evolved(): return 42",
"language": "python",
"parent_id": "test_0",
"generation": 1,
"metrics": {"score": 0.8, "performance": 0.9},
"iteration_found": 1,
"metadata": {"changes": "watchdog retry", "island": 0},
},
parent_id="test_0",
iteration_time=0.1,
iteration=1,
target_island=0,
)

with patch.object(
controller, "_submit_iteration", side_effect=[stuck_future, retry_future]
) as mock_submit:
await controller.run_evolution(start_iteration=1, max_iterations=1, target_score=None)

self.assertEqual(mock_submit.call_count, 2)
mock_submit.assert_any_call(1, 0)
self.assertTrue(stuck_future.cancel.called)
self.assertIn("child_retry_success", self.database.programs)

asyncio.run(run_test())

def test_hard_timeout_watchdog_respects_retry_limit(self):
"""Test that watchdog stops requeueing after max retries are exhausted."""

async def run_test():
controller = ProcessParallelController(self.config, self.eval_file, self.database)
controller.executor = Mock() # run_evolution requires a started executor

self.config.stuck_future_timeout = 0.01
self.config.stuck_future_max_retries = 0

stuck_future = MagicMock(spec=Future)
stuck_future.done.return_value = False
stuck_future.cancel.return_value = True

with patch.object(controller, "_submit_iteration", return_value=stuck_future) as mock_submit:
await controller.run_evolution(start_iteration=1, max_iterations=1, target_score=None)

self.assertEqual(mock_submit.call_count, 1)
self.assertTrue(stuck_future.cancel.called)
self.assertNotIn("child_retry_success", self.database.programs)

asyncio.run(run_test())

def test_serializable_result(self):
"""Test SerializableResult dataclass"""
result = SerializableResult(
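The two new watchdog tests drive run_evolution with mocked futures, so no real worker pool is involved; assuming the repository's usual unittest/pytest setup, a command such as python -m pytest tests/test_process_parallel.py -k hard_timeout should exercise them in isolation.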