diff --git a/README_CN.md b/README_CN.md index 79f66f5..3f607f4 100644 --- a/README_CN.md +++ b/README_CN.md @@ -1108,7 +1108,7 @@ curl https://github.com/lizongying/go-crawler -x http://localhost:8082 --cacert * monitor * statistics * panic stop -* extra速率限制 +* extra * request context * total * log diff --git a/pkg/api/log.go b/pkg/api/log.go new file mode 100644 index 0000000..eba3d62 --- /dev/null +++ b/pkg/api/log.go @@ -0,0 +1,70 @@ +package api + +import ( + "fmt" + "github.com/lizongying/go-crawler/pkg" + "net/http" +) + +const UrlLog = "/log" + +type RouteLog struct { + Request + Response + crawler pkg.Crawler + logger pkg.Logger + stream pkg.Stream +} + +func (h *RouteLog) Pattern() string { + return UrlLog +} + +func (h *RouteLog) ServeHTTP(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + + if r.URL.Query().Get("task_id") == "" { + http.Error(w, "TaskId empty", http.StatusInternalServerError) + return + } + + flusher, ok := w.(http.Flusher) + if !ok { + http.Error(w, "Streaming not supported", http.StatusInternalServerError) + return + } + + logChannel := make(chan []byte, 100) + h.stream.Register("", logChannel) + defer func() { + h.stream.Unregister("") + close(logChannel) + }() + + for { + var message []byte + select { + case message, ok = <-logChannel: + if !ok { + return + } + _, _ = fmt.Fprintf(w, "data: %s\n\n", string(message)) + flusher.Flush() + case <-r.Context().Done(): + return + } + } +} + +func (h *RouteLog) FromCrawler(crawler pkg.Crawler) pkg.Route { + if h == nil { + return new(RouteLog).FromCrawler(crawler) + } + + h.logger = crawler.GetLogger() + h.crawler = crawler + h.stream = crawler.GetStream() + return h +} diff --git a/pkg/api/middlewares.go b/pkg/api/middlewares.go index 50b410a..d05e283 100644 --- a/pkg/api/middlewares.go +++ b/pkg/api/middlewares.go @@ -28,7 +28,11 @@ func (a *Api) loggingMiddleware(next http.Handler) http.Handler { func (a *Api) keyAuthMiddleware(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if r.URL.Path != "/" && r.URL.Path != "/user" && r.Header.Get("X-API-Key") != a.accessKey { + token := r.Header.Get("X-API-Key") + if token == "" { + token = r.URL.Query().Get("X-API-Key") + } + if r.URL.Path != "/" && r.URL.Path != "/user" && token != a.accessKey { http.Error(w, "Unauthorized", http.StatusUnauthorized) return } diff --git a/pkg/app/app.go b/pkg/app/app.go index 8450750..287e2c7 100644 --- a/pkg/app/app.go +++ b/pkg/app/app.go @@ -31,6 +31,7 @@ func (a *App) Run(crawlOptions ...pkg.CrawlOption) { db.NewKafka, db.NewKafkaReader, db.NewRedis, + loggers.NewStream, fx.Annotate( loggers.NewLogger, fx.As(new(pkg.Logger)), diff --git a/pkg/crawler.go b/pkg/crawler.go index a7b21cd..54e5ab6 100644 --- a/pkg/crawler.go +++ b/pkg/crawler.go @@ -53,6 +53,8 @@ type Crawler interface { GenUid() uint64 StartFromCLI() bool + + GetStream() Stream } type CrawlOption func(Crawler) diff --git a/pkg/crawler/crawler.go b/pkg/crawler/crawler.go index 8e6c98c..20f6d63 100644 --- a/pkg/crawler/crawler.go +++ b/pkg/crawler/crawler.go @@ -10,6 +10,7 @@ import ( "github.com/lizongying/go-crawler/pkg/cli" "github.com/lizongying/go-crawler/pkg/config" crawlerContext "github.com/lizongying/go-crawler/pkg/context" + "github.com/lizongying/go-crawler/pkg/loggers" "github.com/lizongying/go-crawler/pkg/signals" "github.com/lizongying/go-crawler/pkg/statistics" "github.com/lizongying/go-crawler/pkg/utils" @@ -51,8 +52,13 @@ type Crawler struct { itemConcurrencyChan chan struct{} itemTimer *time.Timer ug *uid.Uid + + stream pkg.Stream } +func (c *Crawler) GetStream() pkg.Stream { + return c.stream +} func (c *Crawler) GenUid() uint64 { return c.ug.Gen() } @@ -326,7 +332,7 @@ func (c *Crawler) SpiderStopped(_ pkg.Context, _ error) { c.spider.Out() } -func NewCrawler(spiders []pkg.Spider, cli *cli.Cli, config *config.Config, logger pkg.Logger, mongoDb *mongo.Database, mysql *sql.DB, redis *redis.Client, kafka *kafka.Writer, kafkaReader *kafka.Reader, sqlite pkg.Sqlite, store pkg.Store, mockServer pkg.MockServer, httpApi *api.Api) (crawler pkg.Crawler, err error) { +func NewCrawler(spiders []pkg.Spider, cli *cli.Cli, config *config.Config, logger pkg.Logger, mongoDb *mongo.Database, mysql *sql.DB, redis *redis.Client, kafka *kafka.Writer, kafkaReader *kafka.Reader, sqlite pkg.Sqlite, store pkg.Store, mockServer pkg.MockServer, httpApi *api.Api, stream *loggers.Stream) (crawler pkg.Crawler, err error) { spider := pkg.NewState("spider") spider.RegisterIsReadyAndIsZero(func() { _ = crawler.Stop(crawler.GetContext()) @@ -355,6 +361,7 @@ func NewCrawler(spiders []pkg.Spider, cli *cli.Cli, config *config.Config, logge stop: make(chan struct{}), ug: ug, spiders: spiders, + stream: stream, } httpApi.AddRoutes(new(api.RouteHome).FromCrawler(crawler)) @@ -370,6 +377,7 @@ func NewCrawler(spiders []pkg.Spider, cli *cli.Cli, config *config.Config, logge httpApi.AddRoutes(new(api.RouteRequests).FromCrawler(crawler)) httpApi.AddRoutes(new(api.RouteRecords).FromCrawler(crawler)) httpApi.AddRoutes(new(api.RouteUser).FromCrawler(crawler)) + httpApi.AddRoutes(new(api.RouteLog).FromCrawler(crawler)) crawler.SetSignal(new(signals.Signal).FromCrawler(crawler)) crawler.SetStatistics(new(statistics.Statistics).FromCrawler(crawler)) diff --git a/pkg/loggers/logger.go b/pkg/loggers/logger.go index dcad86f..10ccaa6 100644 --- a/pkg/loggers/logger.go +++ b/pkg/loggers/logger.go @@ -47,7 +47,7 @@ func (l *Logger) Debugf(format string, v ...any) { if !l.longFile { file = file[strings.LastIndex(file, "/")+1:] } - format = fmt.Sprintf("%s %s", strings.Join([]string{file, strconv.Itoa(line)}, ":"), format) + format = fmt.Sprintf("%s %s\n", strings.Join([]string{file, strconv.Itoa(line)}, ":"), format) l.loggerDebug.Printf(format, v...) } @@ -71,7 +71,7 @@ func (l *Logger) Infof(format string, v ...any) { if !l.longFile { file = file[strings.LastIndex(file, "/")+1:] } - format = fmt.Sprintf("%s %s", strings.Join([]string{file, strconv.Itoa(line)}, ":"), format) + format = fmt.Sprintf("%s %s\n", strings.Join([]string{file, strconv.Itoa(line)}, ":"), format) l.loggerInfo.Printf(format, v...) } @@ -95,7 +95,7 @@ func (l *Logger) Warnf(format string, v ...any) { if !l.longFile { file = file[strings.LastIndex(file, "/")+1:] } - format = fmt.Sprintf("%s %s", strings.Join([]string{file, strconv.Itoa(line)}, ":"), format) + format = fmt.Sprintf("%s %s\n", strings.Join([]string{file, strconv.Itoa(line)}, ":"), format) l.loggerWarn.Printf(format, v...) } @@ -119,57 +119,57 @@ func (l *Logger) Errorf(format string, v ...any) { if !l.longFile { file = file[strings.LastIndex(file, "/")+1:] } - format = fmt.Sprintf("%s %s", strings.Join([]string{file, strconv.Itoa(line)}, ":"), format) + format = fmt.Sprintf("%s %s\n", strings.Join([]string{file, strconv.Itoa(line)}, ":"), format) l.loggerError.Printf(format, v...) } -func NewLogger(config *config.Config) (logger *Logger, err error) { +func NewLogger(config *config.Config, stream *Stream) (logger *Logger, err error) { logger = &Logger{ longFile: config.GetLogLongFile(), level: config.GetLogLevel(), } - filename := config.Log.Filename - if filename == "" { - logger.loggerDebug = log.New(os.Stdout, "Debug:", log.Ldate|log.Ltime) - logger.loggerInfo = log.New(os.Stdout, "Info:", log.Ldate|log.Ltime) - logger.loggerWarn = log.New(os.Stdout, "Warn:", log.Ldate|log.Ltime) - logger.loggerError = log.New(os.Stdout, "Error:", log.Ldate|log.Ltime) - return - } - if name != "" { + var multiWriter []io.Writer + + multiWriter = append(multiWriter, os.Stdout) + + filename := config.Log.Filename + if filename != "" { filename = strings.ReplaceAll(filename, "{name}", name) - } - if !utils.ExistsDir(filename) { - err = os.MkdirAll(filepath.Dir(filename), 0744) - if err != nil { - log.Panicln(err) - return + if !utils.ExistsDir(filename) { + if err = os.MkdirAll(filepath.Dir(filename), 0744); err != nil { + log.Panicln(err) + return + } } - } - if !utils.ExistsFile(filename) { - file, errCreateFile := os.Create(filename) - if errCreateFile != nil { - log.Panicln(errCreateFile) - return - } - err = file.Close() - if err != nil { - log.Panicln(err) - return + + var file *os.File + if !utils.ExistsFile(filename) { + file, err = os.Create(filename) + if err != nil { + log.Panicln(err) + return + } + } else { + file, err = os.OpenFile(filename, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) + if err != nil { + log.Panicln(err) + return + } } - } - logFile, err := os.OpenFile(filename, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) - if err != nil { - log.Panicln(err) - return + + multiWriter = append(multiWriter, file) } - logger.loggerDebug = log.New(io.MultiWriter(os.Stderr, logFile), "Debug:", log.Ldate|log.Ltime) - logger.loggerInfo = log.New(io.MultiWriter(os.Stderr, logFile), "Info:", log.Ldate|log.Ltime) - logger.loggerWarn = log.New(io.MultiWriter(os.Stderr, logFile), "Warn:", log.Ldate|log.Ltime) - logger.loggerError = log.New(io.MultiWriter(os.Stderr, logFile), "Error:", log.Ldate|log.Ltime) + if stream != nil { + multiWriter = append(multiWriter, stream) + } + writer := io.MultiWriter(multiWriter...) + logger.loggerDebug = log.New(writer, "Debug: ", log.Ldate|log.Ltime|log.Lmsgprefix) + logger.loggerInfo = log.New(writer, "Info: ", log.Ldate|log.Ltime|log.Lmsgprefix) + logger.loggerWarn = log.New(writer, "Warn: ", log.Ldate|log.Ltime|log.Lmsgprefix) + logger.loggerError = log.New(writer, "Error: ", log.Ldate|log.Ltime|log.Lmsgprefix) return } diff --git a/pkg/loggers/out.go b/pkg/loggers/out.go new file mode 100644 index 0000000..03465e7 --- /dev/null +++ b/pkg/loggers/out.go @@ -0,0 +1,8 @@ +package loggers + +import "io" + +type Out struct { + io.Writer + Name string +} diff --git a/pkg/loggers/stream.go b/pkg/loggers/stream.go new file mode 100644 index 0000000..0f7cbf5 --- /dev/null +++ b/pkg/loggers/stream.go @@ -0,0 +1,67 @@ +package loggers + +type Ch struct { + Name string + Channel chan []byte +} + +type Stream struct { + channels map[string]chan []byte + register chan Ch + unregister chan string + channel chan []byte +} + +func NewStream() (s *Stream) { + s = &Stream{ + channels: make(map[string]chan []byte), + register: make(chan Ch), + unregister: make(chan string), + channel: make(chan []byte, 10), + } + go func() { + for msg := range s.channel { + for _, ch := range s.channels { + select { + case ch <- msg: + default: + <-ch + ch <- msg + } + } + } + }() + go func() { + for { + select { + case ch := <-s.register: + s.channels[ch.Name] = ch.Channel + case chName := <-s.unregister: + delete(s.channels, chName) + } + } + }() + + return +} + +func (s *Stream) Write(p []byte) (n int, err error) { + select { + case s.channel <- p: + default: + <-s.channel + s.channel <- p + } + return +} + +func (s *Stream) Register(name string, channel chan []byte) { + s.register <- Ch{ + Name: name, + Channel: channel, + } +} + +func (s *Stream) Unregister(name string) { + s.unregister <- name +} diff --git a/pkg/state_test.go b/pkg/state_test.go index e26a1c6..9dd9b58 100644 --- a/pkg/state_test.go +++ b/pkg/state_test.go @@ -9,7 +9,7 @@ func TestState_RegisterIsReady(t *testing.T) { var wg sync.WaitGroup wg.Add(1) - state := NewState() + state := NewState("") state.RegisterIsReady(func() { wg.Done() }) @@ -25,7 +25,7 @@ func TestState_RegisterIsZero(t *testing.T) { var wg sync.WaitGroup wg.Add(1) - state := NewState() + state := NewState("") state.RegisterIsZero(func() { wg.Done() }) @@ -42,7 +42,7 @@ func TestState_RegisterIsReadyAndIsZero(t *testing.T) { var wg sync.WaitGroup wg.Add(1) - state := NewState() + state := NewState("") state.RegisterIsReadyAndIsZero(func() { wg.Done() }) @@ -57,7 +57,7 @@ func TestState_RegisterIsReadyAndIsZero(t *testing.T) { } func TestState_BeReady(t *testing.T) { - state := NewState() + state := NewState("") state.BeReady() if !state.IsReady() { @@ -66,7 +66,7 @@ func TestState_BeReady(t *testing.T) { } func TestState_In(t *testing.T) { - state := NewState() + state := NewState("") state.In() if !state.IsReady() { @@ -79,7 +79,7 @@ func TestState_In(t *testing.T) { } func TestState_Out(t *testing.T) { - state := NewState() + state := NewState("") state.In() state.Out() @@ -93,7 +93,7 @@ func TestState_Out(t *testing.T) { } func TestState_IsReady(t *testing.T) { - state := NewState() + state := NewState("") if state.IsReady() { t.Errorf("Expected state to not be ready") @@ -107,7 +107,7 @@ func TestState_IsReady(t *testing.T) { } func TestState_IsZero(t *testing.T) { - state := NewState() + state := NewState("") if !state.IsZero() { t.Errorf("Expected state to be zero") @@ -130,8 +130,8 @@ func TestMultiState_RegisterIsReady(t *testing.T) { var wg sync.WaitGroup wg.Add(1) - state1 := NewState() - state2 := NewState() + state1 := NewState("") + state2 := NewState("") multiState := NewMultiState(state1, state2) multiState.RegisterIsReady(func() { @@ -150,8 +150,8 @@ func TestMultiState_RegisterIsZero(t *testing.T) { var wg sync.WaitGroup wg.Add(1) - state1 := NewState() - state2 := NewState() + state1 := NewState("") + state2 := NewState("") multiState := NewMultiState(state1, state2) multiState.RegisterIsZero(func() { @@ -172,8 +172,8 @@ func TestMultiState_RegisterIsReadyAndIsZero(t *testing.T) { var wg sync.WaitGroup wg.Add(1) - state1 := NewState() - state2 := NewState() + state1 := NewState("") + state2 := NewState("") multiState := NewMultiState(state1, state2) multiState.RegisterIsReadyAndIsZero(func() { @@ -193,8 +193,8 @@ func TestMultiState_RegisterIsReadyAndIsZero(t *testing.T) { } func TestMultiState_IsReady(t *testing.T) { - state1 := NewState() - state2 := NewState() + state1 := NewState("") + state2 := NewState("") multiState := NewMultiState(state1, state2) @@ -216,8 +216,8 @@ func TestMultiState_IsReady(t *testing.T) { } func TestMultiState_IsZero(t *testing.T) { - state1 := NewState() - state2 := NewState() + state1 := NewState("") + state2 := NewState("") multiState := NewMultiState(state1, state2) @@ -251,8 +251,8 @@ func TestMultiState_IsZero(t *testing.T) { } func TestMultiState_IsReadyAndIsZero(t *testing.T) { - state1 := NewState() - state2 := NewState() + state1 := NewState("") + state2 := NewState("") multiState := NewMultiState(state1, state2) diff --git a/pkg/stream.go b/pkg/stream.go new file mode 100644 index 0000000..8f7bcf8 --- /dev/null +++ b/pkg/stream.go @@ -0,0 +1,6 @@ +package pkg + +type Stream interface { + Register(name string, channel chan []byte) + Unregister(name string) +} diff --git a/web/ui/src/components/Log.vue b/web/ui/src/components/Log.vue new file mode 100644 index 0000000..c2ffd44 --- /dev/null +++ b/web/ui/src/components/Log.vue @@ -0,0 +1,68 @@ + + + + + diff --git a/web/ui/src/requests/api.js b/web/ui/src/requests/api.js index 22fb979..3bbe6af 100644 --- a/web/ui/src/requests/api.js +++ b/web/ui/src/requests/api.js @@ -16,6 +16,12 @@ const api = async () => { } } } + +const getLog = async data => { + const {host, config} = await api() + return new EventSource(`${host}/log?X-API-Key=${config.headers['X-API-Key']}&task_id=${data}`); +}; + const getUser = async data => { const {host, config} = await api() return axios.post(host + '/user', data, config); @@ -71,4 +77,17 @@ const getSpider = async data => { return axios.post(host + '/spider', data, config); }; -export {getUser, getNodes, getSpiders, getJobs, runJob, rerunJob, stopJob, getTasks, getRequests, getRecords, getSpider} \ No newline at end of file +export { + getLog, + getUser, + getNodes, + getSpiders, + getJobs, + runJob, + rerunJob, + stopJob, + getTasks, + getRequests, + getRecords, + getSpider +} \ No newline at end of file diff --git a/web/ui/src/views/HomeView.vue b/web/ui/src/views/HomeView.vue index 17e5e74..05891b4 100644 --- a/web/ui/src/views/HomeView.vue +++ b/web/ui/src/views/HomeView.vue @@ -44,7 +44,9 @@ if (checked1.value) { } const changeSwitch = () => { if (checked1.value) { - interval = setInterval(refresh, 1000) + if (!checked1Disable.value) { + interval = setInterval(refresh, 1000) + } checked1Disable.value = true } else { clearInterval(interval) diff --git a/web/ui/src/views/JobsView.vue b/web/ui/src/views/JobsView.vue index ad9e6db..e69ea77 100644 --- a/web/ui/src/views/JobsView.vue +++ b/web/ui/src/views/JobsView.vue @@ -447,11 +447,13 @@ if (checked1.value) { interval = setInterval(refresh, 1000) } const changeSwitch = () => { + clearInterval(interval) if (checked1.value) { - interval = setInterval(refresh, 1000) + if (!checked1Disable.value) { + interval = setInterval(refresh, 1000) + } checked1Disable.value = true } else { - clearInterval(interval) checked1Disable.value = false } } diff --git a/web/ui/src/views/NodesView.vue b/web/ui/src/views/NodesView.vue index fe99349..2c7edd3 100644 --- a/web/ui/src/views/NodesView.vue +++ b/web/ui/src/views/NodesView.vue @@ -319,7 +319,9 @@ if (checked1.value) { } const changeSwitch = () => { if (checked1.value) { - interval = setInterval(refresh, 1000) + if (!checked1Disable.value) { + interval = setInterval(refresh, 1000) + } checked1Disable.value = true } else { clearInterval(interval) diff --git a/web/ui/src/views/RecordsView.vue b/web/ui/src/views/RecordsView.vue index a86f479..1df79c3 100644 --- a/web/ui/src/views/RecordsView.vue +++ b/web/ui/src/views/RecordsView.vue @@ -354,7 +354,9 @@ if (checked1.value) { } const changeSwitch = () => { if (checked1.value) { - interval = setInterval(refresh, 1000) + if (!checked1Disable.value) { + interval = setInterval(refresh, 1000) + } checked1Disable.value = true } else { clearInterval(interval) diff --git a/web/ui/src/views/RequestsView.vue b/web/ui/src/views/RequestsView.vue index af3aa80..9a22bbf 100644 --- a/web/ui/src/views/RequestsView.vue +++ b/web/ui/src/views/RequestsView.vue @@ -356,7 +356,9 @@ if (checked1.value) { } const changeSwitch = () => { if (checked1.value) { - interval = setInterval(refresh, 1000) + if (!checked1Disable.value) { + interval = setInterval(refresh, 1000) + } checked1Disable.value = true } else { clearInterval(interval) diff --git a/web/ui/src/views/SpidersView.vue b/web/ui/src/views/SpidersView.vue index 8e151d1..e680159 100644 --- a/web/ui/src/views/SpidersView.vue +++ b/web/ui/src/views/SpidersView.vue @@ -379,7 +379,9 @@ if (checked1.value) { } const changeSwitch = () => { if (checked1.value) { - interval = setInterval(refresh, 1000) + if (!checked1Disable.value) { + interval = setInterval(refresh, 1000) + } checked1Disable.value = true } else { clearInterval(interval) diff --git a/web/ui/src/views/TasksView.vue b/web/ui/src/views/TasksView.vue index e62ced9..f13cc78 100644 --- a/web/ui/src/views/TasksView.vue +++ b/web/ui/src/views/TasksView.vue @@ -89,7 +89,7 @@ @@ -135,7 +137,7 @@ import { } from "@/stores/tasks"; import {formatDuration, formattedDate} from "@/utils/time"; import {sortBigInt, sortInt, sortStr} from "@/utils/sort"; -import {useNodesStore} from "@/stores/nodes"; +import Log from "@/components/Log.vue"; const filteredInfo = reactive({}); const {query} = useRoute(); @@ -309,8 +311,11 @@ const columns = computed(() => { const tasksStore = useTasksStore(); const open = ref(false); -const showDrawer = () => { + +const taskId = ref('') +const showDrawer = (record) => { open.value = true; + taskId.value = record.id }; const activeKey = ref('1'); @@ -327,7 +332,9 @@ if (checked1.value) { } const changeSwitch = () => { if (checked1.value) { - interval = setInterval(refresh, 1000) + if (!checked1Disable.value) { + interval = setInterval(refresh, 1000) + } checked1Disable.value = true } else { clearInterval(interval)