Skip to content

Commit

Permalink
improve vulnerability processing (#150)
Browse files Browse the repository at this point in the history
* Optimize UpdateIssueLabels with incremental updates
* Increase visibility timeout
* Improve logging for error processing source
* Remove ioutil
  • Loading branch information
jesusfcr authored Apr 17, 2024
1 parent 371fc3f commit 2572aec
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 16 deletions.
8 changes: 4 additions & 4 deletions cmd/vulnerability-db-consumer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ Copyright 2020 Adevinta
package main

import (
"io/ioutil"
"io"
"os"

log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -39,8 +39,8 @@ type dbConfig struct {

type sqsConfig struct {
NProcessors uint8 `toml:"number_of_processors"`
WaitTime uint8 `toml:"wait_time"`
Timeout uint8
WaitTime int `toml:"wait_time"`
Timeout int
QueueARN string `toml:"queue_arn"`
Endpoint string `toml:"endpoint"`
}
Expand Down Expand Up @@ -73,7 +73,7 @@ func parseConfig(cfgFilePath string) (*config, error) {
}
defer cfgFile.Close()

cfgData, err := ioutil.ReadAll(cfgFile)
cfgData, err := io.ReadAll(cfgFile)

var conf config
if _, err := toml.Decode(string(cfgData[:]), &conf); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ name = "$PG_NAME"
[sqs]
number_of_processors = $SQS_NUMBER_OF_PROCESSORS
wait_time = 20
timeout = 30
timeout = 600
queue_arn = "$SQS_QUEUE_ARN"
endpoint = "$AWS_SQS_ENDPOINT"

Expand Down
7 changes: 4 additions & 3 deletions pkg/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,11 @@ func (p *CheckProcessor) ProcessMessage(m string) error {
l.Debugf("Processing source with %d source findings: %#v", len(sourceFindings), source)
source, err = p.store.ProcessSourceExecution(source, sourceFindings)
if err != nil {
l.Errorf("Error while processing source: %#v", err)
if !store.IsDuplicateErr(err) {
return err
if store.IsDuplicateErr(err) {
l.Warnf("duplicated source: %#v", err)
return nil
}
return err
}

l.Debug("Sending open findings")
Expand Down
14 changes: 6 additions & 8 deletions pkg/store/issues.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,20 +37,18 @@ func (db *psqlxStore) UpdateIssueLabels(issueID string, labels []string) error {
return err
}

_, err = tx.Exec("DELETE FROM issue_labels WHERE issue_id = $1", issueID)
l := pq.Array(labels)
_, err = tx.Exec("DELETE FROM issue_labels WHERE issue_id = $1 and label <> ANY($2::TEXT[])", issueID, l)
if err != nil {
tx.Rollback()
return err
}

for _, label := range labels {
_, err := tx.Exec("INSERT INTO issue_labels (issue_id, label) VALUES ($1, $2)", issueID, label)
if err != nil {
tx.Rollback()
return err
}
_, err = tx.Exec("INSERT INTO issue_labels SELECT $1, unnest($2::TEXT[]) ON CONFLICT DO NOTHING", issueID, l)
if err != nil {
tx.Rollback()
return err
}

return tx.Commit()
}

Expand Down

0 comments on commit 2572aec

Please sign in to comment.