Skip to content

Commit

Permalink
chore: report global labels with heartbeat (#1066)
Browse files Browse the repository at this point in the history
* chore: report global labels with heartbeat

* chore: sort import packages

* fix: lint error
  • Loading branch information
kongfei605 authored Sep 29, 2024
1 parent 1be2fe3 commit 7dca179
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 6 deletions.
50 changes: 49 additions & 1 deletion heartbeat/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,33 @@ import (
"log"
"net"
"net/http"
"os"
osExec "os/exec"
"runtime"
"strconv"
"strings"
"time"

cpuUtil "github.com/shirou/gopsutil/v3/cpu"

"flashcat.cloud/categraf/config"
"flashcat.cloud/categraf/inputs/system"
cpuUtil "github.com/shirou/gopsutil/v3/cpu"
"flashcat.cloud/categraf/pkg/cmdx"
)

const collinterval = 3

type (
HeartbeatResponse struct {
Data UpdateInfo `json:"dat"`
Msg string `json:"err"`
}
UpdateInfo struct {
NewVersion string `json:"new_version"`
UpdateURL string `json:"download_url"`
}
)

func Work() {
conf := config.Config.Heartbeat

Expand Down Expand Up @@ -114,6 +129,7 @@ func work(ps *system.SystemPS, client *http.Client) {
"cpu_util": cpuUsagePercent,
"mem_util": memUsagePercent,
"unixtime": time.Now().UnixMilli(),
"global_labels": config.GlobalLabels(),
"host_ip": hostIP,
}

Expand Down Expand Up @@ -191,6 +207,38 @@ func work(ps *system.SystemPS, client *http.Client) {
if debug() {
log.Println("D! heartbeat response:", string(bs), "status code:", res.StatusCode)
}

hr := HeartbeatResponse{}
err = json.Unmarshal(bs, &hr)
if err != nil {
log.Println("W! failed to unmarshal heartbeat response:", err)
return
}
if len(hr.Data.NewVersion) != 0 && len(hr.Data.UpdateURL) != 0 && hr.Data.NewVersion != shortVersion && hr.Data.NewVersion != config.Version {
var (
out bytes.Buffer
stderr bytes.Buffer
)
exe, err := os.Executable()
if err != nil {
log.Println("E! failed to get current executable:", err)
return
}
cmd := osExec.Command(exe, "-update", "-update_url", hr.Data.UpdateURL)
cmd.Stdout = &out
cmd.Stderr = &stderr
err, timeout := cmdx.RunTimeout(cmd, time.Second*300)
if timeout {
log.Printf("E! exec %s timeout", cmd.String())
return
}
if err != nil {
log.Println("E! failed to update categraf:", err, "stderr:", stderr.String(), "stdout:",
out.String(), "command:", cmd.String())
return
}
log.Printf("update categraf(%s) from %s success, new version: %s", version(), hr.Data.UpdateURL, hr.Data.NewVersion)
}
}

func memUsage(ps *system.SystemPS) float64 {
Expand Down
2 changes: 1 addition & 1 deletion inputs/apache/apache.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (ins *Instance) Init() error {
e, err := exporter.New(logger, &ins.Config)

if err != nil {
return fmt.Errorf("could not instantiate mongodb lag exporter: %w", err)
return fmt.Errorf("could not instantiate mongodb lag exporter: %v", err)
}

ins.e = e
Expand Down
5 changes: 3 additions & 2 deletions inputs/mtail/internal/runtime/compiler/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ import (
"fmt"
"strings"

"flashcat.cloud/categraf/inputs/mtail/internal/runtime/compiler/position"
"github.com/pkg/errors"

"flashcat.cloud/categraf/inputs/mtail/internal/runtime/compiler/position"
)

type compileError struct {
Expand All @@ -26,7 +27,7 @@ type ErrorList []*compileError
// Add appends an error at a position to the list of errors.
func (p *ErrorList) Add(pos *position.Position, msg string) {
if pos == nil {
pos = &position.Position{"", -1, -1, -1}
pos = &position.Position{Filename: "", Line: -1, Startcol: -1, Endcol: -1}
}
*p = append(*p, &compileError{*pos, msg})
}
Expand Down
2 changes: 1 addition & 1 deletion inputs/mtail/internal/tailer/logstream/filestream.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake
// common change pattern anyway.
newfi, serr := os.Stat(fs.pathname)
if serr != nil {
log.Printf("stream(%s): stat error: %v", serr)
log.Printf("stream(%s): stat error: %v", fs.pathname, serr)
// If this is a NotExist error, then we should wrap up this
// goroutine. The Tailer will create a new logstream if the
// file is in the middle of a rotation and gets recreated
Expand Down
2 changes: 1 addition & 1 deletion inputs/mtail/internal/tailer/logstream/logstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func New(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, pathname st
if err != nil {
return nil, err
}
log.Println("Parsed url as %v", u)
log.Printf("Parsed url as %v", u)

path := pathname
switch u.Scheme {
Expand Down

0 comments on commit 7dca179

Please sign in to comment.