diff --git a/azurefunctions-extensions-base/azurefunctions/extensions/base/runtime.py b/azurefunctions-extensions-base/azurefunctions/extensions/base/runtime.py index af7cbca..a4601b4 100644 --- a/azurefunctions-extensions-base/azurefunctions/extensions/base/runtime.py +++ b/azurefunctions-extensions-base/azurefunctions/extensions/base/runtime.py @@ -7,9 +7,15 @@ - RuntimeTrackerMeta: Metaclass that auto-registers runtimes at import time - RuntimeBase: Abstract base class that all runtimes must extend - RuntimeFeatureChecker: Utility to check if a runtime is loaded +- VersionNamespace: Helper for exposing VERSION in the expected format + +This module should be packaged as azurefunctions.extensions.base """ +import contextvars from abc import abstractmethod +from concurrent.futures import ThreadPoolExecutor +from typing import Optional base_runtime_module = __name__ @@ -26,6 +32,7 @@ class RuntimeTrackerMeta(type): """ _module = None _runtime_name = None + _package_name = None def __new__(cls, name, bases, dct, **kwargs): new_class = super().__new__(cls, name, bases, dct) @@ -37,6 +44,10 @@ def __new__(cls, name, bases, dct, **kwargs): if cls._module is None: cls._module = new_module cls._runtime_name = runtime_name + if '.runtime' in new_module: + cls._package_name = new_module.rsplit('.runtime', 1)[0] + else: + cls._package_name = new_module.split('.')[0] elif cls._module != new_module: raise Exception( f"Only one runtime package shall be imported at a time. " @@ -55,6 +66,11 @@ def get_runtime_name(cls): """Get the registered runtime name""" return cls._runtime_name + @classmethod + def get_package_name(cls): + """Get the runtime package name (without .runtime suffix)""" + return cls._package_name + @classmethod def module_imported(cls): """Check if a runtime module has been imported""" @@ -69,21 +85,59 @@ class RuntimeBase(metaclass=RuntimeTrackerMeta): 1. Import this base class 2. Create a subclass with runtime_name defined 3. Implement all required event handler methods + 4. Expose VERSION via a 'version' namespace in the package __init__.py - Example: + Example runtime package structure: + # azure_functions_fastapi/runtime.py from azurefunctions.extensions.base import RuntimeBase class Runtime(RuntimeBase): runtime_name = "fastapi" + @property + def VERSION(self): + from . import VERSION + return VERSION + async def worker_init_request(self, request): # Implementation pass + + # ... other methods + + # azure_functions_fastapi/__init__.py + from azurefunctions.extensions.base import VersionNamespace + from .runtime import Runtime + + VERSION = "1.0.0" + version = VersionNamespace(VERSION) # For _library_worker.version.VERSION + + # Export public API + __all__ = ['Runtime', 'VERSION', 'version', ...] """ # Runtime identification (must be set by subclass) runtime_name = None + @property + @abstractmethod + def VERSION(self) -> str: + """ + Get the runtime version string. + + This version is logged during worker initialization and environment + reload for debugging and diagnostics purposes. It should match the + version in the runtime package's version.py file. + + The runtime package must also expose this as _library_worker.version.VERSION + for compatibility with the proxy worker's logging. Use VersionNamespace + helper class in the package's __init__.py. + + Returns: + Version string (e.g., "1.0.0") + """ + raise NotImplementedError() + @abstractmethod async def worker_init_request(self, request): """ @@ -149,6 +203,75 @@ async def function_environment_reload_request(self, request): """ raise NotImplementedError() + @abstractmethod + def start_threadpool_executor(self): + """ + Initialize and start the threadpool executor. + + This is called during worker initialization to set up the thread pool + for executing synchronous functions. The implementation should respect + PYTHON_THREADPOOL_THREAD_COUNT setting if applicable. + + For async-only runtimes, this can be a no-op. + """ + raise NotImplementedError() + + @abstractmethod + def stop_threadpool_executor(self): + """ + Stop and cleanup the threadpool executor. + + This is called during worker shutdown to gracefully terminate the + thread pool. The implementation should wait for pending tasks to + complete. + + For async-only runtimes, this can be a no-op. + """ + raise NotImplementedError() + + @abstractmethod + def get_threadpool_executor(self) -> Optional[ThreadPoolExecutor]: + """ + Get the current threadpool executor instance. + + Returns: + ThreadPoolExecutor instance if available, None otherwise. + For async-only runtimes, this should return None. + """ + raise NotImplementedError() + + @property + @abstractmethod + def invocation_id_cv(self) -> contextvars.ContextVar: + """ + Get the invocation ID context variable. + + This ContextVar is used to track the current invocation ID across + different execution contexts (threads, async tasks). It's essential + for correlating logs and telemetry with specific function invocations. + + Returns: + ContextVar for storing invocation IDs + """ + raise NotImplementedError() + + +class VersionNamespace: + """ + Helper class to create a version namespace for runtime packages. + + The dispatcher accesses _library_worker.version.VERSION, so runtime + packages must expose a 'version' attribute with a 'VERSION' attribute. + + Usage in runtime package's __init__.py: + from azurefunctions.extensions.base import VersionNamespace + + VERSION = "1.0.0" + version = VersionNamespace(VERSION) + """ + def __init__(self, version_string: str): + self.VERSION = version_string + class RuntimeFeatureChecker: """ diff --git a/azurefunctions-extensions-base/tests/test_runtime.py b/azurefunctions-extensions-base/tests/test_runtime.py index ea03bee..a204b62 100644 --- a/azurefunctions-extensions-base/tests/test_runtime.py +++ b/azurefunctions-extensions-base/tests/test_runtime.py @@ -1,9 +1,12 @@ +import contextvars import unittest +from concurrent.futures import ThreadPoolExecutor from azurefunctions.extensions.base.runtime import ( RuntimeBase, RuntimeFeatureChecker, RuntimeTrackerMeta, + VersionNamespace, ) @@ -12,6 +15,7 @@ def setUp(self): # Reset the _module and _runtime_name attributes after each test RuntimeTrackerMeta._module = None RuntimeTrackerMeta._runtime_name = None + RuntimeTrackerMeta._package_name = None self.assertFalse(RuntimeFeatureChecker.runtime_loaded()) def test_classes_imported_from_same_module(self): @@ -75,6 +79,41 @@ class TestRuntime2(metaclass=RuntimeTrackerMeta): self.assertEqual(RuntimeTrackerMeta.get_runtime_name(), "fastapi") self.assertTrue(RuntimeTrackerMeta.module_imported()) + def test_package_name_with_runtime_suffix(self): + class TestRuntime(metaclass=RuntimeTrackerMeta): + __module__ = "azurefunctions.extensions.http.fastapi.runtime" + runtime_name = "fastapi" + + self.assertEqual( + RuntimeTrackerMeta.get_package_name(), + "azurefunctions.extensions.http.fastapi", + ) + self.assertEqual( + RuntimeTrackerMeta.get_module(), + "azurefunctions.extensions.http.fastapi.runtime", + ) + + def test_package_name_without_runtime_suffix(self): + class TestRuntime(metaclass=RuntimeTrackerMeta): + __module__ = "azurefunctions.extensions.http" + runtime_name = "test" + + self.assertEqual(RuntimeTrackerMeta.get_package_name(), "azurefunctions") + self.assertEqual(RuntimeTrackerMeta.get_module(), + "azurefunctions.extensions.http") + + def test_package_name_simple_module(self): + class TestRuntime(metaclass=RuntimeTrackerMeta): + __module__ = "simplemodule" + runtime_name = "simple" + + self.assertEqual(RuntimeTrackerMeta.get_package_name(), "simplemodule") + self.assertEqual(RuntimeTrackerMeta.get_module(), "simplemodule") + + def test_package_name_not_set_before_import(self): + # Before any runtime is imported, package_name should be None + self.assertIsNone(RuntimeTrackerMeta.get_package_name()) + class TestRuntimeBase(unittest.TestCase): def test_worker_init_request_raises_not_implemented_error(self): @@ -210,6 +249,11 @@ async def function_environment_reload_request(self, request): def test_runtime_implementation(self): class TestRuntime(RuntimeBase): runtime_name = "test_runtime" + _invocation_id_cv = contextvars.ContextVar("invocation_id") + + @property + def VERSION(self): + return "1.0.0" async def worker_init_request(self, request): return "worker_init_response" @@ -226,8 +270,22 @@ async def invocation_request(self, request): async def function_environment_reload_request(self, request): return "function_environment_reload_response" + def start_threadpool_executor(self): + pass + + def stop_threadpool_executor(self): + pass + + def get_threadpool_executor(self): + return None + + @property + def invocation_id_cv(self): + return self._invocation_id_cv + runtime = TestRuntime() self.assertEqual(runtime.runtime_name, "test_runtime") + self.assertEqual(runtime.VERSION, "1.0.0") import asyncio @@ -249,6 +307,352 @@ async def function_environment_reload_request(self, request): "function_environment_reload_response", ) + def test_version_property_raises_not_implemented_error(self): + class MockRuntime(RuntimeBase): + runtime_name = "mock" + + async def worker_init_request(self, request): + pass + + async def functions_metadata_request(self, request): + pass + + async def function_load_request(self, request): + pass + + async def invocation_request(self, request): + pass + + async def function_environment_reload_request(self, request): + pass + + def start_threadpool_executor(self): + pass + + def stop_threadpool_executor(self): + pass + + def get_threadpool_executor(self): + pass + + @property + def invocation_id_cv(self): + pass + + @property + def VERSION(self): + return super().VERSION + + mock_runtime = MockRuntime() + + with self.assertRaises(NotImplementedError): + _ = mock_runtime.VERSION + + def test_version_property_returns_string(self): + """Test that VERSION property works correctly when implemented""" + class TestRuntime(RuntimeBase): + runtime_name = "test" + + @property + def VERSION(self): + return "2.5.3" + + async def worker_init_request(self, request): + pass + + async def functions_metadata_request(self, request): + pass + + async def function_load_request(self, request): + pass + + async def invocation_request(self, request): + pass + + async def function_environment_reload_request(self, request): + pass + + def start_threadpool_executor(self): + pass + + def stop_threadpool_executor(self): + pass + + def get_threadpool_executor(self): + return None + + @property + def invocation_id_cv(self): + return contextvars.ContextVar("test_invocation_id") + + runtime = TestRuntime() + self.assertEqual(runtime.VERSION, "2.5.3") + self.assertIsInstance(runtime.VERSION, str) + + def test_start_threadpool_executor_raises_not_implemented_error(self): + class MockRuntime(RuntimeBase): + runtime_name = "mock" + + @property + def VERSION(self): + return "1.0.0" + + async def worker_init_request(self, request): + pass + + async def functions_metadata_request(self, request): + pass + + async def function_load_request(self, request): + pass + + async def invocation_request(self, request): + pass + + async def function_environment_reload_request(self, request): + pass + + def stop_threadpool_executor(self): + pass + + def get_threadpool_executor(self): + pass + + @property + def invocation_id_cv(self): + pass + + def start_threadpool_executor(self): + super().start_threadpool_executor() + + mock_runtime = MockRuntime() + + with self.assertRaises(NotImplementedError): + mock_runtime.start_threadpool_executor() + + def test_stop_threadpool_executor_raises_not_implemented_error(self): + class MockRuntime(RuntimeBase): + runtime_name = "mock" + + @property + def VERSION(self): + return "1.0.0" + + async def worker_init_request(self, request): + pass + + async def functions_metadata_request(self, request): + pass + + async def function_load_request(self, request): + pass + + async def invocation_request(self, request): + pass + + async def function_environment_reload_request(self, request): + pass + + def start_threadpool_executor(self): + pass + + def get_threadpool_executor(self): + pass + + @property + def invocation_id_cv(self): + pass + + def stop_threadpool_executor(self): + super().stop_threadpool_executor() + + mock_runtime = MockRuntime() + + with self.assertRaises(NotImplementedError): + mock_runtime.stop_threadpool_executor() + + def test_get_threadpool_executor_raises_not_implemented_error(self): + class MockRuntime(RuntimeBase): + runtime_name = "mock" + + @property + def VERSION(self): + return "1.0.0" + + async def worker_init_request(self, request): + pass + + async def functions_metadata_request(self, request): + pass + + async def function_load_request(self, request): + pass + + async def invocation_request(self, request): + pass + + async def function_environment_reload_request(self, request): + pass + + def start_threadpool_executor(self): + pass + + def stop_threadpool_executor(self): + pass + + @property + def invocation_id_cv(self): + pass + + def get_threadpool_executor(self): + return super().get_threadpool_executor() + + mock_runtime = MockRuntime() + + with self.assertRaises(NotImplementedError): + mock_runtime.get_threadpool_executor() + + def test_invocation_id_cv_property_raises_not_implemented_error(self): + class MockRuntime(RuntimeBase): + runtime_name = "mock" + + @property + def VERSION(self): + return "1.0.0" + + async def worker_init_request(self, request): + pass + + async def functions_metadata_request(self, request): + pass + + async def function_load_request(self, request): + pass + + async def invocation_request(self, request): + pass + + async def function_environment_reload_request(self, request): + pass + + def start_threadpool_executor(self): + pass + + def stop_threadpool_executor(self): + pass + + def get_threadpool_executor(self): + pass + + @property + def invocation_id_cv(self): + return super().invocation_id_cv + + mock_runtime = MockRuntime() + + with self.assertRaises(NotImplementedError): + _ = mock_runtime.invocation_id_cv + + def test_threadpool_executor_integration(self): + """Test that threadpool executor methods work correctly when implemented""" + executor = ThreadPoolExecutor(max_workers=2) + + class TestRuntime(RuntimeBase): + runtime_name = "test" + _executor = None + + @property + def VERSION(self): + return "1.0.0" + + async def worker_init_request(self, request): + pass + + async def functions_metadata_request(self, request): + pass + + async def function_load_request(self, request): + pass + + async def invocation_request(self, request): + pass + + async def function_environment_reload_request(self, request): + pass + + def start_threadpool_executor(self): + self._executor = executor + + def stop_threadpool_executor(self): + if self._executor: + self._executor.shutdown(wait=True) + self._executor = None + + def get_threadpool_executor(self): + return self._executor + + @property + def invocation_id_cv(self): + return contextvars.ContextVar("test_invocation_id") + + runtime = TestRuntime() + self.assertIsNone(runtime.get_threadpool_executor()) + + runtime.start_threadpool_executor() + self.assertIsNotNone(runtime.get_threadpool_executor()) + self.assertEqual(runtime.get_threadpool_executor(), executor) + + runtime.stop_threadpool_executor() + self.assertIsNone(runtime.get_threadpool_executor()) + + def test_invocation_id_cv_context_var(self): + """Test that invocation_id_cv returns a ContextVar""" + + class TestRuntime(RuntimeBase): + runtime_name = "test" + _invocation_id_cv = contextvars.ContextVar("invocation_id", default=None) + + @property + def VERSION(self): + return "1.0.0" + + async def worker_init_request(self, request): + pass + + async def functions_metadata_request(self, request): + pass + + async def function_load_request(self, request): + pass + + async def invocation_request(self, request): + pass + + async def function_environment_reload_request(self, request): + pass + + def start_threadpool_executor(self): + pass + + def stop_threadpool_executor(self): + pass + + def get_threadpool_executor(self): + return None + + @property + def invocation_id_cv(self): + return self._invocation_id_cv + + runtime = TestRuntime() + cv = runtime.invocation_id_cv + + self.assertIsInstance(cv, contextvars.ContextVar) + self.assertEqual(cv.get(), None) + + cv.set("test-invocation-123") + self.assertEqual(cv.get(), "test-invocation-123") + class TestRuntimeFeatureChecker(unittest.TestCase): def setUp(self): @@ -292,3 +696,40 @@ class TestRuntime2(metaclass=RuntimeTrackerMeta): # The runtime should still be loaded with the same name self.assertTrue(RuntimeFeatureChecker.runtime_loaded()) self.assertEqual(RuntimeFeatureChecker.get_runtime_name(), "runtime1") + + +class TestVersionNamespace(unittest.TestCase): + def test_version_namespace_initialization(self): + """Test that VersionNamespace stores the version string correctly""" + version_ns = VersionNamespace("1.2.3") + self.assertEqual(version_ns.VERSION, "1.2.3") + + def test_version_namespace_with_different_versions(self): + """Test VersionNamespace with various version formats""" + test_versions = [ + "1.0.0", + "2.5.3-beta", + "3.0.0-rc1", + "0.1.0-dev", + "1.0.0+build.123", + ] + + for version in test_versions: + version_ns = VersionNamespace(version) + self.assertEqual(version_ns.VERSION, version) + + def test_version_namespace_version_attribute_accessible(self): + """Test that VERSION attribute is accessible as an attribute""" + version_ns = VersionNamespace("4.5.6") + self.assertTrue(hasattr(version_ns, "VERSION")) + self.assertEqual(getattr(version_ns, "VERSION"), "4.5.6") + + def test_version_namespace_use_case(self): + """Test the typical use case for runtime packages""" + # Simulating runtime package's __init__.py usage + VERSION = "2.0.0" + version = VersionNamespace(VERSION) + + # Simulating dispatcher access: _library_worker.version.VERSION + self.assertEqual(version.VERSION, "2.0.0") + self.assertEqual(version.VERSION, VERSION)