From 2282bc8d9ead63c893af412f9fc9bf744402ca03 Mon Sep 17 00:00:00 2001 From: xackery Date: Wed, 22 Nov 2023 02:48:36 -0800 Subject: [PATCH] Add peqeditor --- peqeditorsql/peqeditorsql.go | 84 +++++++++++++++++------------- peqeditorsql/tail.go | 99 ++++++++++++++++++++++++++++++++++++ 2 files changed, 147 insertions(+), 36 deletions(-) create mode 100644 peqeditorsql/tail.go diff --git a/peqeditorsql/peqeditorsql.go b/peqeditorsql/peqeditorsql.go index d93caa7..6179912 100644 --- a/peqeditorsql/peqeditorsql.go +++ b/peqeditorsql/peqeditorsql.go @@ -7,7 +7,6 @@ import ( "os" "regexp" "sync" - "text/template" "time" "github.com/xackery/talkeq/request" @@ -91,52 +90,65 @@ func (t *PEQEditorSQL) Connect(ctx context.Context) error { } func (t *PEQEditorSQL) loop(ctx context.Context) { - tmpl := template.New("filePattern") - tmpl.Parse(t.config.FilePattern) - - buf := new(bytes.Buffer) - tmpl.Execute(buf, struct { - Year int - Month string - }{ - time.Now().Year(), - time.Now().Format("01"), - }) - - finalPath := fmt.Sprintf("%s/%s", t.config.Path, buf.String()) - tlog.Debugf("[peqeditorsql] tailing file %s", finalPath) - - fi, err := os.Stat(finalPath) + msgChan := make(chan string, 100) + tail1, err := newTailWatch(t.ctx, &tailReq{ + id: 1, + filePattern: t.config.FilePattern, + basePath: t.config.Path, + cfg: tail.Config{ + Follow: true, + MustExist: false, + Poll: true, + Logger: tail.DiscardingLogger, + }, + isNextMonth: false, + }, msgChan) if err != nil { - tlog.Warnf("[peqeditorsql] stat polling failed: %s", err) + tlog.Warnf("[peqeditorsql] tail1 creation failed: %s", err) t.Disconnect(ctx) return } - cfg := tail.Config{ - Follow: true, - MustExist: true, - Poll: true, - Location: &tail.SeekInfo{ - Offset: fi.Size(), - }, - Logger: tail.DiscardingLogger, + + err = tail1.restart(msgChan) + if err != nil { + tlog.Warnf("[peqeditorsql] tail1 start failed: %s", err) + t.Disconnect(ctx) + return } - tailer, err := tail.TailFile(finalPath, cfg) + tail2, err := newTailWatch(t.ctx, &tailReq{ + id: 2, + filePattern: t.config.FilePattern, + basePath: t.config.Path, + cfg: tail.Config{ + Follow: true, + MustExist: false, + Poll: true, + Logger: tail.DiscardingLogger, + }, + isNextMonth: true, + }, msgChan) if err != nil { - tlog.Warnf("[peqeditorsql] tail attempt failed: %s", err) + tlog.Warnf("[peqeditorsql] tail2 creation failed: %s", err) t.Disconnect(ctx) return } - for line := range tailer.Lines { - select { - case <-t.ctx.Done(): - tlog.Debugf("[peqeditorsql] exiting loop") - return - default: - } + err = tail2.restart(msgChan) + if err != nil { + tlog.Warnf("[peqeditorsql] tail2 start failed: %s", err) + t.Disconnect(ctx) + return + } + ticker := time.NewTicker(12 * time.Hour) + select { + case <-t.ctx.Done(): + return + case <-ticker.C: + tail1.restart(msgChan) + tail2.restart(msgChan) + case line := <-msgChan: for routeIndex, route := range t.config.Routes { if !route.IsEnabled { continue @@ -146,7 +158,7 @@ func (t *PEQEditorSQL) loop(ctx context.Context) { tlog.Debugf("[peqeditorsql] compile route %d skipped: %s", routeIndex, err) continue } - matches := pattern.FindAllStringSubmatch(line.Text, -1) + matches := pattern.FindAllStringSubmatch(line, -1) if len(matches) == 0 { continue } diff --git a/peqeditorsql/tail.go b/peqeditorsql/tail.go new file mode 100644 index 0000000..e661fdd --- /dev/null +++ b/peqeditorsql/tail.go @@ -0,0 +1,99 @@ +package peqeditorsql + +import ( + "bytes" + "context" + "fmt" + "os" + "text/template" + "time" + + "github.com/hpcloud/tail" + "github.com/xackery/talkeq/tlog" +) + +// tail wraps the tail tool for each file being watched +type tailWatch struct { + rootCtx context.Context + ctx context.Context + cancel context.CancelFunc + req *tailReq + tailer *tail.Tail +} + +type tailReq struct { + id int + filePattern string + basePath string + cfg tail.Config + isNextMonth bool +} + +func newTailWatch(rootCtx context.Context, req *tailReq, msgChan chan string) (*tailWatch, error) { + e := &tailWatch{ + rootCtx: rootCtx, + req: req, + } + err := e.restart(msgChan) + if err != nil { + return nil, fmt.Errorf("restart: %w", err) + } + + return e, nil +} + +func (e *tailWatch) restart(msgChan chan string) error { + var err error + e.cancel() + time.Sleep(1 * time.Second) + e.ctx, e.cancel = context.WithCancel(context.Background()) + buf := new(bytes.Buffer) + tmpl := template.New("filePattern") + tmpl.Parse(e.req.filePattern) + + month := time.Now().Format("01") + if e.req.isNextMonth { + month = time.Now().AddDate(0, 1, 0).Format("01") + } + + tmpl.Execute(buf, struct { + Year int + Month string + }{ + time.Now().Year(), + month, + }) + finalPath := fmt.Sprintf("%s/%s", e.req.basePath, buf.String()) + + fi, err := os.Stat(finalPath) + if err == nil { + e.req.cfg.Location = &tail.SeekInfo{Offset: fi.Size()} + } + + e.tailer, err = tail.TailFile(finalPath, e.req.cfg) + if err != nil { + return fmt.Errorf("tail: %w", err) + } + go e.loop(msgChan) + return nil +} + +func (e *tailWatch) loop(msgChan chan string) { + defer func() { + tlog.Debugf("[peqeditorsql] tail%d loop exiting for %s", e.req.id, e.tailer.Filename) + e.tailer.Cleanup() + }() + + select { + case <-e.rootCtx.Done(): + return + case <-e.ctx.Done(): + return + case line := <-e.tailer.Lines: + if line.Err != nil { + tlog.Warnf("[peqeditorsql] tail%d error: %s", e.req.id, line.Err) + return + } + msgChan <- line.Text + } +}