fix(web): fix bugs and improve the performance (#377)
Because

- the async crawler should acquire the lock before inserting data into the
shared output, to avoid a race condition
- colly already hands the fetched page to the OnResponse callback, so the
response can be parsed there instead of requesting each page a second time

This commit

- fixes the race-condition bug (a minimal sketch of both points follows below)
- refactors the web crawl, which improves speed and saves resources
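
Both points, sketched as a minimal standalone program. This is an illustration of the pattern, not the component code, and the start URL is a placeholder:

```go
package main

import (
	"bytes"
	"fmt"
	"sync"

	"github.com/PuerkitoBio/goquery"
	colly "github.com/gocolly/colly/v2"
)

func main() {
	c := colly.NewCollector(colly.Async(true))

	var mu sync.Mutex // guards pages: callbacks run concurrently in async mode
	pages := []string{}

	// Follow every link found on a fetched page.
	c.OnHTML("a[href]", func(e *colly.HTMLElement) {
		_ = e.Request.Visit(e.Attr("href"))
	})

	// Parse the body colly has already downloaded; no second request is needed.
	c.OnResponse(func(r *colly.Response) {
		doc, err := goquery.NewDocumentFromReader(bytes.NewReader(r.Body))
		if err != nil {
			return
		}
		title := doc.Find("title").Text()

		mu.Lock() // lock first, then insert, so concurrent callbacks don't race
		pages = append(pages, title)
		mu.Unlock()
	})

	_ = c.Visit("https://example.com") // placeholder start URL
	c.Wait()                           // wait for all async requests to finish
	fmt.Println("collected", len(pages), "pages")
}
```

In async mode colly runs its callbacks concurrently, so the append has to sit inside the critical section.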
chuang8511 authored Sep 27, 2024
1 parent 38074c8 commit f0f4f89
Showing 4 changed files with 152 additions and 94 deletions.
2 changes: 1 addition & 1 deletion go.mod
@@ -192,7 +192,7 @@ require (
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
golang.org/x/crypto v0.24.0 // indirect
golang.org/x/exp v0.0.0-20240222234643-814bf88cf225 // indirect
golang.org/x/exp v0.0.0-20240222234643-814bf88cf225
golang.org/x/mod v0.17.0 // indirect
golang.org/x/net v0.26.0 // indirect
golang.org/x/sync v0.7.0 // indirect
1 change: 1 addition & 0 deletions operator/web/v0/README.mdx
@@ -41,6 +41,7 @@ Crawl the website contents and manipulate html with jquery command. The sequence
| Remove Tags | `remove-tags` | array[string] | A list of tags, classes, and ids to remove from the output. If empty, no tags will be removed. Example: 'script, .ad, #footer' |
| Only Include Tags | `only-include-tags` | array[string] | A list of tags, classes, and ids to include in the output. If empty, all tags will be included. Example: 'script, .ad, #footer' |
| Timeout | `timeout` | integer | The time to wait for the page to load in milliseconds. Min 0, Max 60000. |
| Max Depth | `max-depth` | integer | The max number of depth the crawler will go. If the number is set to 1, the crawler will only scrape the target URL. If the number is set to 0, the crawler will scrape all the pages until the count of pages meets max-k. |
</div>
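
Illustrative only, not part of this diff: the Max Depth value above is passed straight to colly's MaxDepth option, which the commit does in initColly further down.

```go
package web

import colly "github.com/gocolly/colly/v2"

// newCollector is an illustrative helper (not in the commit) showing how the
// `max-depth` input is applied: 1 means only the target URL is fetched, while
// 0 removes the depth limit so the crawl stops once max-k pages are collected.
func newCollector(maxDepth int) *colly.Collector {
	return colly.NewCollector(
		colly.MaxDepth(maxDepth),
		colly.Async(true),
	)
}
```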


15 changes: 15 additions & 0 deletions operator/web/v0/config/tasks.json
@@ -181,6 +181,21 @@
"maximum": 60000,
"title": "Timeout",
"type": "integer"
},
"max-depth": {
"default": 0,
"description": "The max number of depth the crawler will go. If the number is set to 1, the crawler will only scrape the target URL. If the number is set to 0, the crawler will scrape all the pages until the count of pages meets max-k.",
"instillAcceptFormats": [
"integer"
],
"instillUIOrder": 9,
"instillUpstreamTypes": [
"value",
"reference"
],
"minimum": 0,
"title": "Max Depth",
"type": "integer"
}
},
"required": [
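
The schema above maps onto the Go input struct in the next file. A usage sketch with illustrative values, assumed to live in the same package as crawl_website.go; the real input is decoded from a *structpb.Struct inside CrawlWebsite:

```go
package web

// exampleCrawlInput is illustrative only; it uses the fields visible in this
// diff to show how the new property flows through Preset and initColly.
func exampleCrawlInput() {
	in := ScrapeWebsiteInput{
		MaxK:     10,   // collect at most 10 pages
		MaxDepth: 0,    // schema default: 0 = no depth limit
		Timeout:  1000, // milliseconds, as in the timeout property
	}
	in.Preset()       // fills the IncludeLink* defaults and clamps a negative MaxK to 0
	_ = initColly(in) // max-depth and timeout are wired into the collector here
}
```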
228 changes: 135 additions & 93 deletions operator/web/v0/crawl_website.go
@@ -3,12 +3,15 @@ package web
import (
"fmt"
"log"
"math/rand"
"net/url"
"strings"
"sync"
"time"

"golang.org/x/exp/rand"
"google.golang.org/protobuf/types/known/structpb"

"github.com/PuerkitoBio/goquery"
colly "github.com/gocolly/colly/v2"

"github.com/instill-ai/component/base"
@@ -42,6 +45,22 @@ type ScrapeWebsiteInput struct {
OnlyIncludeTags []string `json:"only-include-tags"`
// Timeout: The number of milliseconds to wait before scraping the web page. Min 0, Max 60000.
Timeout int `json:"timeout"`
// MaxDepth: The maximum depth of the pages to scrape.
MaxDepth int `json:"max-depth"`
}

func (inputStruct *ScrapeWebsiteInput) Preset() {
if inputStruct.IncludeLinkHTML == nil {
b := false
inputStruct.IncludeLinkHTML = &b
}
if inputStruct.IncludeLinkText == nil {
b := false
inputStruct.IncludeLinkText = &b
}
if inputStruct.MaxK < 0 {
inputStruct.MaxK = 0
}
}

// ScrapeWebsiteOutput defines the output of the scrape website task
@@ -52,36 +71,6 @@ type ScrapeWebsiteOutput struct {

const letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"

// randomString generates a random string of length 10-20
func randomString() string {
b := make([]byte, rand.Intn(10)+10)
for i := range b {
b[i] = letterBytes[rand.Intn(len(letterBytes))]
}
return string(b)
}

// stripQueryAndTrailingSlash removes query parameters and trailing '/' from a URL
func stripQueryAndTrailingSlash(u *url.URL) *url.URL {
// Remove query parameters by setting RawQuery to an empty string
u.RawQuery = ""

// Remove trailing '/' from the path
u.Path = strings.TrimSuffix(u.Path, "/")

return u
}

// existsInSlice checks if a string exists in a slice
func existsInSlice(slice []string, item string) bool {
for _, s := range slice {
if s == item {
return true // Item already exists, so don't add it again
}
}
return false // Item doesn't exist, so add it to the slice
}

// Scrape crawls a webpage and returns a slice of PageInfo
func (e *execution) CrawlWebsite(input *structpb.Struct) (*structpb.Struct, error) {
inputStruct := ScrapeWebsiteInput{}
Expand All @@ -91,96 +80,99 @@ func (e *execution) CrawlWebsite(input *structpb.Struct) (*structpb.Struct, erro
return nil, fmt.Errorf("error converting input to struct: %v", err)
}

inputStruct.Preset()

output := ScrapeWebsiteOutput{}

if inputStruct.IncludeLinkHTML == nil {
b := false
inputStruct.IncludeLinkHTML = &b
}
if inputStruct.IncludeLinkText == nil {
b := false
inputStruct.IncludeLinkText = &b
}
if inputStruct.MaxK < 0 {
inputStruct.MaxK = 0
}
c := initColly(inputStruct)

var mu sync.Mutex
pageLinks := []string{}

c := colly.NewCollector(
colly.Async(),
)
if len(inputStruct.AllowedDomains) > 0 {
c.AllowedDomains = inputStruct.AllowedDomains
}
c.AllowURLRevisit = false

// On every a element which has href attribute call callback
// Wont be called if error occurs
c.OnHTML("a[href]", func(e *colly.HTMLElement) {
// If we set output.Pages to the slice of PageInfo, it will take a longer time if the first html page has a lot of links.
// To improve the small Max-K execution time, we will use a separate slice to store the links.
// However, when K is big, the output length could be less than K.
// So, I set twice the MaxK to stop the scraping.
if inputStruct.MaxK > 0 && len(pageLinks) >= inputStruct.MaxK*2 {
return
}

link := e.Attr("href")
err := c.Visit(e.Request.AbsoluteURL(link))
if err != nil {
log.Println("Error visiting link:", link, "Error:", err)

if util.InSlice(pageLinks, link) {
return
}

pageLinks = append(pageLinks, link)

_ = e.Request.Visit(link)
})

// Set error handler
c.OnError(func(r *colly.Response, err error) {
fmt.Println("Request URL:", r.Request.URL, "failed with response:", r, "\nError:", err)
log.Println("Request URL:", r.Request.URL, "failed with response:", r, "\nError:", err)
})

c.OnRequest(func(r *colly.Request) {

// Before length of output page is over, we should always send request.
if inputStruct.MaxK > 0 && len(output.Pages) >= inputStruct.MaxK {
r.Abort()
return
}

// Set a random user agent to avoid being blocked by websites
r.Headers.Set("User-Agent", randomString())
// Strip query parameters and trailing '/' from the URL
strippedURL := stripQueryAndTrailingSlash(r.URL)
// Check if the URL already exists in the slice
if !existsInSlice(pageLinks, strippedURL.String()) {
// Add the URL to the slice if it doesn't already exist
pageLinks = append(pageLinks, strippedURL.String())
// Scrape the webpage information
doc, err := getDocAfterRequestURL(strippedURL.String(), inputStruct.Timeout)
})

c.OnResponse(func(r *colly.Response) {

strippedURL := stripQueryAndTrailingSlash(r.Request.URL)

page := PageInfo{}

page.Link = strippedURL.String()

html := string(r.Body)
ioReader := strings.NewReader(html)
doc, err := goquery.NewDocumentFromReader(ioReader)

if err != nil {
fmt.Printf("Error parsing %s: %v", strippedURL.String(), err)
return
}

title := util.ScrapeWebpageTitle(doc)
page.Title = title

if *inputStruct.IncludeLinkHTML {
page.LinkHTML = html
}

if *inputStruct.IncludeLinkText {
domain, err := util.GetDomainFromURL(strippedURL.String())

if err != nil {
fmt.Printf("Error parsing %s: %v", strippedURL.String(), err)
log.Printf("Error getting domain from %s: %v", strippedURL.String(), err)
return
}
page := PageInfo{}
title := util.ScrapeWebpageTitle(doc)
page.Title = title
page.Link = strippedURL.String()

if *inputStruct.IncludeLinkHTML || *inputStruct.IncludeLinkText {
html, err := util.ScrapeWebpageHTML(doc)
if err != nil {
fmt.Printf("Error scraping HTML from %s: %v", strippedURL.String(), err)
return
}

if *inputStruct.IncludeLinkHTML {
page.LinkHTML = html
}

if *inputStruct.IncludeLinkText {
domain, err := util.GetDomainFromURL(strippedURL.String())
if err != nil {
fmt.Printf("Error getting domain from %s: %v", strippedURL.String(), err)
return
}
markdown, err := util.ScrapeWebpageHTMLToMarkdown(html, domain)
if err != nil {
fmt.Printf("Error scraping text from %s: %v", strippedURL.String(), err)
return
}
page.LinkText = markdown
}

markdown, err := util.ScrapeWebpageHTMLToMarkdown(html, domain)

if err != nil {
log.Printf("Error scraping text from %s: %v", strippedURL.String(), err)
return
}

page.LinkText = markdown
}

defer mu.Unlock()
mu.Lock()
// If we do not set this condition, the length of output.Pages could be over the limit.
if len(output.Pages) < inputStruct.MaxK {
output.Pages = append(output.Pages, page)
}
})
@@ -200,3 +192,53 @@ func (e *execution) CrawlWebsite(input *structpb.Struct) (*structpb.Struct, erro
return outputStruct, nil

}

// randomString generates a random string of length 10-20
func randomString() string {
b := make([]byte, rand.Intn(10)+10)
for i := range b {
b[i] = letterBytes[rand.Intn(len(letterBytes))]
}
return string(b)
}

// stripQueryAndTrailingSlash removes query parameters and trailing '/' from a URL
func stripQueryAndTrailingSlash(u *url.URL) *url.URL {
// Remove query parameters by setting RawQuery to an empty string
u.RawQuery = ""

// Remove trailing '/' from the path
u.Path = strings.TrimSuffix(u.Path, "/")

return u
}

func initColly(inputStruct ScrapeWebsiteInput) *colly.Collector {
c := colly.NewCollector(
colly.MaxDepth(inputStruct.MaxDepth),
colly.Async(true),
)

// Limit the number of requests to avoid being blocked.
// Set it to 10 first in case sending too many requests at once.
var parallel int
if inputStruct.MaxK < 10 {
parallel = inputStruct.MaxK
} else {
parallel = 10
}

_ = c.Limit(&colly.LimitRule{
DomainGlob: "*",
Parallelism: parallel,
})

c.SetRequestTimeout(time.Duration(inputStruct.Timeout) * time.Millisecond)

if len(inputStruct.AllowedDomains) > 0 {
c.AllowedDomains = inputStruct.AllowedDomains
}
c.AllowURLRevisit = false

return c
}
