From b38c3b0feb45d1eef0bb50302299948e2a4a6e7f Mon Sep 17 00:00:00 2001 From: Jeroen van der Heijden Date: Thu, 12 Jun 2025 17:13:09 +0200 Subject: [PATCH 1/2] lock --- thingsdb/misc/__init__.py | 1 + thingsdb/misc/lock.py | 120 ++++++++++++++++++++++++++++++++++++++ thingsdb/version.py | 2 +- 3 files changed, 122 insertions(+), 1 deletion(-) create mode 100644 thingsdb/misc/__init__.py create mode 100644 thingsdb/misc/lock.py diff --git a/thingsdb/misc/__init__.py b/thingsdb/misc/__init__.py new file mode 100644 index 0000000..8a037ec --- /dev/null +++ b/thingsdb/misc/__init__.py @@ -0,0 +1 @@ +from .lock import lock as ti_lock diff --git a/thingsdb/misc/lock.py b/thingsdb/misc/lock.py new file mode 100644 index 0000000..2b20e54 --- /dev/null +++ b/thingsdb/misc/lock.py @@ -0,0 +1,120 @@ +import asyncio +from contextlib import asynccontextmanager +from ..client import Client +from ..room import Room, event + + +async def setup(client: Client, collection: str = 'lock'): + has_collection = await client.query("""//ti + has_collection(name); + """, name=collection, scope='/t') + + if has_collection: + return + + await client.query("""//ti + new_collection(name); + """, name=collection, scope='/t') + + await client.query("""//ti + + set_type('Inner', { + room: 'room', + task: 'task', + timeout: 'int', + set_task: |this, lock_id| { + this.task = task( + datetime().move('seconds', this.timeout), + |_, lock_id, room_id| wse(Lock(lock_id).release(room_id)), + [lock_id, this.room.id()], + ); + nil; + }, + }); + + set_type('Lock', { + queue: '[Inner]', + go: |this| { + if (!this.queue) return nil; + inner = this.queue.first(); + inner.set_task(this.id()); + inner.room.set_name('go'); + inner.room.emit('go'); + }, + acquire: |this, timeout| { + immediately = this.queue.len() == 0; + inner = Inner{timeout:,}; + this.queue.push(inner); + immediately ? inner.set_task(this.id()) : inner.room.id(); + }, + release: |this, room_id| try({ + if (this.queue.first().room.id() == room_id) { + this.queue.first().task.del(); + this.queue.shift(); + this.go(); + } else { + this.queue + .remove(|inner| inner.room.id() == room_id) + .each(|inner| inner.task.del()); + }; + nil; + }), + }); + + set_type('Root', { + lock: 'thing', + version: 'int' + }); + + new_procedure('acquire', |name, timeout| { + .lock.get(name, .lock[name] = Lock{}).acquire(timeout); + }); + + new_procedure('test', |room_id| { + room(room_id).name() == 'go'; + }); + + new_procedure('release', |name, room_id| { + wse(.lock[name].release(room_id)); + }); + + .to_type('Root'); + """) + + +class _InnerRoom(Room): + + future: asyncio.Future + + def on_init(self) -> None: + self.future = asyncio.Future() + + async def on_join(self) -> None: + # We might have missed the event during the join. If so, set the + # future result to continue. + ok = await self.client.run('test', self.id, scope=self.scope) + if ok: + self.future.set_result(None) + + @event('go') + def on_go(self): + self.future.set_result(None) + + +@asynccontextmanager +async def lock(client: Client, name: str, + scope: str = '//lock', + timeout: int = 60): + + room_id: int | None = \ + await client.run('acquire', name, timeout, scope=scope) + + if room_id is not None: + room = _InnerRoom(room_id, scope=scope) + await room.join(client, wait=None) + await room.future + + try: + yield room_id # Lock Id assigned to the 'as' target (not required) + finally: + await client.run('release', room_id, scope=scope) diff --git a/thingsdb/version.py b/thingsdb/version.py index 6ebd335..9910ac2 100644 --- a/thingsdb/version.py +++ b/thingsdb/version.py @@ -1 +1 @@ -__version__ = '1.1.6' +__version__ = '1.1.7' From 8265ad57997acef410726a9d03aa7d8c4b7643c8 Mon Sep 17 00:00:00 2001 From: Jeroen van der Heijden Date: Fri, 13 Jun 2025 11:59:26 +0200 Subject: [PATCH 2/2] Lock --- thingsdb/misc/README.md | 69 +++++++++++++++++++++++++++++++++++++++ thingsdb/misc/__init__.py | 1 - thingsdb/misc/lock.py | 37 +++++++++++++++------ 3 files changed, 96 insertions(+), 11 deletions(-) create mode 100644 thingsdb/misc/README.md diff --git a/thingsdb/misc/README.md b/thingsdb/misc/README.md new file mode 100644 index 0000000..ed4e824 --- /dev/null +++ b/thingsdb/misc/README.md @@ -0,0 +1,69 @@ +# Lock + +This lock provides distributed mutual exclusion, allowing you to synchronize +access to shared resources or critical sections of code across multiple +independent Python programs or services, even if they are running on different +machines. + +It functions similarly to `asyncio.Lock()`, which is designed for concurrency +within a single process, but extends this capability to a multi-process, +multi-host environment by leveraging ThingsDB as its backend. This ensures that +only one client can acquire the lock at any given time, preventing race +conditions and maintaining data integrity in a distributed system. + +The `timeout` parameter defines the maximum duration a lock can be held. +If a client fails to explicitly release the lock (e.g., due to a crash), +ThingsDB will automatically release it after this period, preventing deadlocks. +Separately, the expression `queue_size * timeout` indicates the total maximum +time a client will actively attempt to acquire the lock if it's currently +unavailable. + +Example code: + +```python +import asyncio +from functools import partial +from thingsdb.client import Client +from thingsdb.misc import lock + + +async def main(): + # ThingsDB client + client = Client() + + # Multiple locks may be created, make sure you give each lock a unique name + mylock = partial(lock.lock, client=client, name='my-lock', timeout=5) + + await client.connect('localhost') + try: + await client.authenticate('admin', 'pass') + + # This will set-up a lock collection + # It will only do work the first time, but no harm in keep calling + await lock.setup(client) + + # Wait for a lock + async with mylock(): + print('In here') + await asyncio.sleep(5.0) # simulate some work + print('Done here') + + finally: + await client.close_and_wait() + + +if __name__ == '__main__': + asyncio.run(main()) +``` + +To observe the distributed lock in action, you can execute the example Python +script simultaneously in multiple separate terminal windows. + +You can determine if a specific distributed lock is currently held by using +the `lock.locked()` asynchronous function. + +To check the lock's status: + +```python +is_locked = await lock.locked(client, 'my-lock') +``` diff --git a/thingsdb/misc/__init__.py b/thingsdb/misc/__init__.py index 8a037ec..e69de29 100644 --- a/thingsdb/misc/__init__.py +++ b/thingsdb/misc/__init__.py @@ -1 +0,0 @@ -from .lock import lock as ti_lock diff --git a/thingsdb/misc/lock.py b/thingsdb/misc/lock.py index 2b20e54..a1c042f 100644 --- a/thingsdb/misc/lock.py +++ b/thingsdb/misc/lock.py @@ -1,5 +1,6 @@ import asyncio from contextlib import asynccontextmanager +from typing import AsyncGenerator from ..client import Client from ..room import Room, event @@ -42,10 +43,12 @@ async def setup(client: Client, collection: str = 'lock'): inner.room.emit('go'); }, acquire: |this, timeout| { - immediately = this.queue.len() == 0; + size = this.queue.len(); + immediately = size == 0; inner = Inner{timeout:,}; this.queue.push(inner); - immediately ? inner.set_task(this.id()) : inner.room.id(); + immediately && inner.set_task(this.id()); + [immediately, inner.room.id(), size]; }, release: |this, room_id| try({ if (this.queue.first().room.id() == room_id) { @@ -74,12 +77,16 @@ async def setup(client: Client, collection: str = 'lock'): room(room_id).name() == 'go'; }); + new_procedure('locked', |name| { + bool(.lock[name].queue); + }); + new_procedure('release', |name, room_id| { wse(.lock[name].release(room_id)); }); .to_type('Root'); - """) + """, scope=f'//{collection}') class _InnerRoom(Room): @@ -93,28 +100,38 @@ async def on_join(self) -> None: # We might have missed the event during the join. If so, set the # future result to continue. ok = await self.client.run('test', self.id, scope=self.scope) - if ok: + if ok and not self.future.done(): self.future.set_result(None) @event('go') def on_go(self): - self.future.set_result(None) + if not self.future.done(): + self.future.set_result(None) @asynccontextmanager async def lock(client: Client, name: str, scope: str = '//lock', - timeout: int = 60): + timeout: int = 60) -> AsyncGenerator[int, None]: - room_id: int | None = \ + res: tuple[bool, int, int] = \ await client.run('acquire', name, timeout, scope=scope) - if room_id is not None: + immediately, room_id, size = res + if not immediately: room = _InnerRoom(room_id, scope=scope) await room.join(client, wait=None) - await room.future + try: + await asyncio.wait_for(room.future, timeout=timeout*size) + except asyncio.TimeoutError: + pass try: yield room_id # Lock Id assigned to the 'as' target (not required) finally: - await client.run('release', room_id, scope=scope) + await client.run('release', name, room_id, scope=scope) + + +async def locked(client: Client, name: str, scope: str = '//lock') -> bool: + res: bool = await client.run('locked', name, scope=scope) + return res