From 7e87af136a88a97c8c5dfe609054a7192f8edb00 Mon Sep 17 00:00:00 2001 From: Abizer Lokhandwala Date: Thu, 30 Apr 2026 17:00:34 -0700 Subject: [PATCH 1/5] session recovery checkpoint --- cmd/nssh/config.go | 40 ++++++++++++++++++++++++++-------------- cmd/nssh/log.go | 2 ++ cmd/nssh/session.go | 14 ++++++++++---- cmd/nssh/status.go | 22 +++++++++++++++++++++- 4 files changed, 59 insertions(+), 19 deletions(-) diff --git a/cmd/nssh/config.go b/cmd/nssh/config.go index e5a38eb..ffd3190 100644 --- a/cmd/nssh/config.go +++ b/cmd/nssh/config.go @@ -62,24 +62,37 @@ func readTOML(path string) map[string]string { return m } -// loadConfig resolves the ntfy server and topic from (in priority order): -// 1. Environment variables (NSSH_NTFY_BASE) -// 2. ~/.config/nssh/config.toml (server, topic) — persistent user config -// 3. ~/.local/state/nssh/session (server, topic) — written by nssh at connect time -// 4. Defaults: server=https://ntfy.sh, topic= +// loadConfig resolves the ntfy server and topic for shim mode (xclip, xdg-open +// etc. running on a remote shell). Priority (highest first): +// 1. NSSH_NTFY_BASE env var (server only) +// 2. ~/.config/nssh/config.toml — persistent user config +// 3. ~/.local/state/nssh/session — written by `nssh ` at connect time +// 4. Defaults: server=https://ntfy.sh func loadConfig() nsshConfig { + return resolveConfig(true) +} + +// loadSessionConfig is loadConfig minus the remote-style session file. Used by +// `nssh ` on the local Mac, where the session file is a remote +// convention; reading it locally would mean every new local nssh inherits the +// topic of the last remote shell that was prepared, defeating per-host reuse. +func loadSessionConfig() nsshConfig { + return resolveConfig(false) +} + +func resolveConfig(includeSessionFile bool) nsshConfig { cfg := nsshConfig{Server: defaultServer} - // Session file (written by nssh session mode at connect time). 
- session := readTOML(filepath.Join(stateDir(), "session")) - if session["server"] != "" { - cfg.Server = session["server"] - } - if session["topic"] != "" { - cfg.Topic = session["topic"] + if includeSessionFile { + session := readTOML(filepath.Join(stateDir(), "session")) + if session["server"] != "" { + cfg.Server = session["server"] + } + if session["topic"] != "" { + cfg.Topic = session["topic"] + } } - // Permanent config overrides session. config := readTOML(filepath.Join(configDir(), "config.toml")) if config["server"] != "" { cfg.Server = config["server"] @@ -88,7 +101,6 @@ func loadConfig() nsshConfig { cfg.Topic = config["topic"] } - // Env var overrides everything for server. if v := os.Getenv("NSSH_NTFY_BASE"); v != "" { cfg.Server = v } diff --git a/cmd/nssh/log.go b/cmd/nssh/log.go index 2868460..05556f8 100644 --- a/cmd/nssh/log.go +++ b/cmd/nssh/log.go @@ -30,11 +30,13 @@ type LogEvent struct { // Session lifecycle. Target string `json:"target,omitempty"` + Host string `json:"host,omitempty"` Server string `json:"server,omitempty"` Topic string `json:"topic,omitempty"` Version string `json:"version,omitempty"` Exit *int `json:"exit,omitempty"` Mosh *bool `json:"mosh,omitempty"` + Joined int `json:"joined,omitempty"` // Shim invocation. 
Persona string `json:"persona,omitempty"` diff --git a/cmd/nssh/session.go b/cmd/nssh/session.go index 566e1c9..0778f14 100644 --- a/cmd/nssh/session.go +++ b/cmd/nssh/session.go @@ -64,16 +64,22 @@ func nsshMain() { return } - cfg := loadConfig() + cfg := loadSessionConfig() if cfg.Topic == "" { - cfg.Topic = generateTopic() + if existing := findActiveSessionForHost(shortHost); existing != nil { + cfg.Topic = existing.Topic + cfg.Server = existing.Server + fmt.Fprintf(os.Stderr, "nssh: joining active session for %s (PID %d)\n", shortHost, existing.PID) + } else { + cfg.Topic = generateTopic() + } } fmt.Fprintf(os.Stderr, "nssh: subscribing to %s\n", cfg.topicURL()) openLog(cfg.Topic, "session") - logEvent(LogEvent{Event: "session-start", Target: sshTarget, Server: cfg.Server}) + logEvent(LogEvent{Event: "session-start", Target: sshTarget, Host: shortHost, Server: cfg.Server}) - sessionFile, err := registerSession(cfg, sshTarget) + sessionFile, err := registerSession(cfg, sshTarget, shortHost) if err != nil { fmt.Fprintf(os.Stderr, "nssh: register session: %v\n", err) } diff --git a/cmd/nssh/status.go b/cmd/nssh/status.go index 09d1f60..8509b8f 100644 --- a/cmd/nssh/status.go +++ b/cmd/nssh/status.go @@ -20,6 +20,7 @@ import ( type sessionInfo struct { PID int `json:"pid"` Target string `json:"target"` + Host string `json:"host"` // canonical short host from `ssh -G`; key for session reuse Topic string `json:"topic"` Server string `json:"server"` Started time.Time `json:"started"` @@ -34,7 +35,7 @@ func sessionsDir() string { // registerSession writes a pidfile describing the current session. Called once // by nsshMain just before the subscriber starts. Returns the file path so the // caller can unregister explicitly (defers don't fire under os.Exit). 
-func registerSession(cfg nsshConfig, target string) (string, error) { +func registerSession(cfg nsshConfig, target, host string) (string, error) { dir := sessionsDir() if err := os.MkdirAll(dir, 0o755); err != nil { return "", err @@ -43,6 +44,7 @@ func registerSession(cfg nsshConfig, target string) (string, error) { info := sessionInfo{ PID: os.Getpid(), Target: target, + Host: host, Topic: cfg.Topic, Server: cfg.Server, Started: time.Now().UTC(), @@ -62,6 +64,24 @@ func unregisterSession(path string) { _ = os.Remove(path) } +// findActiveSessionForHost returns the oldest live session targeting the given +// canonical short host, or nil if there isn't one. Used to share a single ntfy +// topic across all nssh processes connected to the same host — otherwise a +// drive-by `nssh ` would generate a new topic, overwrite the remote's +// session file, and orphan the long-running subscriber. +func findActiveSessionForHost(host string) *sessionInfo { + if host == "" { + return nil + } + for _, s := range activeSessions() { + if s.Host == host { + s := s + return &s + } + } + return nil +} + // activeSessions scans the sessions dir, GCs entries whose PID is no longer // alive, and returns the survivors sorted by start time. 
func activeSessions() []sessionInfo { From 82101ebf3af237eda791b6038c614bb1d652f49e Mon Sep 17 00:00:00 2001 From: Abizer Lokhandwala Date: Mon, 11 May 2026 17:04:56 -0700 Subject: [PATCH 2/5] nssh: harden session recovery MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - skip prepareRemote when joining an existing session for the host (the original process already wrote the remote session file and seeded the log; redoing it costs an SSH and adds a misleading duplicate session-open event) - record `joined: <pid>` on the local session-start event so the log distinguishes a fresh connect from a join subscriber resilience: - log subscribe-up / subscribe-down events with reconnect gap, so silent failures are visible in `nssh status --tail` - track the most-recent ntfy message id and pass it as ?since= on every reconnect — anything posted while we were asleep / on a broken connection is replayed instead of dropped on the floor liveness + collision UX: - new ping / pong wire kinds and a pingTopic helper (one-shot subscribe + publish + wait for ack) to distinguish "PID is alive and answering" from "PID is alive but wedged" - on a session collision, ping the existing peer and prompt interactively: [J]oin / [R]eplace (SIGTERM + escalate to SIGKILL) / [N]ew topic / [C]ancel. Default is join when alive, replace when not. Non-interactive shells silently join with a stderr warning if the peer didn't answer. - new flags --join / --replace / --new to skip the prompt remote cleanup: - new `nssh sweep [--all|--older <duration>] <host>` subcommand that enumerates mosh-server processes owned by $USER on the remote and offers to kill them. Safe with tmux-inside-mosh: tmux server is its own daemon, so detached sessions survive. follow-ups in #10 (auto-sweep on connect, spare-yourself in --all). 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .gitignore | 1 + CLAUDE.md | 26 ++++ cmd/nssh/log.go | 5 + cmd/nssh/main.go | 12 +- cmd/nssh/session.go | 270 ++++++++++++++++++++++++++++++++++--- cmd/nssh/sweep.go | 300 +++++++++++++++++++++++++++++++++++++++++ cmd/nssh/sweep_test.go | 38 ++++++ docs/protocol.md | 13 +- internal/ntfy/ntfy.go | 6 + 9 files changed, 649 insertions(+), 22 deletions(-) create mode 100644 cmd/nssh/sweep.go create mode 100644 cmd/nssh/sweep_test.go diff --git a/.gitignore b/.gitignore index 6379f09..1016a27 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ /nssh /nssh-linux .remember/ +/.claude/ diff --git a/CLAUDE.md b/CLAUDE.md index dc4341e..e5ccabd 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -41,6 +41,12 @@ to `infect self`. `infect self` creates symlinks in `~/.local/bin` pointing at the running nssh binary (darwin: no-op; desktop linux: refuses without --force to avoid shadowing real xclip/xdg-open). +`nssh sweep <host>` lists `mosh-server` processes owned by $USER on +the remote and offers to kill them. Safe when running tmux-inside-mosh: +killing mosh-server doesn't kill the tmux server, so detached sessions +survive. Use `--all` for unattended cleanup or `--older 168h` to keep +only the last week. + ## Architecture ### Dispatch on argv[0] @@ -82,6 +88,8 @@ JSON envelopes on the ntfy topic. Every message has a `kind` field: | `clip-write` | remote → local | Write data to the Mac clipboard | | `clip-read-request` | remote → local | Request the Mac clipboard contents | | `clip-read-response` | local → remote | Response with clipboard data | +| `ping` | local ↔ local | Liveness probe between two nssh processes sharing a topic | +| `pong` | local ↔ local | Ack for `ping`, echoing the same correlation id | Small text (≤3KB) is base64-encoded inline in the `body` field. Larger payloads and images are sent as ntfy attachments (PUT with `Filename` + `X-Message` headers). @@ -93,6 +101,24 @@ no config required. 
nssh writes the server + topic to `~/.local/state/nssh/sessi on the remote before launching the shell (and seeds a `session-open` event into the JSONL log). The shim reads this file. +### Session collisions + +A pidfile per live local nssh is kept at `~/.local/state/nssh/sessions/<pid>.json`. +On startup, nssh looks up the host (canonical short name from `ssh -G`) in that +registry. When an existing session is found, nssh sends a `ping` on the topic and +waits ~1.5s for a `pong`, then prompts (in an interactive shell) for one of: + +| Choice | Effect | +|--------|--------| +| join | adopt the existing topic; both subscribers see every message | +| replace | SIGTERM the existing PID, then SIGKILL after 1s if it's still up; fresh topic | +| new | fresh topic; existing PID is left running but the remote bridge will follow the new topic | + +Default in the prompt is `join` if the peer answered the ping, `replace` if it +didn't. Non-interactive shells silently join (with a warning on stderr if the +peer was unresponsive). Override with `--join` / `--replace` / `--new` on the +command line. + Optional `~/.config/nssh/config.toml` on either side to pin values: ```toml server = "https://ntfy.example.com" # default: https://ntfy.sh diff --git a/cmd/nssh/log.go b/cmd/nssh/log.go index 05556f8..f58961c 100644 --- a/cmd/nssh/log.go +++ b/cmd/nssh/log.go @@ -42,6 +42,11 @@ type LogEvent struct { Persona string `json:"persona,omitempty"` Args []string `json:"args,omitempty"` + // Subscriber resilience (subscribe-up / subscribe-down). + Reconnect bool `json:"reconnect,omitempty"` + Gap string `json:"gap,omitempty"` + Since string `json:"since,omitempty"` + // Error context. 
Err string `json:"err,omitempty"` } diff --git a/cmd/nssh/main.go b/cmd/nssh/main.go index d525452..a46a6ea 100644 --- a/cmd/nssh/main.go +++ b/cmd/nssh/main.go @@ -14,11 +14,18 @@ var buildVersion string func usage() { fmt.Fprintln(os.Stderr, "usage:") - fmt.Fprintln(os.Stderr, " nssh [--ssh|--mosh] [ssh args...] open a session") + fmt.Fprintln(os.Stderr, " nssh [--ssh|--mosh] [--join|--replace|--new] [ssh args...]") + fmt.Fprintln(os.Stderr, " open a session") fmt.Fprintln(os.Stderr, " nssh infect [--force] install on a remote host") fmt.Fprintln(os.Stderr, " nssh infect [--force] self symlink personas on this machine") fmt.Fprintln(os.Stderr, " nssh status [--tail] show active sessions") + fmt.Fprintln(os.Stderr, " nssh sweep [--all|--older ] kill orphan mosh-servers on a host") fmt.Fprintln(os.Stderr, " nssh --version print version info") + fmt.Fprintln(os.Stderr, "") + fmt.Fprintln(os.Stderr, "session collision flags (when another nssh is already attached to ):") + fmt.Fprintln(os.Stderr, " --join share the existing ntfy topic (no prompt)") + fmt.Fprintln(os.Stderr, " --replace kill the existing nssh, take over with a fresh topic") + fmt.Fprintln(os.Stderr, " --new generate a fresh topic without killing the existing") os.Exit(1) } @@ -71,6 +78,9 @@ func main() { case "status": statusCmd(os.Args[2:]) return + case "sweep": + sweepCmd(os.Args[2:]) + return case "-v", "--version": printVersion() return diff --git a/cmd/nssh/session.go b/cmd/nssh/session.go index 0778f14..c1f4c20 100644 --- a/cmd/nssh/session.go +++ b/cmd/nssh/session.go @@ -11,6 +11,8 @@ import ( "os" "os/exec" "os/signal" + "path/filepath" + "strings" "syscall" "time" @@ -29,6 +31,7 @@ func nsshMain() { args := os.Args[1:] forceSSH := false forceMosh := false + collisionFlag := "" // "join" | "replace" | "new" | "" for len(args) > 0 { switch args[0] { case "--ssh": @@ -39,6 +42,18 @@ func nsshMain() { forceMosh = true args = args[1:] continue + case "--join": + collisionFlag = "join" + args = 
args[1:] + continue + case "--replace": + collisionFlag = "replace" + args = args[1:] + continue + case "--new": + collisionFlag = "new" + args = args[1:] + continue case "-h", "--help": usage() } @@ -65,19 +80,34 @@ func nsshMain() { } cfg := loadSessionConfig() + joinedPID := 0 if cfg.Topic == "" { - if existing := findActiveSessionForHost(shortHost); existing != nil { + existing := findActiveSessionForHost(shortHost) + switch resolveSessionCollision(existing, collisionFlag) { + case "join": cfg.Topic = existing.Topic cfg.Server = existing.Server + joinedPID = existing.PID fmt.Fprintf(os.Stderr, "nssh: joining active session for %s (PID %d)\n", shortHost, existing.PID) - } else { + case "replace": + fmt.Fprintf(os.Stderr, "nssh: replacing existing session for %s (PID %d)\n", shortHost, existing.PID) + replaceSession(existing) + cfg.Topic = generateTopic() + case "new": + if existing != nil { + fmt.Fprintf(os.Stderr, "nssh: starting on a fresh topic; existing PID %d will be left on the old one\n", existing.PID) + } cfg.Topic = generateTopic() } } fmt.Fprintf(os.Stderr, "nssh: subscribing to %s\n", cfg.topicURL()) openLog(cfg.Topic, "session") - logEvent(LogEvent{Event: "session-start", Target: sshTarget, Host: shortHost, Server: cfg.Server}) + startEvent := LogEvent{Event: "session-start", Target: sshTarget, Host: shortHost, Server: cfg.Server} + if joinedPID != 0 { + startEvent.Joined = joinedPID + } + logEvent(startEvent) sessionFile, err := registerSession(cfg, sshTarget, shortHost) if err != nil { @@ -85,20 +115,24 @@ func nsshMain() { } defer unregisterSession(sessionFile) - // One SSH login-shell to probe version, write the session file, and seed - // the remote JSONL log before the interactive session starts. 
- remoteVer := prepareRemote(sshTarget, cfg) - if localVer := version(); isReleaseVersion(localVer) { - switch { - case remoteVer == "": - fmt.Fprintln(os.Stderr, "nssh: not installed on remote — clipboard bridge will not work") - if promptYes(" install it now?") { - infectRemote(sshTarget, false) - } - case semver.Compare(remoteVer, localVer) != 0: - fmt.Fprintf(os.Stderr, "nssh: remote version %s, local %s\n", remoteVer, localVer) - if promptYes(" update remote to " + localVer + "?") { - infectRemote(sshTarget, false) + // First nssh for this host: probe version, write the remote session file, + // and seed the remote JSONL log. When joining, the original process + // already did this and the remote state is still correct — skip the extra + // SSH and the version prompt the user has already seen. + if joinedPID == 0 { + remoteVer := prepareRemote(sshTarget, cfg) + if localVer := version(); isReleaseVersion(localVer) { + switch { + case remoteVer == "": + fmt.Fprintln(os.Stderr, "nssh: not installed on remote — clipboard bridge will not work") + if promptYes(" install it now?") { + infectRemote(sshTarget, false) + } + case semver.Compare(remoteVer, localVer) != 0: + fmt.Fprintf(os.Stderr, "nssh: remote version %s, local %s\n", remoteVer, localVer) + if promptYes(" update remote to " + localVer + "?") { + infectRemote(sshTarget, false) + } } } } @@ -124,6 +158,93 @@ func nsshMain() { } } +// resolveSessionCollision picks how this nssh should relate to any existing +// nssh attached to the same host. Inputs: +// - existing: nil if no live session for this host (just generate a topic) +// - flag: one of "" (decide automatically) / "join" / "replace" / "new" +// +// When unforced, we ping the existing topic to test whether the peer is +// responsive; an unresponsive peer makes "replace" the prompt default since +// joining a wedged subscriber is almost never what the user wants. 
+func resolveSessionCollision(existing *sessionInfo, flag string) string { + if existing == nil { + return "new" + } + switch flag { + case "join", "replace", "new": + return flag + } + + alive := pingTopic(existing.Server, existing.Topic, 1500*time.Millisecond) + age := time.Since(existing.Started).Round(time.Second) + fmt.Fprintf(os.Stderr, "nssh: existing session on %s (PID %d, started %s ago, alive=%v)\n", + existing.Host, existing.PID, age, alive) + + stat, err := os.Stdin.Stat() + interactive := err == nil && stat.Mode()&os.ModeCharDevice != 0 + if !interactive { + // In a script — joining is the least surprising default. Warn if the + // peer didn't answer so the operator sees something is wrong. + if !alive { + fmt.Fprintln(os.Stderr, "nssh: peer did not respond to ping; joining anyway (pass --replace or --new to override)") + } + return "join" + } + + defaultChoice := "join" + prompt := " [J]oin / [R]eplace (kill PID) / [N]ew topic / [C]ancel? [J] " + if !alive { + defaultChoice = "replace" + prompt = " [R]eplace (kill PID) / [N]ew topic / [J]oin anyway / [C]ancel? [R] " + } + fmt.Fprint(os.Stderr, prompt) + var resp string + fmt.Scanln(&resp) + switch strings.ToLower(strings.TrimSpace(resp)) { + case "j", "join": + return "join" + case "r", "replace": + return "replace" + case "n", "new": + return "new" + case "c", "cancel", "q", "quit": + fmt.Fprintln(os.Stderr, "nssh: cancelled") + os.Exit(0) + case "": + return defaultChoice + } + fmt.Fprintf(os.Stderr, "nssh: unrecognized choice %q, using default (%s)\n", resp, defaultChoice) + return defaultChoice +} + +// replaceSession terminates the existing nssh process and removes its pidfile. +// Sends SIGTERM first (lets defers run on the old process — unregister, logs); +// escalates to SIGKILL if it's still alive after 1s. Best-effort: missing +// pidfile or already-dead process is not an error. 
+func replaceSession(s *sessionInfo) { + if s == nil || s.PID <= 0 { + return + } + if err := syscall.Kill(s.PID, syscall.SIGTERM); err != nil { + // Process already gone — just clean up the pidfile. + _ = os.Remove(filepath.Join(sessionsDir(), fmt.Sprintf("%d.json", s.PID))) + return + } + deadline := time.Now().Add(time.Second) + for time.Now().Before(deadline) { + if syscall.Kill(s.PID, 0) != nil { + break + } + time.Sleep(50 * time.Millisecond) + } + if syscall.Kill(s.PID, 0) == nil { + // Still alive — escalate. SIGKILL won't run the other process's + // defers, so we have to remove its pidfile ourselves. + _ = syscall.Kill(s.PID, syscall.SIGKILL) + } + _ = os.Remove(filepath.Join(sessionsDir(), fmt.Sprintf("%d.json", s.PID))) +} + // runSession execs the interactive ssh/mosh subprocess, wires its stdio to // our terminal, and forwards INT/TERM/HUP signals until it exits. func runSession(cmd *exec.Cmd, sigs <-chan os.Signal) error { @@ -221,11 +342,24 @@ func subscribeNtfy(ctx context.Context, cfg nsshConfig, sshTarget string) { }, } + var ( + lastID string // most recent ntfy message id we processed + downAt time.Time // when the previous connection dropped, zero on cold start + downLogged bool // whether we've already logged subscribe-down for the current outage + ) + for { if ctx.Err() != nil { return } - req, err := http.NewRequestWithContext(ctx, "GET", endpoint, nil) + url := endpoint + if lastID != "" { + // ntfy's ?since= is exclusive: we get back messages strictly + // newer than lastID. Without this, anything posted while we were + // asleep or disconnected is dropped on the floor. 
+ url += "?since=" + lastID + } + req, err := http.NewRequestWithContext(ctx, "GET", url, nil) if err != nil { return } @@ -235,6 +369,11 @@ func subscribeNtfy(ctx context.Context, cfg nsshConfig, sshTarget string) { return } fmt.Fprintf(os.Stderr, "nssh: ntfy: %v — retrying\n", err) + if !downLogged { + logEvent(LogEvent{Event: "subscribe-down", Err: err.Error()}) + downAt = time.Now() + downLogged = true + } select { case <-ctx.Done(): return @@ -243,6 +382,17 @@ func subscribeNtfy(ctx context.Context, cfg nsshConfig, sshTarget string) { continue } + upEvent := LogEvent{Event: "subscribe-up"} + if !downAt.IsZero() { + upEvent.Reconnect = true + upEvent.Gap = time.Since(downAt).Round(time.Second).String() + } + if lastID != "" { + upEvent.Since = lastID + } + logEvent(upEvent) + downLogged = false + scanner := bufio.NewScanner(resp.Body) for scanner.Scan() { var msg ntfy.Msg @@ -250,11 +400,23 @@ func subscribeNtfy(ctx context.Context, cfg nsshConfig, sshTarget string) { continue } if msg.Event == "message" && msg.Message != "" { + if msg.ID != "" { + lastID = msg.ID + } go handleMessage(msg, topicURL, sshTarget) } } - if err := scanner.Err(); err != nil && ctx.Err() == nil { - fmt.Fprintf(os.Stderr, "nssh: ntfy stream ended (%v) — reconnecting\n", err) + reason := "eof" + if err := scanner.Err(); err != nil { + reason = err.Error() + if ctx.Err() == nil { + fmt.Fprintf(os.Stderr, "nssh: ntfy stream ended (%v) — reconnecting\n", err) + } + } + if ctx.Err() == nil { + logEvent(LogEvent{Event: "subscribe-down", Err: reason}) + downAt = time.Now() + downLogged = true } resp.Body.Close() @@ -305,7 +467,75 @@ func handleMessage(msg ntfy.Msg, topicURL, sshTarget string) { handleClipReadRequest(env, topicURL) case "clip-read-response": // Responses are for the remote shim, not us. Ignore. + case "ping": + handlePing(env, topicURL) + case "pong": + // Pongs are for whoever issued the matching ping. Ignore here. 
default: fmt.Fprintf(os.Stderr, "nssh: unknown envelope kind %q\n", env.Kind) } } + +// handlePing publishes a pong with the same correlation id. Used by a peer +// nssh process to verify this subscriber is alive (not just kill -0 alive). +func handlePing(env wire.Envelope, topicURL string) { + resp := wire.Envelope{Kind: "pong", ID: env.ID} + if err := wire.Publish(topicURL, resp, nil); err != nil { + fmt.Fprintf(os.Stderr, "nssh: pong: %v\n", err) + return + } + logMessage("out", resp, 0) +} + +// pingTopic publishes a ping envelope to the topic and waits up to `timeout` +// for a pong with the matching correlation id. Returns true if any peer on +// the topic acked the ping. Used at session-start to decide whether an +// existing pidfile points at a live, responsive nssh or a wedged one. +// +// We open the subscriber *before* publishing so we don't race the pong: +// ntfy's "messages I have not seen yet" view starts at connect time. +func pingTopic(server, topic string, timeout time.Duration) bool { + topicURL := strings.TrimRight(server, "/") + "/" + topic + corrID := generateTopic() // reuse: just need an unguessable random string + + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + req, err := http.NewRequestWithContext(ctx, "GET", topicURL+"/json", nil) + if err != nil { + return false + } + resp, err := http.DefaultClient.Do(req) + if err != nil { + return false + } + defer resp.Body.Close() + + // Publish ping *after* the subscriber is connected — otherwise the pong + // can race ahead of us and we'd never see it. 
+ go func() { + _ = wire.Publish(topicURL, wire.Envelope{Kind: "ping", ID: corrID}, nil) + }() + + scanner := bufio.NewScanner(resp.Body) + for scanner.Scan() { + if ctx.Err() != nil { + return false + } + var msg ntfy.Msg + if err := json.Unmarshal(scanner.Bytes(), &msg); err != nil { + continue + } + if msg.Event != "message" { + continue + } + env, ok := wire.Parse(msg.Message) + if !ok { + continue + } + if env.Kind == "pong" && env.ID == corrID { + return true + } + } + return false +} diff --git a/cmd/nssh/sweep.go b/cmd/nssh/sweep.go new file mode 100644 index 0000000..3390c13 --- /dev/null +++ b/cmd/nssh/sweep.go @@ -0,0 +1,300 @@ +package main + +import ( + "bufio" + "fmt" + "os" + "os/exec" + "sort" + "strconv" + "strings" + "time" +) + +// moshServer is one mosh-server process on the remote. +type moshServer struct { + PID int + Uptime time.Duration +} + +func sweepUsage() { + fmt.Fprintln(os.Stderr, "usage: nssh sweep [--all|--older ] ") + fmt.Fprintln(os.Stderr, " --all kill every mosh-server owned by $USER on ") + fmt.Fprintln(os.Stderr, " --older kill mosh-servers older than the given duration (e.g. 24h, 168h)") + os.Exit(1) +} + +// sweepCmd handles `nssh sweep [--all|--older ] `. +// +// Lists mosh-server processes owned by $USER on the remote, then either kills +// all of them (--all), kills those older than a threshold (--older), or +// interactively prompts. SIGTERM with a 2s grace period, then SIGKILL. +// +// Safe to use when running tmux-inside-mosh: killing mosh-server doesn't kill +// the tmux server, so detached sessions survive. 
+func sweepCmd(args []string) { + all := false + var olderThan time.Duration + var target string + + for i := 0; i < len(args); i++ { + a := args[i] + switch { + case a == "--all": + all = true + case a == "--older": + i++ + if i >= len(args) { + sweepUsage() + } + d, err := time.ParseDuration(args[i]) + if err != nil { + fmt.Fprintf(os.Stderr, "nssh: invalid duration %q: %v\n", args[i], err) + os.Exit(1) + } + olderThan = d + case strings.HasPrefix(a, "--older="): + d, err := time.ParseDuration(strings.TrimPrefix(a, "--older=")) + if err != nil { + fmt.Fprintf(os.Stderr, "nssh: invalid duration %q: %v\n", a, err) + os.Exit(1) + } + olderThan = d + case a == "-h", a == "--help": + sweepUsage() + default: + if target != "" { + fmt.Fprintf(os.Stderr, "nssh: unexpected arg %q\n", a) + os.Exit(1) + } + target = a + } + } + if target == "" { + sweepUsage() + } + if all && olderThan > 0 { + fmt.Fprintln(os.Stderr, "nssh: --all and --older are mutually exclusive") + os.Exit(1) + } + + servers, err := listRemoteMoshServers(target) + if err != nil { + fmt.Fprintf(os.Stderr, "nssh: list mosh-servers on %s: %v\n", target, err) + os.Exit(1) + } + if len(servers) == 0 { + fmt.Printf("no mosh-server processes for $USER on %s\n", target) + return + } + + fmt.Printf("%d mosh-server process(es) on %s:\n", len(servers), target) + fmt.Printf(" %-8s %s\n", "PID", "UPTIME") + for _, s := range servers { + fmt.Printf(" %-8d %s\n", s.PID, shortDuration(s.Uptime)) + } + + var toKill []int + switch { + case all: + for _, s := range servers { + toKill = append(toKill, s.PID) + } + case olderThan > 0: + for _, s := range servers { + if s.Uptime > olderThan { + toKill = append(toKill, s.PID) + } + } + if len(toKill) == 0 { + fmt.Printf("no mosh-servers older than %s\n", olderThan) + return + } + default: + toKill = promptSweepSelection(servers) + if len(toKill) == 0 { + return + } + } + + pidStrs := make([]string, len(toKill)) + for i, p := range toKill { + pidStrs[i] = strconv.Itoa(p) + } + 
fmt.Printf("nssh: sending SIGTERM to %s on %s…\n", strings.Join(pidStrs, ", "), target) + if err := killRemotePIDs(target, "TERM", pidStrs); err != nil { + fmt.Fprintf(os.Stderr, "nssh: kill -TERM: %v\n", err) + os.Exit(1) + } + + // Give them a couple seconds to exit cleanly, then re-check and escalate. + time.Sleep(2 * time.Second) + survivors, err := listRemoteMoshServers(target) + if err != nil { + return // partial success is still success; don't error out + } + stillAlive := pidsStillIn(toKill, survivors) + if len(stillAlive) == 0 { + return + } + pidStrs = pidStrs[:0] + for _, p := range stillAlive { + pidStrs = append(pidStrs, strconv.Itoa(p)) + } + fmt.Fprintf(os.Stderr, "nssh: %d survived SIGTERM, sending SIGKILL: %s\n", len(stillAlive), strings.Join(pidStrs, ", ")) + if err := killRemotePIDs(target, "KILL", pidStrs); err != nil { + fmt.Fprintf(os.Stderr, "nssh: kill -KILL: %v\n", err) + os.Exit(1) + } +} + +// listRemoteMoshServers SSHs to target and returns mosh-server processes +// owned by the user, sorted oldest-first. Parses `ps`'s portable etime +// format ([[DD-]hh:]mm:ss) since not every system's procps supports etimes. 
+func listRemoteMoshServers(target string) ([]moshServer, error) { + cmd := exec.Command("ssh", "-o", "BatchMode=yes", target, + `ps -u "$USER" -o pid=,etime=,comm= 2>/dev/null | awk '$3=="mosh-server"{print $1, $2}'`) + cmd.Stderr = os.Stderr + out, err := cmd.Output() + if err != nil { + return nil, err + } + var servers []moshServer + for _, line := range strings.Split(string(out), "\n") { + parts := strings.Fields(line) + if len(parts) < 2 { + continue + } + pid, err := strconv.Atoi(parts[0]) + if err != nil { + continue + } + dur, err := parseEtime(parts[1]) + if err != nil { + continue + } + servers = append(servers, moshServer{PID: pid, Uptime: dur}) + } + sort.Slice(servers, func(i, j int) bool { + return servers[i].Uptime > servers[j].Uptime // oldest first + }) + return servers, nil +} + +// parseEtime parses ps `etime` format: [[DD-]hh:]mm:ss. Returns the duration. +func parseEtime(s string) (time.Duration, error) { + var days int + if i := strings.Index(s, "-"); i >= 0 { + d, err := strconv.Atoi(s[:i]) + if err != nil { + return 0, fmt.Errorf("etime days %q: %w", s, err) + } + days = d + s = s[i+1:] + } + parts := strings.Split(s, ":") + if len(parts) < 2 || len(parts) > 3 { + return 0, fmt.Errorf("etime %q: unexpected format", s) + } + nums := make([]int, len(parts)) + for i, p := range parts { + n, err := strconv.Atoi(p) + if err != nil { + return 0, fmt.Errorf("etime %q: %w", s, err) + } + nums[i] = n + } + var hours, minutes, seconds int + switch len(nums) { + case 3: + hours, minutes, seconds = nums[0], nums[1], nums[2] + case 2: + minutes, seconds = nums[0], nums[1] + } + return time.Duration(days)*24*time.Hour + + time.Duration(hours)*time.Hour + + time.Duration(minutes)*time.Minute + + time.Duration(seconds)*time.Second, nil +} + +// promptSweepSelection asks the user which PIDs to kill. 
Accepts: +// - "all" or "a": every server in the list +// - "old": every server older than 24h +// - comma- or space-separated PIDs +// - empty: skip (kill nothing) +func promptSweepSelection(servers []moshServer) []int { + stat, err := os.Stdin.Stat() + if err != nil || stat.Mode()&os.ModeCharDevice == 0 { + fmt.Fprintln(os.Stderr, "nssh: non-interactive; pass --all or --older to choose") + return nil + } + fmt.Print("kill which? (PIDs, \"all\", \"old\" for >24h, empty to skip): ") + reader := bufio.NewReader(os.Stdin) + line, err := reader.ReadString('\n') + if err != nil { + return nil + } + line = strings.TrimSpace(line) + if line == "" { + return nil + } + switch strings.ToLower(line) { + case "all", "a": + out := make([]int, len(servers)) + for i, s := range servers { + out[i] = s.PID + } + return out + case "old", "o": + var out []int + for _, s := range servers { + if s.Uptime > 24*time.Hour { + out = append(out, s.PID) + } + } + return out + } + wanted := map[int]bool{} + for _, tok := range strings.FieldsFunc(line, func(r rune) bool { return r == ',' || r == ' ' }) { + n, err := strconv.Atoi(strings.TrimSpace(tok)) + if err != nil { + fmt.Fprintf(os.Stderr, "nssh: ignoring unparseable token %q\n", tok) + continue + } + wanted[n] = true + } + var out []int + for _, s := range servers { + if wanted[s.PID] { + out = append(out, s.PID) + delete(wanted, s.PID) + } + } + for p := range wanted { + fmt.Fprintf(os.Stderr, "nssh: PID %d not in the list, ignoring\n", p) + } + return out +} + +// killRemotePIDs runs `kill - ...` on the remote. +func killRemotePIDs(target, sig string, pids []string) error { + args := append([]string{"-o", "BatchMode=yes", target, "kill", "-" + sig}, pids...) + cmd := exec.Command("ssh", args...) + cmd.Stderr = os.Stderr + return cmd.Run() +} + +// pidsStillIn returns the subset of `pids` whose values appear in `servers`. 
+func pidsStillIn(pids []int, servers []moshServer) []int { + alive := map[int]bool{} + for _, s := range servers { + alive[s.PID] = true + } + var out []int + for _, p := range pids { + if alive[p] { + out = append(out, p) + } + } + return out +} diff --git a/cmd/nssh/sweep_test.go b/cmd/nssh/sweep_test.go new file mode 100644 index 0000000..6d78824 --- /dev/null +++ b/cmd/nssh/sweep_test.go @@ -0,0 +1,38 @@ +package main + +import ( + "testing" + "time" +) + +func TestParseEtime(t *testing.T) { + cases := []struct { + in string + want time.Duration + }{ + {"00:15", 15 * time.Second}, + {"1:30", 90 * time.Second}, + {"02:15:30", 2*time.Hour + 15*time.Minute + 30*time.Second}, + {"4-12:00:00", 4*24*time.Hour + 12*time.Hour}, + {"7-00:00:00", 7 * 24 * time.Hour}, + {"30-23:59:59", 30*24*time.Hour + 23*time.Hour + 59*time.Minute + 59*time.Second}, + } + for _, c := range cases { + got, err := parseEtime(c.in) + if err != nil { + t.Errorf("parseEtime(%q) err: %v", c.in, err) + continue + } + if got != c.want { + t.Errorf("parseEtime(%q) = %v, want %v", c.in, got, c.want) + } + } +} + +func TestParseEtimeRejects(t *testing.T) { + for _, in := range []string{"", "abc", "1", "1:2:3:4", "a:b", "-:00:00"} { + if _, err := parseEtime(in); err == nil { + t.Errorf("parseEtime(%q) = nil err, want error", in) + } + } +} diff --git a/docs/protocol.md b/docs/protocol.md index 9236a68..cfdf4b2 100644 --- a/docs/protocol.md +++ b/docs/protocol.md @@ -47,6 +47,8 @@ on ntfy.sh and unlimited on self-hosted. | `clip-write` | remote → local | `mime`, payload | Write data to the macOS clipboard. Empty payload (`Body == ""` and no attachment) is dropped with a stderr message. | | `clip-read-request` | remote → local | `id`, `mime` | Ask the local side to read its clipboard. The shim subscribes to the topic with `?since=` for 5s, waiting for a matching response. | | `clip-read-response` | local → remote | `id`, `mime`, payload | Response to `clip-read-request`. 
The shim filters by `id` so concurrent reads don't cross paths. Body prefixed with `ERROR: ` indicates failure (e.g. clipboard tools missing). | +| `ping` | local ↔ local | `id` | Liveness probe between two nssh processes sharing a topic. The peer replies with a `pong` carrying the same `id`. | +| `pong` | local ↔ local | `id` | Ack for `ping`. Senders correlate by `id` so concurrent pings don't cross paths. | ### MIME conventions @@ -85,16 +87,23 @@ type LogEvent struct { // Session lifecycle. Target string `json:"target,omitempty"` + Host string `json:"host,omitempty"` Server string `json:"server,omitempty"` Topic string `json:"topic,omitempty"` Version string `json:"version,omitempty"` Exit *int `json:"exit,omitempty"` Mosh *bool `json:"mosh,omitempty"` + Joined int `json:"joined,omitempty"` // Shim invocation. Persona string `json:"persona,omitempty"` Args []string `json:"args,omitempty"` + // Subscriber resilience (subscribe-up / subscribe-down). + Reconnect bool `json:"reconnect,omitempty"` + Gap string `json:"gap,omitempty"` + Since string `json:"since,omitempty"` + // Error context. Err string `json:"err,omitempty"` } @@ -105,8 +114,10 @@ type LogEvent struct { | Event | Emitted by | Fields | Meaning | |-------|------------|--------|---------| | `session-open` | local (during `prepareRemote`, written to remote log via SSH heredoc) | `server`, `topic`, `target`, `version` | Local nssh announces itself to the remote at session start. Side is `session-init`. | -| `session-start` | local | `target`, `server` | Local subscriber is starting. | +| `session-start` | local | `target`, `host`, `server`, `joined` | Local subscriber is starting. `joined` is the PID of the existing nssh whose topic we adopted, omitted on a fresh connect. | | `session-end` | local | `exit`, `mosh` | Local interactive session ended. `exit` is `0` on success; `mosh` records which transport was used. 
| +| `subscribe-up` | local | `reconnect`, `gap`, `since` | ntfy `/json` long-poll connected. `reconnect=true` plus `gap` is set after a prior `subscribe-down`; `since` echoes the ntfy message id resumed from. | +| `subscribe-down` | local | `err` | ntfy `/json` long-poll dropped (network, timeout, EOF). Followed by a reconnect attempt. | | `msg-send` | either | `kind`, `mime`, `id`, `url`, `size` | Envelope published to the topic. | | `msg-recv` | either | `kind`, `mime`, `id`, `url`, `size` | Envelope received from the topic. | | `msg-unknown` | either | `size` | Topic message that didn't parse as a valid envelope. | diff --git a/internal/ntfy/ntfy.go b/internal/ntfy/ntfy.go index aeb5eff..1d83b83 100644 --- a/internal/ntfy/ntfy.go +++ b/internal/ntfy/ntfy.go @@ -16,7 +16,13 @@ type Attachment struct { } // Msg is a single message from the ntfy JSON stream. +// +// ID is ntfy's per-message identifier; pass it back as ?since= on +// reconnect to replay anything published while we were offline. Time is +// the unix seconds the server timestamped the message. 
type Msg struct { + ID string `json:"id"` + Time int64 `json:"time,omitempty"` Event string `json:"event"` Message string `json:"message"` Attachment *Attachment `json:"attachment,omitempty"` From ec5dc3a2621f6596120710099c293f03886ee9f9 Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Mon, 11 May 2026 19:42:38 -0700 Subject: [PATCH 3/5] fix(nssh): guard against nil existing session in collision switch --- cmd/nssh/session.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/cmd/nssh/session.go b/cmd/nssh/session.go index c1f4c20..7c9ccc8 100644 --- a/cmd/nssh/session.go +++ b/cmd/nssh/session.go @@ -85,11 +85,19 @@ func nsshMain() { existing := findActiveSessionForHost(shortHost) switch resolveSessionCollision(existing, collisionFlag) { case "join": + if existing == nil { + cfg.Topic = generateTopic() + break + } cfg.Topic = existing.Topic cfg.Server = existing.Server joinedPID = existing.PID fmt.Fprintf(os.Stderr, "nssh: joining active session for %s (PID %d)\n", shortHost, existing.PID) case "replace": + if existing == nil { + cfg.Topic = generateTopic() + break + } fmt.Fprintf(os.Stderr, "nssh: replacing existing session for %s (PID %d)\n", shortHost, existing.PID) replaceSession(existing) cfg.Topic = generateTopic() From 5ce5743c99200bf6dac830055b9a9a8168c00f72 Mon Sep 17 00:00:00 2001 From: Abizer Lokhandwala Date: Mon, 11 May 2026 19:53:16 -0700 Subject: [PATCH 4/5] nssh: simplify session recovery helpers - collapse --join/--replace/--new flag parsing onto one case arm - flatten the collision switch so the existing==nil branch hoists out of all three "case" bodies - drop the redundant joinedPID guard in session-start (omitempty handles 0) - replaceSession: extract pidfile path once, single deferred os.Remove - subscribeNtfy: hoist subscribe-down logging into a markDown closure shared by the connect-failure and stream-end paths - sweep: collapse --older/--older= branches; extract pidsToStrings --- cmd/nssh/session.go | 97 
+++++++++++++++++++-------------------------- cmd/nssh/sweep.go | 41 +++++++++---------- 2 files changed, 62 insertions(+), 76 deletions(-) diff --git a/cmd/nssh/session.go b/cmd/nssh/session.go index 7c9ccc8..39a3771 100644 --- a/cmd/nssh/session.go +++ b/cmd/nssh/session.go @@ -32,32 +32,21 @@ func nsshMain() { forceSSH := false forceMosh := false collisionFlag := "" // "join" | "replace" | "new" | "" +parse: for len(args) > 0 { switch args[0] { case "--ssh": forceSSH = true - args = args[1:] - continue case "--mosh": forceMosh = true - args = args[1:] - continue - case "--join": - collisionFlag = "join" - args = args[1:] - continue - case "--replace": - collisionFlag = "replace" - args = args[1:] - continue - case "--new": - collisionFlag = "new" - args = args[1:] - continue + case "--join", "--replace", "--new": + collisionFlag = strings.TrimPrefix(args[0], "--") case "-h", "--help": usage() + default: + break parse } - break + args = args[1:] } if forceSSH && forceMosh { fmt.Fprintln(os.Stderr, "nssh: --ssh and --mosh are mutually exclusive") @@ -83,26 +72,19 @@ func nsshMain() { joinedPID := 0 if cfg.Topic == "" { existing := findActiveSessionForHost(shortHost) - switch resolveSessionCollision(existing, collisionFlag) { - case "join": - if existing == nil { - cfg.Topic = generateTopic() - break - } + choice := resolveSessionCollision(existing, collisionFlag) + switch { + case existing != nil && choice == "join": cfg.Topic = existing.Topic cfg.Server = existing.Server joinedPID = existing.PID fmt.Fprintf(os.Stderr, "nssh: joining active session for %s (PID %d)\n", shortHost, existing.PID) - case "replace": - if existing == nil { - cfg.Topic = generateTopic() - break - } + case existing != nil && choice == "replace": fmt.Fprintf(os.Stderr, "nssh: replacing existing session for %s (PID %d)\n", shortHost, existing.PID) replaceSession(existing) cfg.Topic = generateTopic() - case "new": - if existing != nil { + default: + if existing != nil && choice == "new" { 
fmt.Fprintf(os.Stderr, "nssh: starting on a fresh topic; existing PID %d will be left on the old one\n", existing.PID) } cfg.Topic = generateTopic() @@ -111,11 +93,13 @@ func nsshMain() { fmt.Fprintf(os.Stderr, "nssh: subscribing to %s\n", cfg.topicURL()) openLog(cfg.Topic, "session") - startEvent := LogEvent{Event: "session-start", Target: sshTarget, Host: shortHost, Server: cfg.Server} - if joinedPID != 0 { - startEvent.Joined = joinedPID - } - logEvent(startEvent) + logEvent(LogEvent{ + Event: "session-start", + Target: sshTarget, + Host: shortHost, + Server: cfg.Server, + Joined: joinedPID, // omitempty drops 0 + }) sessionFile, err := registerSession(cfg, sshTarget, shortHost) if err != nil { @@ -233,24 +217,22 @@ func replaceSession(s *sessionInfo) { if s == nil || s.PID <= 0 { return } + pidfile := filepath.Join(sessionsDir(), fmt.Sprintf("%d.json", s.PID)) + defer os.Remove(pidfile) + if err := syscall.Kill(s.PID, syscall.SIGTERM); err != nil { - // Process already gone — just clean up the pidfile. - _ = os.Remove(filepath.Join(sessionsDir(), fmt.Sprintf("%d.json", s.PID))) - return + return // process already gone } deadline := time.Now().Add(time.Second) for time.Now().Before(deadline) { if syscall.Kill(s.PID, 0) != nil { - break + return } time.Sleep(50 * time.Millisecond) } - if syscall.Kill(s.PID, 0) == nil { - // Still alive — escalate. SIGKILL won't run the other process's - // defers, so we have to remove its pidfile ourselves. - _ = syscall.Kill(s.PID, syscall.SIGKILL) - } - _ = os.Remove(filepath.Join(sessionsDir(), fmt.Sprintf("%d.json", s.PID))) + // Still alive — escalate. SIGKILL won't run the other process's defers, + // so we leave pidfile cleanup to our deferred os.Remove above. 
+ _ = syscall.Kill(s.PID, syscall.SIGKILL) } // runSession execs the interactive ssh/mosh subprocess, wires its stdio to @@ -353,8 +335,19 @@ func subscribeNtfy(ctx context.Context, cfg nsshConfig, sshTarget string) { var ( lastID string // most recent ntfy message id we processed downAt time.Time // when the previous connection dropped, zero on cold start - downLogged bool // whether we've already logged subscribe-down for the current outage + downLogged bool // already logged subscribe-down for the current outage ) + // markDown is idempotent for the current outage: connect-failure and + // stream-end paths both invoke it, but only the first call records the + // timestamp + log line. Cleared on every successful subscribe-up. + markDown := func(reason string) { + if downLogged || ctx.Err() != nil { + return + } + logEvent(LogEvent{Event: "subscribe-down", Err: reason}) + downAt = time.Now() + downLogged = true + } for { if ctx.Err() != nil { @@ -377,11 +370,7 @@ func subscribeNtfy(ctx context.Context, cfg nsshConfig, sshTarget string) { return } fmt.Fprintf(os.Stderr, "nssh: ntfy: %v — retrying\n", err) - if !downLogged { - logEvent(LogEvent{Event: "subscribe-down", Err: err.Error()}) - downAt = time.Now() - downLogged = true - } + markDown(err.Error()) select { case <-ctx.Done(): return @@ -421,11 +410,7 @@ func subscribeNtfy(ctx context.Context, cfg nsshConfig, sshTarget string) { fmt.Fprintf(os.Stderr, "nssh: ntfy stream ended (%v) — reconnecting\n", err) } } - if ctx.Err() == nil { - logEvent(LogEvent{Event: "subscribe-down", Err: reason}) - downAt = time.Now() - downLogged = true - } + markDown(reason) resp.Body.Close() select { diff --git a/cmd/nssh/sweep.go b/cmd/nssh/sweep.go index 3390c13..3d71f08 100644 --- a/cmd/nssh/sweep.go +++ b/cmd/nssh/sweep.go @@ -39,27 +39,19 @@ func sweepCmd(args []string) { for i := 0; i < len(args); i++ { a := args[i] + var durStr string switch { case a == "--all": all = true + continue case a == "--older": i++ if i >= 
len(args) { sweepUsage() } - d, err := time.ParseDuration(args[i]) - if err != nil { - fmt.Fprintf(os.Stderr, "nssh: invalid duration %q: %v\n", args[i], err) - os.Exit(1) - } - olderThan = d + durStr = args[i] case strings.HasPrefix(a, "--older="): - d, err := time.ParseDuration(strings.TrimPrefix(a, "--older=")) - if err != nil { - fmt.Fprintf(os.Stderr, "nssh: invalid duration %q: %v\n", a, err) - os.Exit(1) - } - olderThan = d + durStr = strings.TrimPrefix(a, "--older=") case a == "-h", a == "--help": sweepUsage() default: @@ -68,7 +60,14 @@ func sweepCmd(args []string) { os.Exit(1) } target = a + continue + } + d, err := time.ParseDuration(durStr) + if err != nil { + fmt.Fprintf(os.Stderr, "nssh: invalid duration %q: %v\n", durStr, err) + os.Exit(1) } + olderThan = d } if target == "" { sweepUsage() @@ -117,10 +116,7 @@ func sweepCmd(args []string) { } } - pidStrs := make([]string, len(toKill)) - for i, p := range toKill { - pidStrs[i] = strconv.Itoa(p) - } + pidStrs := pidsToStrings(toKill) fmt.Printf("nssh: sending SIGTERM to %s on %s…\n", strings.Join(pidStrs, ", "), target) if err := killRemotePIDs(target, "TERM", pidStrs); err != nil { fmt.Fprintf(os.Stderr, "nssh: kill -TERM: %v\n", err) @@ -137,10 +133,7 @@ func sweepCmd(args []string) { if len(stillAlive) == 0 { return } - pidStrs = pidStrs[:0] - for _, p := range stillAlive { - pidStrs = append(pidStrs, strconv.Itoa(p)) - } + pidStrs = pidsToStrings(stillAlive) fmt.Fprintf(os.Stderr, "nssh: %d survived SIGTERM, sending SIGKILL: %s\n", len(stillAlive), strings.Join(pidStrs, ", ")) if err := killRemotePIDs(target, "KILL", pidStrs); err != nil { fmt.Fprintf(os.Stderr, "nssh: kill -KILL: %v\n", err) @@ -148,6 +141,14 @@ func sweepCmd(args []string) { } } +func pidsToStrings(pids []int) []string { + out := make([]string, len(pids)) + for i, p := range pids { + out[i] = strconv.Itoa(p) + } + return out +} + // listRemoteMoshServers SSHs to target and returns mosh-server processes // owned by the user, 
sorted oldest-first. Parses `ps`'s portable etime // format ([[DD-]hh:]mm:ss) since not every system's procps supports etimes. From 148702d54aca998f32daa7e699c58905ca3f59df Mon Sep 17 00:00:00 2001 From: Abizer Lokhandwala Date: Mon, 11 May 2026 20:41:03 -0700 Subject: [PATCH 5/5] nssh: render new LogEvent fields in `status --tail` The session-recovery changes added Host / Joined / Reconnect / Gap / Since to LogEvent but the formatter in status.go was never updated, so the reconnect-gap and replay-since data that was the whole point of the visibility work was silently dropped from tail output. Co-Authored-By: Claude Opus 4.7 (1M context) --- cmd/nssh/status.go | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/cmd/nssh/status.go b/cmd/nssh/status.go index 8509b8f..652a2c8 100644 --- a/cmd/nssh/status.go +++ b/cmd/nssh/status.go @@ -324,9 +324,18 @@ func formatEvent(raw, label string) string { if e.Exit != nil { fmt.Fprintf(&sb, " exit=%d", *e.Exit) } + if e.Gap != "" { + fmt.Fprintf(&sb, " gap=%s", e.Gap) + } + if e.Host != "" { + fmt.Fprintf(&sb, " host=%s", e.Host) + } if e.ID != "" { fmt.Fprintf(&sb, " id=%s", e.ID) } + if e.Joined != 0 { + fmt.Fprintf(&sb, " joined=%d", e.Joined) + } if e.Kind != "" { fmt.Fprintf(&sb, " kind=%s", e.Kind) } @@ -339,9 +348,15 @@ func formatEvent(raw, label string) string { if e.Persona != "" { fmt.Fprintf(&sb, " persona=%s", e.Persona) } + if e.Reconnect { + fmt.Fprintf(&sb, " reconnect=%v", e.Reconnect) + } if e.Server != "" { fmt.Fprintf(&sb, " server=%s", e.Server) } + if e.Since != "" { + fmt.Fprintf(&sb, " since=%s", e.Since) + } if e.Size > 0 { fmt.Fprintf(&sb, " size=%d", e.Size) }