engine.go
package ant

import (
	"context"
	"errors"
	"fmt"
	"io"
	"net/url"

	"github.com/yields/ant/internal/normalize"
	"github.com/yields/ant/internal/robots"
	"golang.org/x/sync/errgroup"
	"golang.org/x/sync/semaphore"
)

// EngineConfig configures the engine; a minimal configuration
// sketch follows the type definition.
type EngineConfig struct {
	// Scraper is the scraper to use.
	//
	// If nil, NewEngine returns an error.
	Scraper Scraper

	// Deduper is the URL de-duplicator to use.
	//
	// If nil, DedupeMap is used.
	Deduper Deduper

	// Fetcher is the page fetcher to use.
	//
	// If nil, the default HTTP fetcher is used.
	Fetcher *Fetcher

	// Queue is the URL queue to use.
	//
	// If nil, the default in-memory queue is used.
	Queue Queue

	// Limiter is the rate limiter to use.
	//
	// The limiter is called with each URL before
	// it is fetched.
	//
	// If nil, no limits are used.
	Limiter Limiter

	// Matcher is the URL matcher to use.
	//
	// The matcher is called with a URL before it is queued;
	// if it returns false, the URL is discarded.
	//
	// If nil, all URLs are queued.
	Matcher Matcher

	// Impolite skips any robots.txt checking.
	//
	// Note that it does not affect any configured
	// ratelimiters or matchers.
	//
	// By default the engine checks robots.txt using
	// the default ant.UserAgent.
	Impolite bool

	// Workers specifies the number of workers to use.
	//
	// Every worker the engine starts consumes URLs from the queue
	// and starts a goroutine for each URL.
	//
	// If <= 0, defaults to 1.
	Workers int

	// Concurrency is the maximum number of URLs to process
	// at any given time.
	//
	// The engine uses a global semaphore to limit the number
	// of goroutines started by the workers.
	//
	// If <= 0, there's no limit.
	Concurrency int
}
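
// A minimal configuration sketch, shown for illustration only.
// Only Scraper is required; every nil field falls back to the
// defaults documented above (see NewEngine). The value myScraper
// is hypothetical and stands in for any type implementing the
// Scraper interface:
//
//	eng, err := ant.NewEngine(ant.EngineConfig{
//		Scraper:     myScraper, // required
//		Workers:     4,         // four queue consumers
//		Concurrency: 16,        // at most 16 in-flight URLs
//	})
//	if err != nil {
//		// handle the configuration error.
//	}
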
// Engine implements a web crawler engine.
type Engine struct {
	deduper  Deduper
	scraper  Scraper
	fetcher  *Fetcher
	queue    Queue
	matcher  Matcher
	limiter  Limiter
	robots   *robots.Cache
	impolite bool
	workers  int
	sema     *semaphore.Weighted
}

// NewEngine returns a new engine.
func NewEngine(c EngineConfig) (*Engine, error) {
	if c.Scraper == nil {
		return nil, errors.New("ant: scraper is required")
	}

	if c.Deduper == nil {
		c.Deduper = DedupeMap()
	}

	if c.Fetcher == nil {
		c.Fetcher = &Fetcher{}
	}

	if c.Workers <= 0 {
		c.Workers = 1
	}

	if c.Queue == nil {
		c.Queue = MemoryQueue(c.Workers)
	}

	var sema *semaphore.Weighted
	if n := int64(c.Concurrency); n > 0 {
		sema = semaphore.NewWeighted(n)
	}

	return &Engine{
		scraper:  c.Scraper,
		deduper:  c.Deduper,
		fetcher:  c.Fetcher,
		queue:    c.Queue,
		matcher:  c.Matcher,
		limiter:  c.Limiter,
		robots:   robots.NewCache(DefaultClient, 1000),
		impolite: c.Impolite,
		workers:  c.Workers,
		sema:     sema,
	}, nil
}

// Run runs the engine with the given start URLs.
func (eng *Engine) Run(ctx context.Context, urls ...string) error {
	var eg, subctx = errgroup.WithContext(ctx)

	// Enqueue initial URLs.
	if err := eng.Enqueue(ctx, urls...); err != nil {
		return fmt.Errorf("ant: enqueue - %w", err)
	}

	// Spawn workers.
	for i := 0; i < eng.workers; i++ {
		eg.Go(func() error {
			defer eng.queue.Close(ctx)
			return eng.run(subctx)
		})
	}

	// Wait until all URLs are handled.
	eng.queue.Wait()
	if err := eng.queue.Close(ctx); err != nil {
		return err
	}

	// Wait until all workers shut down.
	if err := eg.Wait(); err != nil {
		return fmt.Errorf("ant: run - %w", err)
	}

	return nil
}
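
// A usage sketch, assuming a hypothetical myScraper value that
// implements Scraper. As the code above shows, Run returns once
// every queued URL has been handled, or earlier if the context is
// canceled or a worker fails:
//
//	eng, err := ant.NewEngine(ant.EngineConfig{Scraper: myScraper})
//	if err != nil {
//		log.Fatal(err)
//	}
//
//	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
//	defer cancel()
//
//	if err := eng.Run(ctx, "https://example.com"); err != nil {
//		log.Fatal(err)
//	}
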
// Enqueue enqueues the given set of URLs.
//
// The method blocks until all URLs are queued
// or the given context is canceled.
//
// The method also de-duplicates the URLs, ensuring
// that no URL is visited more than once.
func (eng *Engine) Enqueue(ctx context.Context, rawurls ...string) error {
	var batch = make(URLs, 0, len(rawurls))

	for _, rawurl := range rawurls {
		u, err := url.Parse(rawurl)
		if err != nil {
			return fmt.Errorf("ant: parse url %q - %w", rawurl, err)
		}

		switch u.Scheme {
		case "https", "http":
		default:
			return fmt.Errorf("ant: cannot enqueue invalid URL %q", u)
		}

		batch = append(batch, u)
	}

	return eng.enqueue(ctx, batch)
}

// enqueue enqueues the given parsed URLs.
func (eng *Engine) enqueue(ctx context.Context, batch URLs) error {
	for j := range batch {
		batch[j] = normalize.URL(batch[j])
	}

	next, err := eng.dedupe(ctx, eng.matches(batch))
	if err != nil {
		return err
	}

	if err := eng.queue.Enqueue(ctx, next); err != nil {
		return err
	}

	return nil
}

// run runs a single crawl worker.
//
// The worker is in charge of fetching a URL from
// the queue, creating a page and then calling the scraper.
func (eng *Engine) run(ctx context.Context) error {
	eg, subctx := errgroup.WithContext(ctx)

	for {
		url, err := eng.queue.Dequeue(ctx)

		if errors.Is(err, io.EOF) ||
			errors.Is(err, context.Canceled) {
			return eg.Wait()
		}

		if err != nil {
			return err
		}

		if eng.sema != nil {
			if err := eng.sema.Acquire(ctx, 1); err != nil {
				return err
			}
		}

		eg.Go(func() error {
			if eng.sema != nil {
				defer eng.sema.Release(1)
			}
			return eng.process(subctx, url)
		})
	}
}

// process processes a single URL.
func (eng *Engine) process(ctx context.Context, url *URL) error {
	defer eng.queue.Done(ctx, url)

	// Check robots.txt.
	if !eng.impolite {
		allowed, err := eng.robots.Allowed(ctx, robots.Request{
			URL:       url,
			UserAgent: UserAgent.String(),
		})
		if err != nil {
			return err
		}
		if !allowed {
			return nil
		}
	}

	// Potential limits.
	if err := eng.limit(ctx, url); err != nil {
		return err
	}

	// Scrape the URL.
	urls, err := eng.scrape(ctx, url)
	if err != nil {
		return err
	}

	// Enqueue URLs.
	if err := eng.enqueue(ctx, urls); err != nil {
		return fmt.Errorf("ant: enqueue - %w", err)
	}

	return nil
}

// scrape scrapes the given URL and returns the next URLs.
func (eng *Engine) scrape(ctx context.Context, url *URL) (URLs, error) {
	page, err := eng.fetcher.Fetch(ctx, url)
	if err != nil {
		return nil, fmt.Errorf("ant: fetch %q - %w", url, err)
	}

	if page == nil {
		return nil, nil
	}
	defer page.close()

	urls, err := eng.scraper.Scrape(ctx, page)
	if err != nil {
		return nil, fmt.Errorf("ant: scrape %q - %w", url, err)
	}

	return urls, nil
}

// dedupe de-duplicates the given slice of URLs.
func (eng *Engine) dedupe(ctx context.Context, urls URLs) (URLs, error) {
	deduped, err := eng.deduper.Dedupe(ctx, urls)
	if err != nil {
		return nil, fmt.Errorf("ant: dedupe - %w", err)
	}
	return deduped, nil
}

// limit runs all configured limiters.
func (eng *Engine) limit(ctx context.Context, url *URL) error {
	if eng.limiter != nil {
		if err := eng.limiter.Limit(ctx, url); err != nil {
			return fmt.Errorf("limit %q - %w", url, err)
		}
	}

	if eng.impolite {
		return nil
	}

	err := eng.robots.Wait(ctx, robots.Request{
		URL:       url,
		UserAgent: UserAgent.String(),
	})
	if err != nil {
		return fmt.Errorf("ant: robots wait - %w", err)
	}

	return nil
}

// matches returns all URLs that match the matcher.
func (eng *Engine) matches(urls URLs) URLs {
	if eng.matcher != nil {
		ret := make(URLs, 0, len(urls))
		for _, u := range urls {
			if eng.matcher.Match(u) {
				ret = append(ret, u)
			}
		}
		return ret
	}
	return urls
}