Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions src/backgrounds/plugins/room_type.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import logging

from backgrounds.base import Background, BackgroundConfig
from providers.room_type_location_provider import RoomTypeLocationProvider


class RoomTypeLocation(Background):
    """
    Background task that launches the RoomTypeLocationProvider.

    Watches the IOProvider's `room_type` dynamic variable and, once a
    valid room type shows up, posts it to the map locations API.
    """

    def __init__(self, config: BackgroundConfig = BackgroundConfig()):
        """
        Initialize the Room Type Location background task.

        Parameters
        ----------
        config : BackgroundConfig
            Configuration for the background task.
            Expected (but optional) fields on config:
            - base_url: str
            - timeout: int
            - refresh_interval: int
            - map_name: str
        """
        super().__init__(config)

        # Pull the optional provider settings off the config object,
        # falling back to sane defaults when a field is absent.
        defaults = {
            "base_url": "http://localhost:5000/maps/locations/add/slam",
            "timeout": 5,
            "refresh_interval": 5,
            "map_name": "map",
        }
        settings = {
            key: getattr(self.config, key, fallback)
            for key, fallback in defaults.items()
        }

        # Spin up the provider immediately; it runs for the lifetime of
        # this background task.
        self.room_type_location_provider = RoomTypeLocationProvider(**settings)
        self.room_type_location_provider.start()

        logging.info(
            "RoomTypeLocation background initialized "
            f"(base_url: {settings['base_url']}, map_name: {settings['map_name']}, "
            f"refresh: {settings['refresh_interval']}s)"
        )
291 changes: 291 additions & 0 deletions src/inputs/plugins/vlm_openai_rtsp_room_detection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,291 @@
import asyncio
import logging
import re
import time
from dataclasses import dataclass
from queue import Empty, Queue
from typing import List, Optional

from openai.types.chat import ChatCompletion

from inputs.base import SensorConfig
from inputs.base.loop import FuserInput
from providers.io_provider import IOProvider
from providers.vlm_openai_rtsp_provider import VLMOpenAIRTSPProvider


@dataclass
class Message:
    """
    Immutable record pairing a piece of text with the moment it arrived.

    Parameters
    ----------
    timestamp : float
        Unix timestamp of the message
    message : str
        Content of the message
    """

    timestamp: float
    message: str


class VLMOpenAIRTSPROOM(FuserInput[str]):
    """
    Vision Language Model input handler with room-type detection.

    Streams frames from an RTSP source through an OpenAI-compatible VLM,
    buffers the resulting text descriptions for the fuser, and additionally
    parses an optional room-type tag (e.g. ``[Room: kitchen]``) out of each
    response, publishing recognized values to the IOProvider as the
    ``room_type`` dynamic variable.
    """

    # Canonical room tokens that may be published to the IOProvider.
    _ALLOWED_ROOMS = frozenset(
        {"living_room", "bedroom", "study", "kitchen", "outdoor"}
    )

    # Free-text variants accepted by the plain-text fallback scan,
    # mapped to their canonical token.
    _ROOM_VARIANTS = {
        "living room": "living_room",
        "living-room": "living_room",
        "bed room": "bedroom",
        "office": "study",
        "outside": "outdoor",
    }

    # Lookup used after hyphens/underscores are collapsed to spaces;
    # maps common spellings onto the canonical token.
    _NORMALIZATION_MAP = {
        "living room": "living_room",
        "bedroom": "bedroom",
        "study": "study",
        "kitchen": "kitchen",
        "outdoor": "outdoor",
        "outside": "outdoor",
        "office": "study",
        "bed room": "bedroom",
        "unknown": "unknown",
    }

    # Matches an explicit tag such as "[Room: bedroom]" (case-insensitive).
    _ROOM_TAG_RE = re.compile(r"\[\s*Room\s*:\s*([^\]]+)\]", flags=re.IGNORECASE)

    def __init__(self, config: SensorConfig = SensorConfig()):
        """
        Initialize VLM input handler.

        Sets up the required providers and buffers for handling VLM processing.
        Initializes connection to the VLM service and registers message handlers.

        Parameters
        ----------
        config : SensorConfig
            Sensor configuration. `api_key` is required; `base_url`,
            `rtsp_url`, `prompt`, `fps`, and `descriptor_for_LLM` are
            optional and fall back to defaults.

        Raises
        ------
        ValueError
            If `api_key` is missing or empty in the config.
        """
        super().__init__(config)

        # Track IO
        self.io_provider = IOProvider()

        # Buffer for storing the final output
        self.messages: List[Message] = []

        # Buffer for storing messages
        self.message_buffer: Queue[str] = Queue()

        # Initialize VLM provider
        api_key = getattr(self.config, "api_key", None)

        if api_key is None or api_key == "":
            raise ValueError("config file missing api_key")

        base_url = getattr(
            self.config, "base_url", "https://api.openmind.org/api/core/openai"
        )
        rtsp_url = getattr(self.config, "rtsp_url", "rtsp://localhost:8554/top_camera")
        prompt = getattr(
            self.config,
            "prompt",
            "What is the most interesting aspect in this series of images? Also return one exact guess of the room type in format: [Room:<one of living_room, bedroom, study, kitchen, outdoor, unknown>]. Use 'unknown' if you are not at least 80 percent sure.",
        )
        fps = getattr(self.config, "fps", 15)
        self.descriptor_for_LLM = getattr(
            self.config,
            "descriptor_for_LLM",
            "Vision",
        )

        self.vlm: VLMOpenAIRTSPProvider = VLMOpenAIRTSPProvider(
            base_url=base_url,
            api_key=api_key,
            rtsp_url=rtsp_url,
            prompt=prompt,
            fps=fps,
        )
        self.vlm.start()
        self.vlm.register_message_callback(self._handle_vlm_message)

    @classmethod
    def _extract_room_type(cls, content: str) -> Optional[str]:
        """
        Pull a raw room-type candidate out of VLM response text.

        Prefers an explicit ``[Room: ...]`` tag; otherwise falls back to
        scanning the plain text for an allowed token, then for a known
        variant spelling.

        Parameters
        ----------
        content : str
            Text content of the VLM response.

        Returns
        -------
        Optional[str]
            The un-normalized candidate (lowercased, trimmed), or None
            when nothing matched.
        """
        # 1) Explicit tag like: [Room: bedroom]
        m = cls._ROOM_TAG_RE.search(content)
        if m:
            return m.group(1).strip().lower().rstrip(" .")

        # 2) Fallback: exact allowed tokens first ("_" is a word character,
        # so \b boundaries work for tokens like "living_room").
        for opt in cls._ALLOWED_ROOMS:
            if re.search(rf"\b{re.escape(opt)}\b", content, flags=re.IGNORECASE):
                return opt

        # 3) Still not found: accept common variant spellings.
        for variant, canonical in cls._ROOM_VARIANTS.items():
            if re.search(rf"\b{re.escape(variant)}\b", content, flags=re.IGNORECASE):
                return canonical

        return None

    @classmethod
    def _normalize_room_type(cls, room_type: str) -> str:
        """
        Map a raw room-type candidate onto its canonical token.

        Hyphens and underscores are collapsed to spaces before lookup so
        spacing variants all resolve to the same entry. An unrecognized
        candidate is returned unchanged so the caller can warn about it.

        Parameters
        ----------
        room_type : str
            Raw candidate as produced by `_extract_room_type`.

        Returns
        -------
        str
            Canonical token (possibly "unknown"), or the original input
            when it is not in the normalization table.
        """
        norm = room_type.replace("-", " ").replace("_", " ").strip()
        return cls._NORMALIZATION_MAP.get(norm, room_type)

    def _handle_vlm_message(self, raw_message: ChatCompletion):
        """
        Process an incoming VLM completion.

        Extracts the text content, queues it for the fuser, and attempts to
        parse a room type from it. Recognized room types (other than
        'unknown') are published to the IOProvider as the `room_type`
        dynamic variable.

        Parameters
        ----------
        raw_message : ChatCompletion
            Completion object received from the VLM service.
        """
        logging.info(f"VLM OpenAI received message: {raw_message}")

        # Safely get content
        try:
            content = raw_message.choices[0].message.content
        except Exception:
            logging.exception("Failed to read message content from ChatCompletion")
            return

        if not content:
            logging.debug("Received empty content from VLM message.")
            return

        # Keep existing behavior: put full content on the buffer
        self.message_buffer.put(content)

        room_type = self._extract_room_type(content)
        if not room_type:
            logging.debug("No room type found in message content.")
            return

        room_type_final = self._normalize_room_type(room_type)

        # Recognise 'unknown' but do not publish it
        if room_type_final == "unknown":
            logging.info(
                "Parsed room type 'unknown'; not updating IOProvider room_type."
            )
            return

        if room_type_final in self._ALLOWED_ROOMS:
            try:
                self.io_provider.add_dynamic_variable("room_type", room_type_final)
                logging.info(f"Parsed room type: {room_type_final}")
            except Exception:
                logging.exception("Failed to add dynamic variable 'room_type'")
        else:
            logging.warning(
                f"Parsed room type '{room_type}' could not be normalized to allowed set."
            )

    async def _poll(self) -> Optional[str]:
        """
        Poll for new messages from the VLM service.

        Checks the message buffer for new messages with a brief delay
        to prevent excessive CPU usage.

        Returns
        -------
        Optional[str]
            The next message from the buffer if available, None otherwise
        """
        await asyncio.sleep(0.5)
        try:
            return self.message_buffer.get_nowait()
        except Empty:
            return None

    async def _raw_to_text(self, raw_input: str) -> Message:
        """
        Process raw input to generate a timestamped message.

        Parameters
        ----------
        raw_input : str
            Raw input string to be processed

        Returns
        -------
        Message
            A timestamped message containing the processed input
        """
        return Message(timestamp=time.time(), message=raw_input)

    async def raw_to_text(self, raw_input: Optional[str]):
        """
        Convert raw input to text and update message buffer.

        Processes the raw input if present and adds the resulting
        message to the internal message buffer.

        Parameters
        ----------
        raw_input : Optional[str]
            Raw input to be processed, or None if no input is available
        """
        if raw_input is None:
            return

        pending_message = await self._raw_to_text(raw_input)

        if pending_message is not None:
            self.messages.append(pending_message)

    def formatted_latest_buffer(self) -> Optional[str]:
        """
        Format and clear the latest buffer contents.

        Retrieves the most recent message from the buffer, formats it
        with timestamp and class name, adds it to the IO provider,
        and clears the buffer.

        Returns
        -------
        Optional[str]
            Formatted string containing the latest message and metadata,
            or None if the buffer is empty
        """
        if len(self.messages) == 0:
            return None

        latest_message = self.messages[-1]

        result = f"""
INPUT: {self.descriptor_for_LLM}
// START
{latest_message.message}
// END
"""

        self.io_provider.add_input(
            self.__class__.__name__, latest_message.message, latest_message.timestamp
        )
        self.messages = []

        return result
Loading
Loading