Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
277 changes: 254 additions & 23 deletions frontends/desktop_pet_v2.pyw
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Desktop Pet with Skin System — Cross-platform with True Transparency"""
import os
import re
import sys
import json
import threading
Expand All @@ -12,6 +13,102 @@ PORT = 51983
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
PROJECT_DIR = os.path.dirname(SCRIPT_DIR)
SKINS_DIR = os.path.join(SCRIPT_DIR, 'skins')
PET_MARGIN_X = 24
PET_MARGIN_Y = 120
DEFAULT_WINDOW_TITLE = 'GenericAgent'


def _clamp(value, lo, hi):
return max(lo, min(value, hi))


def _pet_position(screen_left, screen_top, screen_width, screen_height, pet_width, pet_height):
max_x = max(screen_left, screen_left + screen_width - pet_width - PET_MARGIN_X)
max_y = max(screen_top, screen_top + screen_height - pet_height - PET_MARGIN_Y)
x_pos = _clamp(screen_left + screen_width - pet_width - PET_MARGIN_X, screen_left, max_x)
y_pos = _clamp(screen_top + screen_height - pet_height - PET_MARGIN_Y, screen_top, max_y)
return int(x_pos), int(y_pos)


def _parent_alive(parent_pid):
if not parent_pid:
return True
try:
os.kill(parent_pid, 0)
return True
except OSError:
return False


def _find_streamlit_screen_rect(window_title):
title = (window_title or DEFAULT_WINDOW_TITLE).lower()
if sys.platform == 'darwin':
try:
import Quartz
from AppKit import NSScreen
screens = list(NSScreen.screens() or [])
windows = Quartz.CGWindowListCopyWindowInfo(Quartz.kCGWindowListOptionOnScreenOnly, Quartz.kCGNullWindowID) or []
for window in windows:
name = str(window.get(Quartz.kCGWindowName) or '').lower()
if title not in name:
continue
bounds = window.get(Quartz.kCGWindowBounds) or {}
center_x = float(bounds.get('X', 0)) + float(bounds.get('Width', 0)) / 2
for screen in screens:
frame = screen.visibleFrame()
left = frame.origin.x
right = left + frame.size.width
if left <= center_x <= right:
return frame.origin.x, frame.origin.y, frame.size.width, frame.size.height
if screens:
frame = screens[0].visibleFrame()
return frame.origin.x, frame.origin.y, frame.size.width, frame.size.height
except Exception:
pass
elif sys.platform == 'win32':
try:
import ctypes
from ctypes import wintypes
user32 = ctypes.windll.user32
monitor_info_flag = 2
target = {'rect': None}

EnumWindowsProc = ctypes.WINFUNCTYPE(ctypes.c_bool, wintypes.HWND, wintypes.LPARAM)

class RECT(ctypes.Structure):
_fields_ = [('left', ctypes.c_long), ('top', ctypes.c_long), ('right', ctypes.c_long), ('bottom', ctypes.c_long)]

class MONITORINFO(ctypes.Structure):
_fields_ = [('cbSize', wintypes.DWORD), ('rcMonitor', RECT), ('rcWork', RECT), ('dwFlags', wintypes.DWORD)]

def callback(hwnd, _):
if not user32.IsWindowVisible(hwnd):
return True
length = user32.GetWindowTextLengthW(hwnd)
if length <= 0:
return True
buf = ctypes.create_unicode_buffer(length + 1)
user32.GetWindowTextW(hwnd, buf, len(buf))
if title not in buf.value.lower():
return True
monitor = user32.MonitorFromWindow(hwnd, monitor_info_flag)
info = MONITORINFO()
info.cbSize = ctypes.sizeof(MONITORINFO)
if user32.GetMonitorInfoW(monitor, ctypes.byref(info)):
rect = info.rcWork
target['rect'] = (rect.left, rect.top, rect.right - rect.left, rect.bottom - rect.top)
return False
return True

user32.EnumWindows(EnumWindowsProc(callback), 0)
return target['rect']
except Exception:
pass
return None


def _rect_key(rect):
return tuple(int(v) for v in rect) if rect else None

class SkinLoader:
"""Load and parse skin configuration"""
Expand Down Expand Up @@ -108,9 +205,21 @@ def _load_default_font(size):
return ImageFont.load_default()


def _normalize_bubble_text(text):
"""Normalize text for fonts that cannot render some symbols."""
text = (text or '').strip()
lines = text.replace('\r\n', '\n').replace('\r', '\n').split('\n')
if lines:
turn_match = re.match(r'^\s*🔄?\s*Turn\s+(\d+)\s*$', lines[0], flags=re.IGNORECASE)
if turn_match:
rest = '\n'.join(line.strip() for line in lines[1:] if line.strip())
return f"Turn {turn_match.group(1)}: {rest}" if rest else f"Turn {turn_match.group(1)}:"
return text.replace('🔄 Turn', 'Turn').replace('🔄', '').strip()


def _wrap_text_for_width(draw, text, font, max_width):
"""Wrap text to fit inside max_width."""
text = (text or '').strip()
text = _normalize_bubble_text(text)
if not text:
return ['']

Expand Down Expand Up @@ -157,11 +266,11 @@ def build_bubble_image(message, max_width=220):
font = _load_default_font(font_size)
draw = ImageDraw.Draw(bubble)

pad_left = max(10, bubble.width // 18)
pad_right = max(10, bubble.width // 18) + max(6, bubble.width // 14)
pad_left = max(12, bubble.width // 16)
pad_right = max(16, bubble.width // 10)
pad_top = max(8, bubble.height // 10)
pad_bottom = max(18, bubble.height // 4)
text_area_width = max(40, bubble.width - pad_left - pad_right)
pad_bottom = max(20, bubble.height // 3)
text_area_width = max(36, bubble.width - pad_left - pad_right)

lines = _wrap_text_for_width(draw, message, font, text_area_width)
ascent, descent = font.getmetrics() if hasattr(font, 'getmetrics') else (font_size, font_size // 4)
Expand Down Expand Up @@ -213,6 +322,12 @@ def build_bubble_image(message, max_width=220):
class PetBase:
"""Shared logic for Mac and Windows pet implementations."""

def __init__(self, parent_pid=None, window_title=DEFAULT_WINDOW_TITLE):
self.parent_pid = parent_pid
self.window_title = window_title or DEFAULT_WINDOW_TITLE
self._screen_timer = None
self._tracked_screen_rect = None

def _schedule_main(self, fn):
"""Schedule fn on the GUI main thread. Subclasses must override."""
raise NotImplementedError
Expand All @@ -225,6 +340,31 @@ class PetBase:
"""Thread-safe wrapper for show_toast."""
self._schedule_main(lambda m=message: self.show_toast(m))

def _close(self):
raise NotImplementedError

def _current_position(self):
raise NotImplementedError

def _move_to(self, x_pos, y_pos):
raise NotImplementedError

def _sync_to_target_screen(self):
raise NotImplementedError

def _start_screen_tracking(self):
def tick():
if not _parent_alive(self.parent_pid):
self._close()
return
self._sync_to_target_screen()
self._screen_timer = self._schedule_delay(1000, tick)
self._tracked_screen_rect = _rect_key(_find_streamlit_screen_rect(self.window_title))
self._screen_timer = self._schedule_delay(1000, tick)

def _schedule_delay(self, delay_ms, fn):
raise NotImplementedError

def _start_server(self):
"""Start HTTP control server."""
pet = self
Expand Down Expand Up @@ -293,7 +433,8 @@ if sys.platform == 'darwin':
import objc

class MacPet(PetBase):
def __init__(self, skin_name=None):
def __init__(self, skin_name=None, parent_pid=None, window_title=DEFAULT_WINDOW_TITLE):
super().__init__(parent_pid=parent_pid, window_title=window_title)
self.app = NSApplication.sharedApplication()
self.app.setActivationPolicy_(NSApplicationActivationPolicyAccessory)

Expand All @@ -302,14 +443,20 @@ if sys.platform == 'darwin':

# Get screen size
from AppKit import NSScreen, NSWindowCollectionBehaviorCanJoinAllSpaces, NSWindowCollectionBehaviorStationary
screen = NSScreen.mainScreen()
screen_frame = screen.frame()
screen_width = screen_frame.size.width
screen_height = screen_frame.size.height

# Position at right side
x_pos = screen_width - 200
y_pos = 300
screen = NSScreen.mainScreen() or NSScreen.screens()[0]
rect = _find_streamlit_screen_rect(self.window_title)
if rect:
x_pos, y_pos = _pet_position(*rect, self.display_width, self.display_height)
else:
screen_frame = screen.visibleFrame()
x_pos, y_pos = _pet_position(
screen_frame.origin.x,
screen_frame.origin.y,
screen_frame.size.width,
screen_frame.size.height,
self.display_width,
self.display_height,
)

# Create transparent window
self.window = NSWindow.alloc().initWithContentRect_styleMask_backing_defer_(
Expand Down Expand Up @@ -413,6 +560,7 @@ if sys.platform == 'darwin':

# Show window
self.window.makeKeyAndOrderFront_(None)
self._start_screen_tracking()

# Start HTTP server
self._start_server()
Expand Down Expand Up @@ -501,6 +649,39 @@ if sys.platform == 'darwin':
def _schedule_main(self, fn):
AppHelper.callAfter(fn)

def _schedule_delay(self, delay_ms, fn):
return NSTimer.scheduledTimerWithTimeInterval_repeats_block_(delay_ms / 1000.0, False, lambda _: fn())

def _current_position(self):
frame = self.window.frame()
return int(frame.origin.x), int(frame.origin.y)

def _move_to(self, x_pos, y_pos):
self.window.setFrameOrigin_(NSMakePoint(x_pos, y_pos))

def _close(self):
if self.toast_timer:
self.toast_timer.invalidate()
self.toast_timer = None
if self._screen_timer:
self._screen_timer.invalidate()
self._screen_timer = None
if self.window:
self.window.close()
NSApp.terminate_(None)

def _sync_to_target_screen(self):
rect = _find_streamlit_screen_rect(self.window_title)
rect_key = _rect_key(rect)
if not rect_key:
return
current_x, current_y = self._current_position()
if rect_key != self._tracked_screen_rect:
self._tracked_screen_rect = rect_key
x_pos, y_pos = _pet_position(*rect, self.display_width, self.display_height)
if (current_x, current_y) != (x_pos, y_pos):
self._move_to(x_pos, y_pos)

def show_toast(self, message):
"""Show toast message above pet"""
from AppKit import NSImageView
Expand Down Expand Up @@ -588,19 +769,31 @@ else:
from PIL import ImageTk

class WinPet(PetBase):
def __init__(self, skin_name=None):
def __init__(self, skin_name=None, parent_pid=None, window_title=DEFAULT_WINDOW_TITLE):
super().__init__(parent_pid=parent_pid, window_title=window_title)
self.root = tk.Tk()
self.root.wm_attributes('-topmost', True)

# Load skin
self.load_skin(skin_name)

# Setup window
screen_width = self.root.winfo_screenwidth()
screen_height = self.root.winfo_screenheight()

x_pos = screen_width - 200
y_pos = screen_height - 300
rect = _find_streamlit_screen_rect(self.window_title)
if rect:
x_pos, y_pos = _pet_position(*rect, self.display_width, self.display_height)
else:
screen_left = self.root.winfo_vrootx()
screen_top = self.root.winfo_vrooty()
screen_width = self.root.winfo_screenwidth()
screen_height = self.root.winfo_screenheight()
x_pos, y_pos = _pet_position(
screen_left,
screen_top,
screen_width,
screen_height,
self.display_width,
self.display_height,
)

self.root.geometry(f'{self.display_width}x{self.display_height}+{x_pos}+{y_pos}')
self.root.overrideredirect(True)
Expand Down Expand Up @@ -630,6 +823,7 @@ else:
# Start animation
self._animate()
self._start_server()
self._start_screen_tracking()

print(f"✓ Windows Pet started at ({x_pos}, {y_pos})")
print(f" Animations: {', '.join(self.animations.keys())}")
Expand Down Expand Up @@ -754,13 +948,50 @@ else:
def _schedule_main(self, fn):
self.root.after(0, fn)

def _schedule_delay(self, delay_ms, fn):
return self.root.after(delay_ms, fn)

def _current_position(self):
return self.root.winfo_x(), self.root.winfo_y()

def _move_to(self, x_pos, y_pos):
self.root.geometry(f'+{x_pos}+{y_pos}')

def _close(self):
if self.toast_window:
try:
self.toast_window.destroy()
except:
pass
self.toast_window = None
self.root.destroy()

def _sync_to_target_screen(self):
rect = _find_streamlit_screen_rect(self.window_title)
rect_key = _rect_key(rect)
if not rect_key:
return
current_x, current_y = self._current_position()
if rect_key != self._tracked_screen_rect:
self._tracked_screen_rect = rect_key
x_pos, y_pos = _pet_position(*rect, self.display_width, self.display_height)
if (current_x, current_y) != (x_pos, y_pos):
self._move_to(x_pos, y_pos)

def run(self):
"""Run the application (already in mainloop)"""
pass

if __name__ == '__main__':
# Singleton: if port already in use, another instance is running
import argparse
import socket

parser = argparse.ArgumentParser()
parser.add_argument('--parent-pid', type=int, default=0)
parser.add_argument('--window-title', default=DEFAULT_WINDOW_TITLE)
args = parser.parse_args()

# Singleton: if port already in use, another instance is running
_s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
_s.connect(('127.0.0.1', PORT))
Expand All @@ -771,7 +1002,7 @@ if __name__ == '__main__':
pass

if sys.platform == 'darwin':
pet = MacPet()
pet = MacPet(parent_pid=args.parent_pid, window_title=args.window_title)
pet.run()
else:
pet = WinPet('vita')
pet = WinPet(parent_pid=args.parent_pid, window_title=args.window_title)
2 changes: 1 addition & 1 deletion frontends/stapp.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def render_sidebar():
kwargs = {'creationflags': 0x08} if sys.platform == 'win32' else {}
pet_script = os.path.join(os.path.dirname(__file__), 'desktop_pet_v2.pyw')
if not os.path.exists(pet_script): pet_script = os.path.join(os.path.dirname(__file__), 'desktop_pet.pyw')
subprocess.Popen([sys.executable, pet_script], **kwargs)
subprocess.Popen([sys.executable, pet_script, '--parent-pid', str(os.getpid()), '--window-title', 'GenericAgent'], **kwargs)
def _pet_req(q):
def _do():
try: urlopen(f'http://127.0.0.1:51983/?{q}', timeout=2)
Expand Down