
mega speed up
- concurrent file ingestion with speedup
Brennan Brouillette committed Aug 25, 2021
1 parent 033a31d commit 086de18
Showing 3 changed files with 80 additions and 99 deletions.
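
In outline, the speedup comes from replacing a stop-the-world flush with a persistent pipeline (reconstructed from the diff below):

```go
// Before: when the queue filled, PerformBulkIndexIngestion closed the channel,
//         drained it into a freshly built BulkIndexer, then recreated the channel.
// After:  NewIngestionService builds one long-lived BulkIndexer; a listener
//         goroutine drains the queue continuously, and each queued variant
//         carries the per-file WaitGroup so ProcessVcf can wait on completion.
```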
2 changes: 1 addition & 1 deletion src/api/mvc/variants.go
@@ -113,7 +113,7 @@ func VariantsIngest(c echo.Context) error {
}
}

fmt.Printf("Found .vcf.gz files: %s\n", vcfGzfiles)
//fmt.Printf("Found .vcf.gz files: %s\n", vcfGzfiles)

// Locate fileName from request inside found files
for _, fileName := range fileNames {
6 changes: 3 additions & 3 deletions src/api/repositories/elasticsearch/elasticsearch.go
@@ -174,7 +174,7 @@ func GetDocumentsContainerVariantOrSampleIdInPositionRange(es *elasticsearch.Cli

// Temp
resultString := res.String()
fmt.Println(resultString)
// fmt.Println(resultString)
// --

// Declared an empty interface
@@ -316,7 +316,7 @@ func CountDocumentsContainerVariantOrSampleIdInPositionRange(es *elasticsearch.C

// Temp
resultString := res.String()
fmt.Println(resultString)
// fmt.Println(resultString)
// --

// Declared an empty interface
@@ -373,7 +373,7 @@ func GetBucketsByKeyword(es *elasticsearch.Client, keyword string) map[string]in

// Temp
resultString := res.String()
fmt.Println(resultString)
//fmt.Println(resultString)
// --

// Declared an empty interface
171 changes: 76 additions & 95 deletions src/api/services/ingestion.go
@@ -19,11 +19,9 @@ import (
"os"
"path/filepath"
"regexp"
"runtime"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/Jeffail/gabs"
@@ -38,12 +36,18 @@ type (
IngestRequestChan chan *ingest.IngestRequest
IngestRequestMap map[string]*ingest.IngestRequest
IngestionBulkIndexingCapacity int
IngestionBulkIndexingQueue chan *models.Variant
IngestionBulkIndexingQueue chan *IngestionQueueStructure
ElasticsearchClient *elasticsearch.Client
IngestionBulkIndexer esutil.BulkIndexer
}

IngestionQueueStructure struct {
Variant *models.Variant
WaitGroup *sync.WaitGroup
}
)

const defaultBulkIndexingCap int = 50000
const defaultBulkIndexingCap int = 10000
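
The new IngestionQueueStructure carries the per-file sync.WaitGroup alongside each variant, so the bulk-indexing listener set up in Init (below) can signal completion back to the file that produced the document. A minimal, self-contained sketch of that pattern — hypothetical names, not the repo's code:

```go
package main

import (
	"fmt"
	"sync"
)

// item pairs a payload with the producer's WaitGroup,
// mirroring IngestionQueueStructure in this commit.
type item struct {
	payload string
	wg      *sync.WaitGroup
}

func main() {
	queue := make(chan *item, 8)

	// consumer: processes items and marks each one done
	go func() {
		for it := range queue {
			fmt.Println("indexed:", it.payload)
			it.wg.Done()
		}
	}()

	// producer: one Add per queued item, then wait for all of them
	var wg sync.WaitGroup
	for _, p := range []string{"a", "b", "c"} {
		wg.Add(1)
		queue <- &item{payload: p, wg: &wg}
	}
	wg.Wait()
	close(queue)
}
```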

func NewIngestionService(es *elasticsearch.Client) *IngestionService {

@@ -52,9 +56,25 @@ func NewIngestionService(es *elasticsearch.Client) *IngestionService {
IngestRequestChan: make(chan *ingest.IngestRequest),
IngestRequestMap: map[string]*ingest.IngestRequest{},
IngestionBulkIndexingCapacity: defaultBulkIndexingCap,
IngestionBulkIndexingQueue: make(chan *models.Variant, defaultBulkIndexingCap),
IngestionBulkIndexingQueue: make(chan *IngestionQueueStructure, defaultBulkIndexingCap),
ElasticsearchClient: es,
}

// see: https://www.elastic.co/blog/why-am-i-seeing-bulk-rejections-in-my-elasticsearch-cluster
var numWorkers = defaultBulkIndexingCap / 100
// the lower the denominator (the number of documents per bulk upload), the higher
// the chance of a fully successful upload, though the longer it may take (negligible)

bi, _ := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
Index: "variants",
Client: iz.ElasticsearchClient,
NumWorkers: numWorkers,
// FlushBytes: int(flushBytes), // The flush threshold in bytes (default: 5MB)
FlushInterval: 30 * time.Second, // The periodic flush interval
})

iz.IngestionBulkIndexer = bi

iz.Init()

return iz
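
With defaultBulkIndexingCap now 10000, the division above yields 10000 / 100 = 100 bulk-indexer workers. For reference, a standalone sketch of the same esutil.BulkIndexer setup — import paths assume go-elasticsearch/v7, and error handling is added where the commit uses `bi, _ :=`:

```go
package main

import (
	"context"
	"log"
	"time"

	elasticsearch "github.com/elastic/go-elasticsearch/v7"
	"github.com/elastic/go-elasticsearch/v7/esutil"
)

func main() {
	es, err := elasticsearch.NewDefaultClient()
	if err != nil {
		log.Fatalf("error creating the Elasticsearch client: %s", err)
	}

	bi, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
		Index:         "variants",       // index name used by this service
		Client:        es,
		NumWorkers:    100,              // defaultBulkIndexingCap / 100
		FlushInterval: 30 * time.Second, // periodic flush; replaces the old end-of-file push
	})
	if err != nil {
		log.Fatalf("error creating the bulk indexer: %s", err)
	}

	// flush anything buffered and release workers on shutdown
	defer bi.Close(context.Background())
}
```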
Expand All @@ -80,7 +100,53 @@ func (i *IngestionService) Init() {

// spin up a listener for bulk indexing
go func() {
for {
select {
case queuedItem := <-i.IngestionBulkIndexingQueue:

v := queuedItem.Variant
wg := queuedItem.WaitGroup

// Prepare the data payload: encode article to JSON
data, err := json.Marshal(v)
if err != nil {
log.Fatalf("Cannot encode variant %s: %s\n", v.Id, err)
}

// Add an item to the BulkIndexer
err = i.IngestionBulkIndexer.Add(
context.Background(),
esutil.BulkIndexerItem{
// Action field configures the operation to perform (index, create, delete, update)
Action: "index",

// Body is an `io.Reader` with the payload
Body: bytes.NewReader(data),

// OnSuccess is called for each successful operation
OnSuccess: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem) {
defer wg.Done()
//atomic.AddUint64(&countSuccessful, 1)
},

// OnFailure is called for each failed operation
OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) {
defer wg.Done()
//atomic.AddUint64(&countFailed, 1)
if err != nil {
fmt.Printf("ERROR: %s", err)
} else {
fmt.Printf("ERROR: %s: %s", res.Error.Type, res.Error.Reason)
}
},
},
)
if err != nil {
wg.Done() // not deferred: this listener goroutine never returns, so a deferred call would never run
fmt.Printf("Unexpected error: %s", err)
}
}
}
}()

i.Initialized = true
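
Note that the deleted PerformBulkIndexIngestion (further down) used to print success/failure counts, and the new path drops that report. esutil keeps cumulative counters, so a hypothetical helper — not part of this commit — could restore it:

```go
// Hypothetical addition, not in this commit: report cumulative indexer stats.
func (i *IngestionService) ReportBulkStats() {
	stats := i.IngestionBulkIndexer.Stats() // esutil.BulkIndexerStats
	fmt.Printf("bulk indexer: added=%d, flushed=%d, indexed=%d, failed=%d\n",
		stats.NumAdded, stats.NumFlushed, stats.NumIndexed, stats.NumFailed)
}
```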
@@ -198,8 +264,6 @@ func (i *IngestionService) ProcessVcf(vcfFilePath string, drsFileId string) {

_fileWG.Add(1)
go func(line string, drsFileId string, fileWg *sync.WaitGroup) {
defer fileWg.Done()

// ---- break up line
rowComponents := strings.Split(line, "\t")

@@ -301,21 +365,17 @@ func (i *IngestionService) ProcessVcf(vcfFilePath string, drsFileId string) {
var resultingVariant models.Variant
mapstructure.Decode(tmpVariant, &resultingVariant)

select {
case i.IngestionBulkIndexingQueue <- &resultingVariant: // Put variant in the channel unless it is full
default:
// ingest full capacity queue
i.PerformBulkIndexIngestion()
// pass variant (along with a waitgroup) to the channel
i.IngestionBulkIndexingQueue <- &IngestionQueueStructure{
Variant: &resultingVariant,
WaitGroup: fileWg,
}

}(line, drsFileId, &_fileWG)
}

// let all lines be queued up and processed as many as possible
// let all lines be queued up and processed
_fileWG.Wait()

// perform final bulk push
i.PerformBulkIndexIngestion()
}
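
Taken together, ProcessVcf no longer flushes explicitly; the WaitGroup handshake does the synchronization. Simplified flow, condensed from the diff:

```go
// Per-file flow after this commit (simplified):
//
//   _fileWG.Add(1)                      // producer: one count per VCF line
//   queue <- &IngestionQueueStructure{  // the variant travels with the WaitGroup
//       Variant:   &resultingVariant,
//       WaitGroup: fileWg,
//   }
//   ...
//   _fileWG.Wait()                      // returns once OnSuccess/OnFailure has
//                                       // called Done() for every queued line
```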

func (i *IngestionService) FilenameAlreadyRunning(filename string) bool {
@@ -326,82 +386,3 @@ func (i *IngestionService) FilenameAlreadyRunning(filename string) bool {
}
return false
}

func (i *IngestionService) PerformBulkIndexIngestion() {

// perform bulk indexing
// --- push all data to the bulk indexer
fmt.Printf("Number of CPUs available: %d\n", runtime.NumCPU())

close(i.IngestionBulkIndexingQueue)

var countSuccessful uint64
var countFailed uint64

// see: https://www.elastic.co/blog/why-am-i-seeing-bulk-rejections-in-my-elasticsearch-cluster
var numWorkers = i.IngestionBulkIndexingCapacity / 100
// the lower the denominator (the number of documents per bulk upload). the higher
// the chances of 100% successful upload, though the longer it may take (negligible)

bi, _ := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
Index: "variants",
Client: i.ElasticsearchClient,
NumWorkers: numWorkers,
// FlushBytes: int(flushBytes), // The flush threshold in bytes (default: 50MB ?)
FlushInterval: 30 * time.Second, // The periodic flush interval
})

var wg sync.WaitGroup

count := 0
for v := range i.IngestionBulkIndexingQueue {
count = count + 1
wg.Add(1)

// Prepare the data payload: encode article to JSON
data, err := json.Marshal(v)
if err != nil {
log.Fatalf("Cannot encode variant %s: %s\n", v.Id, err)
}

// Add an item to the BulkIndexer
err = bi.Add(
context.Background(),
esutil.BulkIndexerItem{
// Action field configures the operation to perform (index, create, delete, update)
Action: "index",

// Body is an `io.Reader` with the payload
Body: bytes.NewReader(data),

// OnSuccess is called for each successful operation
OnSuccess: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem) {
defer wg.Done()
atomic.AddUint64(&countSuccessful, 1)
},

// OnFailure is called for each failed operation
OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) {
defer wg.Done()
atomic.AddUint64(&countFailed, 1)
if err != nil {
fmt.Printf("ERROR: %s", err)
} else {
fmt.Printf("ERROR: %s: %s", res.Error.Type, res.Error.Reason)
}
},
},
)
if err != nil {
defer wg.Done()
fmt.Printf("Unexpected error: %s", err)
}
}

wg.Wait()

fmt.Printf("Done processing %d variants, with %d stats!\n", count, bi.Stats())

// recreate queue (mimic "reopening")
i.IngestionBulkIndexingQueue = make(chan *models.Variant, defaultBulkIndexingCap)
}
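
For context on what was removed: PerformBulkIndexIngestion closed the shared queue mid-ingestion, drained it into a freshly built indexer, then recreated the channel, so any producer goroutine still sending during that window would hit Go's send-on-closed-channel panic. A standalone demo of that hazard:

```go
package main

import "fmt"

func main() {
	queue := make(chan int, 1)
	close(queue) // what the old flush did to the live queue

	defer func() {
		if r := recover(); r != nil {
			fmt.Println("recovered:", r) // "send on closed channel"
		}
	}()
	queue <- 1 // a producer racing the close panics here
}
```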
