From 3710006be3347f6ce7ed42851746b58061666522 Mon Sep 17 00:00:00 2001 From: Li ZongYing Date: Tue, 21 Nov 2023 18:51:05 +0800 Subject: [PATCH] add request statstics --- README_CN.md | 4 - pkg/api/records.go | 11 +- pkg/api/requests.go | 34 +++ pkg/context.go | 5 + pkg/context/crawler.go | 1 + pkg/context/item.go | 12 +- pkg/context/request.go | 43 ++- pkg/context/spider.go | 1 + pkg/context/task.go | 2 +- pkg/crawler/crawler.go | 1 + pkg/downloader/downloader.go | 2 +- pkg/exporter/exporter.go | 10 +- pkg/item.go | 8 +- pkg/items/items.go | 5 - pkg/middlewares/middlewares.go | 2 +- pkg/middlewares/stats.go | 10 + pkg/pipelines/csv.go | 3 - pkg/pipelines/json_lines.go | 3 - pkg/pipelines/kafka.go | 3 - pkg/pipelines/mongo.go | 3 - pkg/pipelines/mysql.go | 3 - pkg/pipelines/none.go | 3 - pkg/pipelines/sqlite.go | 3 - pkg/request.go | 12 +- pkg/scheduler/kafka/requests.go | 75 ++--- pkg/scheduler/memory/requests.go | 89 +++--- pkg/scheduler/redis/requests.go | 90 +++--- pkg/scheduler/unimplemented_scheduler.go | 65 +++- pkg/signal.go | 4 +- pkg/signals/signal.go | 14 +- pkg/spider/task.go | 24 +- pkg/statistics.go | 4 + pkg/statistics/job/job.go | 14 +- pkg/statistics/record/record.go | 59 +++- pkg/statistics/request/request.go | 102 +++++++ pkg/statistics/spider/spider.go | 1 + pkg/statistics/statistics.go | 138 +++++++-- pkg/statistics/task/task.go | 5 +- pkg/task.go | 20 +- web/ui/src/App.vue | 11 +- web/ui/src/requests/api.js | 7 +- web/ui/src/router/index.js | 5 + web/ui/src/stores/requests.js | 25 ++ web/ui/src/views/RecordsView.vue | 5 - web/ui/src/views/RequestsView.vue | 370 +++++++++++++++++++++++ 45 files changed, 1022 insertions(+), 289 deletions(-) create mode 100644 pkg/api/requests.go create mode 100644 pkg/statistics/request/request.go create mode 100644 web/ui/src/stores/requests.js create mode 100644 web/ui/src/views/RequestsView.vue diff --git a/README_CN.md b/README_CN.md index 1fa03ca..41f5b12 100644 --- a/README_CN.md +++ b/README_CN.md @@ -1109,10 +1109,6 @@ curl https://github.com/lizongying/go-crawler -x http://localhost:8082 --cacert * statistics * panic stop * extra速率限制 -* 没请求完,ctx退出 -* request with ctx -* stat->crawler -* test refer cookie * request context * item context * total diff --git a/pkg/api/records.go b/pkg/api/records.go index daec417..f1a74cd 100644 --- a/pkg/api/records.go +++ b/pkg/api/records.go @@ -18,17 +18,8 @@ func (h *RouteRecords) Pattern() string { return UrlRecords } -func (h *RouteRecords) ServeHTTP(w http.ResponseWriter, r *http.Request) { +func (h *RouteRecords) ServeHTTP(w http.ResponseWriter, _ *http.Request) { records := h.crawler.GetStatistics().GetRecords() - //for _, v := range nodes { - // fmt.Println(v) - // bs, err := v.Marshal() - // if err != nil { - // h.OutJson(w, 1, err.Error(), nil) - // return - // } - // - //} h.OutJson(w, 0, "", records) } diff --git a/pkg/api/requests.go b/pkg/api/requests.go new file mode 100644 index 0000000..5d00b16 --- /dev/null +++ b/pkg/api/requests.go @@ -0,0 +1,34 @@ +package api + +import ( + "github.com/lizongying/go-crawler/pkg" + "net/http" +) + +const UrlRequests = "/requests" + +type RouteRequests struct { + Request + Response + crawler pkg.Crawler + logger pkg.Logger +} + +func (h *RouteRequests) Pattern() string { + return UrlRequests +} + +func (h *RouteRequests) ServeHTTP(w http.ResponseWriter, _ *http.Request) { + requests := h.crawler.GetStatistics().GetRequests() + h.OutJson(w, 0, "", requests) +} + +func (h *RouteRequests) FromCrawler(crawler pkg.Crawler) pkg.Route { + if h == nil { + return new(RouteRequests).FromCrawler(crawler) + } + + h.logger = crawler.GetLogger() + h.crawler = crawler + return h +} diff --git a/pkg/context.go b/pkg/context.go index 71be2a7..174c3ce 100644 --- a/pkg/context.go +++ b/pkg/context.go @@ -119,12 +119,15 @@ type ContextRequest interface { WithStartTime(time.Time) ContextRequest GetStopTime() time.Time WithStopTime(time.Time) ContextRequest + GetUpdateTime() time.Time GetDeadline() time.Time WithDeadline(time.Time) ContextRequest GetCookies() map[string]string WithCookies(map[string]string) ContextRequest GetReferrer() string WithReferrer(string) ContextRequest + GetStopReason() string + WithStopReason(stopReason string) ContextRequest } type ContextItem interface { @@ -139,4 +142,6 @@ type ContextItem interface { GetUpdateTime() time.Time GetSaved() bool WithSaved(bool) ContextItem + GetStopReason() string + WithStopReason(stopReason string) ContextItem } diff --git a/pkg/context/crawler.go b/pkg/context/crawler.go index c03736c..5731982 100644 --- a/pkg/context/crawler.go +++ b/pkg/context/crawler.go @@ -14,6 +14,7 @@ type Crawler struct { StartTime utils.Timestamp `json:"start_time,omitempty"` StopTime utils.Timestamp `json:"stop_time,omitempty"` UpdateTime utils.Timestamp `json:"update_time,omitempty"` + StopReason string `json:"stop_reason,omitempty"` } func (c *Crawler) GetId() string { diff --git a/pkg/context/item.go b/pkg/context/item.go index 41dd546..f835448 100644 --- a/pkg/context/item.go +++ b/pkg/context/item.go @@ -18,6 +18,7 @@ type Item struct { Cookies map[string]string `json:"cookies,omitempty"` Referrer string `json:"referrer,omitempty"` Saved bool `json:"saved,omitempty"` + StopReason string `json:"stop_reason,omitempty"` } func (c *Item) GetId() string { @@ -42,11 +43,11 @@ func (c *Item) WithStatus(status pkg.ItemStatus) pkg.ContextItem { t := time.Now() c.withUpdateTime(t) switch status { - case pkg.ItemStatusPending: + case pkg.ItemStatusRunning: c.withStartTime(t) case pkg.ItemStatusSuccess: c.withStopTime(t) - case pkg.ItemStatusError: + case pkg.ItemStatusFailure: c.withStopTime(t) } @@ -80,3 +81,10 @@ func (c *Item) WithSaved(saved bool) pkg.ContextItem { c.Saved = saved return c } +func (c *Item) GetStopReason() string { + return c.StopReason +} +func (c *Item) WithStopReason(stopReason string) pkg.ContextItem { + c.StopReason = stopReason + return c +} diff --git a/pkg/context/request.go b/pkg/context/request.go index 27e7050..aa32d55 100644 --- a/pkg/context/request.go +++ b/pkg/context/request.go @@ -8,14 +8,16 @@ import ( ) type Request struct { - Context context.Context `json:"-"` - Id string `json:"id,omitempty"` - Status pkg.RequestStatus `json:"status,omitempty"` - StartTime utils.Timestamp `json:"start_time,omitempty"` - StopTime utils.Timestamp `json:"stop_time,omitempty"` - Deadline utils.TimestampNano `json:"deadline,omitempty"` - Cookies map[string]string `json:"cookies,omitempty"` - Referrer string `json:"referrer,omitempty"` + Context context.Context `json:"-"` + Id string `json:"id,omitempty"` + Status pkg.RequestStatus `json:"status,omitempty"` + StartTime utils.Timestamp `json:"start_time,omitempty"` + StopTime utils.Timestamp `json:"stop_time,omitempty"` + UpdateTime utils.Timestamp `json:"update_time,omitempty"` + Deadline utils.TimestampNano `json:"deadline,omitempty"` + Cookies map[string]string `json:"cookies,omitempty"` + Referrer string `json:"referrer,omitempty"` + StopReason string `json:"stop_reason,omitempty"` } func (c *Request) GetId() string { @@ -37,6 +39,17 @@ func (c *Request) GetStatus() pkg.RequestStatus { } func (c *Request) WithStatus(status pkg.RequestStatus) pkg.ContextRequest { c.Status = status + t := time.Now() + c.withUpdateTime(t) + switch status { + case pkg.RequestStatusRunning: + c.WithStartTime(t) + case pkg.RequestStatusSuccess: + c.WithStopTime(t) + case pkg.RequestStatusFailure: + c.WithStopTime(t) + } + return c } func (c *Request) GetStartTime() time.Time { @@ -53,6 +66,13 @@ func (c *Request) WithStopTime(stopTime time.Time) pkg.ContextRequest { c.StopTime = utils.Timestamp{Time: stopTime} return c } +func (c *Request) GetUpdateTime() time.Time { + return c.UpdateTime.Time +} +func (c *Request) withUpdateTime(t time.Time) pkg.ContextRequest { + c.UpdateTime = utils.Timestamp{Time: t} + return c +} func (c *Request) GetDeadline() time.Time { return c.Deadline.Time } @@ -74,3 +94,10 @@ func (c *Request) WithReferrer(referrer string) pkg.ContextRequest { c.Referrer = referrer return c } +func (c *Request) GetStopReason() string { + return c.StopReason +} +func (c *Request) WithStopReason(stopReason string) pkg.ContextRequest { + c.StopReason = stopReason + return c +} diff --git a/pkg/context/spider.go b/pkg/context/spider.go index c9bb4d7..1c3bb2f 100644 --- a/pkg/context/spider.go +++ b/pkg/context/spider.go @@ -16,6 +16,7 @@ type Spider struct { StartTime utils.Timestamp `json:"start_time,omitempty"` StopTime utils.Timestamp `json:"stop_time,omitempty"` UpdateTime utils.Timestamp `json:"update_time,omitempty"` + StopReason string `json:"stop_reason,omitempty"` } func (c *Spider) GetSpider() pkg.Spider { diff --git a/pkg/context/task.go b/pkg/context/task.go index 98cac4b..5dab083 100644 --- a/pkg/context/task.go +++ b/pkg/context/task.go @@ -68,7 +68,7 @@ func (c *Task) WithStatus(status pkg.TaskStatus) pkg.ContextTask { c.WithStartTime(t) case pkg.TaskStatusSuccess: c.WithStopTime(t) - case pkg.TaskStatusError: + case pkg.TaskStatusFailure: c.WithStopTime(t) } diff --git a/pkg/crawler/crawler.go b/pkg/crawler/crawler.go index b274dab..f63c1cf 100644 --- a/pkg/crawler/crawler.go +++ b/pkg/crawler/crawler.go @@ -367,6 +367,7 @@ func NewCrawler(spiders []pkg.Spider, cli *cli.Cli, config *config.Config, logge httpApi.AddRoutes(new(api.RouteSpiders).FromCrawler(crawler)) httpApi.AddRoutes(new(api.RouteJobs).FromCrawler(crawler)) httpApi.AddRoutes(new(api.RouteTasks).FromCrawler(crawler)) + httpApi.AddRoutes(new(api.RouteRequests).FromCrawler(crawler)) httpApi.AddRoutes(new(api.RouteRecords).FromCrawler(crawler)) httpApi.AddRoutes(new(api.RouteUser).FromCrawler(crawler)) diff --git a/pkg/downloader/downloader.go b/pkg/downloader/downloader.go index 46bfddd..b96a471 100644 --- a/pkg/downloader/downloader.go +++ b/pkg/downloader/downloader.go @@ -132,10 +132,10 @@ func (d *Downloader) FromSpider(spider pkg.Spider) pkg.Downloader { } d.spider = spider + d.logger = spider.GetLogger() d.httpClient = new(http_client.HttpClient).FromSpider(spider) d.browserManager = new(browser.Manager).FromSpider(spider) d.middlewares = new(middlewares.Middlewares).FromSpider(spider) - d.logger = spider.GetLogger() spider.GetCrawler().GetSignal().RegisterSpiderChanged(d.spiderClosed) return d diff --git a/pkg/exporter/exporter.go b/pkg/exporter/exporter.go index 44de73d..5370fd1 100644 --- a/pkg/exporter/exporter.go +++ b/pkg/exporter/exporter.go @@ -54,9 +54,13 @@ func (e *Exporter) SetPipeline(pipeline pkg.Pipeline, order uint8) { pipeline.SetName(name) pipeline.SetOrder(order) for k, v := range e.pipelines { - if v.Name() == name && v.Order() != order { - e.DelPipeline(k) - break + if v.Name() == name { + if v.Order() == order { + return + } else { + e.DelPipeline(k) + break + } } } diff --git a/pkg/item.go b/pkg/item.go index 301d607..b1d289f 100644 --- a/pkg/item.go +++ b/pkg/item.go @@ -185,9 +185,9 @@ type ItemStatus uint8 const ( ItemStatusUnknown = iota ItemStatusPending - ItemStatusDoing + ItemStatusRunning ItemStatusSuccess - ItemStatusError + ItemStatusFailure ) func (s *ItemStatus) String() string { @@ -195,11 +195,11 @@ func (s *ItemStatus) String() string { case 1: return "pending" case 2: - return "doing" + return "running" case 3: return "success" case 4: - return "error" + return "failure" default: return "unknown" } diff --git a/pkg/items/items.go b/pkg/items/items.go index a92dd2d..6d77c97 100644 --- a/pkg/items/items.go +++ b/pkg/items/items.go @@ -5,11 +5,6 @@ import ( "github.com/lizongying/go-crawler/pkg" ) -type ItemWithContext struct { - pkg.Context - pkg.Item -} - type ItemNone struct { pkg.ItemUnimplemented } diff --git a/pkg/middlewares/middlewares.go b/pkg/middlewares/middlewares.go index d3e3f32..6981caf 100644 --- a/pkg/middlewares/middlewares.go +++ b/pkg/middlewares/middlewares.go @@ -38,7 +38,7 @@ func (m *Middlewares) SetMiddleware(middleware pkg.Middleware, order uint8) { middleware.SetName(name) middleware.SetOrder(order) for k, v := range m.middlewares { - if v.Name() == name && v.Order() != order { + if v.Name() == name { m.DelMiddleware(k) break } diff --git a/pkg/middlewares/stats.go b/pkg/middlewares/stats.go index 432ab05..1f823ba 100644 --- a/pkg/middlewares/stats.go +++ b/pkg/middlewares/stats.go @@ -3,6 +3,7 @@ package middlewares import ( "fmt" "github.com/lizongying/go-crawler/pkg" + "github.com/lizongying/go-crawler/pkg/utils" "net/http" "sort" ) @@ -14,6 +15,15 @@ type StatsMiddleware struct { func (m *StatsMiddleware) taskStopped(c pkg.Context) (err error) { task := c.GetTask() + if c.GetSpider().GetId() != m.GetSpider().GetContext().GetSpider().GetId() { + return + } + if !utils.InSlice(task.GetStatus(), []pkg.TaskStatus{ + pkg.TaskStatusSuccess, + pkg.TaskStatusFailure, + }) { + return + } var sl []any sl = append(sl, c.GetSpider().GetName(), c.GetTask().GetId()) diff --git a/pkg/pipelines/csv.go b/pkg/pipelines/csv.go index 71d4974..5d0dba0 100644 --- a/pkg/pipelines/csv.go +++ b/pkg/pipelines/csv.go @@ -21,7 +21,6 @@ type CsvPipeline struct { } func (m *CsvPipeline) ProcessItem(item pkg.Item) (err error) { - spider := m.Spider() task := item.GetContext().GetTask() if item == nil { err = errors.New("nil item") @@ -149,8 +148,6 @@ func (m *CsvPipeline) ProcessItem(item pkg.Item) (err error) { } m.logger.Info("item saved:", filename) - item.GetContext().GetItem().WithStatus(pkg.ItemStatusSuccess) - spider.GetCrawler().GetSignal().ItemChanged(item) task.IncItemSuccess() return } diff --git a/pkg/pipelines/json_lines.go b/pkg/pipelines/json_lines.go index 33d9686..79241db 100644 --- a/pkg/pipelines/json_lines.go +++ b/pkg/pipelines/json_lines.go @@ -19,7 +19,6 @@ type JsonLinesPipeline struct { } func (m *JsonLinesPipeline) ProcessItem(item pkg.Item) (err error) { - spider := m.Spider() task := item.GetContext().GetTask() if item == nil { err = errors.New("nil item") @@ -106,8 +105,6 @@ func (m *JsonLinesPipeline) ProcessItem(item pkg.Item) (err error) { } m.logger.Info("item saved:", filename) - item.GetContext().GetItem().WithStatus(pkg.ItemStatusSuccess) - spider.GetCrawler().GetSignal().ItemChanged(item) task.IncItemSuccess() return } diff --git a/pkg/pipelines/kafka.go b/pkg/pipelines/kafka.go index 2851fa8..722ebcc 100644 --- a/pkg/pipelines/kafka.go +++ b/pkg/pipelines/kafka.go @@ -20,7 +20,6 @@ type KafkaPipeline struct { } func (m *KafkaPipeline) ProcessItem(item pkg.Item) (err error) { - spider := m.Spider() task := item.GetContext().GetTask() if item == nil { @@ -91,8 +90,6 @@ func (m *KafkaPipeline) ProcessItem(item pkg.Item) (err error) { return } - item.GetContext().GetItem().WithStatus(pkg.ItemStatusSuccess) - spider.GetCrawler().GetSignal().ItemChanged(item) task.IncItemSuccess() return } diff --git a/pkg/pipelines/mongo.go b/pkg/pipelines/mongo.go index 1aa51c5..2346ebb 100644 --- a/pkg/pipelines/mongo.go +++ b/pkg/pipelines/mongo.go @@ -20,7 +20,6 @@ type MongoPipeline struct { } func (m *MongoPipeline) ProcessItem(item pkg.Item) (err error) { - spider := m.Spider() task := item.GetContext().GetTask() if item == nil { @@ -92,8 +91,6 @@ func (m *MongoPipeline) ProcessItem(item pkg.Item) (err error) { return } - item.GetContext().GetItem().WithStatus(pkg.ItemStatusSuccess) - spider.GetCrawler().GetSignal().ItemChanged(item) task.IncItemSuccess() return } diff --git a/pkg/pipelines/mysql.go b/pkg/pipelines/mysql.go index a63bd86..3c6972d 100644 --- a/pkg/pipelines/mysql.go +++ b/pkg/pipelines/mysql.go @@ -22,7 +22,6 @@ type MysqlPipeline struct { } func (m *MysqlPipeline) ProcessItem(item pkg.Item) (err error) { - spider := m.Spider() task := item.GetContext().GetTask() if item == nil { @@ -142,8 +141,6 @@ func (m *MysqlPipeline) ProcessItem(item pkg.Item) (err error) { m.logger.Info(itemMysql.GetTable(), "insert success", id) } - item.GetContext().GetItem().WithStatus(pkg.ItemStatusSuccess) - spider.GetCrawler().GetSignal().ItemChanged(item) task.IncItemSuccess() return } diff --git a/pkg/pipelines/none.go b/pkg/pipelines/none.go index a6db1d8..a8ecf17 100644 --- a/pkg/pipelines/none.go +++ b/pkg/pipelines/none.go @@ -11,7 +11,6 @@ type NonePipeline struct { } func (m *NonePipeline) ProcessItem(item pkg.Item) (err error) { - spider := m.Spider() task := item.GetContext().GetTask() if item == nil { @@ -33,8 +32,6 @@ func (m *NonePipeline) ProcessItem(item pkg.Item) (err error) { return } - item.GetContext().GetItem().WithStatus(pkg.ItemStatusSuccess) - spider.GetCrawler().GetSignal().ItemChanged(item) task.IncItemSuccess() return } diff --git a/pkg/pipelines/sqlite.go b/pkg/pipelines/sqlite.go index 3f498da..4a9f74a 100644 --- a/pkg/pipelines/sqlite.go +++ b/pkg/pipelines/sqlite.go @@ -22,7 +22,6 @@ type SqlitePipeline struct { } func (m *SqlitePipeline) ProcessItem(item pkg.Item) (err error) { - spider := m.Spider() task := item.GetContext().GetTask() if item == nil { @@ -146,8 +145,6 @@ func (m *SqlitePipeline) ProcessItem(item pkg.Item) (err error) { m.logger.Info(itemSqlite.GetTable(), "insert success", id) } - item.GetContext().GetItem().WithStatus(pkg.ItemStatusSuccess) - spider.GetCrawler().GetSignal().ItemChanged(item) task.IncItemSuccess() return } diff --git a/pkg/request.go b/pkg/request.go index 3c1c23b..b90c294 100644 --- a/pkg/request.go +++ b/pkg/request.go @@ -168,21 +168,21 @@ type RequestStatus uint8 const ( RequestStatusUnknown = iota RequestStatusPending - RequestStatusDoing + RequestStatusRunning RequestStatusSuccess - RequestStatusError + RequestStatusFailure ) -func (s *RequestStatus) String() string { - switch *s { +func (s RequestStatus) String() string { + switch s { case 1: return "pending" case 2: - return "doing" + return "running" case 3: return "success" case 4: - return "error" + return "failure" default: return "unknown" } diff --git a/pkg/scheduler/kafka/requests.go b/pkg/scheduler/kafka/requests.go index efbad30..17fc10d 100644 --- a/pkg/scheduler/kafka/requests.go +++ b/pkg/scheduler/kafka/requests.go @@ -17,31 +17,6 @@ import ( "time" ) -func (s *Scheduler) Request(ctx pkg.Context, request pkg.Request) (response pkg.Response, err error) { - if request == nil { - err = errors.New("nil request") - return - } - - s.logger.Debugf("request: %+v", request) - - response, err = s.spider.Download(ctx, request) - if err != nil { - if errors.Is(err, pkg.ErrIgnoreRequest) { - s.logger.Info(err) - err = nil - return - } - - s.HandleError(ctx, response, err, request.GetErrBack()) - return - } - - s.logger.Debugf("request %+v", request) - ctx.GetTask().ReadyRequest() - return -} - func (s *Scheduler) handleRequest(ctx pkg.Context) { slot := "*" value, _ := s.spider.RequestSlotLoad(slot) @@ -71,7 +46,7 @@ out: continue } - c := request.Context + ctx = request.Context s.logger.Debugf("request: %+v", request) if err != nil { s.logger.Warn(err) @@ -96,14 +71,19 @@ out: requestSlot = slotValue.(*rate.Limiter) - err = requestSlot.Wait(ctx.GetTask().GetContext()) - if err != nil { + if err = requestSlot.Wait(ctx.GetTask().GetContext()); err != nil { s.logger.Error(err) } + ctx.GetRequest().WithStatus(pkg.RequestStatusRunning) + s.crawler.GetSignal().RequestChanged(request) + s.task.RequestRunning(ctx, nil) go func(c pkg.Context, request pkg.Request) { - response, e := s.Request(c, request) - if e != nil { - s.task.StopRequest() + var response pkg.Response + response, err = s.Request(c, request) + if err != nil { + ctx.GetRequest().WithStatus(pkg.RequestStatusFailure).WithStopReason(err.Error()) + s.crawler.GetSignal().RequestChanged(request) + s.task.RequestStopped(ctx, err) return } @@ -121,10 +101,16 @@ out: if err = s.spider.CallBack(request.GetCallBack())(ctx, response); err != nil { s.logger.Error(err) s.HandleError(ctx, response, err, request.GetErrBack()) + ctx.GetRequest().WithStatus(pkg.RequestStatusFailure).WithStopReason(err.Error()) + s.crawler.GetSignal().RequestChanged(request) + s.task.RequestStopped(ctx, err) + return } - s.task.StopRequest() + ctx.GetRequest().WithStatus(pkg.RequestStatusSuccess) + s.crawler.GetSignal().RequestChanged(request) + s.task.RequestStopped(ctx, nil) }(c, response) - }(c, request) + }(ctx, request) } } } @@ -148,7 +134,7 @@ func (s *Scheduler) YieldRequest(ctx pkg.Context, request pkg.Request) (err erro } } - c := new(crawlerContext.Context). + ctx = new(crawlerContext.Context). WithCrawler(ctx.GetCrawler()). WithSpider(ctx.GetSpider()). WithJob(ctx.GetJob()). @@ -156,16 +142,16 @@ func (s *Scheduler) YieldRequest(ctx pkg.Context, request pkg.Request) (err erro WithRequest(new(crawlerContext.Request). WithContext(context.Background()). WithId(s.crawler.NextId()). - WithStatus(pkg.RequestStatusPending). - WithStartTime(time.Now())) - s.crawler.GetSignal().RequestChanged(c) + WithStatus(pkg.RequestStatusPending)) - request.WithContext(c) + request.WithContext(ctx) bs, err := request.Marshal() s.logger.Info("request with context:", string(bs)) if err != nil { s.logger.Error(err) + s.crawler.GetSignal().RequestChanged(request) + ctx.GetTask().RequestPending(ctx, err) return } @@ -176,10 +162,13 @@ func (s *Scheduler) YieldRequest(ctx pkg.Context, request pkg.Request) (err erro Value: bs, }); err != nil { s.logger.Error(err) + s.crawler.GetSignal().RequestChanged(request) + ctx.GetTask().RequestPending(ctx, err) return } - ctx.GetTask().StartRequest() + s.crawler.GetSignal().RequestChanged(request) + ctx.GetTask().RequestPending(ctx, nil) return } @@ -195,6 +184,7 @@ func (s *Scheduler) YieldExtra(c pkg.Context, extra any) (err error) { bs, err := json.Marshal(extra) if err != nil { s.logger.Error(err) + c.GetTask().RequestPending(c, err) return } @@ -217,16 +207,17 @@ func (s *Scheduler) YieldExtra(c pkg.Context, extra any) (err error) { Value: bs, }); err != nil { s.logger.Error(err) + c.GetTask().RequestPending(c, err) return } - c.GetTask().StartRequest() + c.GetTask().RequestPending(c, nil) return } -func (s *Scheduler) GetExtra(_ pkg.Context, extra any) (err error) { +func (s *Scheduler) GetExtra(c pkg.Context, extra any) (err error) { defer func() { - s.task.StopRequest() + s.task.RequestStopped(c, nil) }() extraValue := reflect.ValueOf(extra) diff --git a/pkg/scheduler/memory/requests.go b/pkg/scheduler/memory/requests.go index fed7417..b5fac23 100644 --- a/pkg/scheduler/memory/requests.go +++ b/pkg/scheduler/memory/requests.go @@ -8,35 +8,9 @@ import ( "golang.org/x/time/rate" "net/http" "reflect" - "runtime" "time" ) -func (s *Scheduler) Request(ctx pkg.Context, request pkg.Request) (response pkg.Response, err error) { - if request == nil { - err = errors.New("nil request") - return - } - - s.logger.Debugf("request: %+v", request) - - response, err = s.spider.Download(ctx, request) - if err != nil { - if errors.Is(err, pkg.ErrIgnoreRequest) { - s.logger.Info(err) - err = nil - return - } - - s.HandleError(ctx, response, err, request.GetErrBack()) - return - } - - s.logger.Debugf("request %+v", request) - ctx.GetTask().ReadyRequest() - return -} - func (s *Scheduler) handleRequest(ctx pkg.Context) { slot := "*" value, _ := s.spider.RequestSlotLoad(slot) @@ -49,6 +23,7 @@ out: s.logger.Error(ctx.GetTask().GetContext().Err()) break out default: + ctx = request.GetContext() slot = request.GetSlot() if slot == "" { slot = "*" @@ -68,35 +43,47 @@ out: requestSlot = slotValue.(*rate.Limiter) - err := requestSlot.Wait(ctx.GetTask().GetContext()) - if err != nil { + if err := requestSlot.Wait(ctx.GetTask().GetContext()); err != nil { s.logger.Error(err, time.Now(), ctx) } + ctx.GetRequest().WithStatus(pkg.RequestStatusRunning) + s.crawler.GetSignal().RequestChanged(request) + s.task.RequestRunning(ctx, nil) go func(request pkg.Request) { c := request.GetContext() - - response, e := s.Request(c, request.GetRequest()) - if e != nil { - s.task.StopRequest() + var err error + + var response pkg.Response + response, err = s.Request(c, request.GetRequest()) + if err != nil { + ctx.GetRequest().WithStatus(pkg.RequestStatusFailure).WithStopReason(err.Error()) + s.crawler.GetSignal().RequestChanged(request) + s.task.RequestStopped(ctx, err) return } go func(ctx pkg.Context, response pkg.Response) { defer func() { - if r := recover(); r != nil { - buf := make([]byte, 1<<16) - runtime.Stack(buf, true) - err = errors.New(string(buf)) - //s.logger.Error(err) - s.HandleError(ctx, response, err, request.GetErrBack()) - } + //if r := recover(); r != nil { + // buf := make([]byte, 1<<16) + // runtime.Stack(buf, true) + // err = errors.New(string(buf)) + // //s.logger.Error(err) + // s.HandleError(ctx, response, err, request.GetErrBack()) + //} }() if err = s.spider.CallBack(request.GetCallBack())(ctx, response); err != nil { s.logger.Error(err) s.HandleError(ctx, response, err, request.GetErrBack()) + ctx.GetRequest().WithStatus(pkg.RequestStatusFailure).WithStopReason(err.Error()) + s.crawler.GetSignal().RequestChanged(request) + s.task.RequestStopped(ctx, err) + return } - s.task.StopRequest() + ctx.GetRequest().WithStatus(pkg.RequestStatusSuccess) + s.crawler.GetSignal().RequestChanged(request) + s.task.RequestStopped(ctx, nil) }(c, response) }(request) } @@ -130,7 +117,7 @@ func (s *Scheduler) YieldRequest(ctx pkg.Context, request pkg.Request) (err erro } } - c := new(crawlerContext.Context). + ctx = new(crawlerContext.Context). WithCrawler(ctx.GetCrawler()). WithSpider(ctx.GetSpider()). WithJob(ctx.GetJob()). @@ -138,14 +125,12 @@ func (s *Scheduler) YieldRequest(ctx pkg.Context, request pkg.Request) (err erro WithRequest(new(crawlerContext.Request). WithContext(context.Background()). WithId(s.crawler.NextId()). - WithStatus(pkg.RequestStatusPending). - WithStartTime(time.Now())) - s.crawler.GetSignal().RequestChanged(c) + WithStatus(pkg.RequestStatusPending)) - request.WithContext(c) + request.WithContext(ctx) + s.crawler.GetSignal().RequestChanged(request) + ctx.GetTask().RequestPending(ctx, nil) s.requestChan <- request - - ctx.GetTask().StartRequest() return } @@ -166,13 +151,13 @@ func (s *Scheduler) YieldExtra(c pkg.Context, extra any) (err error) { extraChan.(chan any) <- extra } - c.GetTask().StartRequest() + c.GetTask().RequestPending(c, nil) return } -func (s *Scheduler) GetExtra(_ pkg.Context, extra any) (err error) { +func (s *Scheduler) GetExtra(ctx pkg.Context, extra any) (err error) { defer func() { - s.task.StopRequest() + s.task.RequestStopped(ctx, nil) }() extraValue := reflect.ValueOf(extra) @@ -183,7 +168,7 @@ func (s *Scheduler) GetExtra(_ pkg.Context, extra any) (err error) { name := extraValue.Elem().Type().Name() - ctx, cancel := context.WithTimeout(context.Background(), time.Duration(s.config.CloseReasonQueueTimeout())*time.Second) + c, cancel := context.WithTimeout(context.Background(), time.Duration(s.config.CloseReasonQueueTimeout())*time.Second) defer cancel() resultChan := make(chan struct{}) @@ -198,7 +183,7 @@ func (s *Scheduler) GetExtra(_ pkg.Context, extra any) (err error) { select { case <-resultChan: return - case <-ctx.Done(): + case <-c.Done(): close(resultChan) err = pkg.ErrQueueTimeout return diff --git a/pkg/scheduler/redis/requests.go b/pkg/scheduler/redis/requests.go index c94062d..3b682c9 100644 --- a/pkg/scheduler/redis/requests.go +++ b/pkg/scheduler/redis/requests.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "github.com/lizongying/go-crawler/pkg" + crawlerContext "github.com/lizongying/go-crawler/pkg/context" request2 "github.com/lizongying/go-crawler/pkg/request" "github.com/redis/go-redis/v9" "golang.org/x/time/rate" @@ -15,31 +16,6 @@ import ( "time" ) -func (s *Scheduler) Request(ctx pkg.Context, request pkg.Request) (response pkg.Response, err error) { - if request == nil { - err = errors.New("nil request") - return - } - - s.logger.Debugf("request: %+v", request) - - response, err = s.spider.Download(ctx, request) - if err != nil { - if errors.Is(err, pkg.ErrIgnoreRequest) { - s.logger.Info(err) - err = nil - return - } - - s.HandleError(ctx, response, err, request.GetErrBack()) - return - } - - s.logger.Debugf("request %+v", request) - ctx.GetTask().ReadyRequest() - return -} - func (s *Scheduler) handleRequest(ctx pkg.Context) { slot := "*" value, _ := s.spider.RequestSlotLoad(slot) @@ -93,8 +69,7 @@ out: req = r[1] } - err = s.redis.ZRem(ctx.GetTask().GetContext(), s.requestKey, req).Err() - if err != nil { + if err = s.redis.ZRem(ctx.GetTask().GetContext(), s.requestKey, req).Err(); err != nil { s.logger.Warn(err) continue } @@ -106,7 +81,7 @@ out: continue } - c := request.Context + ctx = request.Context s.logger.Debugf("request: %+v", request) if err != nil { s.logger.Warn(err) @@ -131,14 +106,19 @@ out: requestSlot = slotValue.(*rate.Limiter) - err = requestSlot.Wait(ctx.GetTask().GetContext()) - if err != nil { + if err = requestSlot.Wait(ctx.GetTask().GetContext()); err != nil { s.logger.Error(err) } + ctx.GetRequest().WithStatus(pkg.RequestStatusRunning) + s.crawler.GetSignal().RequestChanged(request) + s.task.RequestRunning(ctx, nil) go func(c pkg.Context, request pkg.Request) { - response, e := s.Request(c, request) - if e != nil { - s.task.StopRequest() + var response pkg.Response + response, err = s.Request(c, request) + if err != nil { + ctx.GetRequest().WithStatus(pkg.RequestStatusFailure).WithStopReason(err.Error()) + s.crawler.GetSignal().RequestChanged(request) + s.task.RequestStopped(ctx, err) return } @@ -157,10 +137,16 @@ out: if err = s.spider.CallBack(request.GetCallBack())(ctx, response); err != nil { s.logger.Error(err) s.HandleError(ctx, response, err, request.GetErrBack()) + ctx.GetRequest().WithStatus(pkg.RequestStatusFailure).WithStopReason(err.Error()) + s.crawler.GetSignal().RequestChanged(request) + s.task.RequestStopped(ctx, err) + return } - s.task.StopRequest() + ctx.GetRequest().WithStatus(pkg.RequestStatusSuccess) + s.crawler.GetSignal().RequestChanged(request) + s.task.RequestStopped(ctx, nil) }(c, response) - }(c, request) + }(ctx, request) } } } @@ -202,7 +188,18 @@ func (s *Scheduler) YieldRequest(ctx pkg.Context, request pkg.Request) (err erro } } + ctx = new(crawlerContext.Context). + WithCrawler(ctx.GetCrawler()). + WithSpider(ctx.GetSpider()). + WithJob(ctx.GetJob()). + WithTask(ctx.GetTask()). + WithRequest(new(crawlerContext.Request). + WithContext(context.Background()). + WithId(s.crawler.NextId()). + WithStatus(pkg.RequestStatusPending)) + request.WithContext(ctx) + bs, err := request.Marshal() if err != nil { s.logger.Error(err) @@ -223,18 +220,14 @@ func (s *Scheduler) YieldRequest(ctx pkg.Context, request pkg.Request) (err erro var res int64 res, err = s.redis.ZAdd(c, s.requestKey, z).Result() if res == 1 { - ctx.GetTask().StartRequest() + s.crawler.GetSignal().RequestChanged(request) + ctx.GetTask().RequestPending(ctx, err) } } else { err = s.redis.RPush(c, s.requestKey, bs).Err() - ctx.GetTask().StartRequest() + s.crawler.GetSignal().RequestChanged(request) + ctx.GetTask().RequestPending(ctx, err) } - - if err != nil { - s.logger.Error(err) - return - } - return } @@ -252,6 +245,7 @@ func (s *Scheduler) YieldExtra(c pkg.Context, extra any) (err error) { bs, err := json.Marshal(extra) if err != nil { s.logger.Error(err) + c.GetTask().RequestPending(c, err) return } @@ -269,28 +263,30 @@ func (s *Scheduler) YieldExtra(c pkg.Context, extra any) (err error) { res, err = s.redis.ZAdd(ctx, extraKey, z).Result() if err != nil { s.logger.Error(err) + c.GetTask().RequestPending(c, err) return } if res == 1 { - c.GetTask().StartRequest() + c.GetTask().RequestRunning(c, nil) } } else { extraKey := fmt.Sprintf("%s:%s:extra:%s", s.config.GetBotName(), name, spider.Name()) if err = s.redis.RPush(ctx, extraKey, bs).Err(); err != nil { s.logger.Error(err) + c.GetTask().RequestPending(c, err) return } - c.GetTask().StartRequest() + c.GetTask().RequestPending(c, nil) } return } -func (s *Scheduler) GetExtra(_ pkg.Context, extra any) (err error) { +func (s *Scheduler) GetExtra(c pkg.Context, extra any) (err error) { defer func() { - s.task.StopRequest() + s.task.RequestStopped(c, nil) }() extraValue := reflect.ValueOf(extra) diff --git a/pkg/scheduler/unimplemented_scheduler.go b/pkg/scheduler/unimplemented_scheduler.go index f927f26..c746a86 100644 --- a/pkg/scheduler/unimplemented_scheduler.go +++ b/pkg/scheduler/unimplemented_scheduler.go @@ -1,7 +1,6 @@ package scheduler import ( - "context" "errors" "github.com/lizongying/go-crawler/pkg" crawlerContext "github.com/lizongying/go-crawler/pkg/context" @@ -42,7 +41,13 @@ out: for item := range s.itemChan { select { case <-ctx.GetTask().GetContext().Done(): - s.logger.Error(ctx.GetTask().GetContext().Err()) + err := ctx.GetTask().GetContext().Err() + s.logger.Error(err) + item.GetContext().GetItem(). + WithStatus(pkg.ItemStatusFailure). + WithStopReason(err.Error()) + s.crawler.GetSignal().ItemChanged(item) + s.task.ItemPending(item.GetContext(), nil) break out default: itemDelay := s.crawler.GetItemDelay() @@ -55,12 +60,27 @@ out: go func(item pkg.Item) { defer func() { s.crawler.ItemConcurrencyChan() <- struct{}{} - s.task.StopItem() }() - if err := s.spider.Export(item); err != nil { + contextItem := item.GetContext().GetItem() + + contextItem. + WithStatus(pkg.ItemStatusRunning) + s.crawler.GetSignal().ItemChanged(item) + s.task.ItemRunning(item.GetContext(), nil) + + err := s.spider.Export(item) + if err != nil { s.logger.Error(err) + contextItem. + WithStatus(pkg.ItemStatusFailure). + WithStopReason(err.Error()) + } else { + contextItem. + WithStatus(pkg.ItemStatusSuccess) } + s.crawler.GetSignal().ItemChanged(item) + s.task.ItemStopped(item.GetContext(), err) }(item) if itemDelay > 0 { @@ -104,15 +124,14 @@ func (s *UnimplementedScheduler) YieldItem(ctx pkg.Context, item pkg.Item) (err WithJob(ctx.GetJob()). WithTask(ctx.GetTask()). WithItem(new(crawlerContext.Item). - WithContext(context.Background()). + WithContext(ctx.GetTask().GetContext()). WithId(s.crawler.NextId()). WithStatus(pkg.ItemStatusPending)) - //s.crawler.GetSignal().ItemChanged(c) item.WithContext(c) + s.crawler.GetSignal().ItemChanged(item) + s.task.ItemPending(c, nil) s.itemChan <- item - - s.task.StartItem() return } func (s *UnimplementedScheduler) HandleError(ctx pkg.Context, response pkg.Response, err error, errBackName string) { @@ -131,3 +150,33 @@ func (s *UnimplementedScheduler) HandleError(ctx pkg.Context, response pkg.Respo s.spider.ErrBack(errBackName)(ctx, response, err) ctx.GetTask().IncRequestError() } +func (s *UnimplementedScheduler) SyncRequest(ctx pkg.Context, request pkg.Request) (response pkg.Response, err error) { + if request == nil { + err = errors.New("nil request") + return + } + + s.logger.Debugf("request: %+v", request) + + response, err = s.spider.Download(ctx, request) + if err != nil { + if errors.Is(err, pkg.ErrIgnoreRequest) { + s.logger.Info(err) + err = nil + return + } + + s.HandleError(ctx, response, err, request.GetErrBack()) + return + } + + s.logger.Debugf("request %+v", request) + return +} +func (s *UnimplementedScheduler) Request(ctx pkg.Context, request pkg.Request) (response pkg.Response, err error) { + ctx.GetTask().RequestPending(ctx, nil) + ctx.GetTask().RequestRunning(ctx, nil) + response, err = s.SyncRequest(ctx, request) + s.task.RequestStopped(ctx, err) + return +} diff --git a/pkg/signal.go b/pkg/signal.go index 4d30c26..d0ea9d4 100644 --- a/pkg/signal.go +++ b/pkg/signal.go @@ -4,7 +4,7 @@ type FnCrawlerChanged func(Context) error type FnSpiderChanged func(Context) error type FnJobChanged func(Context) error type FnTaskChanged func(Context) error -type FnRequestChanged func(Context) error +type FnRequestChanged func(Request) error type FnItemChanged func(Item) error type Signal interface { @@ -18,7 +18,7 @@ type Signal interface { SpiderChanged(Context) JobChanged(Context) TaskChanged(Context) - RequestChanged(Context) + RequestChanged(Request) ItemChanged(Item) FromCrawler(crawler Crawler) Signal } diff --git a/pkg/signals/signal.go b/pkg/signals/signal.go index 45dab4c..1f3639d 100644 --- a/pkg/signals/signal.go +++ b/pkg/signals/signal.go @@ -40,32 +40,32 @@ func (s *Signal) RegisterItemChanged(fn pkg.FnItemChanged) { } func (s *Signal) CrawlerChanged(ctx pkg.Context) { for _, v := range s.crawlerChanged { - v(ctx) + _ = v(ctx) } } func (s *Signal) SpiderChanged(ctx pkg.Context) { for _, v := range s.spiderChanged { - v(ctx) + _ = v(ctx) } } func (s *Signal) JobChanged(ctx pkg.Context) { for _, v := range s.jobChanged { - v(ctx) + _ = v(ctx) } } func (s *Signal) TaskChanged(ctx pkg.Context) { for _, v := range s.taskChanged { - v(ctx) + _ = v(ctx) } } -func (s *Signal) RequestChanged(ctx pkg.Context) { +func (s *Signal) RequestChanged(ctx pkg.Request) { for _, v := range s.requestChanged { - v(ctx) + _ = v(ctx) } } func (s *Signal) ItemChanged(item pkg.Item) { for _, v := range s.itemChanged { - v(item) + _ = v(item) } } func (s *Signal) FromCrawler(crawler pkg.Crawler) pkg.Signal { diff --git a/pkg/spider/task.go b/pkg/spider/task.go index 6c0eb69..f2349d7 100644 --- a/pkg/spider/task.go +++ b/pkg/spider/task.go @@ -54,7 +54,7 @@ func (t *Task) start(ctx pkg.Context) (id string, err error) { WithJobSubId(ctx.GetJob().GetSubId()). WithStatus(pkg.TaskStatusPending). WithStartTime(time.Now()). - WithStats(&stats.MediaStats{}))) + WithStats(new(stats.MediaStats)))) t.crawler.GetSignal().TaskChanged(t.context) t.logger.Info(t.spider.Name(), id, "task started") @@ -121,7 +121,7 @@ func (t *Task) stop(err error) { if err != nil { t.context.GetTask().WithStopReason(err.Error()) - t.context.GetTask().WithStatus(pkg.TaskStatusError) + t.context.GetTask().WithStatus(pkg.TaskStatusFailure) } else { t.context.GetTask().WithStatus(pkg.TaskStatusSuccess) } @@ -129,22 +129,30 @@ func (t *Task) stop(err error) { t.job.TaskStopped(t.context, err) return } -func (t *Task) ReadyRequest() { +func (t *Task) RequestPending(_ pkg.Context, _ error) { t.request.BeReady() } -func (t *Task) StartRequest() { +func (t *Task) RequestRunning(_ pkg.Context, err error) { + if err != nil { + return + } t.request.In() } -func (t *Task) StopRequest() { +func (t *Task) RequestStopped(_ pkg.Context, _ error) { t.request.Out() } -func (t *Task) ReadyItem() { +func (t *Task) ItemPending(_ pkg.Context, _ error) { t.item.BeReady() } -func (t *Task) StartItem() { +func (t *Task) ItemRunning(ctx pkg.Context, err error) { + if err != nil { + return + } t.item.In() } -func (t *Task) StopItem() { +func (t *Task) ItemStopped(_ pkg.Context, _ error) { + //item := ctx.GetItem() + //item.WithContext() t.item.Out() } func (t *Task) WithJob(job *Job) *Task { diff --git a/pkg/statistics.go b/pkg/statistics.go index 6e6ba16..bb4f4ce 100644 --- a/pkg/statistics.go +++ b/pkg/statistics.go @@ -76,6 +76,9 @@ type StatisticsTask interface { WithStopReason(stopReason string) StatisticsTask Marshal() (bytes []byte, err error) } +type StatisticsRequest interface { + Marshal() (bytes []byte, err error) +} type StatisticsRecord interface { Marshal() (bytes []byte, err error) } @@ -84,5 +87,6 @@ type Statistics interface { GetSpiders() []StatisticsSpider GetJobs() []StatisticsJob GetTasks() []StatisticsTask + GetRequests() []StatisticsRequest GetRecords() []StatisticsRecord } diff --git a/pkg/statistics/job/job.go b/pkg/statistics/job/job.go index 60527e2..2df0399 100644 --- a/pkg/statistics/job/job.go +++ b/pkg/statistics/job/job.go @@ -109,13 +109,6 @@ func (s *Job) withUpdateTime(t time.Time) pkg.StatisticsJob { } return s } -func (s *Job) Marshal() (bytes []byte, err error) { - bytes, err = json.Marshal(s) - if err != nil { - return - } - return -} func (s *Job) GetStopReason() string { return s.StopReason } @@ -123,3 +116,10 @@ func (s *Job) WithStopReason(stopReason string) pkg.StatisticsJob { s.StopReason = stopReason return s } +func (s *Job) Marshal() (bytes []byte, err error) { + bytes, err = json.Marshal(s) + if err != nil { + return + } + return +} diff --git a/pkg/statistics/record/record.go b/pkg/statistics/record/record.go index 269ac6f..16c28ee 100644 --- a/pkg/statistics/record/record.go +++ b/pkg/statistics/record/record.go @@ -2,20 +2,27 @@ package record import ( "encoding/json" + "github.com/lizongying/go-crawler/pkg" + "github.com/lizongying/go-crawler/pkg/queue" "github.com/lizongying/go-crawler/pkg/utils" "time" ) type Record struct { - Id string `json:"id,omitempty"` - UniqueKey string `json:"unique_key,omitempty"` - Node string `json:"node,omitempty"` - Spider string `json:"spider,omitempty"` - Job string `json:"job,omitempty"` - Task string `json:"task,omitempty"` - Meta string `json:"meta,omitempty"` - Data string `json:"data,omitempty"` - SaveTime utils.Timestamp `json:"save_time,omitempty"` + Id string `json:"id,omitempty"` + UniqueKey string `json:"unique_key,omitempty"` + Node string `json:"node,omitempty"` + Spider string `json:"spider,omitempty"` + Job string `json:"job,omitempty"` + Task string `json:"task,omitempty"` + Meta string `json:"meta,omitempty"` + Data string `json:"data,omitempty"` + Status pkg.ItemStatus `json:"status,omitempty"` + StartTime utils.Timestamp `json:"start_time,omitempty"` + FinishTime utils.Timestamp `json:"finish_time,omitempty"` + UpdateTime utils.Timestamp `json:"update_time,omitempty"` + StatusList *queue.PriorityQueue `json:"status_list,omitempty"` + StopReason string `json:"stop_reason,omitempty"` } func (r *Record) WithId(id string) *Record { @@ -50,12 +57,42 @@ func (r *Record) WithData(data string) *Record { r.Data = data return r } -func (r *Record) WithSaveTime(t time.Time) *Record { - r.SaveTime = utils.Timestamp{ +func (r *Record) GetStatus() pkg.ItemStatus { + return r.Status +} +func (r *Record) WithStatus(status pkg.ItemStatus) *Record { + r.Status = status + return r +} +func (r *Record) WithStartTime(t time.Time) *Record { + r.StartTime = utils.Timestamp{ + Time: t, + } + return r +} +func (r *Record) WithFinishTime(t time.Time) *Record { + r.FinishTime = utils.Timestamp{ + Time: t, + } + return r +} +func (r *Record) WithUpdateTime(t time.Time) *Record { + r.UpdateTime = utils.Timestamp{ Time: t, } return r } +func (r *Record) AddStatusList(status pkg.SpiderStatus, t time.Time) *Record { + if r.StatusList == nil { + r.StatusList = queue.NewPriorityQueue(10) + } + r.StatusList.Push(queue.NewItem(status, t.UnixNano())) + return r +} +func (r *Record) WithStopReason(stopReason string) *Record { + r.StopReason = stopReason + return r +} func (r *Record) Marshal() (bytes []byte, err error) { bytes, err = json.Marshal(r) if err != nil { diff --git a/pkg/statistics/request/request.go b/pkg/statistics/request/request.go new file mode 100644 index 0000000..c814c53 --- /dev/null +++ b/pkg/statistics/request/request.go @@ -0,0 +1,102 @@ +package request + +import ( + "encoding/json" + "github.com/lizongying/go-crawler/pkg" + "github.com/lizongying/go-crawler/pkg/queue" + "github.com/lizongying/go-crawler/pkg/utils" + "time" +) + +type Request struct { + Id string `json:"id,omitempty"` + UniqueKey string `json:"unique_key,omitempty"` + Node string `json:"node,omitempty"` + Spider string `json:"spider,omitempty"` + Job string `json:"job,omitempty"` + Task string `json:"task,omitempty"` + Meta string `json:"meta,omitempty"` + Data string `json:"data,omitempty"` + Status pkg.RequestStatus `json:"status,omitempty"` + StartTime utils.Timestamp `json:"start_time,omitempty"` + FinishTime utils.Timestamp `json:"finish_time,omitempty"` + UpdateTime utils.Timestamp `json:"update_time,omitempty"` + StatusList *queue.PriorityQueue `json:"status_list,omitempty"` + StopReason string `json:"stop_reason,omitempty"` +} + +func (r *Request) WithId(id string) *Request { + r.Id = id + return r +} +func (r *Request) WithUniqueKey(uniqueKey string) *Request { + r.UniqueKey = uniqueKey + return r +} +func (r *Request) WithNode(node string) *Request { + r.Node = node + return r +} +func (r *Request) WithSpider(spider string) *Request { + r.Spider = spider + return r +} +func (r *Request) WithJob(job string) *Request { + r.Job = job + return r +} +func (r *Request) WithTask(task string) *Request { + r.Task = task + return r +} +func (r *Request) WithMeta(meta string) *Request { + r.Meta = meta + return r +} +func (r *Request) WithData(data string) *Request { + r.Data = data + return r +} +func (r *Request) GetStatus() pkg.RequestStatus { + return r.Status +} +func (r *Request) WithStatus(status pkg.RequestStatus) *Request { + r.Status = status + return r +} +func (r *Request) WithStartTime(t time.Time) *Request { + r.StartTime = utils.Timestamp{ + Time: t, + } + return r +} +func (r *Request) WithFinishTime(t time.Time) *Request { + r.FinishTime = utils.Timestamp{ + Time: t, + } + return r +} +func (r *Request) WithUpdateTime(t time.Time) *Request { + r.UpdateTime = utils.Timestamp{ + Time: t, + } + return r +} +func (r *Request) AddStatusList(status pkg.SpiderStatus, t time.Time) *Request { + if r.StatusList == nil { + r.StatusList = queue.NewPriorityQueue(10) + } + r.StatusList.Push(queue.NewItem(status, t.UnixNano())) + return r +} +func (r *Request) WithStopReason(stopReason string) *Request { + r.StopReason = stopReason + return r +} +func (r *Request) Marshal() (bytes []byte, err error) { + bytes, err = json.Marshal(r) + if err != nil { + return + } + return +} diff --git a/pkg/statistics/spider/spider.go b/pkg/statistics/spider/spider.go index f829c13..830ec00 100644 --- a/pkg/statistics/spider/spider.go +++ b/pkg/statistics/spider/spider.go @@ -27,6 +27,7 @@ type Spider struct { LastTaskFinishTime utils.Timestamp `json:"last_task_finish_time,omitempty"` UpdateTime utils.Timestamp `json:"update_time,omitempty"` StatusList *queue.PriorityQueue `json:"status_list,omitempty"` + StopReason string `json:"stop_reason,omitempty"` } func (s *Spider) WithStatusAndTime(status pkg.SpiderStatus, t time.Time) pkg.StatisticsSpider { diff --git a/pkg/statistics/statistics.go b/pkg/statistics/statistics.go index 2ac65a9..e45d05d 100644 --- a/pkg/statistics/statistics.go +++ b/pkg/statistics/statistics.go @@ -7,6 +7,7 @@ import ( "github.com/lizongying/go-crawler/pkg/statistics/job" "github.com/lizongying/go-crawler/pkg/statistics/node" "github.com/lizongying/go-crawler/pkg/statistics/record" + statisticsRequest "github.com/lizongying/go-crawler/pkg/statistics/request" statisticsSpider "github.com/lizongying/go-crawler/pkg/statistics/spider" "github.com/lizongying/go-crawler/pkg/statistics/task" "github.com/lizongying/go-crawler/pkg/utils" @@ -48,6 +49,14 @@ func (s *Statistics) GetTasks() (tasks []pkg.StatisticsTask) { } return } +func (s *Statistics) GetRequests() (records []pkg.StatisticsRequest) { + for _, v := range s.Tasks.Get("") { + for _, v1 := range v.Value().(task.WithRecords).Requests.Get("") { + records = append(records, v1.Value().(pkg.StatisticsRequest)) + } + } + return +} func (s *Statistics) GetRecords() (records []pkg.StatisticsRecord) { for _, v := range s.Tasks.Get("") { for _, v1 := range v.Value().(task.WithRecords).Records.Get("") { @@ -154,7 +163,8 @@ func (s *Statistics) taskChanged(ctx pkg.Context) (err error) { WithJob(ctx.GetJob().GetId()). WithStatus(ctx.GetTask().GetStatus()). WithStartTime(ctx.GetTask().GetStartTime()), - Records: queue.NewGroupQueue(10), + Records: queue.NewGroupQueue(10), + Requests: queue.NewGroupQueue(10), }, ctx.GetTask().GetStartTime().UnixNano()) @@ -178,7 +188,7 @@ func (s *Statistics) taskChanged(ctx pkg.Context) (err error) { t.WithStartTime(ctx.GetTask().GetStartTime()) case pkg.TaskStatusSuccess: t.WithFinishTime(ctx.GetTask().GetStopTime()) - case pkg.TaskStatusError: + case pkg.TaskStatusFailure: t.WithFinishTime(ctx.GetTask().GetStopTime()) t.WithStopReason(ctx.GetTask().GetStopReason()) } @@ -194,40 +204,127 @@ func (s *Statistics) taskChanged(ctx pkg.Context) (err error) { } return } -func (s *Statistics) itemChanged(item pkg.Item) (err error) { +func (s *Statistics) requestChanged(request pkg.Request) (err error) { defer s.mutex.Unlock() s.mutex.Lock() - s.Nodes[item.GetContext().GetCrawler().GetId()].IncRecord() - s.Spiders[item.GetContext().GetSpider().GetName()].IncRecord() - s.Jobs[item.GetContext().GetJob().GetId()].IncRecord() + ctx := request.GetContext() + contextRequest := ctx.GetRequest() + + // task + for _, v := range s.Tasks.Get(ctx.GetJob().GetId()) { + t := v.Value().(task.WithRecords) + if ctx.GetTask().GetId() == t.Task.GetId() { + var sr *statisticsRequest.Request + for _, v1 := range t.Requests.Get(ctx.GetTask().GetId()) { + r := v1.Value().(*statisticsRequest.Request) + if r.Id == contextRequest.GetId() { + sr = r + break + } + } + + if sr == nil { + s.Nodes[ctx.GetCrawler().GetId()].IncRequest() + s.Spiders[ctx.GetSpider().GetName()].IncRequest() + s.Jobs[ctx.GetJob().GetId()].IncRequest() + + // task + t.Task.IncRequest() + + dataRequest, _ := request.Marshal() + + //request + sr = new(statisticsRequest.Request). + WithId(contextRequest.GetId()). + WithUniqueKey(request.GetUniqueKey()). + WithNode(ctx.GetCrawler().GetId()). + WithSpider(ctx.GetSpider().GetName()). + WithJob(ctx.GetJob().GetId()). + WithTask(ctx.GetTask().GetId()). + WithMeta(request.GetExtra()). + WithData(string(dataRequest)) + + t.Requests.Enqueue(ctx.GetTask().GetId(), + sr, + contextRequest.GetUpdateTime().UnixNano()) + } + sr.WithUpdateTime(contextRequest.GetUpdateTime()) + sr.WithStatus(contextRequest.GetStatus()) + switch contextRequest.GetStatus() { + case pkg.ItemStatusPending: + case pkg.ItemStatusRunning: + sr.WithStartTime(contextRequest.GetStartTime()) + case pkg.ItemStatusSuccess: + sr.WithFinishTime(contextRequest.GetStopTime()) + case pkg.ItemStatusFailure: + sr.WithFinishTime(contextRequest.GetStopTime()) + } + break + } + } + return +} +func (s *Statistics) itemChanged(item pkg.Item) (err error) { + defer s.mutex.Unlock() + s.mutex.Lock() ctx := item.GetContext() + contextItem := ctx.GetItem() // task - for _, v := range s.Tasks.Get(item.GetContext().GetJob().GetId()) { + for _, v := range s.Tasks.Get(ctx.GetJob().GetId()) { t := v.Value().(task.WithRecords) if ctx.GetTask().GetId() == t.Task.GetId() { - // task - t.Task.IncRecord() - - //record - id := item.Id() - if id == nil { - id = item.UniqueKey() + var statisticsRecord *record.Record + for _, v1 := range t.Records.Get(ctx.GetTask().GetId()) { + r := v1.Value().(*record.Record) + if r.Id == contextItem.GetId() { + statisticsRecord = r + break + } } - t.Records.Enqueue(ctx.GetTask().GetId(), - new(record.Record). - WithId(ctx.GetItem().GetId()). + + if statisticsRecord == nil { + s.Nodes[ctx.GetCrawler().GetId()].IncRecord() + s.Spiders[ctx.GetSpider().GetName()].IncRecord() + s.Jobs[ctx.GetJob().GetId()].IncRecord() + + // task + t.Task.IncRecord() + + //record + id := item.Id() + if id == nil { + id = item.UniqueKey() + } + + statisticsRecord = new(record.Record). + WithId(contextItem.GetId()). WithUniqueKey(fmt.Sprintf("%v", id)). - WithSaveTime(ctx.GetItem().GetStopTime()). WithNode(ctx.GetCrawler().GetId()). WithSpider(ctx.GetSpider().GetName()). WithJob(ctx.GetJob().GetId()). WithTask(ctx.GetTask().GetId()). WithMeta(item.MetaJson()). - WithData(item.DataJson()), - ctx.GetItem().GetStopTime().UnixNano()) + WithData(item.DataJson()) + + t.Records.Enqueue(ctx.GetTask().GetId(), + statisticsRecord, + contextItem.GetUpdateTime().UnixNano()) + } + statisticsRecord.WithUpdateTime(contextItem.GetUpdateTime()) + statisticsRecord.WithStatus(contextItem.GetStatus()) + switch contextItem.GetStatus() { + case pkg.ItemStatusPending: + case pkg.ItemStatusRunning: + statisticsRecord.WithStartTime(contextItem.GetStartTime()) + case pkg.ItemStatusSuccess: + statisticsRecord.WithFinishTime(contextItem.GetStopTime()) + case pkg.ItemStatusFailure: + statisticsRecord.WithFinishTime(contextItem.GetStopTime()) + } + break } } return @@ -250,6 +347,7 @@ func (s *Statistics) FromCrawler(crawler pkg.Crawler) pkg.Statistics { signal.RegisterSpiderChanged(s.spiderChanged) signal.RegisterJobChanged(s.jobChanged) signal.RegisterTaskChanged(s.taskChanged) + signal.RegisterRequestChanged(s.requestChanged) signal.RegisterItemChanged(s.itemChanged) return s diff --git a/pkg/statistics/task/task.go b/pkg/statistics/task/task.go index 6a2230d..e34af66 100644 --- a/pkg/statistics/task/task.go +++ b/pkg/statistics/task/task.go @@ -10,8 +10,9 @@ import ( ) type WithRecords struct { - Task pkg.StatisticsTask - Records *queue.GroupQueue + Task pkg.StatisticsTask + Requests *queue.GroupQueue + Records *queue.GroupQueue } type Task struct { diff --git a/pkg/task.go b/pkg/task.go index ffa5b6a..83fdbe3 100644 --- a/pkg/task.go +++ b/pkg/task.go @@ -4,12 +4,12 @@ type Task interface { Scheduler GetScheduler() Scheduler WithScheduler(Scheduler) Task - ReadyRequest() - StartRequest() - StopRequest() - ReadyItem() - StartItem() - StopItem() + RequestPending(ctx Context, err error) + RequestRunning(ctx Context, err error) + RequestStopped(ctx Context, err error) + ItemPending(ctx Context, err error) + ItemRunning(ctx Context, err error) + ItemStopped(ctx Context, err error) } type TaskStatus uint8 @@ -19,11 +19,11 @@ const ( TaskStatusPending TaskStatusRunning TaskStatusSuccess - TaskStatusError + TaskStatusFailure ) -func (s *TaskStatus) String() string { - switch *s { +func (s TaskStatus) String() string { + switch s { case 1: return "pending" case 2: @@ -31,7 +31,7 @@ func (s *TaskStatus) String() string { case 3: return "success" case 4: - return "error" + return "failure" default: return "unknown" } diff --git a/web/ui/src/App.vue b/web/ui/src/App.vue index 09623e4..93b3110 100644 --- a/web/ui/src/App.vue +++ b/web/ui/src/App.vue @@ -136,9 +136,12 @@ router.beforeEach((to, from, next) => { case 'tasks': state.selectedKeys = ['5'] break - case 'records': + case 'requests': state.selectedKeys = ['6'] break + case 'records': + state.selectedKeys = ['7'] + break default: state.selectedKeys = [] } @@ -186,6 +189,12 @@ const items = reactive([ { key: '6', icon: () => h(DatabaseOutlined), + label: Requests, + title: 'Requests', + }, + { + key: '7', + icon: () => h(DatabaseOutlined), label: Records, title: 'Records', } diff --git a/web/ui/src/requests/api.js b/web/ui/src/requests/api.js index 3c640bb..22fb979 100644 --- a/web/ui/src/requests/api.js +++ b/web/ui/src/requests/api.js @@ -56,6 +56,11 @@ const getTasks = async data => { return axios.post(host + '/tasks', data, config); }; +const getRequests = async data => { + const {host, config} = await api() + return axios.post(host + '/requests', data, config); +}; + const getRecords = async data => { const {host, config} = await api() return axios.post(host + '/records', data, config); @@ -66,4 +71,4 @@ const getSpider = async data => { return axios.post(host + '/spider', data, config); }; -export {getUser, getNodes, getSpiders, getJobs, runJob, rerunJob, stopJob, getTasks, getRecords, getSpider} \ No newline at end of file +export {getUser, getNodes, getSpiders, getJobs, runJob, rerunJob, stopJob, getTasks, getRequests, getRecords, getSpider} \ No newline at end of file diff --git a/web/ui/src/router/index.js b/web/ui/src/router/index.js index df17a59..e14eaa2 100644 --- a/web/ui/src/router/index.js +++ b/web/ui/src/router/index.js @@ -28,6 +28,11 @@ const router = createRouter({ name: 'tasks', component: () => import('@/views/TasksView.vue') }, + { + path: '/requests', + name: 'requests', + component: () => import('@/views/RequestsView.vue') + }, { path: '/records', name: 'records', diff --git a/web/ui/src/stores/requests.js b/web/ui/src/stores/requests.js new file mode 100644 index 0000000..82f78e8 --- /dev/null +++ b/web/ui/src/stores/requests.js @@ -0,0 +1,25 @@ +import {defineStore} from 'pinia' +import {computed, reactive} from 'vue'; +import {getRequests} from "@/requests/api"; + +export const useRequestsStore = defineStore('requests', () => { + const requests = reactive([]) + + const GetRequests = () => { + getRequests().then(resp => { + if (resp.data.data === null) { + requests.splice(0, requests.length) + return + } + requests.splice(0, requests.length, ...resp.data.data) + }).catch(e => { + console.log(e); + }) + } + + const Count = computed(() => { + return requests.length + }) + + return {requests, GetRequests, Count} +}) \ No newline at end of file diff --git a/web/ui/src/views/RecordsView.vue b/web/ui/src/views/RecordsView.vue index ecfbb67..da87022 100644 --- a/web/ui/src/views/RecordsView.vue +++ b/web/ui/src/views/RecordsView.vue @@ -243,11 +243,6 @@ const more = reactive({}) const showDrawer = record => { open.value = true; more.data = JSON.stringify(JSON.parse(record.data), null, 2) - .replace(/(".*?":)/g, '$1') - .replace(/: "([^"]+)"[\n,]/g, ': "$1":') - .replace(/: \b(\d+)\b[\n,]/g, ': $1') - .replace(/: \b(true|false)\b[\n,]/g, ': $1') - .replace(/: \b(null)\b[\n,]/g, ': $1'); }; const activeKey = ref('1'); diff --git a/web/ui/src/views/RequestsView.vue b/web/ui/src/views/RequestsView.vue new file mode 100644 index 0000000..2642191 --- /dev/null +++ b/web/ui/src/views/RequestsView.vue @@ -0,0 +1,370 @@ + + +