From 227ff85b191f95f70ad34a768489a3cf43d44d9c Mon Sep 17 00:00:00 2001 From: sawka Date: Mon, 6 Jan 2025 11:34:40 -0800 Subject: [PATCH 01/19] event based anlaytics, starter code --- db/migrations-wstore/000007_events.down.sql | 1 + db/migrations-wstore/000007_events.up.sql | 7 ++ pkg/telemetry/telemetry.go | 71 +++++++++++++++++++++ pkg/util/utilfn/utilfn.go | 23 +++++++ pkg/wcloud/wcloud.go | 63 ++++++++++++++++++ 5 files changed, 165 insertions(+) create mode 100644 db/migrations-wstore/000007_events.down.sql create mode 100644 db/migrations-wstore/000007_events.up.sql diff --git a/db/migrations-wstore/000007_events.down.sql b/db/migrations-wstore/000007_events.down.sql new file mode 100644 index 0000000000..7acba0115a --- /dev/null +++ b/db/migrations-wstore/000007_events.down.sql @@ -0,0 +1 @@ +DROP TABLE db_tevent; diff --git a/db/migrations-wstore/000007_events.up.sql b/db/migrations-wstore/000007_events.up.sql new file mode 100644 index 0000000000..9cfb1caf01 --- /dev/null +++ b/db/migrations-wstore/000007_events.up.sql @@ -0,0 +1,7 @@ +CREATE TABLE db_tevent ( + id int PRIMARY KEY, + ts int NOT NULL, + event varchar(50) NOT NULL, + props json NOT NULL, + uploaded boolean NOT NULL DEFAULT 0 +); \ No newline at end of file diff --git a/pkg/telemetry/telemetry.go b/pkg/telemetry/telemetry.go index 4c1c680672..3708f083b7 100644 --- a/pkg/telemetry/telemetry.go +++ b/pkg/telemetry/telemetry.go @@ -6,6 +6,8 @@ package telemetry import ( "context" "database/sql/driver" + "encoding/json" + "fmt" "log" "time" @@ -20,6 +22,40 @@ import ( const MaxTzNameLen = 50 +type TEvent struct { + Ts int64 `json:"ts" db:"ts"` + Event string `json:"event" db:"event"` + Props map[string]any `json:"-" db:"-"` // Don't scan directly to map + + // DB fields + Id int64 `json:"-" db:"id"` + Uploaded bool `json:"-" db:"uploaded"` + + // For database scanning + RawProps string `json:"-" db:"props"` +} + +func NewTEvent(event string, props map[string]any) *TEvent { + if event == "" { + panic("TEvent.Event cannot be empty") + } + if props == nil { + props = make(map[string]any) + } + return &TEvent{ + Ts: time.Now().UnixMilli(), + Event: event, + Props: props, + } +} + +func (t *TEvent) convertRawJSON() error { + if t.RawProps != "" { + return json.Unmarshal([]byte(t.RawProps), &t.Props) + } + return nil +} + type ActivityType struct { Day string `json:"day"` Uploaded bool `json:"-"` @@ -94,6 +130,41 @@ func GoUpdateActivityWrap(update wshrpc.ActivityUpdate, debugStr string) { }() } +func InsertTEvent(ctx context.Context, event *TEvent) error { + return wstore.WithTx(ctx, func(tx *wstore.TxWrap) error { + query := `INSERT INTO db_tevent (ts, event, props) + VALUES (?, ?, ?)` + tx.Exec(query, event.Ts, event.Event, dbutil.QuickJson(event.Props)) + return nil + }) +} + +func GetNonUploadedTEvents(ctx context.Context, maxEvents int) ([]*TEvent, error) { + return wstore.WithTxRtn(ctx, func(tx *wstore.TxWrap) ([]*TEvent, error) { + var rtn []*TEvent + query := `SELECT id, ts, event, props, uploaded FROM db_tevent WHERE uploaded = 0 ORDER BY ts LIMIT ?` + tx.Select(&rtn, query, maxEvents) + for _, event := range rtn { + if err := event.convertRawJSON(); err != nil { + return nil, fmt.Errorf("scan json for event %d: %w", event.Id, err) + } + } + return rtn, nil + }) +} + +func MarkTEventsAsUploaded(ctx context.Context, events []*TEvent) error { + return wstore.WithTx(ctx, func(tx *wstore.TxWrap) error { + ids := make([]int64, 0, len(events)) + for _, event := range events { + ids = append(ids, event.Id) + } + query := `UPDATE db_tevent SET uploaded = 1 WHERE id IN (SELECT value FROM json_each(?))` + tx.Exec(query, dbutil.QuickJson(ids)) + return nil + }) +} + func UpdateActivity(ctx context.Context, update wshrpc.ActivityUpdate) error { now := time.Now() dayStr := daystr.GetCurDayStr() diff --git a/pkg/util/utilfn/utilfn.go b/pkg/util/utilfn/utilfn.go index e2f3265ef1..76ea73a220 100644 --- a/pkg/util/utilfn/utilfn.go +++ b/pkg/util/utilfn/utilfn.go @@ -984,3 +984,26 @@ func FilterValidArch(arch string) (string, error) { } return "", fmt.Errorf("unknown architecture: %s", formatted) } + +func ConvertUUIDv4Tov7(uuidv4 string) (string, error) { + // Parse the UUIDv4 + parts := strings.Split(uuidv4, "-") + if len(parts) != 5 { + return "", fmt.Errorf("invalid UUIDv4 format") + } + + // Section 1 and 2: Fixed timestamp for Jan 1, 2024 + section1 := "01823a80" // High 32 bits of the timestamp + section2 := "0000" // Middle 16 bits of the timestamp + + // Section 3: Version (7) and the last 3 bytes of randomness from UUIDv4 + section3 := "7" + parts[2][1:] // Replace the first nibble with '7' for version + + // Section 4 and 5: Copy from the original UUIDv4 + section4 := parts[3] + section5 := parts[4] + + // Combine sections to form UUIDv7 + uuidv7 := fmt.Sprintf("%s-%s-%s-%s-%s", section1, section2, section3, section4, section5) + return uuidv7, nil +} diff --git a/pkg/wcloud/wcloud.go b/pkg/wcloud/wcloud.go index df9703d674..60624e653f 100644 --- a/pkg/wcloud/wcloud.go +++ b/pkg/wcloud/wcloud.go @@ -43,6 +43,7 @@ const WCloudWebShareUpdateTimeout = 15 * time.Second const MaxUpdatePayloadSize = 1 * (1024 * 1024) const TelemetryUrl = "/telemetry" +const TEventsUrl = "/tevents" const NoTelemetryUrl = "/no-telemetry" const WebShareUpdateUrl = "/auth/web-share-update" @@ -148,6 +149,68 @@ func doRequest(req *http.Request, outputObj interface{}) (*http.Response, error) return resp, nil } +type TDataInputType struct { + ClientId string `json:"clientId"` + TEvents []*telemetry.TEvent `json:"tevents"` +} + +const TEventsBatchSize = 1000 + +// returns (done, error) +func sendTEventsBatch(clientId string) (bool, error) { + ctx, cancelFn := context.WithTimeout(context.Background(), WCloudDefaultTimeout) + defer cancelFn() + events, err := telemetry.GetNonUploadedTEvents(ctx, TEventsBatchSize) + if err != nil { + return true, fmt.Errorf("cannot get events: %v", err) + } + if len(events) == 0 { + return true, nil + } + log.Printf("[wcloud] sending %d tevents\n", len(events)) + input := TDataInputType{ + ClientId: clientId, + TEvents: events, + } + req, err := makeAnonPostReq(ctx, TEventsUrl, input) + if err != nil { + return true, err + } + _, err = doRequest(req, nil) + if err != nil { + return true, err + } + err = telemetry.MarkTEventsAsUploaded(ctx, events) + if err != nil { + return true, fmt.Errorf("error marking activity as uploaded: %v", err) + } + return len(events) < TEventsBatchSize, nil +} + +func SendTEvents(clientId string) error { + if !telemetry.IsTelemetryEnabled() { + log.Printf("telemetry disabled, not sending\n") + return nil + } + numIters := 0 + for { + numIters++ + done, err := sendTEventsBatch(clientId) + if err != nil { + log.Printf("error sending telemetry events: %v\n", err) + break + } + if done { + break + } + if numIters > 10 { + log.Printf("hit 10 iterations, stopping\n") + break + } + } + return nil +} + func SendTelemetry(ctx context.Context, clientId string) error { if !telemetry.IsTelemetryEnabled() { log.Printf("telemetry disabled, not sending\n") From a1bef50aa6fe9433e06342e6a34ab07920450af5 Mon Sep 17 00:00:00 2001 From: sawka Date: Mon, 27 Jan 2025 15:23:06 -0800 Subject: [PATCH 02/19] rename fn --- pkg/telemetry/telemetry.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/telemetry/telemetry.go b/pkg/telemetry/telemetry.go index 80ebbe60f9..a459fb6734 100644 --- a/pkg/telemetry/telemetry.go +++ b/pkg/telemetry/telemetry.go @@ -35,7 +35,7 @@ type TEvent struct { RawProps string `json:"-" db:"props"` } -func NewTEvent(event string, props map[string]any) *TEvent { +func MakeTEvent(event string, props map[string]any) *TEvent { if event == "" { panic("TEvent.Event cannot be empty") } From 4499221530e4f178b8cdd3283844836ef2c4a12a Mon Sep 17 00:00:00 2001 From: sawka Date: Wed, 29 Jan 2025 14:24:38 -0800 Subject: [PATCH 03/19] update dev names --- Taskfile.yml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/Taskfile.yml b/Taskfile.yml index 160142e472..66c7497d83 100644 --- a/Taskfile.yml +++ b/Taskfile.yml @@ -26,8 +26,8 @@ tasks: - docsite:build:embedded - build:backend env: - WCLOUD_ENDPOINT: "https://ot2e112zx5.execute-api.us-west-2.amazonaws.com/dev" - WCLOUD_WS_ENDPOINT: "wss://5lfzlg5crl.execute-api.us-west-2.amazonaws.com/dev/" + WCLOUD_ENDPOINT: "https://api-dev.wavterm.dev" + WCLOUD_WS_ENDPOINT: "wss://wsapi-dev.wavterm.dev" electron:start: desc: Run the Electron application directly. @@ -39,8 +39,8 @@ tasks: - docsite:build:embedded - build:backend env: - WCLOUD_ENDPOINT: "https://ot2e112zx5.execute-api.us-west-2.amazonaws.com/dev" - WCLOUD_WS_ENDPOINT: "wss://5lfzlg5crl.execute-api.us-west-2.amazonaws.com/dev/" + WCLOUD_ENDPOINT: "https://api-dev.wavterm.dev" + WCLOUD_WS_ENDPOINT: "wss://wsapi-dev.wavterm.dev" storybook: desc: Start the Storybook server. From 663016308013b7eb51c7897da9b1fcdec9e40b0e Mon Sep 17 00:00:00 2001 From: sawka Date: Wed, 29 Jan 2025 15:50:17 -0800 Subject: [PATCH 04/19] spell waveterm correctly --- Taskfile.yml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/Taskfile.yml b/Taskfile.yml index 66c7497d83..f3bab623e4 100644 --- a/Taskfile.yml +++ b/Taskfile.yml @@ -26,8 +26,8 @@ tasks: - docsite:build:embedded - build:backend env: - WCLOUD_ENDPOINT: "https://api-dev.wavterm.dev" - WCLOUD_WS_ENDPOINT: "wss://wsapi-dev.wavterm.dev" + WCLOUD_ENDPOINT: "https://api-dev.waveterm.dev/central" + WCLOUD_WS_ENDPOINT: "wss://wsapi-dev.waveterm.dev/" electron:start: desc: Run the Electron application directly. @@ -39,8 +39,8 @@ tasks: - docsite:build:embedded - build:backend env: - WCLOUD_ENDPOINT: "https://api-dev.wavterm.dev" - WCLOUD_WS_ENDPOINT: "wss://wsapi-dev.wavterm.dev" + WCLOUD_ENDPOINT: "https://api-dev.waveterm.dev" + WCLOUD_WS_ENDPOINT: "wss://wsapi-dev.waveterm.dev" storybook: desc: Start the Storybook server. From f5f21c154e2cc2c733c766bd4f46959a94534915 Mon Sep 17 00:00:00 2001 From: sawka Date: Wed, 29 Jan 2025 16:00:09 -0800 Subject: [PATCH 05/19] add geolocation data to telemetry --- docs/docs/telemetry.mdx | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/docs/docs/telemetry.mdx b/docs/docs/telemetry.mdx index 6715ca01c9..9b33c4ba7c 100644 --- a/docs/docs/telemetry.mdx +++ b/docs/docs/telemetry.mdx @@ -96,6 +96,15 @@ Lastly, some data is sent along with the telemetry that describes how to classif | AutoUpdateChannel | The type of auto update in use. This specifically refers to whether a latest or beta channel is selected. | | CurDay | The current day (in your time zone) when telemetry is sent. It does not include the time of day. | +## Geo Data + +We do not store IP addresses in our telemetry table. However, CloudFlare passes us Geo-Location headers. We store these two header values: + +| Name | Description | +| ------------ | ----------------------------------------------------------------- | +| CFCountry | 2-letter country code (e.g. "US", "FR", or "JP") | +| CFRegionCode | region code (often a provence, region, or state within a country) | + --- ## When Telemetry is Turned Off From 9745c70acc1cff73b02e313f4aa27d000697ccb7 Mon Sep 17 00:00:00 2001 From: sawka Date: Thu, 30 Jan 2025 15:03:10 -0800 Subject: [PATCH 06/19] dont need 'wsh token' to send analytics --- cmd/wsh/cmd/wshcmd-token.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/cmd/wsh/cmd/wshcmd-token.go b/cmd/wsh/cmd/wshcmd-token.go index 47b2381c80..2660c12507 100644 --- a/cmd/wsh/cmd/wshcmd-token.go +++ b/cmd/wsh/cmd/wshcmd-token.go @@ -22,9 +22,6 @@ func init() { } func tokenCmdRun(cmd *cobra.Command, args []string) (rtnErr error) { - defer func() { - sendActivity("token", rtnErr == nil) - }() if len(args) != 2 { OutputHelpMessage(cmd) return fmt.Errorf("wsh token requires exactly 2 arguments, got %d", len(args)) From e2fb4c40c17da43193b9b90357d3468de60cdd23 Mon Sep 17 00:00:00 2001 From: sawka Date: Thu, 30 Jan 2025 15:03:42 -0800 Subject: [PATCH 07/19] update for tevent, use PT wall-clock time --- pkg/telemetry/telemetry.go | 44 ++++++++++++++++++++++++++++++++------ pkg/util/utilfn/utilfn.go | 16 ++++++++++++++ 2 files changed, 54 insertions(+), 6 deletions(-) diff --git a/pkg/telemetry/telemetry.go b/pkg/telemetry/telemetry.go index a459fb6734..4c2ca47af6 100644 --- a/pkg/telemetry/telemetry.go +++ b/pkg/telemetry/telemetry.go @@ -14,6 +14,7 @@ import ( "github.com/wavetermdev/waveterm/pkg/panichandler" "github.com/wavetermdev/waveterm/pkg/util/daystr" "github.com/wavetermdev/waveterm/pkg/util/dbutil" + "github.com/wavetermdev/waveterm/pkg/util/utilfn" "github.com/wavetermdev/waveterm/pkg/wavebase" "github.com/wavetermdev/waveterm/pkg/wconfig" "github.com/wavetermdev/waveterm/pkg/wshrpc" @@ -23,9 +24,10 @@ import ( const MaxTzNameLen = 50 type TEvent struct { - Ts int64 `json:"ts" db:"ts"` - Event string `json:"event" db:"event"` - Props map[string]any `json:"-" db:"-"` // Don't scan directly to map + Ts int64 `json:"ts" db:"ts"` + TsLocal string `json:"tslocal" db:"-"` // iso8601 format (wall clock converted to PT) + Event string `json:"event" db:"event"` + Props map[string]any `json:"props" db:"-"` // Don't scan directly to map // DB fields Id int64 `json:"-" db:"id"` @@ -42,11 +44,41 @@ func MakeTEvent(event string, props map[string]any) *TEvent { if props == nil { props = make(map[string]any) } + now := time.Now() + localTime := utilfn.ConvertToWallClockPT(now) return &TEvent{ - Ts: time.Now().UnixMilli(), - Event: event, - Props: props, + Ts: now.UnixMilli(), + TsLocal: localTime.Format(time.RFC3339), + Event: event, + Props: props, + } +} + +func (t *TEvent) SetUser(key string, value any) { + if t.Props == nil { + t.Props = make(map[string]any) + } + if t.Props["$set"] == nil { + t.Props["$set"] = make(map[string]any) + } + t.Props["$set"].(map[string]any)[key] = value +} + +func (t *TEvent) SetUserOnce(key string, value any) { + if t.Props == nil { + t.Props = make(map[string]any) + } + if t.Props["$set_once"] == nil { + t.Props["$set_once"] = make(map[string]any) + } + t.Props["$set_once"].(map[string]any)[key] = value +} + +func (t *TEvent) Set(key string, value any) { + if t.Props == nil { + t.Props = make(map[string]any) } + t.Props[key] = value } func (t *TEvent) convertRawJSON() error { diff --git a/pkg/util/utilfn/utilfn.go b/pkg/util/utilfn/utilfn.go index 1452c2c12b..a3a953afba 100644 --- a/pkg/util/utilfn/utilfn.go +++ b/pkg/util/utilfn/utilfn.go @@ -31,6 +31,15 @@ import ( ) var HexDigits = []byte{'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'} +var PTLoc *time.Location + +func init() { + loc, err := time.LoadLocation("America/Los_Angeles") + if err != nil { + loc = time.FixedZone("PT", -8*60*60) + } + PTLoc = loc +} func GetStrArr(v interface{}, field string) []string { if v == nil { @@ -971,3 +980,10 @@ func DumpGoRoutineStacks() { n := runtime.Stack(buf, true) os.Stdout.Write(buf[:n]) } + +func ConvertToWallClockPT(t time.Time) time.Time { + year, month, day := t.Date() + hour, min, sec := t.Clock() + pstTime := time.Date(year, month, day, hour, min, sec, 0, PTLoc) + return pstTime +} From 0ca931ca4d30a96e274cfbf23b893310cec6a50b Mon Sep 17 00:00:00 2001 From: sawka Date: Thu, 30 Jan 2025 16:34:05 -0800 Subject: [PATCH 08/19] working on validation + record tevent func --- ROADMAP.md | 2 +- pkg/telemetry/telemetry.go | 138 +++++++++++++--------------- pkg/util/dbutil/dbutil.go | 61 +++++++++---- pkg/util/utilfn/utilfn.go | 27 ++++++ pkg/wcloud/wcloud.go | 7 +- pkg/wshrpc/tevent.go | 144 ++++++++++++++++++++++++++++++ pkg/wshrpc/wshrpctypes.go | 1 + pkg/wshrpc/wshserver/wshserver.go | 4 + 8 files changed, 290 insertions(+), 94 deletions(-) create mode 100644 pkg/wshrpc/tevent.go diff --git a/ROADMAP.md b/ROADMAP.md index ec0a438085..a4fab2951a 100644 --- a/ROADMAP.md +++ b/ROADMAP.md @@ -38,7 +38,7 @@ Targeting 1/31/25 ## v0.12 -Targeting mid-February (more will get added before work on v0.12 kicks off) +Targeting mid-February. - 🔷 Import/Export Tab Layouts and Widgets - 🔷 log viewer diff --git a/pkg/telemetry/telemetry.go b/pkg/telemetry/telemetry.go index 4c2ca47af6..1632ac2db4 100644 --- a/pkg/telemetry/telemetry.go +++ b/pkg/telemetry/telemetry.go @@ -6,7 +6,6 @@ package telemetry import ( "context" "database/sql/driver" - "encoding/json" "fmt" "log" "time" @@ -22,71 +21,7 @@ import ( ) const MaxTzNameLen = 50 - -type TEvent struct { - Ts int64 `json:"ts" db:"ts"` - TsLocal string `json:"tslocal" db:"-"` // iso8601 format (wall clock converted to PT) - Event string `json:"event" db:"event"` - Props map[string]any `json:"props" db:"-"` // Don't scan directly to map - - // DB fields - Id int64 `json:"-" db:"id"` - Uploaded bool `json:"-" db:"uploaded"` - - // For database scanning - RawProps string `json:"-" db:"props"` -} - -func MakeTEvent(event string, props map[string]any) *TEvent { - if event == "" { - panic("TEvent.Event cannot be empty") - } - if props == nil { - props = make(map[string]any) - } - now := time.Now() - localTime := utilfn.ConvertToWallClockPT(now) - return &TEvent{ - Ts: now.UnixMilli(), - TsLocal: localTime.Format(time.RFC3339), - Event: event, - Props: props, - } -} - -func (t *TEvent) SetUser(key string, value any) { - if t.Props == nil { - t.Props = make(map[string]any) - } - if t.Props["$set"] == nil { - t.Props["$set"] = make(map[string]any) - } - t.Props["$set"].(map[string]any)[key] = value -} - -func (t *TEvent) SetUserOnce(key string, value any) { - if t.Props == nil { - t.Props = make(map[string]any) - } - if t.Props["$set_once"] == nil { - t.Props["$set_once"] = make(map[string]any) - } - t.Props["$set_once"].(map[string]any)[key] = value -} - -func (t *TEvent) Set(key string, value any) { - if t.Props == nil { - t.Props = make(map[string]any) - } - t.Props[key] = value -} - -func (t *TEvent) convertRawJSON() error { - if t.RawProps != "" { - return json.Unmarshal([]byte(t.RawProps), &t.Props) - } - return nil -} +const ActivityEventName = "activity" type ActivityType struct { Day string `json:"day"` @@ -162,7 +97,7 @@ func GoUpdateActivityWrap(update wshrpc.ActivityUpdate, debugStr string) { }() } -func InsertTEvent(ctx context.Context, event *TEvent) error { +func InsertTEvent(ctx context.Context, event *wshrpc.TEvent) error { return wstore.WithTx(ctx, func(tx *wstore.TxWrap) error { query := `INSERT INTO db_tevent (ts, event, props) VALUES (?, ?, ?)` @@ -171,13 +106,68 @@ func InsertTEvent(ctx context.Context, event *TEvent) error { }) } -func GetNonUploadedTEvents(ctx context.Context, maxEvents int) ([]*TEvent, error) { - return wstore.WithTxRtn(ctx, func(tx *wstore.TxWrap) ([]*TEvent, error) { - var rtn []*TEvent - query := `SELECT id, ts, event, props, uploaded FROM db_tevent WHERE uploaded = 0 ORDER BY ts LIMIT ?` - tx.Select(&rtn, query, maxEvents) +// merges newActivity into curActivity, returns curActivity +func mergeActivity(curActivity map[string]any, newActivity map[string]any) map[string]any { + if curActivity == nil { + curActivity = make(map[string]any) + } + for key, val := range newActivity { + newVal := utilfn.ConvertInt(val) + curVal := utilfn.ConvertInt(curActivity[key]) + curActivity[key] = curVal + newVal + } + return curActivity +} + +// ignores the timestamp in tevent, and uses the current time +func UpdateActivityTEvent(ctx context.Context, tevent *wshrpc.TEvent) error { + eventTs := time.Now() + // compute to hour boundary, and round up to next hour + eventTs = eventTs.Truncate(time.Hour).Add(time.Hour) + return wstore.WithTx(ctx, func(tx *wstore.TxWrap) error { + // find event that matches this timestamp with event name "activity" + var hasRow bool + curActivity := make(map[string]any) + rawProps := tx.GetString(`SELECT props FROM db_tevent WHERE ts = ? AND event = ?`, eventTs.UnixMilli(), ActivityEventName) + if rawProps != "" { + hasRow = true + curActivity = dbutil.ParseJsonMap(rawProps, true) + } + curActivity = mergeActivity(curActivity, tevent.Props) + if hasRow { + query := `UPDATE db_tevent SET props = ? WHERE ts = ? AND event = ?` + tx.Exec(query, dbutil.QuickJson(curActivity), eventTs.UnixMilli(), ActivityEventName) + } else { + query := `INSERT INTO db_tevent (ts, event, props) VALUES (?, ?, ?)` + tx.Exec(query, eventTs.UnixMilli(), ActivityEventName, dbutil.QuickJson(curActivity)) + } + return nil + }) +} + +func RecordTEvent(ctx context.Context, tevent *wshrpc.TEvent) error { + if tevent == nil { + return nil + } + err := tevent.ValidateCurrentTEvent() + if err != nil { + return err + } + tevent.EnsureTimestamps() + if tevent.Event == ActivityEventName { + return UpdateActivityTEvent(ctx, tevent) + } + return InsertTEvent(ctx, tevent) +} + +func GetNonUploadedTEvents(ctx context.Context, maxEvents int) ([]*wshrpc.TEvent, error) { + now := time.Now() + return wstore.WithTxRtn(ctx, func(tx *wstore.TxWrap) ([]*wshrpc.TEvent, error) { + var rtn []*wshrpc.TEvent + query := `SELECT id, ts, event, props, uploaded FROM db_tevent WHERE uploaded = 0 AND ts <= ? ORDER BY ts LIMIT ?` + tx.Select(&rtn, query, now.UnixMilli(), maxEvents) for _, event := range rtn { - if err := event.convertRawJSON(); err != nil { + if err := event.ConvertRawJSON(); err != nil { return nil, fmt.Errorf("scan json for event %d: %w", event.Id, err) } } @@ -185,7 +175,7 @@ func GetNonUploadedTEvents(ctx context.Context, maxEvents int) ([]*TEvent, error }) } -func MarkTEventsAsUploaded(ctx context.Context, events []*TEvent) error { +func MarkTEventsAsUploaded(ctx context.Context, events []*wshrpc.TEvent) error { return wstore.WithTx(ctx, func(tx *wstore.TxWrap) error { ids := make([]int64, 0, len(events)) for _, event := range events { diff --git a/pkg/util/dbutil/dbutil.go b/pkg/util/dbutil/dbutil.go index cc75416dca..a13a435472 100644 --- a/pkg/util/dbutil/dbutil.go +++ b/pkg/util/dbutil/dbutil.go @@ -11,7 +11,7 @@ import ( "strconv" ) -func QuickSetStr(strVal *string, m map[string]interface{}, name string) { +func QuickSetStr(strVal *string, m map[string]any, name string) { v, ok := m[name] if !ok { return @@ -28,7 +28,7 @@ func QuickSetStr(strVal *string, m map[string]interface{}, name string) { *strVal = str } -func QuickSetInt(ival *int, m map[string]interface{}, name string) { +func QuickSetInt(ival *int, m map[string]any, name string) { v, ok := m[name] if !ok { return @@ -64,7 +64,7 @@ func QuickSetNullableInt64(ival **int64, m map[string]any, name string) { } } -func QuickSetInt64(ival *int64, m map[string]interface{}, name string) { +func QuickSetInt64(ival *int64, m map[string]any, name string) { v, ok := m[name] if !ok { // leave as zero @@ -82,7 +82,7 @@ func QuickSetInt64(ival *int64, m map[string]interface{}, name string) { } } -func QuickSetBool(bval *bool, m map[string]interface{}, name string) { +func QuickSetBool(bval *bool, m map[string]any, name string) { v, ok := m[name] if !ok { return @@ -100,7 +100,7 @@ func QuickSetBool(bval *bool, m map[string]interface{}, name string) { } } -func QuickSetBytes(bval *[]byte, m map[string]interface{}, name string) { +func QuickSetBytes(bval *[]byte, m map[string]any, name string) { v, ok := m[name] if !ok { return @@ -130,7 +130,7 @@ func getByteArr(m map[string]any, name string, def string) ([]byte, bool) { return barr, true } -func QuickSetJson(ptr interface{}, m map[string]interface{}, name string) { +func QuickSetJson(ptr any, m map[string]any, name string) { barr, ok := getByteArr(m, name, "{}") if !ok { return @@ -138,7 +138,7 @@ func QuickSetJson(ptr interface{}, m map[string]interface{}, name string) { json.Unmarshal(barr, ptr) } -func QuickSetNullableJson(ptr interface{}, m map[string]interface{}, name string) { +func QuickSetNullableJson(ptr any, m map[string]any, name string) { barr, ok := getByteArr(m, name, "null") if !ok { return @@ -146,7 +146,7 @@ func QuickSetNullableJson(ptr interface{}, m map[string]interface{}, name string json.Unmarshal(barr, ptr) } -func QuickSetJsonArr(ptr interface{}, m map[string]interface{}, name string) { +func QuickSetJsonArr(ptr any, m map[string]any, name string) { barr, ok := getByteArr(m, name, "[]") if !ok { return @@ -154,7 +154,7 @@ func QuickSetJsonArr(ptr interface{}, m map[string]interface{}, name string) { json.Unmarshal(barr, ptr) } -func CheckNil(v interface{}) bool { +func CheckNil(v any) bool { rv := reflect.ValueOf(v) if !rv.IsValid() { return true @@ -168,7 +168,7 @@ func CheckNil(v interface{}) bool { } } -func QuickNullableJson(v interface{}) string { +func QuickNullableJson(v any) string { if CheckNil(v) { return "null" } @@ -176,7 +176,7 @@ func QuickNullableJson(v interface{}) string { return string(barr) } -func QuickJson(v interface{}) string { +func QuickJson(v any) string { if CheckNil(v) { return "{}" } @@ -184,7 +184,7 @@ func QuickJson(v interface{}) string { return string(barr) } -func QuickJsonBytes(v interface{}) []byte { +func QuickJsonBytes(v any) []byte { if CheckNil(v) { return []byte("{}") } @@ -192,7 +192,7 @@ func QuickJsonBytes(v interface{}) []byte { return barr } -func QuickJsonArr(v interface{}) string { +func QuickJsonArr(v any) string { if CheckNil(v) { return "[]" } @@ -200,7 +200,7 @@ func QuickJsonArr(v interface{}) string { return string(barr) } -func QuickJsonArrBytes(v interface{}) []byte { +func QuickJsonArrBytes(v any) []byte { if CheckNil(v) { return []byte("[]") } @@ -208,7 +208,7 @@ func QuickJsonArrBytes(v interface{}) []byte { return barr } -func QuickScanJson(ptr interface{}, val interface{}) error { +func QuickScanJson(ptr any, val any) error { barrVal, ok := val.([]byte) if !ok { strVal, ok := val.(string) @@ -223,7 +223,7 @@ func QuickScanJson(ptr interface{}, val interface{}) error { return json.Unmarshal(barrVal, ptr) } -func QuickValueJson(v interface{}) (driver.Value, error) { +func QuickValueJson(v any) (driver.Value, error) { if CheckNil(v) { return "{}", nil } @@ -233,3 +233,32 @@ func QuickValueJson(v interface{}) (driver.Value, error) { } return string(barr), nil } + +// on error will return nil unless forceMake is set, in which case it returns make(map[string]any) +func ParseJsonMap(val string, forceMake bool) map[string]any { + var noRtn map[string]any + if forceMake { + noRtn = make(map[string]any) + } + if val == "" { + return noRtn + } + var m map[string]any + err := json.Unmarshal([]byte(val), &m) + if err != nil { + return noRtn + } +return m +} + +func ParseJsonArr[T any](val string) []T { + if val == "" { + return nil + } + var arr []T + err := json.Unmarshal([]byte(val), &arr) + if err != nil { + return nil + } + return arr +} diff --git a/pkg/util/utilfn/utilfn.go b/pkg/util/utilfn/utilfn.go index a3a953afba..6aec3943e3 100644 --- a/pkg/util/utilfn/utilfn.go +++ b/pkg/util/utilfn/utilfn.go @@ -85,6 +85,33 @@ func GetBool(v interface{}, field string) bool { return bval } +// converts an int or int64 to an int64 +// nil or bad type returns 0 +func ConvertInt(val any) int64 { + if val == 0 { + return 0 + } + switch typedVal := val.(type) { + case int: + return int64(typedVal) + case int64: + return typedVal + default: + return 0 + } +} + +func ConvertMap(val any) map[string]any { + if val == nil { + return nil + } + m, ok := val.(map[string]any) + if !ok { + return nil + } + return m +} + var needsQuoteRe = regexp.MustCompile(`[^\w@%:,./=+-]`) // minimum maxlen=6, pass -1 for no max length diff --git a/pkg/wcloud/wcloud.go b/pkg/wcloud/wcloud.go index d3feefde65..821de10b94 100644 --- a/pkg/wcloud/wcloud.go +++ b/pkg/wcloud/wcloud.go @@ -20,6 +20,7 @@ import ( "github.com/wavetermdev/waveterm/pkg/telemetry" "github.com/wavetermdev/waveterm/pkg/util/daystr" "github.com/wavetermdev/waveterm/pkg/wavebase" + "github.com/wavetermdev/waveterm/pkg/wshrpc" ) const WCloudEndpoint = "https://api.waveterm.dev/central" @@ -150,11 +151,11 @@ func doRequest(req *http.Request, outputObj interface{}) (*http.Response, error) } type TDataInputType struct { - ClientId string `json:"clientId"` - TEvents []*telemetry.TEvent `json:"tevents"` + ClientId string `json:"clientId"` + TEvents []*wshrpc.TEvent `json:"tevents"` } -const TEventsBatchSize = 1000 +const TEventsBatchSize = 100 // returns (done, error) func sendTEventsBatch(clientId string) (bool, error) { diff --git a/pkg/wshrpc/tevent.go b/pkg/wshrpc/tevent.go new file mode 100644 index 0000000000..c02d6f1bbf --- /dev/null +++ b/pkg/wshrpc/tevent.go @@ -0,0 +1,144 @@ +// Copyright 2025, Command Line Inc. +// SPDX-License-Identifier: Apache-2.0 + +package wshrpc + +import ( + "encoding/json" + "fmt" + "regexp" + "time" + + "github.com/wavetermdev/waveterm/pkg/util/utilfn" +) + +type TEvent struct { + Ts int64 `json:"ts" db:"ts"` + TsLocal string `json:"tslocal" db:"-"` // iso8601 format (wall clock converted to PT) + Event string `json:"event" db:"event"` + Props map[string]any `json:"props" db:"-"` // Don't scan directly to map + + // DB fields + Id int64 `json:"-" db:"id"` + Uploaded bool `json:"-" db:"uploaded"` + + // For database scanning + RawProps string `json:"-" db:"props"` +} + +var phNestedProps = []string{"$set", "$set_once", "$add", "$unset"} +var eventNameRe = regexp.MustCompile(`^[a-zA-Z0-9.:_/-]+$`) +var propNameRe = regexp.MustCompile(`^[a-zA-Z0-9.:_/$-]+$`) + +// validates a tevent that was just created (not for validating out of the DB, or an uploaded TEvent) +// checks that TS is pretty current (or unset) +func (te *TEvent) ValidateCurrentTEvent() error { + if te == nil { + return fmt.Errorf("TEvent cannot be nil") + } + if te.Event == "" { + return fmt.Errorf("TEvent.Event cannot be empty") + } + if !eventNameRe.MatchString(te.Event) { + return fmt.Errorf("TEvent.Event invalid: %q", te.Event) + } + if te.Ts != 0 { + now := time.Now().UnixMilli() + if te.Ts > now+60000 || te.Ts < now-60000 { + return fmt.Errorf("TEvent.Ts is not current: %d", te.Ts) + } + } + err := validatePropNames(te.Props, true) + if err != nil { + return fmt.Errorf("TEvent.Props: %v", err) + } + barr, err := json.Marshal(te.Props) + if err != nil { + return fmt.Errorf("TEvent.Props JSON error: %v", err) + } + if len(barr) > 20000 { + return fmt.Errorf("TEvent.Props too large: %d", len(barr)) + } + return nil +} + +func validatePropNames(props map[string]any, topLevel bool) error { + if props == nil { + return nil + } + for k := range props { + if !propNameRe.MatchString(k) { + return fmt.Errorf("TEvent.Props key invalid: %q", k) + } + } + if !topLevel { + return nil + } + for _, k := range phNestedProps { + nestedMap := utilfn.ConvertMap(props[k]) + err := validatePropNames(nestedMap, false) + if err != nil { + return fmt.Errorf("%v in nestedMap %s", err, k) + } + } + return nil +} + +func MakeTEvent(event string, props map[string]any) *TEvent { + if event == "" { + panic("TEvent.Event cannot be empty") + } + if props == nil { + props = make(map[string]any) + } + now := time.Now() + localTime := utilfn.ConvertToWallClockPT(now) + return &TEvent{ + Ts: now.UnixMilli(), + TsLocal: localTime.Format(time.RFC3339), + Event: event, + Props: props, + } +} + +func (t *TEvent) EnsureTimestamps() { + if t.Ts == 0 { + t.Ts = time.Now().UnixMilli() + } + gtime := time.UnixMilli(t.Ts) + t.TsLocal = utilfn.ConvertToWallClockPT(gtime).Format(time.RFC3339) +} + +func (t *TEvent) SetUser(key string, value any) { + if t.Props == nil { + t.Props = make(map[string]any) + } + if t.Props["$set"] == nil { + t.Props["$set"] = make(map[string]any) + } + t.Props["$set"].(map[string]any)[key] = value +} + +func (t *TEvent) SetUserOnce(key string, value any) { + if t.Props == nil { + t.Props = make(map[string]any) + } + if t.Props["$set_once"] == nil { + t.Props["$set_once"] = make(map[string]any) + } + t.Props["$set_once"].(map[string]any)[key] = value +} + +func (t *TEvent) Set(key string, value any) { + if t.Props == nil { + t.Props = make(map[string]any) + } + t.Props[key] = value +} + +func (t *TEvent) ConvertRawJSON() error { + if t.RawProps != "" { + return json.Unmarshal([]byte(t.RawProps), &t.Props) + } + return nil +} diff --git a/pkg/wshrpc/wshrpctypes.go b/pkg/wshrpc/wshrpctypes.go index 7fc5121dce..4ff4a043d5 100644 --- a/pkg/wshrpc/wshrpctypes.go +++ b/pkg/wshrpc/wshrpctypes.go @@ -177,6 +177,7 @@ type WshRpcInterface interface { WaveInfoCommand(ctx context.Context) (*WaveInfoData, error) WshActivityCommand(ct context.Context, data map[string]int) error ActivityCommand(ctx context.Context, data ActivityUpdate) error + RecordTEventCommand(ctx context.Context, data TEvent) error GetVarCommand(ctx context.Context, data CommandVarData) (*CommandVarResponseData, error) SetVarCommand(ctx context.Context, data CommandVarData) error PathCommand(ctx context.Context, data PathCommandData) (string, error) diff --git a/pkg/wshrpc/wshserver/wshserver.go b/pkg/wshrpc/wshserver/wshserver.go index fd575872b9..c52f63ea41 100644 --- a/pkg/wshrpc/wshserver/wshserver.go +++ b/pkg/wshrpc/wshserver/wshserver.go @@ -734,6 +734,10 @@ func (ws *WshServer) WorkspaceListCommand(ctx context.Context) ([]wshrpc.Workspa return rtn, nil } +func (ws *WshServer) RecordTEventCommand(ctx context.Context, data wshrpc.TEvent) error { + return telemetry.RecordTEvent(ctx, &data) +} + var wshActivityRe = regexp.MustCompile(`^[a-z:#]+$`) func (ws *WshServer) WshActivityCommand(ctx context.Context, data map[string]int) error { From aaacfeb3e715f4c9914eac5b28ea43fb8088032f Mon Sep 17 00:00:00 2001 From: sawka Date: Fri, 31 Jan 2025 16:20:06 -0800 Subject: [PATCH 09/19] move TEvent to pkg/telemetry/telemetrydata, lots of bug fixes, trying to get events working --- cmd/generatego/main-generatego.go | 1 + cmd/server/main-server.go | 12 +- cmd/wsh/cmd/wshcmd-debug.go | 13 ++ db/migrations-wstore/000007_events.up.sql | 3 +- emain/emain.ts | 12 ++ frontend/app/store/wshclientapi.ts | 10 ++ frontend/types/gotypes.d.ts | 27 ++++ pkg/telemetry/telemetry.go | 55 +++---- pkg/telemetry/telemetrydata/telemetrydata.go | 151 +++++++++++++++++++ pkg/util/utilfn/streamtolines.go | 2 +- pkg/util/utilfn/utilfn.go | 4 +- pkg/wcloud/wcloud.go | 26 ++-- pkg/wshrpc/tevent.go | 144 ------------------ pkg/wshrpc/wshclient/wshclient.go | 13 ++ pkg/wshrpc/wshrpctypes.go | 4 +- pkg/wshrpc/wshserver/wshserver.go | 12 +- 16 files changed, 302 insertions(+), 187 deletions(-) create mode 100644 pkg/telemetry/telemetrydata/telemetrydata.go delete mode 100644 pkg/wshrpc/tevent.go diff --git a/cmd/generatego/main-generatego.go b/cmd/generatego/main-generatego.go index c93faa352b..9794b68035 100644 --- a/cmd/generatego/main-generatego.go +++ b/cmd/generatego/main-generatego.go @@ -24,6 +24,7 @@ func GenerateWshClient() error { fmt.Fprintf(os.Stderr, "generating wshclient file to %s\n", WshClientFileName) var buf strings.Builder gogen.GenerateBoilerplate(&buf, "wshclient", []string{ + "github.com/wavetermdev/waveterm/pkg/telemetry/telemetrydata", "github.com/wavetermdev/waveterm/pkg/wshutil", "github.com/wavetermdev/waveterm/pkg/wshrpc", "github.com/wavetermdev/waveterm/pkg/wconfig", diff --git a/cmd/server/main-server.go b/cmd/server/main-server.go index cd0d8640f6..e1eb265dde 100644 --- a/cmd/server/main-server.go +++ b/cmd/server/main-server.go @@ -22,6 +22,7 @@ import ( "github.com/wavetermdev/waveterm/pkg/remote/fileshare/wshfs" "github.com/wavetermdev/waveterm/pkg/service" "github.com/wavetermdev/waveterm/pkg/telemetry" + "github.com/wavetermdev/waveterm/pkg/telemetry/telemetrydata" "github.com/wavetermdev/waveterm/pkg/util/shellutil" "github.com/wavetermdev/waveterm/pkg/util/sigutil" "github.com/wavetermdev/waveterm/pkg/wavebase" @@ -113,7 +114,7 @@ func sendTelemetryWrapper() { defer func() { panichandler.PanicHandler("sendTelemetryWrapper", recover()) }() - ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second) + ctx, cancelFn := context.WithTimeout(context.Background(), 10*time.Second) defer cancelFn() beforeSendActivityUpdate(ctx) client, err := wstore.DBGetSingleton[*waveobj.Client](ctx) @@ -121,7 +122,7 @@ func sendTelemetryWrapper() { log.Printf("[error] getting client data for telemetry: %v\n", err) return } - err = wcloud.SendTelemetry(ctx, client.OID) + err = wcloud.SendAllTelemetry(ctx, client.OID) if err != nil { log.Printf("[error] sending telemetry: %v\n", err) } @@ -150,6 +151,13 @@ func startupActivityUpdate() { if err != nil { log.Printf("error updating startup activity: %v\n", err) } + tevent := telemetrydata.MakeTEvent("startup", telemetrydata.TEventProps{ + ClientVersion: "v" + WaveVersion, + }) + err = telemetry.RecordTEvent(ctx, tevent) + if err != nil { + log.Printf("error recording startup event: %v\n", err) + } } func shutdownActivityUpdate() { diff --git a/cmd/wsh/cmd/wshcmd-debug.go b/cmd/wsh/cmd/wshcmd-debug.go index 29a1857689..9efac0ff87 100644 --- a/cmd/wsh/cmd/wshcmd-debug.go +++ b/cmd/wsh/cmd/wshcmd-debug.go @@ -24,11 +24,24 @@ var debugBlockIdsCmd = &cobra.Command{ Hidden: true, } +var debugSendTelemetryCmd = &cobra.Command{ + Use: "send-telemetry", + Short: "send telemetry", + RunE: debugSendTelemetryRun, + Hidden: true, +} + func init() { debugCmd.AddCommand(debugBlockIdsCmd) + debugCmd.AddCommand(debugSendTelemetryCmd) rootCmd.AddCommand(debugCmd) } +func debugSendTelemetryRun(cmd *cobra.Command, args []string) error { + err := wshclient.SendTelemetryCommand(RpcClient, nil) + return err +} + func debugBlockIdsRun(cmd *cobra.Command, args []string) error { oref, err := resolveBlockArg() if err != nil { diff --git a/db/migrations-wstore/000007_events.up.sql b/db/migrations-wstore/000007_events.up.sql index 9cfb1caf01..35e059a24e 100644 --- a/db/migrations-wstore/000007_events.up.sql +++ b/db/migrations-wstore/000007_events.up.sql @@ -1,6 +1,7 @@ CREATE TABLE db_tevent ( - id int PRIMARY KEY, + id INTEGER PRIMARY KEY, ts int NOT NULL, + tslocal varchar(100) NOT NULL, event varchar(50) NOT NULL, props json NOT NULL, uploaded boolean NOT NULL DEFAULT 0 diff --git a/emain/emain.ts b/emain/emain.ts index 41ea190c6b..1308aa2ab0 100644 --- a/emain/emain.ts +++ b/emain/emain.ts @@ -472,6 +472,18 @@ function logActiveState() { activity.displays = getActivityDisplays(); try { await RpcApi.ActivityCommand(ElectronWshClient, activity, { noresponse: true }); + await RpcApi.RecordTEventCommand( + ElectronWshClient, + { + event: "activity", + props: { + "activity:activeminutes": activity.activeminutes, + "activity:fgminutes": activity.fgminutes, + "activity:openminutes": activity.openminutes, + }, + }, + { noresponse: true } + ); } catch (e) { console.log("error logging active state", e); } finally { diff --git a/frontend/app/store/wshclientapi.ts b/frontend/app/store/wshclientapi.ts index 2822ac298a..1ab611c5df 100644 --- a/frontend/app/store/wshclientapi.ts +++ b/frontend/app/store/wshclientapi.ts @@ -252,6 +252,11 @@ class RpcApiType { return client.wshRpcCall("path", data, opts); } + // command "recordtevent" [call] + RecordTEventCommand(client: WshClient, data: TEvent, opts?: RpcOpts): Promise { + return client.wshRpcCall("recordtevent", data, opts); + } + // command "remotefilecopy" [call] RemoteFileCopyCommand(client: WshClient, data: CommandRemoteFileCopyData, opts?: RpcOpts): Promise { return client.wshRpcCall("remotefilecopy", data, opts); @@ -337,6 +342,11 @@ class RpcApiType { return client.wshRpcCall("routeunannounce", null, opts); } + // command "sendtelemetry" [call] + SendTelemetryCommand(client: WshClient, opts?: RpcOpts): Promise { + return client.wshRpcCall("sendtelemetry", null, opts); + } + // command "setconfig" [call] SetConfigCommand(client: WshClient, data: SettingsType, opts?: RpcOpts): Promise { return client.wshRpcCall("setconfig", data, opts); diff --git a/frontend/types/gotypes.d.ts b/frontend/types/gotypes.d.ts index 41423efb28..e2b7a7871b 100644 --- a/frontend/types/gotypes.d.ts +++ b/frontend/types/gotypes.d.ts @@ -745,6 +745,33 @@ declare global { allscopes?: boolean; }; + // telemetrydata.TEvent + type TEvent = { + ts?: number; + tslocal?: string; + event: string; + props: TEventProps; + }; + + // telemetrydata.TEventProps + type TEventProps = { + "activity:activeminutes"?: number; + "activity:fgminutes"?: number; + "activity:openminutes"?: number; + "client:version"?: string; + $set?: TEventUserProps; + $set_once?: TEventUserProps; + }; + + // telemetrydata.TEventUserProps + type TEventUserProps = { + "client:arch"?: string; + "client:version"?: string; + "client:initial_version"?: string; + "client:buildtime"?: string; + "client:osrelease"?: string; + }; + // waveobj.Tab type Tab = WaveObj & { name: string; diff --git a/pkg/telemetry/telemetry.go b/pkg/telemetry/telemetry.go index 1632ac2db4..a2a2ed3a79 100644 --- a/pkg/telemetry/telemetry.go +++ b/pkg/telemetry/telemetry.go @@ -6,11 +6,13 @@ package telemetry import ( "context" "database/sql/driver" + "encoding/json" "fmt" "log" "time" "github.com/wavetermdev/waveterm/pkg/panichandler" + "github.com/wavetermdev/waveterm/pkg/telemetry/telemetrydata" "github.com/wavetermdev/waveterm/pkg/util/daystr" "github.com/wavetermdev/waveterm/pkg/util/dbutil" "github.com/wavetermdev/waveterm/pkg/util/utilfn" @@ -97,59 +99,58 @@ func GoUpdateActivityWrap(update wshrpc.ActivityUpdate, debugStr string) { }() } -func InsertTEvent(ctx context.Context, event *wshrpc.TEvent) error { +func InsertTEvent(ctx context.Context, event *telemetrydata.TEvent) error { return wstore.WithTx(ctx, func(tx *wstore.TxWrap) error { - query := `INSERT INTO db_tevent (ts, event, props) - VALUES (?, ?, ?)` - tx.Exec(query, event.Ts, event.Event, dbutil.QuickJson(event.Props)) + query := `INSERT INTO db_tevent (ts, tslocal, event, props) + VALUES (?, ?, ?, ?)` + tx.Exec(query, event.Ts, event.TsLocal, event.Event, dbutil.QuickJson(event.Props)) return nil }) } // merges newActivity into curActivity, returns curActivity -func mergeActivity(curActivity map[string]any, newActivity map[string]any) map[string]any { - if curActivity == nil { - curActivity = make(map[string]any) - } - for key, val := range newActivity { - newVal := utilfn.ConvertInt(val) - curVal := utilfn.ConvertInt(curActivity[key]) - curActivity[key] = curVal + newVal - } - return curActivity +func mergeActivity(curActivity *telemetrydata.TEventProps, newActivity telemetrydata.TEventProps) { + curActivity.ActiveMinutes += newActivity.ActiveMinutes + curActivity.FgMinutes += newActivity.FgMinutes + curActivity.OpenMinutes += newActivity.OpenMinutes } // ignores the timestamp in tevent, and uses the current time -func UpdateActivityTEvent(ctx context.Context, tevent *wshrpc.TEvent) error { +func UpdateActivityTEvent(ctx context.Context, tevent *telemetrydata.TEvent) error { eventTs := time.Now() // compute to hour boundary, and round up to next hour eventTs = eventTs.Truncate(time.Hour).Add(time.Hour) return wstore.WithTx(ctx, func(tx *wstore.TxWrap) error { // find event that matches this timestamp with event name "activity" var hasRow bool - curActivity := make(map[string]any) + var curActivity telemetrydata.TEventProps rawProps := tx.GetString(`SELECT props FROM db_tevent WHERE ts = ? AND event = ?`, eventTs.UnixMilli(), ActivityEventName) if rawProps != "" { hasRow = true - curActivity = dbutil.ParseJsonMap(rawProps, true) + err := json.Unmarshal([]byte(rawProps), &curActivity) + if err != nil { + // ignore, curActivity will just be 0 + log.Printf("error unmarshalling activity props: %v\n", err) + } } - curActivity = mergeActivity(curActivity, tevent.Props) + mergeActivity(&curActivity, tevent.Props) if hasRow { query := `UPDATE db_tevent SET props = ? WHERE ts = ? AND event = ?` tx.Exec(query, dbutil.QuickJson(curActivity), eventTs.UnixMilli(), ActivityEventName) } else { - query := `INSERT INTO db_tevent (ts, event, props) VALUES (?, ?, ?)` - tx.Exec(query, eventTs.UnixMilli(), ActivityEventName, dbutil.QuickJson(curActivity)) + query := `INSERT INTO db_tevent (ts, tslocal, event, props) VALUES (?, ?, ?, ?)` + tsLocal := utilfn.ConvertToWallClockPT(eventTs).Format(time.RFC3339) + tx.Exec(query, eventTs.UnixMilli(), tsLocal, ActivityEventName, dbutil.QuickJson(curActivity)) } return nil }) } -func RecordTEvent(ctx context.Context, tevent *wshrpc.TEvent) error { +func RecordTEvent(ctx context.Context, tevent *telemetrydata.TEvent) error { if tevent == nil { return nil } - err := tevent.ValidateCurrentTEvent() + err := tevent.Validate(true) if err != nil { return err } @@ -160,11 +161,11 @@ func RecordTEvent(ctx context.Context, tevent *wshrpc.TEvent) error { return InsertTEvent(ctx, tevent) } -func GetNonUploadedTEvents(ctx context.Context, maxEvents int) ([]*wshrpc.TEvent, error) { +func GetNonUploadedTEvents(ctx context.Context, maxEvents int) ([]*telemetrydata.TEvent, error) { now := time.Now() - return wstore.WithTxRtn(ctx, func(tx *wstore.TxWrap) ([]*wshrpc.TEvent, error) { - var rtn []*wshrpc.TEvent - query := `SELECT id, ts, event, props, uploaded FROM db_tevent WHERE uploaded = 0 AND ts <= ? ORDER BY ts LIMIT ?` + return wstore.WithTxRtn(ctx, func(tx *wstore.TxWrap) ([]*telemetrydata.TEvent, error) { + var rtn []*telemetrydata.TEvent + query := `SELECT id, ts, tslocal, event, props, uploaded FROM db_tevent WHERE uploaded = 0 AND ts <= ? ORDER BY ts LIMIT ?` tx.Select(&rtn, query, now.UnixMilli(), maxEvents) for _, event := range rtn { if err := event.ConvertRawJSON(); err != nil { @@ -175,7 +176,7 @@ func GetNonUploadedTEvents(ctx context.Context, maxEvents int) ([]*wshrpc.TEvent }) } -func MarkTEventsAsUploaded(ctx context.Context, events []*wshrpc.TEvent) error { +func MarkTEventsAsUploaded(ctx context.Context, events []*telemetrydata.TEvent) error { return wstore.WithTx(ctx, func(tx *wstore.TxWrap) error { ids := make([]int64, 0, len(events)) for _, event := range events { diff --git a/pkg/telemetry/telemetrydata/telemetrydata.go b/pkg/telemetry/telemetrydata/telemetrydata.go new file mode 100644 index 0000000000..203fb1b251 --- /dev/null +++ b/pkg/telemetry/telemetrydata/telemetrydata.go @@ -0,0 +1,151 @@ +// Copyright 2025, Command Line Inc. +// SPDX-License-Identifier: Apache-2.0 + +package telemetrydata + +import ( + "encoding/json" + "fmt" + "regexp" + "time" + + "github.com/wavetermdev/waveterm/pkg/util/utilfn" +) + +var ValidEventNames = map[string]bool{ + "app:startup": true, + "app:shutdown": true, + "app:activity": true, +} + +type TEvent struct { + Ts int64 `json:"ts,omitempty" db:"ts"` + TsLocal string `json:"tslocal,omitempty" db:"tslocal"` // iso8601 format (wall clock converted to PT) + Event string `json:"event" db:"event"` + Props TEventProps `json:"props" db:"-"` // Don't scan directly to map + + // DB fields + Id int64 `json:"-" db:"id"` + Uploaded bool `json:"-" db:"uploaded"` + + // For database scanning + RawProps string `json:"-" db:"props"` +} + +type TEventUserProps struct { + ClientArch string `json:"client:arch,omitempty"` + ClientVersion string `json:"client:version,omitempty"` + ClientInitialVersion string `json:"client:initial_version,omitempty"` + ClientBuildTime string `json:"client:buildtime,omitempty"` + ClientOSRelease string `json:"client:osrelease,omitempty"` +} + +type TEventProps struct { + ActiveMinutes int `json:"activity:activeminutes,omitempty"` + FgMinutes int `json:"activity:fgminutes,omitempty"` + OpenMinutes int `json:"activity:openminutes,omitempty"` + ClientVersion string `json:"client:version,omitempty"` + UserSet *TEventUserProps `json:"$set,omitempty"` + UserSetOnce *TEventUserProps `json:"$set_once,omitempty"` +} + +func MakeTEvent(event string, props TEventProps) *TEvent { + now := time.Now() + // TsLocal gets set in EnsureTimestamps() + return &TEvent{ + Ts: now.UnixMilli(), + Event: event, + Props: props, + } +} + +func MakeUntypedTEvent(event string, propsMap map[string]any) (*TEvent, error) { + if event == "" { + return nil, fmt.Errorf("event name must be non-empty") + } + var props TEventProps + err := utilfn.ReUnmarshal(&props, propsMap) + if err != nil { + return nil, fmt.Errorf("error re-marshalling TEvent props: %w", err) + } + return MakeTEvent(event, props), nil +} + +func (t *TEvent) EnsureTimestamps() { + if t.Ts == 0 { + t.Ts = time.Now().UnixMilli() + } + gtime := time.UnixMilli(t.Ts) + t.TsLocal = utilfn.ConvertToWallClockPT(gtime).Format(time.RFC3339) +} + +func (t *TEvent) UserSetProps() *TEventUserProps { + if t.Props.UserSet == nil { + t.Props.UserSet = &TEventUserProps{} + } + return t.Props.UserSet +} + +func (t *TEvent) UserSetOnceProps() *TEventUserProps { + if t.Props.UserSetOnce == nil { + t.Props.UserSetOnce = &TEventUserProps{} + } + return t.Props.UserSetOnce +} + +func (t *TEvent) ConvertRawJSON() error { + if t.RawProps != "" { + return json.Unmarshal([]byte(t.RawProps), &t.Props) + } + return nil +} + +var eventNameRe = regexp.MustCompile(`^[a-zA-Z0-9.:_/-]+$`) + +// validates a tevent that was just created (not for validating out of the DB, or an uploaded TEvent) +// checks that TS is pretty current (or unset) +func (te *TEvent) Validate(current bool) error { + if te == nil { + return fmt.Errorf("TEvent cannot be nil") + } + if te.Event == "" { + return fmt.Errorf("TEvent.Event cannot be empty") + } + if !eventNameRe.MatchString(te.Event) { + return fmt.Errorf("TEvent.Event invalid: %q", te.Event) + } + if !ValidEventNames[te.Event] { + return fmt.Errorf("TEvent.Event not valid: %q", te.Event) + } + if current { + if te.Ts != 0 { + now := time.Now().UnixMilli() + if te.Ts > now+60000 || te.Ts < now-60000 { + return fmt.Errorf("TEvent.Ts is not current: %d", te.Ts) + } + } + } else { + if te.Ts == 0 { + return fmt.Errorf("TEvent.Ts must be set") + } + if te.TsLocal == "" { + return fmt.Errorf("TEvent.TsLocal must be set") + } + t, err := time.Parse(time.RFC3339, te.TsLocal) + if err != nil { + return fmt.Errorf("TEvent.TsLocal parse error: %v", err) + } + now := time.Now() + if t.Before(now.Add(-30*24*time.Hour)) || t.After(now.Add(2*24*time.Hour)) { + return fmt.Errorf("tslocal out of valid range") + } + } + barr, err := json.Marshal(te.Props) + if err != nil { + return fmt.Errorf("TEvent.Props JSON error: %v", err) + } + if len(barr) > 20000 { + return fmt.Errorf("TEvent.Props too large: %d", len(barr)) + } + return nil +} diff --git a/pkg/util/utilfn/streamtolines.go b/pkg/util/utilfn/streamtolines.go index d9fa3363bc..39f2adcc1e 100644 --- a/pkg/util/utilfn/streamtolines.go +++ b/pkg/util/utilfn/streamtolines.go @@ -58,7 +58,7 @@ func streamToLines_processBuf(lineBuf *lineBuf, readBuf []byte, lineFn func([]by func StreamToLines(input io.Reader, lineFn func([]byte)) error { var lineBuf lineBuf - readBuf := make([]byte, 16*1024) + readBuf := make([]byte, 64*1024) for { n, err := input.Read(readBuf) streamToLines_processBuf(&lineBuf, readBuf[:n], lineFn) diff --git a/pkg/util/utilfn/utilfn.go b/pkg/util/utilfn/utilfn.go index 6aec3943e3..904e8d460c 100644 --- a/pkg/util/utilfn/utilfn.go +++ b/pkg/util/utilfn/utilfn.go @@ -85,7 +85,7 @@ func GetBool(v interface{}, field string) bool { return bval } -// converts an int or int64 to an int64 +// converts an int, int64, or float64 to an int64 // nil or bad type returns 0 func ConvertInt(val any) int64 { if val == 0 { @@ -96,6 +96,8 @@ func ConvertInt(val any) int64 { return int64(typedVal) case int64: return typedVal + case float64: + return int64(typedVal) default: return 0 } diff --git a/pkg/wcloud/wcloud.go b/pkg/wcloud/wcloud.go index 821de10b94..7d161eced3 100644 --- a/pkg/wcloud/wcloud.go +++ b/pkg/wcloud/wcloud.go @@ -18,9 +18,9 @@ import ( "time" "github.com/wavetermdev/waveterm/pkg/telemetry" + "github.com/wavetermdev/waveterm/pkg/telemetry/telemetrydata" "github.com/wavetermdev/waveterm/pkg/util/daystr" "github.com/wavetermdev/waveterm/pkg/wavebase" - "github.com/wavetermdev/waveterm/pkg/wshrpc" ) const WCloudEndpoint = "https://api.waveterm.dev/central" @@ -151,8 +151,8 @@ func doRequest(req *http.Request, outputObj interface{}) (*http.Response, error) } type TDataInputType struct { - ClientId string `json:"clientId"` - TEvents []*wshrpc.TEvent `json:"tevents"` + ClientId string `json:"clientId"` + TEvents []*telemetrydata.TEvent `json:"tevents"` } const TEventsBatchSize = 100 @@ -188,11 +188,7 @@ func sendTEventsBatch(clientId string) (bool, error) { return len(events) < TEventsBatchSize, nil } -func SendTEvents(clientId string) error { - if !telemetry.IsTelemetryEnabled() { - log.Printf("telemetry disabled, not sending\n") - return nil - } +func sendTEvents(clientId string) error { numIters := 0 for { numIters++ @@ -212,11 +208,23 @@ func SendTEvents(clientId string) error { return nil } -func SendTelemetry(ctx context.Context, clientId string) error { +func SendAllTelemetry(ctx context.Context, clientId string) error { if !telemetry.IsTelemetryEnabled() { log.Printf("telemetry disabled, not sending\n") return nil } + err := sendTEvents(clientId) + if err != nil { + return err + } + err = sendTelemetry(ctx, clientId) + if err != nil { + return err + } + return nil +} + +func sendTelemetry(ctx context.Context, clientId string) error { activity, err := telemetry.GetNonUploadedActivity(ctx) if err != nil { return fmt.Errorf("cannot get activity: %v", err) diff --git a/pkg/wshrpc/tevent.go b/pkg/wshrpc/tevent.go deleted file mode 100644 index c02d6f1bbf..0000000000 --- a/pkg/wshrpc/tevent.go +++ /dev/null @@ -1,144 +0,0 @@ -// Copyright 2025, Command Line Inc. -// SPDX-License-Identifier: Apache-2.0 - -package wshrpc - -import ( - "encoding/json" - "fmt" - "regexp" - "time" - - "github.com/wavetermdev/waveterm/pkg/util/utilfn" -) - -type TEvent struct { - Ts int64 `json:"ts" db:"ts"` - TsLocal string `json:"tslocal" db:"-"` // iso8601 format (wall clock converted to PT) - Event string `json:"event" db:"event"` - Props map[string]any `json:"props" db:"-"` // Don't scan directly to map - - // DB fields - Id int64 `json:"-" db:"id"` - Uploaded bool `json:"-" db:"uploaded"` - - // For database scanning - RawProps string `json:"-" db:"props"` -} - -var phNestedProps = []string{"$set", "$set_once", "$add", "$unset"} -var eventNameRe = regexp.MustCompile(`^[a-zA-Z0-9.:_/-]+$`) -var propNameRe = regexp.MustCompile(`^[a-zA-Z0-9.:_/$-]+$`) - -// validates a tevent that was just created (not for validating out of the DB, or an uploaded TEvent) -// checks that TS is pretty current (or unset) -func (te *TEvent) ValidateCurrentTEvent() error { - if te == nil { - return fmt.Errorf("TEvent cannot be nil") - } - if te.Event == "" { - return fmt.Errorf("TEvent.Event cannot be empty") - } - if !eventNameRe.MatchString(te.Event) { - return fmt.Errorf("TEvent.Event invalid: %q", te.Event) - } - if te.Ts != 0 { - now := time.Now().UnixMilli() - if te.Ts > now+60000 || te.Ts < now-60000 { - return fmt.Errorf("TEvent.Ts is not current: %d", te.Ts) - } - } - err := validatePropNames(te.Props, true) - if err != nil { - return fmt.Errorf("TEvent.Props: %v", err) - } - barr, err := json.Marshal(te.Props) - if err != nil { - return fmt.Errorf("TEvent.Props JSON error: %v", err) - } - if len(barr) > 20000 { - return fmt.Errorf("TEvent.Props too large: %d", len(barr)) - } - return nil -} - -func validatePropNames(props map[string]any, topLevel bool) error { - if props == nil { - return nil - } - for k := range props { - if !propNameRe.MatchString(k) { - return fmt.Errorf("TEvent.Props key invalid: %q", k) - } - } - if !topLevel { - return nil - } - for _, k := range phNestedProps { - nestedMap := utilfn.ConvertMap(props[k]) - err := validatePropNames(nestedMap, false) - if err != nil { - return fmt.Errorf("%v in nestedMap %s", err, k) - } - } - return nil -} - -func MakeTEvent(event string, props map[string]any) *TEvent { - if event == "" { - panic("TEvent.Event cannot be empty") - } - if props == nil { - props = make(map[string]any) - } - now := time.Now() - localTime := utilfn.ConvertToWallClockPT(now) - return &TEvent{ - Ts: now.UnixMilli(), - TsLocal: localTime.Format(time.RFC3339), - Event: event, - Props: props, - } -} - -func (t *TEvent) EnsureTimestamps() { - if t.Ts == 0 { - t.Ts = time.Now().UnixMilli() - } - gtime := time.UnixMilli(t.Ts) - t.TsLocal = utilfn.ConvertToWallClockPT(gtime).Format(time.RFC3339) -} - -func (t *TEvent) SetUser(key string, value any) { - if t.Props == nil { - t.Props = make(map[string]any) - } - if t.Props["$set"] == nil { - t.Props["$set"] = make(map[string]any) - } - t.Props["$set"].(map[string]any)[key] = value -} - -func (t *TEvent) SetUserOnce(key string, value any) { - if t.Props == nil { - t.Props = make(map[string]any) - } - if t.Props["$set_once"] == nil { - t.Props["$set_once"] = make(map[string]any) - } - t.Props["$set_once"].(map[string]any)[key] = value -} - -func (t *TEvent) Set(key string, value any) { - if t.Props == nil { - t.Props = make(map[string]any) - } - t.Props[key] = value -} - -func (t *TEvent) ConvertRawJSON() error { - if t.RawProps != "" { - return json.Unmarshal([]byte(t.RawProps), &t.Props) - } - return nil -} diff --git a/pkg/wshrpc/wshclient/wshclient.go b/pkg/wshrpc/wshclient/wshclient.go index 2480484ec2..3784f67039 100644 --- a/pkg/wshrpc/wshclient/wshclient.go +++ b/pkg/wshrpc/wshclient/wshclient.go @@ -6,6 +6,7 @@ package wshclient import ( + "github.com/wavetermdev/waveterm/pkg/telemetry/telemetrydata" "github.com/wavetermdev/waveterm/pkg/wshutil" "github.com/wavetermdev/waveterm/pkg/wshrpc" "github.com/wavetermdev/waveterm/pkg/wconfig" @@ -306,6 +307,12 @@ func PathCommand(w *wshutil.WshRpc, data wshrpc.PathCommandData, opts *wshrpc.Rp return resp, err } +// command "recordtevent", wshserver.RecordTEventCommand +func RecordTEventCommand(w *wshutil.WshRpc, data telemetrydata.TEvent, opts *wshrpc.RpcOpts) error { + _, err := sendRpcRequestCallHelper[any](w, "recordtevent", data, opts) + return err +} + // command "remotefilecopy", wshserver.RemoteFileCopyCommand func RemoteFileCopyCommand(w *wshutil.WshRpc, data wshrpc.CommandRemoteFileCopyData, opts *wshrpc.RpcOpts) error { _, err := sendRpcRequestCallHelper[any](w, "remotefilecopy", data, opts) @@ -404,6 +411,12 @@ func RouteUnannounceCommand(w *wshutil.WshRpc, opts *wshrpc.RpcOpts) error { return err } +// command "sendtelemetry", wshserver.SendTelemetryCommand +func SendTelemetryCommand(w *wshutil.WshRpc, opts *wshrpc.RpcOpts) error { + _, err := sendRpcRequestCallHelper[any](w, "sendtelemetry", nil, opts) + return err +} + // command "setconfig", wshserver.SetConfigCommand func SetConfigCommand(w *wshutil.WshRpc, data wshrpc.MetaSettingsType, opts *wshrpc.RpcOpts) error { _, err := sendRpcRequestCallHelper[any](w, "setconfig", data, opts) diff --git a/pkg/wshrpc/wshrpctypes.go b/pkg/wshrpc/wshrpctypes.go index 4ff4a043d5..e424b388a0 100644 --- a/pkg/wshrpc/wshrpctypes.go +++ b/pkg/wshrpc/wshrpctypes.go @@ -13,6 +13,7 @@ import ( "reflect" "github.com/wavetermdev/waveterm/pkg/ijson" + "github.com/wavetermdev/waveterm/pkg/telemetry/telemetrydata" "github.com/wavetermdev/waveterm/pkg/vdom" "github.com/wavetermdev/waveterm/pkg/waveobj" "github.com/wavetermdev/waveterm/pkg/wconfig" @@ -177,10 +178,11 @@ type WshRpcInterface interface { WaveInfoCommand(ctx context.Context) (*WaveInfoData, error) WshActivityCommand(ct context.Context, data map[string]int) error ActivityCommand(ctx context.Context, data ActivityUpdate) error - RecordTEventCommand(ctx context.Context, data TEvent) error + RecordTEventCommand(ctx context.Context, data telemetrydata.TEvent) error GetVarCommand(ctx context.Context, data CommandVarData) (*CommandVarResponseData, error) SetVarCommand(ctx context.Context, data CommandVarData) error PathCommand(ctx context.Context, data PathCommandData) (string, error) + SendTelemetryCommand(ctx context.Context) error // connection functions ConnStatusCommand(ctx context.Context) ([]ConnStatus, error) diff --git a/pkg/wshrpc/wshserver/wshserver.go b/pkg/wshrpc/wshserver/wshserver.go index c52f63ea41..d576175c99 100644 --- a/pkg/wshrpc/wshserver/wshserver.go +++ b/pkg/wshrpc/wshserver/wshserver.go @@ -27,6 +27,7 @@ import ( "github.com/wavetermdev/waveterm/pkg/remote/conncontroller" "github.com/wavetermdev/waveterm/pkg/remote/fileshare" "github.com/wavetermdev/waveterm/pkg/telemetry" + "github.com/wavetermdev/waveterm/pkg/telemetry/telemetrydata" "github.com/wavetermdev/waveterm/pkg/util/envutil" "github.com/wavetermdev/waveterm/pkg/util/shellutil" "github.com/wavetermdev/waveterm/pkg/util/utilfn" @@ -34,6 +35,7 @@ import ( "github.com/wavetermdev/waveterm/pkg/waveai" "github.com/wavetermdev/waveterm/pkg/wavebase" "github.com/wavetermdev/waveterm/pkg/waveobj" + "github.com/wavetermdev/waveterm/pkg/wcloud" "github.com/wavetermdev/waveterm/pkg/wconfig" "github.com/wavetermdev/waveterm/pkg/wcore" "github.com/wavetermdev/waveterm/pkg/wps" @@ -734,10 +736,18 @@ func (ws *WshServer) WorkspaceListCommand(ctx context.Context) ([]wshrpc.Workspa return rtn, nil } -func (ws *WshServer) RecordTEventCommand(ctx context.Context, data wshrpc.TEvent) error { +func (ws *WshServer) RecordTEventCommand(ctx context.Context, data telemetrydata.TEvent) error { return telemetry.RecordTEvent(ctx, &data) } +func (ws WshServer) SendTelemetryCommand(ctx context.Context) error { + client, err := wstore.DBGetSingleton[*waveobj.Client](ctx) + if err != nil { + return fmt.Errorf("getting client data for telemetry: %v", err) + } + return wcloud.SendAllTelemetry(ctx, client.OID) +} + var wshActivityRe = regexp.MustCompile(`^[a-z:#]+$`) func (ws *WshServer) WshActivityCommand(ctx context.Context, data map[string]int) error { From cdf8cb211fe026c3111fa0bb3e6bdf689e0645ab Mon Sep 17 00:00:00 2001 From: sawka Date: Fri, 31 Jan 2025 17:56:32 -0800 Subject: [PATCH 10/19] telemetry updates, bug fixes, uuid in TEvent --- cmd/server/main-server.go | 16 +++- db/migrations-wstore/000007_events.up.sql | 2 +- emain/emain.ts | 2 +- frontend/types/gotypes.d.ts | 9 +++ pkg/panichandler/panichandler.go | 4 +- pkg/telemetry/telemetry.go | 85 +++++++++++++++----- pkg/telemetry/telemetrydata/telemetrydata.go | 35 ++++++-- pkg/wcloud/wcloud.go | 41 ++++++---- pkg/wshutil/wshutil.go | 4 +- 9 files changed, 149 insertions(+), 49 deletions(-) diff --git a/cmd/server/main-server.go b/cmd/server/main-server.go index e1eb265dde..39f73894e5 100644 --- a/cmd/server/main-server.go +++ b/cmd/server/main-server.go @@ -151,8 +151,17 @@ func startupActivityUpdate() { if err != nil { log.Printf("error updating startup activity: %v\n", err) } - tevent := telemetrydata.MakeTEvent("startup", telemetrydata.TEventProps{ + tevent := telemetrydata.MakeTEvent("app:startup", telemetrydata.TEventProps{ ClientVersion: "v" + WaveVersion, + UserSet: &telemetrydata.TEventUserProps{ + ClientVersion: "v" + WaveVersion, + ClientBuildTime: BuildTime, + ClientArch: wavebase.ClientArch(), + ClientOSRelease: wavebase.UnameKernelRelease(), + }, + UserSetOnce: &telemetrydata.TEventUserProps{ + ClientInitialVersion: "v" + WaveVersion, + }, }) err = telemetry.RecordTEvent(ctx, tevent) if err != nil { @@ -168,6 +177,11 @@ func shutdownActivityUpdate() { if err != nil { log.Printf("error updating shutdown activity: %v\n", err) } + tevent := telemetrydata.MakeTEvent("app:shutdown", telemetrydata.TEventProps{}) + err = telemetry.RecordTEvent(ctx, tevent) + if err != nil { + log.Printf("error recording shutdown event: %v\n", err) + } } func createMainWshClient() { diff --git a/db/migrations-wstore/000007_events.up.sql b/db/migrations-wstore/000007_events.up.sql index 35e059a24e..3c6311960c 100644 --- a/db/migrations-wstore/000007_events.up.sql +++ b/db/migrations-wstore/000007_events.up.sql @@ -1,5 +1,5 @@ CREATE TABLE db_tevent ( - id INTEGER PRIMARY KEY, + uuid varchar(36) PRIMARY KEY, ts int NOT NULL, tslocal varchar(100) NOT NULL, event varchar(50) NOT NULL, diff --git a/emain/emain.ts b/emain/emain.ts index 1308aa2ab0..6f1ab40c57 100644 --- a/emain/emain.ts +++ b/emain/emain.ts @@ -475,7 +475,7 @@ function logActiveState() { await RpcApi.RecordTEventCommand( ElectronWshClient, { - event: "activity", + event: "app:activity", props: { "activity:activeminutes": activity.activeminutes, "activity:fgminutes": activity.fgminutes, diff --git a/frontend/types/gotypes.d.ts b/frontend/types/gotypes.d.ts index e2b7a7871b..35aaf599c0 100644 --- a/frontend/types/gotypes.d.ts +++ b/frontend/types/gotypes.d.ts @@ -747,6 +747,7 @@ declare global { // telemetrydata.TEvent type TEvent = { + uuid?: string; ts?: number; tslocal?: string; event: string; @@ -758,7 +759,13 @@ declare global { "activity:activeminutes"?: number; "activity:fgminutes"?: number; "activity:openminutes"?: number; + "client:arch"?: string; "client:version"?: string; + "client:initial_version"?: string; + "client:buildtime"?: string; + "client:osrelease"?: string; + "loc:countrycode"?: string; + "loc:regioncode"?: string; $set?: TEventUserProps; $set_once?: TEventUserProps; }; @@ -770,6 +777,8 @@ declare global { "client:initial_version"?: string; "client:buildtime"?: string; "client:osrelease"?: string; + "loc:countrycode"?: string; + "loc:regioncode"?: string; }; // waveobj.Tab diff --git a/pkg/panichandler/panichandler.go b/pkg/panichandler/panichandler.go index deb7441f74..1272ca7a95 100644 --- a/pkg/panichandler/panichandler.go +++ b/pkg/panichandler/panichandler.go @@ -30,7 +30,9 @@ func PanicHandler(debugStr string, recoverVal any) error { debug.PrintStack() if PanicTelemetryHandler != nil { go func() { - defer PanicHandlerNoTelemetry("PanicTelemetryHandler", recover()) + defer func() { + PanicHandlerNoTelemetry("PanicTelemetryHandler", recover()) + }() PanicTelemetryHandler() }() } diff --git a/pkg/telemetry/telemetry.go b/pkg/telemetry/telemetry.go index a2a2ed3a79..b31c008644 100644 --- a/pkg/telemetry/telemetry.go +++ b/pkg/telemetry/telemetry.go @@ -11,6 +11,7 @@ import ( "log" "time" + "github.com/google/uuid" "github.com/wavetermdev/waveterm/pkg/panichandler" "github.com/wavetermdev/waveterm/pkg/telemetry/telemetrydata" "github.com/wavetermdev/waveterm/pkg/util/daystr" @@ -88,7 +89,9 @@ func AutoUpdateChannel() string { // Wraps UpdateCurrentActivity, spawns goroutine, and logs errors func GoUpdateActivityWrap(update wshrpc.ActivityUpdate, debugStr string) { go func() { - defer panichandler.PanicHandlerNoTelemetry("GoUpdateActivityWrap", recover()) + defer func() { + panichandler.PanicHandlerNoTelemetry("GoUpdateActivityWrap", recover()) + }() ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second) defer cancelFn() err := UpdateActivity(ctx, update) @@ -99,11 +102,23 @@ func GoUpdateActivityWrap(update wshrpc.ActivityUpdate, debugStr string) { }() } -func InsertTEvent(ctx context.Context, event *telemetrydata.TEvent) error { +func insertTEvent(ctx context.Context, event *telemetrydata.TEvent) error { + if event.Uuid == "" { + return fmt.Errorf("cannot insert TEvent: uuid is empty") + } + if event.Ts == 0 { + return fmt.Errorf("cannot insert TEvent: ts is 0") + } + if event.TsLocal == "" { + return fmt.Errorf("cannot insert TEvent: tslocal is empty") + } + if event.Event == "" { + return fmt.Errorf("cannot insert TEvent: event is empty") + } return wstore.WithTx(ctx, func(tx *wstore.TxWrap) error { - query := `INSERT INTO db_tevent (ts, tslocal, event, props) - VALUES (?, ?, ?, ?)` - tx.Exec(query, event.Ts, event.TsLocal, event.Event, dbutil.QuickJson(event.Props)) + query := `INSERT INTO db_tevent (uuid, ts, tslocal, event, props) + VALUES (?, ?, ?, ?, ?)` + tx.Exec(query, event.Uuid, event.Ts, event.TsLocal, event.Event, dbutil.QuickJson(event.Props)) return nil }) } @@ -116,7 +131,7 @@ func mergeActivity(curActivity *telemetrydata.TEventProps, newActivity telemetry } // ignores the timestamp in tevent, and uses the current time -func UpdateActivityTEvent(ctx context.Context, tevent *telemetrydata.TEvent) error { +func updateActivityTEvent(ctx context.Context, tevent *telemetrydata.TEvent) error { eventTs := time.Now() // compute to hour boundary, and round up to next hour eventTs = eventTs.Truncate(time.Hour).Add(time.Hour) @@ -124,9 +139,10 @@ func UpdateActivityTEvent(ctx context.Context, tevent *telemetrydata.TEvent) err // find event that matches this timestamp with event name "activity" var hasRow bool var curActivity telemetrydata.TEventProps - rawProps := tx.GetString(`SELECT props FROM db_tevent WHERE ts = ? AND event = ?`, eventTs.UnixMilli(), ActivityEventName) - if rawProps != "" { + uuidStr := tx.GetString(`SELECT uuid FROM db_tevent WHERE ts = ? AND event = ?`, eventTs.UnixMilli(), ActivityEventName) + if uuidStr != "" { hasRow = true + rawProps := tx.GetString(`SELECT props FROM db_tevent WHERE uuid = ?`, uuidStr) err := json.Unmarshal([]byte(rawProps), &curActivity) if err != nil { // ignore, curActivity will just be 0 @@ -135,41 +151,72 @@ func UpdateActivityTEvent(ctx context.Context, tevent *telemetrydata.TEvent) err } mergeActivity(&curActivity, tevent.Props) if hasRow { - query := `UPDATE db_tevent SET props = ? WHERE ts = ? AND event = ?` - tx.Exec(query, dbutil.QuickJson(curActivity), eventTs.UnixMilli(), ActivityEventName) + query := `UPDATE db_tevent SET props = ? WHERE uuid = ?` + tx.Exec(query, dbutil.QuickJson(curActivity), uuidStr) } else { - query := `INSERT INTO db_tevent (ts, tslocal, event, props) VALUES (?, ?, ?, ?)` + query := `INSERT INTO db_tevent (uuid, ts, tslocal, event, props) VALUES (?, ?, ?, ?)` tsLocal := utilfn.ConvertToWallClockPT(eventTs).Format(time.RFC3339) - tx.Exec(query, eventTs.UnixMilli(), tsLocal, ActivityEventName, dbutil.QuickJson(curActivity)) + tx.Exec(query, uuid.New().String(), eventTs.UnixMilli(), tsLocal, ActivityEventName, dbutil.QuickJson(curActivity)) } return nil }) } +func GoRecordTEventWrap(tevent *telemetrydata.TEvent) { + if tevent == nil || tevent.Event == "" { + return + } + go func() { + defer func() { + panichandler.PanicHandlerNoTelemetry("GoRecordTEventWrap", recover()) + }() + ctx, cancelFn := context.WithTimeout(context.Background(), 2*time.Second) + defer cancelFn() + err := RecordTEvent(ctx, tevent) + if err != nil { + // ignore error, just log, since this is not critical + log.Printf("error recording %q telemetry event: %v\n", tevent.Event, err) + } + }() +} + func RecordTEvent(ctx context.Context, tevent *telemetrydata.TEvent) error { if tevent == nil { return nil } + if tevent.Uuid == "" { + tevent.Uuid = uuid.New().String() + } err := tevent.Validate(true) if err != nil { return err } tevent.EnsureTimestamps() if tevent.Event == ActivityEventName { - return UpdateActivityTEvent(ctx, tevent) + return updateActivityTEvent(ctx, tevent) } - return InsertTEvent(ctx, tevent) + return insertTEvent(ctx, tevent) +} + +func CleanOldTEvents(ctx context.Context) error { + return wstore.WithTx(ctx, func(tx *wstore.TxWrap) error { + // delete events older than 28 days + query := `DELETE FROM db_tevent WHERE ts < ?` + olderThan := time.Now().AddDate(0, 0, -28).UnixMilli() + tx.Exec(query, olderThan) + return nil + }) } func GetNonUploadedTEvents(ctx context.Context, maxEvents int) ([]*telemetrydata.TEvent, error) { now := time.Now() return wstore.WithTxRtn(ctx, func(tx *wstore.TxWrap) ([]*telemetrydata.TEvent, error) { var rtn []*telemetrydata.TEvent - query := `SELECT id, ts, tslocal, event, props, uploaded FROM db_tevent WHERE uploaded = 0 AND ts <= ? ORDER BY ts LIMIT ?` + query := `SELECT uuid, ts, tslocal, event, props, uploaded FROM db_tevent WHERE uploaded = 0 AND ts <= ? ORDER BY ts LIMIT ?` tx.Select(&rtn, query, now.UnixMilli(), maxEvents) for _, event := range rtn { if err := event.ConvertRawJSON(); err != nil { - return nil, fmt.Errorf("scan json for event %d: %w", event.Id, err) + return nil, fmt.Errorf("scan json for event %s: %w", event.Uuid, err) } } return rtn, nil @@ -178,11 +225,11 @@ func GetNonUploadedTEvents(ctx context.Context, maxEvents int) ([]*telemetrydata func MarkTEventsAsUploaded(ctx context.Context, events []*telemetrydata.TEvent) error { return wstore.WithTx(ctx, func(tx *wstore.TxWrap) error { - ids := make([]int64, 0, len(events)) + ids := make([]string, 0, len(events)) for _, event := range events { - ids = append(ids, event.Id) + ids = append(ids, event.Uuid) } - query := `UPDATE db_tevent SET uploaded = 1 WHERE id IN (SELECT value FROM json_each(?))` + query := `UPDATE db_tevent SET uploaded = 1 WHERE uuid IN (SELECT value FROM json_each(?))` tx.Exec(query, dbutil.QuickJson(ids)) return nil }) diff --git a/pkg/telemetry/telemetrydata/telemetrydata.go b/pkg/telemetry/telemetrydata/telemetrydata.go index 203fb1b251..9b66878910 100644 --- a/pkg/telemetry/telemetrydata/telemetrydata.go +++ b/pkg/telemetry/telemetrydata/telemetrydata.go @@ -9,6 +9,7 @@ import ( "regexp" "time" + "github.com/google/uuid" "github.com/wavetermdev/waveterm/pkg/util/utilfn" ) @@ -19,14 +20,14 @@ var ValidEventNames = map[string]bool{ } type TEvent struct { + Uuid string `json:"uuid,omitempty" db:"uuid"` Ts int64 `json:"ts,omitempty" db:"ts"` TsLocal string `json:"tslocal,omitempty" db:"tslocal"` // iso8601 format (wall clock converted to PT) Event string `json:"event" db:"event"` Props TEventProps `json:"props" db:"-"` // Don't scan directly to map // DB fields - Id int64 `json:"-" db:"id"` - Uploaded bool `json:"-" db:"uploaded"` + Uploaded bool `json:"-" db:"uploaded"` // For database scanning RawProps string `json:"-" db:"props"` @@ -38,21 +39,32 @@ type TEventUserProps struct { ClientInitialVersion string `json:"client:initial_version,omitempty"` ClientBuildTime string `json:"client:buildtime,omitempty"` ClientOSRelease string `json:"client:osrelease,omitempty"` + LocCountryCode string `json:"loc:countrycode,omitempty"` + LocRegionCode string `json:"loc:regioncode,omitempty"` } type TEventProps struct { - ActiveMinutes int `json:"activity:activeminutes,omitempty"` - FgMinutes int `json:"activity:fgminutes,omitempty"` - OpenMinutes int `json:"activity:openminutes,omitempty"` - ClientVersion string `json:"client:version,omitempty"` - UserSet *TEventUserProps `json:"$set,omitempty"` - UserSetOnce *TEventUserProps `json:"$set_once,omitempty"` + ActiveMinutes int `json:"activity:activeminutes,omitempty"` + FgMinutes int `json:"activity:fgminutes,omitempty"` + OpenMinutes int `json:"activity:openminutes,omitempty"` + + ClientArch string `json:"client:arch,omitempty"` + ClientVersion string `json:"client:version,omitempty"` + ClientInitialVersion string `json:"client:initial_version,omitempty"` + ClientBuildTime string `json:"client:buildtime,omitempty"` + ClientOSRelease string `json:"client:osrelease,omitempty"` + LocCountryCode string `json:"loc:countrycode,omitempty"` + LocRegionCode string `json:"loc:regioncode,omitempty"` + + UserSet *TEventUserProps `json:"$set,omitempty"` + UserSetOnce *TEventUserProps `json:"$set_once,omitempty"` } func MakeTEvent(event string, props TEventProps) *TEvent { now := time.Now() // TsLocal gets set in EnsureTimestamps() return &TEvent{ + Uuid: uuid.New().String(), Ts: now.UnixMilli(), Event: event, Props: props, @@ -117,6 +129,13 @@ func (te *TEvent) Validate(current bool) error { if !ValidEventNames[te.Event] { return fmt.Errorf("TEvent.Event not valid: %q", te.Event) } + if te.Uuid == "" { + return fmt.Errorf("TEvent.Uuid cannot be empty") + } + _, err := uuid.Parse(te.Uuid) + if err != nil { + return fmt.Errorf("TEvent.Uuid invalid: %v", err) + } if current { if te.Ts != 0 { now := time.Now().UnixMilli() diff --git a/pkg/wcloud/wcloud.go b/pkg/wcloud/wcloud.go index 7d161eced3..b7743aa86d 100644 --- a/pkg/wcloud/wcloud.go +++ b/pkg/wcloud/wcloud.go @@ -150,53 +150,55 @@ func doRequest(req *http.Request, outputObj interface{}) (*http.Response, error) return resp, nil } -type TDataInputType struct { - ClientId string `json:"clientId"` - TEvents []*telemetrydata.TEvent `json:"tevents"` +type TEventsInputType struct { + ClientId string `json:"clientid"` + Events []*telemetrydata.TEvent `json:"events"` } const TEventsBatchSize = 100 -// returns (done, error) -func sendTEventsBatch(clientId string) (bool, error) { +// returns (done, num-sent, error) +func sendTEventsBatch(clientId string) (bool, int, error) { ctx, cancelFn := context.WithTimeout(context.Background(), WCloudDefaultTimeout) defer cancelFn() events, err := telemetry.GetNonUploadedTEvents(ctx, TEventsBatchSize) if err != nil { - return true, fmt.Errorf("cannot get events: %v", err) + return true, 0, fmt.Errorf("cannot get events: %v", err) } if len(events) == 0 { - return true, nil + return true, 0, nil } log.Printf("[wcloud] sending %d tevents\n", len(events)) - input := TDataInputType{ + input := TEventsInputType{ ClientId: clientId, - TEvents: events, + Events: events, } req, err := makeAnonPostReq(ctx, TEventsUrl, input) if err != nil { - return true, err + return true, 0, err } _, err = doRequest(req, nil) if err != nil { - return true, err + return true, 0, err } err = telemetry.MarkTEventsAsUploaded(ctx, events) if err != nil { - return true, fmt.Errorf("error marking activity as uploaded: %v", err) + return true, 0, fmt.Errorf("error marking activity as uploaded: %v", err) } - return len(events) < TEventsBatchSize, nil + return len(events) < TEventsBatchSize, len(events), nil } -func sendTEvents(clientId string) error { +func sendTEvents(clientId string) (int, error) { numIters := 0 + totalEvents := 0 for { numIters++ - done, err := sendTEventsBatch(clientId) + done, numEvents, err := sendTEventsBatch(clientId) if err != nil { log.Printf("error sending telemetry events: %v\n", err) break } + totalEvents += numEvents if done { break } @@ -205,15 +207,20 @@ func sendTEvents(clientId string) error { break } } - return nil + return totalEvents, nil } func SendAllTelemetry(ctx context.Context, clientId string) error { + defer func() { + ctx, cancelFn := context.WithTimeout(context.Background(), 2*time.Second) + defer cancelFn() + telemetry.CleanOldTEvents(ctx) + }() if !telemetry.IsTelemetryEnabled() { log.Printf("telemetry disabled, not sending\n") return nil } - err := sendTEvents(clientId) + _, err := sendTEvents(clientId) if err != nil { return err } diff --git a/pkg/wshutil/wshutil.go b/pkg/wshutil/wshutil.go index 871fd72d1f..74ca6ac96c 100644 --- a/pkg/wshutil/wshutil.go +++ b/pkg/wshutil/wshutil.go @@ -156,7 +156,9 @@ func installShutdownSignalHandlers(quiet bool) { sigCh := make(chan os.Signal, 1) signal.Notify(sigCh, syscall.SIGHUP, syscall.SIGTERM, syscall.SIGINT) go func() { - defer panichandler.PanicHandlerNoTelemetry("installShutdownSignalHandlers", recover()) + defer func() { + panichandler.PanicHandlerNoTelemetry("installShutdownSignalHandlers", recover()) + }() for sig := range sigCh { DoShutdown(fmt.Sprintf("got signal %v", sig), 1, quiet) break From a94ed6295efb416fff7644fdc5e24b430d52f60a Mon Sep 17 00:00:00 2001 From: sawka Date: Fri, 31 Jan 2025 18:07:18 -0800 Subject: [PATCH 11/19] truncate the app:activity event to 'now' on app shutdown --- cmd/server/main-server.go | 4 ++++ pkg/telemetry/telemetry.go | 21 +++++++++++++++++++-- 2 files changed, 23 insertions(+), 2 deletions(-) diff --git a/cmd/server/main-server.go b/cmd/server/main-server.go index 39f73894e5..cf6456bdcd 100644 --- a/cmd/server/main-server.go +++ b/cmd/server/main-server.go @@ -177,6 +177,10 @@ func shutdownActivityUpdate() { if err != nil { log.Printf("error updating shutdown activity: %v\n", err) } + err = telemetry.TruncateActivityTEventForShutdown(ctx) + if err != nil { + log.Printf("error truncating activity t-event for shutdown: %v\n", err) + } tevent := telemetrydata.MakeTEvent("app:shutdown", telemetrydata.TEventProps{}) err = telemetry.RecordTEvent(ctx, tevent) if err != nil { diff --git a/pkg/telemetry/telemetry.go b/pkg/telemetry/telemetry.go index b31c008644..05e7e2787f 100644 --- a/pkg/telemetry/telemetry.go +++ b/pkg/telemetry/telemetry.go @@ -24,7 +24,7 @@ import ( ) const MaxTzNameLen = 50 -const ActivityEventName = "activity" +const ActivityEventName = "app:activity" type ActivityType struct { Day string `json:"day"` @@ -136,7 +136,7 @@ func updateActivityTEvent(ctx context.Context, tevent *telemetrydata.TEvent) err // compute to hour boundary, and round up to next hour eventTs = eventTs.Truncate(time.Hour).Add(time.Hour) return wstore.WithTx(ctx, func(tx *wstore.TxWrap) error { - // find event that matches this timestamp with event name "activity" + // find event that matches this timestamp with event name "app:activity" var hasRow bool var curActivity telemetrydata.TEventProps uuidStr := tx.GetString(`SELECT uuid FROM db_tevent WHERE ts = ? AND event = ?`, eventTs.UnixMilli(), ActivityEventName) @@ -162,6 +162,23 @@ func updateActivityTEvent(ctx context.Context, tevent *telemetrydata.TEvent) err }) } +func TruncateActivityTEventForShutdown(ctx context.Context) error { + nowTs := time.Now() + eventTs := nowTs.Truncate(time.Hour).Add(time.Hour) + return wstore.WithTx(ctx, func(tx *wstore.TxWrap) error { + // find event that matches this timestamp with event name "app:activity" + uuidStr := tx.GetString(`SELECT uuid FROM db_tevent WHERE ts = ? AND event = ?`, eventTs.UnixMilli(), ActivityEventName) + if uuidStr == "" { + return nil + } + // we're going to update this app:activity event back to nowTs + tsLocal := utilfn.ConvertToWallClockPT(nowTs).Format(time.RFC3339) + query := `UPDATE db_tevent SET ts = ?, tslocal = ? WHERE uuid = ?` + tx.Exec(query, nowTs.UnixMilli(), tsLocal, uuidStr) + return nil + }) +} + func GoRecordTEventWrap(tevent *telemetrydata.TEvent) { if tevent == nil || tevent.Event == "" { return From 77d9969188b1f676ea854fdbec00c2eca4c229e2 Mon Sep 17 00:00:00 2001 From: sawka Date: Fri, 31 Jan 2025 18:37:39 -0800 Subject: [PATCH 12/19] updates, more client stats --- cmd/server/main-server.go | 23 +++++++++++--------- pkg/telemetry/telemetry.go | 2 +- pkg/telemetry/telemetrydata/telemetrydata.go | 13 +++++------ pkg/wshrpc/wshserver/wshserver.go | 6 ++++- 4 files changed, 24 insertions(+), 20 deletions(-) diff --git a/cmd/server/main-server.go b/cmd/server/main-server.go index cf6456bdcd..7d0be15237 100644 --- a/cmd/server/main-server.go +++ b/cmd/server/main-server.go @@ -83,7 +83,7 @@ func stdinReadWatch() { } } -func configWatcher() { +func startConfigWatcher() { watcher := wconfig.GetWatcher() if watcher != nil { watcher.Start() @@ -151,13 +151,17 @@ func startupActivityUpdate() { if err != nil { log.Printf("error updating startup activity: %v\n", err) } + autoUpdateChannel := telemetry.AutoUpdateChannel() + autoUpdateEnabled := telemetry.IsAutoUpdateEnabled() tevent := telemetrydata.MakeTEvent("app:startup", telemetrydata.TEventProps{ - ClientVersion: "v" + WaveVersion, UserSet: &telemetrydata.TEventUserProps{ - ClientVersion: "v" + WaveVersion, - ClientBuildTime: BuildTime, - ClientArch: wavebase.ClientArch(), - ClientOSRelease: wavebase.UnameKernelRelease(), + ClientVersion: "v" + WaveVersion, + ClientBuildTime: BuildTime, + ClientArch: wavebase.ClientArch(), + ClientOSRelease: wavebase.UnameKernelRelease(), + ClientIsDev: wavebase.IsDevMode(), + AutoUpdateChannel: autoUpdateChannel, + AutoUpdateEnabled: autoUpdateEnabled, }, UserSetOnce: &telemetrydata.TEventUserProps{ ClientInitialVersion: "v" + WaveVersion, @@ -309,15 +313,14 @@ func main() { } createMainWshClient() - sigutil.InstallShutdownSignalHandlers(doShutdown) sigutil.InstallSIGUSR1Handler() - - startupActivityUpdate() go stdinReadWatch() go telemetryLoop() - configWatcher() + startConfigWatcher() + startupActivityUpdate() // must be after startConfigWatcher() blocklogger.InitBlockLogger() + webListener, err := web.MakeTCPListener("web") if err != nil { log.Printf("error creating web listener: %v\n", err) diff --git a/pkg/telemetry/telemetry.go b/pkg/telemetry/telemetry.go index 05e7e2787f..45fbf00292 100644 --- a/pkg/telemetry/telemetry.go +++ b/pkg/telemetry/telemetry.go @@ -154,7 +154,7 @@ func updateActivityTEvent(ctx context.Context, tevent *telemetrydata.TEvent) err query := `UPDATE db_tevent SET props = ? WHERE uuid = ?` tx.Exec(query, dbutil.QuickJson(curActivity), uuidStr) } else { - query := `INSERT INTO db_tevent (uuid, ts, tslocal, event, props) VALUES (?, ?, ?, ?)` + query := `INSERT INTO db_tevent (uuid, ts, tslocal, event, props) VALUES (?, ?, ?, ?, ?)` tsLocal := utilfn.ConvertToWallClockPT(eventTs).Format(time.RFC3339) tx.Exec(query, uuid.New().String(), eventTs.UnixMilli(), tsLocal, ActivityEventName, dbutil.QuickJson(curActivity)) } diff --git a/pkg/telemetry/telemetrydata/telemetrydata.go b/pkg/telemetry/telemetrydata/telemetrydata.go index 9b66878910..8a9c2e75a9 100644 --- a/pkg/telemetry/telemetrydata/telemetrydata.go +++ b/pkg/telemetry/telemetrydata/telemetrydata.go @@ -39,23 +39,20 @@ type TEventUserProps struct { ClientInitialVersion string `json:"client:initial_version,omitempty"` ClientBuildTime string `json:"client:buildtime,omitempty"` ClientOSRelease string `json:"client:osrelease,omitempty"` + ClientIsDev bool `json:"client:isdev,omitempty"` + AutoUpdateChannel string `json:"autoupdate:channel,omitempty"` + AutoUpdateEnabled bool `json:"autoupdate:enabled,omitempty"` LocCountryCode string `json:"loc:countrycode,omitempty"` LocRegionCode string `json:"loc:regioncode,omitempty"` } type TEventProps struct { + TEventUserProps // generally don't need to set these since they will be automatically copied over + ActiveMinutes int `json:"activity:activeminutes,omitempty"` FgMinutes int `json:"activity:fgminutes,omitempty"` OpenMinutes int `json:"activity:openminutes,omitempty"` - ClientArch string `json:"client:arch,omitempty"` - ClientVersion string `json:"client:version,omitempty"` - ClientInitialVersion string `json:"client:initial_version,omitempty"` - ClientBuildTime string `json:"client:buildtime,omitempty"` - ClientOSRelease string `json:"client:osrelease,omitempty"` - LocCountryCode string `json:"loc:countrycode,omitempty"` - LocRegionCode string `json:"loc:regioncode,omitempty"` - UserSet *TEventUserProps `json:"$set,omitempty"` UserSetOnce *TEventUserProps `json:"$set_once,omitempty"` } diff --git a/pkg/wshrpc/wshserver/wshserver.go b/pkg/wshrpc/wshserver/wshserver.go index d576175c99..01b0caecd5 100644 --- a/pkg/wshrpc/wshserver/wshserver.go +++ b/pkg/wshrpc/wshserver/wshserver.go @@ -737,7 +737,11 @@ func (ws *WshServer) WorkspaceListCommand(ctx context.Context) ([]wshrpc.Workspa } func (ws *WshServer) RecordTEventCommand(ctx context.Context, data telemetrydata.TEvent) error { - return telemetry.RecordTEvent(ctx, &data) + err := telemetry.RecordTEvent(ctx, &data) + if err != nil { + log.Printf("error recording telemetry event: %v", err) + } + return err } func (ws WshServer) SendTelemetryCommand(ctx context.Context) error { From 7b0769f674ebe7f70115eda733527a15ff98f891 Mon Sep 17 00:00:00 2001 From: sawka Date: Fri, 31 Jan 2025 18:43:14 -0800 Subject: [PATCH 13/19] update --- frontend/types/gotypes.d.ts | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/frontend/types/gotypes.d.ts b/frontend/types/gotypes.d.ts index 35aaf599c0..fa81e6c48a 100644 --- a/frontend/types/gotypes.d.ts +++ b/frontend/types/gotypes.d.ts @@ -756,16 +756,10 @@ declare global { // telemetrydata.TEventProps type TEventProps = { + TEventUserProps: TEventUserProps; "activity:activeminutes"?: number; "activity:fgminutes"?: number; "activity:openminutes"?: number; - "client:arch"?: string; - "client:version"?: string; - "client:initial_version"?: string; - "client:buildtime"?: string; - "client:osrelease"?: string; - "loc:countrycode"?: string; - "loc:regioncode"?: string; $set?: TEventUserProps; $set_once?: TEventUserProps; }; @@ -777,6 +771,9 @@ declare global { "client:initial_version"?: string; "client:buildtime"?: string; "client:osrelease"?: string; + "client:isdev"?: boolean; + "autoupdate:channel"?: string; + "autoupdate:enabled"?: boolean; "loc:countrycode"?: string; "loc:regioncode"?: string; }; From 181ca56c2789c198122a355338ad43156f45b902 Mon Sep 17 00:00:00 2001 From: sawka Date: Mon, 3 Feb 2025 10:21:49 -0800 Subject: [PATCH 14/19] working on more posthog integration --- cmd/server/main-server.go | 5 ++++- frontend/app/block/blockframe.tsx | 2 ++ frontend/app/store/global.ts | 10 +++++++++ frontend/app/tab/tab.tsx | 5 +++-- frontend/types/gotypes.d.ts | 7 ++++++- pkg/panichandler/panichandler.go | 4 ++-- pkg/telemetry/telemetrydata/telemetrydata.go | 22 ++++++++++++++++---- pkg/waveai/waveai.go | 13 ++++++++++++ pkg/wcore/block.go | 7 +++++++ pkg/wcore/workspace.go | 4 ++++ pkg/wshrpc/wshserver/wshserver.go | 10 +++++++++ 11 files changed, 79 insertions(+), 10 deletions(-) diff --git a/cmd/server/main-server.go b/cmd/server/main-server.go index 7d0be15237..c66a392952 100644 --- a/cmd/server/main-server.go +++ b/cmd/server/main-server.go @@ -102,12 +102,15 @@ func telemetryLoop() { } } -func panicTelemetryHandler() { +func panicTelemetryHandler(panicName string) { activity := wshrpc.ActivityUpdate{NumPanics: 1} err := telemetry.UpdateActivity(context.Background(), activity) if err != nil { log.Printf("error updating activity (panicTelemetryHandler): %v\n", err) } + telemetry.RecordTEvent(context.Background(), telemetrydata.MakeTEvent("debug:panic", telemetrydata.TEventProps{ + PanicType: panicName, + })) } func sendTelemetryWrapper() { diff --git a/frontend/app/block/blockframe.tsx b/frontend/app/block/blockframe.tsx index e519abde8f..b69a18572d 100644 --- a/frontend/app/block/blockframe.tsx +++ b/frontend/app/block/blockframe.tsx @@ -12,6 +12,7 @@ import { getConnStatusAtom, getSettingsKeyAtom, globalStore, + recordTEvent, useBlockAtom, WOS, } from "@/app/store/global"; @@ -182,6 +183,7 @@ const BlockFrame_Header = ({ return; } RpcApi.ActivityCommand(TabRpcClient, { nummagnify: 1 }); + recordTEvent("action:magnify"); }, [magnified]); if (blockData?.meta?.["frame:title"]) { diff --git a/frontend/app/store/global.ts b/frontend/app/store/global.ts index e80c3e7bc9..dcd161cf8a 100644 --- a/frontend/app/store/global.ts +++ b/frontend/app/store/global.ts @@ -1,6 +1,8 @@ // Copyright 2025, Command Line Inc. // SPDX-License-Identifier: Apache-2.0 +import { RpcApi } from "@/app/store/wshclientapi"; +import { TabRpcClient } from "@/app/store/wshrpcutil"; import { getLayoutModelForTabById, LayoutTreeActionType, @@ -667,6 +669,13 @@ function setActiveTab(tabId: string) { getApi().setActiveTab(tabId); } +function recordTEvent(event: string, props?: TEventProps) { + if (props == null) { + props = {}; + } + RpcApi.RecordTEventCommand(TabRpcClient, { event, props }, { noresponse: true }); +} + export { atoms, counterInc, @@ -695,6 +704,7 @@ export { PLATFORM, pushFlashError, pushNotification, + recordTEvent, refocusNode, registerBlockComponentModel, removeFlashError, diff --git a/frontend/app/tab/tab.tsx b/frontend/app/tab/tab.tsx index d253dd7d00..2024b06c31 100644 --- a/frontend/app/tab/tab.tsx +++ b/frontend/app/tab/tab.tsx @@ -1,7 +1,7 @@ // Copyright 2025, Command Line Inc. // SPDX-License-Identifier: Apache-2.0 -import { atoms, globalStore, refocusNode } from "@/app/store/global"; +import { atoms, globalStore, recordTEvent, refocusNode } from "@/app/store/global"; import { RpcApi } from "@/app/store/wshclientapi"; import { TabRpcClient } from "@/app/store/wshrpcutil"; import { Button } from "@/element/button"; @@ -183,7 +183,8 @@ const Tab = memo( click: () => fireAndForget(async () => { await ObjectService.UpdateObjectMeta(oref, preset); - await RpcApi.ActivityCommand(TabRpcClient, { settabtheme: 1 }); + RpcApi.ActivityCommand(TabRpcClient, { settabtheme: 1 }, { noresponse: true }); + recordTEvent("action:settabtheme"); }), }); } diff --git a/frontend/types/gotypes.d.ts b/frontend/types/gotypes.d.ts index fa81e6c48a..e199514580 100644 --- a/frontend/types/gotypes.d.ts +++ b/frontend/types/gotypes.d.ts @@ -756,10 +756,15 @@ declare global { // telemetrydata.TEventProps type TEventProps = { - TEventUserProps: TEventUserProps; "activity:activeminutes"?: number; "activity:fgminutes"?: number; "activity:openminutes"?: number; + "action:initiator"?: "keyboard" | "mouse"; + "debug:panictype"?: string; + "block:view"?: string; + "ai:backendtype"?: string; + "wsh:cmd"?: string; + "wsh:haderror"?: boolean; $set?: TEventUserProps; $set_once?: TEventUserProps; }; diff --git a/pkg/panichandler/panichandler.go b/pkg/panichandler/panichandler.go index 1272ca7a95..8a71c79765 100644 --- a/pkg/panichandler/panichandler.go +++ b/pkg/panichandler/panichandler.go @@ -11,7 +11,7 @@ import ( // to log NumPanics into the local telemetry system // gets around import cycles -var PanicTelemetryHandler func() +var PanicTelemetryHandler func(panicType string) func PanicHandlerNoTelemetry(debugStr string, recoverVal any) { if recoverVal == nil { @@ -33,7 +33,7 @@ func PanicHandler(debugStr string, recoverVal any) error { defer func() { PanicHandlerNoTelemetry("PanicTelemetryHandler", recover()) }() - PanicTelemetryHandler() + PanicTelemetryHandler(debugStr) }() } if err, ok := recoverVal.(error); ok { diff --git a/pkg/telemetry/telemetrydata/telemetrydata.go b/pkg/telemetry/telemetrydata/telemetrydata.go index 8a9c2e75a9..b13a9fcb12 100644 --- a/pkg/telemetry/telemetrydata/telemetrydata.go +++ b/pkg/telemetry/telemetrydata/telemetrydata.go @@ -14,9 +14,16 @@ import ( ) var ValidEventNames = map[string]bool{ - "app:startup": true, - "app:shutdown": true, - "app:activity": true, + "app:startup": true, + "app:shutdown": true, + "app:activity": true, + "action:magnify": true, + "action:settabtheme": true, + "action:runaicmd": true, + "action:createtab": true, + "action:createblock": true, + "wsh:run": true, + "debug:panic": true, } type TEvent struct { @@ -47,12 +54,19 @@ type TEventUserProps struct { } type TEventProps struct { - TEventUserProps // generally don't need to set these since they will be automatically copied over + TEventUserProps `tstype:"-"` // generally don't need to set these since they will be automatically copied over ActiveMinutes int `json:"activity:activeminutes,omitempty"` FgMinutes int `json:"activity:fgminutes,omitempty"` OpenMinutes int `json:"activity:openminutes,omitempty"` + ActionInitiator string `json:"action:initiator,omitempty" tstype:"\"keyboard\" | \"mouse\""` + PanicType string `json:"debug:panictype,omitempty"` + BlockView string `json:"block:view,omitempty"` + AiBackendType string `json:"ai:backendtype,omitempty"` + WshCmd string `json:"wsh:cmd,omitempty"` + WshHadError bool `json:"wsh:haderror,omitempty"` + UserSet *TEventUserProps `json:"$set,omitempty"` UserSetOnce *TEventUserProps `json:"$set_once,omitempty"` } diff --git a/pkg/waveai/waveai.go b/pkg/waveai/waveai.go index b9198f5950..89c9bdfc4f 100644 --- a/pkg/waveai/waveai.go +++ b/pkg/waveai/waveai.go @@ -8,6 +8,7 @@ import ( "log" "github.com/wavetermdev/waveterm/pkg/telemetry" + "github.com/wavetermdev/waveterm/pkg/telemetry/telemetrydata" "github.com/wavetermdev/waveterm/pkg/wshrpc" ) @@ -63,24 +64,36 @@ func RunAICommand(ctx context.Context, request wshrpc.WaveAIStreamRequest) chan endpoint = "default" } var backend AIBackend + var backendType string if request.Opts.APIType == ApiType_Anthropic { backend = AnthropicBackend{} + backendType = ApiType_Anthropic } else if request.Opts.APIType == ApiType_Perplexity { backend = PerplexityBackend{} + backendType = ApiType_Perplexity } else if request.Opts.APIType == APIType_Google { backend = GoogleBackend{} + backendType = APIType_Google } else if IsCloudAIRequest(request.Opts) { endpoint = "waveterm cloud" request.Opts.APIType = APIType_OpenAI request.Opts.Model = "default" backend = WaveAICloudBackend{} + backendType = "wave" } else { backend = OpenAIBackend{} + backendType = APIType_OpenAI } if backend == nil { log.Printf("no backend found for %s\n", request.Opts.APIType) return nil } + telemetry.GoRecordTEventWrap(&telemetrydata.TEvent{ + Event: "action:runaicmd", + Props: telemetrydata.TEventProps{ + AiBackendType: backendType, + }, + }) log.Printf("sending ai chat message to %s endpoint %q using model %s\n", request.Opts.APIType, endpoint, request.Opts.Model) return backend.StreamCompletion(ctx, request) diff --git a/pkg/wcore/block.go b/pkg/wcore/block.go index 24786c588d..8eb720ab8e 100644 --- a/pkg/wcore/block.go +++ b/pkg/wcore/block.go @@ -14,6 +14,7 @@ import ( "github.com/wavetermdev/waveterm/pkg/filestore" "github.com/wavetermdev/waveterm/pkg/panichandler" "github.com/wavetermdev/waveterm/pkg/telemetry" + "github.com/wavetermdev/waveterm/pkg/telemetry/telemetrydata" "github.com/wavetermdev/waveterm/pkg/util/utilfn" "github.com/wavetermdev/waveterm/pkg/waveobj" "github.com/wavetermdev/waveterm/pkg/wps" @@ -106,6 +107,12 @@ func CreateBlock(ctx context.Context, tabId string, blockDef *waveobj.BlockDef, telemetry.UpdateActivity(tctx, wshrpc.ActivityUpdate{ Renderers: map[string]int{blockView: 1}, }) + telemetry.RecordTEvent(tctx, &telemetrydata.TEvent{ + Event: "action:createblock", + Props: telemetrydata.TEventProps{ + BlockView: blockView, + }, + }) }() return blockData, nil } diff --git a/pkg/wcore/workspace.go b/pkg/wcore/workspace.go index f69926c71f..5ae340d0fa 100644 --- a/pkg/wcore/workspace.go +++ b/pkg/wcore/workspace.go @@ -12,6 +12,7 @@ import ( "github.com/google/uuid" "github.com/wavetermdev/waveterm/pkg/eventbus" "github.com/wavetermdev/waveterm/pkg/telemetry" + "github.com/wavetermdev/waveterm/pkg/telemetry/telemetrydata" "github.com/wavetermdev/waveterm/pkg/util/utilfn" "github.com/wavetermdev/waveterm/pkg/waveobj" "github.com/wavetermdev/waveterm/pkg/wconfig" @@ -236,6 +237,9 @@ func CreateTab(ctx context.Context, workspaceId string, tabName string, activate } } telemetry.GoUpdateActivityWrap(wshrpc.ActivityUpdate{NewTab: 1}, "createtab") + telemetry.GoRecordTEventWrap(&telemetrydata.TEvent{ + Event: "action:createtab", + }) return tab.OID, nil } diff --git a/pkg/wshrpc/wshserver/wshserver.go b/pkg/wshrpc/wshserver/wshserver.go index 01b0caecd5..0ed0ae4e20 100644 --- a/pkg/wshrpc/wshserver/wshserver.go +++ b/pkg/wshrpc/wshserver/wshserver.go @@ -758,6 +758,7 @@ func (ws *WshServer) WshActivityCommand(ctx context.Context, data map[string]int if len(data) == 0 { return nil } + props := telemetrydata.TEventProps{} for key, value := range data { if len(key) > 20 { delete(data, key) @@ -768,11 +769,20 @@ func (ws *WshServer) WshActivityCommand(ctx context.Context, data map[string]int if value != 1 { delete(data, key) } + if strings.HasSuffix(key, "#error") { + props.WshHadError = true + } else { + props.WshCmd = key + } } activityUpdate := wshrpc.ActivityUpdate{ WshCmds: data, } telemetry.GoUpdateActivityWrap(activityUpdate, "wsh-activity") + telemetry.GoRecordTEventWrap(&telemetrydata.TEvent{ + Event: "wsh:run", + Props: props, + }) return nil } From be31352f26a4bacce8aa0785f2cbeebf69824e06 Mon Sep 17 00:00:00 2001 From: sawka Date: Mon, 3 Feb 2025 10:59:04 -0800 Subject: [PATCH 15/19] record view name --- frontend/app/block/blockframe.tsx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/frontend/app/block/blockframe.tsx b/frontend/app/block/blockframe.tsx index b69a18572d..fe8e15b9bf 100644 --- a/frontend/app/block/blockframe.tsx +++ b/frontend/app/block/blockframe.tsx @@ -183,7 +183,7 @@ const BlockFrame_Header = ({ return; } RpcApi.ActivityCommand(TabRpcClient, { nummagnify: 1 }); - recordTEvent("action:magnify"); + recordTEvent("action:magnify", { "block:view": viewName }); }, [magnified]); if (blockData?.meta?.["frame:title"]) { From 15724141f7f46586ef2bde102316226e104b0955 Mon Sep 17 00:00:00 2001 From: sawka Date: Mon, 3 Feb 2025 11:04:47 -0800 Subject: [PATCH 16/19] more tevents --- pkg/remote/conncontroller/conncontroller.go | 13 +++++++++++++ pkg/telemetry/telemetrydata/telemetrydata.go | 3 +++ pkg/wslconn/wslconn.go | 13 +++++++++++++ 3 files changed, 29 insertions(+) diff --git a/pkg/remote/conncontroller/conncontroller.go b/pkg/remote/conncontroller/conncontroller.go index 31fbe84586..9738048450 100644 --- a/pkg/remote/conncontroller/conncontroller.go +++ b/pkg/remote/conncontroller/conncontroller.go @@ -25,6 +25,7 @@ import ( "github.com/wavetermdev/waveterm/pkg/panichandler" "github.com/wavetermdev/waveterm/pkg/remote" "github.com/wavetermdev/waveterm/pkg/telemetry" + "github.com/wavetermdev/waveterm/pkg/telemetry/telemetrydata" "github.com/wavetermdev/waveterm/pkg/userinput" "github.com/wavetermdev/waveterm/pkg/util/shellutil" "github.com/wavetermdev/waveterm/pkg/util/utilfn" @@ -556,6 +557,12 @@ func (conn *SSHConn) Connect(ctx context.Context, connFlags *wconfig.ConnKeyword telemetry.GoUpdateActivityWrap(wshrpc.ActivityUpdate{ Conn: map[string]int{"ssh:connecterror": 1}, }, "ssh-connconnect") + telemetry.GoRecordTEventWrap(&telemetrydata.TEvent{ + Event: "conn:connecterror", + Props: telemetrydata.TEventProps{ + ConnType: "ssh", + }, + }) } else { conn.Infof(ctx, "successfully connected (wsh:%v)\n\n", conn.WshEnabled.Load()) conn.Status = Status_Connected @@ -566,6 +573,12 @@ func (conn *SSHConn) Connect(ctx context.Context, connFlags *wconfig.ConnKeyword telemetry.GoUpdateActivityWrap(wshrpc.ActivityUpdate{ Conn: map[string]int{"ssh:connect": 1}, }, "ssh-connconnect") + telemetry.GoRecordTEventWrap(&telemetrydata.TEvent{ + Event: "conn:connect", + Props: telemetrydata.TEventProps{ + ConnType: "ssh", + }, + }) } }) conn.FireConnChangeEvent() diff --git a/pkg/telemetry/telemetrydata/telemetrydata.go b/pkg/telemetry/telemetrydata/telemetrydata.go index b13a9fcb12..9058aca7a2 100644 --- a/pkg/telemetry/telemetrydata/telemetrydata.go +++ b/pkg/telemetry/telemetrydata/telemetrydata.go @@ -24,6 +24,8 @@ var ValidEventNames = map[string]bool{ "action:createblock": true, "wsh:run": true, "debug:panic": true, + "conn:connect": true, + "conn:connecterror": true, } type TEvent struct { @@ -66,6 +68,7 @@ type TEventProps struct { AiBackendType string `json:"ai:backendtype,omitempty"` WshCmd string `json:"wsh:cmd,omitempty"` WshHadError bool `json:"wsh:haderror,omitempty"` + ConnType string `json:"conn:conntype,omitempty"` UserSet *TEventUserProps `json:"$set,omitempty"` UserSetOnce *TEventUserProps `json:"$set_once,omitempty"` diff --git a/pkg/wslconn/wslconn.go b/pkg/wslconn/wslconn.go index 63d99f1979..7fe6594907 100644 --- a/pkg/wslconn/wslconn.go +++ b/pkg/wslconn/wslconn.go @@ -20,6 +20,7 @@ import ( "github.com/wavetermdev/waveterm/pkg/panichandler" "github.com/wavetermdev/waveterm/pkg/remote/conncontroller" "github.com/wavetermdev/waveterm/pkg/telemetry" + "github.com/wavetermdev/waveterm/pkg/telemetry/telemetrydata" "github.com/wavetermdev/waveterm/pkg/userinput" "github.com/wavetermdev/waveterm/pkg/util/shellutil" "github.com/wavetermdev/waveterm/pkg/util/utilfn" @@ -534,6 +535,12 @@ func (conn *WslConn) Connect(ctx context.Context) error { telemetry.GoUpdateActivityWrap(wshrpc.ActivityUpdate{ Conn: map[string]int{"wsl:connecterror": 1}, }, "wsl-connconnect") + telemetry.GoRecordTEventWrap(&telemetrydata.TEvent{ + Event: "conn:connecterror", + Props: telemetrydata.TEventProps{ + ConnType: "wsl", + }, + }) } else { conn.Infof(ctx, "successfully connected (wsh:%v)\n\n", conn.WshEnabled.Load()) conn.Status = Status_Connected @@ -544,6 +551,12 @@ func (conn *WslConn) Connect(ctx context.Context) error { telemetry.GoUpdateActivityWrap(wshrpc.ActivityUpdate{ Conn: map[string]int{"wsl:connect": 1}, }, "wsl-connconnect") + telemetry.GoRecordTEventWrap(&telemetrydata.TEvent{ + Event: "conn:connect", + Props: telemetrydata.TEventProps{ + ConnType: "wsl", + }, + }) } }) conn.FireConnChangeEvent() From 8d67ba47714203a5c61f53f1e90d0b8b575ffb9f Mon Sep 17 00:00:00 2001 From: sawka Date: Mon, 3 Feb 2025 11:22:01 -0800 Subject: [PATCH 17/19] udpate batches/batchsize --- frontend/types/gotypes.d.ts | 1 + pkg/wcloud/wcloud.go | 7 ++++--- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/frontend/types/gotypes.d.ts b/frontend/types/gotypes.d.ts index e199514580..9359a98716 100644 --- a/frontend/types/gotypes.d.ts +++ b/frontend/types/gotypes.d.ts @@ -765,6 +765,7 @@ declare global { "ai:backendtype"?: string; "wsh:cmd"?: string; "wsh:haderror"?: boolean; + "conn:conntype"?: string; $set?: TEventUserProps; $set_once?: TEventUserProps; }; diff --git a/pkg/wcloud/wcloud.go b/pkg/wcloud/wcloud.go index b7743aa86d..dfed44098f 100644 --- a/pkg/wcloud/wcloud.go +++ b/pkg/wcloud/wcloud.go @@ -155,7 +155,8 @@ type TEventsInputType struct { Events []*telemetrydata.TEvent `json:"events"` } -const TEventsBatchSize = 100 +const TEventsBatchSize = 200 +const TEventsMaxBatches = 10 // returns (done, num-sent, error) func sendTEventsBatch(clientId string) (bool, int, error) { @@ -202,8 +203,8 @@ func sendTEvents(clientId string) (int, error) { if done { break } - if numIters > 10 { - log.Printf("hit 10 iterations, stopping\n") + if numIters > TEventsMaxBatches { + log.Printf("sendTEvents, hit %d iterations, stopping\n", numIters) break } } From af9d6289b443fb6765164cfeb03dc07f430b5e5f Mon Sep 17 00:00:00 2001 From: sawka Date: Mon, 3 Feb 2025 14:00:28 -0800 Subject: [PATCH 18/19] app:displays event --- emain/emain.ts | 26 ++++++++++++++++++++ frontend/types/gotypes.d.ts | 5 ++++ pkg/telemetry/telemetrydata/telemetrydata.go | 7 ++++++ 3 files changed, 38 insertions(+) diff --git a/emain/emain.ts b/emain/emain.ts index 6f1ab40c57..c2037b70c2 100644 --- a/emain/emain.ts +++ b/emain/emain.ts @@ -459,6 +459,31 @@ function getActivityDisplays(): ActivityDisplayType[] { return rtn; } +async function sendDisplaysTDataEvent() { + const displays = getActivityDisplays(); + if (displays.length === 0) { + return; + } + const props: TEventProps = {}; + props["display:count"] = displays.length; + props["display:height"] = displays[0].height; + props["display:width"] = displays[0].width; + props["display:dpr"] = displays[0].dpr; + props["display:all"] = displays; + try { + await RpcApi.RecordTEventCommand( + ElectronWshClient, + { + event: "app:display", + props, + }, + { noresponse: true } + ); + } catch (e) { + console.log("error sending display tdata event", e); + } +} + function logActiveState() { fireAndForget(async () => { const astate = getActivityState(); @@ -633,6 +658,7 @@ async function appMain() { await relaunchBrowserWindows(); await initDocsite(); setTimeout(runActiveTimer, 5000); // start active timer, wait 5s just to be safe + setTimeout(sendDisplaysTDataEvent, 5000); makeAppMenu(); makeDockTaskbar(); diff --git a/frontend/types/gotypes.d.ts b/frontend/types/gotypes.d.ts index 9359a98716..f0a733aba5 100644 --- a/frontend/types/gotypes.d.ts +++ b/frontend/types/gotypes.d.ts @@ -766,6 +766,11 @@ declare global { "wsh:cmd"?: string; "wsh:haderror"?: boolean; "conn:conntype"?: string; + "display:height"?: number; + "display:width"?: number; + "display:dpr"?: number; + "display:count"?: number; + "display:all"?: any; $set?: TEventUserProps; $set_once?: TEventUserProps; }; diff --git a/pkg/telemetry/telemetrydata/telemetrydata.go b/pkg/telemetry/telemetrydata/telemetrydata.go index 9058aca7a2..0991b5db91 100644 --- a/pkg/telemetry/telemetrydata/telemetrydata.go +++ b/pkg/telemetry/telemetrydata/telemetrydata.go @@ -17,6 +17,7 @@ var ValidEventNames = map[string]bool{ "app:startup": true, "app:shutdown": true, "app:activity": true, + "app:display": true, "action:magnify": true, "action:settabtheme": true, "action:runaicmd": true, @@ -70,6 +71,12 @@ type TEventProps struct { WshHadError bool `json:"wsh:haderror,omitempty"` ConnType string `json:"conn:conntype,omitempty"` + DisplayHeight int `json:"display:height,omitempty"` + DisplayWidth int `json:"display:width,omitempty"` + DisplayDPR float64 `json:"display:dpr,omitempty"` + DisplayCount int `json:"display:count,omitempty"` + DisplayAll interface{} `json:"display:all,omitempty"` + UserSet *TEventUserProps `json:"$set,omitempty"` UserSetOnce *TEventUserProps `json:"$set_once,omitempty"` } From fbb2e770b24daeed342be27d85a4d4658f65fa91 Mon Sep 17 00:00:00 2001 From: sawka Date: Mon, 3 Feb 2025 14:42:18 -0800 Subject: [PATCH 19/19] app:counts event --- cmd/server/main-server.go | 44 +++++++++++++++++++- frontend/types/gotypes.d.ts | 7 ++++ pkg/telemetry/telemetrydata/telemetrydata.go | 9 ++++ pkg/util/utilfn/compare.go | 20 +++++++++ 4 files changed, 79 insertions(+), 1 deletion(-) diff --git a/cmd/server/main-server.go b/cmd/server/main-server.go index c66a392952..076fedd451 100644 --- a/cmd/server/main-server.go +++ b/cmd/server/main-server.go @@ -25,6 +25,7 @@ import ( "github.com/wavetermdev/waveterm/pkg/telemetry/telemetrydata" "github.com/wavetermdev/waveterm/pkg/util/shellutil" "github.com/wavetermdev/waveterm/pkg/util/sigutil" + "github.com/wavetermdev/waveterm/pkg/util/utilfn" "github.com/wavetermdev/waveterm/pkg/wavebase" "github.com/wavetermdev/waveterm/pkg/waveobj" "github.com/wavetermdev/waveterm/pkg/wcloud" @@ -47,6 +48,8 @@ var BuildTime = "0" const InitialTelemetryWait = 10 * time.Second const TelemetryTick = 2 * time.Minute const TelemetryInterval = 4 * time.Hour +const TelemetryInitialCountsWait = 5 * time.Second +const TelemetryCountsInterval = 1 * time.Hour var shutdownOnce sync.Once @@ -131,6 +134,44 @@ func sendTelemetryWrapper() { } } +func updateTelemetryCounts(lastCounts telemetrydata.TEventProps) telemetrydata.TEventProps { + ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second) + defer cancelFn() + var props telemetrydata.TEventProps + props.CountBlocks, _ = wstore.DBGetCount[*waveobj.Block](ctx) + props.CountTabs, _ = wstore.DBGetCount[*waveobj.Tab](ctx) + props.CountWindows, _ = wstore.DBGetCount[*waveobj.Window](ctx) + props.CountWorkspaces, _, _ = wstore.DBGetWSCounts(ctx) + props.CountSSHConn = conncontroller.GetNumSSHHasConnected() + props.CountWSLConn = wslconn.GetNumWSLHasConnected() + props.CountViews, _ = wstore.DBGetBlockViewCounts(ctx) + if utilfn.CompareAsMarshaledJson(props, lastCounts) { + return lastCounts + } + tevent := telemetrydata.MakeTEvent("app:counts", props) + err := telemetry.RecordTEvent(ctx, tevent) + if err != nil { + log.Printf("error recording counts tevent: %v\n", err) + } + return props +} + +func updateTelemetryCountsLoop() { + defer func() { + panichandler.PanicHandler("updateTelemetryCountsLoop", recover()) + }() + var nextSend int64 + var lastCounts telemetrydata.TEventProps + time.Sleep(TelemetryInitialCountsWait) + for { + if time.Now().Unix() > nextSend { + nextSend = time.Now().Add(TelemetryCountsInterval).Unix() + lastCounts = updateTelemetryCounts(lastCounts) + } + time.Sleep(TelemetryTick) + } +} + func beforeSendActivityUpdate(ctx context.Context) { activity := wshrpc.ActivityUpdate{} activity.NumTabs, _ = wstore.DBGetCount[*waveobj.Tab](ctx) @@ -318,9 +359,10 @@ func main() { createMainWshClient() sigutil.InstallShutdownSignalHandlers(doShutdown) sigutil.InstallSIGUSR1Handler() + startConfigWatcher() go stdinReadWatch() go telemetryLoop() - startConfigWatcher() + go updateTelemetryCountsLoop() startupActivityUpdate() // must be after startConfigWatcher() blocklogger.InitBlockLogger() diff --git a/frontend/types/gotypes.d.ts b/frontend/types/gotypes.d.ts index f0a733aba5..d8b5e0aa5b 100644 --- a/frontend/types/gotypes.d.ts +++ b/frontend/types/gotypes.d.ts @@ -771,6 +771,13 @@ declare global { "display:dpr"?: number; "display:count"?: number; "display:all"?: any; + "count:blocks"?: number; + "count:tabs"?: number; + "count:windows"?: number; + "count:workspaces"?: number; + "count:sshconn"?: number; + "count:wslconn"?: number; + "count:views"?: {[key: string]: number}; $set?: TEventUserProps; $set_once?: TEventUserProps; }; diff --git a/pkg/telemetry/telemetrydata/telemetrydata.go b/pkg/telemetry/telemetrydata/telemetrydata.go index 0991b5db91..1370a00ba8 100644 --- a/pkg/telemetry/telemetrydata/telemetrydata.go +++ b/pkg/telemetry/telemetrydata/telemetrydata.go @@ -18,6 +18,7 @@ var ValidEventNames = map[string]bool{ "app:shutdown": true, "app:activity": true, "app:display": true, + "app:counts": true, "action:magnify": true, "action:settabtheme": true, "action:runaicmd": true, @@ -77,6 +78,14 @@ type TEventProps struct { DisplayCount int `json:"display:count,omitempty"` DisplayAll interface{} `json:"display:all,omitempty"` + CountBlocks int `json:"count:blocks,omitempty"` + CountTabs int `json:"count:tabs,omitempty"` + CountWindows int `json:"count:windows,omitempty"` + CountWorkspaces int `json:"count:workspaces,omitempty"` + CountSSHConn int `json:"count:sshconn,omitempty"` + CountWSLConn int `json:"count:wslconn,omitempty"` + CountViews map[string]int `json:"count:views,omitempty"` + UserSet *TEventUserProps `json:"$set,omitempty"` UserSetOnce *TEventUserProps `json:"$set_once,omitempty"` } diff --git a/pkg/util/utilfn/compare.go b/pkg/util/utilfn/compare.go index ea3b20758f..c4684f9ac5 100644 --- a/pkg/util/utilfn/compare.go +++ b/pkg/util/utilfn/compare.go @@ -4,9 +4,29 @@ package utilfn import ( + "bytes" + "encoding/json" "reflect" ) +func CompareAsMarshaledJson(a, b any) bool { + if a == nil && b == nil { + return true + } + if a == nil || b == nil { + return false + } + barrA, err := json.Marshal(a) + if err != nil { + return false + } + barrB, err := json.Marshal(b) + if err != nil { + return false + } + return bytes.Equal(barrA, barrB) +} + // this is a shallow equal, but with special handling for numeric types // it will up convert to float64 and compare func JsonValEqual(a, b any) bool {