Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions invokeai/app/services/config/config_default.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
ATTENTION_SLICE_SIZE = Literal["auto", "balanced", "max", 1, 2, 3, 4, 5, 6, 7, 8]
LOG_FORMAT = Literal["plain", "color", "syslog", "legacy"]
LOG_LEVEL = Literal["debug", "info", "warning", "error", "critical"]
SESSION_QUEUE_MODE = Literal["FIFO", "round_robin"]
CONFIG_SCHEMA_VERSION = "4.0.2"


Expand Down Expand Up @@ -102,6 +103,7 @@ class InvokeAIAppConfig(BaseSettings):
pil_compress_level: The compress_level setting of PIL.Image.save(), used for PNG encoding. All settings are lossless. 0 = no compression, 1 = fastest with slightly larger filesize, 9 = slowest with smallest filesize. 1 is typically the best setting.
max_queue_size: Maximum number of items in the session queue.
clear_queue_on_startup: Empties session queue on startup.
session_queue_mode: Session queue mode. Use 'FIFO' for traditional first-in-first-out, or 'round_robin' to serve each user's jobs in turn. In single-user mode, FIFO is always used regardless of this setting.
allow_nodes: List of nodes to allow. Omit to allow all.
deny_nodes: List of nodes to deny. Omit to deny none.
node_cache_size: How many cached nodes to keep in memory.
Expand Down Expand Up @@ -191,6 +193,7 @@ class InvokeAIAppConfig(BaseSettings):
pil_compress_level: int = Field(default=1, description="The compress_level setting of PIL.Image.save(), used for PNG encoding. All settings are lossless. 0 = no compression, 1 = fastest with slightly larger filesize, 9 = slowest with smallest filesize. 1 is typically the best setting.")
max_queue_size: int = Field(default=10000, gt=0, description="Maximum number of items in the session queue.")
clear_queue_on_startup: bool = Field(default=False, description="Empties session queue on startup.")
session_queue_mode: SESSION_QUEUE_MODE = Field(default="round_robin", description="Session queue mode. Use 'FIFO' for traditional first-in-first-out, or 'round_robin' to serve each user's jobs in turn. In single-user mode, FIFO is always used regardless of this setting.")

# NODES
allow_nodes: Optional[list[str]] = Field(default=None, description="List of nodes to allow. Omit to allow all.")
Expand Down
46 changes: 42 additions & 4 deletions invokeai/app/services/session_queue/session_queue_sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,9 +155,45 @@ async def enqueue_batch(
return enqueue_result

def dequeue(self) -> Optional[SessionQueueItem]:
with self._db.transaction() as cursor:
cursor.execute(
"""--sql
config = self.__invoker.services.configuration
use_round_robin = config.multiuser and config.session_queue_mode == "round_robin"

if use_round_robin:
query = """--sql
WITH user_last_served AS (
-- Track when each user last had an item started, to determine whose turn it is.
SELECT user_id, MAX(started_at) AS last_served_at
FROM session_queue
WHERE started_at IS NOT NULL
GROUP BY user_id
),
user_next_item AS (
-- For each user, select their single best pending item (highest priority, then oldest).
SELECT
user_id,
item_id,
ROW_NUMBER() OVER (
PARTITION BY user_id
ORDER BY priority DESC, item_id ASC
) AS rn
FROM session_queue
WHERE status = 'pending'
)
SELECT
sq.*,
u.display_name AS user_display_name,
u.email AS user_email
FROM session_queue sq
LEFT JOIN users u ON sq.user_id = u.user_id
JOIN user_next_item uni ON sq.item_id = uni.item_id AND uni.rn = 1
LEFT JOIN user_last_served uls ON sq.user_id = uls.user_id
ORDER BY
COALESCE(uls.last_served_at, '1970-01-01') ASC,
sq.item_id ASC
LIMIT 1
"""
else:
query = """--sql
SELECT
sq.*,
u.display_name as user_display_name,
Expand All @@ -170,7 +206,9 @@ def dequeue(self) -> Optional[SessionQueueItem]:
sq.item_id ASC
LIMIT 1
"""
)

with self._db.transaction() as cursor:
cursor.execute(query)
result = cast(Union[sqlite3.Row, None], cursor.fetchone())
if result is None:
return None
Expand Down
214 changes: 214 additions & 0 deletions tests/app/services/session_queue/test_session_queue_dequeue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
"""Tests for session queue dequeue() ordering: FIFO and round-robin modes."""

import json
import uuid
from typing import Optional

import pytest
from pydantic_core import to_jsonable_python

from invokeai.app.services.config.config_default import InvokeAIAppConfig
from invokeai.app.services.invoker import Invoker
from invokeai.app.services.session_queue.session_queue_sqlite import SqliteSessionQueue
from invokeai.app.services.shared.graph import Graph, GraphExecutionState

_EMPTY_SESSION_JSON = json.dumps(to_jsonable_python(GraphExecutionState(graph=Graph()).model_dump()))


@pytest.fixture
def session_queue_fifo(mock_invoker: Invoker) -> SqliteSessionQueue:
"""Queue backed by a single-user (FIFO) invoker."""
# Default config has multiuser=False, so FIFO is always used.
db = mock_invoker.services.board_records._db
queue = SqliteSessionQueue(db=db)
queue.start(mock_invoker)
return queue


@pytest.fixture
def session_queue_round_robin(mock_invoker: Invoker) -> SqliteSessionQueue:
"""Queue backed by a multiuser invoker with round_robin mode."""
mock_invoker.services.configuration = InvokeAIAppConfig(
use_memory_db=True,
node_cache_size=0,
multiuser=True,
session_queue_mode="round_robin",
)
db = mock_invoker.services.board_records._db
queue = SqliteSessionQueue(db=db)
queue.start(mock_invoker)
return queue


def _insert_queue_item(
session_queue: SqliteSessionQueue,
queue_id: str,
user_id: str,
priority: int = 0,
) -> int:
"""Directly insert a minimal queue item and return its item_id."""
session_id = str(uuid.uuid4())
batch_id = str(uuid.uuid4())
with session_queue._db.transaction() as cursor:
cursor.execute(
"""--sql
INSERT INTO session_queue (queue_id, session, session_id, batch_id, field_values, priority, workflow, origin, destination, retried_from_item_id, user_id)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(queue_id, _EMPTY_SESSION_JSON, session_id, batch_id, None, priority, None, None, None, None, user_id),
)
return cursor.lastrowid # type: ignore[return-value]


def _dequeue_user_ids(session_queue: SqliteSessionQueue, count: int) -> list[Optional[str]]:
"""Dequeue `count` items and return the list of user_ids in dequeue order."""
result = []
for _ in range(count):
item = session_queue.dequeue()
result.append(item.user_id if item is not None else None)
return result


# ---------------------------------------------------------------------------
# FIFO tests
# ---------------------------------------------------------------------------


def test_fifo_single_user_order(session_queue_fifo: SqliteSessionQueue) -> None:
"""FIFO: items from a single user are dequeued in insertion order."""
queue_id = "default"
_insert_queue_item(session_queue_fifo, queue_id, "user_a")
_insert_queue_item(session_queue_fifo, queue_id, "user_a")
_insert_queue_item(session_queue_fifo, queue_id, "user_a")

user_ids = _dequeue_user_ids(session_queue_fifo, 3)
assert user_ids == ["user_a", "user_a", "user_a"]


def test_fifo_multi_user_preserves_insertion_order(session_queue_fifo: SqliteSessionQueue) -> None:
"""FIFO: jobs from multiple users are dequeued in strict insertion order, not interleaved."""
queue_id = "default"
# Insert A1, A2, B1, C1, C2, A3 – FIFO should preserve this exact order.
_insert_queue_item(session_queue_fifo, queue_id, "user_a")
_insert_queue_item(session_queue_fifo, queue_id, "user_a")
_insert_queue_item(session_queue_fifo, queue_id, "user_b")
_insert_queue_item(session_queue_fifo, queue_id, "user_c")
_insert_queue_item(session_queue_fifo, queue_id, "user_c")
_insert_queue_item(session_queue_fifo, queue_id, "user_a")

user_ids = _dequeue_user_ids(session_queue_fifo, 6)
assert user_ids == ["user_a", "user_a", "user_b", "user_c", "user_c", "user_a"]


def test_fifo_priority_respected(session_queue_fifo: SqliteSessionQueue) -> None:
"""FIFO: higher-priority items are dequeued before lower-priority ones."""
queue_id = "default"
_insert_queue_item(session_queue_fifo, queue_id, "user_a", priority=0)
_insert_queue_item(session_queue_fifo, queue_id, "user_a", priority=10)

user_ids = _dequeue_user_ids(session_queue_fifo, 2)
# Both are user_a; second inserted item has higher priority and should come first.
assert user_ids == ["user_a", "user_a"]


def test_fifo_returns_none_when_empty(session_queue_fifo: SqliteSessionQueue) -> None:
"""FIFO: dequeue returns None when the queue is empty."""
assert session_queue_fifo.dequeue() is None


# ---------------------------------------------------------------------------
# Round-robin tests
# ---------------------------------------------------------------------------


def test_round_robin_interleaves_users(session_queue_round_robin: SqliteSessionQueue) -> None:
"""Round-robin: jobs from multiple users are interleaved one per user per round.

Queue insertion order (matching the issue example):
A job 1, A job 2, B job 1, C job 1, C job 2, A job 3

Expected dequeue order:
A job 1, B job 1, C job 1, A job 2, C job 2, A job 3
"""
queue_id = "default"
_insert_queue_item(session_queue_round_robin, queue_id, "user_a")
_insert_queue_item(session_queue_round_robin, queue_id, "user_a")
_insert_queue_item(session_queue_round_robin, queue_id, "user_b")
_insert_queue_item(session_queue_round_robin, queue_id, "user_c")
_insert_queue_item(session_queue_round_robin, queue_id, "user_c")
_insert_queue_item(session_queue_round_robin, queue_id, "user_a")

user_ids = _dequeue_user_ids(session_queue_round_robin, 6)
assert user_ids == ["user_a", "user_b", "user_c", "user_a", "user_c", "user_a"]


def test_round_robin_single_user_behaves_like_fifo(session_queue_round_robin: SqliteSessionQueue) -> None:
"""Round-robin with only one user produces the same order as FIFO."""
queue_id = "default"
_insert_queue_item(session_queue_round_robin, queue_id, "user_a")
_insert_queue_item(session_queue_round_robin, queue_id, "user_a")
_insert_queue_item(session_queue_round_robin, queue_id, "user_a")

user_ids = _dequeue_user_ids(session_queue_round_robin, 3)
assert user_ids == ["user_a", "user_a", "user_a"]


def test_round_robin_handles_user_joining_mid_queue(session_queue_round_robin: SqliteSessionQueue) -> None:
"""Round-robin: a user who joins later is correctly interleaved."""
queue_id = "default"
_insert_queue_item(session_queue_round_robin, queue_id, "user_a")
_insert_queue_item(session_queue_round_robin, queue_id, "user_a")
_insert_queue_item(session_queue_round_robin, queue_id, "user_b")

user_ids = _dequeue_user_ids(session_queue_round_robin, 3)
# Round 1: A (oldest rank-1 item), B (rank-1 item)
# Round 2: A (rank-2 item)
assert user_ids == ["user_a", "user_b", "user_a"]


def test_round_robin_returns_none_when_empty(session_queue_round_robin: SqliteSessionQueue) -> None:
"""Round-robin: dequeue returns None when the queue is empty."""
assert session_queue_round_robin.dequeue() is None


def test_round_robin_priority_within_user_respected(session_queue_round_robin: SqliteSessionQueue) -> None:
"""Round-robin: within a single user's items, higher priority is dequeued first."""
queue_id = "default"
# Insert low-priority item first, then high-priority for same user.
_insert_queue_item(session_queue_round_robin, queue_id, "user_a", priority=0)
_insert_queue_item(session_queue_round_robin, queue_id, "user_a", priority=10)
_insert_queue_item(session_queue_round_robin, queue_id, "user_b", priority=0)

# Round 1: user_a's best item (priority 10), user_b's only item.
# Round 2: user_a's remaining item (priority 0).
items = []
for _ in range(3):
item = session_queue_round_robin.dequeue()
assert item is not None
items.append((item.user_id, item.priority))

assert items[0] == ("user_a", 10)
assert items[1] == ("user_b", 0)
assert items[2] == ("user_a", 0)


def test_round_robin_ignored_in_single_user_mode(mock_invoker: Invoker) -> None:
"""When multiuser=False, round_robin config is ignored and FIFO is used."""
mock_invoker.services.configuration = InvokeAIAppConfig(
use_memory_db=True,
node_cache_size=0,
multiuser=False,
session_queue_mode="round_robin",
)
db = mock_invoker.services.board_records._db
queue = SqliteSessionQueue(db=db)
queue.start(mock_invoker)

queue_id = "default"
_insert_queue_item(queue, queue_id, "user_a")
_insert_queue_item(queue, queue_id, "user_a")
_insert_queue_item(queue, queue_id, "user_b")

# FIFO order: user_a, user_a, user_b
user_ids = _dequeue_user_ids(queue, 3)
assert user_ids == ["user_a", "user_a", "user_b"]