From 290c80e4ff6d6a641c0168a52f0de958f0f41efc Mon Sep 17 00:00:00 2001 From: Li ZongYing Date: Wed, 22 Nov 2023 13:38:58 +0800 Subject: [PATCH] fix run is blocked bug --- README_CN.md | 3 +- .../spiders/test_must_ok_spider/spider.go | 2 +- pkg/config/config.go | 13 ++--- pkg/crawler/crawler.go | 4 +- pkg/downloader/downloader.go | 5 +- pkg/middlewares/retry.go | 11 ++-- pkg/pipelines/dump.go | 2 +- pkg/referrer.go | 28 +++++++++-- pkg/scheduler/kafka/requests.go | 25 ++++------ pkg/scheduler/memory/requests.go | 26 +++++----- pkg/scheduler/redis/requests.go | 27 ++++------ pkg/scheduler/unimplemented_scheduler.go | 11 ++-- pkg/spider/base_spider.go | 22 ++++---- pkg/spider/job.go | 8 +-- pkg/spider/task.go | 50 ++++++++++--------- pkg/state.go | 9 +++- pkg/task.go | 12 ++--- 17 files changed, 130 insertions(+), 128 deletions(-) diff --git a/README_CN.md b/README_CN.md index 41f5b12..79f66f5 100644 --- a/README_CN.md +++ b/README_CN.md @@ -1110,10 +1110,9 @@ curl https://github.com/lizongying/go-crawler -x http://localhost:8082 --cacert * panic stop * extra速率限制 * request context -* item context * total -* requests * log +* status ```shell go get -u github.com/lizongying/go-css@latest diff --git a/internal/spiders/test_must_ok_spider/spider.go b/internal/spiders/test_must_ok_spider/spider.go index 7691872..7ef4c80 100644 --- a/internal/spiders/test_must_ok_spider/spider.go +++ b/internal/spiders/test_must_ok_spider/spider.go @@ -23,7 +23,7 @@ func (s *Spider) ParseOk(ctx pkg.Context, response pkg.Response) (err error) { })) if extra.Count > 0 { - time.Sleep(time.Second * 10) + time.Sleep(time.Second * 5) s.logger.Info("manual stop") return } diff --git a/pkg/config/config.go b/pkg/config/config.go index 9ed878c..a570fe9 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -145,7 +145,7 @@ type Config struct { EnableJa3 *bool `yaml:"enable_ja3,omitempty" json:"enable_ja3"` EnablePriorityQueue *bool `yaml:"enable_priority_queue,omitempty" json:"enable_priority_queue"` EnableReferrerMiddleware *bool `yaml:"enable_referrer_middleware,omitempty" json:"enable_referrer_middleware"` - ReferrerPolicy *string `yaml:"referrer_policy_middleware,omitempty" json:"referrer_policy_middleware"` + ReferrerPolicy *string `yaml:"referrer_policy,omitempty" json:"referrer_policy"` EnableHttpAuthMiddleware *bool `yaml:"enable_http_auth_middleware,omitempty" json:"enable_http_auth_middleware"` EnableCookieMiddleware *bool `yaml:"enable_cookie_middleware,omitempty" json:"enable_cookie_middleware"` EnableStatsMiddleware *bool `yaml:"enable_stats_middleware,omitempty" json:"enable_stats_middleware"` @@ -318,18 +318,11 @@ func (c *Config) GetEnablePriorityQueue() bool { } func (c *Config) GetReferrerPolicy() pkg.ReferrerPolicy { if c.ReferrerPolicy == nil { - referrerPolicy := string(pkg.DefaultReferrerPolicy) + referrerPolicy := pkg.DefaultReferrerPolicy.String() c.ReferrerPolicy = &referrerPolicy } if *c.ReferrerPolicy != "" { - switch pkg.ReferrerPolicy(*c.ReferrerPolicy) { - case pkg.DefaultReferrerPolicy: - return pkg.DefaultReferrerPolicy - case pkg.NoReferrerPolicy: - return pkg.NoReferrerPolicy - default: - return pkg.DefaultReferrerPolicy - } + return pkg.ReferrerPolicyFromString(*c.ReferrerPolicy) } return pkg.DefaultReferrerPolicy diff --git a/pkg/crawler/crawler.go b/pkg/crawler/crawler.go index f63c1cf..8e6c98c 100644 --- a/pkg/crawler/crawler.go +++ b/pkg/crawler/crawler.go @@ -255,7 +255,7 @@ func (c *Crawler) Start(ctx context.Context) (err error) { c.context.GetCrawler().WithStatus(pkg.CrawlerStatusRunning) c.Signal.CrawlerChanged(c.context) - c.logger.Info("crawler is running", c.context.GetCrawler().GetId()) + c.logger.Info("crawler is running. id:", c.context.GetCrawler().GetId()) c.logger.Info("referrerPolicy", c.config.GetReferrerPolicy()) c.logger.Info("urlLengthLimit", c.config.GetUrlLengthLimit()) @@ -327,7 +327,7 @@ func (c *Crawler) SpiderStopped(_ pkg.Context, _ error) { } func NewCrawler(spiders []pkg.Spider, cli *cli.Cli, config *config.Config, logger pkg.Logger, mongoDb *mongo.Database, mysql *sql.DB, redis *redis.Client, kafka *kafka.Writer, kafkaReader *kafka.Reader, sqlite pkg.Sqlite, store pkg.Store, mockServer pkg.MockServer, httpApi *api.Api) (crawler pkg.Crawler, err error) { - spider := pkg.NewState() + spider := pkg.NewState("spider") spider.RegisterIsReadyAndIsZero(func() { _ = crawler.Stop(crawler.GetContext()) }) diff --git a/pkg/downloader/downloader.go b/pkg/downloader/downloader.go index b96a471..dbbc981 100644 --- a/pkg/downloader/downloader.go +++ b/pkg/downloader/downloader.go @@ -63,12 +63,11 @@ func (d *Downloader) Download(ctx pkg.Context, request pkg.Request) (response pk response.SetRequest(request) } - err = d.processResponse(ctx, response) - if err != nil { - d.logger.Error(err) + if err = d.processResponse(ctx, response); err != nil { if errors.Is(err, pkg.ErrNeedRetry) { return d.Download(ctx, request) } + d.logger.Error(err) return } diff --git a/pkg/middlewares/retry.go b/pkg/middlewares/retry.go index 0353a5b..be1ebc7 100644 --- a/pkg/middlewares/retry.go +++ b/pkg/middlewares/retry.go @@ -34,28 +34,27 @@ func (m *RetryMiddleware) ProcessResponse(_ pkg.Context, response pkg.Response) if len(request.GetOkHttpCodes()) > 0 { okHttpCodes = request.GetOkHttpCodes() } - if retryMaxTimes > 0 && response.GetResponse() == nil { + + if response.GetResponse() == nil { if request.GetRetryTimes() < retryMaxTimes { request.SetRetryTimes(request.GetRetryTimes() + 1) - m.logger.Info(request.GetUniqueKey(), "retry times:", request.GetRetryTimes(), "SpendTime:", request.GetSpendTime()) + m.logger.Infof("retry times: %d/%d, response nil, SpendTime: %v, UniqueKey: %s\n", request.GetRetryTimes(), retryMaxTimes, request.GetSpendTime(), request.GetUniqueKey()) err = pkg.ErrNeedRetry return } err = fmt.Errorf("response nil") - m.logger.Error(request.GetUniqueKey(), err, request.GetRetryTimes(), retryMaxTimes) return } - if retryMaxTimes > 0 && !utils.InSlice(response.StatusCode(), okHttpCodes) { + if !utils.InSlice(response.StatusCode(), okHttpCodes) { if request.GetRetryTimes() < retryMaxTimes { request.SetRetryTimes(request.GetRetryTimes() + 1) - m.logger.Info(request.GetUniqueKey(), "retry times:", request.GetRetryTimes(), "SpendTime:", request.GetSpendTime()) + m.logger.Infof("retry times: %d/%d, status code: %d, SpendTime: %v, UniqueKey: %s\n", request.GetRetryTimes(), retryMaxTimes, response.StatusCode(), request.GetSpendTime(), request.GetUniqueKey()) err = pkg.ErrNeedRetry return } err = fmt.Errorf("status code error: %d", response.StatusCode()) - m.logger.Error(request.GetUniqueKey(), err, request.GetRetryTimes(), retryMaxTimes) return } diff --git a/pkg/pipelines/dump.go b/pkg/pipelines/dump.go index bdbae7a..4e5ec02 100644 --- a/pkg/pipelines/dump.go +++ b/pkg/pipelines/dump.go @@ -28,7 +28,7 @@ func (m *DumpPipeline) ProcessItem(item pkg.Item) (err error) { m.logger.Debug("Data", utils.JsonStr(data)) m.logger.Debug("referrer", item.Referrer()) - m.logger.Info(m.Spider().Name(), item.GetContext().GetTask().GetId(), "item.Data:", utils.JsonStr(data)) + m.logger.Debug(m.Spider().Name(), item.GetContext().GetTask().GetId(), "item.Data:", utils.JsonStr(data)) return } diff --git a/pkg/referrer.go b/pkg/referrer.go index afb3e67..fb95ec4 100644 --- a/pkg/referrer.go +++ b/pkg/referrer.go @@ -1,8 +1,30 @@ package pkg -type ReferrerPolicy string +import "strings" + +type ReferrerPolicy uint8 const ( - DefaultReferrerPolicy ReferrerPolicy = "DefaultReferrerPolicy" - NoReferrerPolicy ReferrerPolicy = "NoReferrerPolicy" + DefaultReferrerPolicy ReferrerPolicy = iota + NoReferrerPolicy ) + +func (r ReferrerPolicy) String() string { + switch r { + case NoReferrerPolicy: + return "NoReferrerPolicy" + default: + return "DefaultReferrerPolicy" + } +} + +func ReferrerPolicyFromString(referrerPolicy string) ReferrerPolicy { + switch strings.ToLower(referrerPolicy) { + case "1": + return NoReferrerPolicy + case "NoReferrerPolicy": + return NoReferrerPolicy + default: + return DefaultReferrerPolicy + } +} diff --git a/pkg/scheduler/kafka/requests.go b/pkg/scheduler/kafka/requests.go index 17fc10d..5ae6db0 100644 --- a/pkg/scheduler/kafka/requests.go +++ b/pkg/scheduler/kafka/requests.go @@ -12,7 +12,6 @@ import ( "golang.org/x/time/rate" "net/http" "reflect" - "runtime" "strings" "time" ) @@ -76,39 +75,37 @@ out: } ctx.GetRequest().WithStatus(pkg.RequestStatusRunning) s.crawler.GetSignal().RequestChanged(request) - s.task.RequestRunning(ctx, nil) go func(c pkg.Context, request pkg.Request) { var response pkg.Response response, err = s.Request(c, request) if err != nil { ctx.GetRequest().WithStatus(pkg.RequestStatusFailure).WithStopReason(err.Error()) s.crawler.GetSignal().RequestChanged(request) - s.task.RequestStopped(ctx, err) + s.task.RequestOut() return } go func(ctx pkg.Context, response pkg.Response) { defer func() { if r := recover(); r != nil { - buf := make([]byte, 1<<16) - runtime.Stack(buf, true) - err = errors.New(string(buf)) - s.logger.Error(err) + s.logger.Error(r) + err = errors.New("panic") s.HandleError(ctx, response, err, request.GetErrBack()) } + s.task.MethodOut() + s.task.RequestOut() }() + s.task.MethodIn() if err = s.spider.CallBack(request.GetCallBack())(ctx, response); err != nil { s.logger.Error(err) s.HandleError(ctx, response, err, request.GetErrBack()) ctx.GetRequest().WithStatus(pkg.RequestStatusFailure).WithStopReason(err.Error()) s.crawler.GetSignal().RequestChanged(request) - s.task.RequestStopped(ctx, err) return } ctx.GetRequest().WithStatus(pkg.RequestStatusSuccess) s.crawler.GetSignal().RequestChanged(request) - s.task.RequestStopped(ctx, nil) }(c, response) }(ctx, request) } @@ -151,7 +148,6 @@ func (s *Scheduler) YieldRequest(ctx pkg.Context, request pkg.Request) (err erro if err != nil { s.logger.Error(err) s.crawler.GetSignal().RequestChanged(request) - ctx.GetTask().RequestPending(ctx, err) return } @@ -163,12 +159,11 @@ func (s *Scheduler) YieldRequest(ctx pkg.Context, request pkg.Request) (err erro }); err != nil { s.logger.Error(err) s.crawler.GetSignal().RequestChanged(request) - ctx.GetTask().RequestPending(ctx, err) return } s.crawler.GetSignal().RequestChanged(request) - ctx.GetTask().RequestPending(ctx, nil) + ctx.GetTask().RequestIn() return } @@ -184,7 +179,6 @@ func (s *Scheduler) YieldExtra(c pkg.Context, extra any) (err error) { bs, err := json.Marshal(extra) if err != nil { s.logger.Error(err) - c.GetTask().RequestPending(c, err) return } @@ -207,17 +201,16 @@ func (s *Scheduler) YieldExtra(c pkg.Context, extra any) (err error) { Value: bs, }); err != nil { s.logger.Error(err) - c.GetTask().RequestPending(c, err) return } - c.GetTask().RequestPending(c, nil) + c.GetTask().RequestIn() return } func (s *Scheduler) GetExtra(c pkg.Context, extra any) (err error) { defer func() { - s.task.RequestStopped(c, nil) + s.task.RequestOut() }() extraValue := reflect.ValueOf(extra) diff --git a/pkg/scheduler/memory/requests.go b/pkg/scheduler/memory/requests.go index b5fac23..2206912 100644 --- a/pkg/scheduler/memory/requests.go +++ b/pkg/scheduler/memory/requests.go @@ -48,7 +48,6 @@ out: } ctx.GetRequest().WithStatus(pkg.RequestStatusRunning) s.crawler.GetSignal().RequestChanged(request) - s.task.RequestRunning(ctx, nil) go func(request pkg.Request) { c := request.GetContext() var err error @@ -58,32 +57,31 @@ out: if err != nil { ctx.GetRequest().WithStatus(pkg.RequestStatusFailure).WithStopReason(err.Error()) s.crawler.GetSignal().RequestChanged(request) - s.task.RequestStopped(ctx, err) + s.task.RequestOut() return } go func(ctx pkg.Context, response pkg.Response) { defer func() { - //if r := recover(); r != nil { - // buf := make([]byte, 1<<16) - // runtime.Stack(buf, true) - // err = errors.New(string(buf)) - // //s.logger.Error(err) - // s.HandleError(ctx, response, err, request.GetErrBack()) - //} + if r := recover(); r != nil { + s.logger.Error(r) + err = errors.New("panic") + s.HandleError(ctx, response, err, request.GetErrBack()) + } + s.task.MethodOut() + s.task.RequestOut() }() + s.task.MethodIn() if err = s.spider.CallBack(request.GetCallBack())(ctx, response); err != nil { s.logger.Error(err) s.HandleError(ctx, response, err, request.GetErrBack()) ctx.GetRequest().WithStatus(pkg.RequestStatusFailure).WithStopReason(err.Error()) s.crawler.GetSignal().RequestChanged(request) - s.task.RequestStopped(ctx, err) return } ctx.GetRequest().WithStatus(pkg.RequestStatusSuccess) s.crawler.GetSignal().RequestChanged(request) - s.task.RequestStopped(ctx, nil) }(c, response) }(request) } @@ -129,8 +127,8 @@ func (s *Scheduler) YieldRequest(ctx pkg.Context, request pkg.Request) (err erro request.WithContext(ctx) s.crawler.GetSignal().RequestChanged(request) - ctx.GetTask().RequestPending(ctx, nil) s.requestChan <- request + ctx.GetTask().RequestIn() return } @@ -151,13 +149,13 @@ func (s *Scheduler) YieldExtra(c pkg.Context, extra any) (err error) { extraChan.(chan any) <- extra } - c.GetTask().RequestPending(c, nil) + c.GetTask().RequestIn() return } func (s *Scheduler) GetExtra(ctx pkg.Context, extra any) (err error) { defer func() { - s.task.RequestStopped(ctx, nil) + s.task.RequestOut() }() extraValue := reflect.ValueOf(extra) diff --git a/pkg/scheduler/redis/requests.go b/pkg/scheduler/redis/requests.go index 3b682c9..7eb9f63 100644 --- a/pkg/scheduler/redis/requests.go +++ b/pkg/scheduler/redis/requests.go @@ -12,7 +12,6 @@ import ( "golang.org/x/time/rate" "net/http" "reflect" - "runtime" "time" ) @@ -111,14 +110,13 @@ out: } ctx.GetRequest().WithStatus(pkg.RequestStatusRunning) s.crawler.GetSignal().RequestChanged(request) - s.task.RequestRunning(ctx, nil) go func(c pkg.Context, request pkg.Request) { var response pkg.Response response, err = s.Request(c, request) if err != nil { ctx.GetRequest().WithStatus(pkg.RequestStatusFailure).WithStopReason(err.Error()) s.crawler.GetSignal().RequestChanged(request) - s.task.RequestStopped(ctx, err) + s.task.RequestOut() return } @@ -126,25 +124,23 @@ out: defer func() { if r := recover(); r != nil { s.logger.Error(r) - buf := make([]byte, 1<<16) - runtime.Stack(buf, true) - err = errors.New(string(buf)) - //s.logger.Error(err) + err = errors.New("panic") s.HandleError(ctx, response, err, request.GetErrBack()) } + s.task.MethodOut() + s.task.RequestOut() }() + s.task.MethodIn() if err = s.spider.CallBack(request.GetCallBack())(ctx, response); err != nil { s.logger.Error(err) s.HandleError(ctx, response, err, request.GetErrBack()) ctx.GetRequest().WithStatus(pkg.RequestStatusFailure).WithStopReason(err.Error()) s.crawler.GetSignal().RequestChanged(request) - s.task.RequestStopped(ctx, err) return } ctx.GetRequest().WithStatus(pkg.RequestStatusSuccess) s.crawler.GetSignal().RequestChanged(request) - s.task.RequestStopped(ctx, nil) }(c, response) }(ctx, request) } @@ -221,12 +217,12 @@ func (s *Scheduler) YieldRequest(ctx pkg.Context, request pkg.Request) (err erro res, err = s.redis.ZAdd(c, s.requestKey, z).Result() if res == 1 { s.crawler.GetSignal().RequestChanged(request) - ctx.GetTask().RequestPending(ctx, err) + ctx.GetTask().RequestIn() } } else { err = s.redis.RPush(c, s.requestKey, bs).Err() s.crawler.GetSignal().RequestChanged(request) - ctx.GetTask().RequestPending(ctx, err) + ctx.GetTask().RequestIn() } return } @@ -245,7 +241,6 @@ func (s *Scheduler) YieldExtra(c pkg.Context, extra any) (err error) { bs, err := json.Marshal(extra) if err != nil { s.logger.Error(err) - c.GetTask().RequestPending(c, err) return } @@ -263,22 +258,20 @@ func (s *Scheduler) YieldExtra(c pkg.Context, extra any) (err error) { res, err = s.redis.ZAdd(ctx, extraKey, z).Result() if err != nil { s.logger.Error(err) - c.GetTask().RequestPending(c, err) return } if res == 1 { - c.GetTask().RequestRunning(c, nil) + c.GetTask().RequestIn() } } else { extraKey := fmt.Sprintf("%s:%s:extra:%s", s.config.GetBotName(), name, spider.Name()) if err = s.redis.RPush(ctx, extraKey, bs).Err(); err != nil { s.logger.Error(err) - c.GetTask().RequestPending(c, err) return } - c.GetTask().RequestPending(c, nil) + c.GetTask().RequestIn() } return @@ -286,7 +279,7 @@ func (s *Scheduler) YieldExtra(c pkg.Context, extra any) (err error) { func (s *Scheduler) GetExtra(c pkg.Context, extra any) (err error) { defer func() { - s.task.RequestStopped(c, nil) + s.task.RequestOut() }() extraValue := reflect.ValueOf(extra) diff --git a/pkg/scheduler/unimplemented_scheduler.go b/pkg/scheduler/unimplemented_scheduler.go index c746a86..59a8910 100644 --- a/pkg/scheduler/unimplemented_scheduler.go +++ b/pkg/scheduler/unimplemented_scheduler.go @@ -47,7 +47,6 @@ out: WithStatus(pkg.ItemStatusFailure). WithStopReason(err.Error()) s.crawler.GetSignal().ItemChanged(item) - s.task.ItemPending(item.GetContext(), nil) break out default: itemDelay := s.crawler.GetItemDelay() @@ -60,6 +59,7 @@ out: go func(item pkg.Item) { defer func() { s.crawler.ItemConcurrencyChan() <- struct{}{} + s.task.ItemOut() }() contextItem := item.GetContext().GetItem() @@ -67,7 +67,6 @@ out: contextItem. WithStatus(pkg.ItemStatusRunning) s.crawler.GetSignal().ItemChanged(item) - s.task.ItemRunning(item.GetContext(), nil) err := s.spider.Export(item) if err != nil { @@ -80,7 +79,6 @@ out: WithStatus(pkg.ItemStatusSuccess) } s.crawler.GetSignal().ItemChanged(item) - s.task.ItemStopped(item.GetContext(), err) }(item) if itemDelay > 0 { @@ -130,8 +128,8 @@ func (s *UnimplementedScheduler) YieldItem(ctx pkg.Context, item pkg.Item) (err item.WithContext(c) s.crawler.GetSignal().ItemChanged(item) - s.task.ItemPending(c, nil) s.itemChan <- item + s.task.ItemIn() return } func (s *UnimplementedScheduler) HandleError(ctx pkg.Context, response pkg.Response, err error, errBackName string) { @@ -174,9 +172,8 @@ func (s *UnimplementedScheduler) SyncRequest(ctx pkg.Context, request pkg.Reques return } func (s *UnimplementedScheduler) Request(ctx pkg.Context, request pkg.Request) (response pkg.Response, err error) { - ctx.GetTask().RequestPending(ctx, nil) - ctx.GetTask().RequestRunning(ctx, nil) + ctx.GetTask().RequestIn() response, err = s.SyncRequest(ctx, request) - s.task.RequestStopped(ctx, err) + s.task.RequestOut() return } diff --git a/pkg/spider/base_spider.go b/pkg/spider/base_spider.go index 5ea564e..04dd3e9 100644 --- a/pkg/spider/base_spider.go +++ b/pkg/spider/base_spider.go @@ -365,24 +365,26 @@ func (s *BaseSpider) Start(c pkg.Context) (err error) { s.context.GetSpider().WithStatus(pkg.SpiderStatusRunning) s.Crawler.GetSignal().SpiderChanged(s.GetContext()) - s.logger.Info("spiderName", s.context.GetSpider().GetName()) - s.logger.Info("allowedDomains", s.GetAllowedDomains()) - s.logger.Info("okHttpCodes", s.OkHttpCodes()) - s.logger.Info("platforms", s.GetPlatforms()) - s.logger.Info("browsers", s.GetBrowsers()) - s.logger.Info("retryMaxTimes", s.retryMaxTimes) - s.logger.Info("redirectMaxTimes", s.redirectMaxTimes) + s.logger.Info("spiderName:", s.context.GetSpider().GetName()) + s.logger.Info("allowedDomains:", s.GetAllowedDomains()) + s.logger.Info("okHttpCodes:", s.OkHttpCodes()) + s.logger.Info("platforms:", s.GetPlatforms()) + s.logger.Info("browsers:", s.GetBrowsers()) + s.logger.Info("retryMaxTimes:", s.retryMaxTimes) + s.logger.Info("redirectMaxTimes:", s.redirectMaxTimes) //s.logger.Info("filter", s.GetFilter()) - s.logger.Info("pipelines", s.PipelineNames()) + for _, v := range s.Pipelines() { + s.logger.Info(v.Name(), "loaded. order:", v.Order()) + } for _, v := range s.GetMiddlewares().Middlewares() { if err = v.Start(ctx, s.spider); err != nil { s.logger.Error(err) return } - s.logger.Info(v.Name(), "loaded") + s.logger.Info(v.Name(), "loaded. order:", v.Order()) } return } @@ -588,7 +590,7 @@ func NewBaseSpider(logger pkg.Logger) (pkg.Spider, error) { jobs: make(map[string]*Job), } - s.job = pkg.NewState() + s.job = pkg.NewState("job") s.job.RegisterIsReadyAndIsZero(func() { _ = s.Stop(s.GetContext()) }) diff --git a/pkg/spider/job.go b/pkg/spider/job.go index 7a68acb..fcadede 100644 --- a/pkg/spider/job.go +++ b/pkg/spider/job.go @@ -138,8 +138,8 @@ func (j *Job) run(ctx context.Context) (err error) { MustEverySpec(j.context.GetJob().GetSpec()). Callback(func() { if j.context.GetJob().GetOnlyOneTask() { - <-j.cronJob - if _, ok := <-j.cronJob; ok { + if _, ok := <-j.cronJob; !ok { + // closed return } } @@ -153,7 +153,7 @@ func (j *Job) run(ctx context.Context) (err error) { } func (j *Job) stop(err error) { - if !utils.InSlice(j.context.GetJob().GetStatus(), []pkg.JobStatus{ + if utils.InSlice(j.context.GetJob().GetStatus(), []pkg.JobStatus{ pkg.JobStatusSuccess, pkg.JobStatusFailure, }) { @@ -218,7 +218,7 @@ func (j *Job) TaskStopped(ctx pkg.Context, err error) { func (j *Job) FromSpider(spider pkg.Spider) *Job { *j = Job{ - task: pkg.NewState(), + task: pkg.NewState("task"), crawler: spider.GetCrawler(), spider: spider, logger: spider.GetLogger(), diff --git a/pkg/spider/task.go b/pkg/spider/task.go index f2349d7..0e16cc2 100644 --- a/pkg/spider/task.go +++ b/pkg/spider/task.go @@ -15,6 +15,7 @@ import ( type Task struct { context pkg.Context + method *pkg.State request *pkg.State item *pkg.State requestAndItem *pkg.MultiState @@ -56,8 +57,6 @@ func (t *Task) start(ctx pkg.Context) (id string, err error) { WithStartTime(time.Now()). WithStats(new(stats.MediaStats)))) t.crawler.GetSignal().TaskChanged(t.context) - - t.logger.Info(t.spider.Name(), id, "task started") } if err = t.StartScheduler(t.context); err != nil { @@ -76,13 +75,14 @@ func (t *Task) start(ctx pkg.Context) (id string, err error) { }() go func() { - defer func() { - //if r := recover(); r != nil { - // s.logger.Error(r) - //} + if r := recover(); r != nil { + t.logger.Error(r) + } + t.MethodOut() }() + t.MethodIn() params := []reflect.Value{ reflect.ValueOf(t.context), reflect.ValueOf(t.context.GetJob().GetArgs()), @@ -129,30 +129,31 @@ func (t *Task) stop(err error) { t.job.TaskStopped(t.context, err) return } -func (t *Task) RequestPending(_ pkg.Context, _ error) { - t.request.BeReady() +func (t *Task) MethodIn() { + if !t.method.IsReady() { + t.method.BeReady() + } + t.method.In() } -func (t *Task) RequestRunning(_ pkg.Context, err error) { - if err != nil { - return +func (t *Task) MethodOut() { + t.method.Out() +} +func (t *Task) RequestIn() { + if !t.request.IsReady() { + t.request.BeReady() } t.request.In() } -func (t *Task) RequestStopped(_ pkg.Context, _ error) { +func (t *Task) RequestOut() { t.request.Out() } -func (t *Task) ItemPending(_ pkg.Context, _ error) { - t.item.BeReady() -} -func (t *Task) ItemRunning(ctx pkg.Context, err error) { - if err != nil { - return +func (t *Task) ItemIn() { + if !t.item.IsReady() { + t.item.BeReady() } t.item.In() } -func (t *Task) ItemStopped(_ pkg.Context, _ error) { - //item := ctx.GetItem() - //item.WithContext() +func (t *Task) ItemOut() { t.item.Out() } func (t *Task) WithJob(job *Job) *Task { @@ -164,11 +165,12 @@ func (t *Task) FromSpider(spider pkg.Spider) *Task { crawler: spider.GetCrawler(), spider: spider, logger: spider.GetLogger(), - request: pkg.NewState(), - item: pkg.NewState(), + method: pkg.NewState("method"), + request: pkg.NewState("request"), + item: pkg.NewState("item"), } - t.requestAndItem = pkg.NewMultiState(t.request, t.item) + t.requestAndItem = pkg.NewMultiState(t.request, t.item, t.method) t.requestAndItem.RegisterIsReadyAndIsZero(func() { t.stop(nil) diff --git a/pkg/state.go b/pkg/state.go index e1738fc..6278538 100644 --- a/pkg/state.go +++ b/pkg/state.go @@ -6,6 +6,7 @@ import ( // State represents a state with ready and count properties. type State struct { + name string isReady bool count atomic.Uint32 fnIsReady []func() @@ -14,8 +15,8 @@ type State struct { } // NewState creates a new State instance. -func NewState() *State { - return &State{} +func NewState(name string) *State { + return &State{name: name} } // RegisterIsReady registers functions to be called when the state is ready. @@ -85,6 +86,10 @@ func (s *State) Clear() { s.count.Store(0) } +func (s *State) Count() uint32 { + return s.count.Load() +} + // MultiState represents a collection of states. type MultiState struct { states []*State diff --git a/pkg/task.go b/pkg/task.go index 01d3f14..4e716b5 100644 --- a/pkg/task.go +++ b/pkg/task.go @@ -4,12 +4,12 @@ type Task interface { Scheduler GetScheduler() Scheduler WithScheduler(Scheduler) Task - RequestPending(ctx Context, err error) - RequestRunning(ctx Context, err error) - RequestStopped(ctx Context, err error) - ItemPending(ctx Context, err error) - ItemRunning(ctx Context, err error) - ItemStopped(ctx Context, err error) + RequestIn() + RequestOut() + ItemIn() + ItemOut() + MethodIn() + MethodOut() } type TaskStatus uint8