Skip to content

Commit

Permalink
adds exitOnInactivity option and basic stats printing
Browse files Browse the repository at this point in the history
  • Loading branch information
gosom committed Sep 1, 2023
1 parent d67dd20 commit b9b5321
Show file tree
Hide file tree
Showing 5 changed files with 157 additions and 2 deletions.
3 changes: 2 additions & 1 deletion errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,6 @@ var (
// ErrorNoCacher returned when you try to initialized with a nil Cacher
ErrorNoCacher = errors.New("no cacher set")
// ErrorNotCsvCapable returned when you try to write a csv file without a csv capable Data
ErrorNotCsvCapable = errors.New("not csv capable")
ErrorNotCsvCapable = errors.New("not csv capable")
// ErrInactivityTimeout returned when the scraper exits because no job activity happened within the configured window
ErrInactivityTimeout = errors.New("inactivity timeout")
)
101 changes: 100 additions & 1 deletion scrapemate.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,15 @@ func New(options ...func(*ScrapeMate) error) (*ScrapeMate, error) {
return s, nil
}

// WithExitBecauseOfInactivity makes the scraper stop once no job activity has
// been observed for the given duration. A non-positive duration leaves the
// inactivity check disabled.
func WithExitBecauseOfInactivity(duration time.Duration) func(*ScrapeMate) error {
	return func(sm *ScrapeMate) error {
		sm.exitOnInactivityDuration = duration
		// Only a positive duration arms the check.
		sm.exitOnInactivity = duration > 0

		return nil
	}
}

// WithFailed sets the failed jobs channel for the scrapemate
func WithFailed() func(*ScrapeMate) error {
return func(s *ScrapeMate) error {
Expand Down Expand Up @@ -182,6 +191,10 @@ type ScrapeMate struct {
results chan Result
failedJobs chan IJob
initJob IJob

stats stats
exitOnInactivity bool
exitOnInactivityDuration time.Duration
}

// Start starts the scraper
Expand Down Expand Up @@ -218,6 +231,53 @@ func (s *ScrapeMate) Start() error {
}()
}

// Background reporter: periodically logs job stats and, when the
// exit-on-inactivity option is set, cancels the run after a quiet period.
wg.Add(1)

go func() {
	defer wg.Done()

	// Run start; used below to compute the average jobs/minute rate.
	startTime := time.Now().UTC()
	tickerDur := time.Minute

	const (
		divider          = 2
		secondsPerMinute = 60
	)

	// Tick at half the inactivity window so the deadline is checked at
	// least twice before it can expire.
	// NOTE(review): a duration below 2ns makes tickerDur zero and
	// time.NewTicker panics on non-positive durations — confirm callers
	// always pass a realistic duration.
	if s.exitOnInactivity && s.exitOnInactivityDuration < tickerDur {
		tickerDur = s.exitOnInactivityDuration / divider
	}

	ticker := time.NewTicker(tickerDur)
	defer ticker.Stop()

	for {
		select {
		case <-s.ctx.Done():
			// Scraper is shutting down; stop reporting.
			return
		case <-ticker.C:
			numOfJobsCompleted, numOfJobsFailed, lastActivityAt := s.stats.getStats()
			// Average throughput since startTime, in jobs per minute.
			perMinute := float64(numOfJobsCompleted) / time.Now().UTC().Sub(startTime).Seconds() * secondsPerMinute

			s.log.Info("scrapemate stats",
				"numOfJobsCompleted", numOfJobsCompleted,
				"numOfJobsFailed", numOfJobsFailed,
				"lastActivityAt", lastActivityAt,
				"speed", fmt.Sprintf("%.2f jobs/min", perMinute),
			)

			// No job completed or failed within the allowed window:
			// cancel the run context with ErrInactivityTimeout as cause.
			if s.exitOnInactivity && time.Now().UTC().Sub(lastActivityAt) > s.exitOnInactivityDuration {
				err := fmt.Errorf("%w: %s", ErrInactivityTimeout, lastActivityAt.Format(time.RFC3339))

				s.log.Info("exiting because of inactivity", "error", err)
				s.cancelFn(err)

				return
			}
		}
	}
}()

wg.Wait()

<-s.Done()
Expand Down Expand Up @@ -400,7 +460,12 @@ func (s *ScrapeMate) Done() <-chan struct{} {

// Err returns the error that caused scrapemate's context cancellation.
// An inactivity timeout is an expected, orderly shutdown, so it is
// reported as nil rather than surfaced to the caller.
func (s *ScrapeMate) Err() error {
	err := context.Cause(s.ctx)
	if errors.Is(err, ErrInactivityTimeout) {
		return nil
	}

	return err
}

func (s *ScrapeMate) waitForSignal(sigChan <-chan os.Signal) {
Expand Down Expand Up @@ -486,12 +551,16 @@ func (s *ScrapeMate) startWorker(ctx context.Context) {
}

// pushToFailedJobs records the failure in the stats and, when a failed-jobs
// channel is configured, forwards the job to it.
func (s *ScrapeMate) pushToFailedJobs(job IJob) {
	s.stats.incJobsFailed()

	if s.failedJobs == nil {
		return
	}

	s.failedJobs <- job
}

func (s *ScrapeMate) finishJob(ctx context.Context, job IJob, ans any, next []IJob) error {
s.stats.incJobsCompleted()

if err := s.pushJobs(ctx, next); err != nil {
return fmt.Errorf("%w: while pushing jobs", err)
}
Expand All @@ -515,3 +584,33 @@ func (s *ScrapeMate) pushJobs(ctx context.Context, jobs []IJob) error {

return nil
}

// stats tracks how many jobs completed or failed and when the most recent
// activity happened. All methods are safe for concurrent use.
//
// A plain Mutex is used instead of RWMutex: the counters are written on every
// job but read only once per stats-ticker tick, so the cheaper lock wins.
type stats struct {
	l                  sync.Mutex
	numOfJobsCompleted int64
	numOfJobsFailed    int64
	// lastActivityAt is the zero Time until the first increment, so callers
	// comparing it against time.Now see a very large elapsed duration.
	lastActivityAt time.Time
}

// getStats returns a consistent snapshot of both counters and the
// last-activity timestamp.
func (o *stats) getStats() (completed, failed int64, lastActivityAt time.Time) {
	o.l.Lock()
	defer o.l.Unlock()

	return o.numOfJobsCompleted, o.numOfJobsFailed, o.lastActivityAt
}

// incJobsCompleted increments the completed-jobs counter and records the
// current UTC time as the latest activity.
func (o *stats) incJobsCompleted() {
	o.l.Lock()
	defer o.l.Unlock()

	o.numOfJobsCompleted++
	o.lastActivityAt = time.Now().UTC()
}

// incJobsFailed increments the failed-jobs counter and records the current
// UTC time as the latest activity.
func (o *stats) incJobsFailed() {
	o.l.Lock()
	defer o.l.Unlock()

	o.numOfJobsFailed++
	o.lastActivityAt = time.Now().UTC()
}
42 changes: 42 additions & 0 deletions scrapemate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/gosom/kit/logging"

"github.com/gosom/scrapemate"
"github.com/gosom/scrapemate/mock"
)
Expand Down Expand Up @@ -186,6 +187,15 @@ func Test_New_With_Options(t *testing.T) {
require.Error(t, err)
})
})
t.Run("with exit on inactivity", func(t *testing.T) {
mate, err := scrapemate.New(
scrapemate.WithJobProvider(svc.provider),
scrapemate.WithHTTPFetcher(svc.fetcher),
scrapemate.WithExitBecauseOfInactivity(1*time.Second),
)
require.NoError(t, err)
require.NotNil(t, mate)
})
}

func Test_Done_Err(t *testing.T) {
Expand All @@ -212,6 +222,38 @@ func Test_Done_Err(t *testing.T) {

func Test_Start(t *testing.T) {
svc := getMockedServices(t)
t.Run("exits when inactivity", func(t *testing.T) {
mate, err := scrapemate.New(
scrapemate.WithJobProvider(svc.provider),
scrapemate.WithHTTPFetcher(svc.fetcher),
scrapemate.WithExitBecauseOfInactivity(time.Millisecond*500),
)
require.NoError(t, err)
require.NotNil(t, mate)

svc.provider.EXPECT().Jobs(gomock.Any()).DoAndReturn(func(ctx context.Context) (<-chan scrapemate.Job, <-chan error) {
ch := make(chan scrapemate.Job)
errch := make(chan error)
return ch, errch
})

mateErr := func() <-chan error {
errc := make(chan error)
go func() {
errc <- mate.Start()
}()
return errc
}

select {
case err = <-mateErr():
require.NoError(t, err)
case <-time.After(2 * time.Minute):
require.Fail(t, "should be done")
}

require.NoError(t, mate.Err())
})
t.Run("exits when context is cancelled", func(t *testing.T) {
ctx, cancelFn := context.WithCancelCause(context.Background())
mate, err := scrapemate.New(
Expand Down
12 changes: 12 additions & 0 deletions scrapemateapp/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package scrapemateapp

import (
"errors"
"time"

"github.com/go-playground/validator/v10"
"github.com/gosom/scrapemate"
Expand Down Expand Up @@ -89,6 +90,15 @@ func Headfull() func(*jsOptions) {
}
}

// WithExitOnInactivity sets the duration after which the app will exit if
// there are no more jobs to run. A zero duration leaves the inactivity
// check disabled.
func WithExitOnInactivity(duration time.Duration) func(*Config) error {
	return func(cfg *Config) error {
		cfg.ExitOnInactivityDuration = duration

		return nil
	}
}

type jsOptions struct {
// Headfull is a flag to run the browser in headfull mode.
// By default, the browser is run in headless mode.
Expand Down Expand Up @@ -123,6 +133,8 @@ type Config struct {
Writers []scrapemate.ResultWriter `validate:"required,gt=0"`
// InitJob is the job to initialize the app with.
InitJob scrapemate.IJob
// ExitOnInactivityDuration is the period of inactivity after which the app exits; zero disables the check.
ExitOnInactivityDuration time.Duration
}

func (o *Config) validate() error {
Expand Down
1 change: 1 addition & 0 deletions scrapemateapp/scrapemateapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ func (app *ScrapemateApp) getMate(ctx context.Context) (*scrapemate.ScrapeMate,
scrapemate.WithHTTPFetcher(fetcherInstance),
scrapemate.WithHTMLParser(parser.New()),
scrapemate.WithConcurrency(app.cfg.Concurrency),
scrapemate.WithExitBecauseOfInactivity(app.cfg.ExitOnInactivityDuration),
}

if app.cacher != nil {
Expand Down

0 comments on commit b9b5321

Please sign in to comment.