diff --git a/frontends/desktop_pet_v2.pyw b/frontends/desktop_pet_v2.pyw index 1eb96ff..6e542f5 100644 --- a/frontends/desktop_pet_v2.pyw +++ b/frontends/desktop_pet_v2.pyw @@ -1,5 +1,6 @@ """Desktop Pet with Skin System — Cross-platform with True Transparency""" import os +import re import sys import json import threading @@ -12,6 +13,102 @@ PORT = 51983 SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) PROJECT_DIR = os.path.dirname(SCRIPT_DIR) SKINS_DIR = os.path.join(SCRIPT_DIR, 'skins') +PET_MARGIN_X = 24 +PET_MARGIN_Y = 120 +DEFAULT_WINDOW_TITLE = 'GenericAgent' + + +def _clamp(value, lo, hi): + return max(lo, min(value, hi)) + + +def _pet_position(screen_left, screen_top, screen_width, screen_height, pet_width, pet_height): + max_x = max(screen_left, screen_left + screen_width - pet_width - PET_MARGIN_X) + max_y = max(screen_top, screen_top + screen_height - pet_height - PET_MARGIN_Y) + x_pos = _clamp(screen_left + screen_width - pet_width - PET_MARGIN_X, screen_left, max_x) + y_pos = _clamp(screen_top + screen_height - pet_height - PET_MARGIN_Y, screen_top, max_y) + return int(x_pos), int(y_pos) + + +def _parent_alive(parent_pid): + if not parent_pid: + return True + try: + os.kill(parent_pid, 0) + return True + except OSError: + return False + + +def _find_streamlit_screen_rect(window_title): + title = (window_title or DEFAULT_WINDOW_TITLE).lower() + if sys.platform == 'darwin': + try: + import Quartz + from AppKit import NSScreen + screens = list(NSScreen.screens() or []) + windows = Quartz.CGWindowListCopyWindowInfo(Quartz.kCGWindowListOptionOnScreenOnly, Quartz.kCGNullWindowID) or [] + for window in windows: + name = str(window.get(Quartz.kCGWindowName) or '').lower() + if title not in name: + continue + bounds = window.get(Quartz.kCGWindowBounds) or {} + center_x = float(bounds.get('X', 0)) + float(bounds.get('Width', 0)) / 2 + for screen in screens: + frame = screen.visibleFrame() + left = frame.origin.x + right = left + frame.size.width + if left <= center_x <= right: + return frame.origin.x, frame.origin.y, frame.size.width, frame.size.height + if screens: + frame = screens[0].visibleFrame() + return frame.origin.x, frame.origin.y, frame.size.width, frame.size.height + except Exception: + pass + elif sys.platform == 'win32': + try: + import ctypes + from ctypes import wintypes + user32 = ctypes.windll.user32 + monitor_info_flag = 2 + target = {'rect': None} + + EnumWindowsProc = ctypes.WINFUNCTYPE(ctypes.c_bool, wintypes.HWND, wintypes.LPARAM) + + class RECT(ctypes.Structure): + _fields_ = [('left', ctypes.c_long), ('top', ctypes.c_long), ('right', ctypes.c_long), ('bottom', ctypes.c_long)] + + class MONITORINFO(ctypes.Structure): + _fields_ = [('cbSize', wintypes.DWORD), ('rcMonitor', RECT), ('rcWork', RECT), ('dwFlags', wintypes.DWORD)] + + def callback(hwnd, _): + if not user32.IsWindowVisible(hwnd): + return True + length = user32.GetWindowTextLengthW(hwnd) + if length <= 0: + return True + buf = ctypes.create_unicode_buffer(length + 1) + user32.GetWindowTextW(hwnd, buf, len(buf)) + if title not in buf.value.lower(): + return True + monitor = user32.MonitorFromWindow(hwnd, monitor_info_flag) + info = MONITORINFO() + info.cbSize = ctypes.sizeof(MONITORINFO) + if user32.GetMonitorInfoW(monitor, ctypes.byref(info)): + rect = info.rcWork + target['rect'] = (rect.left, rect.top, rect.right - rect.left, rect.bottom - rect.top) + return False + return True + + user32.EnumWindows(EnumWindowsProc(callback), 0) + return target['rect'] + except Exception: + pass + return None + + +def _rect_key(rect): + return tuple(int(v) for v in rect) if rect else None class SkinLoader: """Load and parse skin configuration""" @@ -108,9 +205,21 @@ def _load_default_font(size): return ImageFont.load_default() +def _normalize_bubble_text(text): + """Normalize text for fonts that cannot render some symbols.""" + text = (text or '').strip() + lines = text.replace('\r\n', '\n').replace('\r', '\n').split('\n') + if lines: + turn_match = re.match(r'^\s*🔄?\s*Turn\s+(\d+)\s*$', lines[0], flags=re.IGNORECASE) + if turn_match: + rest = '\n'.join(line.strip() for line in lines[1:] if line.strip()) + return f"Turn {turn_match.group(1)}: {rest}" if rest else f"Turn {turn_match.group(1)}:" + return text.replace('🔄 Turn', 'Turn').replace('🔄', '').strip() + + def _wrap_text_for_width(draw, text, font, max_width): """Wrap text to fit inside max_width.""" - text = (text or '').strip() + text = _normalize_bubble_text(text) if not text: return [''] @@ -157,11 +266,11 @@ def build_bubble_image(message, max_width=220): font = _load_default_font(font_size) draw = ImageDraw.Draw(bubble) - pad_left = max(10, bubble.width // 18) - pad_right = max(10, bubble.width // 18) + max(6, bubble.width // 14) + pad_left = max(12, bubble.width // 16) + pad_right = max(16, bubble.width // 10) pad_top = max(8, bubble.height // 10) - pad_bottom = max(18, bubble.height // 4) - text_area_width = max(40, bubble.width - pad_left - pad_right) + pad_bottom = max(20, bubble.height // 3) + text_area_width = max(36, bubble.width - pad_left - pad_right) lines = _wrap_text_for_width(draw, message, font, text_area_width) ascent, descent = font.getmetrics() if hasattr(font, 'getmetrics') else (font_size, font_size // 4) @@ -213,6 +322,12 @@ def build_bubble_image(message, max_width=220): class PetBase: """Shared logic for Mac and Windows pet implementations.""" + def __init__(self, parent_pid=None, window_title=DEFAULT_WINDOW_TITLE): + self.parent_pid = parent_pid + self.window_title = window_title or DEFAULT_WINDOW_TITLE + self._screen_timer = None + self._tracked_screen_rect = None + def _schedule_main(self, fn): """Schedule fn on the GUI main thread. Subclasses must override.""" raise NotImplementedError @@ -225,6 +340,31 @@ class PetBase: """Thread-safe wrapper for show_toast.""" self._schedule_main(lambda m=message: self.show_toast(m)) + def _close(self): + raise NotImplementedError + + def _current_position(self): + raise NotImplementedError + + def _move_to(self, x_pos, y_pos): + raise NotImplementedError + + def _sync_to_target_screen(self): + raise NotImplementedError + + def _start_screen_tracking(self): + def tick(): + if not _parent_alive(self.parent_pid): + self._close() + return + self._sync_to_target_screen() + self._screen_timer = self._schedule_delay(1000, tick) + self._tracked_screen_rect = _rect_key(_find_streamlit_screen_rect(self.window_title)) + self._screen_timer = self._schedule_delay(1000, tick) + + def _schedule_delay(self, delay_ms, fn): + raise NotImplementedError + def _start_server(self): """Start HTTP control server.""" pet = self @@ -293,7 +433,8 @@ if sys.platform == 'darwin': import objc class MacPet(PetBase): - def __init__(self, skin_name=None): + def __init__(self, skin_name=None, parent_pid=None, window_title=DEFAULT_WINDOW_TITLE): + super().__init__(parent_pid=parent_pid, window_title=window_title) self.app = NSApplication.sharedApplication() self.app.setActivationPolicy_(NSApplicationActivationPolicyAccessory) @@ -302,14 +443,20 @@ if sys.platform == 'darwin': # Get screen size from AppKit import NSScreen, NSWindowCollectionBehaviorCanJoinAllSpaces, NSWindowCollectionBehaviorStationary - screen = NSScreen.mainScreen() - screen_frame = screen.frame() - screen_width = screen_frame.size.width - screen_height = screen_frame.size.height - - # Position at right side - x_pos = screen_width - 200 - y_pos = 300 + screen = NSScreen.mainScreen() or NSScreen.screens()[0] + rect = _find_streamlit_screen_rect(self.window_title) + if rect: + x_pos, y_pos = _pet_position(*rect, self.display_width, self.display_height) + else: + screen_frame = screen.visibleFrame() + x_pos, y_pos = _pet_position( + screen_frame.origin.x, + screen_frame.origin.y, + screen_frame.size.width, + screen_frame.size.height, + self.display_width, + self.display_height, + ) # Create transparent window self.window = NSWindow.alloc().initWithContentRect_styleMask_backing_defer_( @@ -413,6 +560,7 @@ if sys.platform == 'darwin': # Show window self.window.makeKeyAndOrderFront_(None) + self._start_screen_tracking() # Start HTTP server self._start_server() @@ -501,6 +649,39 @@ if sys.platform == 'darwin': def _schedule_main(self, fn): AppHelper.callAfter(fn) + def _schedule_delay(self, delay_ms, fn): + return NSTimer.scheduledTimerWithTimeInterval_repeats_block_(delay_ms / 1000.0, False, lambda _: fn()) + + def _current_position(self): + frame = self.window.frame() + return int(frame.origin.x), int(frame.origin.y) + + def _move_to(self, x_pos, y_pos): + self.window.setFrameOrigin_(NSMakePoint(x_pos, y_pos)) + + def _close(self): + if self.toast_timer: + self.toast_timer.invalidate() + self.toast_timer = None + if self._screen_timer: + self._screen_timer.invalidate() + self._screen_timer = None + if self.window: + self.window.close() + NSApp.terminate_(None) + + def _sync_to_target_screen(self): + rect = _find_streamlit_screen_rect(self.window_title) + rect_key = _rect_key(rect) + if not rect_key: + return + current_x, current_y = self._current_position() + if rect_key != self._tracked_screen_rect: + self._tracked_screen_rect = rect_key + x_pos, y_pos = _pet_position(*rect, self.display_width, self.display_height) + if (current_x, current_y) != (x_pos, y_pos): + self._move_to(x_pos, y_pos) + def show_toast(self, message): """Show toast message above pet""" from AppKit import NSImageView @@ -588,7 +769,8 @@ else: from PIL import ImageTk class WinPet(PetBase): - def __init__(self, skin_name=None): + def __init__(self, skin_name=None, parent_pid=None, window_title=DEFAULT_WINDOW_TITLE): + super().__init__(parent_pid=parent_pid, window_title=window_title) self.root = tk.Tk() self.root.wm_attributes('-topmost', True) @@ -596,11 +778,22 @@ else: self.load_skin(skin_name) # Setup window - screen_width = self.root.winfo_screenwidth() - screen_height = self.root.winfo_screenheight() - - x_pos = screen_width - 200 - y_pos = screen_height - 300 + rect = _find_streamlit_screen_rect(self.window_title) + if rect: + x_pos, y_pos = _pet_position(*rect, self.display_width, self.display_height) + else: + screen_left = self.root.winfo_vrootx() + screen_top = self.root.winfo_vrooty() + screen_width = self.root.winfo_screenwidth() + screen_height = self.root.winfo_screenheight() + x_pos, y_pos = _pet_position( + screen_left, + screen_top, + screen_width, + screen_height, + self.display_width, + self.display_height, + ) self.root.geometry(f'{self.display_width}x{self.display_height}+{x_pos}+{y_pos}') self.root.overrideredirect(True) @@ -630,6 +823,7 @@ else: # Start animation self._animate() self._start_server() + self._start_screen_tracking() print(f"✓ Windows Pet started at ({x_pos}, {y_pos})") print(f" Animations: {', '.join(self.animations.keys())}") @@ -754,13 +948,50 @@ else: def _schedule_main(self, fn): self.root.after(0, fn) + def _schedule_delay(self, delay_ms, fn): + return self.root.after(delay_ms, fn) + + def _current_position(self): + return self.root.winfo_x(), self.root.winfo_y() + + def _move_to(self, x_pos, y_pos): + self.root.geometry(f'+{x_pos}+{y_pos}') + + def _close(self): + if self.toast_window: + try: + self.toast_window.destroy() + except: + pass + self.toast_window = None + self.root.destroy() + + def _sync_to_target_screen(self): + rect = _find_streamlit_screen_rect(self.window_title) + rect_key = _rect_key(rect) + if not rect_key: + return + current_x, current_y = self._current_position() + if rect_key != self._tracked_screen_rect: + self._tracked_screen_rect = rect_key + x_pos, y_pos = _pet_position(*rect, self.display_width, self.display_height) + if (current_x, current_y) != (x_pos, y_pos): + self._move_to(x_pos, y_pos) + def run(self): """Run the application (already in mainloop)""" pass if __name__ == '__main__': - # Singleton: if port already in use, another instance is running + import argparse import socket + + parser = argparse.ArgumentParser() + parser.add_argument('--parent-pid', type=int, default=0) + parser.add_argument('--window-title', default=DEFAULT_WINDOW_TITLE) + args = parser.parse_args() + + # Singleton: if port already in use, another instance is running _s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) try: _s.connect(('127.0.0.1', PORT)) @@ -771,7 +1002,7 @@ if __name__ == '__main__': pass if sys.platform == 'darwin': - pet = MacPet() + pet = MacPet(parent_pid=args.parent_pid, window_title=args.window_title) pet.run() else: - pet = WinPet('vita') + pet = WinPet(parent_pid=args.parent_pid, window_title=args.window_title) diff --git a/frontends/stapp.py b/frontends/stapp.py index baf312f..83cc745 100644 --- a/frontends/stapp.py +++ b/frontends/stapp.py @@ -47,7 +47,7 @@ def render_sidebar(): kwargs = {'creationflags': 0x08} if sys.platform == 'win32' else {} pet_script = os.path.join(os.path.dirname(__file__), 'desktop_pet_v2.pyw') if not os.path.exists(pet_script): pet_script = os.path.join(os.path.dirname(__file__), 'desktop_pet.pyw') - subprocess.Popen([sys.executable, pet_script], **kwargs) + subprocess.Popen([sys.executable, pet_script, '--parent-pid', str(os.getpid()), '--window-title', 'GenericAgent'], **kwargs) def _pet_req(q): def _do(): try: urlopen(f'http://127.0.0.1:51983/?{q}', timeout=2)