Skip to content

Commit

Permalink
support view log on web
Browse files Browse the repository at this point in the history
  • Loading branch information
lizongying committed Nov 23, 2023
1 parent 290c80e commit 7d74e17
Show file tree
Hide file tree
Showing 20 changed files with 348 additions and 76 deletions.
2 changes: 1 addition & 1 deletion README_CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -1108,7 +1108,7 @@ curl https://github.com/lizongying/go-crawler -x http://localhost:8082 --cacert
* monitor
* statistics
* panic stop
* extra速率限制
* extra
* request context
* total
* log
Expand Down
70 changes: 70 additions & 0 deletions pkg/api/log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package api

import (
"fmt"
"github.com/lizongying/go-crawler/pkg"
"net/http"
)

const UrlLog = "/log"

// RouteLog is the HTTP route that serves log output at UrlLog as a
// server-sent event stream.
type RouteLog struct {
// Request and Response are embedded helper types declared elsewhere in
// this package.
Request
Response
// crawler is the crawler this route was built from (set by FromCrawler).
crawler pkg.Crawler
// logger is the crawler's logger, obtained via crawler.GetLogger().
logger pkg.Logger
// stream is the crawler's log stream, obtained via crawler.GetStream();
// ServeHTTP registers a channel with it to receive log messages.
stream pkg.Stream
}

// Pattern reports the URL pattern this route is served under, which is
// always UrlLog.
func (h *RouteLog) Pattern() string { return UrlLog }

// ServeHTTP streams log messages to the client as server-sent events.
//
// The request must carry a non-empty "task_id" query parameter; otherwise the
// handler replies with 400 Bad Request. If the ResponseWriter cannot flush,
// the handler replies with 500 because streaming is impossible. Once the
// stream is open, every message received from the log stream is written as
// one "data:" event and flushed immediately. The handler returns when the
// client disconnects, when a write fails, or when the channel is closed.
func (h *RouteLog) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	// Validate the request before committing to an event-stream response so
	// error replies are not sent with streaming headers.
	taskID := r.URL.Query().Get("task_id")
	if taskID == "" {
		// A missing parameter is a client error, not a server failure.
		http.Error(w, "TaskId empty", http.StatusBadRequest)
		return
	}

	flusher, canFlush := w.(http.Flusher)
	if !canFlush {
		http.Error(w, "Streaming not supported", http.StatusInternalServerError)
		return
	}

	header := w.Header()
	header.Set("Content-Type", "text/event-stream")
	header.Set("Cache-Control", "no-cache")
	header.Set("Connection", "keep-alive")
	// Push the headers out right away so the client sees the stream open
	// even before the first log line arrives.
	flusher.Flush()

	logChannel := make(chan []byte, 100)
	// Register under a key unique to this connection. A key shared by all
	// connections would let concurrent viewers overwrite and unregister each
	// other's subscription.
	subscriber := fmt.Sprintf("%s@%p", taskID, logChannel)
	h.stream.Register(subscriber, logChannel)
	// The channel is deliberately not closed here: this handler is the
	// receiving side, and a send still in flight on the stream side would
	// panic on a closed channel. It is simply dropped after unregistering.
	defer h.stream.Unregister(subscriber)

	for {
		select {
		case message, open := <-logChannel:
			if !open {
				return
			}
			if _, err := fmt.Fprintf(w, "data: %s\n\n", message); err != nil {
				// The connection is no longer writable; stop streaming.
				return
			}
			flusher.Flush()
		case <-r.Context().Done():
			// The client went away or the request was cancelled.
			return
		}
	}
}

// FromCrawler wires the route to the given crawler and returns it as a
// pkg.Route. When called on a nil receiver a fresh RouteLog is allocated and
// populated instead.
func (h *RouteLog) FromCrawler(crawler pkg.Crawler) pkg.Route {
	route := h
	if route == nil {
		route = new(RouteLog)
	}

	route.crawler = crawler
	route.logger = crawler.GetLogger()
	route.stream = crawler.GetStream()
	return route
}
6 changes: 5 additions & 1 deletion pkg/api/middlewares.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,11 @@ func (a *Api) loggingMiddleware(next http.Handler) http.Handler {

func (a *Api) keyAuthMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/" && r.URL.Path != "/user" && r.Header.Get("X-API-Key") != a.accessKey {
token := r.Header.Get("X-API-Key")
if token == "" {
token = r.URL.Query().Get("X-API-Key")
}
if r.URL.Path != "/" && r.URL.Path != "/user" && token != a.accessKey {
http.Error(w, "Unauthorized", http.StatusUnauthorized)
return
}
Expand Down
1 change: 1 addition & 0 deletions pkg/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func (a *App) Run(crawlOptions ...pkg.CrawlOption) {
db.NewKafka,
db.NewKafkaReader,
db.NewRedis,
loggers.NewStream,
fx.Annotate(
loggers.NewLogger,
fx.As(new(pkg.Logger)),
Expand Down
2 changes: 2 additions & 0 deletions pkg/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ type Crawler interface {
GenUid() uint64

StartFromCLI() bool

GetStream() Stream
}

type CrawlOption func(Crawler)
Expand Down
10 changes: 9 additions & 1 deletion pkg/crawler/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/lizongying/go-crawler/pkg/cli"
"github.com/lizongying/go-crawler/pkg/config"
crawlerContext "github.com/lizongying/go-crawler/pkg/context"
"github.com/lizongying/go-crawler/pkg/loggers"
"github.com/lizongying/go-crawler/pkg/signals"
"github.com/lizongying/go-crawler/pkg/statistics"
"github.com/lizongying/go-crawler/pkg/utils"
Expand Down Expand Up @@ -51,8 +52,13 @@ type Crawler struct {
itemConcurrencyChan chan struct{}
itemTimer *time.Timer
ug *uid.Uid

stream pkg.Stream
}

// GetStream returns the log stream held by the crawler.
func (c *Crawler) GetStream() pkg.Stream { return c.stream }
// GenUid returns the value produced by the crawler's uid generator.
func (c *Crawler) GenUid() uint64 { return c.ug.Gen() }
Expand Down Expand Up @@ -326,7 +332,7 @@ func (c *Crawler) SpiderStopped(_ pkg.Context, _ error) {
c.spider.Out()
}

func NewCrawler(spiders []pkg.Spider, cli *cli.Cli, config *config.Config, logger pkg.Logger, mongoDb *mongo.Database, mysql *sql.DB, redis *redis.Client, kafka *kafka.Writer, kafkaReader *kafka.Reader, sqlite pkg.Sqlite, store pkg.Store, mockServer pkg.MockServer, httpApi *api.Api) (crawler pkg.Crawler, err error) {
func NewCrawler(spiders []pkg.Spider, cli *cli.Cli, config *config.Config, logger pkg.Logger, mongoDb *mongo.Database, mysql *sql.DB, redis *redis.Client, kafka *kafka.Writer, kafkaReader *kafka.Reader, sqlite pkg.Sqlite, store pkg.Store, mockServer pkg.MockServer, httpApi *api.Api, stream *loggers.Stream) (crawler pkg.Crawler, err error) {
spider := pkg.NewState("spider")
spider.RegisterIsReadyAndIsZero(func() {
_ = crawler.Stop(crawler.GetContext())
Expand Down Expand Up @@ -355,6 +361,7 @@ func NewCrawler(spiders []pkg.Spider, cli *cli.Cli, config *config.Config, logge
stop: make(chan struct{}),
ug: ug,
spiders: spiders,
stream: stream,
}

httpApi.AddRoutes(new(api.RouteHome).FromCrawler(crawler))
Expand All @@ -370,6 +377,7 @@ func NewCrawler(spiders []pkg.Spider, cli *cli.Cli, config *config.Config, logge
httpApi.AddRoutes(new(api.RouteRequests).FromCrawler(crawler))
httpApi.AddRoutes(new(api.RouteRecords).FromCrawler(crawler))
httpApi.AddRoutes(new(api.RouteUser).FromCrawler(crawler))
httpApi.AddRoutes(new(api.RouteLog).FromCrawler(crawler))

crawler.SetSignal(new(signals.Signal).FromCrawler(crawler))
crawler.SetStatistics(new(statistics.Statistics).FromCrawler(crawler))
Expand Down
80 changes: 40 additions & 40 deletions pkg/loggers/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (l *Logger) Debugf(format string, v ...any) {
if !l.longFile {
file = file[strings.LastIndex(file, "/")+1:]
}
format = fmt.Sprintf("%s %s", strings.Join([]string{file, strconv.Itoa(line)}, ":"), format)
format = fmt.Sprintf("%s %s\n", strings.Join([]string{file, strconv.Itoa(line)}, ":"), format)
l.loggerDebug.Printf(format, v...)
}

Expand All @@ -71,7 +71,7 @@ func (l *Logger) Infof(format string, v ...any) {
if !l.longFile {
file = file[strings.LastIndex(file, "/")+1:]
}
format = fmt.Sprintf("%s %s", strings.Join([]string{file, strconv.Itoa(line)}, ":"), format)
format = fmt.Sprintf("%s %s\n", strings.Join([]string{file, strconv.Itoa(line)}, ":"), format)
l.loggerInfo.Printf(format, v...)
}

Expand All @@ -95,7 +95,7 @@ func (l *Logger) Warnf(format string, v ...any) {
if !l.longFile {
file = file[strings.LastIndex(file, "/")+1:]
}
format = fmt.Sprintf("%s %s", strings.Join([]string{file, strconv.Itoa(line)}, ":"), format)
format = fmt.Sprintf("%s %s\n", strings.Join([]string{file, strconv.Itoa(line)}, ":"), format)
l.loggerWarn.Printf(format, v...)
}

Expand All @@ -119,57 +119,57 @@ func (l *Logger) Errorf(format string, v ...any) {
if !l.longFile {
file = file[strings.LastIndex(file, "/")+1:]
}
format = fmt.Sprintf("%s %s", strings.Join([]string{file, strconv.Itoa(line)}, ":"), format)
format = fmt.Sprintf("%s %s\n", strings.Join([]string{file, strconv.Itoa(line)}, ":"), format)
l.loggerError.Printf(format, v...)
}

func NewLogger(config *config.Config) (logger *Logger, err error) {
func NewLogger(config *config.Config, stream *Stream) (logger *Logger, err error) {
logger = &Logger{
longFile: config.GetLogLongFile(),
level: config.GetLogLevel(),
}
filename := config.Log.Filename
if filename == "" {
logger.loggerDebug = log.New(os.Stdout, "Debug:", log.Ldate|log.Ltime)
logger.loggerInfo = log.New(os.Stdout, "Info:", log.Ldate|log.Ltime)
logger.loggerWarn = log.New(os.Stdout, "Warn:", log.Ldate|log.Ltime)
logger.loggerError = log.New(os.Stdout, "Error:", log.Ldate|log.Ltime)
return
}

if name != "" {
var multiWriter []io.Writer

multiWriter = append(multiWriter, os.Stdout)

filename := config.Log.Filename
if filename != "" {
filename = strings.ReplaceAll(filename, "{name}", name)
}

if !utils.ExistsDir(filename) {
err = os.MkdirAll(filepath.Dir(filename), 0744)
if err != nil {
log.Panicln(err)
return
if !utils.ExistsDir(filename) {
if err = os.MkdirAll(filepath.Dir(filename), 0744); err != nil {
log.Panicln(err)
return
}
}
}
if !utils.ExistsFile(filename) {
file, errCreateFile := os.Create(filename)
if errCreateFile != nil {
log.Panicln(errCreateFile)
return
}
err = file.Close()
if err != nil {
log.Panicln(err)
return

var file *os.File
if !utils.ExistsFile(filename) {
file, err = os.Create(filename)
if err != nil {
log.Panicln(err)
return
}
} else {
file, err = os.OpenFile(filename, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
if err != nil {
log.Panicln(err)
return
}
}
}
logFile, err := os.OpenFile(filename, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
if err != nil {
log.Panicln(err)
return

multiWriter = append(multiWriter, file)
}

logger.loggerDebug = log.New(io.MultiWriter(os.Stderr, logFile), "Debug:", log.Ldate|log.Ltime)
logger.loggerInfo = log.New(io.MultiWriter(os.Stderr, logFile), "Info:", log.Ldate|log.Ltime)
logger.loggerWarn = log.New(io.MultiWriter(os.Stderr, logFile), "Warn:", log.Ldate|log.Ltime)
logger.loggerError = log.New(io.MultiWriter(os.Stderr, logFile), "Error:", log.Ldate|log.Ltime)
if stream != nil {
multiWriter = append(multiWriter, stream)
}

writer := io.MultiWriter(multiWriter...)
logger.loggerDebug = log.New(writer, "Debug: ", log.Ldate|log.Ltime|log.Lmsgprefix)
logger.loggerInfo = log.New(writer, "Info: ", log.Ldate|log.Ltime|log.Lmsgprefix)
logger.loggerWarn = log.New(writer, "Warn: ", log.Ldate|log.Ltime|log.Lmsgprefix)
logger.loggerError = log.New(writer, "Error: ", log.Ldate|log.Ltime|log.Lmsgprefix)
return
}
8 changes: 8 additions & 0 deletions pkg/loggers/out.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package loggers

import "io"

// Out bundles an io.Writer with a name.
// NOTE(review): presumably the name identifies the log destination — confirm
// against callers, none are visible here.
type Out struct {
// Writer is embedded, so its Write method is promoted to Out.
io.Writer
// Name is the label attached to this writer.
Name string
}
67 changes: 67 additions & 0 deletions pkg/loggers/stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package loggers

// Ch is a registration request: it pairs a subscriber name with the channel
// on which that subscriber wants to receive messages.
type Ch struct {
	Name    string
	Channel chan []byte
}

// Stream is an io.Writer that fans every written message out to all
// registered subscriber channels.
//
// All mutable state (the channels map) is owned by a single goroutine started
// by NewStream; Register, Unregister and Write only talk to that goroutine
// through channels, so no locking is needed.
type Stream struct {
	channels   map[string]chan []byte // subscribers, owned by the run goroutine
	register   chan Ch                // registration requests
	unregister chan string            // unregistration requests, by name
	channel    chan []byte            // queue of written messages awaiting fan-out
}

// NewStream creates a Stream and starts the goroutine that serves
// registrations and distributes written messages to subscribers.
func NewStream() (s *Stream) {
	s = &Stream{
		channels:   make(map[string]chan []byte),
		register:   make(chan Ch),
		unregister: make(chan string),
		channel:    make(chan []byte, 10),
	}
	go s.run()
	return s
}

// run is the single owner of s.channels. Handling registration changes and
// message fan-out in one loop keeps map reads and writes from ever happening
// concurrently.
func (s *Stream) run() {
	for {
		select {
		case ch := <-s.register:
			s.channels[ch.Name] = ch.Channel
		case name := <-s.unregister:
			delete(s.channels, name)
		case msg, ok := <-s.channel:
			if !ok {
				return
			}
			for _, subscriber := range s.channels {
				offerLatest(subscriber, msg)
			}
		}
	}
}

// offerLatest tries to enqueue msg on ch without ever blocking. If ch is
// full, the oldest queued message is dropped to make room; if ch is still not
// writable after that (for example an unbuffered channel with no receiver),
// msg itself is dropped.
func offerLatest(ch chan []byte, msg []byte) {
	select {
	case ch <- msg:
		return
	default:
	}

	// Make room by discarding the oldest entry, unless a consumer has
	// drained the channel in the meantime.
	select {
	case <-ch:
	default:
	}

	select {
	case ch <- msg:
	default:
	}
}

// Write queues a copy of p for delivery to every registered subscriber. It
// never blocks: when the internal queue is full the oldest queued message is
// discarded. It always reports len(p) bytes written and a nil error.
//
// The data is copied because io.Writer implementations must not retain p;
// the caller is free to reuse its buffer as soon as Write returns.
// Subscribers share the same copied slice and should treat it as read-only.
func (s *Stream) Write(p []byte) (n int, err error) {
	msg := make([]byte, len(p))
	copy(msg, p)
	offerLatest(s.channel, msg)
	return len(p), nil
}

// Register subscribes channel under name, replacing any channel previously
// registered with the same name. It blocks until the stream goroutine has
// accepted the request.
func (s *Stream) Register(name string, channel chan []byte) {
	s.register <- Ch{
		Name:    name,
		Channel: channel,
	}
}

// Unregister removes the subscriber registered under name, if any. It blocks
// until the stream goroutine has accepted the request.
func (s *Stream) Unregister(name string) {
	s.unregister <- name
}
Loading

0 comments on commit 7d74e17

Please sign in to comment.