More efficient database operations (#78)

gosom authored Oct 13, 2024
1 parent b72e54e commit 16bed9e

Showing 3 changed files with 210 additions and 88 deletions.
234 changes: 151 additions & 83 deletions postgres/provider.go
@@ -6,6 +6,8 @@ import (
"database/sql"
"encoding/gob"
"errors"
"fmt"
"sync"
"time"

"github.com/gosom/scrapemate"
Expand All @@ -21,108 +23,56 @@ const (
var _ scrapemate.JobProvider = (*provider)(nil)

type provider struct {
db *sql.DB
db *sql.DB
mu *sync.Mutex
jobc chan scrapemate.IJob
errc chan error
started bool
}

func NewProvider(db *sql.DB) scrapemate.JobProvider {
prov := provider{
db: db,
mu: &sync.Mutex{},
errc: make(chan error, 1),
jobc: make(chan scrapemate.IJob, 100),
}

return &prov
}

//nolint:gocritic // it complains about unnamed results
func (p *provider) Jobs(ctx context.Context) (<-chan scrapemate.IJob, <-chan error) {
	outc := make(chan scrapemate.IJob)
	errc := make(chan error, 1)

	go func() {
		defer close(outc)
		defer close(errc)

		p.mu.Lock()
		if !p.started {
			go p.fetchJobs(ctx)

			p.started = true
		}
		p.mu.Unlock()

		for {
			select {
			case <-ctx.Done():
				return
			case err := <-p.errc:
				errc <- err

				return
			case job, ok := <-p.jobc:
				if !ok {
					return
				}

				select {
				case outc <- job:
				case <-ctx.Done():
					return
				}
			}
		}
	}()

	return outc, errc
}
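
A sketch of how a caller might drain these channels (hypothetical wiring, not part of this commit; the postgres package alias, the log usage, and the consume helper are assumptions):

	func consume(ctx context.Context, db *sql.DB) {
		provider := postgres.NewProvider(db)
		jobs, errs := provider.Jobs(ctx)

		for {
			select {
			case job, ok := <-jobs:
				if !ok {
					return // provider shut down
				}

				_ = job // hand the job off to scrapemate's worker pool here
			case err := <-errs:
				log.Println("job provider error:", err)

				return
			case <-ctx.Done():
				return
			}
		}
	}
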
@@ -166,11 +116,129 @@ func (p *provider) Push(ctx context.Context, job scrapemate.IJob) error {
	return err
}

func (p *provider) fetchJobs(ctx context.Context) {
	defer close(p.jobc)
	defer close(p.errc)

	q := `
	WITH updated AS (
		UPDATE gmaps_jobs
		SET status = $1
		WHERE id IN (
			SELECT id from gmaps_jobs
			WHERE status = $2
			ORDER BY priority ASC, created_at ASC FOR UPDATE SKIP LOCKED
			LIMIT 50
		)
		RETURNING *
	)
	SELECT payload_type, payload from updated ORDER by priority ASC, created_at ASC
	`

	baseDelay := time.Second
	maxDelay := time.Minute
	factor := 2
	currentDelay := baseDelay

	jobs := make([]scrapemate.IJob, 0, 50)

	for {
		select {
		case <-ctx.Done():
			return
		default:
		}

		rows, err := p.db.QueryContext(ctx, q, statusQueued, statusNew)
		if err != nil {
			p.errc <- err

			return
		}

		for rows.Next() {
			var (
				payloadType string
				payload     []byte
			)

			if err := rows.Scan(&payloadType, &payload); err != nil {
				p.errc <- err

				return
			}

			job, err := decodeJob(payloadType, payload)
			if err != nil {
				p.errc <- err

				return
			}

			jobs = append(jobs, job)
		}

		if err := rows.Err(); err != nil {
			p.errc <- err

			return
		}

		if err := rows.Close(); err != nil {
			p.errc <- err

			return
		}

		if len(jobs) > 0 {
			for _, job := range jobs {
				select {
				case p.jobc <- job:
				case <-ctx.Done():
					return
				}
			}

			jobs = jobs[:0]
		} else {
			select {
			case <-time.After(currentDelay):
				currentDelay = time.Duration(float64(currentDelay) * float64(factor))
				if currentDelay > maxDelay {
					currentDelay = maxDelay
				}
			case <-ctx.Done():
				return
			}
		}
	}
}
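
When a poll returns no rows, the loop sleeps with exponential backoff: starting from baseDelay (1s), the wait doubles on each consecutive empty poll (2s, 4s, 8s, ...) and is capped at maxDelay (1m); a non-empty poll skips the wait entirely. A minimal standalone sketch of the same doubling-with-cap step:

	func nextDelay(current, max time.Duration, factor float64) time.Duration {
		next := time.Duration(float64(current) * factor)
		if next > max {
			next = max
		}

		return next
	}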

type encjob struct {
	Type string
	Data scrapemate.IJob
}

func decodeJob(payloadType string, payload []byte) (scrapemate.IJob, error) {
	buf := bytes.NewBuffer(payload)
	dec := gob.NewDecoder(buf)

	switch payloadType {
	case "search":
		j := new(gmaps.GmapJob)
		if err := dec.Decode(j); err != nil {
			return nil, fmt.Errorf("failed to decode search job: %w", err)
		}

		return j, nil
	case "place":
		j := new(gmaps.PlaceJob)
		if err := dec.Decode(j); err != nil {
			return nil, fmt.Errorf("failed to decode place job: %w", err)
		}

		return j, nil
	default:
		return nil, fmt.Errorf("invalid payload type: %s", payloadType)
	}
}
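
The encoding side is not shown in this hunk. Assuming Push gob-encodes the concrete job and stores its type tag alongside it (the mirror of decodeJob above), an encoder might look like this sketch; encodeJob is a hypothetical helper, not code from this commit:

	func encodeJob(job scrapemate.IJob) (payloadType string, payload []byte, err error) {
		var buf bytes.Buffer
		enc := gob.NewEncoder(&buf)

		switch j := job.(type) {
		case *gmaps.GmapJob:
			payloadType = "search"
			err = enc.Encode(j)
		case *gmaps.PlaceJob:
			payloadType = "place"
			err = enc.Encode(j)
		default:
			return "", nil, fmt.Errorf("unsupported job type %T", job)
		}

		return payloadType, buf.Bytes(), err
	}
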
59 changes: 54 additions & 5 deletions postgres/resultwriter.go
@@ -5,6 +5,9 @@ import (
"database/sql"
"encoding/json"
"errors"
"fmt"
"strings"
"time"

"github.com/gosom/scrapemate"

Expand All @@ -20,34 +23,80 @@ type resultWriter struct {
}

func (r *resultWriter) Run(ctx context.Context, in <-chan scrapemate.Result) error {
	const maxBatchSize = 50

	buff := make([]*gmaps.Entry, 0, maxBatchSize)
	lastSave := time.Now().UTC()

	for result := range in {
		entry, ok := result.Data.(*gmaps.Entry)
		if !ok {
			return errors.New("invalid data type")
		}

		buff = append(buff, entry)

		if len(buff) >= maxBatchSize || time.Now().UTC().Sub(lastSave) >= time.Minute {
			err := r.batchSave(ctx, buff)
			if err != nil {
				return err
			}

			buff = buff[:0]
			lastSave = time.Now().UTC()
		}
	}

	if len(buff) > 0 {
		err := r.batchSave(ctx, buff)
		if err != nil {
			return err
		}
	}

	return nil
}

func (r *resultWriter) batchSave(ctx context.Context, entries []*gmaps.Entry) error {
	if len(entries) == 0 {
		return nil
	}

	q := `INSERT INTO results
		(data)
		VALUES
	`

	elements := make([]string, 0, len(entries))
	args := make([]interface{}, 0, len(entries))

	for i, entry := range entries {
		data, err := json.Marshal(entry)
		if err != nil {
			return err
		}

		elements = append(elements, fmt.Sprintf("($%d)", i+1))
		args = append(args, data)
	}

	q += strings.Join(elements, ", ")
	q += " ON CONFLICT DO NOTHING"

	tx, err := r.db.BeginTx(ctx, nil)
	if err != nil {
		return err
	}

	defer func() {
		_ = tx.Rollback()
	}()

	_, err = tx.ExecContext(ctx, q, args...)
	if err != nil {
		return err
	}

	err = tx.Commit()

	return err
}
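
For a batch of three entries, the loop above produces a single multi-row statement; the final query text (shown for illustration) is:

	INSERT INTO results
		(data)
	VALUES
		($1), ($2), ($3) ON CONFLICT DO NOTHING

One round trip inserts the whole batch inside one transaction, which is where the efficiency gain over the old per-entry saveEntry comes from.
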
5 changes: 5 additions & 0 deletions runner/databaserunner/databaserunner.go
@@ -155,6 +155,11 @@ func openPsqlConn(dsn string) (conn *sql.DB, err error) {
	}

	err = conn.Ping()
	if err != nil {
		return
	}

	conn.SetMaxOpenConns(10)

	return
}
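
conn.SetMaxOpenConns(10) caps how many connections the pool opens concurrently. A sketch of the related database/sql knobs that are often tuned together (the extra values are illustrative, not from this commit):

	conn.SetMaxOpenConns(10)                  // hard cap on concurrent connections
	conn.SetMaxIdleConns(5)                   // keep a few idle connections warm
	conn.SetConnMaxLifetime(30 * time.Minute) // recycle long-lived connections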
