diff --git a/thingsdb/misc/README.md b/thingsdb/misc/README.md new file mode 100644 index 0000000..ed4e824 --- /dev/null +++ b/thingsdb/misc/README.md @@ -0,0 +1,69 @@ +# Lock + +This lock provides distributed mutual exclusion, allowing you to synchronize +access to shared resources or critical sections of code across multiple +independent Python programs or services, even if they are running on different +machines. + +It functions similarly to `asyncio.Lock()`, which is designed for concurrency +within a single process, but extends this capability to a multi-process, +multi-host environment by leveraging ThingsDB as its backend. This ensures that +only one client can acquire the lock at any given time, preventing race +conditions and maintaining data integrity in a distributed system. + +The `timeout` parameter defines the maximum duration a lock can be held. +If a client fails to explicitly release the lock (e.g., due to a crash), +ThingsDB will automatically release it after this period, preventing deadlocks. +Separately, the expression `queue_size * timeout` indicates the total maximum +time a client will actively attempt to acquire the lock if it's currently +unavailable. + +Example code: + +```python +import asyncio +from functools import partial +from thingsdb.client import Client +from thingsdb.misc import lock + + +async def main(): + # ThingsDB client + client = Client() + + # Multiple locks may be created, make sure you give each lock a unique name + mylock = partial(lock.lock, client=client, name='my-lock', timeout=5) + + await client.connect('localhost') + try: + await client.authenticate('admin', 'pass') + + # This will set-up a lock collection + # It will only do work the first time, but no harm in keep calling + await lock.setup(client) + + # Wait for a lock + async with mylock(): + print('In here') + await asyncio.sleep(5.0) # simulate some work + print('Done here') + + finally: + await client.close_and_wait() + + +if __name__ == '__main__': + asyncio.run(main()) +``` + +To observe the distributed lock in action, you can execute the example Python +script simultaneously in multiple separate terminal windows. + +You can determine if a specific distributed lock is currently held by using +the `lock.locked()` asynchronous function. + +To check the lock's status: + +```python +is_locked = await lock.locked(client, 'my-lock') +``` diff --git a/thingsdb/misc/__init__.py b/thingsdb/misc/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/thingsdb/misc/lock.py b/thingsdb/misc/lock.py new file mode 100644 index 0000000..a1c042f --- /dev/null +++ b/thingsdb/misc/lock.py @@ -0,0 +1,137 @@ +import asyncio +from contextlib import asynccontextmanager +from typing import AsyncGenerator +from ..client import Client +from ..room import Room, event + + +async def setup(client: Client, collection: str = 'lock'): + has_collection = await client.query("""//ti + has_collection(name); + """, name=collection, scope='/t') + + if has_collection: + return + + await client.query("""//ti + new_collection(name); + """, name=collection, scope='/t') + + await client.query("""//ti + + set_type('Inner', { + room: 'room', + task: 'task', + timeout: 'int', + set_task: |this, lock_id| { + this.task = task( + datetime().move('seconds', this.timeout), + |_, lock_id, room_id| wse(Lock(lock_id).release(room_id)), + [lock_id, this.room.id()], + ); + nil; + }, + }); + + set_type('Lock', { + queue: '[Inner]', + go: |this| { + if (!this.queue) return nil; + inner = this.queue.first(); + inner.set_task(this.id()); + inner.room.set_name('go'); + inner.room.emit('go'); + }, + acquire: |this, timeout| { + size = this.queue.len(); + immediately = size == 0; + inner = Inner{timeout:,}; + this.queue.push(inner); + immediately && inner.set_task(this.id()); + [immediately, inner.room.id(), size]; + }, + release: |this, room_id| try({ + if (this.queue.first().room.id() == room_id) { + this.queue.first().task.del(); + this.queue.shift(); + this.go(); + } else { + this.queue + .remove(|inner| inner.room.id() == room_id) + .each(|inner| inner.task.del()); + }; + nil; + }), + }); + + set_type('Root', { + lock: 'thing', + version: 'int' + }); + + new_procedure('acquire', |name, timeout| { + .lock.get(name, .lock[name] = Lock{}).acquire(timeout); + }); + + new_procedure('test', |room_id| { + room(room_id).name() == 'go'; + }); + + new_procedure('locked', |name| { + bool(.lock[name].queue); + }); + + new_procedure('release', |name, room_id| { + wse(.lock[name].release(room_id)); + }); + + .to_type('Root'); + """, scope=f'//{collection}') + + +class _InnerRoom(Room): + + future: asyncio.Future + + def on_init(self) -> None: + self.future = asyncio.Future() + + async def on_join(self) -> None: + # We might have missed the event during the join. If so, set the + # future result to continue. + ok = await self.client.run('test', self.id, scope=self.scope) + if ok and not self.future.done(): + self.future.set_result(None) + + @event('go') + def on_go(self): + if not self.future.done(): + self.future.set_result(None) + + +@asynccontextmanager +async def lock(client: Client, name: str, + scope: str = '//lock', + timeout: int = 60) -> AsyncGenerator[int, None]: + + res: tuple[bool, int, int] = \ + await client.run('acquire', name, timeout, scope=scope) + + immediately, room_id, size = res + if not immediately: + room = _InnerRoom(room_id, scope=scope) + await room.join(client, wait=None) + try: + await asyncio.wait_for(room.future, timeout=timeout*size) + except asyncio.TimeoutError: + pass + + try: + yield room_id # Lock Id assigned to the 'as' target (not required) + finally: + await client.run('release', name, room_id, scope=scope) + + +async def locked(client: Client, name: str, scope: str = '//lock') -> bool: + res: bool = await client.run('locked', name, scope=scope) + return res diff --git a/thingsdb/version.py b/thingsdb/version.py index 6ebd335..9910ac2 100644 --- a/thingsdb/version.py +++ b/thingsdb/version.py @@ -1 +1 @@ -__version__ = '1.1.6' +__version__ = '1.1.7'