Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 69 additions & 0 deletions thingsdb/misc/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
# Lock

This lock provides distributed mutual exclusion, allowing you to synchronize
access to shared resources or critical sections of code across multiple
independent Python programs or services, even if they are running on different
machines.

It functions similarly to `asyncio.Lock()`, which is designed for concurrency
within a single process, but extends this capability to a multi-process,
multi-host environment by leveraging ThingsDB as its backend. This ensures that
only one client can acquire the lock at any given time, preventing race
conditions and maintaining data integrity in a distributed system.

The `timeout` parameter defines the maximum duration a lock can be held.
If a client fails to explicitly release the lock (e.g., due to a crash),
ThingsDB will automatically release it after this period, preventing deadlocks.
Separately, the expression `queue_size * timeout` indicates the total maximum
time a client will actively attempt to acquire the lock if it's currently
unavailable.

Example code:

```python
import asyncio
from functools import partial
from thingsdb.client import Client
from thingsdb.misc import lock


async def main():
# ThingsDB client
client = Client()

# Multiple locks may be created, make sure you give each lock a unique name
mylock = partial(lock.lock, client=client, name='my-lock', timeout=5)

await client.connect('localhost')
try:
await client.authenticate('admin', 'pass')

# This will set-up a lock collection
# It will only do work the first time, but no harm in keep calling
await lock.setup(client)

# Wait for a lock
async with mylock():
print('In here')
await asyncio.sleep(5.0) # simulate some work
print('Done here')

finally:
await client.close_and_wait()


if __name__ == '__main__':
asyncio.run(main())
```

To observe the distributed lock in action, you can execute the example Python
script simultaneously in multiple separate terminal windows.

You can determine if a specific distributed lock is currently held by using
the `lock.locked()` asynchronous function.

To check the lock's status:

```python
is_locked = await lock.locked(client, 'my-lock')
```
Empty file added thingsdb/misc/__init__.py
Empty file.
137 changes: 137 additions & 0 deletions thingsdb/misc/lock.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncGenerator
from ..client import Client
from ..room import Room, event


async def setup(client: Client, collection: str = 'lock'):
has_collection = await client.query("""//ti
has_collection(name);
""", name=collection, scope='/t')

if has_collection:
return

await client.query("""//ti
new_collection(name);
""", name=collection, scope='/t')

await client.query("""//ti

set_type('Inner', {
room: 'room',
task: 'task',
timeout: 'int',
set_task: |this, lock_id| {
this.task = task(
datetime().move('seconds', this.timeout),
|_, lock_id, room_id| wse(Lock(lock_id).release(room_id)),
[lock_id, this.room.id()],
);
nil;
},
});

set_type('Lock', {
queue: '[Inner]',
go: |this| {
if (!this.queue) return nil;
inner = this.queue.first();
inner.set_task(this.id());
inner.room.set_name('go');
inner.room.emit('go');
},
acquire: |this, timeout| {
size = this.queue.len();
immediately = size == 0;
inner = Inner{timeout:,};
this.queue.push(inner);
immediately && inner.set_task(this.id());
[immediately, inner.room.id(), size];
},
release: |this, room_id| try({
if (this.queue.first().room.id() == room_id) {
this.queue.first().task.del();
this.queue.shift();
this.go();
} else {
this.queue
.remove(|inner| inner.room.id() == room_id)
.each(|inner| inner.task.del());
};
nil;
}),
});

set_type('Root', {
lock: 'thing<Lock>',
version: 'int'
});

new_procedure('acquire', |name, timeout| {
.lock.get(name, .lock[name] = Lock{}).acquire(timeout);
});

new_procedure('test', |room_id| {
room(room_id).name() == 'go';
});

new_procedure('locked', |name| {
bool(.lock[name].queue);
});

new_procedure('release', |name, room_id| {
wse(.lock[name].release(room_id));
});

.to_type('Root');
""", scope=f'//{collection}')


class _InnerRoom(Room):

future: asyncio.Future

def on_init(self) -> None:
self.future = asyncio.Future()

async def on_join(self) -> None:
# We might have missed the event during the join. If so, set the
# future result to continue.
ok = await self.client.run('test', self.id, scope=self.scope)
if ok and not self.future.done():
self.future.set_result(None)

@event('go')
def on_go(self):
if not self.future.done():
self.future.set_result(None)


@asynccontextmanager
async def lock(client: Client, name: str,
scope: str = '//lock',
timeout: int = 60) -> AsyncGenerator[int, None]:

res: tuple[bool, int, int] = \
await client.run('acquire', name, timeout, scope=scope)

immediately, room_id, size = res
if not immediately:
room = _InnerRoom(room_id, scope=scope)
await room.join(client, wait=None)
try:
await asyncio.wait_for(room.future, timeout=timeout*size)
except asyncio.TimeoutError:
pass

try:
yield room_id # Lock Id assigned to the 'as' target (not required)
finally:
await client.run('release', name, room_id, scope=scope)


async def locked(client: Client, name: str, scope: str = '//lock') -> bool:
res: bool = await client.run('locked', name, scope=scope)
return res
2 changes: 1 addition & 1 deletion thingsdb/version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '1.1.6'
__version__ = '1.1.7'