From afc886ff21c80204888420ba4ad0b712f1345ca3 Mon Sep 17 00:00:00 2001 From: sawka Date: Fri, 12 Sep 2025 16:16:41 -0700 Subject: [PATCH 01/21] add new tsunami view type --- frontend/app/block/block.tsx | 2 ++ frontend/app/view/tsunami/tsunami.tsx | 47 +++++++++++++++++++++++++++ 2 files changed, 49 insertions(+) create mode 100644 frontend/app/view/tsunami/tsunami.tsx diff --git a/frontend/app/block/block.tsx b/frontend/app/block/block.tsx index 4d849de9d1..8665e8ed91 100644 --- a/frontend/app/block/block.tsx +++ b/frontend/app/block/block.tsx @@ -12,6 +12,7 @@ import { import { LauncherViewModel } from "@/app/view/launcher/launcher"; import { PreviewModel } from "@/app/view/preview/preview"; import { SysinfoViewModel } from "@/app/view/sysinfo/sysinfo"; +import { TsunamiViewModel } from "@/app/view/tsunami/tsunami"; import { VDomModel } from "@/app/view/vdom/vdom-model"; import { ErrorBoundary } from "@/element/errorboundary"; import { CenteredDiv } from "@/element/quickelems"; @@ -48,6 +49,7 @@ BlockRegistry.set("vdom", VDomModel); BlockRegistry.set("tips", QuickTipsViewModel); BlockRegistry.set("help", HelpViewModel); BlockRegistry.set("launcher", LauncherViewModel); +BlockRegistry.set("tsunami", TsunamiViewModel); function makeViewModel(blockId: string, blockView: string, nodeModel: BlockNodeModel): ViewModel { const ctor = BlockRegistry.get(blockView); diff --git a/frontend/app/view/tsunami/tsunami.tsx b/frontend/app/view/tsunami/tsunami.tsx new file mode 100644 index 0000000000..43bc88ee88 --- /dev/null +++ b/frontend/app/view/tsunami/tsunami.tsx @@ -0,0 +1,47 @@ +// Copyright 2025, Command Line Inc. +// SPDX-License-Identifier: Apache-2.0 + +import { BlockNodeModel } from "@/app/block/blocktypes"; +import { WOS } from "@/app/store/global"; +import * as jotai from "jotai"; +import { memo } from "react"; + +class TsunamiViewModel implements ViewModel { + viewType: string; + blockAtom: jotai.Atom; + blockId: string; + viewIcon: jotai.Atom; + viewName: jotai.Atom; + + constructor(blockId: string, nodeModel: BlockNodeModel) { + this.viewType = "tsunami"; + this.blockId = blockId; + this.blockAtom = WOS.getWaveObjectAtom(`block:${blockId}`); + this.viewIcon = jotai.atom("cube"); + this.viewName = jotai.atom("Tsunami"); + } + + get viewComponent(): ViewComponent { + return TsunamiView; + } + + getSettingsMenuItems(): ContextMenuItem[] { + return []; + } +} + +type TsunamiViewProps = { + model: TsunamiViewModel; +}; + +const TsunamiView = memo(({ model }: TsunamiViewProps) => { + return ( +
+

Tsunami

+
+ ); +}); + +TsunamiView.displayName = "TsunamiView"; + +export { TsunamiViewModel }; \ No newline at end of file From 6b4337daa6ec40a9593252abb899996247e9f0c9 Mon Sep 17 00:00:00 2001 From: sawka Date: Fri, 12 Sep 2025 16:22:47 -0700 Subject: [PATCH 02/21] pull a clientid off the URL --- tsunami/frontend/src/util/clientid.ts | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/tsunami/frontend/src/util/clientid.ts b/tsunami/frontend/src/util/clientid.ts index 4217e291ae..f6dc5f2e8d 100644 --- a/tsunami/frontend/src/util/clientid.ts +++ b/tsunami/frontend/src/util/clientid.ts @@ -9,6 +9,14 @@ const CLIENT_ID_KEY = "tsunami:clientid"; * If no client ID exists, a new UUID is generated and stored. */ export function getOrCreateClientId(): string { + // First check for URL parameter + const urlParams = new URLSearchParams(window.location.search); + const urlClientId = urlParams.get("clientid"); + if (urlClientId) { + return urlClientId; + } + + // Fall back to session storage let clientId = sessionStorage.getItem(CLIENT_ID_KEY); if (!clientId) { clientId = crypto.randomUUID(); From 8fe056cced96cbd5fd9e7e3f5989d19ccd545158 Mon Sep 17 00:00:00 2001 From: sawka Date: Fri, 12 Sep 2025 16:25:35 -0700 Subject: [PATCH 03/21] some tsunami meta --- frontend/types/gotypes.d.ts | 4 ++++ pkg/waveobj/metaconsts.go | 5 +++++ pkg/waveobj/wtypemeta.go | 5 +++++ 3 files changed, 14 insertions(+) diff --git a/frontend/types/gotypes.d.ts b/frontend/types/gotypes.d.ts index 0d70582e48..7e46356f00 100644 --- a/frontend/types/gotypes.d.ts +++ b/frontend/types/gotypes.d.ts @@ -590,6 +590,10 @@ declare global { "web:partition"?: string; "markdown:fontsize"?: number; "markdown:fixedfontsize"?: number; + "tsunami:*"?: boolean; + "tsunami:sdkreplacepath"?: string; + "tsunami:appdir"?: string; + "tsunami:scaffoldpath"?: string; "vdom:*"?: boolean; "vdom:initialized"?: boolean; "vdom:correlationid"?: string; diff --git a/pkg/waveobj/metaconsts.go b/pkg/waveobj/metaconsts.go index 0f3eb8fc09..e40558c70d 100644 --- a/pkg/waveobj/metaconsts.go +++ b/pkg/waveobj/metaconsts.go @@ -114,6 +114,11 @@ const ( MetaKey_MarkdownFontSize = "markdown:fontsize" MetaKey_MarkdownFixedFontSize = "markdown:fixedfontsize" + MetaKey_TsunamiClear = "tsunami:*" + MetaKey_TsunamiSdkReplacePath = "tsunami:sdkreplacepath" + MetaKey_TsunamiAppDir = "tsunami:appdir" + MetaKey_TsunamiScaffoldPath = "tsunami:scaffoldpath" + MetaKey_VDomClear = "vdom:*" MetaKey_VDomInitialized = "vdom:initialized" MetaKey_VDomCorrelationId = "vdom:correlationid" diff --git a/pkg/waveobj/wtypemeta.go b/pkg/waveobj/wtypemeta.go index 6fffeb9b7a..bff69545f8 100644 --- a/pkg/waveobj/wtypemeta.go +++ b/pkg/waveobj/wtypemeta.go @@ -117,6 +117,11 @@ type MetaTSType struct { MarkdownFontSize float64 `json:"markdown:fontsize,omitempty"` MarkdownFixedFontSize float64 `json:"markdown:fixedfontsize,omitempty"` + TsunamiClear bool `json:"tsunami:*,omitempty"` + TsunamiSdkReplacePath string `json:"tsunami:sdkreplacepath,omitempty"` + TsunamiAppDir string `json:"tsunami:appdir,omitempty"` + TsunamiScaffoldPath string `json:"tsunami:scaffoldpath,omitempty"` + VDomClear bool `json:"vdom:*,omitempty"` VDomInitialized bool `json:"vdom:initialized,omitempty"` VDomCorrelationId string `json:"vdom:correlationid,omitempty"` From d0c94c8c2f5616dd1c69e56ec8a69cdf3fff7555 Mon Sep 17 00:00:00 2001 From: sawka Date: Fri, 12 Sep 2025 16:47:53 -0700 Subject: [PATCH 04/21] nodepath option in tsunami. use nodepath to run tailwind instead of npx --- Taskfile.yml | 2 +- tsunami/build/build.go | 102 +++++++++++++++++++----------------- tsunami/cmd/main-tsunami.go | 7 +++ 3 files changed, 62 insertions(+), 49 deletions(-) diff --git a/Taskfile.yml b/Taskfile.yml index 27ae3a03ac..1a724bbd4d 100644 --- a/Taskfile.yml +++ b/Taskfile.yml @@ -498,7 +498,7 @@ tasks: - cd scaffold && npm pkg delete author - cd scaffold && npm pkg set author.name="Command Line Inc" - cd scaffold && npm pkg set author.email="info@commandline.dev" - - cd scaffold && npm --no-workspaces install tailwindcss @tailwindcss/cli + - cd scaffold && npm --no-workspaces install tailwindcss@4.1.12 @tailwindcss/cli@4.1.12 - cp -r dist scaffold/ - cp ../templates/app-main.go.tmpl scaffold/app-main.go - cp ../templates/tailwind.css scaffold/ diff --git a/tsunami/build/build.go b/tsunami/build/build.go index 7ac516760e..84ceef0568 100644 --- a/tsunami/build/build.go +++ b/tsunami/build/build.go @@ -34,6 +34,7 @@ type BuildOpts struct { OutputFile string ScaffoldPath string SdkReplacePath string + NodePath string } type BuildEnv struct { @@ -42,6 +43,13 @@ type BuildEnv struct { cleanupOnce *sync.Once } +func (opts BuildOpts) getNodePath() string { + if opts.NodePath != "" { + return opts.NodePath + } + return "node" +} + func findGoExecutable() (string, error) { // First try the standard PATH lookup if goPath, err := exec.LookPath("go"); err == nil { @@ -50,7 +58,7 @@ func findGoExecutable() (string, error) { // Define platform-specific paths to check var pathsToCheck []string - + if runtime.GOOS == "windows" { pathsToCheck = []string{ `c:\go\bin\go.exe`, @@ -59,10 +67,10 @@ func findGoExecutable() (string, error) { } else { // Unix-like systems (macOS, Linux, etc.) pathsToCheck = []string{ - "/opt/homebrew/bin/go", // Homebrew on Apple Silicon - "/usr/local/bin/go", // Traditional Homebrew or manual install - "/usr/local/go/bin/go", // Official Go installation - "/usr/bin/go", // System package manager + "/opt/homebrew/bin/go", // Homebrew on Apple Silicon + "/usr/local/bin/go", // Traditional Homebrew or manual install + "/usr/local/go/bin/go", // Official Go installation + "/usr/bin/go", // System package manager } } @@ -79,7 +87,7 @@ func findGoExecutable() (string, error) { return "", fmt.Errorf("go command not found in PATH or common installation locations") } -func verifyEnvironment(verbose bool) (*BuildEnv, error) { +func verifyEnvironment(verbose bool, opts BuildOpts) (*BuildEnv, error) { // Find Go executable using enhanced search goPath, err := findGoExecutable() if err != nil { @@ -120,45 +128,42 @@ func verifyEnvironment(verbose bool) (*BuildEnv, error) { return nil, fmt.Errorf("go version 1.%d or higher required, found: %s", MinSupportedGoMinorVersion, versionStr) } - // Check if npx is in PATH - _, err = exec.LookPath("npx") - if err != nil { - return nil, fmt.Errorf("npx command not found in PATH: %w", err) - } - - if verbose { - log.Printf("Found npx in PATH") - } - - // Check Tailwind CSS version - tailwindCmd := exec.Command("npx", "@tailwindcss/cli") - tailwindOutput, err := tailwindCmd.CombinedOutput() - if err != nil { - return nil, fmt.Errorf("failed to run 'npx @tailwindcss/cli': %w", err) - } - - tailwindStr := strings.TrimSpace(string(tailwindOutput)) - lines := strings.Split(tailwindStr, "\n") - if len(lines) == 0 { - return nil, fmt.Errorf("no output from tailwindcss command") - } - - firstLine := lines[0] - if verbose { - log.Printf("Found %s", firstLine) - } - - // Check for v4 (format: "≈ tailwindcss v4.1.12") - tailwindRegex := regexp.MustCompile(`tailwindcss v(\d+)`) - tailwindMatches := tailwindRegex.FindStringSubmatch(firstLine) - if len(tailwindMatches) < 2 { - return nil, fmt.Errorf("unable to parse tailwindcss version from: %s", firstLine) + // Check if node is available + if opts.NodePath != "" { + // Custom node path specified - verify it's absolute and executable + if !filepath.IsAbs(opts.NodePath) { + return nil, fmt.Errorf("NodePath must be an absolute path, got: %s", opts.NodePath) + } + + info, err := os.Stat(opts.NodePath) + if err != nil { + return nil, fmt.Errorf("NodePath does not exist: %s: %w", opts.NodePath, err) + } + + if info.IsDir() { + return nil, fmt.Errorf("NodePath is a directory, not an executable: %s", opts.NodePath) + } + + // Check if file is executable (Unix-like systems) + if runtime.GOOS != "windows" && info.Mode()&0111 == 0 { + return nil, fmt.Errorf("NodePath is not executable: %s", opts.NodePath) + } + + if verbose { + log.Printf("Using custom node path: %s", opts.NodePath) + } + } else { + // Use standard PATH lookup + _, err = exec.LookPath("node") + if err != nil { + return nil, fmt.Errorf("node command not found in PATH: %w", err) + } + + if verbose { + log.Printf("Found node in PATH") + } } - majorVersion, err := strconv.Atoi(tailwindMatches[1]) - if err != nil || majorVersion != 4 { - return nil, fmt.Errorf("tailwindcss v4 required, found: %s", firstLine) - } return &BuildEnv{ GoVersion: goVersion, @@ -444,7 +449,7 @@ func TsunamiBuild(opts BuildOpts) error { } func tsunamiBuildInternal(opts BuildOpts) (*BuildEnv, error) { - buildEnv, err := verifyEnvironment(opts.Verbose) + buildEnv, err := verifyEnvironment(opts.Verbose, opts) if err != nil { return nil, err } @@ -467,7 +472,7 @@ func tsunamiBuildInternal(opts BuildOpts) (*BuildEnv, error) { log.Printf("Building tsunami app from %s\n", opts.Dir) - if opts.Verbose { + if opts.Verbose || opts.KeepTemp { log.Printf("Temp dir: %s\n", tempDir) } @@ -529,7 +534,7 @@ func tsunamiBuildInternal(opts BuildOpts) (*BuildEnv, error) { } // Generate Tailwind CSS - if err := generateAppTailwindCss(tempDir, opts.Verbose); err != nil { + if err := generateAppTailwindCss(tempDir, opts.Verbose, opts); err != nil { return buildEnv, fmt.Errorf("failed to generate tailwind css: %w", err) } @@ -643,14 +648,15 @@ func runGoBuild(tempDir string, opts BuildOpts) error { return nil } -func generateAppTailwindCss(tempDir string, verbose bool) error { +func generateAppTailwindCss(tempDir string, verbose bool, opts BuildOpts) error { // tailwind.css is already in tempDir from scaffold copy tailwindOutput := filepath.Join(tempDir, "static", "tw.css") - tailwindCmd := exec.Command("npx", "@tailwindcss/cli", + tailwindCmd := exec.Command(opts.getNodePath(), "node_modules/@tailwindcss/cli/dist/index.mjs", "-i", "./tailwind.css", "-o", tailwindOutput) tailwindCmd.Dir = tempDir + tailwindCmd.Env = append(os.Environ(), "ELECTRON_RUN_AS_NODE=1") if verbose { log.Printf("Running: %s", strings.Join(tailwindCmd.Args, " ")) diff --git a/tsunami/cmd/main-tsunami.go b/tsunami/cmd/main-tsunami.go index 1ca4ffefd8..04343884cd 100644 --- a/tsunami/cmd/main-tsunami.go +++ b/tsunami/cmd/main-tsunami.go @@ -12,6 +12,7 @@ import ( const ( EnvTsunamiScaffoldPath = "TSUNAMI_SCAFFOLDPATH" EnvTsunamiSdkReplacePath = "TSUNAMI_SDKREPLACEPATH" + EnvTsunamiNodePath = "TSUNAMI_NODEPATH" ) // these are set at build time @@ -46,6 +47,12 @@ func validateEnvironmentVars(opts *build.BuildOpts) error { opts.ScaffoldPath = scaffoldPath opts.SdkReplacePath = sdkReplacePath + + // NodePath is optional + if nodePath := os.Getenv(EnvTsunamiNodePath); nodePath != "" { + opts.NodePath = nodePath + } + return nil } From be316574887859b34eee35dff755461433cf9341 Mon Sep 17 00:00:00 2001 From: sawka Date: Fri, 12 Sep 2025 17:10:12 -0700 Subject: [PATCH 05/21] pass the electron exec path through to wavesrv, use for tsunami --- emain/emain-util.ts | 14 ++++++- emain/emain-wavesrv.ts | 3 +- go.mod | 3 ++ pkg/blockcontroller/tsunamicontroller.go | 47 ++++++++++++++++++++++++ pkg/wavebase/wavebase.go | 28 +++++++++----- tsunami/build/build.go | 11 +++--- 6 files changed, 87 insertions(+), 19 deletions(-) create mode 100644 pkg/blockcontroller/tsunamicontroller.go diff --git a/emain/emain-util.ts b/emain/emain-util.ts index a24a4e3c62..ce805c2725 100644 --- a/emain/emain-util.ts +++ b/emain/emain-util.ts @@ -5,6 +5,11 @@ import * as electron from "electron"; import { getWebServerEndpoint } from "../frontend/util/endpoints"; export const WaveAppPathVarName = "WAVETERM_APP_PATH"; +export const WaveAppElectronExecPath = "WAVETERM_ELECTRONEXECPATH"; + +export function getElectronExecPath(): string { + return process.execPath; +} // not necessarily exact, but we use this to help get us unstuck in certain cases let lastCtrlShiftSate: boolean = false; @@ -57,8 +62,13 @@ export function handleCtrlShiftState(sender: Electron.WebContents, waveEvent: Wa export function shNavHandler(event: Electron.Event, url: string) { const isDev = !electron.app.isPackaged; - if (isDev && (url.startsWith("http://127.0.0.1:5173/index.html") || url.startsWith("http://localhost:5173/index.html") || - url.startsWith("http://127.0.0.1:5174/index.html") || url.startsWith("http://localhost:5174/index.html"))) { + if ( + isDev && + (url.startsWith("http://127.0.0.1:5173/index.html") || + url.startsWith("http://localhost:5173/index.html") || + url.startsWith("http://127.0.0.1:5174/index.html") || + url.startsWith("http://localhost:5174/index.html")) + ) { // this is a dev-mode hot-reload, ignore it console.log("allowing hot-reload of index.html"); return; diff --git a/emain/emain-wavesrv.ts b/emain/emain-wavesrv.ts index beca7d684a..686ee87f76 100644 --- a/emain/emain-wavesrv.ts +++ b/emain/emain-wavesrv.ts @@ -7,7 +7,7 @@ import * as readline from "readline"; import { WebServerEndpointVarName, WSServerEndpointVarName } from "../frontend/util/endpoints"; import { AuthKey, WaveAuthKeyEnv } from "./authkey"; import { setForceQuit } from "./emain-activity"; -import { WaveAppPathVarName } from "./emain-util"; +import { WaveAppPathVarName, WaveAppElectronExecPath, getElectronExecPath } from "./emain-util"; import { getElectronAppUnpackedBasePath, getWaveConfigDir, @@ -59,6 +59,7 @@ export function runWaveSrv(handleWSEvent: (evtMsg: WSEventType) => void): Promis envCopy["XDG_CURRENT_DESKTOP"] = xdgCurrentDesktop; } envCopy[WaveAppPathVarName] = getElectronAppUnpackedBasePath(); + envCopy[WaveAppElectronExecPath] = getElectronExecPath(); envCopy[WaveAuthKeyEnv] = AuthKey; envCopy[WaveDataHomeVarName] = getWaveDataDir(); envCopy[WaveConfigHomeVarName] = getWaveConfigDir(); diff --git a/go.mod b/go.mod index bd93069362..3cf3dc45e4 100644 --- a/go.mod +++ b/go.mod @@ -33,6 +33,7 @@ require ( github.com/spf13/cobra v1.10.1 github.com/ubuntu/gowsl v0.0.0-20240906163211-049fd49bd93b github.com/wavetermdev/htmltoken v0.2.0 + github.com/wavetermdev/waveterm/tsunami v0.0.0-00010101000000-000000000000 golang.org/x/crypto v0.42.0 golang.org/x/mod v0.28.0 golang.org/x/sync v0.17.0 @@ -112,3 +113,5 @@ require ( replace github.com/kevinburke/ssh_config => github.com/wavetermdev/ssh_config v0.0.0-20241219203747-6409e4292f34 replace github.com/creack/pty => github.com/photostorm/pty v1.1.19-0.20230903182454-31354506054b + +replace github.com/wavetermdev/waveterm/tsunami => ./tsunami diff --git a/pkg/blockcontroller/tsunamicontroller.go b/pkg/blockcontroller/tsunamicontroller.go new file mode 100644 index 0000000000..52158984e2 --- /dev/null +++ b/pkg/blockcontroller/tsunamicontroller.go @@ -0,0 +1,47 @@ +// Copyright 2025, Command Line Inc. +// SPDX-License-Identifier: Apache-2.0 + +package blockcontroller + +import ( + "fmt" + + "github.com/wavetermdev/waveterm/pkg/wavebase" + "github.com/wavetermdev/waveterm/pkg/waveobj" + "github.com/wavetermdev/waveterm/tsunami/build" +) + +func runTsunami(blockMeta waveobj.MetaMapType) error { + scaffoldPath := blockMeta.GetString(waveobj.MetaKey_TsunamiScaffoldPath, "") + if scaffoldPath == "" { + return fmt.Errorf("tsunami:scaffoldpath is required") + } + + sdkReplacePath := blockMeta.GetString(waveobj.MetaKey_TsunamiSdkReplacePath, "") + if sdkReplacePath == "" { + return fmt.Errorf("tsunami:sdkreplacepath is required") + } + + appDir := blockMeta.GetString(waveobj.MetaKey_TsunamiAppDir, "") + if appDir == "" { + return fmt.Errorf("tsunami:appdir is required") + } + + // Get Electron executable path + nodePath := wavebase.GetWaveAppElectronExecPath() + if nodePath == "" { + return fmt.Errorf("electron executable path not set") + } + + opts := build.BuildOpts{ + Dir: appDir, + Verbose: true, + Open: false, + KeepTemp: false, + ScaffoldPath: scaffoldPath, + SdkReplacePath: sdkReplacePath, + NodePath: nodePath, + } + + return build.TsunamiRun(opts) +} diff --git a/pkg/wavebase/wavebase.go b/pkg/wavebase/wavebase.go index 351426060c..1757429956 100644 --- a/pkg/wavebase/wavebase.go +++ b/pkg/wavebase/wavebase.go @@ -26,12 +26,13 @@ var WaveVersion = "0.0.0" var BuildTime = "0" const ( - WaveConfigHomeEnvVar = "WAVETERM_CONFIG_HOME" - WaveDataHomeEnvVar = "WAVETERM_DATA_HOME" - WaveAppPathVarName = "WAVETERM_APP_PATH" - WaveDevVarName = "WAVETERM_DEV" - WaveDevViteVarName = "WAVETERM_DEV_VITE" - WaveWshForceUpdateVarName = "WAVETERM_WSHFORCEUPDATE" + WaveConfigHomeEnvVar = "WAVETERM_CONFIG_HOME" + WaveDataHomeEnvVar = "WAVETERM_DATA_HOME" + WaveAppPathVarName = "WAVETERM_APP_PATH" + WaveAppElectronExecPathVarName = "WAVETERM_ELECTRONEXECPATH" + WaveDevVarName = "WAVETERM_DEV" + WaveDevViteVarName = "WAVETERM_DEV_VITE" + WaveWshForceUpdateVarName = "WAVETERM_WSHFORCEUPDATE" WaveJwtTokenVarName = "WAVETERM_JWT" WaveSwapTokenVarName = "WAVETERM_SWAPTOKEN" @@ -46,10 +47,11 @@ const ( const NeedJwtConst = "NEED-JWT" -var ConfigHome_VarCache string // caches WAVETERM_CONFIG_HOME -var DataHome_VarCache string // caches WAVETERM_DATA_HOME -var AppPath_VarCache string // caches WAVETERM_APP_PATH -var Dev_VarCache string // caches WAVETERM_DEV +var ConfigHome_VarCache string // caches WAVETERM_CONFIG_HOME +var DataHome_VarCache string // caches WAVETERM_DATA_HOME +var AppPath_VarCache string // caches WAVETERM_APP_PATH +var AppElectronExecPath_VarCache string // caches WAVETERM_ELECTRONEXECPATH +var Dev_VarCache string // caches WAVETERM_DEV const WaveLockFile = "wave.lock" const DomainSocketBaseName = "wave.sock" @@ -93,6 +95,8 @@ func CacheAndRemoveEnvVars() error { os.Unsetenv(WaveDataHomeEnvVar) AppPath_VarCache = os.Getenv(WaveAppPathVarName) os.Unsetenv(WaveAppPathVarName) + AppElectronExecPath_VarCache = os.Getenv(WaveAppElectronExecPathVarName) + os.Unsetenv(WaveAppElectronExecPathVarName) Dev_VarCache = os.Getenv(WaveDevVarName) os.Unsetenv(WaveDevVarName) os.Unsetenv(WaveDevViteVarName) @@ -119,6 +123,10 @@ func GetWaveAppBinPath() string { return filepath.Join(GetWaveAppPath(), AppPathBinDir) } +func GetWaveAppElectronExecPath() string { + return AppElectronExecPath_VarCache +} + func GetHomeDir() string { homeVar, err := os.UserHomeDir() if err != nil { diff --git a/tsunami/build/build.go b/tsunami/build/build.go index 84ceef0568..e9680e0dcc 100644 --- a/tsunami/build/build.go +++ b/tsunami/build/build.go @@ -134,21 +134,21 @@ func verifyEnvironment(verbose bool, opts BuildOpts) (*BuildEnv, error) { if !filepath.IsAbs(opts.NodePath) { return nil, fmt.Errorf("NodePath must be an absolute path, got: %s", opts.NodePath) } - + info, err := os.Stat(opts.NodePath) if err != nil { return nil, fmt.Errorf("NodePath does not exist: %s: %w", opts.NodePath, err) } - + if info.IsDir() { return nil, fmt.Errorf("NodePath is a directory, not an executable: %s", opts.NodePath) } - + // Check if file is executable (Unix-like systems) if runtime.GOOS != "windows" && info.Mode()&0111 == 0 { return nil, fmt.Errorf("NodePath is not executable: %s", opts.NodePath) } - + if verbose { log.Printf("Using custom node path: %s", opts.NodePath) } @@ -158,13 +158,12 @@ func verifyEnvironment(verbose bool, opts BuildOpts) (*BuildEnv, error) { if err != nil { return nil, fmt.Errorf("node command not found in PATH: %w", err) } - + if verbose { log.Printf("Found node in PATH") } } - return &BuildEnv{ GoVersion: goVersion, cleanupOnce: &sync.Once{}, From 3d96c1c1a8b8a1497bbce6f11bfd25362e8b216b Mon Sep 17 00:00:00 2001 From: sawka Date: Fri, 12 Sep 2025 17:55:58 -0700 Subject: [PATCH 06/21] switch to apppath (from appdir) --- frontend/types/gotypes.d.ts | 2 +- pkg/blockcontroller/tsunamicontroller.go | 8 ++++---- pkg/waveobj/metaconsts.go | 2 +- pkg/waveobj/wtypemeta.go | 2 +- tsunami/build/build.go | 20 ++++++++++---------- tsunami/cmd/main-tsunami.go | 16 ++++++++-------- 6 files changed, 25 insertions(+), 25 deletions(-) diff --git a/frontend/types/gotypes.d.ts b/frontend/types/gotypes.d.ts index 7e46356f00..9130afa27c 100644 --- a/frontend/types/gotypes.d.ts +++ b/frontend/types/gotypes.d.ts @@ -592,7 +592,7 @@ declare global { "markdown:fixedfontsize"?: number; "tsunami:*"?: boolean; "tsunami:sdkreplacepath"?: string; - "tsunami:appdir"?: string; + "tsunami:apppath"?: string; "tsunami:scaffoldpath"?: string; "vdom:*"?: boolean; "vdom:initialized"?: boolean; diff --git a/pkg/blockcontroller/tsunamicontroller.go b/pkg/blockcontroller/tsunamicontroller.go index 52158984e2..523fe518f4 100644 --- a/pkg/blockcontroller/tsunamicontroller.go +++ b/pkg/blockcontroller/tsunamicontroller.go @@ -22,9 +22,9 @@ func runTsunami(blockMeta waveobj.MetaMapType) error { return fmt.Errorf("tsunami:sdkreplacepath is required") } - appDir := blockMeta.GetString(waveobj.MetaKey_TsunamiAppDir, "") - if appDir == "" { - return fmt.Errorf("tsunami:appdir is required") + appPath := blockMeta.GetString(waveobj.MetaKey_TsunamiAppPath, "") + if appPath == "" { + return fmt.Errorf("tsunami:apppath is required") } // Get Electron executable path @@ -34,7 +34,7 @@ func runTsunami(blockMeta waveobj.MetaMapType) error { } opts := build.BuildOpts{ - Dir: appDir, + Dir: appPath, Verbose: true, Open: false, KeepTemp: false, diff --git a/pkg/waveobj/metaconsts.go b/pkg/waveobj/metaconsts.go index e40558c70d..3cbbb3d44d 100644 --- a/pkg/waveobj/metaconsts.go +++ b/pkg/waveobj/metaconsts.go @@ -116,7 +116,7 @@ const ( MetaKey_TsunamiClear = "tsunami:*" MetaKey_TsunamiSdkReplacePath = "tsunami:sdkreplacepath" - MetaKey_TsunamiAppDir = "tsunami:appdir" + MetaKey_TsunamiAppPath = "tsunami:apppath" MetaKey_TsunamiScaffoldPath = "tsunami:scaffoldpath" MetaKey_VDomClear = "vdom:*" diff --git a/pkg/waveobj/wtypemeta.go b/pkg/waveobj/wtypemeta.go index bff69545f8..ef3e18b566 100644 --- a/pkg/waveobj/wtypemeta.go +++ b/pkg/waveobj/wtypemeta.go @@ -119,7 +119,7 @@ type MetaTSType struct { TsunamiClear bool `json:"tsunami:*,omitempty"` TsunamiSdkReplacePath string `json:"tsunami:sdkreplacepath,omitempty"` - TsunamiAppDir string `json:"tsunami:appdir,omitempty"` + TsunamiAppPath string `json:"tsunami:apppath,omitempty"` TsunamiScaffoldPath string `json:"tsunami:scaffoldpath,omitempty"` VDomClear bool `json:"vdom:*,omitempty"` diff --git a/tsunami/build/build.go b/tsunami/build/build.go index e9680e0dcc..857e08b9d3 100644 --- a/tsunami/build/build.go +++ b/tsunami/build/build.go @@ -27,7 +27,7 @@ const MinSupportedGoMinorVersion = 22 const TsunamiUIImportPath = "github.com/wavetermdev/waveterm/tsunami/ui" type BuildOpts struct { - Dir string + AppPath string Verbose bool Open bool KeepTemp bool @@ -174,7 +174,7 @@ func createGoMod(tempDir, appDirName, goVersion string, opts BuildOpts, verbose modulePath := fmt.Sprintf("tsunami/app/%s", appDirName) // Check if go.mod already exists in original directory - originalGoModPath := filepath.Join(opts.Dir, "go.mod") + originalGoModPath := filepath.Join(opts.AppPath, "go.mod") var modFile *modfile.File var err error @@ -191,7 +191,7 @@ func createGoMod(tempDir, appDirName, goVersion string, opts BuildOpts, verbose } // Also copy go.sum if it exists - originalGoSumPath := filepath.Join(opts.Dir, "go.sum") + originalGoSumPath := filepath.Join(opts.AppPath, "go.sum") if _, err := os.Stat(originalGoSumPath); err == nil { tempGoSumPath := filepath.Join(tempDir, "go.sum") if err := copyFile(originalGoSumPath, tempGoSumPath); err != nil { @@ -453,7 +453,7 @@ func tsunamiBuildInternal(opts BuildOpts) (*BuildEnv, error) { return nil, err } - if err := verifyTsunamiDir(opts.Dir); err != nil { + if err := verifyTsunamiDir(opts.AppPath); err != nil { return nil, err } @@ -469,20 +469,20 @@ func tsunamiBuildInternal(opts BuildOpts) (*BuildEnv, error) { buildEnv.TempDir = tempDir - log.Printf("Building tsunami app from %s\n", opts.Dir) + log.Printf("Building tsunami app from %s\n", opts.AppPath) if opts.Verbose || opts.KeepTemp { log.Printf("Temp dir: %s\n", tempDir) } // Copy all *.go files from the root directory - goCount, err := copyGoFiles(opts.Dir, tempDir) + goCount, err := copyGoFiles(opts.AppPath, tempDir) if err != nil { return buildEnv, fmt.Errorf("failed to copy go files: %w", err) } // Copy static directory - staticSrcDir := filepath.Join(opts.Dir, "static") + staticSrcDir := filepath.Join(opts.AppPath, "static") staticDestDir := filepath.Join(tempDir, "static") staticCount, err := copyDirRecursive(staticSrcDir, staticDestDir, true) if err != nil { @@ -507,7 +507,7 @@ func tsunamiBuildInternal(opts BuildOpts) (*BuildEnv, error) { } // Create go.mod file - appDirName := filepath.Base(opts.Dir) + appDirName := filepath.Base(opts.AppPath) if err := createGoMod(tempDir, appDirName, buildEnv.GoVersion, opts, opts.Verbose); err != nil { return buildEnv, fmt.Errorf("failed to create go.mod: %w", err) } @@ -543,7 +543,7 @@ func tsunamiBuildInternal(opts BuildOpts) (*BuildEnv, error) { } // Move generated files back to original directory - if err := moveFilesBack(tempDir, opts.Dir, opts.Verbose); err != nil { + if err := moveFilesBack(tempDir, opts.AppPath, opts.Verbose); err != nil { return buildEnv, fmt.Errorf("failed to move files back: %w", err) } @@ -711,7 +711,7 @@ func TsunamiRun(opts BuildOpts) error { runCmd := exec.Command(appPath) runCmd.Dir = buildEnv.TempDir - log.Printf("Running tsunami app from %s", opts.Dir) + log.Printf("Running tsunami app from %s", opts.AppPath) runCmd.Stdin = os.Stdin diff --git a/tsunami/cmd/main-tsunami.go b/tsunami/cmd/main-tsunami.go index 04343884cd..2ff2f2ac21 100644 --- a/tsunami/cmd/main-tsunami.go +++ b/tsunami/cmd/main-tsunami.go @@ -47,19 +47,19 @@ func validateEnvironmentVars(opts *build.BuildOpts) error { opts.ScaffoldPath = scaffoldPath opts.SdkReplacePath = sdkReplacePath - + // NodePath is optional if nodePath := os.Getenv(EnvTsunamiNodePath); nodePath != "" { opts.NodePath = nodePath } - + return nil } var buildCmd = &cobra.Command{ - Use: "build [directory]", + Use: "build [apppath]", Short: "Build a Tsunami application", - Long: `Build a Tsunami application from the specified directory.`, + Long: `Build a Tsunami application.`, Args: cobra.ExactArgs(1), SilenceUsage: true, Run: func(cmd *cobra.Command, args []string) { @@ -67,7 +67,7 @@ var buildCmd = &cobra.Command{ keepTemp, _ := cmd.Flags().GetBool("keeptemp") output, _ := cmd.Flags().GetString("output") opts := build.BuildOpts{ - Dir: args[0], + AppPath: args[0], Verbose: verbose, KeepTemp: keepTemp, OutputFile: output, @@ -84,9 +84,9 @@ var buildCmd = &cobra.Command{ } var runCmd = &cobra.Command{ - Use: "run [directory]", + Use: "run [apppath]", Short: "Build and run a Tsunami application", - Long: `Build and run a Tsunami application from the specified directory.`, + Long: `Build and run a Tsunami application.`, Args: cobra.ExactArgs(1), SilenceUsage: true, Run: func(cmd *cobra.Command, args []string) { @@ -94,7 +94,7 @@ var runCmd = &cobra.Command{ open, _ := cmd.Flags().GetBool("open") keepTemp, _ := cmd.Flags().GetBool("keeptemp") opts := build.BuildOpts{ - Dir: args[0], + AppPath: args[0], Verbose: verbose, Open: open, KeepTemp: keepTemp, From 3b309ffb2f6acd06b05d1beb09c5976b9d4e6699 Mon Sep 17 00:00:00 2001 From: sawka Date: Fri, 12 Sep 2025 22:26:29 -0700 Subject: [PATCH 07/21] big refactor of ugly blockcontroller code. MUCH cleaner surface to implement the tsunami controller into... --- pkg/blockcontroller/blockcontroller.go | 1161 ++++------------------ pkg/blockcontroller/shellcontroller.go | 906 +++++++++++++++++ pkg/blockcontroller/tsunamicontroller.go | 31 +- pkg/service/blockservice/blockservice.go | 6 +- pkg/wshrpc/wshserver/wshserver.go | 12 +- 5 files changed, 1135 insertions(+), 981 deletions(-) create mode 100644 pkg/blockcontroller/shellcontroller.go diff --git a/pkg/blockcontroller/blockcontroller.go b/pkg/blockcontroller/blockcontroller.go index 23a3ffdbb6..19b4113138 100644 --- a/pkg/blockcontroller/blockcontroller.go +++ b/pkg/blockcontroller/blockcontroller.go @@ -4,50 +4,30 @@ package blockcontroller import ( - "bytes" "context" "encoding/base64" "fmt" - "io" "io/fs" "log" - "os" "strings" "sync" - "sync/atomic" "time" - "github.com/google/uuid" "github.com/wavetermdev/waveterm/pkg/blocklogger" "github.com/wavetermdev/waveterm/pkg/filestore" - "github.com/wavetermdev/waveterm/pkg/panichandler" "github.com/wavetermdev/waveterm/pkg/remote" "github.com/wavetermdev/waveterm/pkg/remote/conncontroller" - "github.com/wavetermdev/waveterm/pkg/shellexec" - "github.com/wavetermdev/waveterm/pkg/util/envutil" - "github.com/wavetermdev/waveterm/pkg/util/fileutil" - "github.com/wavetermdev/waveterm/pkg/util/shellutil" - "github.com/wavetermdev/waveterm/pkg/util/utilfn" "github.com/wavetermdev/waveterm/pkg/wavebase" "github.com/wavetermdev/waveterm/pkg/waveobj" - "github.com/wavetermdev/waveterm/pkg/wconfig" "github.com/wavetermdev/waveterm/pkg/wps" - "github.com/wavetermdev/waveterm/pkg/wshrpc" - "github.com/wavetermdev/waveterm/pkg/wshrpc/wshclient" - "github.com/wavetermdev/waveterm/pkg/wshutil" "github.com/wavetermdev/waveterm/pkg/wslconn" "github.com/wavetermdev/waveterm/pkg/wstore" ) const ( - BlockController_Shell = "shell" - BlockController_Cmd = "cmd" -) - -const ( - ConnType_Local = "local" - ConnType_Wsl = "wsl" - ConnType_Ssh = "ssh" + BlockController_Shell = "shell" + BlockController_Cmd = "cmd" + BlockController_Tsunami = "tsunami" ) const ( @@ -63,9 +43,7 @@ const ( ) const DefaultTimeout = 2 * time.Second - -var globalLock = &sync.Mutex{} -var blockControllerMap = make(map[string]*BlockController) +const DefaultGracefulKillWait = 400 * time.Millisecond type BlockInputUnion struct { InputData []byte `json:"inputdata,omitempty"` @@ -73,21 +51,6 @@ type BlockInputUnion struct { TermSize *waveobj.TermSize `json:"termsize,omitempty"` } -type BlockController struct { - Lock *sync.Mutex - ControllerType string - TabId string - BlockId string - BlockDef *waveobj.BlockDef - CreatedHtmlFile bool - ShellProc *shellexec.ShellProc - ShellInputCh chan *BlockInputUnion - ShellProcStatus string - ShellProcExitCode int - RunLock *atomic.Bool - StatusVersion int -} - type BlockControllerRuntimeStatus struct { BlockId string `json:"blockid"` Version int `json:"version"` @@ -96,717 +59,210 @@ type BlockControllerRuntimeStatus struct { ShellProcExitCode int `json:"shellprocexitcode"` } -func (bc *BlockController) WithLock(f func()) { - bc.Lock.Lock() - defer bc.Lock.Unlock() - f() +// Controller interface that all block controllers must implement +type Controller interface { + Start(ctx context.Context, blockMeta waveobj.MetaMapType, rtOpts *waveobj.RuntimeOpts, force bool) error + Stop(graceful bool, newStatus string) error + GetRuntimeStatus() *BlockControllerRuntimeStatus + SendInput(input *BlockInputUnion) error } -func (bc *BlockController) GetRuntimeStatus() *BlockControllerRuntimeStatus { - var rtn BlockControllerRuntimeStatus - bc.WithLock(func() { - bc.StatusVersion++ - rtn.Version = bc.StatusVersion - rtn.BlockId = bc.BlockId - rtn.ShellProcStatus = bc.ShellProcStatus - if bc.ShellProc != nil { - rtn.ShellProcConnName = bc.ShellProc.ConnName - } - rtn.ShellProcExitCode = bc.ShellProcExitCode - }) - return &rtn -} - -func (bc *BlockController) getShellProc() *shellexec.ShellProc { - bc.Lock.Lock() - defer bc.Lock.Unlock() - return bc.ShellProc -} +// Registry for all controllers +var ( + controllerRegistry = make(map[string]Controller) + registryLock sync.RWMutex +) -type RunShellOpts struct { - TermSize waveobj.TermSize `json:"termsize,omitempty"` +// Registry operations +func getController(blockId string) Controller { + registryLock.RLock() + defer registryLock.RUnlock() + return controllerRegistry[blockId] } -func (bc *BlockController) UpdateControllerAndSendUpdate(updateFn func() bool) { - var sendUpdate bool - bc.WithLock(func() { - sendUpdate = updateFn() - }) - if sendUpdate { - rtStatus := bc.GetRuntimeStatus() - log.Printf("sending blockcontroller update %#v\n", rtStatus) - wps.Broker.Publish(wps.WaveEvent{ - Event: wps.Event_ControllerStatus, - Scopes: []string{ - waveobj.MakeORef(waveobj.OType_Tab, bc.TabId).String(), - waveobj.MakeORef(waveobj.OType_Block, bc.BlockId).String(), - }, - Data: rtStatus, - }) +func registerController(blockId string, controller Controller) { + registryLock.Lock() + defer registryLock.Unlock() + if existing, exists := controllerRegistry[blockId]; exists { + // Stop existing controller before replacing + existing.Stop(false, Status_Done) } + controllerRegistry[blockId] = controller } -func HandleTruncateBlockFile(blockId string) error { - ctx, cancelFn := context.WithTimeout(context.Background(), DefaultTimeout) - defer cancelFn() - err := filestore.WFS.WriteFile(ctx, blockId, wavebase.BlockFile_Term, nil) - if err == fs.ErrNotExist { - return nil - } - if err != nil { - return fmt.Errorf("error truncating blockfile: %w", err) - } - err = filestore.WFS.DeleteFile(ctx, blockId, wavebase.BlockFile_Cache) - if err == fs.ErrNotExist { - err = nil - } - if err != nil { - log.Printf("error deleting cache file (continuing): %v\n", err) - } - wps.Broker.Publish(wps.WaveEvent{ - Event: wps.Event_BlockFile, - Scopes: []string{waveobj.MakeORef(waveobj.OType_Block, blockId).String()}, - Data: &wps.WSFileEventData{ - ZoneId: blockId, - FileName: wavebase.BlockFile_Term, - FileOp: wps.FileOp_Truncate, - }, - }) - return nil - +func deleteController(blockId string) { + registryLock.Lock() + defer registryLock.Unlock() + delete(controllerRegistry, blockId) } -func HandleAppendBlockFile(blockId string, blockFile string, data []byte) error { - ctx, cancelFn := context.WithTimeout(context.Background(), DefaultTimeout) - defer cancelFn() - err := filestore.WFS.AppendData(ctx, blockId, blockFile, data) - if err != nil { - return fmt.Errorf("error appending to blockfile: %w", err) +func getAllControllers() map[string]Controller { + registryLock.RLock() + defer registryLock.RUnlock() + // Return a copy to avoid lock issues + result := make(map[string]Controller) + for k, v := range controllerRegistry { + result[k] = v } - wps.Broker.Publish(wps.WaveEvent{ - Event: wps.Event_BlockFile, - Scopes: []string{ - waveobj.MakeORef(waveobj.OType_Block, blockId).String(), - }, - Data: &wps.WSFileEventData{ - ZoneId: blockId, - FileName: blockFile, - FileOp: wps.FileOp_Append, - Data64: base64.StdEncoding.EncodeToString(data), - }, - }) - return nil + return result } -func (bc *BlockController) resetTerminalState(logCtx context.Context) { - ctx, cancelFn := context.WithTimeout(context.Background(), DefaultTimeout) - defer cancelFn() - wfile, statErr := filestore.WFS.Stat(ctx, bc.BlockId, wavebase.BlockFile_Term) - if statErr == fs.ErrNotExist || wfile.Size == 0 { - return - } - blocklogger.Debugf(logCtx, "[conndebug] resetTerminalState: resetting terminal state\n") - // controller type = "shell" - var buf bytes.Buffer - // buf.WriteString("\x1b[?1049l") // disable alternative buffer - buf.WriteString("\x1b[0m") // reset attributes - buf.WriteString("\x1b[?25h") // show cursor - buf.WriteString("\x1b[?1000l") // disable mouse tracking - buf.WriteString("\r\n\r\n") - err := HandleAppendBlockFile(bc.BlockId, wavebase.BlockFile_Term, buf.Bytes()) - if err != nil { - log.Printf("error appending to blockfile (terminal reset): %v\n", err) - } -} +// Public API Functions -func getCustomInitScriptKeyCascade(shellType string) []string { - if shellType == "bash" { - return []string{waveobj.MetaKey_CmdInitScriptBash, waveobj.MetaKey_CmdInitScriptSh, waveobj.MetaKey_CmdInitScript} - } - if shellType == "zsh" { - return []string{waveobj.MetaKey_CmdInitScriptZsh, waveobj.MetaKey_CmdInitScriptSh, waveobj.MetaKey_CmdInitScript} - } - if shellType == "pwsh" { - return []string{waveobj.MetaKey_CmdInitScriptPwsh, waveobj.MetaKey_CmdInitScript} - } - if shellType == "fish" { - return []string{waveobj.MetaKey_CmdInitScriptFish, waveobj.MetaKey_CmdInitScript} +func ResyncController(ctx context.Context, tabId string, blockId string, rtOpts *waveobj.RuntimeOpts, force bool) error { + if tabId == "" || blockId == "" { + return fmt.Errorf("invalid tabId or blockId passed to ResyncController") } - return []string{waveobj.MetaKey_CmdInitScript} -} -func getCustomInitScript(logCtx context.Context, meta waveobj.MetaMapType, connName string, shellType string) string { - initScriptVal, metaKeyName := getCustomInitScriptValue(meta, connName, shellType) - if initScriptVal == "" { - return "" - } - if !fileutil.IsInitScriptPath(initScriptVal) { - blocklogger.Infof(logCtx, "[conndebug] inline initScript (size=%d) found in meta key: %s\n", len(initScriptVal), metaKeyName) - return initScriptVal - } - blocklogger.Infof(logCtx, "[conndebug] initScript detected as a file %q from meta key: %s\n", initScriptVal, metaKeyName) - initScriptVal, err := wavebase.ExpandHomeDir(initScriptVal) - if err != nil { - blocklogger.Infof(logCtx, "[conndebug] cannot expand home dir in Wave initscript file: %v\n", err) - return fmt.Sprintf("echo \"cannot expand home dir in Wave initscript file, from key %s\";\n", metaKeyName) - } - fileData, err := os.ReadFile(initScriptVal) + blockData, err := wstore.DBMustGet[*waveobj.Block](ctx, blockId) if err != nil { - blocklogger.Infof(logCtx, "[conndebug] cannot open Wave initscript file: %v\n", err) - return fmt.Sprintf("echo \"cannot open Wave initscript file, from key %s\";\n", metaKeyName) - } - if len(fileData) > MaxInitScriptSize { - blocklogger.Infof(logCtx, "[conndebug] initscript file too large, size=%d, max=%d\n", len(fileData), MaxInitScriptSize) - return fmt.Sprintf("echo \"initscript file too large, from key %s\";\n", metaKeyName) - } - if utilfn.HasBinaryData(fileData) { - blocklogger.Infof(logCtx, "[conndebug] initscript file contains binary data\n") - return fmt.Sprintf("echo \"initscript file contains binary data, from key %s\";\n", metaKeyName) + return fmt.Errorf("error getting block: %w", err) } - blocklogger.Infof(logCtx, "[conndebug] initscript file read successfully, size=%d\n", len(fileData)) - return string(fileData) -} -// returns (value, metakey) -func getCustomInitScriptValue(meta waveobj.MetaMapType, connName string, shellType string) (string, string) { - keys := getCustomInitScriptKeyCascade(shellType) - connMeta := meta.GetConnectionOverride(connName) - if connMeta != nil { - for _, key := range keys { - if connMeta.HasKey(key) { - return connMeta.GetString(key, ""), "blockmeta/[" + connName + "]/" + key - } - } - } - for _, key := range keys { - if meta.HasKey(key) { - return meta.GetString(key, ""), "blockmeta/" + key - } - } - fullConfig := wconfig.GetWatcher().GetFullConfig() - connKeywords := fullConfig.Connections[connName] - connKeywordsMap := make(map[string]any) - err := utilfn.ReUnmarshal(&connKeywordsMap, connKeywords) - if err != nil { - log.Printf("error re-unmarshalling connKeywords: %v\n", err) - return "", "" - } - ckMeta := waveobj.MetaMapType(connKeywordsMap) - for _, key := range keys { - if ckMeta.HasKey(key) { - return ckMeta.GetString(key, ""), "connections.json/" + connName + "/" + key - } - } - return "", "" -} + controllerName := blockData.Meta.GetString(waveobj.MetaKey_Controller, "") -func resolveEnvMap(blockId string, blockMeta waveobj.MetaMapType, connName string) (map[string]string, error) { - rtn := make(map[string]string) - config := wconfig.GetWatcher().GetFullConfig() - connKeywords := config.Connections[connName] - ckEnv := connKeywords.CmdEnv - for k, v := range ckEnv { - rtn[k] = v - } - ctx, cancelFn := context.WithTimeout(context.Background(), 2*time.Second) - defer cancelFn() - _, envFileData, err := filestore.WFS.ReadFile(ctx, blockId, wavebase.BlockFile_Env) - if err == fs.ErrNotExist { - err = nil - } - if err != nil { - return nil, fmt.Errorf("error reading command env file: %w", err) - } - if len(envFileData) > 0 { - envMap := envutil.EnvToMap(string(envFileData)) - for k, v := range envMap { - rtn[k] = v - } - } - cmdEnv := blockMeta.GetStringMap(waveobj.MetaKey_CmdEnv, true) - for k, v := range cmdEnv { - if v == waveobj.MetaMap_DeleteSentinel { - delete(rtn, k) - continue - } - rtn[k] = v - } - connEnv := blockMeta.GetConnectionOverride(connName).GetStringMap(waveobj.MetaKey_CmdEnv, true) - for k, v := range connEnv { - if v == waveobj.MetaMap_DeleteSentinel { - delete(rtn, k) - continue - } - rtn[k] = v - } - return rtn, nil -} + // Get existing controller + existing := getController(blockId) -// for "cmd" type blocks -func createCmdStrAndOpts(blockId string, blockMeta waveobj.MetaMapType, connName string) (string, *shellexec.CommandOptsType, error) { - var cmdStr string - var cmdOpts shellexec.CommandOptsType - cmdStr = blockMeta.GetString(waveobj.MetaKey_Cmd, "") - if cmdStr == "" { - return "", nil, fmt.Errorf("missing cmd in block meta") - } - cmdOpts.Cwd = blockMeta.GetString(waveobj.MetaKey_CmdCwd, "") - if cmdOpts.Cwd != "" { - cwdPath, err := wavebase.ExpandHomeDir(cmdOpts.Cwd) - if err != nil { - return "", nil, err - } - cmdOpts.Cwd = cwdPath - } - useShell := blockMeta.GetBool(waveobj.MetaKey_CmdShell, true) - if !useShell { - if strings.Contains(cmdStr, " ") { - return "", nil, fmt.Errorf("cmd should not have spaces if cmd:shell is false (use cmd:args)") - } - cmdArgs := blockMeta.GetStringList(waveobj.MetaKey_CmdArgs) - // shell escape the args - for _, arg := range cmdArgs { - cmdStr = cmdStr + " " + utilfn.ShellQuote(arg, false, -1) + // If no controller needed, stop existing if present + if controllerName == "" { + if existing != nil { + StopBlockController(blockId) + deleteController(blockId) } + return nil } - cmdOpts.ForceJwt = blockMeta.GetBool(waveobj.MetaKey_CmdJwt, false) - return cmdStr, &cmdOpts, nil -} -func (bc *BlockController) DoRunShellCommand(logCtx context.Context, rc *RunShellOpts, blockMeta waveobj.MetaMapType) error { - blocklogger.Debugf(logCtx, "[conndebug] DoRunShellCommand\n") - shellProc, err := bc.setupAndStartShellProcess(logCtx, rc, blockMeta) - if err != nil { - return err - } - return bc.manageRunningShellProcess(shellProc, rc, blockMeta) -} + // Check if we need to morph controller type + if existing != nil { + existingStatus := existing.GetRuntimeStatus() + needsReplace := false -func (bc *BlockController) makeSwapToken(ctx context.Context, logCtx context.Context, blockMeta waveobj.MetaMapType, remoteName string, shellType string) *shellutil.TokenSwapEntry { - token := &shellutil.TokenSwapEntry{ - Token: uuid.New().String(), - Env: make(map[string]string), - Exp: time.Now().Add(5 * time.Minute), - } - token.Env["TERM_PROGRAM"] = "waveterm" - token.Env["WAVETERM_BLOCKID"] = bc.BlockId - token.Env["WAVETERM_VERSION"] = wavebase.WaveVersion - token.Env["WAVETERM"] = "1" - tabId, err := wstore.DBFindTabForBlockId(ctx, bc.BlockId) - if err != nil { - log.Printf("error finding tab for block: %v\n", err) - } else { - token.Env["WAVETERM_TABID"] = tabId - } - if tabId != "" { - wsId, err := wstore.DBFindWorkspaceForTabId(ctx, tabId) - if err != nil { - log.Printf("error finding workspace for tab: %v\n", err) - } else { - token.Env["WAVETERM_WORKSPACEID"] = wsId + // Determine if existing controller type matches what we need + switch existing.(type) { + case *ShellController: + if controllerName != BlockController_Shell && controllerName != BlockController_Cmd { + needsReplace = true + } + case *TsunamiController: + if controllerName != BlockController_Tsunami { + needsReplace = true + } } - } - clientData, err := wstore.DBGetSingleton[*waveobj.Client](ctx) - if err != nil { - log.Printf("error getting client data: %v\n", err) - } else { - token.Env["WAVETERM_CLIENTID"] = clientData.OID - } - token.Env["WAVETERM_CONN"] = remoteName - envMap, err := resolveEnvMap(bc.BlockId, blockMeta, remoteName) - if err != nil { - log.Printf("error resolving env map: %v\n", err) - } - for k, v := range envMap { - token.Env[k] = v - } - token.ScriptText = getCustomInitScript(logCtx, blockMeta, remoteName, shellType) - return token -} - -type ConnUnion struct { - ConnName string - ConnType string - SshConn *conncontroller.SSHConn - WslConn *wslconn.WslConn - WshEnabled bool - ShellPath string - ShellOpts []string - ShellType string -} -func getLocalShellPath(blockMeta waveobj.MetaMapType) string { - shellPath := blockMeta.GetString(waveobj.MetaKey_TermLocalShellPath, "") - if shellPath != "" { - return shellPath - } - settings := wconfig.GetWatcher().GetFullConfig().Settings - if settings.TermLocalShellPath != "" { - return settings.TermLocalShellPath + if needsReplace { + log.Printf("stopping blockcontroller %s due to controller type change\n", blockId) + StopBlockController(blockId) + time.Sleep(100 * time.Millisecond) + deleteController(blockId) + existing = nil + } + + // For shell/cmd, check if connection changed + if !needsReplace && (controllerName == BlockController_Shell || controllerName == BlockController_Cmd) { + connName := blockData.Meta.GetString(waveobj.MetaKey_Connection, "") + if existingStatus.ShellProcStatus == Status_Running && existingStatus.ShellProcConnName != connName { + log.Printf("stopping blockcontroller %s due to conn change\n", blockId) + StopBlockControllerAndSetStatus(blockId, Status_Init) + time.Sleep(100 * time.Millisecond) + // Don't delete, will reuse same controller type + existing = getController(blockId) + } + } } - return shellutil.DetectLocalShellPath() -} -func getLocalShellOpts(blockMeta waveobj.MetaMapType) []string { - if blockMeta.HasKey(waveobj.MetaKey_TermLocalShellOpts) { - opts := blockMeta.GetStringList(waveobj.MetaKey_TermLocalShellOpts) - return append([]string{}, opts...) - } - settings := wconfig.GetWatcher().GetFullConfig().Settings - if len(settings.TermLocalShellOpts) > 0 { - return append([]string{}, settings.TermLocalShellOpts...) + // Force restart if requested + if force && existing != nil { + StopBlockController(blockId) + time.Sleep(100 * time.Millisecond) + existing = getController(blockId) } - return nil -} -func (union *ConnUnion) getRemoteInfoAndShellType(blockMeta waveobj.MetaMapType) error { - if !union.WshEnabled { - return nil - } - if union.ConnType == ConnType_Ssh || union.ConnType == ConnType_Wsl { - connRoute := wshutil.MakeConnectionRouteId(union.ConnName) - remoteInfo, err := wshclient.RemoteGetInfoCommand(wshclient.GetBareRpcClient(), &wshrpc.RpcOpts{Route: connRoute, Timeout: 2000}) - if err != nil { - // weird error, could flip the wshEnabled flag and allow it to go forward, but the connection should have already been vetted - return fmt.Errorf("unable to obtain remote info from connserver: %w", err) - } - // TODO allow overriding remote shell path - union.ShellPath = remoteInfo.Shell + // Create or restart controller + var controller Controller + if existing != nil { + controller = existing } else { - union.ShellPath = getLocalShellPath(blockMeta) - } - union.ShellType = shellutil.GetShellTypeFromShellPath(union.ShellPath) - return nil -} + // Create new controller based on type + switch controllerName { + case BlockController_Shell, BlockController_Cmd: + controller = MakeShellController(tabId, blockId, controllerName) + registerController(blockId, controller) -func (bc *BlockController) getConnUnion(logCtx context.Context, remoteName string, blockMeta waveobj.MetaMapType) (ConnUnion, error) { - rtn := ConnUnion{ConnName: remoteName} - wshEnabled := !blockMeta.GetBool(waveobj.MetaKey_CmdNoWsh, false) - if strings.HasPrefix(remoteName, "wsl://") { - wslName := strings.TrimPrefix(remoteName, "wsl://") - wslConn := wslconn.GetWslConn(wslName) - if wslConn == nil { - return ConnUnion{}, fmt.Errorf("wsl connection not found: %s", remoteName) - } - connStatus := wslConn.DeriveConnStatus() - if connStatus.Status != conncontroller.Status_Connected { - return ConnUnion{}, fmt.Errorf("wsl connection %s not connected, cannot start shellproc", remoteName) - } - rtn.ConnType = ConnType_Wsl - rtn.WslConn = wslConn - rtn.WshEnabled = wshEnabled && wslConn.WshEnabled.Load() - } else if remoteName != "" { - opts, err := remote.ParseOpts(remoteName) - if err != nil { - return ConnUnion{}, fmt.Errorf("invalid ssh remote name (%s): %w", remoteName, err) - } - conn := conncontroller.GetConn(opts) - if conn == nil { - return ConnUnion{}, fmt.Errorf("ssh connection not found: %s", remoteName) - } - connStatus := conn.DeriveConnStatus() - if connStatus.Status != conncontroller.Status_Connected { - return ConnUnion{}, fmt.Errorf("ssh connection %s not connected, cannot start shellproc", remoteName) - } - rtn.ConnType = ConnType_Ssh - rtn.SshConn = conn - rtn.WshEnabled = wshEnabled && conn.WshEnabled.Load() - } else { - rtn.ConnType = ConnType_Local - rtn.WshEnabled = wshEnabled - } - err := rtn.getRemoteInfoAndShellType(blockMeta) - if err != nil { - return ConnUnion{}, err - } - return rtn, nil -} + case BlockController_Tsunami: + controller = MakeTsunamiController(tabId, blockId) + registerController(blockId, controller) -func (bc *BlockController) setupAndStartShellProcess(logCtx context.Context, rc *RunShellOpts, blockMeta waveobj.MetaMapType) (*shellexec.ShellProc, error) { - // create a circular blockfile for the output - ctx, cancelFn := context.WithTimeout(context.Background(), 2*time.Second) - defer cancelFn() - fsErr := filestore.WFS.MakeFile(ctx, bc.BlockId, wavebase.BlockFile_Term, nil, wshrpc.FileOpts{MaxSize: DefaultTermMaxFileSize, Circular: true}) - if fsErr != nil && fsErr != fs.ErrExist { - return nil, fmt.Errorf("error creating blockfile: %w", fsErr) - } - if fsErr == fs.ErrExist { - // reset the terminal state - bc.resetTerminalState(logCtx) - } - bcInitStatus := bc.GetRuntimeStatus() - if bcInitStatus.ShellProcStatus == Status_Running { - return nil, nil - } - // TODO better sync here (don't let two starts happen at the same times) - remoteName := blockMeta.GetString(waveobj.MetaKey_Connection, "") - connUnion, err := bc.getConnUnion(logCtx, remoteName, blockMeta) - if err != nil { - return nil, err - } - blocklogger.Infof(logCtx, "[conndebug] remoteName: %q, connType: %s, wshEnabled: %v, shell: %q, shellType: %s\n", remoteName, connUnion.ConnType, connUnion.WshEnabled, connUnion.ShellPath, connUnion.ShellType) - var cmdStr string - var cmdOpts shellexec.CommandOptsType - if bc.ControllerType == BlockController_Shell { - cmdOpts.Interactive = true - cmdOpts.Login = true - cmdOpts.Cwd = blockMeta.GetString(waveobj.MetaKey_CmdCwd, "") - if cmdOpts.Cwd != "" { - cwdPath, err := wavebase.ExpandHomeDir(cmdOpts.Cwd) - if err != nil { - return nil, err - } - cmdOpts.Cwd = cwdPath + default: + return fmt.Errorf("unknown controller type %q", controllerName) } - } else if bc.ControllerType == BlockController_Cmd { - var cmdOptsPtr *shellexec.CommandOptsType - cmdStr, cmdOptsPtr, err = createCmdStrAndOpts(bc.BlockId, blockMeta, remoteName) - if err != nil { - return nil, err - } - cmdOpts = *cmdOptsPtr - } else { - return nil, fmt.Errorf("unknown controller type %q", bc.ControllerType) } - var shellProc *shellexec.ShellProc - swapToken := bc.makeSwapToken(ctx, logCtx, blockMeta, remoteName, connUnion.ShellType) - cmdOpts.SwapToken = swapToken - blocklogger.Debugf(logCtx, "[conndebug] created swaptoken: %s\n", swapToken.Token) - if connUnion.ConnType == ConnType_Wsl { - wslConn := connUnion.WslConn - if !connUnion.WshEnabled { - shellProc, err = shellexec.StartWslShellProcNoWsh(ctx, rc.TermSize, cmdStr, cmdOpts, wslConn) - if err != nil { - return nil, err - } - } else { - sockName := wslConn.GetDomainSocketName() - rpcContext := wshrpc.RpcContext{TabId: bc.TabId, BlockId: bc.BlockId, Conn: wslConn.GetName()} - jwtStr, err := wshutil.MakeClientJWTToken(rpcContext, sockName) - if err != nil { - return nil, fmt.Errorf("error making jwt token: %w", err) - } - swapToken.SockName = sockName - swapToken.RpcContext = &rpcContext - swapToken.Env[wshutil.WaveJwtTokenVarName] = jwtStr - shellProc, err = shellexec.StartWslShellProc(ctx, rc.TermSize, cmdStr, cmdOpts, wslConn) - if err != nil { - wslConn.SetWshError(err) - wslConn.WshEnabled.Store(false) - blocklogger.Infof(logCtx, "[conndebug] error starting wsl shell proc with wsh: %v\n", err) - blocklogger.Infof(logCtx, "[conndebug] attempting install without wsh\n") - shellProc, err = shellexec.StartWslShellProcNoWsh(ctx, rc.TermSize, cmdStr, cmdOpts, wslConn) - if err != nil { - return nil, err - } - } - } - } else if connUnion.ConnType == ConnType_Ssh { - conn := connUnion.SshConn - if !connUnion.WshEnabled { - shellProc, err = shellexec.StartRemoteShellProcNoWsh(ctx, rc.TermSize, cmdStr, cmdOpts, conn) - if err != nil { - return nil, err - } - } else { - sockName := conn.GetDomainSocketName() - rpcContext := wshrpc.RpcContext{TabId: bc.TabId, BlockId: bc.BlockId, Conn: conn.Opts.String()} - jwtStr, err := wshutil.MakeClientJWTToken(rpcContext, sockName) - if err != nil { - return nil, fmt.Errorf("error making jwt token: %w", err) - } - swapToken.SockName = sockName - swapToken.RpcContext = &rpcContext - swapToken.Env[wshutil.WaveJwtTokenVarName] = jwtStr - shellProc, err = shellexec.StartRemoteShellProc(ctx, logCtx, rc.TermSize, cmdStr, cmdOpts, conn) - if err != nil { - conn.SetWshError(err) - conn.WshEnabled.Store(false) - blocklogger.Infof(logCtx, "[conndebug] error starting remote shell proc with wsh: %v\n", err) - blocklogger.Infof(logCtx, "[conndebug] attempting install without wsh\n") - shellProc, err = shellexec.StartRemoteShellProcNoWsh(ctx, rc.TermSize, cmdStr, cmdOpts, conn) + + // Check if we need to start/restart + status := controller.GetRuntimeStatus() + if status.ShellProcStatus == Status_Init || status.ShellProcStatus == Status_Done { + // For shell/cmd, check connection status first + if controllerName == BlockController_Shell || controllerName == BlockController_Cmd { + connName := blockData.Meta.GetString(waveobj.MetaKey_Connection, "") + if connName != "" { + err = CheckConnStatus(blockId) if err != nil { - return nil, err + return fmt.Errorf("cannot start shellproc: %w", err) } } } - } else if connUnion.ConnType == ConnType_Local { - if connUnion.WshEnabled { - sockName := wavebase.GetDomainSocketName() - rpcContext := wshrpc.RpcContext{TabId: bc.TabId, BlockId: bc.BlockId} - jwtStr, err := wshutil.MakeClientJWTToken(rpcContext, sockName) - if err != nil { - return nil, fmt.Errorf("error making jwt token: %w", err) - } - swapToken.SockName = sockName - swapToken.RpcContext = &rpcContext - swapToken.Env[wshutil.WaveJwtTokenVarName] = jwtStr - } - cmdOpts.ShellPath = connUnion.ShellPath - cmdOpts.ShellOpts = getLocalShellOpts(blockMeta) - shellProc, err = shellexec.StartLocalShellProc(logCtx, rc.TermSize, cmdStr, cmdOpts) + + // Start controller + err = controller.Start(ctx, blockData.Meta, rtOpts, force) if err != nil { - return nil, err + return fmt.Errorf("error starting controller: %w", err) } - } else { - return nil, fmt.Errorf("unknown connection type for conn %q: %s", remoteName, connUnion.ConnType) - } - bc.UpdateControllerAndSendUpdate(func() bool { - bc.ShellProc = shellProc - bc.ShellProcStatus = Status_Running - return true - }) - return shellProc, nil -} - -func (bc *BlockController) getBlockData_noErr() *waveobj.Block { - ctx, cancelFn := context.WithTimeout(context.Background(), DefaultTimeout) - defer cancelFn() - blockData, err := wstore.DBGet[*waveobj.Block](ctx, bc.BlockId) - if err != nil { - log.Printf("error getting block data (getBlockData_noErr): %v\n", err) - return nil } - return blockData -} - -func (bc *BlockController) manageRunningShellProcess(shellProc *shellexec.ShellProc, rc *RunShellOpts, blockMeta waveobj.MetaMapType) error { - shellInputCh := make(chan *BlockInputUnion, 32) - bc.ShellInputCh = shellInputCh - // make esc sequence wshclient wshProxy - // we don't need to authenticate this wshProxy since it is coming direct - wshProxy := wshutil.MakeRpcProxy() - wshProxy.SetRpcContext(&wshrpc.RpcContext{TabId: bc.TabId, BlockId: bc.BlockId}) - wshutil.DefaultRouter.RegisterRoute(wshutil.MakeControllerRouteId(bc.BlockId), wshProxy, true) - ptyBuffer := wshutil.MakePtyBuffer(wshutil.WaveOSCPrefix, shellProc.Cmd, wshProxy.FromRemoteCh) - go func() { - // handles regular output from the pty (goes to the blockfile and xterm) - defer func() { - panichandler.PanicHandler("blockcontroller:shellproc-pty-read-loop", recover()) - }() - defer func() { - log.Printf("[shellproc] pty-read loop done\n") - shellProc.Close() - bc.WithLock(func() { - // so no other events are sent - bc.ShellInputCh = nil - }) - shellProc.Cmd.Wait() - exitCode := shellProc.Cmd.ExitCode() - blockData := bc.getBlockData_noErr() - if blockData != nil && blockData.Meta.GetString(waveobj.MetaKey_Controller, "") == BlockController_Cmd { - termMsg := fmt.Sprintf("\r\nprocess finished with exit code = %d\r\n\r\n", exitCode) - HandleAppendBlockFile(bc.BlockId, wavebase.BlockFile_Term, []byte(termMsg)) - } - // to stop the inputCh loop - time.Sleep(100 * time.Millisecond) - close(shellInputCh) // don't use bc.ShellInputCh (it's nil) - }() - buf := make([]byte, 4096) - for { - nr, err := ptyBuffer.Read(buf) - if nr > 0 { - err := HandleAppendBlockFile(bc.BlockId, wavebase.BlockFile_Term, buf[:nr]) - if err != nil { - log.Printf("error appending to blockfile: %v\n", err) - } - } - if err == io.EOF { - break - } - if err != nil { - log.Printf("error reading from shell: %v\n", err) - break - } - } - }() - go func() { - // handles input from the shellInputCh, sent to pty - // use shellInputCh instead of bc.ShellInputCh (because we want to be attached to *this* ch. bc.ShellInputCh can be updated) - defer func() { - panichandler.PanicHandler("blockcontroller:shellproc-input-loop", recover()) - }() - for ic := range shellInputCh { - if len(ic.InputData) > 0 { - shellProc.Cmd.Write(ic.InputData) - } - if ic.TermSize != nil { - updateTermSize(shellProc, bc.BlockId, *ic.TermSize) - } - } - }() - go func() { - defer func() { - panichandler.PanicHandler("blockcontroller:shellproc-output-loop", recover()) - }() - // handles outputCh -> shellInputCh - for msg := range wshProxy.ToRemoteCh { - encodedMsg, err := wshutil.EncodeWaveOSCBytes(wshutil.WaveServerOSC, msg) - if err != nil { - log.Printf("error encoding OSC message: %v\n", err) - } - shellInputCh <- &BlockInputUnion{InputData: encodedMsg} - } - }() - go func() { - defer func() { - panichandler.PanicHandler("blockcontroller:shellproc-wait-loop", recover()) - }() - // wait for the shell to finish - var exitCode int - defer func() { - wshutil.DefaultRouter.UnregisterRoute(wshutil.MakeControllerRouteId(bc.BlockId)) - bc.UpdateControllerAndSendUpdate(func() bool { - if bc.ShellProcStatus == Status_Running { - bc.ShellProcStatus = Status_Done - } - bc.ShellProcExitCode = exitCode - return true - }) - log.Printf("[shellproc] shell process wait loop done\n") - }() - waitErr := shellProc.Cmd.Wait() - exitCode = shellProc.Cmd.ExitCode() - shellProc.SetWaitErrorAndSignalDone(waitErr) - go checkCloseOnExit(bc.BlockId, exitCode) - }() return nil } -func updateTermSize(shellProc *shellexec.ShellProc, blockId string, termSize waveobj.TermSize) { - err := setTermSizeInDB(blockId, termSize) - if err != nil { - log.Printf("error setting pty size: %v\n", err) - } - err = shellProc.Cmd.SetSize(termSize.Rows, termSize.Cols) - if err != nil { - log.Printf("error setting pty size: %v\n", err) +func GetBlockControllerRuntimeStatus(blockId string) *BlockControllerRuntimeStatus { + controller := getController(blockId) + if controller == nil { + return nil } + return controller.GetRuntimeStatus() } -func checkCloseOnExit(blockId string, exitCode int) { - ctx, cancelFn := context.WithTimeout(context.Background(), DefaultTimeout) - defer cancelFn() - blockData, err := wstore.DBMustGet[*waveobj.Block](ctx, blockId) - if err != nil { - log.Printf("error getting block data: %v\n", err) +func StopBlockController(blockId string) { + controller := getController(blockId) + if controller == nil { return } - closeOnExit := blockData.Meta.GetBool(waveobj.MetaKey_CmdCloseOnExit, false) - closeOnExitForce := blockData.Meta.GetBool(waveobj.MetaKey_CmdCloseOnExitForce, false) - if !closeOnExitForce && !(closeOnExit && exitCode == 0) { + controller.Stop(true, Status_Done) +} + +func StopBlockControllerAndSetStatus(blockId string, newStatus string) { + controller := getController(blockId) + if controller == nil { return } - delayMs := blockData.Meta.GetFloat(waveobj.MetaKey_CmdCloseOnExitDelay, 2000) - if delayMs < 0 { - delayMs = 0 + controller.Stop(true, newStatus) +} + +func SendInput(blockId string, inputUnion *BlockInputUnion) error { + controller := getController(blockId) + if controller == nil { + return fmt.Errorf("no controller found for block %s", blockId) } - time.Sleep(time.Duration(delayMs) * time.Millisecond) - rpcClient := wshclient.GetBareRpcClient() - err = wshclient.DeleteBlockCommand(rpcClient, wshrpc.CommandDeleteBlockData{BlockId: blockId}, nil) - if err != nil { - log.Printf("error deleting block data (close on exit): %v\n", err) + return controller.SendInput(inputUnion) +} + +func StopAllBlockControllers() { + controllers := getAllControllers() + for blockId, controller := range controllers { + status := controller.GetRuntimeStatus() + if status != nil && status.ShellProcStatus == Status_Running { + go func(id string, c Controller) { + c.Stop(true, Status_Done) + }(blockId, controller) + } } } @@ -832,111 +288,61 @@ func getTermSize(bdata *waveobj.Block) waveobj.TermSize { } } -func setTermSizeInDB(blockId string, termSize waveobj.TermSize) error { - ctx, cancelFn := context.WithTimeout(context.Background(), 2*time.Second) +func HandleAppendBlockFile(blockId string, blockFile string, data []byte) error { + ctx, cancelFn := context.WithTimeout(context.Background(), DefaultTimeout) defer cancelFn() - ctx = waveobj.ContextWithUpdates(ctx) - bdata, err := wstore.DBMustGet[*waveobj.Block](ctx, blockId) - if err != nil { - return fmt.Errorf("error getting block data: %v", err) - } - if bdata.RuntimeOpts == nil { - bdata.RuntimeOpts = &waveobj.RuntimeOpts{} - } - bdata.RuntimeOpts.TermSize = termSize - err = wstore.DBUpdate(ctx, bdata) + err := filestore.WFS.AppendData(ctx, blockId, blockFile, data) if err != nil { - return fmt.Errorf("error updating block data: %v", err) + return fmt.Errorf("error appending to blockfile: %w", err) } - updates := waveobj.ContextGetUpdatesRtn(ctx) - wps.Broker.SendUpdateEvents(updates) + wps.Broker.Publish(wps.WaveEvent{ + Event: wps.Event_BlockFile, + Scopes: []string{ + waveobj.MakeORef(waveobj.OType_Block, blockId).String(), + }, + Data: &wps.WSFileEventData{ + ZoneId: blockId, + FileName: blockFile, + FileOp: wps.FileOp_Append, + Data64: base64.StdEncoding.EncodeToString(data), + }, + }) return nil } -func (bc *BlockController) LockRunLock() bool { - rtn := bc.RunLock.CompareAndSwap(false, true) - if rtn { - log.Printf("block %q run() lock\n", bc.BlockId) +func HandleTruncateBlockFile(blockId string) error { + ctx, cancelFn := context.WithTimeout(context.Background(), DefaultTimeout) + defer cancelFn() + err := filestore.WFS.WriteFile(ctx, blockId, wavebase.BlockFile_Term, nil) + if err == fs.ErrNotExist { + return nil } - return rtn -} - -func (bc *BlockController) UnlockRunLock() { - bc.RunLock.Store(false) - log.Printf("block %q run() unlock\n", bc.BlockId) -} - -func (bc *BlockController) run(logCtx context.Context, bdata *waveobj.Block, blockMeta map[string]any, rtOpts *waveobj.RuntimeOpts, force bool) { - blocklogger.Debugf(logCtx, "[conndebug] BlockController.run() %q\n", bc.BlockId) - runningShellCommand := false - ok := bc.LockRunLock() - if !ok { - log.Printf("block %q is already executing run()\n", bc.BlockId) - return + if err != nil { + return fmt.Errorf("error truncating blockfile: %w", err) } - defer func() { - if !runningShellCommand { - bc.UnlockRunLock() - } - }() - curStatus := bc.GetRuntimeStatus() - controllerName := bdata.Meta.GetString(waveobj.MetaKey_Controller, "") - if controllerName != BlockController_Shell && controllerName != BlockController_Cmd { - log.Printf("unknown controller %q\n", controllerName) - return + err = filestore.WFS.DeleteFile(ctx, blockId, wavebase.BlockFile_Cache) + if err == fs.ErrNotExist { + err = nil } - runOnce := getBoolFromMeta(blockMeta, waveobj.MetaKey_CmdRunOnce, false) - runOnStart := getBoolFromMeta(blockMeta, waveobj.MetaKey_CmdRunOnStart, true) - if ((runOnStart || runOnce) && curStatus.ShellProcStatus == Status_Init) || force { - if getBoolFromMeta(blockMeta, waveobj.MetaKey_CmdClearOnStart, false) { - err := HandleTruncateBlockFile(bc.BlockId) - if err != nil { - log.Printf("error truncating term blockfile: %v\n", err) - } - } - if runOnce { - ctx, cancelFn := context.WithTimeout(context.Background(), 2*time.Second) - defer cancelFn() - metaUpdate := map[string]any{ - waveobj.MetaKey_CmdRunOnce: false, - waveobj.MetaKey_CmdRunOnStart: false, - } - err := wstore.UpdateObjectMeta(ctx, waveobj.MakeORef(waveobj.OType_Block, bc.BlockId), metaUpdate, false) - if err != nil { - log.Printf("error updating block meta (in blockcontroller.run): %v\n", err) - return - } - } - runningShellCommand = true - go func() { - defer func() { - panichandler.PanicHandler("blockcontroller:run-shell-command", recover()) - }() - defer bc.UnlockRunLock() - var termSize waveobj.TermSize - if rtOpts != nil { - termSize = rtOpts.TermSize - } else { - termSize = getTermSize(bdata) - } - err := bc.DoRunShellCommand(logCtx, &RunShellOpts{TermSize: termSize}, bdata.Meta) - if err != nil { - debugLog(logCtx, "error running shell: %v\n", err) - } - }() + if err != nil { + log.Printf("error deleting cache file (continuing): %v\n", err) } -} - -func (bc *BlockController) SendInput(inputUnion *BlockInputUnion) error { - var shellInputCh chan *BlockInputUnion - bc.WithLock(func() { - shellInputCh = bc.ShellInputCh + wps.Broker.Publish(wps.WaveEvent{ + Event: wps.Event_BlockFile, + Scopes: []string{waveobj.MakeORef(waveobj.OType_Block, blockId).String()}, + Data: &wps.WSFileEventData{ + ZoneId: blockId, + FileName: wavebase.BlockFile_Term, + FileOp: wps.FileOp_Truncate, + }, }) - if shellInputCh == nil { - return fmt.Errorf("no shell input chan") - } - shellInputCh <- inputUnion return nil + +} + +func debugLog(ctx context.Context, fmtStr string, args ...interface{}) { + blocklogger.Infof(ctx, "[conndebug] "+fmtStr, args...) + log.Printf(fmtStr, args...) } func CheckConnStatus(blockId string) error { @@ -968,178 +374,3 @@ func CheckConnStatus(blockId string) error { } return nil } - -func (bc *BlockController) StopShellProc(shouldWait bool) { - bc.Lock.Lock() - defer bc.Lock.Unlock() - if bc.ShellProc == nil || bc.ShellProcStatus == Status_Done || bc.ShellProcStatus == Status_Init { - return - } - bc.ShellProc.Close() - if shouldWait { - doneCh := bc.ShellProc.DoneCh - <-doneCh - } -} - -func getOrCreateBlockController(tabId string, blockId string, controllerName string) *BlockController { - var createdController bool - var bc *BlockController - defer func() { - if !createdController || bc == nil { - return - } - bc.UpdateControllerAndSendUpdate(func() bool { - return true - }) - }() - globalLock.Lock() - defer globalLock.Unlock() - bc = blockControllerMap[blockId] - if bc == nil { - bc = &BlockController{ - Lock: &sync.Mutex{}, - ControllerType: controllerName, - TabId: tabId, - BlockId: blockId, - ShellProcStatus: Status_Init, - RunLock: &atomic.Bool{}, - } - blockControllerMap[blockId] = bc - createdController = true - } - return bc -} - -func formatConnNameForLog(connName string) string { - if connName == "" { - return "local" - } - return connName -} - -func ResyncController(ctx context.Context, tabId string, blockId string, rtOpts *waveobj.RuntimeOpts, force bool) error { - if tabId == "" || blockId == "" { - return fmt.Errorf("invalid tabId or blockId passed to ResyncController") - } - blockData, err := wstore.DBMustGet[*waveobj.Block](ctx, blockId) - if err != nil { - return fmt.Errorf("error getting block: %w", err) - } - if force { - StopBlockController(blockId) - time.Sleep(100 * time.Millisecond) // TODO see if we can remove this (the "process finished with exit code" message comes out after we start reconnecting otherwise) - } - connName := blockData.Meta.GetString(waveobj.MetaKey_Connection, "") - controllerName := blockData.Meta.GetString(waveobj.MetaKey_Controller, "") - curBc := GetBlockController(blockId) - if controllerName == "" { - if curBc != nil { - StopBlockController(blockId) - } - return nil - } - log.Printf("resync controller %s %q (%q) (force %v)\n", blockId, controllerName, connName, force) - // check if conn is different, if so, stop the current controller, and set status back to init - if curBc != nil { - bcStatus := curBc.GetRuntimeStatus() - if bcStatus.ShellProcStatus == Status_Running && bcStatus.ShellProcConnName != connName { - blocklogger.Infof(ctx, "\n[conndebug] stopping blockcontroller due to conn change %q => %q\n", formatConnNameForLog(bcStatus.ShellProcConnName), formatConnNameForLog(connName)) - log.Printf("stopping blockcontroller %s due to conn change\n", blockId) - StopBlockControllerAndSetStatus(blockId, Status_Init) - time.Sleep(100 * time.Millisecond) // TODO see if we can remove this (the "process finished with exit code" message comes out after we start reconnecting otherwise) - } - } - // now if there is a conn, ensure it is connected - if connName != "" { - err = CheckConnStatus(blockId) - if err != nil { - return fmt.Errorf("cannot start shellproc: %w", err) - } - } - if curBc == nil { - return startBlockController(ctx, tabId, blockId, rtOpts, force) - } - bcStatus := curBc.GetRuntimeStatus() - if bcStatus.ShellProcStatus == Status_Init || bcStatus.ShellProcStatus == Status_Done { - return startBlockController(ctx, tabId, blockId, rtOpts, force) - } - return nil -} - -func debugLog(ctx context.Context, fmtStr string, args ...interface{}) { - blocklogger.Infof(ctx, "[conndebug] "+fmtStr, args...) - log.Printf(fmtStr, args...) -} - -func startBlockController(ctx context.Context, tabId string, blockId string, rtOpts *waveobj.RuntimeOpts, force bool) error { - blockData, err := wstore.DBMustGet[*waveobj.Block](ctx, blockId) - if err != nil { - return fmt.Errorf("error getting block: %w", err) - } - controllerName := blockData.Meta.GetString(waveobj.MetaKey_Controller, "") - if controllerName == "" { - // nothing to start - return nil - } - if controllerName != BlockController_Shell && controllerName != BlockController_Cmd { - return fmt.Errorf("unknown controller %q", controllerName) - } - connName := blockData.Meta.GetString(waveobj.MetaKey_Connection, "") - err = CheckConnStatus(blockId) - if err != nil { - return fmt.Errorf("cannot start shellproc: %w", err) - } - bc := getOrCreateBlockController(tabId, blockId, controllerName) - bcStatus := bc.GetRuntimeStatus() - debugLog(ctx, "start blockcontroller %s %q (%q) (curstatus %s) (force %v)\n", blockId, controllerName, connName, bcStatus.ShellProcStatus, force) - if bcStatus.ShellProcStatus == Status_Init || bcStatus.ShellProcStatus == Status_Done { - go bc.run(ctx, blockData, blockData.Meta, rtOpts, force) - } - return nil -} - -func StopBlockControllerAndSetStatus(blockId string, newStatus string) { - bc := GetBlockController(blockId) - if bc == nil { - return - } - if bc.getShellProc() != nil { - bc.ShellProc.Close() - <-bc.ShellProc.DoneCh - bc.UpdateControllerAndSendUpdate(func() bool { - bc.ShellProcStatus = newStatus - return true - }) - } - -} - -func StopBlockController(blockId string) { - StopBlockControllerAndSetStatus(blockId, Status_Done) -} - -func getControllerList() []*BlockController { - globalLock.Lock() - defer globalLock.Unlock() - var rtn []*BlockController - for _, bc := range blockControllerMap { - rtn = append(rtn, bc) - } - return rtn -} - -func StopAllBlockControllers() { - clist := getControllerList() - for _, bc := range clist { - if bc.ShellProcStatus == Status_Running { - go StopBlockController(bc.BlockId) - } - } -} - -func GetBlockController(blockId string) *BlockController { - globalLock.Lock() - defer globalLock.Unlock() - return blockControllerMap[blockId] -} diff --git a/pkg/blockcontroller/shellcontroller.go b/pkg/blockcontroller/shellcontroller.go new file mode 100644 index 0000000000..d67dae8b26 --- /dev/null +++ b/pkg/blockcontroller/shellcontroller.go @@ -0,0 +1,906 @@ +// Copyright 2025, Command Line Inc. +// SPDX-License-Identifier: Apache-2.0 + +package blockcontroller + +import ( + "bytes" + "context" + "fmt" + "io" + "io/fs" + "log" + "os" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/google/uuid" + "github.com/wavetermdev/waveterm/pkg/blocklogger" + "github.com/wavetermdev/waveterm/pkg/filestore" + "github.com/wavetermdev/waveterm/pkg/panichandler" + "github.com/wavetermdev/waveterm/pkg/remote" + "github.com/wavetermdev/waveterm/pkg/remote/conncontroller" + "github.com/wavetermdev/waveterm/pkg/shellexec" + "github.com/wavetermdev/waveterm/pkg/util/envutil" + "github.com/wavetermdev/waveterm/pkg/util/fileutil" + "github.com/wavetermdev/waveterm/pkg/util/shellutil" + "github.com/wavetermdev/waveterm/pkg/util/utilfn" + "github.com/wavetermdev/waveterm/pkg/wavebase" + "github.com/wavetermdev/waveterm/pkg/waveobj" + "github.com/wavetermdev/waveterm/pkg/wconfig" + "github.com/wavetermdev/waveterm/pkg/wps" + "github.com/wavetermdev/waveterm/pkg/wshrpc" + "github.com/wavetermdev/waveterm/pkg/wshrpc/wshclient" + "github.com/wavetermdev/waveterm/pkg/wshutil" + "github.com/wavetermdev/waveterm/pkg/wslconn" + "github.com/wavetermdev/waveterm/pkg/wstore" +) + +const ( + ConnType_Local = "local" + ConnType_Wsl = "wsl" + ConnType_Ssh = "ssh" +) + +type ShellController struct { + Lock *sync.Mutex + + // shared fields + ControllerType string + TabId string + BlockId string + BlockDef *waveobj.BlockDef + RunLock *atomic.Bool + ProcStatus string + ProcExitCode int + StatusVersion int + + // for shell/cmd + ShellProc *shellexec.ShellProc + ShellInputCh chan *BlockInputUnion +} + +// Constructor that returns the Controller interface +func MakeShellController(tabId string, blockId string, controllerType string) Controller { + return &ShellController{ + Lock: &sync.Mutex{}, + ControllerType: controllerType, + TabId: tabId, + BlockId: blockId, + ProcStatus: Status_Init, + RunLock: &atomic.Bool{}, + } +} + +// Implement Controller interface methods + +func (sc *ShellController) Start(ctx context.Context, blockMeta waveobj.MetaMapType, rtOpts *waveobj.RuntimeOpts, force bool) error { + // Get the block data + blockData, err := wstore.DBMustGet[*waveobj.Block](ctx, sc.BlockId) + if err != nil { + return fmt.Errorf("error getting block: %w", err) + } + + // Use the existing run method which handles all the start logic + go sc.run(ctx, blockData, blockData.Meta, rtOpts, force) + return nil +} + +func (sc *ShellController) Stop(graceful bool, newStatus string) error { + sc.Lock.Lock() + defer sc.Lock.Unlock() + + if sc.ShellProc == nil || sc.ProcStatus == Status_Done || sc.ProcStatus == Status_Init { + if newStatus != sc.ProcStatus { + sc.ProcStatus = newStatus + sc.sendUpdate_nolock() + } + return nil + } + + sc.ShellProc.Close() + if graceful { + doneCh := sc.ShellProc.DoneCh + sc.Lock.Unlock() // Unlock before waiting + <-doneCh + sc.Lock.Lock() // Re-lock after waiting + } + + // Update status + sc.ProcStatus = newStatus + sc.sendUpdate_nolock() + return nil +} + +func (sc *ShellController) getRuntimeStatus_nolock() BlockControllerRuntimeStatus { + var rtn BlockControllerRuntimeStatus + sc.StatusVersion++ + rtn.Version = sc.StatusVersion + rtn.BlockId = sc.BlockId + rtn.ShellProcStatus = sc.ProcStatus + if sc.ShellProc != nil { + rtn.ShellProcConnName = sc.ShellProc.ConnName + } + rtn.ShellProcExitCode = sc.ProcExitCode + return rtn +} + +func (sc *ShellController) GetRuntimeStatus() *BlockControllerRuntimeStatus { + var rtn BlockControllerRuntimeStatus + sc.WithLock(func() { + rtn = sc.getRuntimeStatus_nolock() + }) + return &rtn +} + +func (sc *ShellController) SendInput(inputUnion *BlockInputUnion) error { + var shellInputCh chan *BlockInputUnion + sc.WithLock(func() { + shellInputCh = sc.ShellInputCh + }) + if shellInputCh == nil { + return fmt.Errorf("no shell input chan") + } + shellInputCh <- inputUnion + return nil +} + +// All the existing private methods remain unchanged + +func (sc *ShellController) WithLock(f func()) { + sc.Lock.Lock() + defer sc.Lock.Unlock() + f() +} + +type RunShellOpts struct { + TermSize waveobj.TermSize `json:"termsize,omitempty"` +} + +// only call when holding the lock +func (sc *ShellController) sendUpdate_nolock() { + rtStatus := sc.getRuntimeStatus_nolock() + log.Printf("sending blockcontroller update %#v\n", rtStatus) + wps.Broker.Publish(wps.WaveEvent{ + Event: wps.Event_ControllerStatus, + Scopes: []string{ + waveobj.MakeORef(waveobj.OType_Tab, sc.TabId).String(), + waveobj.MakeORef(waveobj.OType_Block, sc.BlockId).String(), + }, + Data: rtStatus, + }) +} + +func (sc *ShellController) UpdateControllerAndSendUpdate(updateFn func() bool) { + var sendUpdate bool + sc.WithLock(func() { + sendUpdate = updateFn() + }) + if sendUpdate { + rtStatus := sc.GetRuntimeStatus() + log.Printf("sending blockcontroller update %#v\n", rtStatus) + wps.Broker.Publish(wps.WaveEvent{ + Event: wps.Event_ControllerStatus, + Scopes: []string{ + waveobj.MakeORef(waveobj.OType_Tab, sc.TabId).String(), + waveobj.MakeORef(waveobj.OType_Block, sc.BlockId).String(), + }, + Data: rtStatus, + }) + } +} + +func (sc *ShellController) resetTerminalState(logCtx context.Context) { + ctx, cancelFn := context.WithTimeout(context.Background(), DefaultTimeout) + defer cancelFn() + wfile, statErr := filestore.WFS.Stat(ctx, sc.BlockId, wavebase.BlockFile_Term) + if statErr == fs.ErrNotExist || wfile.Size == 0 { + return + } + blocklogger.Debugf(logCtx, "[conndebug] resetTerminalState: resetting terminal state\n") + // controller type = "shell" + var buf bytes.Buffer + // buf.WriteString("\x1b[?1049l") // disable alternative buffer + buf.WriteString("\x1b[0m") // reset attributes + buf.WriteString("\x1b[?25h") // show cursor + buf.WriteString("\x1b[?1000l") // disable mouse tracking + buf.WriteString("\r\n\r\n") + err := HandleAppendBlockFile(sc.BlockId, wavebase.BlockFile_Term, buf.Bytes()) + if err != nil { + log.Printf("error appending to blockfile (terminal reset): %v\n", err) + } +} + +// [All the other existing private methods remain exactly the same - I'm not including them all here for brevity, but they would all be copied over with sc. replacing bc. throughout] + +func (sc *ShellController) DoRunShellCommand(logCtx context.Context, rc *RunShellOpts, blockMeta waveobj.MetaMapType) error { + blocklogger.Debugf(logCtx, "[conndebug] DoRunShellCommand\n") + shellProc, err := sc.setupAndStartShellProcess(logCtx, rc, blockMeta) + if err != nil { + return err + } + return sc.manageRunningShellProcess(shellProc, rc, blockMeta) +} + +// [Continue with all other methods, replacing bc with sc throughout...] + +func (sc *ShellController) LockRunLock() bool { + rtn := sc.RunLock.CompareAndSwap(false, true) + if rtn { + log.Printf("block %q run() lock\n", sc.BlockId) + } + return rtn +} + +func (sc *ShellController) UnlockRunLock() { + sc.RunLock.Store(false) + log.Printf("block %q run() unlock\n", sc.BlockId) +} + +func (sc *ShellController) run(logCtx context.Context, bdata *waveobj.Block, blockMeta map[string]any, rtOpts *waveobj.RuntimeOpts, force bool) { + blocklogger.Debugf(logCtx, "[conndebug] ShellController.run() %q\n", sc.BlockId) + runningShellCommand := false + ok := sc.LockRunLock() + if !ok { + log.Printf("block %q is already executing run()\n", sc.BlockId) + return + } + defer func() { + if !runningShellCommand { + sc.UnlockRunLock() + } + }() + curStatus := sc.GetRuntimeStatus() + controllerName := bdata.Meta.GetString(waveobj.MetaKey_Controller, "") + if controllerName != BlockController_Shell && controllerName != BlockController_Cmd { + log.Printf("unknown controller %q\n", controllerName) + return + } + runOnce := getBoolFromMeta(blockMeta, waveobj.MetaKey_CmdRunOnce, false) + runOnStart := getBoolFromMeta(blockMeta, waveobj.MetaKey_CmdRunOnStart, true) + if ((runOnStart || runOnce) && curStatus.ShellProcStatus == Status_Init) || force { + if getBoolFromMeta(blockMeta, waveobj.MetaKey_CmdClearOnStart, false) { + err := HandleTruncateBlockFile(sc.BlockId) + if err != nil { + log.Printf("error truncating term blockfile: %v\n", err) + } + } + if runOnce { + ctx, cancelFn := context.WithTimeout(context.Background(), 2*time.Second) + defer cancelFn() + metaUpdate := map[string]any{ + waveobj.MetaKey_CmdRunOnce: false, + waveobj.MetaKey_CmdRunOnStart: false, + } + err := wstore.UpdateObjectMeta(ctx, waveobj.MakeORef(waveobj.OType_Block, sc.BlockId), metaUpdate, false) + if err != nil { + log.Printf("error updating block meta (in blockcontroller.run): %v\n", err) + return + } + } + runningShellCommand = true + go func() { + defer func() { + panichandler.PanicHandler("blockcontroller:run-shell-command", recover()) + }() + defer sc.UnlockRunLock() + var termSize waveobj.TermSize + if rtOpts != nil { + termSize = rtOpts.TermSize + } else { + termSize = getTermSize(bdata) + } + err := sc.DoRunShellCommand(logCtx, &RunShellOpts{TermSize: termSize}, bdata.Meta) + if err != nil { + debugLog(logCtx, "error running shell: %v\n", err) + } + }() + } +} + +// [Include all the remaining private methods with bc replaced by sc] + +type ConnUnion struct { + ConnName string + ConnType string + SshConn *conncontroller.SSHConn + WslConn *wslconn.WslConn + WshEnabled bool + ShellPath string + ShellOpts []string + ShellType string +} + +func (bc *ShellController) getConnUnion(logCtx context.Context, remoteName string, blockMeta waveobj.MetaMapType) (ConnUnion, error) { + rtn := ConnUnion{ConnName: remoteName} + wshEnabled := !blockMeta.GetBool(waveobj.MetaKey_CmdNoWsh, false) + if strings.HasPrefix(remoteName, "wsl://") { + wslName := strings.TrimPrefix(remoteName, "wsl://") + wslConn := wslconn.GetWslConn(wslName) + if wslConn == nil { + return ConnUnion{}, fmt.Errorf("wsl connection not found: %s", remoteName) + } + connStatus := wslConn.DeriveConnStatus() + if connStatus.Status != conncontroller.Status_Connected { + return ConnUnion{}, fmt.Errorf("wsl connection %s not connected, cannot start shellproc", remoteName) + } + rtn.ConnType = ConnType_Wsl + rtn.WslConn = wslConn + rtn.WshEnabled = wshEnabled && wslConn.WshEnabled.Load() + } else if remoteName != "" { + opts, err := remote.ParseOpts(remoteName) + if err != nil { + return ConnUnion{}, fmt.Errorf("invalid ssh remote name (%s): %w", remoteName, err) + } + conn := conncontroller.GetConn(opts) + if conn == nil { + return ConnUnion{}, fmt.Errorf("ssh connection not found: %s", remoteName) + } + connStatus := conn.DeriveConnStatus() + if connStatus.Status != conncontroller.Status_Connected { + return ConnUnion{}, fmt.Errorf("ssh connection %s not connected, cannot start shellproc", remoteName) + } + rtn.ConnType = ConnType_Ssh + rtn.SshConn = conn + rtn.WshEnabled = wshEnabled && conn.WshEnabled.Load() + } else { + rtn.ConnType = ConnType_Local + rtn.WshEnabled = wshEnabled + } + err := rtn.getRemoteInfoAndShellType(blockMeta) + if err != nil { + return ConnUnion{}, err + } + return rtn, nil +} + +func (bc *ShellController) setupAndStartShellProcess(logCtx context.Context, rc *RunShellOpts, blockMeta waveobj.MetaMapType) (*shellexec.ShellProc, error) { + // create a circular blockfile for the output + ctx, cancelFn := context.WithTimeout(context.Background(), 2*time.Second) + defer cancelFn() + fsErr := filestore.WFS.MakeFile(ctx, bc.BlockId, wavebase.BlockFile_Term, nil, wshrpc.FileOpts{MaxSize: DefaultTermMaxFileSize, Circular: true}) + if fsErr != nil && fsErr != fs.ErrExist { + return nil, fmt.Errorf("error creating blockfile: %w", fsErr) + } + if fsErr == fs.ErrExist { + // reset the terminal state + bc.resetTerminalState(logCtx) + } + bcInitStatus := bc.GetRuntimeStatus() + if bcInitStatus.ShellProcStatus == Status_Running { + return nil, nil + } + // TODO better sync here (don't let two starts happen at the same times) + remoteName := blockMeta.GetString(waveobj.MetaKey_Connection, "") + connUnion, err := bc.getConnUnion(logCtx, remoteName, blockMeta) + if err != nil { + return nil, err + } + blocklogger.Infof(logCtx, "[conndebug] remoteName: %q, connType: %s, wshEnabled: %v, shell: %q, shellType: %s\n", remoteName, connUnion.ConnType, connUnion.WshEnabled, connUnion.ShellPath, connUnion.ShellType) + var cmdStr string + var cmdOpts shellexec.CommandOptsType + if bc.ControllerType == BlockController_Shell { + cmdOpts.Interactive = true + cmdOpts.Login = true + cmdOpts.Cwd = blockMeta.GetString(waveobj.MetaKey_CmdCwd, "") + if cmdOpts.Cwd != "" { + cwdPath, err := wavebase.ExpandHomeDir(cmdOpts.Cwd) + if err != nil { + return nil, err + } + cmdOpts.Cwd = cwdPath + } + } else if bc.ControllerType == BlockController_Cmd { + var cmdOptsPtr *shellexec.CommandOptsType + cmdStr, cmdOptsPtr, err = createCmdStrAndOpts(bc.BlockId, blockMeta, remoteName) + if err != nil { + return nil, err + } + cmdOpts = *cmdOptsPtr + } else { + return nil, fmt.Errorf("unknown controller type %q", bc.ControllerType) + } + var shellProc *shellexec.ShellProc + swapToken := bc.makeSwapToken(ctx, logCtx, blockMeta, remoteName, connUnion.ShellType) + cmdOpts.SwapToken = swapToken + blocklogger.Debugf(logCtx, "[conndebug] created swaptoken: %s\n", swapToken.Token) + if connUnion.ConnType == ConnType_Wsl { + wslConn := connUnion.WslConn + if !connUnion.WshEnabled { + shellProc, err = shellexec.StartWslShellProcNoWsh(ctx, rc.TermSize, cmdStr, cmdOpts, wslConn) + if err != nil { + return nil, err + } + } else { + sockName := wslConn.GetDomainSocketName() + rpcContext := wshrpc.RpcContext{TabId: bc.TabId, BlockId: bc.BlockId, Conn: wslConn.GetName()} + jwtStr, err := wshutil.MakeClientJWTToken(rpcContext, sockName) + if err != nil { + return nil, fmt.Errorf("error making jwt token: %w", err) + } + swapToken.SockName = sockName + swapToken.RpcContext = &rpcContext + swapToken.Env[wshutil.WaveJwtTokenVarName] = jwtStr + shellProc, err = shellexec.StartWslShellProc(ctx, rc.TermSize, cmdStr, cmdOpts, wslConn) + if err != nil { + wslConn.SetWshError(err) + wslConn.WshEnabled.Store(false) + blocklogger.Infof(logCtx, "[conndebug] error starting wsl shell proc with wsh: %v\n", err) + blocklogger.Infof(logCtx, "[conndebug] attempting install without wsh\n") + shellProc, err = shellexec.StartWslShellProcNoWsh(ctx, rc.TermSize, cmdStr, cmdOpts, wslConn) + if err != nil { + return nil, err + } + } + } + } else if connUnion.ConnType == ConnType_Ssh { + conn := connUnion.SshConn + if !connUnion.WshEnabled { + shellProc, err = shellexec.StartRemoteShellProcNoWsh(ctx, rc.TermSize, cmdStr, cmdOpts, conn) + if err != nil { + return nil, err + } + } else { + sockName := conn.GetDomainSocketName() + rpcContext := wshrpc.RpcContext{TabId: bc.TabId, BlockId: bc.BlockId, Conn: conn.Opts.String()} + jwtStr, err := wshutil.MakeClientJWTToken(rpcContext, sockName) + if err != nil { + return nil, fmt.Errorf("error making jwt token: %w", err) + } + swapToken.SockName = sockName + swapToken.RpcContext = &rpcContext + swapToken.Env[wshutil.WaveJwtTokenVarName] = jwtStr + shellProc, err = shellexec.StartRemoteShellProc(ctx, logCtx, rc.TermSize, cmdStr, cmdOpts, conn) + if err != nil { + conn.SetWshError(err) + conn.WshEnabled.Store(false) + blocklogger.Infof(logCtx, "[conndebug] error starting remote shell proc with wsh: %v\n", err) + blocklogger.Infof(logCtx, "[conndebug] attempting install without wsh\n") + shellProc, err = shellexec.StartRemoteShellProcNoWsh(ctx, rc.TermSize, cmdStr, cmdOpts, conn) + if err != nil { + return nil, err + } + } + } + } else if connUnion.ConnType == ConnType_Local { + if connUnion.WshEnabled { + sockName := wavebase.GetDomainSocketName() + rpcContext := wshrpc.RpcContext{TabId: bc.TabId, BlockId: bc.BlockId} + jwtStr, err := wshutil.MakeClientJWTToken(rpcContext, sockName) + if err != nil { + return nil, fmt.Errorf("error making jwt token: %w", err) + } + swapToken.SockName = sockName + swapToken.RpcContext = &rpcContext + swapToken.Env[wshutil.WaveJwtTokenVarName] = jwtStr + } + cmdOpts.ShellPath = connUnion.ShellPath + cmdOpts.ShellOpts = getLocalShellOpts(blockMeta) + shellProc, err = shellexec.StartLocalShellProc(logCtx, rc.TermSize, cmdStr, cmdOpts) + if err != nil { + return nil, err + } + } else { + return nil, fmt.Errorf("unknown connection type for conn %q: %s", remoteName, connUnion.ConnType) + } + bc.UpdateControllerAndSendUpdate(func() bool { + bc.ShellProc = shellProc + bc.ProcStatus = Status_Running + return true + }) + return shellProc, nil +} + +func (bc *ShellController) manageRunningShellProcess(shellProc *shellexec.ShellProc, rc *RunShellOpts, blockMeta waveobj.MetaMapType) error { + shellInputCh := make(chan *BlockInputUnion, 32) + bc.ShellInputCh = shellInputCh + + // make esc sequence wshclient wshProxy + // we don't need to authenticate this wshProxy since it is coming direct + wshProxy := wshutil.MakeRpcProxy() + wshProxy.SetRpcContext(&wshrpc.RpcContext{TabId: bc.TabId, BlockId: bc.BlockId}) + wshutil.DefaultRouter.RegisterRoute(wshutil.MakeControllerRouteId(bc.BlockId), wshProxy, true) + ptyBuffer := wshutil.MakePtyBuffer(wshutil.WaveOSCPrefix, shellProc.Cmd, wshProxy.FromRemoteCh) + go func() { + // handles regular output from the pty (goes to the blockfile and xterm) + defer func() { + panichandler.PanicHandler("blockcontroller:shellproc-pty-read-loop", recover()) + }() + defer func() { + log.Printf("[shellproc] pty-read loop done\n") + shellProc.Close() + bc.WithLock(func() { + // so no other events are sent + bc.ShellInputCh = nil + }) + shellProc.Cmd.Wait() + exitCode := shellProc.Cmd.ExitCode() + blockData := bc.getBlockData_noErr() + if blockData != nil && blockData.Meta.GetString(waveobj.MetaKey_Controller, "") == BlockController_Cmd { + termMsg := fmt.Sprintf("\r\nprocess finished with exit code = %d\r\n\r\n", exitCode) + HandleAppendBlockFile(bc.BlockId, wavebase.BlockFile_Term, []byte(termMsg)) + } + // to stop the inputCh loop + time.Sleep(100 * time.Millisecond) + close(shellInputCh) // don't use bc.ShellInputCh (it's nil) + }() + buf := make([]byte, 4096) + for { + nr, err := ptyBuffer.Read(buf) + if nr > 0 { + err := HandleAppendBlockFile(bc.BlockId, wavebase.BlockFile_Term, buf[:nr]) + if err != nil { + log.Printf("error appending to blockfile: %v\n", err) + } + } + if err == io.EOF { + break + } + if err != nil { + log.Printf("error reading from shell: %v\n", err) + break + } + } + }() + go func() { + // handles input from the shellInputCh, sent to pty + // use shellInputCh instead of bc.ShellInputCh (because we want to be attached to *this* ch. bc.ShellInputCh can be updated) + defer func() { + panichandler.PanicHandler("blockcontroller:shellproc-input-loop", recover()) + }() + for ic := range shellInputCh { + if len(ic.InputData) > 0 { + shellProc.Cmd.Write(ic.InputData) + } + if ic.TermSize != nil { + updateTermSize(shellProc, bc.BlockId, *ic.TermSize) + } + } + }() + go func() { + defer func() { + panichandler.PanicHandler("blockcontroller:shellproc-output-loop", recover()) + }() + // handles outputCh -> shellInputCh + for msg := range wshProxy.ToRemoteCh { + encodedMsg, err := wshutil.EncodeWaveOSCBytes(wshutil.WaveServerOSC, msg) + if err != nil { + log.Printf("error encoding OSC message: %v\n", err) + } + shellInputCh <- &BlockInputUnion{InputData: encodedMsg} + } + }() + go func() { + defer func() { + panichandler.PanicHandler("blockcontroller:shellproc-wait-loop", recover()) + }() + // wait for the shell to finish + var exitCode int + defer func() { + wshutil.DefaultRouter.UnregisterRoute(wshutil.MakeControllerRouteId(bc.BlockId)) + bc.UpdateControllerAndSendUpdate(func() bool { + if bc.ProcStatus == Status_Running { + bc.ProcStatus = Status_Done + } + bc.ProcExitCode = exitCode + return true + }) + log.Printf("[shellproc] shell process wait loop done\n") + }() + waitErr := shellProc.Cmd.Wait() + exitCode = shellProc.Cmd.ExitCode() + shellProc.SetWaitErrorAndSignalDone(waitErr) + go checkCloseOnExit(bc.BlockId, exitCode) + }() + return nil +} + +func (union *ConnUnion) getRemoteInfoAndShellType(blockMeta waveobj.MetaMapType) error { + if !union.WshEnabled { + return nil + } + if union.ConnType == ConnType_Ssh || union.ConnType == ConnType_Wsl { + connRoute := wshutil.MakeConnectionRouteId(union.ConnName) + remoteInfo, err := wshclient.RemoteGetInfoCommand(wshclient.GetBareRpcClient(), &wshrpc.RpcOpts{Route: connRoute, Timeout: 2000}) + if err != nil { + // weird error, could flip the wshEnabled flag and allow it to go forward, but the connection should have already been vetted + return fmt.Errorf("unable to obtain remote info from connserver: %w", err) + } + // TODO allow overriding remote shell path + union.ShellPath = remoteInfo.Shell + } else { + union.ShellPath = getLocalShellPath(blockMeta) + } + union.ShellType = shellutil.GetShellTypeFromShellPath(union.ShellPath) + return nil +} + +func checkCloseOnExit(blockId string, exitCode int) { + ctx, cancelFn := context.WithTimeout(context.Background(), DefaultTimeout) + defer cancelFn() + blockData, err := wstore.DBMustGet[*waveobj.Block](ctx, blockId) + if err != nil { + log.Printf("error getting block data: %v\n", err) + return + } + closeOnExit := blockData.Meta.GetBool(waveobj.MetaKey_CmdCloseOnExit, false) + closeOnExitForce := blockData.Meta.GetBool(waveobj.MetaKey_CmdCloseOnExitForce, false) + if !closeOnExitForce && !(closeOnExit && exitCode == 0) { + return + } + delayMs := blockData.Meta.GetFloat(waveobj.MetaKey_CmdCloseOnExitDelay, 2000) + if delayMs < 0 { + delayMs = 0 + } + time.Sleep(time.Duration(delayMs) * time.Millisecond) + rpcClient := wshclient.GetBareRpcClient() + err = wshclient.DeleteBlockCommand(rpcClient, wshrpc.CommandDeleteBlockData{BlockId: blockId}, nil) + if err != nil { + log.Printf("error deleting block data (close on exit): %v\n", err) + } +} + +func getLocalShellPath(blockMeta waveobj.MetaMapType) string { + shellPath := blockMeta.GetString(waveobj.MetaKey_TermLocalShellPath, "") + if shellPath != "" { + return shellPath + } + settings := wconfig.GetWatcher().GetFullConfig().Settings + if settings.TermLocalShellPath != "" { + return settings.TermLocalShellPath + } + return shellutil.DetectLocalShellPath() +} + +func getLocalShellOpts(blockMeta waveobj.MetaMapType) []string { + if blockMeta.HasKey(waveobj.MetaKey_TermLocalShellOpts) { + opts := blockMeta.GetStringList(waveobj.MetaKey_TermLocalShellOpts) + return append([]string{}, opts...) + } + settings := wconfig.GetWatcher().GetFullConfig().Settings + if len(settings.TermLocalShellOpts) > 0 { + return append([]string{}, settings.TermLocalShellOpts...) + } + return nil +} + +// for "cmd" type blocks +func createCmdStrAndOpts(blockId string, blockMeta waveobj.MetaMapType, connName string) (string, *shellexec.CommandOptsType, error) { + var cmdStr string + var cmdOpts shellexec.CommandOptsType + cmdStr = blockMeta.GetString(waveobj.MetaKey_Cmd, "") + if cmdStr == "" { + return "", nil, fmt.Errorf("missing cmd in block meta") + } + cmdOpts.Cwd = blockMeta.GetString(waveobj.MetaKey_CmdCwd, "") + if cmdOpts.Cwd != "" { + cwdPath, err := wavebase.ExpandHomeDir(cmdOpts.Cwd) + if err != nil { + return "", nil, err + } + cmdOpts.Cwd = cwdPath + } + useShell := blockMeta.GetBool(waveobj.MetaKey_CmdShell, true) + if !useShell { + if strings.Contains(cmdStr, " ") { + return "", nil, fmt.Errorf("cmd should not have spaces if cmd:shell is false (use cmd:args)") + } + cmdArgs := blockMeta.GetStringList(waveobj.MetaKey_CmdArgs) + // shell escape the args + for _, arg := range cmdArgs { + cmdStr = cmdStr + " " + utilfn.ShellQuote(arg, false, -1) + } + } + cmdOpts.ForceJwt = blockMeta.GetBool(waveobj.MetaKey_CmdJwt, false) + return cmdStr, &cmdOpts, nil +} + +func (bc *ShellController) makeSwapToken(ctx context.Context, logCtx context.Context, blockMeta waveobj.MetaMapType, remoteName string, shellType string) *shellutil.TokenSwapEntry { + token := &shellutil.TokenSwapEntry{ + Token: uuid.New().String(), + Env: make(map[string]string), + Exp: time.Now().Add(5 * time.Minute), + } + token.Env["TERM_PROGRAM"] = "waveterm" + token.Env["WAVETERM_BLOCKID"] = bc.BlockId + token.Env["WAVETERM_VERSION"] = wavebase.WaveVersion + token.Env["WAVETERM"] = "1" + tabId, err := wstore.DBFindTabForBlockId(ctx, bc.BlockId) + if err != nil { + log.Printf("error finding tab for block: %v\n", err) + } else { + token.Env["WAVETERM_TABID"] = tabId + } + if tabId != "" { + wsId, err := wstore.DBFindWorkspaceForTabId(ctx, tabId) + if err != nil { + log.Printf("error finding workspace for tab: %v\n", err) + } else { + token.Env["WAVETERM_WORKSPACEID"] = wsId + } + } + clientData, err := wstore.DBGetSingleton[*waveobj.Client](ctx) + if err != nil { + log.Printf("error getting client data: %v\n", err) + } else { + token.Env["WAVETERM_CLIENTID"] = clientData.OID + } + token.Env["WAVETERM_CONN"] = remoteName + envMap, err := resolveEnvMap(bc.BlockId, blockMeta, remoteName) + if err != nil { + log.Printf("error resolving env map: %v\n", err) + } + for k, v := range envMap { + token.Env[k] = v + } + token.ScriptText = getCustomInitScript(logCtx, blockMeta, remoteName, shellType) + return token +} + +func (bc *ShellController) getBlockData_noErr() *waveobj.Block { + ctx, cancelFn := context.WithTimeout(context.Background(), DefaultTimeout) + defer cancelFn() + blockData, err := wstore.DBGet[*waveobj.Block](ctx, bc.BlockId) + if err != nil { + log.Printf("error getting block data (getBlockData_noErr): %v\n", err) + return nil + } + return blockData +} + +func resolveEnvMap(blockId string, blockMeta waveobj.MetaMapType, connName string) (map[string]string, error) { + rtn := make(map[string]string) + config := wconfig.GetWatcher().GetFullConfig() + connKeywords := config.Connections[connName] + ckEnv := connKeywords.CmdEnv + for k, v := range ckEnv { + rtn[k] = v + } + ctx, cancelFn := context.WithTimeout(context.Background(), 2*time.Second) + defer cancelFn() + _, envFileData, err := filestore.WFS.ReadFile(ctx, blockId, wavebase.BlockFile_Env) + if err == fs.ErrNotExist { + err = nil + } + if err != nil { + return nil, fmt.Errorf("error reading command env file: %w", err) + } + if len(envFileData) > 0 { + envMap := envutil.EnvToMap(string(envFileData)) + for k, v := range envMap { + rtn[k] = v + } + } + cmdEnv := blockMeta.GetStringMap(waveobj.MetaKey_CmdEnv, true) + for k, v := range cmdEnv { + if v == waveobj.MetaMap_DeleteSentinel { + delete(rtn, k) + continue + } + rtn[k] = v + } + connEnv := blockMeta.GetConnectionOverride(connName).GetStringMap(waveobj.MetaKey_CmdEnv, true) + for k, v := range connEnv { + if v == waveobj.MetaMap_DeleteSentinel { + delete(rtn, k) + continue + } + rtn[k] = v + } + return rtn, nil +} + +func getCustomInitScriptKeyCascade(shellType string) []string { + if shellType == "bash" { + return []string{waveobj.MetaKey_CmdInitScriptBash, waveobj.MetaKey_CmdInitScriptSh, waveobj.MetaKey_CmdInitScript} + } + if shellType == "zsh" { + return []string{waveobj.MetaKey_CmdInitScriptZsh, waveobj.MetaKey_CmdInitScriptSh, waveobj.MetaKey_CmdInitScript} + } + if shellType == "pwsh" { + return []string{waveobj.MetaKey_CmdInitScriptPwsh, waveobj.MetaKey_CmdInitScript} + } + if shellType == "fish" { + return []string{waveobj.MetaKey_CmdInitScriptFish, waveobj.MetaKey_CmdInitScript} + } + return []string{waveobj.MetaKey_CmdInitScript} +} + +func getCustomInitScript(logCtx context.Context, meta waveobj.MetaMapType, connName string, shellType string) string { + initScriptVal, metaKeyName := getCustomInitScriptValue(meta, connName, shellType) + if initScriptVal == "" { + return "" + } + if !fileutil.IsInitScriptPath(initScriptVal) { + blocklogger.Infof(logCtx, "[conndebug] inline initScript (size=%d) found in meta key: %s\n", len(initScriptVal), metaKeyName) + return initScriptVal + } + blocklogger.Infof(logCtx, "[conndebug] initScript detected as a file %q from meta key: %s\n", initScriptVal, metaKeyName) + initScriptVal, err := wavebase.ExpandHomeDir(initScriptVal) + if err != nil { + blocklogger.Infof(logCtx, "[conndebug] cannot expand home dir in Wave initscript file: %v\n", err) + return fmt.Sprintf("echo \"cannot expand home dir in Wave initscript file, from key %s\";\n", metaKeyName) + } + fileData, err := os.ReadFile(initScriptVal) + if err != nil { + blocklogger.Infof(logCtx, "[conndebug] cannot open Wave initscript file: %v\n", err) + return fmt.Sprintf("echo \"cannot open Wave initscript file, from key %s\";\n", metaKeyName) + } + if len(fileData) > MaxInitScriptSize { + blocklogger.Infof(logCtx, "[conndebug] initscript file too large, size=%d, max=%d\n", len(fileData), MaxInitScriptSize) + return fmt.Sprintf("echo \"initscript file too large, from key %s\";\n", metaKeyName) + } + if utilfn.HasBinaryData(fileData) { + blocklogger.Infof(logCtx, "[conndebug] initscript file contains binary data\n") + return fmt.Sprintf("echo \"initscript file contains binary data, from key %s\";\n", metaKeyName) + } + blocklogger.Infof(logCtx, "[conndebug] initscript file read successfully, size=%d\n", len(fileData)) + return string(fileData) +} + +// returns (value, metakey) +func getCustomInitScriptValue(meta waveobj.MetaMapType, connName string, shellType string) (string, string) { + keys := getCustomInitScriptKeyCascade(shellType) + connMeta := meta.GetConnectionOverride(connName) + if connMeta != nil { + for _, key := range keys { + if connMeta.HasKey(key) { + return connMeta.GetString(key, ""), "blockmeta/[" + connName + "]/" + key + } + } + } + for _, key := range keys { + if meta.HasKey(key) { + return meta.GetString(key, ""), "blockmeta/" + key + } + } + fullConfig := wconfig.GetWatcher().GetFullConfig() + connKeywords := fullConfig.Connections[connName] + connKeywordsMap := make(map[string]any) + err := utilfn.ReUnmarshal(&connKeywordsMap, connKeywords) + if err != nil { + log.Printf("error re-unmarshalling connKeywords: %v\n", err) + return "", "" + } + ckMeta := waveobj.MetaMapType(connKeywordsMap) + for _, key := range keys { + if ckMeta.HasKey(key) { + return ckMeta.GetString(key, ""), "connections.json/" + connName + "/" + key + } + } + return "", "" +} + +func updateTermSize(shellProc *shellexec.ShellProc, blockId string, termSize waveobj.TermSize) { + err := setTermSizeInDB(blockId, termSize) + if err != nil { + log.Printf("error setting pty size: %v\n", err) + } + err = shellProc.Cmd.SetSize(termSize.Rows, termSize.Cols) + if err != nil { + log.Printf("error setting pty size: %v\n", err) + } +} + +func setTermSizeInDB(blockId string, termSize waveobj.TermSize) error { + ctx, cancelFn := context.WithTimeout(context.Background(), 2*time.Second) + defer cancelFn() + ctx = waveobj.ContextWithUpdates(ctx) + bdata, err := wstore.DBMustGet[*waveobj.Block](ctx, blockId) + if err != nil { + return fmt.Errorf("error getting block data: %v", err) + } + if bdata.RuntimeOpts == nil { + bdata.RuntimeOpts = &waveobj.RuntimeOpts{} + } + bdata.RuntimeOpts.TermSize = termSize + err = wstore.DBUpdate(ctx, bdata) + if err != nil { + return fmt.Errorf("error updating block data: %v", err) + } + updates := waveobj.ContextGetUpdatesRtn(ctx) + wps.Broker.SendUpdateEvents(updates) + return nil +} diff --git a/pkg/blockcontroller/tsunamicontroller.go b/pkg/blockcontroller/tsunamicontroller.go index 523fe518f4..17c1ea71ad 100644 --- a/pkg/blockcontroller/tsunamicontroller.go +++ b/pkg/blockcontroller/tsunamicontroller.go @@ -4,6 +4,7 @@ package blockcontroller import ( + "context" "fmt" "github.com/wavetermdev/waveterm/pkg/wavebase" @@ -11,6 +12,34 @@ import ( "github.com/wavetermdev/waveterm/tsunami/build" ) +type TsunamiController struct { + blockId string + tabId string +} + +func (tc *TsunamiController) Start(ctx context.Context, blockMeta waveobj.MetaMapType, rtOpts *waveobj.RuntimeOpts, force bool) error { + return fmt.Errorf("tsunami controller start not implemented") +} + +func (tc *TsunamiController) Stop(graceful bool, newStatus string) error { + return fmt.Errorf("tsunami controller stop not implemented") +} + +func (tc *TsunamiController) GetRuntimeStatus() *BlockControllerRuntimeStatus { + return nil +} + +func (tc *TsunamiController) SendInput(input *BlockInputUnion) error { + return fmt.Errorf("tsunami controller send input not implemented") +} + +func MakeTsunamiController(tabId string, blockId string) Controller { + return &TsunamiController{ + blockId: blockId, + tabId: tabId, + } +} + func runTsunami(blockMeta waveobj.MetaMapType) error { scaffoldPath := blockMeta.GetString(waveobj.MetaKey_TsunamiScaffoldPath, "") if scaffoldPath == "" { @@ -34,7 +63,7 @@ func runTsunami(blockMeta waveobj.MetaMapType) error { } opts := build.BuildOpts{ - Dir: appPath, + AppPath: appPath, Verbose: true, Open: false, KeepTemp: false, diff --git a/pkg/service/blockservice/blockservice.go b/pkg/service/blockservice/blockservice.go index 22caff961b..68d8bc9980 100644 --- a/pkg/service/blockservice/blockservice.go +++ b/pkg/service/blockservice/blockservice.go @@ -31,11 +31,7 @@ func (bs *BlockService) SendCommand_Meta() tsgenmeta.MethodMeta { } func (bs *BlockService) GetControllerStatus(ctx context.Context, blockId string) (*blockcontroller.BlockControllerRuntimeStatus, error) { - bc := blockcontroller.GetBlockController(blockId) - if bc == nil { - return nil, nil - } - return bc.GetRuntimeStatus(), nil + return blockcontroller.GetBlockControllerRuntimeStatus(blockId), nil } func (*BlockService) SaveTerminalState_Meta() tsgenmeta.MethodMeta { diff --git a/pkg/wshrpc/wshserver/wshserver.go b/pkg/wshrpc/wshserver/wshserver.go index 70029392bc..82740fc645 100644 --- a/pkg/wshrpc/wshserver/wshserver.go +++ b/pkg/wshrpc/wshserver/wshserver.go @@ -301,11 +301,7 @@ func (ws *WshServer) SetViewCommand(ctx context.Context, data wshrpc.CommandBloc } func (ws *WshServer) ControllerStopCommand(ctx context.Context, blockId string) error { - bc := blockcontroller.GetBlockController(blockId) - if bc == nil { - return nil - } - bc.StopShellProc(true) + blockcontroller.StopBlockController(blockId) return nil } @@ -316,10 +312,6 @@ func (ws *WshServer) ControllerResyncCommand(ctx context.Context, data wshrpc.Co } func (ws *WshServer) ControllerInputCommand(ctx context.Context, data wshrpc.CommandBlockInputData) error { - bc := blockcontroller.GetBlockController(data.BlockId) - if bc == nil { - return fmt.Errorf("block controller not found for block %q", data.BlockId) - } inputUnion := &blockcontroller.BlockInputUnion{ SigName: data.SigName, TermSize: data.TermSize, @@ -332,7 +324,7 @@ func (ws *WshServer) ControllerInputCommand(ctx context.Context, data wshrpc.Com } inputUnion.InputData = inputBuf[:nw] } - return bc.SendInput(inputUnion) + return blockcontroller.SendInput(data.BlockId, inputUnion) } func (ws *WshServer) ControllerAppendOutputCommand(ctx context.Context, data wshrpc.CommandControllerAppendOutputData) error { From 83070899aca2c17a888e298bc1b5b3a8b7642a30 Mon Sep 17 00:00:00 2001 From: sawka Date: Sat, 13 Sep 2025 10:34:29 -0700 Subject: [PATCH 08/21] movefilesback flag and TsunamiBuildInternal exposed --- tsunami/build/build.go | 13 ++++++++----- tsunami/cmd/main-tsunami.go | 18 ++++++++++-------- 2 files changed, 18 insertions(+), 13 deletions(-) diff --git a/tsunami/build/build.go b/tsunami/build/build.go index 857e08b9d3..4d0c91e2b0 100644 --- a/tsunami/build/build.go +++ b/tsunami/build/build.go @@ -35,6 +35,7 @@ type BuildOpts struct { ScaffoldPath string SdkReplacePath string NodePath string + MoveFileBack bool } type BuildEnv struct { @@ -438,7 +439,7 @@ func setupSignalCleanup(buildEnv *BuildEnv, keepTemp, verbose bool) { } func TsunamiBuild(opts BuildOpts) error { - buildEnv, err := tsunamiBuildInternal(opts) + buildEnv, err := TsunamiBuildInternal(opts) defer buildEnv.cleanupTempDir(opts.KeepTemp, opts.Verbose) if err != nil { return err @@ -447,7 +448,7 @@ func TsunamiBuild(opts BuildOpts) error { return nil } -func tsunamiBuildInternal(opts BuildOpts) (*BuildEnv, error) { +func TsunamiBuildInternal(opts BuildOpts) (*BuildEnv, error) { buildEnv, err := verifyEnvironment(opts.Verbose, opts) if err != nil { return nil, err @@ -543,8 +544,10 @@ func tsunamiBuildInternal(opts BuildOpts) (*BuildEnv, error) { } // Move generated files back to original directory - if err := moveFilesBack(tempDir, opts.AppPath, opts.Verbose); err != nil { - return buildEnv, fmt.Errorf("failed to move files back: %w", err) + if opts.MoveFileBack { + if err := moveFilesBack(tempDir, opts.AppPath, opts.Verbose); err != nil { + return buildEnv, fmt.Errorf("failed to move files back: %w", err) + } } return buildEnv, nil @@ -699,7 +702,7 @@ func copyGoFiles(srcDir, destDir string) (int, error) { } func TsunamiRun(opts BuildOpts) error { - buildEnv, err := tsunamiBuildInternal(opts) + buildEnv, err := TsunamiBuildInternal(opts) defer buildEnv.cleanupTempDir(opts.KeepTemp, opts.Verbose) if err != nil { return err diff --git a/tsunami/cmd/main-tsunami.go b/tsunami/cmd/main-tsunami.go index 2ff2f2ac21..7fdc42c40b 100644 --- a/tsunami/cmd/main-tsunami.go +++ b/tsunami/cmd/main-tsunami.go @@ -67,10 +67,11 @@ var buildCmd = &cobra.Command{ keepTemp, _ := cmd.Flags().GetBool("keeptemp") output, _ := cmd.Flags().GetString("output") opts := build.BuildOpts{ - AppPath: args[0], - Verbose: verbose, - KeepTemp: keepTemp, - OutputFile: output, + AppPath: args[0], + Verbose: verbose, + KeepTemp: keepTemp, + OutputFile: output, + MoveFileBack: true, } if err := validateEnvironmentVars(&opts); err != nil { fmt.Println(err) @@ -94,10 +95,11 @@ var runCmd = &cobra.Command{ open, _ := cmd.Flags().GetBool("open") keepTemp, _ := cmd.Flags().GetBool("keeptemp") opts := build.BuildOpts{ - AppPath: args[0], - Verbose: verbose, - Open: open, - KeepTemp: keepTemp, + AppPath: args[0], + Verbose: verbose, + Open: open, + KeepTemp: keepTemp, + MoveFileBack: true, } if err := validateEnvironmentVars(&opts); err != nil { fmt.Println(err) From 074d6710b3984fca29de909833714c73148f3fe8 Mon Sep 17 00:00:00 2001 From: sawka Date: Sat, 13 Sep 2025 13:38:37 -0700 Subject: [PATCH 09/21] checkpoint on running tsunami apps --- pkg/blockcontroller/shell_controller.go | 4 - pkg/blockcontroller/shellcontroller.go | 2 - pkg/blockcontroller/tsunamicontroller.go | 378 +++++++++++++++++++++-- pkg/utilds/readerlinebuffer.go | 114 +++++++ tsunami/app/defaultclient.go | 14 + tsunami/build/build.go | 52 +++- 6 files changed, 520 insertions(+), 44 deletions(-) delete mode 100644 pkg/blockcontroller/shell_controller.go create mode 100644 pkg/utilds/readerlinebuffer.go diff --git a/pkg/blockcontroller/shell_controller.go b/pkg/blockcontroller/shell_controller.go deleted file mode 100644 index 14bb0fa65b..0000000000 --- a/pkg/blockcontroller/shell_controller.go +++ /dev/null @@ -1,4 +0,0 @@ -// Copyright 2025, Command Line Inc. -// SPDX-License-Identifier: Apache-2.0 - -package blockcontroller diff --git a/pkg/blockcontroller/shellcontroller.go b/pkg/blockcontroller/shellcontroller.go index d67dae8b26..2aabec9eaf 100644 --- a/pkg/blockcontroller/shellcontroller.go +++ b/pkg/blockcontroller/shellcontroller.go @@ -147,8 +147,6 @@ func (sc *ShellController) SendInput(inputUnion *BlockInputUnion) error { return nil } -// All the existing private methods remain unchanged - func (sc *ShellController) WithLock(f func()) { sc.Lock.Lock() defer sc.Lock.Unlock() diff --git a/pkg/blockcontroller/tsunamicontroller.go b/pkg/blockcontroller/tsunamicontroller.go index 17c1ea71ad..9aa58c1c05 100644 --- a/pkg/blockcontroller/tsunamicontroller.go +++ b/pkg/blockcontroller/tsunamicontroller.go @@ -6,41 +6,129 @@ package blockcontroller import ( "context" "fmt" + "io" + "log" + "os" + "os/exec" + "path/filepath" + "runtime" + "strings" + "sync" + "github.com/wavetermdev/waveterm/pkg/utilds" "github.com/wavetermdev/waveterm/pkg/wavebase" "github.com/wavetermdev/waveterm/pkg/waveobj" + "github.com/wavetermdev/waveterm/pkg/wps" "github.com/wavetermdev/waveterm/tsunami/build" ) -type TsunamiController struct { - blockId string - tabId string +type TsunamiAppProc struct { + Cmd *exec.Cmd + StdoutBuffer *utilds.ReaderLineBuffer + StderrBuffer *utilds.ReaderLineBuffer // May be nil if stderr was consumed for port detection + StdinWriter io.WriteCloser + Port int // Port the tsunami app is listening on + WaitCh chan struct{} // Channel that gets closed when cmd.Wait() returns + WaitRtn error // Error returned by cmd.Wait() } -func (tc *TsunamiController) Start(ctx context.Context, blockMeta waveobj.MetaMapType, rtOpts *waveobj.RuntimeOpts, force bool) error { - return fmt.Errorf("tsunami controller start not implemented") +type TsunamiController struct { + blockId string + tabId string + runLock sync.Mutex + tsunamiProc *TsunamiAppProc + statusLock sync.Mutex + status string + statusVersion int + exitCode int } -func (tc *TsunamiController) Stop(graceful bool, newStatus string) error { - return fmt.Errorf("tsunami controller stop not implemented") -} +func getCachesDir() string { + var cacheDir string + appBundle := "waveterm" + if wavebase.IsDevMode() { + appBundle = "waveterm-dev" + } -func (tc *TsunamiController) GetRuntimeStatus() *BlockControllerRuntimeStatus { - return nil + switch runtime.GOOS { + case "darwin": + // macOS: ~/Library/Caches/ + homeDir := wavebase.GetHomeDir() + cacheDir = filepath.Join(homeDir, "Library", "Caches", appBundle) + case "linux": + // Linux: XDG_CACHE_HOME or ~/.cache/ + xdgCache := os.Getenv("XDG_CACHE_HOME") + if xdgCache != "" { + cacheDir = filepath.Join(xdgCache, appBundle) + } else { + homeDir := wavebase.GetHomeDir() + cacheDir = filepath.Join(homeDir, ".cache", appBundle) + } + case "windows": + localAppData := os.Getenv("LOCALAPPDATA") + if localAppData != "" { + cacheDir = filepath.Join(localAppData, appBundle, "Cache") + } + } + + if cacheDir == "" { + tmpDir := os.TempDir() + cacheDir = filepath.Join(tmpDir, appBundle) + } + + return cacheDir } -func (tc *TsunamiController) SendInput(input *BlockInputUnion) error { - return fmt.Errorf("tsunami controller send input not implemented") +func getTsunamiAppCachePath(scope string, appName string, osArch string) (string, error) { + cachesDir := getCachesDir() + tsunamiCacheDir := filepath.Join(cachesDir, "tsunami-build-cache") + fullAppName := appName + "." + osArch + if strings.HasPrefix(osArch, "windows") { + fullAppName = fullAppName + ".exe" + } + fullPath := filepath.Join(tsunamiCacheDir, scope, fullAppName) + + // Create the directory if it doesn't exist + dirPath := filepath.Dir(fullPath) + err := wavebase.TryMkdirs(dirPath, 0755, "tsunami cache directory") + if err != nil { + return "", fmt.Errorf("failed to create tsunami cache directory: %w", err) + } + + return fullPath, nil } -func MakeTsunamiController(tabId string, blockId string) Controller { - return &TsunamiController{ - blockId: blockId, - tabId: tabId, +func isBuildCacheUpToDate(appPath string) (bool, error) { + appName := filepath.Base(appPath) + + osArch := runtime.GOOS + "-" + runtime.GOARCH + + cachePath, err := getTsunamiAppCachePath("local", appName, osArch) + if err != nil { + return false, err } + + cacheInfo, err := os.Stat(cachePath) + if err != nil { + if os.IsNotExist(err) { + return false, nil + } + return false, err + } + + appModTime, err := build.GetAppModTime(appPath) + if err != nil { + return false, err + } + + cacheModTime := cacheInfo.ModTime() + return !cacheModTime.Before(appModTime), nil } -func runTsunami(blockMeta waveobj.MetaMapType) error { +func (c *TsunamiController) Start(ctx context.Context, blockMeta waveobj.MetaMapType, rtOpts *waveobj.RuntimeOpts, force bool) error { + c.runLock.Lock() + defer c.runLock.Unlock() + scaffoldPath := blockMeta.GetString(waveobj.MetaKey_TsunamiScaffoldPath, "") if scaffoldPath == "" { return fmt.Errorf("tsunami:scaffoldpath is required") @@ -56,21 +144,251 @@ func runTsunami(blockMeta waveobj.MetaMapType) error { return fmt.Errorf("tsunami:apppath is required") } - // Get Electron executable path - nodePath := wavebase.GetWaveAppElectronExecPath() - if nodePath == "" { - return fmt.Errorf("electron executable path not set") + appName := filepath.Base(appPath) + osArch := runtime.GOOS + "-" + runtime.GOARCH + + cachePath, err := getTsunamiAppCachePath("local", appName, osArch) + if err != nil { + return fmt.Errorf("failed to get cache path: %w", err) + } + + upToDate, err := isBuildCacheUpToDate(appPath) + if err != nil { + return fmt.Errorf("failed to check build cache: %w", err) + } + + if !upToDate || force { + nodePath := wavebase.GetWaveAppElectronExecPath() + if nodePath == "" { + return fmt.Errorf("electron executable path not set") + } + + opts := build.BuildOpts{ + AppPath: appPath, + Verbose: true, + Open: false, + KeepTemp: false, + OutputFile: cachePath, + ScaffoldPath: scaffoldPath, + SdkReplacePath: sdkReplacePath, + NodePath: nodePath, + } + + err = build.TsunamiBuild(opts) + if err != nil { + return fmt.Errorf("failed to build tsunami app: %w", err) + } + } + + info, err := os.Stat(cachePath) + if err != nil { + if os.IsNotExist(err) { + return fmt.Errorf("app cache does not exist: %s", cachePath) + } + return fmt.Errorf("failed to stat app cache: %w", err) + } + + if runtime.GOOS != "windows" && info.Mode()&0111 == 0 { + return fmt.Errorf("app cache is not executable: %s", cachePath) + } + + tsunamiProc, err := runTsunamiAppBinary(ctx, cachePath) + if err != nil { + return fmt.Errorf("failed to run tsunami app: %w", err) + } + + c.tsunamiProc = tsunamiProc + c.updateStatus(Status_Running) + go c.sendStatusUpdate() + + // Monitor process completion + go func() { + <-tsunamiProc.WaitCh + c.runLock.Lock() + if c.tsunamiProc == tsunamiProc { + c.tsunamiProc = nil + c.updateStatusWithExitCode(Status_Done, tsunamiProc.WaitRtn) + go c.sendStatusUpdate() + } + c.runLock.Unlock() + }() + + return nil +} + +func (c *TsunamiController) Stop(graceful bool, newStatus string) error { + c.runLock.Lock() + defer c.runLock.Unlock() + + if c.tsunamiProc == nil { + return nil + } + + if c.tsunamiProc.Cmd.Process != nil { + c.tsunamiProc.Cmd.Process.Kill() + } + + if c.tsunamiProc.StdinWriter != nil { + c.tsunamiProc.StdinWriter.Close() + } + + c.tsunamiProc = nil + if newStatus == "" { + newStatus = Status_Done } + c.updateStatus(newStatus) + go c.sendStatusUpdate() + return nil +} - opts := build.BuildOpts{ - AppPath: appPath, - Verbose: true, - Open: false, - KeepTemp: false, - ScaffoldPath: scaffoldPath, - SdkReplacePath: sdkReplacePath, - NodePath: nodePath, +func (c *TsunamiController) GetRuntimeStatus() *BlockControllerRuntimeStatus { + c.statusLock.Lock() + defer c.statusLock.Unlock() + + c.statusVersion++ + return &BlockControllerRuntimeStatus{ + BlockId: c.blockId, + Version: c.statusVersion, + ShellProcStatus: c.status, + ShellProcExitCode: c.exitCode, } +} + +func (c *TsunamiController) SendInput(input *BlockInputUnion) error { + return fmt.Errorf("tsunami controller send input not implemented") +} - return build.TsunamiRun(opts) +func runTsunamiAppBinary(ctx context.Context, appBinPath string) (*TsunamiAppProc, error) { + cmd := exec.CommandContext(ctx, appBinPath, "--close-on-stdin") + + stdoutPipe, err := cmd.StdoutPipe() + if err != nil { + return nil, fmt.Errorf("failed to create stdout pipe: %w", err) + } + + stderrPipe, err := cmd.StderrPipe() + if err != nil { + return nil, fmt.Errorf("failed to create stderr pipe: %w", err) + } + + stdinPipe, err := cmd.StdinPipe() + if err != nil { + return nil, fmt.Errorf("failed to create stdin pipe: %w", err) + } + + stdoutBuffer := utilds.MakeReaderLineBuffer(stdoutPipe, 1000) + stderrBuffer := utilds.MakeReaderLineBuffer(stderrPipe, 1000) + + err = cmd.Start() + if err != nil { + return nil, fmt.Errorf("failed to start tsunami app: %w", err) + } + + // Create wait channel and tsunami proc first + waitCh := make(chan struct{}) + tsunamiProc := &TsunamiAppProc{ + Cmd: cmd, + StdoutBuffer: stdoutBuffer, + StderrBuffer: stderrBuffer, + StdinWriter: stdinPipe, + WaitCh: waitCh, + } + + // Start goroutine to handle cmd.Wait() + go func() { + tsunamiProc.WaitRtn = cmd.Wait() + close(waitCh) + }() + + go stdoutBuffer.ReadAll() + + // Monitor stderr for port information + portChan := make(chan int, 1) + errChan := make(chan error, 1) + + go func() { + for { + line, err := stderrBuffer.ReadLine() + if err != nil { + errChan <- fmt.Errorf("stderr buffer error: %w", err) + return + } + + port := build.ParseTsunamiPort(line) + if port > 0 { + portChan <- port + return + } + } + }() + + // Wait for either port detection, process death, or context timeout + go func() { + <-tsunamiProc.WaitCh + select { + case <-portChan: + // Port already found, nothing to do + default: + errChan <- fmt.Errorf("tsunami process died before emitting listening message") + } + }() + + select { + case port := <-portChan: + // Start the stderr ReadAll goroutine now that we have the port + go stderrBuffer.ReadAll() + + tsunamiProc.Port = port + return tsunamiProc, nil + case err := <-errChan: + cmd.Process.Kill() + return nil, err + case <-ctx.Done(): + cmd.Process.Kill() + return nil, fmt.Errorf("timeout waiting for tsunami port: %w", ctx.Err()) + } +} + +func MakeTsunamiController(tabId string, blockId string) Controller { + return &TsunamiController{ + blockId: blockId, + tabId: tabId, + status: Status_Init, + } +} + +// requires the lock (so do not call while holding statusLock) +func (c *TsunamiController) sendStatusUpdate() { + rtStatus := c.GetRuntimeStatus() + log.Printf("sending blockcontroller update %#v\n", rtStatus) + wps.Broker.Publish(wps.WaveEvent{ + Event: wps.Event_ControllerStatus, + Scopes: []string{ + waveobj.MakeORef(waveobj.OType_Tab, c.tabId).String(), + waveobj.MakeORef(waveobj.OType_Block, c.blockId).String(), + }, + Data: rtStatus, + }) +} + +func (c *TsunamiController) updateStatus(newStatus string) { + c.statusLock.Lock() + defer c.statusLock.Unlock() + c.status = newStatus +} + +func (c *TsunamiController) updateStatusWithExitCode(newStatus string, waitErr error) { + c.statusLock.Lock() + defer c.statusLock.Unlock() + + c.status = newStatus + if waitErr != nil { + if exitError, ok := waitErr.(*exec.ExitError); ok { + c.exitCode = exitError.ExitCode() + } else { + c.exitCode = 1 + } + } else { + c.exitCode = 0 + } } diff --git a/pkg/utilds/readerlinebuffer.go b/pkg/utilds/readerlinebuffer.go new file mode 100644 index 0000000000..c100b15d02 --- /dev/null +++ b/pkg/utilds/readerlinebuffer.go @@ -0,0 +1,114 @@ +package utilds + +import ( + "bufio" + "io" + "sync" +) + +type ReaderLineBuffer struct { + lock sync.Mutex + lines []string + maxLines int + totalLineCount int + reader io.Reader + scanner *bufio.Scanner + done bool +} + +func MakeReaderLineBuffer(reader io.Reader, maxLines int) *ReaderLineBuffer { + if maxLines <= 0 { + maxLines = 1000 // default max lines + } + + rlb := &ReaderLineBuffer{ + lines: make([]string, 0, maxLines), + maxLines: maxLines, + totalLineCount: 0, + reader: reader, + scanner: bufio.NewScanner(reader), + done: false, + } + + return rlb +} + +func (rlb *ReaderLineBuffer) IsDone() bool { + rlb.lock.Lock() + defer rlb.lock.Unlock() + return rlb.done +} + +func (rlb *ReaderLineBuffer) setDone() { + rlb.lock.Lock() + defer rlb.lock.Unlock() + rlb.done = true +} + +func (rlb *ReaderLineBuffer) ReadLine() (string, error) { + if rlb.IsDone() { + return "", io.EOF + } + + if rlb.scanner.Scan() { + line := rlb.scanner.Text() + rlb.addLine(line) + return line, nil + } + + // Check for scanner error + if err := rlb.scanner.Err(); err != nil { + rlb.setDone() + return "", err + } + + rlb.setDone() + return "", io.EOF +} + +func (rlb *ReaderLineBuffer) addLine(line string) { + rlb.lock.Lock() + defer rlb.lock.Unlock() + + rlb.totalLineCount++ + + if len(rlb.lines) >= rlb.maxLines { + // Remove oldest line to make room + copy(rlb.lines, rlb.lines[1:]) + rlb.lines[len(rlb.lines)-1] = line + } else { + rlb.lines = append(rlb.lines, line) + } +} + +func (rlb *ReaderLineBuffer) GetLines() []string { + rlb.lock.Lock() + defer rlb.lock.Unlock() + + result := make([]string, len(rlb.lines)) + copy(result, rlb.lines) + return result +} + +func (rlb *ReaderLineBuffer) GetLineCount() int { + rlb.lock.Lock() + defer rlb.lock.Unlock() + + return len(rlb.lines) +} + +func (rlb *ReaderLineBuffer) GetTotalLineCount() int { + rlb.lock.Lock() + defer rlb.lock.Unlock() + + return rlb.totalLineCount +} + +func (rlb *ReaderLineBuffer) ReadAll() { + for { + _, err := rlb.ReadLine() + if err != nil { + break + } + } +} diff --git a/tsunami/app/defaultclient.go b/tsunami/app/defaultclient.go index 17dbfbe84f..82184f204a 100644 --- a/tsunami/app/defaultclient.go +++ b/tsunami/app/defaultclient.go @@ -5,8 +5,11 @@ package app import ( "encoding/json" + "flag" + "io" "io/fs" "net/http" + "os" "github.com/wavetermdev/waveterm/tsunami/engine" "github.com/wavetermdev/waveterm/tsunami/vdom" @@ -88,6 +91,17 @@ func HandleDynFunc(pattern string, fn func(http.ResponseWriter, *http.Request)) // RunMain is used internally by generated code and should not be called directly. func RunMain() { + closeOnStdin := flag.Bool("close-on-stdin", false, "exit when stdin is closed") + flag.Parse() + + if *closeOnStdin { + go func() { + // Read stdin until EOF/close, then exit the process + io.ReadAll(os.Stdin) + os.Exit(0) + }() + } + engine.GetDefaultClient().RunMain() } diff --git a/tsunami/build/build.go b/tsunami/build/build.go index 4d0c91e2b0..0da1bae866 100644 --- a/tsunami/build/build.go +++ b/tsunami/build/build.go @@ -7,6 +7,7 @@ import ( "go/token" "io" "log" + "net/url" "os" "os/exec" "os/signal" @@ -319,6 +320,15 @@ func verifyTsunamiDir(dir string) error { return nil } +func GetAppModTime(appPath string) (time.Time, error) { + appGoPath := filepath.Join(appPath, "app.go") + info, err := os.Stat(appGoPath) + if err != nil { + return time.Time{}, fmt.Errorf("failed to get app.go mod time: %w", err) + } + return info.ModTime(), nil +} + func verifyScaffoldPath(scaffoldPath string) error { if scaffoldPath == "" { return fmt.Errorf("scaffoldPath cannot be empty") @@ -760,7 +770,6 @@ func monitorAndOpenBrowser(r io.ReadCloser, verbose bool) { defer r.Close() scanner := bufio.NewScanner(r) - urlRegex := regexp.MustCompile(`\[tsunami\] listening at (http://[^\s]+)`) browserOpened := false if verbose { log.Printf("monitoring for browser open\n") @@ -770,14 +779,41 @@ func monitorAndOpenBrowser(r io.ReadCloser, verbose bool) { line := scanner.Text() fmt.Println(line) - if !browserOpened && len(urlRegex.FindStringSubmatch(line)) > 1 { - matches := urlRegex.FindStringSubmatch(line) - url := matches[1] - if verbose { - log.Printf("Opening browser to %s", url) + if !browserOpened { + port := ParseTsunamiPort(line) + if port > 0 { + url := fmt.Sprintf("http://localhost:%d", port) + if verbose { + log.Printf("Opening browser to %s", url) + } + go util.OpenBrowser(url, 100*time.Millisecond) + browserOpened = true } - go util.OpenBrowser(url, 100*time.Millisecond) - browserOpened = true } } } + +func ParseTsunamiPort(line string) int { + urlRegex := regexp.MustCompile(`\[tsunami\] listening at (http://[^\s]+)`) + matches := urlRegex.FindStringSubmatch(line) + if len(matches) < 2 { + return 0 + } + + u, err := url.Parse(matches[1]) + if err != nil { + return 0 + } + + portStr := u.Port() + if portStr == "" { + return 0 + } + + port, err := strconv.Atoi(portStr) + if err != nil { + return 0 + } + + return port +} From fabf2844f9716ce44e6e7e815046ad1fc230a6a1 Mon Sep 17 00:00:00 2001 From: sawka Date: Sat, 13 Sep 2025 13:53:43 -0700 Subject: [PATCH 10/21] tsunami port in status --- pkg/blockcontroller/blockcontroller.go | 1 + pkg/blockcontroller/tsunamicontroller.go | 63 ++++++++++++++---------- 2 files changed, 39 insertions(+), 25 deletions(-) diff --git a/pkg/blockcontroller/blockcontroller.go b/pkg/blockcontroller/blockcontroller.go index 19b4113138..909e500cfd 100644 --- a/pkg/blockcontroller/blockcontroller.go +++ b/pkg/blockcontroller/blockcontroller.go @@ -57,6 +57,7 @@ type BlockControllerRuntimeStatus struct { ShellProcStatus string `json:"shellprocstatus,omitempty"` ShellProcConnName string `json:"shellprocconnname,omitempty"` ShellProcExitCode int `json:"shellprocexitcode"` + TsunamiPort int `json:"tsunamiport,omitempty"` } // Controller interface that all block controllers must implement diff --git a/pkg/blockcontroller/tsunamicontroller.go b/pkg/blockcontroller/tsunamicontroller.go index 9aa58c1c05..7c3938239c 100644 --- a/pkg/blockcontroller/tsunamicontroller.go +++ b/pkg/blockcontroller/tsunamicontroller.go @@ -27,7 +27,7 @@ type TsunamiAppProc struct { StdoutBuffer *utilds.ReaderLineBuffer StderrBuffer *utilds.ReaderLineBuffer // May be nil if stderr was consumed for port detection StdinWriter io.WriteCloser - Port int // Port the tsunami app is listening on + Port int // Port the tsunami app is listening on WaitCh chan struct{} // Channel that gets closed when cmd.Wait() returns WaitRtn error // Error returned by cmd.Wait() } @@ -41,6 +41,7 @@ type TsunamiController struct { status string statusVersion int exitCode int + port int } func getCachesDir() string { @@ -198,7 +199,10 @@ func (c *TsunamiController) Start(ctx context.Context, blockMeta waveobj.MetaMap } c.tsunamiProc = tsunamiProc - c.updateStatus(Status_Running) + c.WithStatusLock(func() { + c.status = Status_Running + c.port = tsunamiProc.Port + }) go c.sendStatusUpdate() // Monitor process completion @@ -207,12 +211,16 @@ func (c *TsunamiController) Start(ctx context.Context, blockMeta waveobj.MetaMap c.runLock.Lock() if c.tsunamiProc == tsunamiProc { c.tsunamiProc = nil - c.updateStatusWithExitCode(Status_Done, tsunamiProc.WaitRtn) + c.WithStatusLock(func() { + c.status = Status_Done + c.port = 0 + c.exitCode = exitCodeFromWaitErr(tsunamiProc.WaitRtn) + }) go c.sendStatusUpdate() } c.runLock.Unlock() }() - + return nil } @@ -236,22 +244,31 @@ func (c *TsunamiController) Stop(graceful bool, newStatus string) error { if newStatus == "" { newStatus = Status_Done } - c.updateStatus(newStatus) + c.WithStatusLock(func() { + c.status = newStatus + c.port = 0 + }) go c.sendStatusUpdate() return nil } func (c *TsunamiController) GetRuntimeStatus() *BlockControllerRuntimeStatus { - c.statusLock.Lock() - defer c.statusLock.Unlock() + var rtn *BlockControllerRuntimeStatus + c.WithStatusLock(func() { + c.statusVersion++ + rtn = &BlockControllerRuntimeStatus{ + BlockId: c.blockId, + Version: c.statusVersion, + ShellProcStatus: c.status, + ShellProcExitCode: c.exitCode, + } + + if c.status == Status_Running && c.port > 0 { + rtn.TsunamiPort = c.port + } + }) - c.statusVersion++ - return &BlockControllerRuntimeStatus{ - BlockId: c.blockId, - Version: c.statusVersion, - ShellProcStatus: c.status, - ShellProcExitCode: c.exitCode, - } + return rtn } func (c *TsunamiController) SendInput(input *BlockInputUnion) error { @@ -293,7 +310,7 @@ func runTsunamiAppBinary(ctx context.Context, appBinPath string) (*TsunamiAppPro StdinWriter: stdinPipe, WaitCh: waitCh, } - + // Start goroutine to handle cmd.Wait() go func() { tsunamiProc.WaitRtn = cmd.Wait() @@ -371,24 +388,20 @@ func (c *TsunamiController) sendStatusUpdate() { }) } -func (c *TsunamiController) updateStatus(newStatus string) { +func (c *TsunamiController) WithStatusLock(fn func()) { c.statusLock.Lock() defer c.statusLock.Unlock() - c.status = newStatus + fn() } -func (c *TsunamiController) updateStatusWithExitCode(newStatus string, waitErr error) { - c.statusLock.Lock() - defer c.statusLock.Unlock() - - c.status = newStatus +func exitCodeFromWaitErr(waitErr error) int { if waitErr != nil { if exitError, ok := waitErr.(*exec.ExitError); ok { - c.exitCode = exitError.ExitCode() + return exitError.ExitCode() } else { - c.exitCode = 1 + return 1 } } else { - c.exitCode = 0 + return 0 } } From d7b075545b6c7d6d2dae4d64271813ce53b293c9 Mon Sep 17 00:00:00 2001 From: sawka Date: Sat, 13 Sep 2025 14:02:21 -0700 Subject: [PATCH 11/21] check for abs path --- frontend/types/gotypes.d.ts | 1 + pkg/blockcontroller/.gitignore | 1 + pkg/blockcontroller/tsunamicontroller.go | 21 +++++++++++++++++++++ 3 files changed, 23 insertions(+) create mode 100644 pkg/blockcontroller/.gitignore diff --git a/frontend/types/gotypes.d.ts b/frontend/types/gotypes.d.ts index 9130afa27c..647af8cf47 100644 --- a/frontend/types/gotypes.d.ts +++ b/frontend/types/gotypes.d.ts @@ -60,6 +60,7 @@ declare global { shellprocstatus?: string; shellprocconnname?: string; shellprocexitcode: number; + tsunamiport?: number; }; // waveobj.BlockDef diff --git a/pkg/blockcontroller/.gitignore b/pkg/blockcontroller/.gitignore new file mode 100644 index 0000000000..a27ba57401 --- /dev/null +++ b/pkg/blockcontroller/.gitignore @@ -0,0 +1 @@ +*.old \ No newline at end of file diff --git a/pkg/blockcontroller/tsunamicontroller.go b/pkg/blockcontroller/tsunamicontroller.go index 7c3938239c..7679b3ccd3 100644 --- a/pkg/blockcontroller/tsunamicontroller.go +++ b/pkg/blockcontroller/tsunamicontroller.go @@ -134,16 +134,37 @@ func (c *TsunamiController) Start(ctx context.Context, blockMeta waveobj.MetaMap if scaffoldPath == "" { return fmt.Errorf("tsunami:scaffoldpath is required") } + scaffoldPath, err := wavebase.ExpandHomeDir(scaffoldPath) + if err != nil { + return fmt.Errorf("tsunami:scaffoldpath invalid: %w", err) + } + if !filepath.IsAbs(scaffoldPath) { + return fmt.Errorf("tsunami:scaffoldpath must be absolute: %s", scaffoldPath) + } sdkReplacePath := blockMeta.GetString(waveobj.MetaKey_TsunamiSdkReplacePath, "") if sdkReplacePath == "" { return fmt.Errorf("tsunami:sdkreplacepath is required") } + sdkReplacePath, err = wavebase.ExpandHomeDir(sdkReplacePath) + if err != nil { + return fmt.Errorf("tsunami:sdkreplacepath invalid: %w", err) + } + if !filepath.IsAbs(sdkReplacePath) { + return fmt.Errorf("tsunami:sdkreplacepath must be absolute: %s", sdkReplacePath) + } appPath := blockMeta.GetString(waveobj.MetaKey_TsunamiAppPath, "") if appPath == "" { return fmt.Errorf("tsunami:apppath is required") } + appPath, err = wavebase.ExpandHomeDir(appPath) + if err != nil { + return fmt.Errorf("tsunami:apppath invalid: %w", err) + } + if !filepath.IsAbs(appPath) { + return fmt.Errorf("tsunami:apppath must be absolute: %s", appPath) + } appName := filepath.Base(appPath) osArch := runtime.GOOS + "-" + runtime.GOARCH From 4203d80d3885cce3c99b089bd5a8f4b95cb7b124 Mon Sep 17 00:00:00 2001 From: sawka Date: Sat, 13 Sep 2025 14:33:16 -0700 Subject: [PATCH 12/21] getting there with the tsunami widget --- frontend/app/view/tsunami/tsunami.tsx | 121 ++++++++++++++++++++++- pkg/blockcontroller/tsunamicontroller.go | 10 +- tsunami/app/defaultclient.go | 2 + 3 files changed, 127 insertions(+), 6 deletions(-) diff --git a/frontend/app/view/tsunami/tsunami.tsx b/frontend/app/view/tsunami/tsunami.tsx index 43bc88ee88..d588b61f61 100644 --- a/frontend/app/view/tsunami/tsunami.tsx +++ b/frontend/app/view/tsunami/tsunami.tsx @@ -2,7 +2,11 @@ // SPDX-License-Identifier: Apache-2.0 import { BlockNodeModel } from "@/app/block/blocktypes"; -import { WOS } from "@/app/store/global"; +import { atoms, globalStore, WOS } from "@/app/store/global"; +import { waveEventSubscribe } from "@/app/store/wps"; +import { RpcApi } from "@/app/store/wshclientapi"; +import { TabRpcClient } from "@/app/store/wshrpcutil"; +import * as services from "@/store/services"; import * as jotai from "jotai"; import { memo } from "react"; @@ -12,6 +16,9 @@ class TsunamiViewModel implements ViewModel { blockId: string; viewIcon: jotai.Atom; viewName: jotai.Atom; + shellProcFullStatus: jotai.PrimitiveAtom; + shellProcStatusUnsubFn: () => void; + isRestarting: jotai.PrimitiveAtom; constructor(blockId: string, nodeModel: BlockNodeModel) { this.viewType = "tsunami"; @@ -19,12 +26,64 @@ class TsunamiViewModel implements ViewModel { this.blockAtom = WOS.getWaveObjectAtom(`block:${blockId}`); this.viewIcon = jotai.atom("cube"); this.viewName = jotai.atom("Tsunami"); + this.isRestarting = jotai.atom(false); + + this.shellProcFullStatus = jotai.atom(null) as jotai.PrimitiveAtom; + const initialShellProcStatus = services.BlockService.GetControllerStatus(blockId); + initialShellProcStatus.then((rts) => { + this.updateShellProcStatus(rts); + }); + this.shellProcStatusUnsubFn = waveEventSubscribe({ + eventType: "controllerstatus", + scope: WOS.makeORef("block", blockId), + handler: (event) => { + let bcRTS: BlockControllerRuntimeStatus = event.data; + this.updateShellProcStatus(bcRTS); + }, + }); } get viewComponent(): ViewComponent { return TsunamiView; } + updateShellProcStatus(fullStatus: BlockControllerRuntimeStatus) { + console.log("tsunami-status", fullStatus); + if (fullStatus == null) { + return; + } + const curStatus = globalStore.get(this.shellProcFullStatus); + if (curStatus == null || curStatus.version < fullStatus.version) { + globalStore.set(this.shellProcFullStatus, fullStatus); + } + } + + triggerRestartAtom() { + globalStore.set(this.isRestarting, true); + setTimeout(() => { + globalStore.set(this.isRestarting, false); + }, 300); + } + + forceRestartController() { + if (globalStore.get(this.isRestarting)) { + return; + } + this.triggerRestartAtom(); + const prtn = RpcApi.ControllerResyncCommand(TabRpcClient, { + tabid: globalStore.get(atoms.staticTabId), + blockid: this.blockId, + forcerestart: true, + }); + prtn.catch((e) => console.log("error controller resync (force restart)", e)); + } + + dispose() { + if (this.shellProcStatusUnsubFn) { + this.shellProcStatusUnsubFn(); + } + } + getSettingsMenuItems(): ContextMenuItem[] { return []; } @@ -35,13 +94,69 @@ type TsunamiViewProps = { }; const TsunamiView = memo(({ model }: TsunamiViewProps) => { + const shellProcFullStatus = jotai.useAtomValue(model.shellProcFullStatus); + const blockData = jotai.useAtomValue(model.blockAtom); + const isRestarting = jotai.useAtomValue(model.isRestarting); + + const appPath = blockData?.meta?.["tsunami:apppath"]; + const controller = blockData?.meta?.controller; + + // Check for configuration errors + const errors = []; + if (!appPath) { + errors.push("App path must be set (tsunami:apppath)"); + } + if (controller !== "tsunami") { + errors.push("Invalid controller (must be 'tsunami')"); + } + + // Show errors if any exist + if (errors.length > 0) { + return ( +
+

Tsunami

+
+ {errors.map((error, index) => ( +
+ {error} +
+ ))} +
+
+ ); + } + + // Check if we should show the iframe + const shouldShowIframe = + shellProcFullStatus?.shellprocstatus === "running" && + shellProcFullStatus?.tsunamiport && + shellProcFullStatus.tsunamiport !== 0; + + if (shouldShowIframe) { + const iframeUrl = `http://localhost:${shellProcFullStatus.tsunamiport}/?clientid=wave:${model.blockId}`; + return