Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions api/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ class PubSubSettings(BaseSettings):
# Redis connection used as the live pub/sub transport
redis_host: str = "redis"
redis_db_number: int = 1
# Keep-alive interval — presumably seconds; confirm against the pubsub
# implementation's use of this value
keep_alive_period: int = 45
# MongoDB durable pub/sub settings
event_ttl_days: int = 7 # Auto-delete events after N days
max_catchup_events: int = 1000 # Max events to deliver on reconnect
subscriber_state_ttl_days: int = 30 # Cleanup unused subscriber states


# pylint: disable=too-few-public-methods
Expand Down
42 changes: 28 additions & 14 deletions api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
from fastapi_users import FastAPIUsers
from beanie import PydanticObjectId
from pydantic import BaseModel
from kernelci.api.models import (

Check failure on line 44 in api/main.py

View workflow job for this annotation

GitHub Actions / Lint

Unable to import 'kernelci.api.models'
Node,
Hierarchy,
PublishEvent,
Expand All @@ -51,7 +51,7 @@
)
from .auth import Authentication
from .db import Database
from .pubsub import PubSub
from .pubsub_mongo import PubSub
from .user_manager import get_user_manager, create_user_manager
from .models import (
PageModel,
Expand Down Expand Up @@ -418,11 +418,8 @@
)


def _get_eventhistory(evdict):
"""Get EventHistory object from dictionary"""
evhist = EventHistory()
evhist.data = evdict
return evhist
# EventHistory is now stored by pubsub.publish_cloudevent()
# No need for separate _get_eventhistory function


# TBD: Restrict response by Pydantic model
Expand Down Expand Up @@ -681,9 +678,8 @@
attributes = {}
if data.get('owner', None):
attributes['owner'] = data['owner']
# publish_cloudevent now stores to eventhistory collection
await pubsub.publish_cloudevent('node', data, attributes)
evhist = _get_eventhistory(data)
await db.create(evhist)
return obj


Expand Down Expand Up @@ -751,9 +747,8 @@
if data.get('owner', None):
attributes['owner'] = data['owner']
if not noevent:
# publish_cloudevent now stores to eventhistory collection
await pubsub.publish_cloudevent('node', data, attributes)
evhist = _get_eventhistory(data)
await db.create(evhist)
return obj


Expand Down Expand Up @@ -842,9 +837,8 @@
attributes = {}
if data.get('owner', None):
attributes['owner'] = data['owner']
# publish_cloudevent now stores to eventhistory collection
await pubsub.publish_cloudevent('node', data, attributes)
evhist = _get_eventhistory(data)
await db.create(evhist)
return obj_list


Expand Down Expand Up @@ -894,12 +888,32 @@

@app.post('/subscribe/{channel}', response_model=Subscription)
async def subscribe(channel: str, user: User = Depends(get_current_user),
                    promisc: Optional[bool] = Query(None),
                    subscriber_id: Optional[str] = Query(
                        None,
                        description="Unique subscriber ID for durable "
                                    "delivery. If provided, missed events "
                                    "will be delivered on reconnection. "
                                    "Without this, events are "
                                    "fire-and-forget."
                    )):
    """Subscribe handler for Pub/Sub channel

    Args:
        channel: Channel name to subscribe to
        promisc: If true, receive all messages regardless of owner
        subscriber_id: Optional unique ID for durable event delivery.
          When provided, the subscriber's position is tracked and
          missed events are delivered on reconnection. Use a stable
          identifier like "scheduler-prod-1" or "dashboard-main".
          Without subscriber_id, standard fire-and-forget pub/sub.
    """
    metrics.add('http_requests_total', 1)
    # Only forward options that were actually supplied, so the pubsub
    # layer can distinguish a plain subscription from a durable one.
    options = {}
    if promisc:
        options['promiscuous'] = promisc
    if subscriber_id:
        options['subscriber_id'] = subscriber_id
    return await pubsub.subscribe(channel, user.username, options)


Expand Down
37 changes: 37 additions & 0 deletions api/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
Document,
PydanticObjectId,
)
from kernelci.api.models_base import DatabaseModel, ModelId

Check failure on line 31 in api/models.py

View workflow job for this annotation

GitHub Actions / Lint

Unable to import 'kernelci.api.models_base'


# PubSub model definitions
Expand Down Expand Up @@ -61,6 +61,43 @@
)


# MongoDB-based durable Pub/Sub models
# Note: Event storage uses EventHistory model from kernelci-core
# (stored in 'eventhistory' collection with sequence_id, channel, owner fields)

class SubscriberState(BaseModel):
    """Tracks subscriber position for durable event delivery

    Only created when subscriber_id is provided during subscription.
    Enables catch-up on missed events after reconnection.

    The ``last_event_id`` acts as an implicit acknowledgement: advancing
    it on the next poll marks all earlier events as delivered.
    """
    subscriber_id: str = Field(
        description='Unique subscriber identifier (client-provided)'
    )
    channel: str = Field(
        description='Subscribed channel name'
    )
    user: str = Field(
        description='Username of subscriber (for ownership validation)'
    )
    promiscuous: bool = Field(
        default=False,
        description='If true, receive all messages regardless of owner'
    )
    last_event_id: int = Field(
        default=0,
        description='Last acknowledged event ID (implicit ACK on next poll)'
    )
    # NOTE(review): datetime.utcnow is deprecated since Python 3.12 and
    # yields a naive datetime; datetime.now(timezone.utc) is the modern
    # replacement — confirm the rest of the codebase compares naive UTC
    # timestamps before switching, as mixing naive/aware raises TypeError.
    created_at: datetime = Field(
        default_factory=datetime.utcnow,
        description='Subscription creation timestamp'
    )
    last_poll: Optional[datetime] = Field(
        default=None,
        description='Last poll timestamp (used for stale cleanup)'
    )


# User model definitions

class UserGroup(DatabaseModel):
Expand Down
Loading