Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 134 additions & 4 deletions cinder/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,12 @@

"""

from eventlet import greenpool
from concurrent import futures
import threading
from typing import Callable
from typing import List
from typing import Optional

from eventlet import tpool
from oslo_config import cfg
import oslo_config.types
Expand All @@ -69,6 +74,10 @@
from cinder.scheduler import rpcapi as scheduler_rpcapi
from cinder import utils

# Maximum number of worker threads for async operations
# This replaces the eventlet GreenPool which had unlimited concurrency
DEFAULT_THREADPOOL_SIZE = 10

CONF = cfg.CONF
LOG = logging.getLogger(__name__)

Expand Down Expand Up @@ -161,12 +170,133 @@ def get_log_levels(self, context, log_request):


class ThreadPoolManager(Manager):
"""Manager class that provides a managed thread pool.

Tasks spawned via _add_to_threadpool() will be waited on during
graceful shutdown, ensuring in-flight operations complete before
the service terminates.

This implementation uses native Python threads via ThreadPoolExecutor
instead of eventlet green threads, as eventlet is being deprecated
and will be removed in a future OpenStack release.
"""

def __init__(self, *args, **kwargs):
    """Initialize the manager's native thread pool.

    Uses concurrent.futures.ThreadPoolExecutor (real OS threads) instead
    of an eventlet GreenPool, which gives bounded concurrency and
    well-defined shutdown semantics.
    """
    # NOTE: the stale eventlet assignment (self._tp = greenpool.GreenPool())
    # was left over from the pre-migration code and is removed here;
    # greenpool is no longer imported by this module.
    # Pool size comes from config when set; otherwise the module default.
    pool_size = CONF.threadpool_size or DEFAULT_THREADPOOL_SIZE
    self._tp = futures.ThreadPoolExecutor(
        max_workers=pool_size,
        thread_name_prefix='cinder-volume-worker'
    )
    # Set once shutdown starts; new submissions are rejected afterwards.
    self._shutdown_event = threading.Event()
    # Futures still in flight; pruned of completed entries on each submit.
    self._pending_futures: List[futures.Future] = []
    self._futures_lock = threading.Lock()
    LOG.info("Initialized ThreadPoolExecutor with %d workers", pool_size)
    super(ThreadPoolManager, self).__init__(*args, **kwargs)

def _add_to_threadpool(self, func, *args, **kwargs):
self._tp.spawn_n(func, *args, **kwargs)
def _add_to_threadpool(
self,
func: Callable,
*args,
**kwargs
) -> Optional[futures.Future]:
"""Spawn a task in the threadpool.

Tasks spawned here will be waited on during graceful shutdown.
Uses native Python threads via ThreadPoolExecutor.

:param func: The function to execute
:param args: Positional arguments to pass to the function
:param kwargs: Keyword arguments to pass to the function
:returns: A Future object representing the task, or None if shutdown
has been signaled
"""
if self._shutdown_event.is_set():
LOG.warning("Rejecting threadpool task during shutdown: %s",
func.__name__)
return None

future = self._tp.submit(func, *args, **kwargs)

# Track the future for shutdown waiting
with self._futures_lock:
# Clean up completed futures to avoid memory leak
self._pending_futures = [f for f in self._pending_futures
if not f.done()]
self._pending_futures.append(future)

return future

def wait_for_tasks(self, timeout: Optional[float] = None) -> bool:
    """Block until every tracked threadpool task has finished.

    Delegates the actual synchronization and timeout handling to
    concurrent.futures.wait().

    :param timeout: Seconds to wait at most. Both None and 0 mean wait
                    without limit (0 is kept for oslo.service
                    compatibility).
    :returns: True when every task finished, False if the timeout hit.
    """
    # oslo.service passes 0 to mean "no limit"; normalize it.
    effective_timeout = None if timeout == 0 else timeout

    with self._futures_lock:
        outstanding = [f for f in self._pending_futures if not f.done()]

    if not outstanding:
        LOG.info("No pending threadpool tasks.")
        return True

    LOG.info("Waiting for %d threadpool tasks to complete...",
             len(outstanding))

    finished, unfinished = futures.wait(
        outstanding,
        timeout=effective_timeout,
        return_when=futures.ALL_COMPLETED
    )

    if unfinished:
        LOG.warning("Timeout waiting for threadpool tasks. "
                    "%d tasks still running.", len(unfinished))
        # Dump what is still running so operators can debug the hang.
        for straggler in unfinished:
            LOG.warning("Incomplete task: %s", straggler)
        return False

    # Surface (and log) any exception a completed task raised.
    for completed in finished:
        try:
            completed.result()
        except Exception:
            LOG.exception("Task completed with exception")

    LOG.info("All %d threadpool tasks completed.", len(finished))
    return True

def signal_shutdown(self) -> None:
    """Mark shutdown as in progress.

    Once the event is set, _add_to_threadpool() refuses new work;
    tasks already submitted keep running to completion.
    """
    LOG.info("Shutdown signaled, rejecting new threadpool tasks")
    self._shutdown_event.set()

def cleanup_threadpool(self) -> None:
    """Shut down the executor and release its worker threads.

    Call this after wait_for_tasks() during service shutdown so all
    pool resources are released cleanly.
    """
    LOG.info("Shutting down threadpool executor")
    # wait=True blocks until queued work drains; cancel_futures=False
    # lets in-flight tasks finish instead of cancelling them.
    self._tp.shutdown(wait=True, cancel_futures=False)
    LOG.info("Threadpool executor shutdown complete")


class SchedulerDependentManager(ThreadPoolManager):
Expand Down
117 changes: 108 additions & 9 deletions cinder/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,29 @@
cfg.BoolOpt('osapi_volume_use_ssl',
default=False,
help='Wraps the socket in a SSL context if True is set. '
'A certificate file and key file must be specified.'), ]
'A certificate file and key file must be specified.'),
cfg.IntOpt('graceful_shutdown_timeout',
default=120,
min=0,
help='Maximum time in seconds to wait for operations to '
'complete during graceful shutdown. A value of 0 means '
'wait indefinitely. This should be less than the '
'Kubernetes terminationGracePeriodSeconds if running '
'in Kubernetes. Default: 120 seconds.'),
cfg.BoolOpt('graceful_shutdown_reject_new_operations',
default=True,
help='If True, reject new operations at the manager level '
'during shutdown (defense in depth). If False, rely '
'only on RPC server stop to prevent new work.'),
cfg.IntOpt('threadpool_size',
default=10,
min=1,
max=100,
help='Number of native Python threads in the threadpool '
'for async volume operations. This replaces the previous '
'eventlet GreenPool. Adjust based on workload and '
'available system resources. Default: 10.'),
]


CONF = cfg.CONF
Expand Down Expand Up @@ -151,6 +173,8 @@ def __init__(self, host: str, binary: str, topic: str,
self.topic = topic
self.manager_class_name = manager
self.coordination = coordination
# Flag to indicate graceful shutdown in progress
self._draining = False
manager_class = importutils.import_class(self.manager_class_name)
if CONF.profiler.enabled:
manager_class = profiler.trace_cls("rpc")(manager_class)
Expand Down Expand Up @@ -429,33 +453,97 @@ def create(cls,
return service_obj

def stop(self) -> None:
    """Stop the service gracefully.

    This method:
    1. Sets draining state (stops heartbeat reporting)
    2. Signals manager to reject new threadpool tasks
    3. Stops all RPC servers (removes from consumer list)
    4. Does NOT stop coordination here (needed during drain)
    5. Does NOT cleanup RPC transport (needed for outbound RPC)

    The coordination service and actual waiting for tasks is done
    in wait() to allow in-flight operations to complete while still
    being able to make outbound RPC calls and hold distributed locks.
    """
    LOG.info("Initiating graceful shutdown for service %s on host %s",
             self.binary, self.host)

    # Draining makes report_state() skip heartbeats, so the scheduler
    # will eventually consider this service down.
    self._draining = True

    # Signal manager to stop accepting new threadpool tasks.
    if hasattr(self.manager, 'signal_shutdown'):
        self.manager.signal_shutdown()

    # Stop accepting new RPC messages.  This removes us from the
    # RabbitMQ consumer list; new messages go to other instances.
    try:
        if self.rpcserver is not None:
            LOG.info("Stopping RPC server (topic: %s)", self.topic)
            self.rpcserver.stop()
        if self.backend_rpcserver:
            self.backend_rpcserver.stop()
        if self.cluster_rpcserver:
            self.cluster_rpcserver.stop()
    except Exception:
        # NOTE: the leftover bare `pass` from the old code is removed;
        # we now log but still proceed with shutdown.
        LOG.exception("Error stopping RPC servers")

    # NOTE: Do NOT stop coordination here!  In-flight operations may
    # still need distributed locks; wait() stops it after they finish.
    # (The old unconditional coordination.COORDINATOR.stop() residue
    # was removed — it contradicted this note and duplicated wait().)

    # NOTE: Do NOT cleanup RPC transport here!  In-flight operations
    # may need to make outbound RPC calls.

    super(Service, self).stop(graceful=True)

def wait(self) -> None:
    """Wait for all service operations to complete.

    Order matters here:
    1. Manager threadpool tasks drain (outbound RPC still available)
    2. In-flight RPC handlers drain (outbound RPC still available)
    3. Coordination stops, once nothing needs distributed locks
    4. Threadpool executor is cleaned up last
    """
    # Drain the manager's threadpool first; outbound RPC still works.
    if hasattr(self.manager, 'wait_for_tasks'):
        timeout = CONF.graceful_shutdown_timeout
        if timeout == 0:
            timeout = None  # 0 means wait forever
        LOG.info("Waiting for manager tasks (timeout=%s seconds)", timeout)
        completed = self.manager.wait_for_tasks(timeout=timeout)
        if not completed:
            LOG.warning("Some manager tasks did not complete within "
                        "timeout")

    # Let each RPC server finish its in-flight requests.
    LOG.info("Waiting for in-flight RPC requests to complete...")
    for server in (self.rpcserver, self.backend_rpcserver,
                   self.cluster_rpcserver):
        if server:
            server.wait()

    # All operations are done; distributed locks are no longer needed.
    if self.coordination:
        try:
            LOG.info("Stopping coordination service")
            coordination.COORDINATOR.stop()
        except Exception:
            LOG.exception("Error stopping coordination")

    super(Service, self).wait()
    LOG.info("Service %s shutdown complete", self.binary)

    # Release the executor's threads now that everything has drained.
    if hasattr(self.manager, 'cleanup_threadpool'):
        self.manager.cleanup_threadpool()

def periodic_tasks(self, raise_on_error: bool = False) -> None:
"""Tasks to be run at a periodic interval."""
Expand All @@ -464,6 +552,12 @@ def periodic_tasks(self, raise_on_error: bool = False) -> None:

def report_state(self) -> None:
"""Update the state of this service in the datastore."""
# Don't report state if we're draining
# This causes scheduler to see us as "down" after service_down_time
if self._draining:
LOG.debug("Service is draining, skipping state report")
return

if not self.manager.is_working():
# NOTE(dulek): If manager reports a problem we're not sending
# heartbeats - to indicate that service is actually down.
Expand Down Expand Up @@ -518,6 +612,11 @@ def reset(self) -> None:
self.manager.reset()
super(Service, self).reset()

@property
def is_draining(self) -> bool:
    """Whether a graceful shutdown is currently in progress."""
    return self._draining


class WSGIService(service.ServiceBase):
"""Provides ability to launch API from a 'paste' configuration."""
Expand Down
Loading