diff --git a/ROADMAP.md b/ROADMAP.md index ec0a43808..a4fab2951 100644 --- a/ROADMAP.md +++ b/ROADMAP.md @@ -38,7 +38,7 @@ Targeting 1/31/25 ## v0.12 -Targeting mid-February (more will get added before work on v0.12 kicks off) +Targeting mid-February. - 🔷 Import/Export Tab Layouts and Widgets - 🔷 log viewer diff --git a/Taskfile.yml b/Taskfile.yml index 7be83f6c4..e4c3b59ae 100644 --- a/Taskfile.yml +++ b/Taskfile.yml @@ -26,8 +26,8 @@ tasks: - docsite:build:embedded - build:backend env: - WCLOUD_ENDPOINT: "https://ot2e112zx5.execute-api.us-west-2.amazonaws.com/dev" - WCLOUD_WS_ENDPOINT: "wss://5lfzlg5crl.execute-api.us-west-2.amazonaws.com/dev/" + WCLOUD_ENDPOINT: "https://api-dev.waveterm.dev/central" + WCLOUD_WS_ENDPOINT: "wss://wsapi-dev.waveterm.dev/" electron:start: desc: Run the Electron application directly. @@ -39,8 +39,8 @@ tasks: - docsite:build:embedded - build:backend env: - WCLOUD_ENDPOINT: "https://ot2e112zx5.execute-api.us-west-2.amazonaws.com/dev" - WCLOUD_WS_ENDPOINT: "wss://5lfzlg5crl.execute-api.us-west-2.amazonaws.com/dev/" + WCLOUD_ENDPOINT: "https://api-dev.waveterm.dev" + WCLOUD_WS_ENDPOINT: "wss://wsapi-dev.waveterm.dev" storybook: desc: Start the Storybook server. diff --git a/cmd/generatego/main-generatego.go b/cmd/generatego/main-generatego.go index 62e65d4f7..9116fa551 100644 --- a/cmd/generatego/main-generatego.go +++ b/cmd/generatego/main-generatego.go @@ -24,6 +24,7 @@ func GenerateWshClient() error { fmt.Fprintf(os.Stderr, "generating wshclient file to %s\n", WshClientFileName) var buf strings.Builder gogen.GenerateBoilerplate(&buf, "wshclient", []string{ + "github.com/wavetermdev/waveterm/pkg/telemetry/telemetrydata", "github.com/wavetermdev/waveterm/pkg/wshutil", "github.com/wavetermdev/waveterm/pkg/wshrpc", "github.com/wavetermdev/waveterm/pkg/wconfig", diff --git a/cmd/server/main-server.go b/cmd/server/main-server.go index cd0d8640f..076fedd45 100644 --- a/cmd/server/main-server.go +++ b/cmd/server/main-server.go @@ -22,8 +22,10 @@ import ( "github.com/wavetermdev/waveterm/pkg/remote/fileshare/wshfs" "github.com/wavetermdev/waveterm/pkg/service" "github.com/wavetermdev/waveterm/pkg/telemetry" + "github.com/wavetermdev/waveterm/pkg/telemetry/telemetrydata" "github.com/wavetermdev/waveterm/pkg/util/shellutil" "github.com/wavetermdev/waveterm/pkg/util/sigutil" + "github.com/wavetermdev/waveterm/pkg/util/utilfn" "github.com/wavetermdev/waveterm/pkg/wavebase" "github.com/wavetermdev/waveterm/pkg/waveobj" "github.com/wavetermdev/waveterm/pkg/wcloud" @@ -46,6 +48,8 @@ var BuildTime = "0" const InitialTelemetryWait = 10 * time.Second const TelemetryTick = 2 * time.Minute const TelemetryInterval = 4 * time.Hour +const TelemetryInitialCountsWait = 5 * time.Second +const TelemetryCountsInterval = 1 * time.Hour var shutdownOnce sync.Once @@ -82,7 +86,7 @@ func stdinReadWatch() { } } -func configWatcher() { +func startConfigWatcher() { watcher := wconfig.GetWatcher() if watcher != nil { watcher.Start() @@ -101,19 +105,22 @@ func telemetryLoop() { } } -func panicTelemetryHandler() { +func panicTelemetryHandler(panicName string) { activity := wshrpc.ActivityUpdate{NumPanics: 1} err := telemetry.UpdateActivity(context.Background(), activity) if err != nil { log.Printf("error updating activity (panicTelemetryHandler): %v\n", err) } + telemetry.RecordTEvent(context.Background(), telemetrydata.MakeTEvent("debug:panic", telemetrydata.TEventProps{ + PanicType: panicName, + })) } func sendTelemetryWrapper() { defer func() { panichandler.PanicHandler("sendTelemetryWrapper", recover()) }() - ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second) + ctx, cancelFn := context.WithTimeout(context.Background(), 10*time.Second) defer cancelFn() beforeSendActivityUpdate(ctx) client, err := wstore.DBGetSingleton[*waveobj.Client](ctx) @@ -121,12 +128,50 @@ func sendTelemetryWrapper() { log.Printf("[error] getting client data for telemetry: %v\n", err) return } - err = wcloud.SendTelemetry(ctx, client.OID) + err = wcloud.SendAllTelemetry(ctx, client.OID) if err != nil { log.Printf("[error] sending telemetry: %v\n", err) } } +func updateTelemetryCounts(lastCounts telemetrydata.TEventProps) telemetrydata.TEventProps { + ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second) + defer cancelFn() + var props telemetrydata.TEventProps + props.CountBlocks, _ = wstore.DBGetCount[*waveobj.Block](ctx) + props.CountTabs, _ = wstore.DBGetCount[*waveobj.Tab](ctx) + props.CountWindows, _ = wstore.DBGetCount[*waveobj.Window](ctx) + props.CountWorkspaces, _, _ = wstore.DBGetWSCounts(ctx) + props.CountSSHConn = conncontroller.GetNumSSHHasConnected() + props.CountWSLConn = wslconn.GetNumWSLHasConnected() + props.CountViews, _ = wstore.DBGetBlockViewCounts(ctx) + if utilfn.CompareAsMarshaledJson(props, lastCounts) { + return lastCounts + } + tevent := telemetrydata.MakeTEvent("app:counts", props) + err := telemetry.RecordTEvent(ctx, tevent) + if err != nil { + log.Printf("error recording counts tevent: %v\n", err) + } + return props +} + +func updateTelemetryCountsLoop() { + defer func() { + panichandler.PanicHandler("updateTelemetryCountsLoop", recover()) + }() + var nextSend int64 + var lastCounts telemetrydata.TEventProps + time.Sleep(TelemetryInitialCountsWait) + for { + if time.Now().Unix() > nextSend { + nextSend = time.Now().Add(TelemetryCountsInterval).Unix() + lastCounts = updateTelemetryCounts(lastCounts) + } + time.Sleep(TelemetryTick) + } +} + func beforeSendActivityUpdate(ctx context.Context) { activity := wshrpc.ActivityUpdate{} activity.NumTabs, _ = wstore.DBGetCount[*waveobj.Tab](ctx) @@ -150,6 +195,26 @@ func startupActivityUpdate() { if err != nil { log.Printf("error updating startup activity: %v\n", err) } + autoUpdateChannel := telemetry.AutoUpdateChannel() + autoUpdateEnabled := telemetry.IsAutoUpdateEnabled() + tevent := telemetrydata.MakeTEvent("app:startup", telemetrydata.TEventProps{ + UserSet: &telemetrydata.TEventUserProps{ + ClientVersion: "v" + WaveVersion, + ClientBuildTime: BuildTime, + ClientArch: wavebase.ClientArch(), + ClientOSRelease: wavebase.UnameKernelRelease(), + ClientIsDev: wavebase.IsDevMode(), + AutoUpdateChannel: autoUpdateChannel, + AutoUpdateEnabled: autoUpdateEnabled, + }, + UserSetOnce: &telemetrydata.TEventUserProps{ + ClientInitialVersion: "v" + WaveVersion, + }, + }) + err = telemetry.RecordTEvent(ctx, tevent) + if err != nil { + log.Printf("error recording startup event: %v\n", err) + } } func shutdownActivityUpdate() { @@ -160,6 +225,15 @@ func shutdownActivityUpdate() { if err != nil { log.Printf("error updating shutdown activity: %v\n", err) } + err = telemetry.TruncateActivityTEventForShutdown(ctx) + if err != nil { + log.Printf("error truncating activity t-event for shutdown: %v\n", err) + } + tevent := telemetrydata.MakeTEvent("app:shutdown", telemetrydata.TEventProps{}) + err = telemetry.RecordTEvent(ctx, tevent) + if err != nil { + log.Printf("error recording shutdown event: %v\n", err) + } } func createMainWshClient() { @@ -283,15 +357,15 @@ func main() { } createMainWshClient() - sigutil.InstallShutdownSignalHandlers(doShutdown) sigutil.InstallSIGUSR1Handler() - - startupActivityUpdate() + startConfigWatcher() go stdinReadWatch() go telemetryLoop() - configWatcher() + go updateTelemetryCountsLoop() + startupActivityUpdate() // must be after startConfigWatcher() blocklogger.InitBlockLogger() + webListener, err := web.MakeTCPListener("web") if err != nil { log.Printf("error creating web listener: %v\n", err) diff --git a/cmd/wsh/cmd/wshcmd-debug.go b/cmd/wsh/cmd/wshcmd-debug.go index 29a185768..9efac0ff8 100644 --- a/cmd/wsh/cmd/wshcmd-debug.go +++ b/cmd/wsh/cmd/wshcmd-debug.go @@ -24,11 +24,24 @@ var debugBlockIdsCmd = &cobra.Command{ Hidden: true, } +var debugSendTelemetryCmd = &cobra.Command{ + Use: "send-telemetry", + Short: "send telemetry", + RunE: debugSendTelemetryRun, + Hidden: true, +} + func init() { debugCmd.AddCommand(debugBlockIdsCmd) + debugCmd.AddCommand(debugSendTelemetryCmd) rootCmd.AddCommand(debugCmd) } +func debugSendTelemetryRun(cmd *cobra.Command, args []string) error { + err := wshclient.SendTelemetryCommand(RpcClient, nil) + return err +} + func debugBlockIdsRun(cmd *cobra.Command, args []string) error { oref, err := resolveBlockArg() if err != nil { diff --git a/cmd/wsh/cmd/wshcmd-token.go b/cmd/wsh/cmd/wshcmd-token.go index 47b2381c8..2660c1250 100644 --- a/cmd/wsh/cmd/wshcmd-token.go +++ b/cmd/wsh/cmd/wshcmd-token.go @@ -22,9 +22,6 @@ func init() { } func tokenCmdRun(cmd *cobra.Command, args []string) (rtnErr error) { - defer func() { - sendActivity("token", rtnErr == nil) - }() if len(args) != 2 { OutputHelpMessage(cmd) return fmt.Errorf("wsh token requires exactly 2 arguments, got %d", len(args)) diff --git a/db/migrations-wstore/000007_events.down.sql b/db/migrations-wstore/000007_events.down.sql new file mode 100644 index 000000000..7acba0115 --- /dev/null +++ b/db/migrations-wstore/000007_events.down.sql @@ -0,0 +1 @@ +DROP TABLE db_tevent; diff --git a/db/migrations-wstore/000007_events.up.sql b/db/migrations-wstore/000007_events.up.sql new file mode 100644 index 000000000..3c6311960 --- /dev/null +++ b/db/migrations-wstore/000007_events.up.sql @@ -0,0 +1,8 @@ +CREATE TABLE db_tevent ( + uuid varchar(36) PRIMARY KEY, + ts int NOT NULL, + tslocal varchar(100) NOT NULL, + event varchar(50) NOT NULL, + props json NOT NULL, + uploaded boolean NOT NULL DEFAULT 0 +); \ No newline at end of file diff --git a/docs/docs/telemetry.mdx b/docs/docs/telemetry.mdx index 6715ca01c..9b33c4ba7 100644 --- a/docs/docs/telemetry.mdx +++ b/docs/docs/telemetry.mdx @@ -96,6 +96,15 @@ Lastly, some data is sent along with the telemetry that describes how to classif | AutoUpdateChannel | The type of auto update in use. This specifically refers to whether a latest or beta channel is selected. | | CurDay | The current day (in your time zone) when telemetry is sent. It does not include the time of day. | +## Geo Data + +We do not store IP addresses in our telemetry table. However, CloudFlare passes us Geo-Location headers. We store these two header values: + +| Name | Description | +| ------------ | ----------------------------------------------------------------- | +| CFCountry | 2-letter country code (e.g. "US", "FR", or "JP") | +| CFRegionCode | region code (often a provence, region, or state within a country) | + --- ## When Telemetry is Turned Off diff --git a/emain/emain.ts b/emain/emain.ts index 41ea190c6..c2037b70c 100644 --- a/emain/emain.ts +++ b/emain/emain.ts @@ -459,6 +459,31 @@ function getActivityDisplays(): ActivityDisplayType[] { return rtn; } +async function sendDisplaysTDataEvent() { + const displays = getActivityDisplays(); + if (displays.length === 0) { + return; + } + const props: TEventProps = {}; + props["display:count"] = displays.length; + props["display:height"] = displays[0].height; + props["display:width"] = displays[0].width; + props["display:dpr"] = displays[0].dpr; + props["display:all"] = displays; + try { + await RpcApi.RecordTEventCommand( + ElectronWshClient, + { + event: "app:display", + props, + }, + { noresponse: true } + ); + } catch (e) { + console.log("error sending display tdata event", e); + } +} + function logActiveState() { fireAndForget(async () => { const astate = getActivityState(); @@ -472,6 +497,18 @@ function logActiveState() { activity.displays = getActivityDisplays(); try { await RpcApi.ActivityCommand(ElectronWshClient, activity, { noresponse: true }); + await RpcApi.RecordTEventCommand( + ElectronWshClient, + { + event: "app:activity", + props: { + "activity:activeminutes": activity.activeminutes, + "activity:fgminutes": activity.fgminutes, + "activity:openminutes": activity.openminutes, + }, + }, + { noresponse: true } + ); } catch (e) { console.log("error logging active state", e); } finally { @@ -621,6 +658,7 @@ async function appMain() { await relaunchBrowserWindows(); await initDocsite(); setTimeout(runActiveTimer, 5000); // start active timer, wait 5s just to be safe + setTimeout(sendDisplaysTDataEvent, 5000); makeAppMenu(); makeDockTaskbar(); diff --git a/frontend/app/block/blockframe.tsx b/frontend/app/block/blockframe.tsx index e519abde8..fe8e15b9b 100644 --- a/frontend/app/block/blockframe.tsx +++ b/frontend/app/block/blockframe.tsx @@ -12,6 +12,7 @@ import { getConnStatusAtom, getSettingsKeyAtom, globalStore, + recordTEvent, useBlockAtom, WOS, } from "@/app/store/global"; @@ -182,6 +183,7 @@ const BlockFrame_Header = ({ return; } RpcApi.ActivityCommand(TabRpcClient, { nummagnify: 1 }); + recordTEvent("action:magnify", { "block:view": viewName }); }, [magnified]); if (blockData?.meta?.["frame:title"]) { diff --git a/frontend/app/store/global.ts b/frontend/app/store/global.ts index e80c3e7bc..dcd161cf8 100644 --- a/frontend/app/store/global.ts +++ b/frontend/app/store/global.ts @@ -1,6 +1,8 @@ // Copyright 2025, Command Line Inc. // SPDX-License-Identifier: Apache-2.0 +import { RpcApi } from "@/app/store/wshclientapi"; +import { TabRpcClient } from "@/app/store/wshrpcutil"; import { getLayoutModelForTabById, LayoutTreeActionType, @@ -667,6 +669,13 @@ function setActiveTab(tabId: string) { getApi().setActiveTab(tabId); } +function recordTEvent(event: string, props?: TEventProps) { + if (props == null) { + props = {}; + } + RpcApi.RecordTEventCommand(TabRpcClient, { event, props }, { noresponse: true }); +} + export { atoms, counterInc, @@ -695,6 +704,7 @@ export { PLATFORM, pushFlashError, pushNotification, + recordTEvent, refocusNode, registerBlockComponentModel, removeFlashError, diff --git a/frontend/app/store/wshclientapi.ts b/frontend/app/store/wshclientapi.ts index 9c614af4e..dd1a7a630 100644 --- a/frontend/app/store/wshclientapi.ts +++ b/frontend/app/store/wshclientapi.ts @@ -252,6 +252,11 @@ class RpcApiType { return client.wshRpcCall("path", data, opts); } + // command "recordtevent" [call] + RecordTEventCommand(client: WshClient, data: TEvent, opts?: RpcOpts): Promise { + return client.wshRpcCall("recordtevent", data, opts); + } + // command "remotefilecopy" [call] RemoteFileCopyCommand(client: WshClient, data: CommandRemoteFileCopyData, opts?: RpcOpts): Promise { return client.wshRpcCall("remotefilecopy", data, opts); @@ -337,6 +342,11 @@ class RpcApiType { return client.wshRpcCall("routeunannounce", null, opts); } + // command "sendtelemetry" [call] + SendTelemetryCommand(client: WshClient, opts?: RpcOpts): Promise { + return client.wshRpcCall("sendtelemetry", null, opts); + } + // command "setconfig" [call] SetConfigCommand(client: WshClient, data: SettingsType, opts?: RpcOpts): Promise { return client.wshRpcCall("setconfig", data, opts); diff --git a/frontend/app/tab/tab.tsx b/frontend/app/tab/tab.tsx index d253dd7d0..2024b06c3 100644 --- a/frontend/app/tab/tab.tsx +++ b/frontend/app/tab/tab.tsx @@ -1,7 +1,7 @@ // Copyright 2025, Command Line Inc. // SPDX-License-Identifier: Apache-2.0 -import { atoms, globalStore, refocusNode } from "@/app/store/global"; +import { atoms, globalStore, recordTEvent, refocusNode } from "@/app/store/global"; import { RpcApi } from "@/app/store/wshclientapi"; import { TabRpcClient } from "@/app/store/wshrpcutil"; import { Button } from "@/element/button"; @@ -183,7 +183,8 @@ const Tab = memo( click: () => fireAndForget(async () => { await ObjectService.UpdateObjectMeta(oref, preset); - await RpcApi.ActivityCommand(TabRpcClient, { settabtheme: 1 }); + RpcApi.ActivityCommand(TabRpcClient, { settabtheme: 1 }, { noresponse: true }); + recordTEvent("action:settabtheme"); }), }); } diff --git a/frontend/types/gotypes.d.ts b/frontend/types/gotypes.d.ts index b4e6d62d6..a2e51d8a9 100644 --- a/frontend/types/gotypes.d.ts +++ b/frontend/types/gotypes.d.ts @@ -757,6 +757,57 @@ declare global { allscopes?: boolean; }; + // telemetrydata.TEvent + type TEvent = { + uuid?: string; + ts?: number; + tslocal?: string; + event: string; + props: TEventProps; + }; + + // telemetrydata.TEventProps + type TEventProps = { + "activity:activeminutes"?: number; + "activity:fgminutes"?: number; + "activity:openminutes"?: number; + "action:initiator"?: "keyboard" | "mouse"; + "debug:panictype"?: string; + "block:view"?: string; + "ai:backendtype"?: string; + "wsh:cmd"?: string; + "wsh:haderror"?: boolean; + "conn:conntype"?: string; + "display:height"?: number; + "display:width"?: number; + "display:dpr"?: number; + "display:count"?: number; + "display:all"?: any; + "count:blocks"?: number; + "count:tabs"?: number; + "count:windows"?: number; + "count:workspaces"?: number; + "count:sshconn"?: number; + "count:wslconn"?: number; + "count:views"?: {[key: string]: number}; + $set?: TEventUserProps; + $set_once?: TEventUserProps; + }; + + // telemetrydata.TEventUserProps + type TEventUserProps = { + "client:arch"?: string; + "client:version"?: string; + "client:initial_version"?: string; + "client:buildtime"?: string; + "client:osrelease"?: string; + "client:isdev"?: boolean; + "autoupdate:channel"?: string; + "autoupdate:enabled"?: boolean; + "loc:countrycode"?: string; + "loc:regioncode"?: string; + }; + // waveobj.Tab type Tab = WaveObj & { name: string; diff --git a/pkg/panichandler/panichandler.go b/pkg/panichandler/panichandler.go index deb7441f7..8a71c7976 100644 --- a/pkg/panichandler/panichandler.go +++ b/pkg/panichandler/panichandler.go @@ -11,7 +11,7 @@ import ( // to log NumPanics into the local telemetry system // gets around import cycles -var PanicTelemetryHandler func() +var PanicTelemetryHandler func(panicType string) func PanicHandlerNoTelemetry(debugStr string, recoverVal any) { if recoverVal == nil { @@ -30,8 +30,10 @@ func PanicHandler(debugStr string, recoverVal any) error { debug.PrintStack() if PanicTelemetryHandler != nil { go func() { - defer PanicHandlerNoTelemetry("PanicTelemetryHandler", recover()) - PanicTelemetryHandler() + defer func() { + PanicHandlerNoTelemetry("PanicTelemetryHandler", recover()) + }() + PanicTelemetryHandler(debugStr) }() } if err, ok := recoverVal.(error); ok { diff --git a/pkg/remote/conncontroller/conncontroller.go b/pkg/remote/conncontroller/conncontroller.go index 31fbe8458..973804845 100644 --- a/pkg/remote/conncontroller/conncontroller.go +++ b/pkg/remote/conncontroller/conncontroller.go @@ -25,6 +25,7 @@ import ( "github.com/wavetermdev/waveterm/pkg/panichandler" "github.com/wavetermdev/waveterm/pkg/remote" "github.com/wavetermdev/waveterm/pkg/telemetry" + "github.com/wavetermdev/waveterm/pkg/telemetry/telemetrydata" "github.com/wavetermdev/waveterm/pkg/userinput" "github.com/wavetermdev/waveterm/pkg/util/shellutil" "github.com/wavetermdev/waveterm/pkg/util/utilfn" @@ -556,6 +557,12 @@ func (conn *SSHConn) Connect(ctx context.Context, connFlags *wconfig.ConnKeyword telemetry.GoUpdateActivityWrap(wshrpc.ActivityUpdate{ Conn: map[string]int{"ssh:connecterror": 1}, }, "ssh-connconnect") + telemetry.GoRecordTEventWrap(&telemetrydata.TEvent{ + Event: "conn:connecterror", + Props: telemetrydata.TEventProps{ + ConnType: "ssh", + }, + }) } else { conn.Infof(ctx, "successfully connected (wsh:%v)\n\n", conn.WshEnabled.Load()) conn.Status = Status_Connected @@ -566,6 +573,12 @@ func (conn *SSHConn) Connect(ctx context.Context, connFlags *wconfig.ConnKeyword telemetry.GoUpdateActivityWrap(wshrpc.ActivityUpdate{ Conn: map[string]int{"ssh:connect": 1}, }, "ssh-connconnect") + telemetry.GoRecordTEventWrap(&telemetrydata.TEvent{ + Event: "conn:connect", + Props: telemetrydata.TEventProps{ + ConnType: "ssh", + }, + }) } }) conn.FireConnChangeEvent() diff --git a/pkg/telemetry/telemetry.go b/pkg/telemetry/telemetry.go index 778762c9b..45fbf0029 100644 --- a/pkg/telemetry/telemetry.go +++ b/pkg/telemetry/telemetry.go @@ -6,12 +6,17 @@ package telemetry import ( "context" "database/sql/driver" + "encoding/json" + "fmt" "log" "time" + "github.com/google/uuid" "github.com/wavetermdev/waveterm/pkg/panichandler" + "github.com/wavetermdev/waveterm/pkg/telemetry/telemetrydata" "github.com/wavetermdev/waveterm/pkg/util/daystr" "github.com/wavetermdev/waveterm/pkg/util/dbutil" + "github.com/wavetermdev/waveterm/pkg/util/utilfn" "github.com/wavetermdev/waveterm/pkg/wavebase" "github.com/wavetermdev/waveterm/pkg/wconfig" "github.com/wavetermdev/waveterm/pkg/wshrpc" @@ -19,6 +24,7 @@ import ( ) const MaxTzNameLen = 50 +const ActivityEventName = "app:activity" type ActivityType struct { Day string `json:"day"` @@ -83,7 +89,9 @@ func AutoUpdateChannel() string { // Wraps UpdateCurrentActivity, spawns goroutine, and logs errors func GoUpdateActivityWrap(update wshrpc.ActivityUpdate, debugStr string) { go func() { - defer panichandler.PanicHandlerNoTelemetry("GoUpdateActivityWrap", recover()) + defer func() { + panichandler.PanicHandlerNoTelemetry("GoUpdateActivityWrap", recover()) + }() ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second) defer cancelFn() err := UpdateActivity(ctx, update) @@ -94,6 +102,156 @@ func GoUpdateActivityWrap(update wshrpc.ActivityUpdate, debugStr string) { }() } +func insertTEvent(ctx context.Context, event *telemetrydata.TEvent) error { + if event.Uuid == "" { + return fmt.Errorf("cannot insert TEvent: uuid is empty") + } + if event.Ts == 0 { + return fmt.Errorf("cannot insert TEvent: ts is 0") + } + if event.TsLocal == "" { + return fmt.Errorf("cannot insert TEvent: tslocal is empty") + } + if event.Event == "" { + return fmt.Errorf("cannot insert TEvent: event is empty") + } + return wstore.WithTx(ctx, func(tx *wstore.TxWrap) error { + query := `INSERT INTO db_tevent (uuid, ts, tslocal, event, props) + VALUES (?, ?, ?, ?, ?)` + tx.Exec(query, event.Uuid, event.Ts, event.TsLocal, event.Event, dbutil.QuickJson(event.Props)) + return nil + }) +} + +// merges newActivity into curActivity, returns curActivity +func mergeActivity(curActivity *telemetrydata.TEventProps, newActivity telemetrydata.TEventProps) { + curActivity.ActiveMinutes += newActivity.ActiveMinutes + curActivity.FgMinutes += newActivity.FgMinutes + curActivity.OpenMinutes += newActivity.OpenMinutes +} + +// ignores the timestamp in tevent, and uses the current time +func updateActivityTEvent(ctx context.Context, tevent *telemetrydata.TEvent) error { + eventTs := time.Now() + // compute to hour boundary, and round up to next hour + eventTs = eventTs.Truncate(time.Hour).Add(time.Hour) + return wstore.WithTx(ctx, func(tx *wstore.TxWrap) error { + // find event that matches this timestamp with event name "app:activity" + var hasRow bool + var curActivity telemetrydata.TEventProps + uuidStr := tx.GetString(`SELECT uuid FROM db_tevent WHERE ts = ? AND event = ?`, eventTs.UnixMilli(), ActivityEventName) + if uuidStr != "" { + hasRow = true + rawProps := tx.GetString(`SELECT props FROM db_tevent WHERE uuid = ?`, uuidStr) + err := json.Unmarshal([]byte(rawProps), &curActivity) + if err != nil { + // ignore, curActivity will just be 0 + log.Printf("error unmarshalling activity props: %v\n", err) + } + } + mergeActivity(&curActivity, tevent.Props) + if hasRow { + query := `UPDATE db_tevent SET props = ? WHERE uuid = ?` + tx.Exec(query, dbutil.QuickJson(curActivity), uuidStr) + } else { + query := `INSERT INTO db_tevent (uuid, ts, tslocal, event, props) VALUES (?, ?, ?, ?, ?)` + tsLocal := utilfn.ConvertToWallClockPT(eventTs).Format(time.RFC3339) + tx.Exec(query, uuid.New().String(), eventTs.UnixMilli(), tsLocal, ActivityEventName, dbutil.QuickJson(curActivity)) + } + return nil + }) +} + +func TruncateActivityTEventForShutdown(ctx context.Context) error { + nowTs := time.Now() + eventTs := nowTs.Truncate(time.Hour).Add(time.Hour) + return wstore.WithTx(ctx, func(tx *wstore.TxWrap) error { + // find event that matches this timestamp with event name "app:activity" + uuidStr := tx.GetString(`SELECT uuid FROM db_tevent WHERE ts = ? AND event = ?`, eventTs.UnixMilli(), ActivityEventName) + if uuidStr == "" { + return nil + } + // we're going to update this app:activity event back to nowTs + tsLocal := utilfn.ConvertToWallClockPT(nowTs).Format(time.RFC3339) + query := `UPDATE db_tevent SET ts = ?, tslocal = ? WHERE uuid = ?` + tx.Exec(query, nowTs.UnixMilli(), tsLocal, uuidStr) + return nil + }) +} + +func GoRecordTEventWrap(tevent *telemetrydata.TEvent) { + if tevent == nil || tevent.Event == "" { + return + } + go func() { + defer func() { + panichandler.PanicHandlerNoTelemetry("GoRecordTEventWrap", recover()) + }() + ctx, cancelFn := context.WithTimeout(context.Background(), 2*time.Second) + defer cancelFn() + err := RecordTEvent(ctx, tevent) + if err != nil { + // ignore error, just log, since this is not critical + log.Printf("error recording %q telemetry event: %v\n", tevent.Event, err) + } + }() +} + +func RecordTEvent(ctx context.Context, tevent *telemetrydata.TEvent) error { + if tevent == nil { + return nil + } + if tevent.Uuid == "" { + tevent.Uuid = uuid.New().String() + } + err := tevent.Validate(true) + if err != nil { + return err + } + tevent.EnsureTimestamps() + if tevent.Event == ActivityEventName { + return updateActivityTEvent(ctx, tevent) + } + return insertTEvent(ctx, tevent) +} + +func CleanOldTEvents(ctx context.Context) error { + return wstore.WithTx(ctx, func(tx *wstore.TxWrap) error { + // delete events older than 28 days + query := `DELETE FROM db_tevent WHERE ts < ?` + olderThan := time.Now().AddDate(0, 0, -28).UnixMilli() + tx.Exec(query, olderThan) + return nil + }) +} + +func GetNonUploadedTEvents(ctx context.Context, maxEvents int) ([]*telemetrydata.TEvent, error) { + now := time.Now() + return wstore.WithTxRtn(ctx, func(tx *wstore.TxWrap) ([]*telemetrydata.TEvent, error) { + var rtn []*telemetrydata.TEvent + query := `SELECT uuid, ts, tslocal, event, props, uploaded FROM db_tevent WHERE uploaded = 0 AND ts <= ? ORDER BY ts LIMIT ?` + tx.Select(&rtn, query, now.UnixMilli(), maxEvents) + for _, event := range rtn { + if err := event.ConvertRawJSON(); err != nil { + return nil, fmt.Errorf("scan json for event %s: %w", event.Uuid, err) + } + } + return rtn, nil + }) +} + +func MarkTEventsAsUploaded(ctx context.Context, events []*telemetrydata.TEvent) error { + return wstore.WithTx(ctx, func(tx *wstore.TxWrap) error { + ids := make([]string, 0, len(events)) + for _, event := range events { + ids = append(ids, event.Uuid) + } + query := `UPDATE db_tevent SET uploaded = 1 WHERE uuid IN (SELECT value FROM json_each(?))` + tx.Exec(query, dbutil.QuickJson(ids)) + return nil + }) +} + func UpdateActivity(ctx context.Context, update wshrpc.ActivityUpdate) error { now := time.Now() dayStr := daystr.GetCurDayStr() diff --git a/pkg/telemetry/telemetrydata/telemetrydata.go b/pkg/telemetry/telemetrydata/telemetrydata.go new file mode 100644 index 000000000..1370a00ba --- /dev/null +++ b/pkg/telemetry/telemetrydata/telemetrydata.go @@ -0,0 +1,200 @@ +// Copyright 2025, Command Line Inc. +// SPDX-License-Identifier: Apache-2.0 + +package telemetrydata + +import ( + "encoding/json" + "fmt" + "regexp" + "time" + + "github.com/google/uuid" + "github.com/wavetermdev/waveterm/pkg/util/utilfn" +) + +var ValidEventNames = map[string]bool{ + "app:startup": true, + "app:shutdown": true, + "app:activity": true, + "app:display": true, + "app:counts": true, + "action:magnify": true, + "action:settabtheme": true, + "action:runaicmd": true, + "action:createtab": true, + "action:createblock": true, + "wsh:run": true, + "debug:panic": true, + "conn:connect": true, + "conn:connecterror": true, +} + +type TEvent struct { + Uuid string `json:"uuid,omitempty" db:"uuid"` + Ts int64 `json:"ts,omitempty" db:"ts"` + TsLocal string `json:"tslocal,omitempty" db:"tslocal"` // iso8601 format (wall clock converted to PT) + Event string `json:"event" db:"event"` + Props TEventProps `json:"props" db:"-"` // Don't scan directly to map + + // DB fields + Uploaded bool `json:"-" db:"uploaded"` + + // For database scanning + RawProps string `json:"-" db:"props"` +} + +type TEventUserProps struct { + ClientArch string `json:"client:arch,omitempty"` + ClientVersion string `json:"client:version,omitempty"` + ClientInitialVersion string `json:"client:initial_version,omitempty"` + ClientBuildTime string `json:"client:buildtime,omitempty"` + ClientOSRelease string `json:"client:osrelease,omitempty"` + ClientIsDev bool `json:"client:isdev,omitempty"` + AutoUpdateChannel string `json:"autoupdate:channel,omitempty"` + AutoUpdateEnabled bool `json:"autoupdate:enabled,omitempty"` + LocCountryCode string `json:"loc:countrycode,omitempty"` + LocRegionCode string `json:"loc:regioncode,omitempty"` +} + +type TEventProps struct { + TEventUserProps `tstype:"-"` // generally don't need to set these since they will be automatically copied over + + ActiveMinutes int `json:"activity:activeminutes,omitempty"` + FgMinutes int `json:"activity:fgminutes,omitempty"` + OpenMinutes int `json:"activity:openminutes,omitempty"` + + ActionInitiator string `json:"action:initiator,omitempty" tstype:"\"keyboard\" | \"mouse\""` + PanicType string `json:"debug:panictype,omitempty"` + BlockView string `json:"block:view,omitempty"` + AiBackendType string `json:"ai:backendtype,omitempty"` + WshCmd string `json:"wsh:cmd,omitempty"` + WshHadError bool `json:"wsh:haderror,omitempty"` + ConnType string `json:"conn:conntype,omitempty"` + + DisplayHeight int `json:"display:height,omitempty"` + DisplayWidth int `json:"display:width,omitempty"` + DisplayDPR float64 `json:"display:dpr,omitempty"` + DisplayCount int `json:"display:count,omitempty"` + DisplayAll interface{} `json:"display:all,omitempty"` + + CountBlocks int `json:"count:blocks,omitempty"` + CountTabs int `json:"count:tabs,omitempty"` + CountWindows int `json:"count:windows,omitempty"` + CountWorkspaces int `json:"count:workspaces,omitempty"` + CountSSHConn int `json:"count:sshconn,omitempty"` + CountWSLConn int `json:"count:wslconn,omitempty"` + CountViews map[string]int `json:"count:views,omitempty"` + + UserSet *TEventUserProps `json:"$set,omitempty"` + UserSetOnce *TEventUserProps `json:"$set_once,omitempty"` +} + +func MakeTEvent(event string, props TEventProps) *TEvent { + now := time.Now() + // TsLocal gets set in EnsureTimestamps() + return &TEvent{ + Uuid: uuid.New().String(), + Ts: now.UnixMilli(), + Event: event, + Props: props, + } +} + +func MakeUntypedTEvent(event string, propsMap map[string]any) (*TEvent, error) { + if event == "" { + return nil, fmt.Errorf("event name must be non-empty") + } + var props TEventProps + err := utilfn.ReUnmarshal(&props, propsMap) + if err != nil { + return nil, fmt.Errorf("error re-marshalling TEvent props: %w", err) + } + return MakeTEvent(event, props), nil +} + +func (t *TEvent) EnsureTimestamps() { + if t.Ts == 0 { + t.Ts = time.Now().UnixMilli() + } + gtime := time.UnixMilli(t.Ts) + t.TsLocal = utilfn.ConvertToWallClockPT(gtime).Format(time.RFC3339) +} + +func (t *TEvent) UserSetProps() *TEventUserProps { + if t.Props.UserSet == nil { + t.Props.UserSet = &TEventUserProps{} + } + return t.Props.UserSet +} + +func (t *TEvent) UserSetOnceProps() *TEventUserProps { + if t.Props.UserSetOnce == nil { + t.Props.UserSetOnce = &TEventUserProps{} + } + return t.Props.UserSetOnce +} + +func (t *TEvent) ConvertRawJSON() error { + if t.RawProps != "" { + return json.Unmarshal([]byte(t.RawProps), &t.Props) + } + return nil +} + +var eventNameRe = regexp.MustCompile(`^[a-zA-Z0-9.:_/-]+$`) + +// validates a tevent that was just created (not for validating out of the DB, or an uploaded TEvent) +// checks that TS is pretty current (or unset) +func (te *TEvent) Validate(current bool) error { + if te == nil { + return fmt.Errorf("TEvent cannot be nil") + } + if te.Event == "" { + return fmt.Errorf("TEvent.Event cannot be empty") + } + if !eventNameRe.MatchString(te.Event) { + return fmt.Errorf("TEvent.Event invalid: %q", te.Event) + } + if !ValidEventNames[te.Event] { + return fmt.Errorf("TEvent.Event not valid: %q", te.Event) + } + if te.Uuid == "" { + return fmt.Errorf("TEvent.Uuid cannot be empty") + } + _, err := uuid.Parse(te.Uuid) + if err != nil { + return fmt.Errorf("TEvent.Uuid invalid: %v", err) + } + if current { + if te.Ts != 0 { + now := time.Now().UnixMilli() + if te.Ts > now+60000 || te.Ts < now-60000 { + return fmt.Errorf("TEvent.Ts is not current: %d", te.Ts) + } + } + } else { + if te.Ts == 0 { + return fmt.Errorf("TEvent.Ts must be set") + } + if te.TsLocal == "" { + return fmt.Errorf("TEvent.TsLocal must be set") + } + t, err := time.Parse(time.RFC3339, te.TsLocal) + if err != nil { + return fmt.Errorf("TEvent.TsLocal parse error: %v", err) + } + now := time.Now() + if t.Before(now.Add(-30*24*time.Hour)) || t.After(now.Add(2*24*time.Hour)) { + return fmt.Errorf("tslocal out of valid range") + } + } + barr, err := json.Marshal(te.Props) + if err != nil { + return fmt.Errorf("TEvent.Props JSON error: %v", err) + } + if len(barr) > 20000 { + return fmt.Errorf("TEvent.Props too large: %d", len(barr)) + } + return nil +} diff --git a/pkg/util/dbutil/dbutil.go b/pkg/util/dbutil/dbutil.go index cc75416dc..a13a43547 100644 --- a/pkg/util/dbutil/dbutil.go +++ b/pkg/util/dbutil/dbutil.go @@ -11,7 +11,7 @@ import ( "strconv" ) -func QuickSetStr(strVal *string, m map[string]interface{}, name string) { +func QuickSetStr(strVal *string, m map[string]any, name string) { v, ok := m[name] if !ok { return @@ -28,7 +28,7 @@ func QuickSetStr(strVal *string, m map[string]interface{}, name string) { *strVal = str } -func QuickSetInt(ival *int, m map[string]interface{}, name string) { +func QuickSetInt(ival *int, m map[string]any, name string) { v, ok := m[name] if !ok { return @@ -64,7 +64,7 @@ func QuickSetNullableInt64(ival **int64, m map[string]any, name string) { } } -func QuickSetInt64(ival *int64, m map[string]interface{}, name string) { +func QuickSetInt64(ival *int64, m map[string]any, name string) { v, ok := m[name] if !ok { // leave as zero @@ -82,7 +82,7 @@ func QuickSetInt64(ival *int64, m map[string]interface{}, name string) { } } -func QuickSetBool(bval *bool, m map[string]interface{}, name string) { +func QuickSetBool(bval *bool, m map[string]any, name string) { v, ok := m[name] if !ok { return @@ -100,7 +100,7 @@ func QuickSetBool(bval *bool, m map[string]interface{}, name string) { } } -func QuickSetBytes(bval *[]byte, m map[string]interface{}, name string) { +func QuickSetBytes(bval *[]byte, m map[string]any, name string) { v, ok := m[name] if !ok { return @@ -130,7 +130,7 @@ func getByteArr(m map[string]any, name string, def string) ([]byte, bool) { return barr, true } -func QuickSetJson(ptr interface{}, m map[string]interface{}, name string) { +func QuickSetJson(ptr any, m map[string]any, name string) { barr, ok := getByteArr(m, name, "{}") if !ok { return @@ -138,7 +138,7 @@ func QuickSetJson(ptr interface{}, m map[string]interface{}, name string) { json.Unmarshal(barr, ptr) } -func QuickSetNullableJson(ptr interface{}, m map[string]interface{}, name string) { +func QuickSetNullableJson(ptr any, m map[string]any, name string) { barr, ok := getByteArr(m, name, "null") if !ok { return @@ -146,7 +146,7 @@ func QuickSetNullableJson(ptr interface{}, m map[string]interface{}, name string json.Unmarshal(barr, ptr) } -func QuickSetJsonArr(ptr interface{}, m map[string]interface{}, name string) { +func QuickSetJsonArr(ptr any, m map[string]any, name string) { barr, ok := getByteArr(m, name, "[]") if !ok { return @@ -154,7 +154,7 @@ func QuickSetJsonArr(ptr interface{}, m map[string]interface{}, name string) { json.Unmarshal(barr, ptr) } -func CheckNil(v interface{}) bool { +func CheckNil(v any) bool { rv := reflect.ValueOf(v) if !rv.IsValid() { return true @@ -168,7 +168,7 @@ func CheckNil(v interface{}) bool { } } -func QuickNullableJson(v interface{}) string { +func QuickNullableJson(v any) string { if CheckNil(v) { return "null" } @@ -176,7 +176,7 @@ func QuickNullableJson(v interface{}) string { return string(barr) } -func QuickJson(v interface{}) string { +func QuickJson(v any) string { if CheckNil(v) { return "{}" } @@ -184,7 +184,7 @@ func QuickJson(v interface{}) string { return string(barr) } -func QuickJsonBytes(v interface{}) []byte { +func QuickJsonBytes(v any) []byte { if CheckNil(v) { return []byte("{}") } @@ -192,7 +192,7 @@ func QuickJsonBytes(v interface{}) []byte { return barr } -func QuickJsonArr(v interface{}) string { +func QuickJsonArr(v any) string { if CheckNil(v) { return "[]" } @@ -200,7 +200,7 @@ func QuickJsonArr(v interface{}) string { return string(barr) } -func QuickJsonArrBytes(v interface{}) []byte { +func QuickJsonArrBytes(v any) []byte { if CheckNil(v) { return []byte("[]") } @@ -208,7 +208,7 @@ func QuickJsonArrBytes(v interface{}) []byte { return barr } -func QuickScanJson(ptr interface{}, val interface{}) error { +func QuickScanJson(ptr any, val any) error { barrVal, ok := val.([]byte) if !ok { strVal, ok := val.(string) @@ -223,7 +223,7 @@ func QuickScanJson(ptr interface{}, val interface{}) error { return json.Unmarshal(barrVal, ptr) } -func QuickValueJson(v interface{}) (driver.Value, error) { +func QuickValueJson(v any) (driver.Value, error) { if CheckNil(v) { return "{}", nil } @@ -233,3 +233,32 @@ func QuickValueJson(v interface{}) (driver.Value, error) { } return string(barr), nil } + +// on error will return nil unless forceMake is set, in which case it returns make(map[string]any) +func ParseJsonMap(val string, forceMake bool) map[string]any { + var noRtn map[string]any + if forceMake { + noRtn = make(map[string]any) + } + if val == "" { + return noRtn + } + var m map[string]any + err := json.Unmarshal([]byte(val), &m) + if err != nil { + return noRtn + } +return m +} + +func ParseJsonArr[T any](val string) []T { + if val == "" { + return nil + } + var arr []T + err := json.Unmarshal([]byte(val), &arr) + if err != nil { + return nil + } + return arr +} diff --git a/pkg/util/utilfn/compare.go b/pkg/util/utilfn/compare.go index ea3b20758..c4684f9ac 100644 --- a/pkg/util/utilfn/compare.go +++ b/pkg/util/utilfn/compare.go @@ -4,9 +4,29 @@ package utilfn import ( + "bytes" + "encoding/json" "reflect" ) +func CompareAsMarshaledJson(a, b any) bool { + if a == nil && b == nil { + return true + } + if a == nil || b == nil { + return false + } + barrA, err := json.Marshal(a) + if err != nil { + return false + } + barrB, err := json.Marshal(b) + if err != nil { + return false + } + return bytes.Equal(barrA, barrB) +} + // this is a shallow equal, but with special handling for numeric types // it will up convert to float64 and compare func JsonValEqual(a, b any) bool { diff --git a/pkg/util/utilfn/streamtolines.go b/pkg/util/utilfn/streamtolines.go index d9fa3363b..39f2adcc1 100644 --- a/pkg/util/utilfn/streamtolines.go +++ b/pkg/util/utilfn/streamtolines.go @@ -58,7 +58,7 @@ func streamToLines_processBuf(lineBuf *lineBuf, readBuf []byte, lineFn func([]by func StreamToLines(input io.Reader, lineFn func([]byte)) error { var lineBuf lineBuf - readBuf := make([]byte, 16*1024) + readBuf := make([]byte, 64*1024) for { n, err := input.Read(readBuf) streamToLines_processBuf(&lineBuf, readBuf[:n], lineFn) diff --git a/pkg/util/utilfn/utilfn.go b/pkg/util/utilfn/utilfn.go index fb04dd61c..904e8d460 100644 --- a/pkg/util/utilfn/utilfn.go +++ b/pkg/util/utilfn/utilfn.go @@ -31,6 +31,15 @@ import ( ) var HexDigits = []byte{'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'} +var PTLoc *time.Location + +func init() { + loc, err := time.LoadLocation("America/Los_Angeles") + if err != nil { + loc = time.FixedZone("PT", -8*60*60) + } + PTLoc = loc +} func GetStrArr(v interface{}, field string) []string { if v == nil { @@ -76,6 +85,35 @@ func GetBool(v interface{}, field string) bool { return bval } +// converts an int, int64, or float64 to an int64 +// nil or bad type returns 0 +func ConvertInt(val any) int64 { + if val == 0 { + return 0 + } + switch typedVal := val.(type) { + case int: + return int64(typedVal) + case int64: + return typedVal + case float64: + return int64(typedVal) + default: + return 0 + } +} + +func ConvertMap(val any) map[string]any { + if val == nil { + return nil + } + m, ok := val.(map[string]any) + if !ok { + return nil + } + return m +} + var needsQuoteRe = regexp.MustCompile(`[^\w@%:,./=+-]`) // minimum maxlen=6, pass -1 for no max length @@ -926,6 +964,29 @@ func FilterValidArch(arch string) (string, error) { return "", fmt.Errorf("unknown architecture: %s", formatted) } +func ConvertUUIDv4Tov7(uuidv4 string) (string, error) { + // Parse the UUIDv4 + parts := strings.Split(uuidv4, "-") + if len(parts) != 5 { + return "", fmt.Errorf("invalid UUIDv4 format") + } + + // Section 1 and 2: Fixed timestamp for Jan 1, 2024 + section1 := "01823a80" // High 32 bits of the timestamp + section2 := "0000" // Middle 16 bits of the timestamp + + // Section 3: Version (7) and the last 3 bytes of randomness from UUIDv4 + section3 := "7" + parts[2][1:] // Replace the first nibble with '7' for version + + // Section 4 and 5: Copy from the original UUIDv4 + section4 := parts[3] + section5 := parts[4] + + // Combine sections to form UUIDv7 + uuidv7 := fmt.Sprintf("%s-%s-%s-%s-%s", section1, section2, section3, section4, section5) + return uuidv7, nil +} + func TimeoutFromContext(ctx context.Context, defaultTimeout time.Duration) time.Duration { deadline, ok := ctx.Deadline() if !ok { @@ -948,3 +1009,10 @@ func DumpGoRoutineStacks() { n := runtime.Stack(buf, true) os.Stdout.Write(buf[:n]) } + +func ConvertToWallClockPT(t time.Time) time.Time { + year, month, day := t.Date() + hour, min, sec := t.Clock() + pstTime := time.Date(year, month, day, hour, min, sec, 0, PTLoc) + return pstTime +} diff --git a/pkg/waveai/waveai.go b/pkg/waveai/waveai.go index b9198f595..89c9bdfc4 100644 --- a/pkg/waveai/waveai.go +++ b/pkg/waveai/waveai.go @@ -8,6 +8,7 @@ import ( "log" "github.com/wavetermdev/waveterm/pkg/telemetry" + "github.com/wavetermdev/waveterm/pkg/telemetry/telemetrydata" "github.com/wavetermdev/waveterm/pkg/wshrpc" ) @@ -63,24 +64,36 @@ func RunAICommand(ctx context.Context, request wshrpc.WaveAIStreamRequest) chan endpoint = "default" } var backend AIBackend + var backendType string if request.Opts.APIType == ApiType_Anthropic { backend = AnthropicBackend{} + backendType = ApiType_Anthropic } else if request.Opts.APIType == ApiType_Perplexity { backend = PerplexityBackend{} + backendType = ApiType_Perplexity } else if request.Opts.APIType == APIType_Google { backend = GoogleBackend{} + backendType = APIType_Google } else if IsCloudAIRequest(request.Opts) { endpoint = "waveterm cloud" request.Opts.APIType = APIType_OpenAI request.Opts.Model = "default" backend = WaveAICloudBackend{} + backendType = "wave" } else { backend = OpenAIBackend{} + backendType = APIType_OpenAI } if backend == nil { log.Printf("no backend found for %s\n", request.Opts.APIType) return nil } + telemetry.GoRecordTEventWrap(&telemetrydata.TEvent{ + Event: "action:runaicmd", + Props: telemetrydata.TEventProps{ + AiBackendType: backendType, + }, + }) log.Printf("sending ai chat message to %s endpoint %q using model %s\n", request.Opts.APIType, endpoint, request.Opts.Model) return backend.StreamCompletion(ctx, request) diff --git a/pkg/wcloud/wcloud.go b/pkg/wcloud/wcloud.go index afc607569..dfed44098 100644 --- a/pkg/wcloud/wcloud.go +++ b/pkg/wcloud/wcloud.go @@ -18,6 +18,7 @@ import ( "time" "github.com/wavetermdev/waveterm/pkg/telemetry" + "github.com/wavetermdev/waveterm/pkg/telemetry/telemetrydata" "github.com/wavetermdev/waveterm/pkg/util/daystr" "github.com/wavetermdev/waveterm/pkg/wavebase" ) @@ -43,6 +44,7 @@ const WCloudWebShareUpdateTimeout = 15 * time.Second const MaxUpdatePayloadSize = 1 * (1024 * 1024) const TelemetryUrl = "/telemetry" +const TEventsUrl = "/tevents" const NoTelemetryUrl = "/no-telemetry" const WebShareUpdateUrl = "/auth/web-share-update" @@ -148,11 +150,89 @@ func doRequest(req *http.Request, outputObj interface{}) (*http.Response, error) return resp, nil } -func SendTelemetry(ctx context.Context, clientId string) error { +type TEventsInputType struct { + ClientId string `json:"clientid"` + Events []*telemetrydata.TEvent `json:"events"` +} + +const TEventsBatchSize = 200 +const TEventsMaxBatches = 10 + +// returns (done, num-sent, error) +func sendTEventsBatch(clientId string) (bool, int, error) { + ctx, cancelFn := context.WithTimeout(context.Background(), WCloudDefaultTimeout) + defer cancelFn() + events, err := telemetry.GetNonUploadedTEvents(ctx, TEventsBatchSize) + if err != nil { + return true, 0, fmt.Errorf("cannot get events: %v", err) + } + if len(events) == 0 { + return true, 0, nil + } + log.Printf("[wcloud] sending %d tevents\n", len(events)) + input := TEventsInputType{ + ClientId: clientId, + Events: events, + } + req, err := makeAnonPostReq(ctx, TEventsUrl, input) + if err != nil { + return true, 0, err + } + _, err = doRequest(req, nil) + if err != nil { + return true, 0, err + } + err = telemetry.MarkTEventsAsUploaded(ctx, events) + if err != nil { + return true, 0, fmt.Errorf("error marking activity as uploaded: %v", err) + } + return len(events) < TEventsBatchSize, len(events), nil +} + +func sendTEvents(clientId string) (int, error) { + numIters := 0 + totalEvents := 0 + for { + numIters++ + done, numEvents, err := sendTEventsBatch(clientId) + if err != nil { + log.Printf("error sending telemetry events: %v\n", err) + break + } + totalEvents += numEvents + if done { + break + } + if numIters > TEventsMaxBatches { + log.Printf("sendTEvents, hit %d iterations, stopping\n", numIters) + break + } + } + return totalEvents, nil +} + +func SendAllTelemetry(ctx context.Context, clientId string) error { + defer func() { + ctx, cancelFn := context.WithTimeout(context.Background(), 2*time.Second) + defer cancelFn() + telemetry.CleanOldTEvents(ctx) + }() if !telemetry.IsTelemetryEnabled() { log.Printf("telemetry disabled, not sending\n") return nil } + _, err := sendTEvents(clientId) + if err != nil { + return err + } + err = sendTelemetry(ctx, clientId) + if err != nil { + return err + } + return nil +} + +func sendTelemetry(ctx context.Context, clientId string) error { activity, err := telemetry.GetNonUploadedActivity(ctx) if err != nil { return fmt.Errorf("cannot get activity: %v", err) diff --git a/pkg/wcore/block.go b/pkg/wcore/block.go index 24786c588..8eb720ab8 100644 --- a/pkg/wcore/block.go +++ b/pkg/wcore/block.go @@ -14,6 +14,7 @@ import ( "github.com/wavetermdev/waveterm/pkg/filestore" "github.com/wavetermdev/waveterm/pkg/panichandler" "github.com/wavetermdev/waveterm/pkg/telemetry" + "github.com/wavetermdev/waveterm/pkg/telemetry/telemetrydata" "github.com/wavetermdev/waveterm/pkg/util/utilfn" "github.com/wavetermdev/waveterm/pkg/waveobj" "github.com/wavetermdev/waveterm/pkg/wps" @@ -106,6 +107,12 @@ func CreateBlock(ctx context.Context, tabId string, blockDef *waveobj.BlockDef, telemetry.UpdateActivity(tctx, wshrpc.ActivityUpdate{ Renderers: map[string]int{blockView: 1}, }) + telemetry.RecordTEvent(tctx, &telemetrydata.TEvent{ + Event: "action:createblock", + Props: telemetrydata.TEventProps{ + BlockView: blockView, + }, + }) }() return blockData, nil } diff --git a/pkg/wcore/workspace.go b/pkg/wcore/workspace.go index f69926c71..5ae340d0f 100644 --- a/pkg/wcore/workspace.go +++ b/pkg/wcore/workspace.go @@ -12,6 +12,7 @@ import ( "github.com/google/uuid" "github.com/wavetermdev/waveterm/pkg/eventbus" "github.com/wavetermdev/waveterm/pkg/telemetry" + "github.com/wavetermdev/waveterm/pkg/telemetry/telemetrydata" "github.com/wavetermdev/waveterm/pkg/util/utilfn" "github.com/wavetermdev/waveterm/pkg/waveobj" "github.com/wavetermdev/waveterm/pkg/wconfig" @@ -236,6 +237,9 @@ func CreateTab(ctx context.Context, workspaceId string, tabName string, activate } } telemetry.GoUpdateActivityWrap(wshrpc.ActivityUpdate{NewTab: 1}, "createtab") + telemetry.GoRecordTEventWrap(&telemetrydata.TEvent{ + Event: "action:createtab", + }) return tab.OID, nil } diff --git a/pkg/wshrpc/wshclient/wshclient.go b/pkg/wshrpc/wshclient/wshclient.go index 599cd79c5..ae7125ec8 100644 --- a/pkg/wshrpc/wshclient/wshclient.go +++ b/pkg/wshrpc/wshclient/wshclient.go @@ -6,6 +6,7 @@ package wshclient import ( + "github.com/wavetermdev/waveterm/pkg/telemetry/telemetrydata" "github.com/wavetermdev/waveterm/pkg/wshutil" "github.com/wavetermdev/waveterm/pkg/wshrpc" "github.com/wavetermdev/waveterm/pkg/wconfig" @@ -307,6 +308,12 @@ func PathCommand(w *wshutil.WshRpc, data wshrpc.PathCommandData, opts *wshrpc.Rp return resp, err } +// command "recordtevent", wshserver.RecordTEventCommand +func RecordTEventCommand(w *wshutil.WshRpc, data telemetrydata.TEvent, opts *wshrpc.RpcOpts) error { + _, err := sendRpcRequestCallHelper[any](w, "recordtevent", data, opts) + return err +} + // command "remotefilecopy", wshserver.RemoteFileCopyCommand func RemoteFileCopyCommand(w *wshutil.WshRpc, data wshrpc.CommandRemoteFileCopyData, opts *wshrpc.RpcOpts) error { _, err := sendRpcRequestCallHelper[any](w, "remotefilecopy", data, opts) @@ -405,6 +412,12 @@ func RouteUnannounceCommand(w *wshutil.WshRpc, opts *wshrpc.RpcOpts) error { return err } +// command "sendtelemetry", wshserver.SendTelemetryCommand +func SendTelemetryCommand(w *wshutil.WshRpc, opts *wshrpc.RpcOpts) error { + _, err := sendRpcRequestCallHelper[any](w, "sendtelemetry", nil, opts) + return err +} + // command "setconfig", wshserver.SetConfigCommand func SetConfigCommand(w *wshutil.WshRpc, data wshrpc.MetaSettingsType, opts *wshrpc.RpcOpts) error { _, err := sendRpcRequestCallHelper[any](w, "setconfig", data, opts) diff --git a/pkg/wshrpc/wshrpctypes.go b/pkg/wshrpc/wshrpctypes.go index 06de27f5c..09b389fc4 100644 --- a/pkg/wshrpc/wshrpctypes.go +++ b/pkg/wshrpc/wshrpctypes.go @@ -13,6 +13,7 @@ import ( "reflect" "github.com/wavetermdev/waveterm/pkg/ijson" + "github.com/wavetermdev/waveterm/pkg/telemetry/telemetrydata" "github.com/wavetermdev/waveterm/pkg/util/iochan/iochantypes" "github.com/wavetermdev/waveterm/pkg/vdom" "github.com/wavetermdev/waveterm/pkg/waveobj" @@ -179,9 +180,11 @@ type WshRpcInterface interface { WaveInfoCommand(ctx context.Context) (*WaveInfoData, error) WshActivityCommand(ct context.Context, data map[string]int) error ActivityCommand(ctx context.Context, data ActivityUpdate) error + RecordTEventCommand(ctx context.Context, data telemetrydata.TEvent) error GetVarCommand(ctx context.Context, data CommandVarData) (*CommandVarResponseData, error) SetVarCommand(ctx context.Context, data CommandVarData) error PathCommand(ctx context.Context, data PathCommandData) (string, error) + SendTelemetryCommand(ctx context.Context) error // connection functions ConnStatusCommand(ctx context.Context) ([]ConnStatus, error) diff --git a/pkg/wshrpc/wshserver/wshserver.go b/pkg/wshrpc/wshserver/wshserver.go index ff403cd73..c8e9d95bb 100644 --- a/pkg/wshrpc/wshserver/wshserver.go +++ b/pkg/wshrpc/wshserver/wshserver.go @@ -27,6 +27,7 @@ import ( "github.com/wavetermdev/waveterm/pkg/remote/conncontroller" "github.com/wavetermdev/waveterm/pkg/remote/fileshare" "github.com/wavetermdev/waveterm/pkg/telemetry" + "github.com/wavetermdev/waveterm/pkg/telemetry/telemetrydata" "github.com/wavetermdev/waveterm/pkg/util/envutil" "github.com/wavetermdev/waveterm/pkg/util/iochan/iochantypes" "github.com/wavetermdev/waveterm/pkg/util/shellutil" @@ -35,6 +36,7 @@ import ( "github.com/wavetermdev/waveterm/pkg/waveai" "github.com/wavetermdev/waveterm/pkg/wavebase" "github.com/wavetermdev/waveterm/pkg/waveobj" + "github.com/wavetermdev/waveterm/pkg/wcloud" "github.com/wavetermdev/waveterm/pkg/wconfig" "github.com/wavetermdev/waveterm/pkg/wcore" "github.com/wavetermdev/waveterm/pkg/wps" @@ -735,12 +737,29 @@ func (ws *WshServer) WorkspaceListCommand(ctx context.Context) ([]wshrpc.Workspa return rtn, nil } +func (ws *WshServer) RecordTEventCommand(ctx context.Context, data telemetrydata.TEvent) error { + err := telemetry.RecordTEvent(ctx, &data) + if err != nil { + log.Printf("error recording telemetry event: %v", err) + } + return err +} + +func (ws WshServer) SendTelemetryCommand(ctx context.Context) error { + client, err := wstore.DBGetSingleton[*waveobj.Client](ctx) + if err != nil { + return fmt.Errorf("getting client data for telemetry: %v", err) + } + return wcloud.SendAllTelemetry(ctx, client.OID) +} + var wshActivityRe = regexp.MustCompile(`^[a-z:#]+$`) func (ws *WshServer) WshActivityCommand(ctx context.Context, data map[string]int) error { if len(data) == 0 { return nil } + props := telemetrydata.TEventProps{} for key, value := range data { if len(key) > 20 { delete(data, key) @@ -751,11 +770,20 @@ func (ws *WshServer) WshActivityCommand(ctx context.Context, data map[string]int if value != 1 { delete(data, key) } + if strings.HasSuffix(key, "#error") { + props.WshHadError = true + } else { + props.WshCmd = key + } } activityUpdate := wshrpc.ActivityUpdate{ WshCmds: data, } telemetry.GoUpdateActivityWrap(activityUpdate, "wsh-activity") + telemetry.GoRecordTEventWrap(&telemetrydata.TEvent{ + Event: "wsh:run", + Props: props, + }) return nil } diff --git a/pkg/wshutil/wshutil.go b/pkg/wshutil/wshutil.go index 17a422eeb..c27af7b6f 100644 --- a/pkg/wshutil/wshutil.go +++ b/pkg/wshutil/wshutil.go @@ -156,7 +156,9 @@ func installShutdownSignalHandlers(quiet bool) { sigCh := make(chan os.Signal, 1) signal.Notify(sigCh, syscall.SIGHUP, syscall.SIGTERM, syscall.SIGINT) go func() { - defer panichandler.PanicHandlerNoTelemetry("installShutdownSignalHandlers", recover()) + defer func() { + panichandler.PanicHandlerNoTelemetry("installShutdownSignalHandlers", recover()) + }() for sig := range sigCh { DoShutdown(fmt.Sprintf("got signal %v", sig), 1, quiet) break diff --git a/pkg/wslconn/wslconn.go b/pkg/wslconn/wslconn.go index 63d99f197..7fe659490 100644 --- a/pkg/wslconn/wslconn.go +++ b/pkg/wslconn/wslconn.go @@ -20,6 +20,7 @@ import ( "github.com/wavetermdev/waveterm/pkg/panichandler" "github.com/wavetermdev/waveterm/pkg/remote/conncontroller" "github.com/wavetermdev/waveterm/pkg/telemetry" + "github.com/wavetermdev/waveterm/pkg/telemetry/telemetrydata" "github.com/wavetermdev/waveterm/pkg/userinput" "github.com/wavetermdev/waveterm/pkg/util/shellutil" "github.com/wavetermdev/waveterm/pkg/util/utilfn" @@ -534,6 +535,12 @@ func (conn *WslConn) Connect(ctx context.Context) error { telemetry.GoUpdateActivityWrap(wshrpc.ActivityUpdate{ Conn: map[string]int{"wsl:connecterror": 1}, }, "wsl-connconnect") + telemetry.GoRecordTEventWrap(&telemetrydata.TEvent{ + Event: "conn:connecterror", + Props: telemetrydata.TEventProps{ + ConnType: "wsl", + }, + }) } else { conn.Infof(ctx, "successfully connected (wsh:%v)\n\n", conn.WshEnabled.Load()) conn.Status = Status_Connected @@ -544,6 +551,12 @@ func (conn *WslConn) Connect(ctx context.Context) error { telemetry.GoUpdateActivityWrap(wshrpc.ActivityUpdate{ Conn: map[string]int{"wsl:connect": 1}, }, "wsl-connconnect") + telemetry.GoRecordTEventWrap(&telemetrydata.TEvent{ + Event: "conn:connect", + Props: telemetrydata.TEventProps{ + ConnType: "wsl", + }, + }) } }) conn.FireConnChangeEvent()