Skip to content

Commit

Permalink
Merge pull request #11 from lizongying/development
Browse files Browse the repository at this point in the history
Development
  • Loading branch information
lizongying authored Nov 13, 2023
2 parents 5376111 + 0c4a8ca commit 3bc52a6
Show file tree
Hide file tree
Showing 27 changed files with 396 additions and 50 deletions.
2 changes: 2 additions & 0 deletions internal/spiders/test_must_ok_spider/spider.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"github.com/lizongying/go-crawler/pkg/items"
"github.com/lizongying/go-crawler/pkg/mock_servers"
"github.com/lizongying/go-crawler/pkg/request"
"time"
)

type Spider struct {
Expand All @@ -22,6 +23,7 @@ func (s *Spider) ParseOk(ctx pkg.Context, response pkg.Response) (err error) {
}))

if extra.Count > 0 {
time.Sleep(time.Second * 10)
s.logger.Info("manual stop")
return
}
Expand Down
13 changes: 10 additions & 3 deletions pkg/api/spider.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ import (
const UrlSpider = "/spider"

// Req holds the identifying fields of a spider API request.
// NOTE(review): the decoding site is outside this hunk — confirm that the
// handler for UrlSpider unmarshals the request body into this type.
type Req struct {
// Id is carried under the JSON key "id".
Id string `json:"id"`
// Name is carried under the JSON key "name".
Name string `json:"name"`
}
type Spider struct {
Name string `json:"name"`
Name string `json:"name,omitempty"`
Funcs []string `json:"funcs,omitempty"`
}
type RouteSpider struct {
Request
Expand All @@ -32,15 +34,20 @@ func (h *RouteSpider) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if req.Name == "" {
for _, v := range h.crawler.GetSpiders() {
if v.Name() == req.Name {
var funcs []string
for k1, _ := range v.CallBacks() {
funcs = append(funcs, k1)
}
spider = Spider{
Name: v.Name(),
Name: v.Name(),
Funcs: funcs,
}
break
}
}
}

h.OutJson(w, 0, "", spider)
h.OutJson(w, 0, "", &spider)
}

func (h *RouteSpider) FromCrawler(crawler pkg.Crawler) pkg.Route {
Expand Down
2 changes: 2 additions & 0 deletions pkg/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ type ContextCrawler interface {
}

type ContextSpider interface {
GetSpider() Spider
WithSpider(Spider) ContextSpider
GetId() uint64
WithId(uint64) ContextSpider
GetName() string
Expand Down
8 changes: 8 additions & 0 deletions pkg/context/spider.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
)

type Spider struct {
Spider pkg.Spider `json:"-"`
Context context.Context `json:"-"`
Id uint64 `json:"id,omitempty"`
Name string `json:"name,omitempty"`
Expand All @@ -17,6 +18,13 @@ type Spider struct {
UpdateTime utils.Timestamp `json:"update_time,omitempty"`
}

// GetSpider returns the pkg.Spider stored in the Spider field.
func (c *Spider) GetSpider() pkg.Spider {
return c.Spider
}
// WithSpider stores spider in the Spider field and returns the receiver so
// that further With* calls can be chained.
func (c *Spider) WithSpider(spider pkg.Spider) pkg.ContextSpider {
c.Spider = spider
return c
}
// GetId returns the value of the Id field.
func (c *Spider) GetId() uint64 {
return c.Id
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/crawler/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func (c *Crawler) RunJob(ctx context.Context, spiderName string, startFunc strin
c.logger.Info("name", spiderName)
c.logger.Info("func", startFunc)
c.logger.Info("args", args)
c.logger.Info("mode", mode)
c.logger.Info("mode", mode.String())
c.logger.Info("spec", spec)

id, err = spider.Run(ctx, startFunc, args, mode, spec, true)
Expand Down
5 changes: 4 additions & 1 deletion pkg/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ const (
JobStatusReady
JobStatusStarting
JobStatusRunning
JobStatusIdle
JobStatusStopping
JobStatusStopped
)
Expand All @@ -25,8 +26,10 @@ func (s *JobStatus) String() string {
case 3:
return "running"
case 4:
return "stopping"
return "idle"
case 5:
return "stopping"
case 6:
return "stopped"
default:
return "unknown"
Expand Down
1 change: 1 addition & 0 deletions pkg/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ type Request interface {

// CallBack is the signature of a handler that takes a Context and a Response
// and reports failure through its error result.
type CallBack func(Context, Response) error
// ErrBack is the signature of a handler that takes a Context, a Response and
// an error; it returns nothing.
type ErrBack func(Context, Response, error)
// StartFunc is the signature of a function that takes a Context and a string
// and returns an error.
// NOTE(review): the string is presumably the job's argument payload — confirm
// against the callers.
type StartFunc func(Context, string) error

type RequestStatus uint8

Expand Down
4 changes: 2 additions & 2 deletions pkg/spider.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ type Spider interface {
SetSpider(spider Spider) Spider
CallBacks() map[string]CallBack
CallBack(name string) (callback CallBack)
SetCallBacks(map[string]CallBack) Spider
ErrBacks() map[string]ErrBack
ErrBack(name string) (errBack ErrBack)
SetErrBacks(map[string]ErrBack) Spider
StartFuncs() map[string]StartFunc
StartFunc(name string) (startFunc StartFunc)
GetAllowedDomains() []string
ReplaceAllowedDomains([]string) error
SetAllowedDomain(string)
Expand Down
31 changes: 20 additions & 11 deletions pkg/spider/base_spider.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type BaseSpider struct {
browsers map[pkg.Browser]struct{}
callBacks map[string]pkg.CallBack
errBacks map[string]pkg.ErrBack
startFuncs map[string]pkg.StartFunc
defaultAllowedDomains map[string]struct{}
allowedDomains map[string]struct{}
retryMaxTimes uint8
Expand Down Expand Up @@ -197,7 +198,7 @@ func (s *BaseSpider) GetSpider() pkg.Spider {
}
func (s *BaseSpider) SetSpider(spider pkg.Spider) pkg.Spider {
s.spider = spider
s.registerParser()
s.registerFuncs()
return s
}
func (s *BaseSpider) CallBacks() map[string]pkg.CallBack {
Expand All @@ -212,10 +213,6 @@ func (s *BaseSpider) CallBack(name string) (callback pkg.CallBack) {
}
return
}
func (s *BaseSpider) SetCallBacks(callBacks map[string]pkg.CallBack) pkg.Spider {
s.callBacks = callBacks
return s
}
// ErrBacks returns the spider's errBacks map. The map itself is returned, not
// a copy, so callers share it with the spider.
func (s *BaseSpider) ErrBacks() map[string]pkg.ErrBack {
return s.errBacks
}
Expand All @@ -228,9 +225,14 @@ func (s *BaseSpider) ErrBack(name string) (errBack pkg.ErrBack) {
}
return
}
func (s *BaseSpider) SetErrBacks(errBacks map[string]pkg.ErrBack) pkg.Spider {
s.errBacks = errBacks
return s
// StartFuncs returns the spider's startFuncs map. The map itself is returned,
// not a copy, so callers share it with the spider.
func (s *BaseSpider) StartFuncs() map[string]pkg.StartFunc {
return s.startFuncs
}
// StartFunc looks up the start function stored under name in startFuncs.
// It returns nil when name is empty or when nothing is stored under that name.
func (s *BaseSpider) StartFunc(name string) (startFunc pkg.StartFunc) {
	// An empty name never resolves to a function.
	if name == "" {
		return nil
	}
	// A missing key yields the zero value, which is nil for a func type.
	return s.startFuncs[name]
}
func (s *BaseSpider) GetCrawler() pkg.Crawler {
return s.Crawler
Expand All @@ -252,9 +254,10 @@ func (s *BaseSpider) WithOptions(options ...pkg.SpiderOption) pkg.Spider {
s.options = options
return s
}
func (s *BaseSpider) registerParser() {
func (s *BaseSpider) registerFuncs() {
callBacks := make(map[string]pkg.CallBack)
errBacks := make(map[string]pkg.ErrBack)
startFuncs := make(map[string]pkg.StartFunc)
rv := reflect.ValueOf(s.spider)
rt := rv.Type()
l := rt.NumMethod()
Expand All @@ -268,9 +271,14 @@ func (s *BaseSpider) registerParser() {
if ok {
errBacks[name] = errBack
}
startFunc, ok := rv.Method(i).Interface().(func(pkg.Context, string) error)
if ok {
startFuncs[name] = startFunc
}
}
s.SetCallBacks(callBacks)
s.SetErrBacks(errBacks)
s.callBacks = callBacks
s.errBacks = errBacks
s.startFuncs = startFuncs
}

func (s *BaseSpider) Request(ctx pkg.Context, request pkg.Request) (response pkg.Response, err error) {
Expand Down Expand Up @@ -575,6 +583,7 @@ func (s *BaseSpider) FromCrawler(crawler pkg.Crawler) pkg.Spider {
s.WithContext(new(crawlerContext.Context).
WithCrawler(crawler.GetContext().GetCrawler()).
WithSpider(new(crawlerContext.Spider).
WithSpider(s.spider).
WithId(s.Crawler.GenUid()).
WithName(s.spider.Name()).
WithStatus(pkg.SpiderStatusReady)))
Expand Down
18 changes: 14 additions & 4 deletions pkg/spider/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,6 @@ func (j *Job) run(ctx context.Context) (err error) {
j.context.WithJobContext(ctx)
j.context.WithJobSubId(j.crawler.GenUid())

j.context.WithJobStatus(pkg.JobStatusRunning)
j.crawler.GetSignal().JobChanged(j.context)

go func() {
select {
case <-j.context.GetJobContext().Done():
Expand Down Expand Up @@ -149,6 +146,15 @@ func (j *Job) run(ctx context.Context) (err error) {
}

func (j *Job) stop(err error) {
if j.context.GetJobStatus() == pkg.JobStatusStopped {
err = errors.New("job has been finished")
j.logger.Error(err)
return
}

j.context.GetJob().WithStatus(pkg.JobStatusIdle)
j.crawler.GetSignal().JobChanged(j.context)

if err != nil {
if j.context.GetJobMode() == pkg.JobModeCron {
close(j.cronJob)
Expand Down Expand Up @@ -180,11 +186,15 @@ func (j *Job) stop(err error) {
return
}
// startTask marks the job as running if it is not already, then creates a new
// Task for the job's spider and starts it with the job context.
func (j *Job) startTask() {
// The job may be in a non-running state here (stop sets it to idle), so
// switch it back to running and emit JobChanged before launching the task.
if j.context.GetJobStatus() != pkg.JobStatusRunning {
j.context.WithJobStatus(pkg.JobStatusRunning)
j.crawler.GetSignal().JobChanged(j.context)
}
// Both results of start are deliberately discarded here.
_, _ = new(Task).FromSpider(j.spider).WithJob(j).start(j.context)
// NOTE(review): presumably registers one in-flight task, paired with
// j.task.Out() in TaskStopped — confirm.
j.task.In()
}
func (j *Job) TaskStopped(ctx pkg.Context, _ error) {
j.logger.Info("JobSubId", ctx.GetTask().GetJobSubId(), j.context.GetJobSubId())
if ctx.GetTask().GetJobSubId() == j.context.GetJobSubId() {
j.task.Out()
}
Expand Down
9 changes: 9 additions & 0 deletions pkg/statistics.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ type StatisticsNode interface {
DecJob()
IncTask()
DecTask()
IncRequest()
DecRequest()
IncRecord()
DecRecord()
Marshal() (bytes []byte, err error)
Expand All @@ -20,11 +22,14 @@ type StatisticsSpider interface {
WithId(id uint64) StatisticsSpider
GetSpider() string
WithSpider(spider string) StatisticsSpider
WithFuncs(funcs []string) StatisticsSpider
WithNode(node string) StatisticsSpider
IncJob()
DecJob()
IncTask()
DecTask()
IncRequest()
DecRequest()
IncRecord()
DecRecord()
GetLastTaskId() string
Expand All @@ -43,6 +48,8 @@ type StatisticsJob interface {
WithSpider(spider string) StatisticsJob
IncTask()
DecTask()
IncRequest()
DecRequest()
IncRecord()
DecRecord()
WithEnable(enable bool) StatisticsJob
Expand All @@ -52,6 +59,8 @@ type StatisticsTask interface {
WithStatus(status TaskStatus) StatisticsTask
GetId() string
WithId(id string) StatisticsTask
IncRequest()
DecRequest()
IncRecord()
DecRecord()
GetNode() string
Expand Down
7 changes: 7 additions & 0 deletions pkg/statistics/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type Job struct {
Node string `json:"node,omitempty"`
Spider string `json:"spider,omitempty"`
Task uint32 `json:"task,omitempty"`
Request uint32 `json:"request,omitempty"`
Record uint32 `json:"record,omitempty"`
Enable bool `json:"enable,omitempty"`
StartTime utils.Timestamp `json:"start_time,omitempty"`
Expand Down Expand Up @@ -73,6 +74,12 @@ func (s *Job) IncTask() {
// DecTask atomically decrements the Task counter by one.
// Adding ^uint32(0) (all bits set) is the sync/atomic idiom for subtracting 1;
// the counter wraps around if it is already zero.
func (s *Job) DecTask() {
atomic.AddUint32(&s.Task, ^uint32(0))
}
// IncRequest atomically increments the Request counter by one.
func (s *Job) IncRequest() {
atomic.AddUint32(&s.Request, 1)
}
// DecRequest atomically decrements the Request counter by one.
// Adding ^uint32(0) (all bits set) is the sync/atomic idiom for subtracting 1;
// the counter wraps around if it is already zero.
func (s *Job) DecRequest() {
atomic.AddUint32(&s.Request, ^uint32(0))
}
// IncRecord atomically increments the Record counter by one.
func (s *Job) IncRecord() {
atomic.AddUint32(&s.Record, 1)
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/statistics/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type Node struct {
Spider uint32 `json:"spider,omitempty"`
Job uint32 `json:"job,omitempty"`
Task uint32 `json:"task,omitempty"`
Request uint32 `json:"request,omitempty"`
Record uint32 `json:"record,omitempty"`
StartTime utils.Timestamp `json:"start_time"`
FinishTime utils.Timestamp `json:"finish_time"`
Expand Down Expand Up @@ -71,6 +72,12 @@ func (n *Node) IncTask() {
// DecTask atomically decrements the Task counter by one.
// Adding ^uint32(0) (all bits set) is the sync/atomic idiom for subtracting 1;
// the counter wraps around if it is already zero.
func (n *Node) DecTask() {
atomic.AddUint32(&n.Task, ^uint32(0))
}
// IncRequest atomically increments the Request counter by one.
func (n *Node) IncRequest() {
atomic.AddUint32(&n.Request, 1)
}
// DecRequest atomically decrements the Request counter by one.
// Adding ^uint32(0) (all bits set) is the sync/atomic idiom for subtracting 1;
// the counter wraps around if it is already zero.
func (n *Node) DecRequest() {
atomic.AddUint32(&n.Request, ^uint32(0))
}
// IncRecord atomically increments the Record counter by one.
func (n *Node) IncRecord() {
atomic.AddUint32(&n.Record, 1)
}
Expand Down
15 changes: 15 additions & 0 deletions pkg/statistics/spider/spider.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@ type Spider struct {
Id *utils.Uint64 `json:"id,omitempty"`
Node string `json:"node,omitempty"`
Spider string `json:"spider,omitempty"`
Funcs []string `json:"funcs,omitempty"`
Job uint32 `json:"job,omitempty"`
Task uint32 `json:"task,omitempty"`
Request uint32 `json:"request,omitempty"`
Record uint32 `json:"record,omitempty"`
StartTime utils.Timestamp `json:"start_time,omitempty"`
FinishTime utils.Timestamp `json:"finish_time,omitempty"`
Expand Down Expand Up @@ -57,6 +59,13 @@ func (s *Spider) WithSpider(spider string) pkg.StatisticsSpider {
s.Spider = spider
return s
}
// GetFuncs returns the Funcs slice as stored; no copy is made.
func (s *Spider) GetFuncs() []string {
return s.Funcs
}
// WithFuncs stores funcs in the Funcs field (without copying the slice) and
// returns the receiver so that further With* calls can be chained.
func (s *Spider) WithFuncs(funcs []string) pkg.StatisticsSpider {
s.Funcs = funcs
return s
}
// GetNode returns the value of the Node field.
func (s *Spider) GetNode() string {
return s.Node
}
Expand All @@ -76,6 +85,12 @@ func (s *Spider) IncTask() {
// DecTask atomically decrements the Task counter by one.
// Adding ^uint32(0) (all bits set) is the sync/atomic idiom for subtracting 1;
// the counter wraps around if it is already zero.
func (s *Spider) DecTask() {
atomic.AddUint32(&s.Task, ^uint32(0))
}
// IncRequest atomically increments the Request counter by one.
func (s *Spider) IncRequest() {
atomic.AddUint32(&s.Request, 1)
}
// DecRequest atomically decrements the Request counter by one.
// Adding ^uint32(0) (all bits set) is the sync/atomic idiom for subtracting 1;
// the counter wraps around if it is already zero.
func (s *Spider) DecRequest() {
atomic.AddUint32(&s.Request, ^uint32(0))
}
// IncRecord atomically increments the Record counter by one.
func (s *Spider) IncRecord() {
atomic.AddUint32(&s.Record, 1)
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/statistics/statistics.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,14 @@ func (s *Statistics) spiderChanged(ctx pkg.Context) {
spiderOne, ok := s.Spiders[ctx.GetSpiderName()]
if !ok {
s.Nodes[ctx.GetCrawlerId()].IncSpider()
var funcs []string
for k1, _ := range ctx.GetSpider().GetSpider().StartFuncs() {
funcs = append(funcs, k1)
}
spiderOne = new(statisticsSpider.Spider).
WithId(ctx.GetSpider().GetId()).
WithSpider(ctx.GetSpiderName()).
WithFuncs(funcs).
WithNode(ctx.GetCrawlerId())
s.Spiders[ctx.GetSpiderName()] = spiderOne
}
Expand Down
Loading

0 comments on commit 3bc52a6

Please sign in to comment.