diff --git a/src/api/mvc/variants.go b/src/api/mvc/variants.go
index 509970ed..fc421755 100644
--- a/src/api/mvc/variants.go
+++ b/src/api/mvc/variants.go
@@ -113,7 +113,7 @@ func VariantsIngest(c echo.Context) error {
 		}
 	}
 
-	fmt.Printf("Found .vcf.gz files: %s\n", vcfGzfiles)
+	//fmt.Printf("Found .vcf.gz files: %s\n", vcfGzfiles)
 
 	// Locate fileName from request inside found files
 	for _, fileName := range fileNames {
diff --git a/src/api/repositories/elasticsearch/elasticsearch.go b/src/api/repositories/elasticsearch/elasticsearch.go
index 315ee945..c21acec9 100644
--- a/src/api/repositories/elasticsearch/elasticsearch.go
+++ b/src/api/repositories/elasticsearch/elasticsearch.go
@@ -174,7 +174,7 @@ func GetDocumentsContainerVariantOrSampleIdInPositionRange(es *elasticsearch.Cli
 
 	// Temp
 	resultString := res.String()
-	fmt.Println(resultString)
+	// fmt.Println(resultString)
 	// --
 
 	// Declared an empty interface
@@ -316,7 +316,7 @@ func CountDocumentsContainerVariantOrSampleIdInPositionRange(es *elasticsearch.C
 
 	// Temp
 	resultString := res.String()
-	fmt.Println(resultString)
+	// fmt.Println(resultString)
 	// --
 
 	// Declared an empty interface
@@ -373,7 +373,7 @@ func GetBucketsByKeyword(es *elasticsearch.Client, keyword string) map[string]in
 
 	// Temp
 	resultString := res.String()
-	fmt.Println(resultString)
+	//fmt.Println(resultString)
 	// --
 
 	// Declared an empty interface
diff --git a/src/api/services/ingestion.go b/src/api/services/ingestion.go
index e5585573..aa67bafe 100644
--- a/src/api/services/ingestion.go
+++ b/src/api/services/ingestion.go
@@ -19,11 +19,9 @@ import (
 	"os"
 	"path/filepath"
 	"regexp"
-	"runtime"
 	"strconv"
 	"strings"
 	"sync"
-	"sync/atomic"
 	"time"
 
 	"github.com/Jeffail/gabs"
@@ -38,12 +36,18 @@ type (
 		IngestRequestChan             chan *ingest.IngestRequest
 		IngestRequestMap              map[string]*ingest.IngestRequest
 		IngestionBulkIndexingCapacity int
-		IngestionBulkIndexingQueue    chan *models.Variant
+		IngestionBulkIndexingQueue    chan *IngestionQueueStructure
 		ElasticsearchClient           *elasticsearch.Client
+		IngestionBulkIndexer          esutil.BulkIndexer
+	}
+
+	IngestionQueueStructure struct {
+		Variant   *models.Variant
+		WaitGroup *sync.WaitGroup
 	}
 )
 
-const defaultBulkIndexingCap int = 50000
+const defaultBulkIndexingCap int = 10000
 
 func NewIngestionService(es *elasticsearch.Client) *IngestionService {
 
@@ -52,9 +56,25 @@ func NewIngestionService(es *elasticsearch.Client) *IngestionService {
 		IngestRequestChan:             make(chan *ingest.IngestRequest),
 		IngestRequestMap:              map[string]*ingest.IngestRequest{},
 		IngestionBulkIndexingCapacity: defaultBulkIndexingCap,
-		IngestionBulkIndexingQueue:    make(chan *models.Variant, defaultBulkIndexingCap),
+		IngestionBulkIndexingQueue:    make(chan *IngestionQueueStructure, defaultBulkIndexingCap),
 		ElasticsearchClient:           es,
 	}
+
+	// see: https://www.elastic.co/blog/why-am-i-seeing-bulk-rejections-in-my-elasticsearch-cluster
+	var numWorkers = defaultBulkIndexingCap / 100
+	// The lower the denominator (the number of documents per bulk upload), the higher
+	// the chance of a 100% successful upload, though the longer it may take (negligibly)
+
+	bi, _ := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
+		Index:      "variants",
+		Client:     iz.ElasticsearchClient,
+		NumWorkers: numWorkers,
+		// FlushBytes: int(flushBytes), // The flush threshold in bytes (default: 5MB)
+		FlushInterval: 30 * time.Second, // The periodic flush interval
+	})
+
+	iz.IngestionBulkIndexer = bi
+
 	iz.Init()
 
 	return iz
@@ -80,7 +100,53 @@ func (i *IngestionService) Init() {
 
 	// spin up a listener for bulk indexing
 	go func() {
+		for {
+			select {
+			case queuedItem := <-i.IngestionBulkIndexingQueue:
+				v := queuedItem.Variant
+				wg := queuedItem.WaitGroup
+
+				// Prepare the data payload: encode variant to JSON
+				data, err := json.Marshal(v)
+				if err != nil {
+					log.Fatalf("Cannot encode variant %s: %s\n", v.Id, err)
+				}
+
+				// Add an item to the BulkIndexer
+				err = i.IngestionBulkIndexer.Add(
+					context.Background(),
+					esutil.BulkIndexerItem{
+						// Action field configures the operation to perform (index, create, delete, update)
+						Action: "index",
+
+						// Body is an `io.Reader` with the payload
+						Body: bytes.NewReader(data),
+
+						// OnSuccess is called for each successful operation
+						OnSuccess: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem) {
+							defer wg.Done()
+							//atomic.AddUint64(&countSuccessful, 1)
+						},
+
+						// OnFailure is called for each failed operation
+						OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) {
+							defer wg.Done()
+							//atomic.AddUint64(&countFailed, 1)
+							if err != nil {
+								fmt.Printf("ERROR: %s", err)
+							} else {
+								fmt.Printf("ERROR: %s: %s", res.Error.Type, res.Error.Reason)
+							}
+						},
+					},
+				)
+				if err != nil {
+					wg.Done() // not deferred: this goroutine never returns, so a defer would never fire
+					fmt.Printf("Unexpected error: %s", err)
+				}
+			}
+		}
 	}()
 
 	i.Initialized = true
@@ -198,8 +264,6 @@ func (i *IngestionService) ProcessVcf(vcfFilePath string, drsFileId string) {
 		_fileWG.Add(1)
 		go func(line string, drsFileId string, fileWg *sync.WaitGroup) {
-			defer fileWg.Done()
-
 			// ---- break up line
 			rowComponents := strings.Split(line, "\t")
@@ -301,21 +365,17 @@ func (i *IngestionService) ProcessVcf(vcfFilePath string, drsFileId string) {
 			var resultingVariant models.Variant
 			mapstructure.Decode(tmpVariant, &resultingVariant)
 
-			select {
-			case i.IngestionBulkIndexingQueue <- &resultingVariant: // Put variant in the channel unless it is full
-			default:
-				// ingest full capacity queue
-				i.PerformBulkIndexIngestion()
+			// pass variant (along with a waitgroup) to the channel
+			i.IngestionBulkIndexingQueue <- &IngestionQueueStructure{
+				Variant:   &resultingVariant,
+				WaitGroup: fileWg,
 			}
 		}(line, drsFileId, &_fileWG)
 	}
 
-	// let all lines be queued up and processed as many as possible
+	// let all lines be queued up and processed
 	_fileWG.Wait()
-
-	// perform final bulk push
-	i.PerformBulkIndexIngestion()
 }
 
 func (i *IngestionService) FilenameAlreadyRunning(filename string) bool {
@@ -326,82 +386,3 @@ func (i *IngestionService) FilenameAlreadyRunning(filename string) bool {
 	}
 	return false
 }
-
-func (i *IngestionService) PerformBulkIndexIngestion() {
-
-	// perform bulk indexing
-	// --- push all data to the bulk indexer
-	fmt.Printf("Number of CPUs available: %d\n", runtime.NumCPU())
-
-	close(i.IngestionBulkIndexingQueue)
-
-	var countSuccessful uint64
-	var countFailed uint64
-
-	// see: https://www.elastic.co/blog/why-am-i-seeing-bulk-rejections-in-my-elasticsearch-cluster
-	var numWorkers = i.IngestionBulkIndexingCapacity / 100
-	// the lower the denominator (the number of documents per bulk upload). the higher
-	// the chances of 100% successful upload, though the longer it may take (negligible)
-
-	bi, _ := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
-		Index:      "variants",
-		Client:     i.ElasticsearchClient,
-		NumWorkers: numWorkers,
-		// FlushBytes: int(flushBytes), // The flush threshold in bytes (default: 50MB ?)
-		FlushInterval: 30 * time.Second, // The periodic flush interval
-	})
-
-	var wg sync.WaitGroup
-
-	count := 0
-	for v := range i.IngestionBulkIndexingQueue {
-		count = count + 1
-		wg.Add(1)
-
-		// Prepare the data payload: encode article to JSON
-		data, err := json.Marshal(v)
-		if err != nil {
-			log.Fatalf("Cannot encode variant %s: %s\n", v.Id, err)
-		}
-
-		// Add an item to the BulkIndexer
-		err = bi.Add(
-			context.Background(),
-			esutil.BulkIndexerItem{
-				// Action field configures the operation to perform (index, create, delete, update)
-				Action: "index",
-
-				// Body is an `io.Reader` with the payload
-				Body: bytes.NewReader(data),
-
-				// OnSuccess is called for each successful operation
-				OnSuccess: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem) {
-					defer wg.Done()
-					atomic.AddUint64(&countSuccessful, 1)
-				},
-
-				// OnFailure is called for each failed operation
-				OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) {
-					defer wg.Done()
-					atomic.AddUint64(&countFailed, 1)
-					if err != nil {
-						fmt.Printf("ERROR: %s", err)
-					} else {
-						fmt.Printf("ERROR: %s: %s", res.Error.Type, res.Error.Reason)
-					}
-				},
-			},
-		)
-		if err != nil {
-			defer wg.Done()
-			fmt.Printf("Unexpected error: %s", err)
-		}
-	}
-
-	wg.Wait()
-
-	fmt.Printf("Done processing %d variants, with %d stats!\n", count, bi.Stats())
-
-	// recreate queue (mimic "reopening")
-	i.IngestionBulkIndexingQueue = make(chan *models.Variant, defaultBulkIndexingCap)
-}
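
Note on the pattern above: the patch replaces the old per-file cycle in PerformBulkIndexIngestion (close the queue, drain it into a freshly built BulkIndexer, recreate the queue) with one long-lived esutil.BulkIndexer fed by a channel listener, and threads each file's sync.WaitGroup through the queue so it is released only from the indexer's OnSuccess/OnFailure callbacks. The following is a minimal, self-contained sketch of that pattern, assuming go-elasticsearch v7's esutil package; the Document type, queueItem struct, queue size, index name, and worker count are illustrative placeholders, not the service's actual values.

package main

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"log"
	"sync"
	"time"

	"github.com/elastic/go-elasticsearch/v7"
	"github.com/elastic/go-elasticsearch/v7/esutil"
)

// Document stands in for models.Variant (illustrative only).
type Document struct {
	Id string `json:"id"`
}

// queueItem mirrors the patch's IngestionQueueStructure: a payload plus the
// WaitGroup tracking the file it came from.
type queueItem struct {
	Doc *Document
	Wg  *sync.WaitGroup
}

func main() {
	es, err := elasticsearch.NewDefaultClient()
	if err != nil {
		log.Fatalf("Error creating the client: %s", err)
	}

	// One indexer for the life of the process, as NewIngestionService now does.
	bi, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
		Index:         "documents", // illustrative index name
		Client:        es,
		NumWorkers:    4,                // illustrative; the patch uses capacity/100
		FlushInterval: 30 * time.Second, // worst-case wait before an ack arrives
	})
	if err != nil {
		log.Fatalf("Error creating the bulk indexer: %s", err)
	}

	queue := make(chan *queueItem, 100)

	// Long-lived listener: each queued item becomes one bulk operation, and the
	// WaitGroup is released only when Elasticsearch acknowledges (or rejects) it.
	go func() {
		for qi := range queue {
			wg := qi.Wg
			data, err := json.Marshal(qi.Doc)
			if err != nil {
				wg.Done()
				log.Printf("Cannot encode document %s: %s", qi.Doc.Id, err)
				continue
			}
			err = bi.Add(context.Background(), esutil.BulkIndexerItem{
				Action: "index",
				Body:   bytes.NewReader(data),
				OnSuccess: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem) {
					wg.Done()
				},
				OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) {
					defer wg.Done()
					if err != nil {
						log.Printf("ERROR: %s", err)
					} else {
						log.Printf("ERROR: %s: %s", res.Error.Type, res.Error.Reason)
					}
				},
			})
			if err != nil {
				wg.Done() // Add itself failed; no callback will ever fire
				log.Printf("Unexpected error: %s", err)
			}
		}
	}()

	// Producer side, as ProcessVcf now does: Add(1) per document, send it,
	// then Wait() until every document has been flushed and acknowledged.
	var fileWG sync.WaitGroup
	for i := 0; i < 10; i++ {
		fileWG.Add(1)
		queue <- &queueItem{Doc: &Document{Id: fmt.Sprintf("doc-%d", i)}, Wg: &fileWG}
	}
	fileWG.Wait()
}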
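The behavioral consequence is worth spelling out: before this patch, each line-goroutine ran defer fileWg.Done() as soon as it had parsed its line, so _fileWG.Wait() could return, and the final bulk push begin, while variants were still only queued; afterwards, Done() fires only once Elasticsearch acknowledges or rejects the document, so _fileWG.Wait() now guarantees that everything in the file has actually been indexed or reported as failed. The trade-off is that a waiting ProcessVcf call can stall for up to one FlushInterval (30 seconds here) while a partially filled batch sits in the indexer waiting to be flushed.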