From 76f9eb9bdf173896e30a0b5165c4009e2ec71368 Mon Sep 17 00:00:00 2001 From: jsonbailey Date: Tue, 6 Jan 2026 09:12:29 -0600 Subject: [PATCH 1/9] [sdk-1710] fix: Add context manager to rw lock --- ldclient/__init__.py | 37 +++++-------- ldclient/client.py | 28 +++------- ldclient/feature_store.py | 30 +++-------- ldclient/impl/datasource/status.py | 10 +--- ldclient/impl/datastore/status.py | 12 ++--- ldclient/impl/datasystem/fdv2.py | 80 +++++++++++----------------- ldclient/impl/datasystem/store.py | 45 ++++++---------- ldclient/impl/flag_tracker.py | 5 +- ldclient/impl/rwlock.py | 31 +++++++++++ ldclient/integrations/test_data.py | 21 ++------ ldclient/integrations/test_datav2.py | 31 +++-------- 11 files changed, 124 insertions(+), 206 deletions(-) diff --git a/ldclient/__init__.py b/ldclient/__init__.py index 884c3af8..ea620afd 100644 --- a/ldclient/__init__.py +++ b/ldclient/__init__.py @@ -37,17 +37,16 @@ def set_config(config: Config): global __config global __client global __lock - try: - __lock.lock() - if __client: - log.info("Reinitializing LaunchDarkly Client " + VERSION + " with new config") - new_client = LDClient(config=config, start_wait=start_wait) - old_client = __client - __client = new_client - old_client.close() - finally: - __config = config - __lock.unlock() + with __lock.write_lock(): + try: + if __client: + log.info("Reinitializing LaunchDarkly Client " + VERSION + " with new config") + new_client = LDClient(config=config, start_wait=start_wait) + old_client = __client + __client = new_client + old_client.close() + finally: + __config = config def get() -> LDClient: @@ -63,35 +62,27 @@ def get() -> LDClient: global __config global __client global __lock - try: - __lock.rlock() + with __lock.read_lock(): if __client: return __client if __config is None: raise Exception("set_config was not called") - finally: - __lock.runlock() - try: - __lock.lock() + with __lock.write_lock(): if not __client: log.info("Initializing LaunchDarkly Client " + VERSION) __client = LDClient(config=__config, start_wait=start_wait) return __client - finally: - __lock.unlock() # for testing only def _reset_client(): global __client global __lock - try: - __lock.lock() + c = None + with __lock.write_lock(): c = __client __client = None - finally: - __lock.unlock() if c: c.close() diff --git a/ldclient/client.py b/ldclient/client.py index 1becbbad..80350989 100644 --- a/ldclient/client.py +++ b/ldclient/client.py @@ -111,13 +111,10 @@ def __wrapper(self, fn: Callable): raise def __update_availability(self, available: bool): - try: - self.__lock.lock() + with self.__lock.write_lock(): if available == self.__last_available: return self.__last_available = available - finally: - self.__lock.unlock() status = DataStoreStatus(available, False) @@ -127,23 +124,19 @@ def __update_availability(self, available: bool): self.__store_update_sink.update_status(status) if available: - try: - self.__lock.lock() + with self.__lock.write_lock(): if self.__poller is not None: self.__poller.stop() self.__poller = None - finally: - self.__lock.unlock() return log.warn("Detected persistent store unavailability; updates will be cached until it recovers") task = RepeatingTask("ldclient.check-availability", 0.5, 0, self.__check_availability) - self.__lock.lock() - self.__poller = task - self.__poller.start() - self.__lock.unlock() + with self.__lock.write_lock(): + self.__poller = task + self.__poller.start() def __check_availability(self): try: @@ -717,9 +710,8 @@ def add_hook(self, hook: Hook): if not 
isinstance(hook, Hook): return - self.__hooks_lock.lock() - self.__hooks.append(hook) - self.__hooks_lock.unlock() + with self.__hooks_lock.write_lock(): + self.__hooks.append(hook) def __evaluate_with_hooks(self, key: str, context: Context, default_value: Any, method: str, block: Callable[[], _EvaluationWithHookResult]) -> _EvaluationWithHookResult: """ @@ -733,15 +725,11 @@ def __evaluate_with_hooks(self, key: str, context: Context, default_value: Any, # :return: """ hooks = [] # type: List[Hook] - try: - self.__hooks_lock.rlock() - + with self.__hooks_lock.read_lock(): if len(self.__hooks) == 0: return block() hooks = self.__hooks.copy() - finally: - self.__hooks_lock.runlock() series_context = EvaluationSeriesContext(key=key, context=context, default_value=default_value, method=method) hook_data = self.__execute_before_evaluation(hooks, series_context) diff --git a/ldclient/feature_store.py b/ldclient/feature_store.py index f4340b47..1e43c46c 100644 --- a/ldclient/feature_store.py +++ b/ldclient/feature_store.py @@ -77,8 +77,7 @@ def is_available(self) -> bool: def get(self, kind: VersionedDataKind, key: str, callback: Callable[[Any], Any] = lambda x: x) -> Any: """ """ - try: - self._lock.rlock() + with self._lock.read_lock(): itemsOfKind = self._items[kind] item = itemsOfKind.get(key) if item is None: @@ -88,17 +87,12 @@ def get(self, kind: VersionedDataKind, key: str, callback: Callable[[Any], Any] log.debug("Attempted to get deleted key %s in '%s', returning None", key, kind.namespace) return callback(None) return callback(item) - finally: - self._lock.runlock() def all(self, kind, callback): """ """ - try: - self._lock.rlock() + with self._lock.read_lock(): itemsOfKind = self._items[kind] return callback(dict((k, i) for k, i in itemsOfKind.items() if ('deleted' not in i) or not i['deleted'])) - finally: - self._lock.runlock() def init(self, all_data): """ """ @@ -108,51 +102,39 @@ def init(self, all_data): for key, item in items.items(): items_decoded[key] = kind.decode(item) all_decoded[kind] = items_decoded - try: - self._lock.rlock() + with self._lock.write_lock(): self._items.clear() self._items.update(all_decoded) self._initialized = True for k in all_data: log.debug("Initialized '%s' store with %d items", k.namespace, len(all_data[k])) - finally: - self._lock.runlock() # noinspection PyShadowingNames def delete(self, kind, key: str, version: int): """ """ - try: - self._lock.rlock() + with self._lock.write_lock(): itemsOfKind = self._items[kind] i = itemsOfKind.get(key) if i is None or i['version'] < version: i = {'deleted': True, 'version': version} itemsOfKind[key] = i - finally: - self._lock.runlock() def upsert(self, kind, item): """ """ decoded_item = kind.decode(item) key = item['key'] - try: - self._lock.rlock() + with self._lock.write_lock(): itemsOfKind = self._items[kind] i = itemsOfKind.get(key) if i is None or i['version'] < item['version']: itemsOfKind[key] = decoded_item log.debug("Updated %s in '%s' to version %d", key, kind.namespace, item['version']) - finally: - self._lock.runlock() @property def initialized(self) -> bool: """ """ - try: - self._lock.rlock() + with self._lock.read_lock(): return self._initialized - finally: - self._lock.runlock() def describe_configuration(self, config): return 'memory' diff --git a/ldclient/impl/datasource/status.py b/ldclient/impl/datasource/status.py index 172ffee9..a9130a65 100644 --- a/ldclient/impl/datasource/status.py +++ b/ldclient/impl/datasource/status.py @@ -29,11 +29,8 @@ def __init__(self, store: FeatureStore, 
status_listeners: Listeners, flag_change @property def status(self) -> DataSourceStatus: - try: - self.__lock.rlock() + with self.__lock.read_lock(): return self.__status - finally: - self.__lock.runlock() def init(self, all_data: Mapping[VersionedDataKind, Mapping[str, dict]]): old_data = None @@ -70,8 +67,7 @@ def delete(self, kind: VersionedDataKind, key: str, version: int): def update_status(self, new_state: DataSourceState, new_error: Optional[DataSourceErrorInfo]): status_to_broadcast = None - try: - self.__lock.lock() + with self.__lock.write_lock(): old_status = self.__status if new_state == DataSourceState.INTERRUPTED and old_status.state == DataSourceState.INITIALIZING: @@ -83,8 +79,6 @@ def update_status(self, new_state: DataSourceState, new_error: Optional[DataSour self.__status = DataSourceStatus(new_state, self.__status.since if new_state == self.__status.state else time.time(), self.__status.error if new_error is None else new_error) status_to_broadcast = self.__status - finally: - self.__lock.unlock() if status_to_broadcast is not None: self.__status_listeners.notify(status_to_broadcast) diff --git a/ldclient/impl/datastore/status.py b/ldclient/impl/datastore/status.py index ee9797dd..ead57755 100644 --- a/ldclient/impl/datastore/status.py +++ b/ldclient/impl/datastore/status.py @@ -27,16 +27,12 @@ def listeners(self) -> Listeners: return self.__listeners def status(self) -> DataStoreStatus: - self.__lock.rlock() - status = copy(self.__status) - self.__lock.runlock() - - return status + with self.__lock.read_lock(): + return copy(self.__status) def update_status(self, status: DataStoreStatus): - self.__lock.lock() - old_value, self.__status = self.__status, status - self.__lock.unlock() + with self.__lock.write_lock(): + old_value, self.__status = self.__status, status if old_value != status: self.__listeners.notify(status) diff --git a/ldclient/impl/datasystem/fdv2.py b/ldclient/impl/datasystem/fdv2.py index 7482d3a9..b943e691 100644 --- a/ldclient/impl/datasystem/fdv2.py +++ b/ldclient/impl/datasystem/fdv2.py @@ -42,17 +42,13 @@ def __init__(self, listeners: Listeners): @property def status(self) -> DataSourceStatus: - self.__lock.rlock() - status = self.__status - self.__lock.runlock() - - return status + with self.__lock.read_lock(): + return self.__status def update_status(self, new_state: DataSourceState, new_error: Optional[DataSourceErrorInfo]): status_to_broadcast = None - try: - self.__lock.lock() + with self.__lock.write_lock(): old_status = self.__status if new_state == DataSourceState.INTERRUPTED and old_status.state == DataSourceState.INITIALIZING: @@ -67,8 +63,6 @@ def update_status(self, new_state: DataSourceState, new_error: Optional[DataSour self.__status = DataSourceStatus(new_state, new_since, new_error) status_to_broadcast = self.__status - finally: - self.__lock.unlock() if status_to_broadcast is not None: self.__listeners.notify(status_to_broadcast) @@ -92,25 +86,20 @@ def update_status(self, status: DataStoreStatus): """ update_status is called from the data store to push a status update. 
""" - self.__lock.lock() modified = False - if self.__status != status: - self.__status = status - modified = True - - self.__lock.unlock() + with self.__lock.write_lock(): + if self.__status != status: + self.__status = status + modified = True if modified: self.__listeners.notify(status) @property def status(self) -> DataStoreStatus: - self.__lock.rlock() - status = copy(self.__status) - self.__lock.runlock() - - return status + with self.__lock.read_lock(): + return copy(self.__status) def is_monitoring_enabled(self) -> bool: if self.__store is None: @@ -174,8 +163,7 @@ def __update_availability(self, available: bool): poller_to_stop = None task_to_start = None - self.__lock.lock() - try: + with self.__lock.write_lock(): if available == self.__last_available: return @@ -188,8 +176,6 @@ def __update_availability(self, available: bool): elif self.__poller is None: task_to_start = RepeatingTask("ldclient.check-availability", 0.5, 0, self.__check_availability) self.__poller = task_to_start - finally: - self.__lock.unlock() if available: log.warning("Persistent store is available again") @@ -336,13 +322,12 @@ def stop(self): """Stop the FDv2 data system and all associated threads.""" self._stop_event.set() - self._lock.lock() - if self._active_synchronizer is not None: - try: - self._active_synchronizer.stop() - except Exception as e: - log.error("Error stopping active data source: %s", e) - self._lock.unlock() + with self._lock.write_lock(): + if self._active_synchronizer is not None: + try: + self._active_synchronizer.stop() + except Exception as e: + log.error("Error stopping active data source: %s", e) # Wait for all threads to complete for thread in self._threads: @@ -426,12 +411,11 @@ def synchronizer_loop(self: 'FDv2'): while not self._stop_event.is_set() and self._primary_synchronizer_builder is not None: # Try primary synchronizer try: - self._lock.lock() - primary_sync = self._primary_synchronizer_builder(self._config) - if isinstance(primary_sync, DiagnosticSource) and self._diagnostic_accumulator is not None: - primary_sync.set_diagnostic_accumulator(self._diagnostic_accumulator) - self._active_synchronizer = primary_sync - self._lock.unlock() + with self._lock.write_lock(): + primary_sync = self._primary_synchronizer_builder(self._config) + if isinstance(primary_sync, DiagnosticSource) and self._diagnostic_accumulator is not None: + primary_sync.set_diagnostic_accumulator(self._diagnostic_accumulator) + self._active_synchronizer = primary_sync log.info("Primary synchronizer %s is starting", primary_sync.name) @@ -462,13 +446,12 @@ def synchronizer_loop(self: 'FDv2'): if self._secondary_synchronizer_builder is None: continue - self._lock.lock() - secondary_sync = self._secondary_synchronizer_builder(self._config) - if isinstance(secondary_sync, DiagnosticSource) and self._diagnostic_accumulator is not None: - secondary_sync.set_diagnostic_accumulator(self._diagnostic_accumulator) - log.info("Secondary synchronizer %s is starting", secondary_sync.name) - self._active_synchronizer = secondary_sync - self._lock.unlock() + with self._lock.write_lock(): + secondary_sync = self._secondary_synchronizer_builder(self._config) + if isinstance(secondary_sync, DiagnosticSource) and self._diagnostic_accumulator is not None: + secondary_sync.set_diagnostic_accumulator(self._diagnostic_accumulator) + log.info("Secondary synchronizer %s is starting", secondary_sync.name) + self._active_synchronizer = secondary_sync remove_sync, fallback_v1 = self._consume_synchronizer_results( secondary_sync, 
set_on_ready, self._recovery_condition @@ -497,11 +480,10 @@ def synchronizer_loop(self: 'FDv2'): finally: # Ensure we always set the ready event when exiting set_on_ready.set() - self._lock.lock() - if self._active_synchronizer is not None: - self._active_synchronizer.stop() - self._active_synchronizer = None - self._lock.unlock() + with self._lock.write_lock(): + if self._active_synchronizer is not None: + self._active_synchronizer.stop() + self._active_synchronizer = None sync_thread = Thread( target=synchronizer_loop, diff --git a/ldclient/impl/datasystem/store.py b/ldclient/impl/datasystem/store.py index 0d731e03..206ab5e0 100644 --- a/ldclient/impl/datasystem/store.py +++ b/ldclient/impl/datasystem/store.py @@ -50,8 +50,7 @@ def get( key: str, callback: Callable[[Any], Any] = lambda x: x, ) -> Any: - try: - self._lock.rlock() + with self._lock.read_lock(): items_of_kind = self._items[kind] item = items_of_kind.get(key) if item is None: @@ -69,12 +68,9 @@ def get( ) return callback(None) return callback(item) - finally: - self._lock.runlock() def all(self, kind: VersionedDataKind, callback: Callable[[Any], Any] = lambda x: x) -> Any: - try: - self._lock.rlock() + with self._lock.read_lock(): items_of_kind = self._items[kind] return callback( dict( @@ -83,8 +79,6 @@ def all(self, kind: VersionedDataKind, callback: Callable[[Any], Any] = lambda x if ("deleted" not in i) or not i["deleted"] ) ) - finally: - self._lock.runlock() def set_basis(self, collections: Collections) -> bool: """ @@ -95,15 +89,13 @@ def set_basis(self, collections: Collections) -> bool: return False try: - self._lock.lock() - self._items.clear() - self._items.update(all_decoded) - self._initialized = True + with self._lock.write_lock(): + self._items.clear() + self._items.update(all_decoded) + self._initialized = True except Exception as e: log.error("Failed applying set_basis", exc_info=e) return False - finally: - self._lock.unlock() return True @@ -116,20 +108,18 @@ def apply_delta(self, collections: Collections) -> bool: return False try: - self._lock.lock() - for kind, kind_data in all_decoded.items(): - items_of_kind = self._items[kind] - kind_data = all_decoded[kind] - for key, item in kind_data.items(): - items_of_kind[key] = item - log.debug( - "Updated %s in '%s' to version %d", key, kind.namespace, item["version"] - ) + with self._lock.write_lock(): + for kind, kind_data in all_decoded.items(): + items_of_kind = self._items[kind] + kind_data = all_decoded[kind] + for key, item in kind_data.items(): + items_of_kind[key] = item + log.debug( + "Updated %s in '%s' to version %d", key, kind.namespace, item["version"] + ) except Exception as e: log.error("Failed applying apply_delta", exc_info=e) return False - finally: - self._lock.unlock() return True @@ -153,11 +143,8 @@ def initialized(self) -> bool: """ Indicates whether the store has been initialized with data. 
""" - try: - self._lock.rlock() + with self._lock.read_lock(): return self._initialized - finally: - self._lock.runlock() class Store: diff --git a/ldclient/impl/flag_tracker.py b/ldclient/impl/flag_tracker.py index e7c9b7c2..f4c5fba4 100644 --- a/ldclient/impl/flag_tracker.py +++ b/ldclient/impl/flag_tracker.py @@ -22,9 +22,8 @@ def __call__(self, flag_change: FlagChange): new_value = self.__eval_fn(self.__key, self.__context) - self.__lock.lock() - old_value, self.__value = self.__value, new_value - self.__lock.unlock() + with self.__lock.write_lock(): + old_value, self.__value = self.__value, new_value if new_value == old_value: return diff --git a/ldclient/impl/rwlock.py b/ldclient/impl/rwlock.py index e394194b..df2f6a38 100644 --- a/ldclient/impl/rwlock.py +++ b/ldclient/impl/rwlock.py @@ -1,4 +1,5 @@ import threading +from contextlib import contextmanager class ReadWriteLock: @@ -38,3 +39,33 @@ def lock(self): def unlock(self): """Release a write lock.""" self._read_ready.release() + + @contextmanager + def read_lock(self): + """Context manager for acquiring a read lock. + + Usage: + with lock.read_lock(): + # read lock held here + pass + """ + self.rlock() + try: + yield self + finally: + self.runlock() + + @contextmanager + def write_lock(self): + """Context manager for acquiring a write lock. + + Usage: + with lock.write_lock(): + # write lock held here + pass + """ + self.lock() + try: + yield self + finally: + self.unlock() diff --git a/ldclient/integrations/test_data.py b/ldclient/integrations/test_data.py index 56e06f9a..4385fdb6 100644 --- a/ldclient/integrations/test_data.py +++ b/ldclient/integrations/test_data.py @@ -57,11 +57,8 @@ def __init__(self): def __call__(self, config, store, ready): data_source = _TestDataSource(store, self, ready) - try: - self._lock.lock() + with self._lock.write_lock(): self._instances.append(data_source) - finally: - self._lock.unlock() return data_source @@ -89,14 +86,11 @@ def flag(self, key: str) -> 'FlagBuilder': :param str key: the flag key :return: the flag configuration builder object """ - try: - self._lock.rlock() + with self._lock.read_lock(): if key in self._flag_builders and self._flag_builders[key]: return self._flag_builders[key]._copy() else: return FlagBuilder(key).boolean_flag() - finally: - self._lock.runlock() def update(self, flag_builder: 'FlagBuilder') -> 'TestData': """Updates the test data with the specified flag configuration. 
@@ -113,9 +107,7 @@ def update(self, flag_builder: 'FlagBuilder') -> 'TestData': :param flag_builder: a flag configuration builder :return: self (the TestData object) """ - try: - self._lock.lock() - + with self._lock.write_lock(): old_version = 0 if flag_builder._key in self._current_flags: old_flag = self._current_flags[flag_builder._key] @@ -126,8 +118,6 @@ def update(self, flag_builder: 'FlagBuilder') -> 'TestData': self._current_flags[flag_builder._key] = new_flag self._flag_builders[flag_builder._key] = flag_builder._copy() - finally: - self._lock.unlock() for instance in self._instances: instance.upsert(new_flag) @@ -138,11 +128,8 @@ def _make_init_data(self) -> dict: return {FEATURES: copy.copy(self._current_flags)} def _closed_instance(self, instance): - try: - self._lock.lock() + with self._lock.write_lock(): self._instances.remove(instance) - finally: - self._lock.unlock() class FlagBuilder: diff --git a/ldclient/integrations/test_datav2.py b/ldclient/integrations/test_datav2.py index a2da52db..fe264395 100644 --- a/ldclient/integrations/test_datav2.py +++ b/ldclient/integrations/test_datav2.py @@ -617,14 +617,11 @@ def flag(self, key: str) -> FlagBuilderV2: :param str key: the flag key :return: the flag configuration builder object """ - try: - self._lock.rlock() + with self._lock.read_lock(): if key in self._flag_builders and self._flag_builders[key]: return self._flag_builders[key]._copy() return FlagBuilderV2(key).boolean_flag() - finally: - self._lock.runlock() def update(self, flag_builder: FlagBuilderV2) -> TestDataV2: """ @@ -643,9 +640,7 @@ def update(self, flag_builder: FlagBuilderV2) -> TestDataV2: :return: self (the TestDataV2 object) """ instances_copy = [] - try: - self._lock.lock() - + with self._lock.write_lock(): old_version = 0 if flag_builder._key in self._current_flags: old_flag = self._current_flags[flag_builder._key] @@ -659,8 +654,6 @@ def update(self, flag_builder: FlagBuilderV2) -> TestDataV2: # Create a copy of instances while holding the lock to avoid race conditions instances_copy = list(self._instances) - finally: - self._lock.unlock() for instance in instances_copy: instance.upsert_flag(new_flag) @@ -668,35 +661,23 @@ def update(self, flag_builder: FlagBuilderV2) -> TestDataV2: return self def _make_init_data(self) -> Dict[str, Any]: - try: - self._lock.rlock() + with self._lock.read_lock(): return copy.copy(self._current_flags) - finally: - self._lock.runlock() def _get_version(self) -> int: - try: - self._lock.lock() + with self._lock.write_lock(): version = self._version self._version += 1 return version - finally: - self._lock.unlock() def _closed_instance(self, instance): - try: - self._lock.lock() + with self._lock.write_lock(): if instance in self._instances: self._instances.remove(instance) - finally: - self._lock.unlock() def _add_instance(self, instance): - try: - self._lock.lock() + with self._lock.write_lock(): self._instances.append(instance) - finally: - self._lock.unlock() def build_initializer(self, _: Config) -> _TestDataSourceV2: """ From 320e72338a21e15626d728cb48c4e764e80f434b Mon Sep 17 00:00:00 2001 From: jsonbailey Date: Tue, 6 Jan 2026 10:10:48 -0600 Subject: [PATCH 2/9] [sdk-1710] convert some existing lock implementation to RWlock if they benefit --- ldclient/impl/datasystem/store.py | 17 ++++++++--------- ldclient/impl/listeners.py | 12 ++++++------ 2 files changed, 14 insertions(+), 15 deletions(-) diff --git a/ldclient/impl/datasystem/store.py b/ldclient/impl/datasystem/store.py index 206ab5e0..bb56faab 100644 --- 
a/ldclient/impl/datasystem/store.py +++ b/ldclient/impl/datasystem/store.py @@ -6,7 +6,6 @@ ChangeSet applications and flag change notifications. """ -import threading from collections import defaultdict from typing import Any, Callable, Dict, List, Optional, Set @@ -194,7 +193,7 @@ def __init__( self._selector = Selector.no_selector() # Thread synchronization - self._lock = threading.RLock() + self._lock = ReadWriteLock() def with_persistence( self, @@ -213,7 +212,7 @@ def with_persistence( Returns: Self for method chaining """ - with self._lock: + with self._lock.write_lock(): self._persistent_store = persistent_store self._persistent_store_writable = writable self._persistent_store_status_provider = status_provider @@ -225,12 +224,12 @@ def with_persistence( def selector(self) -> Selector: """Returns the current selector.""" - with self._lock: + with self._lock.read_lock(): return self._selector def close(self) -> Optional[Exception]: """Close the store and any persistent store if configured.""" - with self._lock: + with self._lock.write_lock(): if self._persistent_store is not None: try: # Most FeatureStore implementations don't have close methods @@ -251,7 +250,7 @@ def apply(self, change_set: ChangeSet, persist: bool) -> None: """ collections = self._changes_to_store_data(change_set.changes) - with self._lock: + with self._lock.write_lock(): try: if change_set.intent_code == IntentCode.TRANSFER_FULL: self._set_basis(collections, change_set.selector, persist) @@ -443,7 +442,7 @@ def __mapping(data: Dict[str, ModelEntity]) -> Dict[str, Dict[str, Any]]: return __mapping - with self._lock: + with self._lock.write_lock(): if self._should_persist(): try: # Get all data from memory store and write to persistent store @@ -457,7 +456,7 @@ def __mapping(data: Dict[str, ModelEntity]) -> Dict[str, Dict[str, Any]]: def get_active_store(self) -> ReadOnlyStore: """Get the currently active store for reading data.""" - with self._lock: + with self._lock.read_lock(): return self._active_store def is_initialized(self) -> bool: @@ -466,5 +465,5 @@ def is_initialized(self) -> bool: def get_data_store_status_provider(self) -> Optional[DataStoreStatusProvider]: """Get the data store status provider for the persistent store, if configured.""" - with self._lock: + with self._lock.read_lock(): return self._persistent_store_status_provider diff --git a/ldclient/impl/listeners.py b/ldclient/impl/listeners.py index d171d80d..ea6f3a95 100644 --- a/ldclient/impl/listeners.py +++ b/ldclient/impl/listeners.py @@ -1,6 +1,6 @@ -from threading import RLock from typing import Any, Callable +from ldclient.impl.rwlock import ReadWriteLock from ldclient.impl.util import log @@ -12,25 +12,25 @@ class Listeners: def __init__(self): self.__listeners = [] - self.__lock = RLock() + self.__lock = ReadWriteLock() def has_listeners(self) -> bool: - with self.__lock: + with self.__lock.read_lock(): return len(self.__listeners) > 0 def add(self, listener: Callable): - with self.__lock: + with self.__lock.write_lock(): self.__listeners.append(listener) def remove(self, listener: Callable): - with self.__lock: + with self.__lock.write_lock(): try: self.__listeners.remove(listener) except ValueError: pass # removing a listener that wasn't in the list is a no-op def notify(self, value: Any): - with self.__lock: + with self.__lock.read_lock(): listeners_copy = self.__listeners.copy() for listener in listeners_copy: try: From ec04a4effefc3d28ab70b4fc0fe810dddfcdcc6f Mon Sep 17 00:00:00 2001 From: jsonbailey Date: Tue, 6 Jan 2026 10:13:57 
-0600 Subject: [PATCH 3/9] [sdk-1710] update naming --- ldclient/__init__.py | 8 ++++---- ldclient/client.py | 10 +++++----- ldclient/feature_store.py | 12 ++++++------ ldclient/impl/datasource/status.py | 4 ++-- ldclient/impl/datastore/status.py | 4 ++-- ldclient/impl/datasystem/fdv2.py | 18 +++++++++--------- ldclient/impl/datasystem/store.py | 24 ++++++++++++------------ ldclient/impl/flag_tracker.py | 2 +- ldclient/impl/listeners.py | 8 ++++---- ldclient/impl/rwlock.py | 8 ++++---- ldclient/integrations/test_data.py | 8 ++++---- ldclient/integrations/test_datav2.py | 12 ++++++------ 12 files changed, 59 insertions(+), 59 deletions(-) diff --git a/ldclient/__init__.py b/ldclient/__init__.py index ea620afd..b1341f95 100644 --- a/ldclient/__init__.py +++ b/ldclient/__init__.py @@ -37,7 +37,7 @@ def set_config(config: Config): global __config global __client global __lock - with __lock.write_lock(): + with __lock.write(): try: if __client: log.info("Reinitializing LaunchDarkly Client " + VERSION + " with new config") @@ -62,13 +62,13 @@ def get() -> LDClient: global __config global __client global __lock - with __lock.read_lock(): + with __lock.read(): if __client: return __client if __config is None: raise Exception("set_config was not called") - with __lock.write_lock(): + with __lock.write(): if not __client: log.info("Initializing LaunchDarkly Client " + VERSION) __client = LDClient(config=__config, start_wait=start_wait) @@ -80,7 +80,7 @@ def _reset_client(): global __client global __lock c = None - with __lock.write_lock(): + with __lock.write(): c = __client __client = None if c: diff --git a/ldclient/client.py b/ldclient/client.py index 80350989..01007610 100644 --- a/ldclient/client.py +++ b/ldclient/client.py @@ -111,7 +111,7 @@ def __wrapper(self, fn: Callable): raise def __update_availability(self, available: bool): - with self.__lock.write_lock(): + with self.__lock.write(): if available == self.__last_available: return self.__last_available = available @@ -124,7 +124,7 @@ def __update_availability(self, available: bool): self.__store_update_sink.update_status(status) if available: - with self.__lock.write_lock(): + with self.__lock.write(): if self.__poller is not None: self.__poller.stop() self.__poller = None @@ -134,7 +134,7 @@ def __update_availability(self, available: bool): log.warn("Detected persistent store unavailability; updates will be cached until it recovers") task = RepeatingTask("ldclient.check-availability", 0.5, 0, self.__check_availability) - with self.__lock.write_lock(): + with self.__lock.write(): self.__poller = task self.__poller.start() @@ -710,7 +710,7 @@ def add_hook(self, hook: Hook): if not isinstance(hook, Hook): return - with self.__hooks_lock.write_lock(): + with self.__hooks_lock.write(): self.__hooks.append(hook) def __evaluate_with_hooks(self, key: str, context: Context, default_value: Any, method: str, block: Callable[[], _EvaluationWithHookResult]) -> _EvaluationWithHookResult: @@ -725,7 +725,7 @@ def __evaluate_with_hooks(self, key: str, context: Context, default_value: Any, # :return: """ hooks = [] # type: List[Hook] - with self.__hooks_lock.read_lock(): + with self.__hooks_lock.read(): if len(self.__hooks) == 0: return block() diff --git a/ldclient/feature_store.py b/ldclient/feature_store.py index 1e43c46c..7a8912b2 100644 --- a/ldclient/feature_store.py +++ b/ldclient/feature_store.py @@ -77,7 +77,7 @@ def is_available(self) -> bool: def get(self, kind: VersionedDataKind, key: str, callback: Callable[[Any], Any] = lambda x: x) -> Any: 
""" """ - with self._lock.read_lock(): + with self._lock.read(): itemsOfKind = self._items[kind] item = itemsOfKind.get(key) if item is None: @@ -90,7 +90,7 @@ def get(self, kind: VersionedDataKind, key: str, callback: Callable[[Any], Any] def all(self, kind, callback): """ """ - with self._lock.read_lock(): + with self._lock.read(): itemsOfKind = self._items[kind] return callback(dict((k, i) for k, i in itemsOfKind.items() if ('deleted' not in i) or not i['deleted'])) @@ -102,7 +102,7 @@ def init(self, all_data): for key, item in items.items(): items_decoded[key] = kind.decode(item) all_decoded[kind] = items_decoded - with self._lock.write_lock(): + with self._lock.write(): self._items.clear() self._items.update(all_decoded) self._initialized = True @@ -112,7 +112,7 @@ def init(self, all_data): # noinspection PyShadowingNames def delete(self, kind, key: str, version: int): """ """ - with self._lock.write_lock(): + with self._lock.write(): itemsOfKind = self._items[kind] i = itemsOfKind.get(key) if i is None or i['version'] < version: @@ -123,7 +123,7 @@ def upsert(self, kind, item): """ """ decoded_item = kind.decode(item) key = item['key'] - with self._lock.write_lock(): + with self._lock.write(): itemsOfKind = self._items[kind] i = itemsOfKind.get(key) if i is None or i['version'] < item['version']: @@ -133,7 +133,7 @@ def upsert(self, kind, item): @property def initialized(self) -> bool: """ """ - with self._lock.read_lock(): + with self._lock.read(): return self._initialized def describe_configuration(self, config): diff --git a/ldclient/impl/datasource/status.py b/ldclient/impl/datasource/status.py index a9130a65..c9813e04 100644 --- a/ldclient/impl/datasource/status.py +++ b/ldclient/impl/datasource/status.py @@ -29,7 +29,7 @@ def __init__(self, store: FeatureStore, status_listeners: Listeners, flag_change @property def status(self) -> DataSourceStatus: - with self.__lock.read_lock(): + with self.__lock.read(): return self.__status def init(self, all_data: Mapping[VersionedDataKind, Mapping[str, dict]]): @@ -67,7 +67,7 @@ def delete(self, kind: VersionedDataKind, key: str, version: int): def update_status(self, new_state: DataSourceState, new_error: Optional[DataSourceErrorInfo]): status_to_broadcast = None - with self.__lock.write_lock(): + with self.__lock.write(): old_status = self.__status if new_state == DataSourceState.INTERRUPTED and old_status.state == DataSourceState.INITIALIZING: diff --git a/ldclient/impl/datastore/status.py b/ldclient/impl/datastore/status.py index ead57755..1e4f145b 100644 --- a/ldclient/impl/datastore/status.py +++ b/ldclient/impl/datastore/status.py @@ -27,11 +27,11 @@ def listeners(self) -> Listeners: return self.__listeners def status(self) -> DataStoreStatus: - with self.__lock.read_lock(): + with self.__lock.read(): return copy(self.__status) def update_status(self, status: DataStoreStatus): - with self.__lock.write_lock(): + with self.__lock.write(): old_value, self.__status = self.__status, status if old_value != status: diff --git a/ldclient/impl/datasystem/fdv2.py b/ldclient/impl/datasystem/fdv2.py index b943e691..b8229e68 100644 --- a/ldclient/impl/datasystem/fdv2.py +++ b/ldclient/impl/datasystem/fdv2.py @@ -42,13 +42,13 @@ def __init__(self, listeners: Listeners): @property def status(self) -> DataSourceStatus: - with self.__lock.read_lock(): + with self.__lock.read(): return self.__status def update_status(self, new_state: DataSourceState, new_error: Optional[DataSourceErrorInfo]): status_to_broadcast = None - with 
self.__lock.write_lock(): + with self.__lock.write(): old_status = self.__status if new_state == DataSourceState.INTERRUPTED and old_status.state == DataSourceState.INITIALIZING: @@ -88,7 +88,7 @@ def update_status(self, status: DataStoreStatus): """ modified = False - with self.__lock.write_lock(): + with self.__lock.write(): if self.__status != status: self.__status = status modified = True @@ -98,7 +98,7 @@ def update_status(self, status: DataStoreStatus): @property def status(self) -> DataStoreStatus: - with self.__lock.read_lock(): + with self.__lock.read(): return copy(self.__status) def is_monitoring_enabled(self) -> bool: @@ -163,7 +163,7 @@ def __update_availability(self, available: bool): poller_to_stop = None task_to_start = None - with self.__lock.write_lock(): + with self.__lock.write(): if available == self.__last_available: return @@ -322,7 +322,7 @@ def stop(self): """Stop the FDv2 data system and all associated threads.""" self._stop_event.set() - with self._lock.write_lock(): + with self._lock.write(): if self._active_synchronizer is not None: try: self._active_synchronizer.stop() @@ -411,7 +411,7 @@ def synchronizer_loop(self: 'FDv2'): while not self._stop_event.is_set() and self._primary_synchronizer_builder is not None: # Try primary synchronizer try: - with self._lock.write_lock(): + with self._lock.write(): primary_sync = self._primary_synchronizer_builder(self._config) if isinstance(primary_sync, DiagnosticSource) and self._diagnostic_accumulator is not None: primary_sync.set_diagnostic_accumulator(self._diagnostic_accumulator) @@ -446,7 +446,7 @@ def synchronizer_loop(self: 'FDv2'): if self._secondary_synchronizer_builder is None: continue - with self._lock.write_lock(): + with self._lock.write(): secondary_sync = self._secondary_synchronizer_builder(self._config) if isinstance(secondary_sync, DiagnosticSource) and self._diagnostic_accumulator is not None: secondary_sync.set_diagnostic_accumulator(self._diagnostic_accumulator) @@ -480,7 +480,7 @@ def synchronizer_loop(self: 'FDv2'): finally: # Ensure we always set the ready event when exiting set_on_ready.set() - with self._lock.write_lock(): + with self._lock.write(): if self._active_synchronizer is not None: self._active_synchronizer.stop() self._active_synchronizer = None diff --git a/ldclient/impl/datasystem/store.py b/ldclient/impl/datasystem/store.py index bb56faab..d9ded03e 100644 --- a/ldclient/impl/datasystem/store.py +++ b/ldclient/impl/datasystem/store.py @@ -49,7 +49,7 @@ def get( key: str, callback: Callable[[Any], Any] = lambda x: x, ) -> Any: - with self._lock.read_lock(): + with self._lock.read(): items_of_kind = self._items[kind] item = items_of_kind.get(key) if item is None: @@ -69,7 +69,7 @@ def get( return callback(item) def all(self, kind: VersionedDataKind, callback: Callable[[Any], Any] = lambda x: x) -> Any: - with self._lock.read_lock(): + with self._lock.read(): items_of_kind = self._items[kind] return callback( dict( @@ -88,7 +88,7 @@ def set_basis(self, collections: Collections) -> bool: return False try: - with self._lock.write_lock(): + with self._lock.write(): self._items.clear() self._items.update(all_decoded) self._initialized = True @@ -107,7 +107,7 @@ def apply_delta(self, collections: Collections) -> bool: return False try: - with self._lock.write_lock(): + with self._lock.write(): for kind, kind_data in all_decoded.items(): items_of_kind = self._items[kind] kind_data = all_decoded[kind] @@ -142,7 +142,7 @@ def initialized(self) -> bool: """ Indicates whether the store has been 
initialized with data. """ - with self._lock.read_lock(): + with self._lock.read(): return self._initialized @@ -212,7 +212,7 @@ def with_persistence( Returns: Self for method chaining """ - with self._lock.write_lock(): + with self._lock.write(): self._persistent_store = persistent_store self._persistent_store_writable = writable self._persistent_store_status_provider = status_provider @@ -224,12 +224,12 @@ def with_persistence( def selector(self) -> Selector: """Returns the current selector.""" - with self._lock.read_lock(): + with self._lock.read(): return self._selector def close(self) -> Optional[Exception]: """Close the store and any persistent store if configured.""" - with self._lock.write_lock(): + with self._lock.write(): if self._persistent_store is not None: try: # Most FeatureStore implementations don't have close methods @@ -250,7 +250,7 @@ def apply(self, change_set: ChangeSet, persist: bool) -> None: """ collections = self._changes_to_store_data(change_set.changes) - with self._lock.write_lock(): + with self._lock.write(): try: if change_set.intent_code == IntentCode.TRANSFER_FULL: self._set_basis(collections, change_set.selector, persist) @@ -442,7 +442,7 @@ def __mapping(data: Dict[str, ModelEntity]) -> Dict[str, Dict[str, Any]]: return __mapping - with self._lock.write_lock(): + with self._lock.write(): if self._should_persist(): try: # Get all data from memory store and write to persistent store @@ -456,7 +456,7 @@ def __mapping(data: Dict[str, ModelEntity]) -> Dict[str, Dict[str, Any]]: def get_active_store(self) -> ReadOnlyStore: """Get the currently active store for reading data.""" - with self._lock.read_lock(): + with self._lock.read(): return self._active_store def is_initialized(self) -> bool: @@ -465,5 +465,5 @@ def is_initialized(self) -> bool: def get_data_store_status_provider(self) -> Optional[DataStoreStatusProvider]: """Get the data store status provider for the persistent store, if configured.""" - with self._lock.read_lock(): + with self._lock.read(): return self._persistent_store_status_provider diff --git a/ldclient/impl/flag_tracker.py b/ldclient/impl/flag_tracker.py index f4c5fba4..8ce16b23 100644 --- a/ldclient/impl/flag_tracker.py +++ b/ldclient/impl/flag_tracker.py @@ -22,7 +22,7 @@ def __call__(self, flag_change: FlagChange): new_value = self.__eval_fn(self.__key, self.__context) - with self.__lock.write_lock(): + with self.__lock.write(): old_value, self.__value = self.__value, new_value if new_value == old_value: diff --git a/ldclient/impl/listeners.py b/ldclient/impl/listeners.py index ea6f3a95..58b88b96 100644 --- a/ldclient/impl/listeners.py +++ b/ldclient/impl/listeners.py @@ -15,22 +15,22 @@ def __init__(self): self.__lock = ReadWriteLock() def has_listeners(self) -> bool: - with self.__lock.read_lock(): + with self.__lock.read(): return len(self.__listeners) > 0 def add(self, listener: Callable): - with self.__lock.write_lock(): + with self.__lock.write(): self.__listeners.append(listener) def remove(self, listener: Callable): - with self.__lock.write_lock(): + with self.__lock.write(): try: self.__listeners.remove(listener) except ValueError: pass # removing a listener that wasn't in the list is a no-op def notify(self, value: Any): - with self.__lock.read_lock(): + with self.__lock.read(): listeners_copy = self.__listeners.copy() for listener in listeners_copy: try: diff --git a/ldclient/impl/rwlock.py b/ldclient/impl/rwlock.py index df2f6a38..9abf41be 100644 --- a/ldclient/impl/rwlock.py +++ b/ldclient/impl/rwlock.py @@ -41,11 +41,11 
@@ def unlock(self): self._read_ready.release() @contextmanager - def read_lock(self): + def read(self): """Context manager for acquiring a read lock. Usage: - with lock.read_lock(): + with lock.read(): # read lock held here pass """ @@ -56,11 +56,11 @@ def read_lock(self): self.runlock() @contextmanager - def write_lock(self): + def write(self): """Context manager for acquiring a write lock. Usage: - with lock.write_lock(): + with lock.write(): # write lock held here pass """ diff --git a/ldclient/integrations/test_data.py b/ldclient/integrations/test_data.py index 4385fdb6..59d2e048 100644 --- a/ldclient/integrations/test_data.py +++ b/ldclient/integrations/test_data.py @@ -57,7 +57,7 @@ def __init__(self): def __call__(self, config, store, ready): data_source = _TestDataSource(store, self, ready) - with self._lock.write_lock(): + with self._lock.write(): self._instances.append(data_source) return data_source @@ -86,7 +86,7 @@ def flag(self, key: str) -> 'FlagBuilder': :param str key: the flag key :return: the flag configuration builder object """ - with self._lock.read_lock(): + with self._lock.read(): if key in self._flag_builders and self._flag_builders[key]: return self._flag_builders[key]._copy() else: @@ -107,7 +107,7 @@ def update(self, flag_builder: 'FlagBuilder') -> 'TestData': :param flag_builder: a flag configuration builder :return: self (the TestData object) """ - with self._lock.write_lock(): + with self._lock.write(): old_version = 0 if flag_builder._key in self._current_flags: old_flag = self._current_flags[flag_builder._key] @@ -128,7 +128,7 @@ def _make_init_data(self) -> dict: return {FEATURES: copy.copy(self._current_flags)} def _closed_instance(self, instance): - with self._lock.write_lock(): + with self._lock.write(): self._instances.remove(instance) diff --git a/ldclient/integrations/test_datav2.py b/ldclient/integrations/test_datav2.py index fe264395..3b791cf1 100644 --- a/ldclient/integrations/test_datav2.py +++ b/ldclient/integrations/test_datav2.py @@ -617,7 +617,7 @@ def flag(self, key: str) -> FlagBuilderV2: :param str key: the flag key :return: the flag configuration builder object """ - with self._lock.read_lock(): + with self._lock.read(): if key in self._flag_builders and self._flag_builders[key]: return self._flag_builders[key]._copy() @@ -640,7 +640,7 @@ def update(self, flag_builder: FlagBuilderV2) -> TestDataV2: :return: self (the TestDataV2 object) """ instances_copy = [] - with self._lock.write_lock(): + with self._lock.write(): old_version = 0 if flag_builder._key in self._current_flags: old_flag = self._current_flags[flag_builder._key] @@ -661,22 +661,22 @@ def update(self, flag_builder: FlagBuilderV2) -> TestDataV2: return self def _make_init_data(self) -> Dict[str, Any]: - with self._lock.read_lock(): + with self._lock.read(): return copy.copy(self._current_flags) def _get_version(self) -> int: - with self._lock.write_lock(): + with self._lock.write(): version = self._version self._version += 1 return version def _closed_instance(self, instance): - with self._lock.write_lock(): + with self._lock.write(): if instance in self._instances: self._instances.remove(instance) def _add_instance(self, instance): - with self._lock.write_lock(): + with self._lock.write(): self._instances.append(instance) def build_initializer(self, _: Config) -> _TestDataSourceV2: From 1cbcfd8a09ef3721ec0ef741a72204bdaa3c9e2e Mon Sep 17 00:00:00 2001 From: jsonbailey Date: Tue, 6 Jan 2026 10:15:10 -0600 Subject: [PATCH 4/9] [sdk-1710] lint fix --- ldclient/impl/rwlock.py | 
4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ldclient/impl/rwlock.py b/ldclient/impl/rwlock.py index 9abf41be..a31a2624 100644 --- a/ldclient/impl/rwlock.py +++ b/ldclient/impl/rwlock.py @@ -43,7 +43,7 @@ def unlock(self): @contextmanager def read(self): """Context manager for acquiring a read lock. - + Usage: with lock.read(): # read lock held here @@ -58,7 +58,7 @@ def read(self): @contextmanager def write(self): """Context manager for acquiring a write lock. - + Usage: with lock.write(): # write lock held here From bbb62377433d9af58b3fa789a400fc5edd711519 Mon Sep 17 00:00:00 2001 From: jsonbailey Date: Tue, 6 Jan 2026 18:58:27 +0000 Subject: [PATCH 5/9] fix: Stop FeatureStoreClientWrapper poller on close --- ldclient/impl/datasystem/fdv2.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/ldclient/impl/datasystem/fdv2.py b/ldclient/impl/datasystem/fdv2.py index b8229e68..fef77ec3 100644 --- a/ldclient/impl/datasystem/fdv2.py +++ b/ldclient/impl/datasystem/fdv2.py @@ -228,6 +228,19 @@ def is_monitoring_enabled(self) -> bool: return False return monitoring_enabled() + + def close(self): + """ + Close the wrapper and stop the repeating task poller if it's running. + This is called by Store.close() during shutdown to ensure the poller thread is stopped. + """ + poller_to_stop = None + with self.__lock.write(): + poller_to_stop = self.__poller + self.__poller = None + + if poller_to_stop is not None: + poller_to_stop.stop() class FDv2(DataSystem): From c3faa679320ef7b974395e4103e6bd400c05b022 Mon Sep 17 00:00:00 2001 From: jsonbailey Date: Tue, 6 Jan 2026 20:50:55 +0000 Subject: [PATCH 6/9] Document store close method as optional and call if it exists --- ldclient/impl/datasystem/fdv2.py | 13 ++++++------- ldclient/interfaces.py | 22 ++++++++++++++++++++++ 2 files changed, 28 insertions(+), 7 deletions(-) diff --git a/ldclient/impl/datasystem/fdv2.py b/ldclient/impl/datasystem/fdv2.py index fef77ec3..5cb51f28 100644 --- a/ldclient/impl/datasystem/fdv2.py +++ b/ldclient/impl/datasystem/fdv2.py @@ -232,15 +232,14 @@ def is_monitoring_enabled(self) -> bool: def close(self): """ Close the wrapper and stop the repeating task poller if it's running. - This is called by Store.close() during shutdown to ensure the poller thread is stopped. + Also forwards the close call to the underlying store if it has a close method. """ - poller_to_stop = None with self.__lock.write(): - poller_to_stop = self.__poller - self.__poller = None - - if poller_to_stop is not None: - poller_to_stop.stop() + if self.__poller is not None: + self.__poller.stop() + self.__poller = None + if hasattr(self.store, "close"): + self.store.close() class FDv2(DataSystem): diff --git a/ldclient/interfaces.py b/ldclient/interfaces.py index 7a030d30..29fd114c 100644 --- a/ldclient/interfaces.py +++ b/ldclient/interfaces.py @@ -189,6 +189,28 @@ def initialized(self) -> bool: # :return: true if the underlying data store is reachable # """ + # WARN: This isn't a required method on a FeatureStore. The SDK will + # check if the provided store responds to this method, and if it does, + # will call it during shutdown to release any resources (such as database + # connections or connection pools) that the store may be using. + # + # @abstractmethod + # def close(self): + # """ + # Releases any resources used by the data store implementation. 
+ # + # This method will be called by the SDK during shutdown to ensure proper + # cleanup of resources such as database connections, connection pools, + # network sockets, or other resources that should be explicitly released. + # + # Implementations should be idempotent - calling close() multiple times + # should be safe and have no additional effect after the first call. + # + # This is particularly important for persistent data stores that maintain + # connection pools or other long-lived resources that should be properly + # cleaned up when the SDK is shut down. + # """ + class FeatureStoreCore: """ From e44eb541a1c17c788b6f299101f6ced41f85c5bb Mon Sep 17 00:00:00 2001 From: jsonbailey Date: Tue, 6 Jan 2026 22:06:50 +0000 Subject: [PATCH 7/9] revert the change from RLock --- ldclient/impl/datasystem/store.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/ldclient/impl/datasystem/store.py b/ldclient/impl/datasystem/store.py index d9ded03e..6491cf97 100644 --- a/ldclient/impl/datasystem/store.py +++ b/ldclient/impl/datasystem/store.py @@ -6,6 +6,7 @@ ChangeSet applications and flag change notifications. """ +import threading from collections import defaultdict from typing import Any, Callable, Dict, List, Optional, Set @@ -193,7 +194,7 @@ def __init__( self._selector = Selector.no_selector() # Thread synchronization - self._lock = ReadWriteLock() + self._lock = threading.RLock() def with_persistence( self, @@ -212,7 +213,7 @@ def with_persistence( Returns: Self for method chaining """ - with self._lock.write(): + with self._lock: self._persistent_store = persistent_store self._persistent_store_writable = writable self._persistent_store_status_provider = status_provider @@ -224,12 +225,12 @@ def with_persistence( def selector(self) -> Selector: """Returns the current selector.""" - with self._lock.read(): + with self._lock: return self._selector def close(self) -> Optional[Exception]: """Close the store and any persistent store if configured.""" - with self._lock.write(): + with self._lock: if self._persistent_store is not None: try: # Most FeatureStore implementations don't have close methods @@ -250,7 +251,7 @@ def apply(self, change_set: ChangeSet, persist: bool) -> None: """ collections = self._changes_to_store_data(change_set.changes) - with self._lock.write(): + with self._lock: try: if change_set.intent_code == IntentCode.TRANSFER_FULL: self._set_basis(collections, change_set.selector, persist) @@ -442,7 +443,7 @@ def __mapping(data: Dict[str, ModelEntity]) -> Dict[str, Dict[str, Any]]: return __mapping - with self._lock.write(): + with self._lock: if self._should_persist(): try: # Get all data from memory store and write to persistent store @@ -456,7 +457,7 @@ def __mapping(data: Dict[str, ModelEntity]) -> Dict[str, Dict[str, Any]]: def get_active_store(self) -> ReadOnlyStore: """Get the currently active store for reading data.""" - with self._lock.read(): + with self._lock: return self._active_store def is_initialized(self) -> bool: @@ -465,5 +466,5 @@ def is_initialized(self) -> bool: def get_data_store_status_provider(self) -> Optional[DataStoreStatusProvider]: """Get the data store status provider for the persistent store, if configured.""" - with self._lock.read(): + with self._lock: return self._persistent_store_status_provider From b454f38d659294b6aa65d6084e614b0a6b5efba0 Mon Sep 17 00:00:00 2001 From: jsonbailey Date: Wed, 7 Jan 2026 14:44:33 +0000 Subject: [PATCH 8/9] fix lint --- ldclient/impl/datasystem/fdv2.py | 2 +- 1 file 
changed, 1 insertion(+), 1 deletion(-) diff --git a/ldclient/impl/datasystem/fdv2.py b/ldclient/impl/datasystem/fdv2.py index 5cb51f28..95c79ebe 100644 --- a/ldclient/impl/datasystem/fdv2.py +++ b/ldclient/impl/datasystem/fdv2.py @@ -228,7 +228,7 @@ def is_monitoring_enabled(self) -> bool: return False return monitoring_enabled() - + def close(self): """ Close the wrapper and stop the repeating task poller if it's running. From e876d4b7895490bf3c2f6c21ceccbe675879a66d Mon Sep 17 00:00:00 2001 From: jsonbailey Date: Wed, 7 Jan 2026 16:34:07 +0000 Subject: [PATCH 9/9] Not all stores implement close so we need to track if it has been closed --- ldclient/impl/datasystem/fdv2.py | 21 ++++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/ldclient/impl/datasystem/fdv2.py b/ldclient/impl/datasystem/fdv2.py index 95c79ebe..3e2864c8 100644 --- a/ldclient/impl/datasystem/fdv2.py +++ b/ldclient/impl/datasystem/fdv2.py @@ -130,6 +130,7 @@ def __init__(self, store: FeatureStore, store_update_sink: DataStoreStatusProvid self.__lock = ReadWriteLock() self.__last_available = True self.__poller: Optional[RepeatingTask] = None + self.__closed = False def init(self, all_data: Mapping[VersionedDataKind, Mapping[str, Dict[Any, Any]]]): return self.__wrapper(lambda: self.store.init(_FeatureStoreDataSetSorter.sort_all_collections(all_data))) @@ -164,6 +165,8 @@ def __update_availability(self, available: bool): task_to_start = None with self.__lock.write(): + if self.__closed: + return if available == self.__last_available: return @@ -234,12 +237,20 @@ def close(self): Close the wrapper and stop the repeating task poller if it's running. Also forwards the close call to the underlying store if it has a close method. """ + poller_to_stop = None + with self.__lock.write(): - if self.__poller is not None: - self.__poller.stop() - self.__poller = None - if hasattr(self.store, "close"): - self.store.close() + if self.__closed: + return + self.__closed = True + poller_to_stop = self.__poller + self.__poller = None + + if poller_to_stop is not None: + poller_to_stop.stop() + + if hasattr(self.store, "close"): + self.store.close() class FDv2(DataSystem):
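
Taken together, the series replaces manual rlock()/runlock() and lock()/unlock() pairs with the read() and write() context managers added to ldclient.impl.rwlock.ReadWriteLock. Below is a minimal usage sketch of that pattern, assuming the patched SDK is importable; the SharedCounter class is a hypothetical illustration and is not part of the SDK.

    from ldclient.impl.rwlock import ReadWriteLock


    class SharedCounter:
        """Hypothetical holder of shared state guarded by the reader-writer lock."""

        def __init__(self):
            self._lock = ReadWriteLock()
            self._value = 0

        def increment(self) -> int:
            # Writers take the exclusive lock; it is released even if the body raises.
            with self._lock.write():
                self._value += 1
                return self._value

        def current(self) -> int:
            # Readers take the shared lock, so concurrent readers do not block each other.
            with self._lock.read():
                return self._value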