Skip to content

Commit

Permalink
Add peqeditor
Browse files Browse the repository at this point in the history
  • Loading branch information
xackery committed Nov 22, 2023
1 parent acfbd57 commit 2282bc8
Show file tree
Hide file tree
Showing 2 changed files with 147 additions and 36 deletions.
84 changes: 48 additions & 36 deletions peqeditorsql/peqeditorsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"os"
"regexp"
"sync"
"text/template"
"time"

"github.com/xackery/talkeq/request"
Expand Down Expand Up @@ -91,52 +90,65 @@ func (t *PEQEditorSQL) Connect(ctx context.Context) error {
}

func (t *PEQEditorSQL) loop(ctx context.Context) {
tmpl := template.New("filePattern")
tmpl.Parse(t.config.FilePattern)

buf := new(bytes.Buffer)
tmpl.Execute(buf, struct {
Year int
Month string
}{
time.Now().Year(),
time.Now().Format("01"),
})

finalPath := fmt.Sprintf("%s/%s", t.config.Path, buf.String())
tlog.Debugf("[peqeditorsql] tailing file %s", finalPath)

fi, err := os.Stat(finalPath)
msgChan := make(chan string, 100)
tail1, err := newTailWatch(t.ctx, &tailReq{
id: 1,
filePattern: t.config.FilePattern,
basePath: t.config.Path,
cfg: tail.Config{
Follow: true,
MustExist: false,
Poll: true,
Logger: tail.DiscardingLogger,
},
isNextMonth: false,
}, msgChan)
if err != nil {
tlog.Warnf("[peqeditorsql] stat polling failed: %s", err)
tlog.Warnf("[peqeditorsql] tail1 creation failed: %s", err)
t.Disconnect(ctx)
return
}
cfg := tail.Config{
Follow: true,
MustExist: true,
Poll: true,
Location: &tail.SeekInfo{
Offset: fi.Size(),
},
Logger: tail.DiscardingLogger,

err = tail1.restart(msgChan)
if err != nil {
tlog.Warnf("[peqeditorsql] tail1 start failed: %s", err)
t.Disconnect(ctx)
return
}

tailer, err := tail.TailFile(finalPath, cfg)
tail2, err := newTailWatch(t.ctx, &tailReq{
id: 2,
filePattern: t.config.FilePattern,
basePath: t.config.Path,
cfg: tail.Config{
Follow: true,
MustExist: false,
Poll: true,
Logger: tail.DiscardingLogger,
},
isNextMonth: true,
}, msgChan)
if err != nil {
tlog.Warnf("[peqeditorsql] tail attempt failed: %s", err)
tlog.Warnf("[peqeditorsql] tail2 creation failed: %s", err)
t.Disconnect(ctx)
return
}

for line := range tailer.Lines {
select {
case <-t.ctx.Done():
tlog.Debugf("[peqeditorsql] exiting loop")
return
default:
}
err = tail2.restart(msgChan)
if err != nil {
tlog.Warnf("[peqeditorsql] tail2 start failed: %s", err)
t.Disconnect(ctx)
return
}

ticker := time.NewTicker(12 * time.Hour)
select {
case <-t.ctx.Done():
return
case <-ticker.C:
tail1.restart(msgChan)
tail2.restart(msgChan)
case line := <-msgChan:
for routeIndex, route := range t.config.Routes {
if !route.IsEnabled {
continue
Expand All @@ -146,7 +158,7 @@ func (t *PEQEditorSQL) loop(ctx context.Context) {
tlog.Debugf("[peqeditorsql] compile route %d skipped: %s", routeIndex, err)
continue
}
matches := pattern.FindAllStringSubmatch(line.Text, -1)
matches := pattern.FindAllStringSubmatch(line, -1)
if len(matches) == 0 {
continue
}
Expand Down
99 changes: 99 additions & 0 deletions peqeditorsql/tail.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package peqeditorsql

import (
"bytes"
"context"
"fmt"
"os"
"text/template"
"time"

"github.com/hpcloud/tail"
"github.com/xackery/talkeq/tlog"
)

// tail wraps the tail tool for each file being watched
type tailWatch struct {
rootCtx context.Context
ctx context.Context
cancel context.CancelFunc
req *tailReq
tailer *tail.Tail
}

type tailReq struct {
id int
filePattern string
basePath string
cfg tail.Config
isNextMonth bool
}

func newTailWatch(rootCtx context.Context, req *tailReq, msgChan chan string) (*tailWatch, error) {
e := &tailWatch{
rootCtx: rootCtx,
req: req,
}
err := e.restart(msgChan)
if err != nil {
return nil, fmt.Errorf("restart: %w", err)
}

return e, nil
}

func (e *tailWatch) restart(msgChan chan string) error {
var err error
e.cancel()
time.Sleep(1 * time.Second)
e.ctx, e.cancel = context.WithCancel(context.Background())
buf := new(bytes.Buffer)
tmpl := template.New("filePattern")
tmpl.Parse(e.req.filePattern)

month := time.Now().Format("01")
if e.req.isNextMonth {
month = time.Now().AddDate(0, 1, 0).Format("01")
}

tmpl.Execute(buf, struct {
Year int
Month string
}{
time.Now().Year(),
month,
})
finalPath := fmt.Sprintf("%s/%s", e.req.basePath, buf.String())

fi, err := os.Stat(finalPath)
if err == nil {
e.req.cfg.Location = &tail.SeekInfo{Offset: fi.Size()}
}

e.tailer, err = tail.TailFile(finalPath, e.req.cfg)
if err != nil {
return fmt.Errorf("tail: %w", err)
}
go e.loop(msgChan)
return nil
}

func (e *tailWatch) loop(msgChan chan string) {
defer func() {
tlog.Debugf("[peqeditorsql] tail%d loop exiting for %s", e.req.id, e.tailer.Filename)
e.tailer.Cleanup()
}()

select {
case <-e.rootCtx.Done():
return
case <-e.ctx.Done():
return
case line := <-e.tailer.Lines:
if line.Err != nil {
tlog.Warnf("[peqeditorsql] tail%d error: %s", e.req.id, line.Err)
return
}
msgChan <- line.Text
}
}

0 comments on commit 2282bc8

Please sign in to comment.