Skip to content

Commit

Permalink
fix bug where run is blocked
Browse files Browse the repository at this point in the history
  • Loading branch information
lizongying committed Nov 22, 2023
1 parent 91aead8 commit 290c80e
Show file tree
Hide file tree
Showing 17 changed files with 130 additions and 128 deletions.
3 changes: 1 addition & 2 deletions README_CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -1110,10 +1110,9 @@ curl https://github.com/lizongying/go-crawler -x http://localhost:8082 --cacert
* panic stop
* extra速率限制
* request context
* item context
* total
* requests
* log
* status

```shell
go get -u github.com/lizongying/go-css@latest
Expand Down
2 changes: 1 addition & 1 deletion internal/spiders/test_must_ok_spider/spider.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func (s *Spider) ParseOk(ctx pkg.Context, response pkg.Response) (err error) {
}))

if extra.Count > 0 {
time.Sleep(time.Second * 10)
time.Sleep(time.Second * 5)
s.logger.Info("manual stop")
return
}
Expand Down
13 changes: 3 additions & 10 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ type Config struct {
EnableJa3 *bool `yaml:"enable_ja3,omitempty" json:"enable_ja3"`
EnablePriorityQueue *bool `yaml:"enable_priority_queue,omitempty" json:"enable_priority_queue"`
EnableReferrerMiddleware *bool `yaml:"enable_referrer_middleware,omitempty" json:"enable_referrer_middleware"`
ReferrerPolicy *string `yaml:"referrer_policy_middleware,omitempty" json:"referrer_policy_middleware"`
ReferrerPolicy *string `yaml:"referrer_policy,omitempty" json:"referrer_policy"`
EnableHttpAuthMiddleware *bool `yaml:"enable_http_auth_middleware,omitempty" json:"enable_http_auth_middleware"`
EnableCookieMiddleware *bool `yaml:"enable_cookie_middleware,omitempty" json:"enable_cookie_middleware"`
EnableStatsMiddleware *bool `yaml:"enable_stats_middleware,omitempty" json:"enable_stats_middleware"`
Expand Down Expand Up @@ -318,18 +318,11 @@ func (c *Config) GetEnablePriorityQueue() bool {
}
func (c *Config) GetReferrerPolicy() pkg.ReferrerPolicy {
if c.ReferrerPolicy == nil {
referrerPolicy := string(pkg.DefaultReferrerPolicy)
referrerPolicy := pkg.DefaultReferrerPolicy.String()
c.ReferrerPolicy = &referrerPolicy
}
if *c.ReferrerPolicy != "" {
switch pkg.ReferrerPolicy(*c.ReferrerPolicy) {
case pkg.DefaultReferrerPolicy:
return pkg.DefaultReferrerPolicy
case pkg.NoReferrerPolicy:
return pkg.NoReferrerPolicy
default:
return pkg.DefaultReferrerPolicy
}
return pkg.ReferrerPolicyFromString(*c.ReferrerPolicy)
}

return pkg.DefaultReferrerPolicy
Expand Down
4 changes: 2 additions & 2 deletions pkg/crawler/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ func (c *Crawler) Start(ctx context.Context) (err error) {
c.context.GetCrawler().WithStatus(pkg.CrawlerStatusRunning)
c.Signal.CrawlerChanged(c.context)

c.logger.Info("crawler is running", c.context.GetCrawler().GetId())
c.logger.Info("crawler is running. id:", c.context.GetCrawler().GetId())
c.logger.Info("referrerPolicy", c.config.GetReferrerPolicy())
c.logger.Info("urlLengthLimit", c.config.GetUrlLengthLimit())

Expand Down Expand Up @@ -327,7 +327,7 @@ func (c *Crawler) SpiderStopped(_ pkg.Context, _ error) {
}

func NewCrawler(spiders []pkg.Spider, cli *cli.Cli, config *config.Config, logger pkg.Logger, mongoDb *mongo.Database, mysql *sql.DB, redis *redis.Client, kafka *kafka.Writer, kafkaReader *kafka.Reader, sqlite pkg.Sqlite, store pkg.Store, mockServer pkg.MockServer, httpApi *api.Api) (crawler pkg.Crawler, err error) {
spider := pkg.NewState()
spider := pkg.NewState("spider")
spider.RegisterIsReadyAndIsZero(func() {
_ = crawler.Stop(crawler.GetContext())
})
Expand Down
5 changes: 2 additions & 3 deletions pkg/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,11 @@ func (d *Downloader) Download(ctx pkg.Context, request pkg.Request) (response pk
response.SetRequest(request)
}

err = d.processResponse(ctx, response)
if err != nil {
d.logger.Error(err)
if err = d.processResponse(ctx, response); err != nil {
if errors.Is(err, pkg.ErrNeedRetry) {
return d.Download(ctx, request)
}
d.logger.Error(err)
return
}

Expand Down
11 changes: 5 additions & 6 deletions pkg/middlewares/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,28 +34,27 @@ func (m *RetryMiddleware) ProcessResponse(_ pkg.Context, response pkg.Response)
if len(request.GetOkHttpCodes()) > 0 {
okHttpCodes = request.GetOkHttpCodes()
}
if retryMaxTimes > 0 && response.GetResponse() == nil {

if response.GetResponse() == nil {
if request.GetRetryTimes() < retryMaxTimes {
request.SetRetryTimes(request.GetRetryTimes() + 1)
m.logger.Info(request.GetUniqueKey(), "retry times:", request.GetRetryTimes(), "SpendTime:", request.GetSpendTime())
m.logger.Infof("retry times: %d/%d, response nil, SpendTime: %v, UniqueKey: %s\n", request.GetRetryTimes(), retryMaxTimes, request.GetSpendTime(), request.GetUniqueKey())
err = pkg.ErrNeedRetry
return
}
err = fmt.Errorf("response nil")
m.logger.Error(request.GetUniqueKey(), err, request.GetRetryTimes(), retryMaxTimes)
return
}

if retryMaxTimes > 0 && !utils.InSlice(response.StatusCode(), okHttpCodes) {
if !utils.InSlice(response.StatusCode(), okHttpCodes) {
if request.GetRetryTimes() < retryMaxTimes {
request.SetRetryTimes(request.GetRetryTimes() + 1)
m.logger.Info(request.GetUniqueKey(), "retry times:", request.GetRetryTimes(), "SpendTime:", request.GetSpendTime())
m.logger.Infof("retry times: %d/%d, status code: %d, SpendTime: %v, UniqueKey: %s\n", request.GetRetryTimes(), retryMaxTimes, response.StatusCode(), request.GetSpendTime(), request.GetUniqueKey())
err = pkg.ErrNeedRetry
return
}

err = fmt.Errorf("status code error: %d", response.StatusCode())
m.logger.Error(request.GetUniqueKey(), err, request.GetRetryTimes(), retryMaxTimes)
return
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/pipelines/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (m *DumpPipeline) ProcessItem(item pkg.Item) (err error) {
m.logger.Debug("Data", utils.JsonStr(data))

m.logger.Debug("referrer", item.Referrer())
m.logger.Info(m.Spider().Name(), item.GetContext().GetTask().GetId(), "item.Data:", utils.JsonStr(data))
m.logger.Debug(m.Spider().Name(), item.GetContext().GetTask().GetId(), "item.Data:", utils.JsonStr(data))
return
}

Expand Down
28 changes: 25 additions & 3 deletions pkg/referrer.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,30 @@
package pkg

type ReferrerPolicy string
import "strings"

// ReferrerPolicy selects how the Referer header is handled for outgoing requests.
type ReferrerPolicy uint8

const (
	// DefaultReferrerPolicy sends the referrer (the default behavior).
	DefaultReferrerPolicy ReferrerPolicy = iota
	// NoReferrerPolicy suppresses the Referer header.
	NoReferrerPolicy
)

// String returns the canonical name of the policy.
func (r ReferrerPolicy) String() string {
	switch r {
	case NoReferrerPolicy:
		return "NoReferrerPolicy"
	default:
		return "DefaultReferrerPolicy"
	}
}

// ReferrerPolicyFromString parses a policy name case-insensitively.
// The numeric string "1" is also accepted as NoReferrerPolicy; any
// unrecognized value falls back to DefaultReferrerPolicy.
func ReferrerPolicyFromString(referrerPolicy string) ReferrerPolicy {
	switch strings.ToLower(referrerPolicy) {
	// The operand is lowercased, so the case labels must be lowercase too;
	// a mixed-case label such as "NoReferrerPolicy" would never match.
	case "1", "noreferrerpolicy":
		return NoReferrerPolicy
	default:
		return DefaultReferrerPolicy
	}
}
25 changes: 9 additions & 16 deletions pkg/scheduler/kafka/requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"golang.org/x/time/rate"
"net/http"
"reflect"
"runtime"
"strings"
"time"
)
Expand Down Expand Up @@ -76,39 +75,37 @@ out:
}
ctx.GetRequest().WithStatus(pkg.RequestStatusRunning)
s.crawler.GetSignal().RequestChanged(request)
s.task.RequestRunning(ctx, nil)
go func(c pkg.Context, request pkg.Request) {
var response pkg.Response
response, err = s.Request(c, request)
if err != nil {
ctx.GetRequest().WithStatus(pkg.RequestStatusFailure).WithStopReason(err.Error())
s.crawler.GetSignal().RequestChanged(request)
s.task.RequestStopped(ctx, err)
s.task.RequestOut()
return
}

go func(ctx pkg.Context, response pkg.Response) {
defer func() {
if r := recover(); r != nil {
buf := make([]byte, 1<<16)
runtime.Stack(buf, true)
err = errors.New(string(buf))
s.logger.Error(err)
s.logger.Error(r)
err = errors.New("panic")
s.HandleError(ctx, response, err, request.GetErrBack())
}
s.task.MethodOut()
s.task.RequestOut()
}()

s.task.MethodIn()
if err = s.spider.CallBack(request.GetCallBack())(ctx, response); err != nil {
s.logger.Error(err)
s.HandleError(ctx, response, err, request.GetErrBack())
ctx.GetRequest().WithStatus(pkg.RequestStatusFailure).WithStopReason(err.Error())
s.crawler.GetSignal().RequestChanged(request)
s.task.RequestStopped(ctx, err)
return
}
ctx.GetRequest().WithStatus(pkg.RequestStatusSuccess)
s.crawler.GetSignal().RequestChanged(request)
s.task.RequestStopped(ctx, nil)
}(c, response)
}(ctx, request)
}
Expand Down Expand Up @@ -151,7 +148,6 @@ func (s *Scheduler) YieldRequest(ctx pkg.Context, request pkg.Request) (err erro
if err != nil {
s.logger.Error(err)
s.crawler.GetSignal().RequestChanged(request)
ctx.GetTask().RequestPending(ctx, err)
return
}

Expand All @@ -163,12 +159,11 @@ func (s *Scheduler) YieldRequest(ctx pkg.Context, request pkg.Request) (err erro
}); err != nil {
s.logger.Error(err)
s.crawler.GetSignal().RequestChanged(request)
ctx.GetTask().RequestPending(ctx, err)
return
}

s.crawler.GetSignal().RequestChanged(request)
ctx.GetTask().RequestPending(ctx, nil)
ctx.GetTask().RequestIn()
return
}

Expand All @@ -184,7 +179,6 @@ func (s *Scheduler) YieldExtra(c pkg.Context, extra any) (err error) {
bs, err := json.Marshal(extra)
if err != nil {
s.logger.Error(err)
c.GetTask().RequestPending(c, err)
return
}

Expand All @@ -207,17 +201,16 @@ func (s *Scheduler) YieldExtra(c pkg.Context, extra any) (err error) {
Value: bs,
}); err != nil {
s.logger.Error(err)
c.GetTask().RequestPending(c, err)
return
}

c.GetTask().RequestPending(c, nil)
c.GetTask().RequestIn()
return
}

func (s *Scheduler) GetExtra(c pkg.Context, extra any) (err error) {
defer func() {
s.task.RequestStopped(c, nil)
s.task.RequestOut()
}()

extraValue := reflect.ValueOf(extra)
Expand Down
26 changes: 12 additions & 14 deletions pkg/scheduler/memory/requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ out:
}
ctx.GetRequest().WithStatus(pkg.RequestStatusRunning)
s.crawler.GetSignal().RequestChanged(request)
s.task.RequestRunning(ctx, nil)
go func(request pkg.Request) {
c := request.GetContext()
var err error
Expand All @@ -58,32 +57,31 @@ out:
if err != nil {
ctx.GetRequest().WithStatus(pkg.RequestStatusFailure).WithStopReason(err.Error())
s.crawler.GetSignal().RequestChanged(request)
s.task.RequestStopped(ctx, err)
s.task.RequestOut()
return
}

go func(ctx pkg.Context, response pkg.Response) {
defer func() {
//if r := recover(); r != nil {
// buf := make([]byte, 1<<16)
// runtime.Stack(buf, true)
// err = errors.New(string(buf))
// //s.logger.Error(err)
// s.HandleError(ctx, response, err, request.GetErrBack())
//}
if r := recover(); r != nil {
s.logger.Error(r)
err = errors.New("panic")
s.HandleError(ctx, response, err, request.GetErrBack())
}
s.task.MethodOut()
s.task.RequestOut()
}()

s.task.MethodIn()
if err = s.spider.CallBack(request.GetCallBack())(ctx, response); err != nil {
s.logger.Error(err)
s.HandleError(ctx, response, err, request.GetErrBack())
ctx.GetRequest().WithStatus(pkg.RequestStatusFailure).WithStopReason(err.Error())
s.crawler.GetSignal().RequestChanged(request)
s.task.RequestStopped(ctx, err)
return
}
ctx.GetRequest().WithStatus(pkg.RequestStatusSuccess)
s.crawler.GetSignal().RequestChanged(request)
s.task.RequestStopped(ctx, nil)
}(c, response)
}(request)
}
Expand Down Expand Up @@ -129,8 +127,8 @@ func (s *Scheduler) YieldRequest(ctx pkg.Context, request pkg.Request) (err erro

request.WithContext(ctx)
s.crawler.GetSignal().RequestChanged(request)
ctx.GetTask().RequestPending(ctx, nil)
s.requestChan <- request
ctx.GetTask().RequestIn()
return
}

Expand All @@ -151,13 +149,13 @@ func (s *Scheduler) YieldExtra(c pkg.Context, extra any) (err error) {
extraChan.(chan any) <- extra
}

c.GetTask().RequestPending(c, nil)
c.GetTask().RequestIn()
return
}

func (s *Scheduler) GetExtra(ctx pkg.Context, extra any) (err error) {
defer func() {
s.task.RequestStopped(ctx, nil)
s.task.RequestOut()
}()

extraValue := reflect.ValueOf(extra)
Expand Down
Loading

0 comments on commit 290c80e

Please sign in to comment.