From 46cf191225ce10c8f09dc5a4ceab28b542785e58 Mon Sep 17 00:00:00 2001 From: brouillette Date: Tue, 5 Oct 2021 12:16:52 -0400 Subject: [PATCH 1/3] moving genes ingestion to api endpoint --- .gitignore | 3 + etc/example.env | 3 + src/api/main.go | 3 + src/api/models/config.go | 1 + src/api/mvc/genes.go | 244 ++++++++++++++++++++++++++++++++++++ src/gota-poc/main.go | 260 --------------------------------------- 6 files changed, 254 insertions(+), 260 deletions(-) delete mode 100644 src/gota-poc/main.go diff --git a/.gitignore b/.gitignore index 3a45bee7..1115f4e7 100644 --- a/.gitignore +++ b/.gitignore @@ -49,4 +49,7 @@ bin/* *.vcf.gz */*/*.csv + +# ignore .gtf files +*/*.gtf* */*/*.gtf* diff --git a/etc/example.env b/etc/example.env index a8dcbb1c..d10aa3d5 100644 --- a/etc/example.env +++ b/etc/example.env @@ -37,6 +37,9 @@ GOHAN_API_INTERNAL_PORT=5000 GOHAN_API_VCF_PATH=/path/to/vcfs/on/host/machine GOHAN_API_CONTAINERIZED_VCF_PATH=/app/vcfs +GOHAN_API_GTF_PATH=/path/to/gtfs/on/host/machine +GOHAN_API_CONTAINERIZED_GTF_PATH=/app/gtfs + GOHAN_API_ES_PI=variants GOHAN_API_ES_PROTO=http GOHAN_API_ES_HOST=elasticsearch diff --git a/src/api/main.go b/src/api/main.go index 9ab54a98..d491eefa 100644 --- a/src/api/main.go +++ b/src/api/main.go @@ -33,6 +33,7 @@ func main() { "\tDebug : %t \n\n"+ "\tVCF Directory Path : %s \n"+ + "\tGTF Directory Path : %s \n"+ "\tElasticsearch Url : %s \n"+ "\tElasticsearch Username : %s\n\n"+ @@ -48,6 +49,7 @@ func main() { cfg.Debug, cfg.Api.VcfPath, + cfg.Api.GtfPath, cfg.Elasticsearch.Url, cfg.Elasticsearch.Username, cfg.Drs.Url, cfg.Drs.Username, cfg.AuthX.IsAuthorizationEnabled, @@ -157,6 +159,7 @@ func main() { // -- Genes e.GET("/genes/overview", mvc.GetGenesOverview) + e.GET("/genes/ingest", mvc.GenesIngest) e.GET("/genes/search", mvc.GenesGetByNomenclatureWildcard, // middleware gam.ValidateOptionalChromosomeAttribute) diff --git a/src/api/models/config.go b/src/api/models/config.go index d701b6a8..474cafee 100644 --- a/src/api/models/config.go +++ b/src/api/models/config.go @@ -7,6 +7,7 @@ type Config struct { Url string `yaml:"url" envconfig:"GOHAN_PUBLIC_URL"` Port string `yaml:"port" envconfig:"GOHAN_API_INTERNAL_PORT"` VcfPath string `yaml:"vcfPath" envconfig:"GOHAN_API_VCF_PATH"` + GtfPath string `yaml:"gtfPath" envconfig:"GOHAN_API_GTF_PATH"` } `yaml:"api"` Elasticsearch struct { diff --git a/src/api/mvc/genes.go b/src/api/mvc/genes.go index 2f8036eb..60ba965f 100644 --- a/src/api/mvc/genes.go +++ b/src/api/mvc/genes.go @@ -3,17 +3,261 @@ package mvc import ( "api/contexts" "api/models" + "api/models/constants" assemblyId "api/models/constants/assembly-id" + "api/models/constants/chromosome" + "api/models/ingest/structs" esRepo "api/repositories/elasticsearch" + "bufio" + "compress/gzip" + "crypto/tls" "fmt" + "io" + "log" "net/http" + "net/url" + "os" "strconv" + "strings" "sync" "github.com/labstack/echo" "github.com/mitchellh/mapstructure" ) +func GenesIngest(c echo.Context) error { + // trigger global ingestion background process + go func() { + + cfg := c.(*contexts.GohanContext).Config + gtfPath := cfg.Api.GtfPath + + iz := c.(*contexts.GohanContext).IngestionService + + // TEMP: SECURITY RISK + http.DefaultTransport.(*http.Transport).TLSClientConfig = &tls.Config{InsecureSkipVerify: true} + // + assemblyIdMap := map[constants.AssemblyId]string{ + assemblyId.GRCh38: "gencode.v38.annotation.gtf", + assemblyId.GRCh37: "gencode.v19.annotation.gtf", + // SKIP + // assemblyId.NCBI36: "hg18", + // assemblyId.NCBI35: "hg17", + // assemblyId.NCBI34: "hg16", + } + assemblyIdGTFUrlMap := map[constants.AssemblyId]string{ + assemblyId.GRCh38: "http://ftp.ebi.ac.uk/pub/databases/gencode/Gencode_human/release_38/gencode.v38.annotation.gtf.gz", + assemblyId.GRCh37: "http://ftp.ebi.ac.uk/pub/databases/gencode/Gencode_human/release_19/gencode.v19.annotation.gtf.gz", + // SKIP + // assemblyId.NCBI36: "", + // assemblyId.NCBI35: "", + // assemblyId.NCBI34: "", + } + + var assemblyWg sync.WaitGroup + + for assId, fileName := range assemblyIdMap { + assemblyWg.Add(1) + go func(_assId constants.AssemblyId, _fileName string, _assemblyWg *sync.WaitGroup) { + defer _assemblyWg.Done() + + var geneWg sync.WaitGroup + gtfFile, err := os.Open(fmt.Sprintf("%s/%s", gtfPath, _fileName)) + if err != nil { + // log.Fatalf("failed to open file: %s", err) + // Download the file + fullURLFile := assemblyIdGTFUrlMap[_assId] + + // Build fileName from fullPath + fileURL, err := url.Parse(fullURLFile) + if err != nil { + log.Fatal(err) + } + path := fileURL.Path + segments := strings.Split(path, "/") + _fileName = segments[len(segments)-1] + + // Create blank file + file, err := os.Create(fmt.Sprintf("%s/%s", gtfPath, _fileName)) + if err != nil { + log.Fatal(err) + } + client := http.Client{ + CheckRedirect: func(r *http.Request, via []*http.Request) error { + r.URL.Opaque = r.URL.Path + return nil + }, + } + fmt.Printf("Downloading file %s ...\n", _fileName) + + // Put content on file + resp, err := client.Get(fullURLFile) + if err != nil { + log.Fatal(err) + } + defer resp.Body.Close() + + size, err := io.Copy(file, resp.Body) + if err != nil { + log.Fatal(err) + } + defer file.Close() + + fmt.Printf("Downloaded a file %s with size %d\n", _fileName, size) + + fmt.Printf("Unzipping %s...\n", _fileName) + gzipfile, err := os.Open(fmt.Sprintf("%s/%s", gtfPath, _fileName)) + if err != nil { + fmt.Println(err) + os.Exit(1) + } + + reader, err := gzip.NewReader(gzipfile) + if err != nil { + fmt.Println(err) + os.Exit(1) + } + defer reader.Close() + + newfilename := strings.TrimSuffix(_fileName, ".gz") + + writer, err := os.Create(fmt.Sprintf("%s/%s", gtfPath, newfilename)) + + if err != nil { + fmt.Println(err) + os.Exit(1) + } + + defer writer.Close() + + if _, err = io.Copy(writer, reader); err != nil { + fmt.Println(err) + os.Exit(1) + } + + fmt.Printf("Opening %s\n", newfilename) + gtfFile, _ = os.Open(fmt.Sprintf("%s/%s", gtfPath, newfilename)) + + fmt.Printf("Deleting %s\n", _fileName) + err = os.Remove(fmt.Sprintf("%s/%s", gtfPath, _fileName)) + if err != nil { + fmt.Println(err) + } + } + + defer gtfFile.Close() + + fileScanner := bufio.NewScanner(gtfFile) + fileScanner.Split(bufio.ScanLines) + + fmt.Printf("Ingesting %s\n", string(_assId)) + + var ( + chromHeaderKey = 0 + startKey = 3 + endKey = 4 + nameHeaderKeys = []int{3} + geneNameHeaderKeys []int + ) + + var columnsToPrint []string + if _assId == assemblyId.GRCh38 { + // GRCh38 dataset has multiple name fields (name, name2) and + // also includes gene name fields (geneName, geneName2) + columnsToPrint = append(columnsToPrint, "#chrom", "chromStart", "chromEnd", "name", "name2", "geneName", "geneName2") + nameHeaderKeys = append(nameHeaderKeys, 4) + geneNameHeaderKeys = append(geneNameHeaderKeys, 5, 6) + } else { + columnsToPrint = append(columnsToPrint, "chrom", "txStart", "txEnd", "#name") + } + + for fileScanner.Scan() { + rowText := fileScanner.Text() + if rowText[:2] == "##" { + // Skip header rows + continue + } + + geneWg.Add(1) + go func(rowText string, _chromHeaderKey int, + _startKey int, _endKey int, + _nameHeaderKeys []int, _geneNameHeaderKeys []int, + _assId constants.AssemblyId, + _gwg *sync.WaitGroup) { + // fmt.Printf("row : %s\n", row) + + var ( + start int + end int + geneName string + ) + + rowSplits := strings.Split(rowText, "\t") + + // skip this row if it's not a gene row + // i.e, if it's an exon or transcript + if rowSplits[2] != "gene" { + defer _gwg.Done() + return + } + + //clean chromosome + chromosomeClean := strings.ReplaceAll(rowSplits[_chromHeaderKey], "chr", "") + + if !chromosome.IsValidHumanChromosome(chromosomeClean) { + defer _gwg.Done() + return + } + + // clean start/end + chromStartClean := strings.ReplaceAll(strings.ReplaceAll(rowSplits[_startKey], ",", ""), " ", "") + start, _ = strconv.Atoi(chromStartClean) + + chromEndClean := strings.ReplaceAll(strings.ReplaceAll(rowSplits[_endKey], ",", ""), " ", "") + end, _ = strconv.Atoi(chromEndClean) + + dataClumpSplits := strings.Split(rowSplits[len(rowSplits)-1], ";") + for _, v := range dataClumpSplits { + if strings.Contains(v, "gene_name") { + cleanedItemSplits := strings.Split(strings.TrimSpace(strings.ReplaceAll(v, "\"", "")), " ") + if len(cleanedItemSplits) > 0 { + geneName = cleanedItemSplits[len(cleanedItemSplits)-1] + } + break + } + } + if len(geneName) == 0 { + fmt.Printf("No gene found in row %s\n", rowText) + return + } + + discoveredGene := &models.Gene{ + Name: geneName, + Chrom: chromosomeClean, + Start: start, + End: end, + AssemblyId: _assId, + } + + iz.GeneIngestionBulkIndexingQueue <- &structs.GeneIngestionQueueStructure{ + Gene: discoveredGene, + WaitGroup: _gwg, + } + }(rowText, chromHeaderKey, startKey, endKey, nameHeaderKeys, geneNameHeaderKeys, _assId, &geneWg) + } + + geneWg.Wait() + + fmt.Printf("%s ingestion done!\n", _assId) + }(assId, fileName, &assemblyWg) + } + + assemblyWg.Wait() + }() + + return c.JSON(http.StatusOK, "{\"message\":\"please check in with /genes/overview !\"}") +} + func GenesGetByNomenclatureWildcard(c echo.Context) error { cfg := c.(*contexts.GohanContext).Config es := c.(*contexts.GohanContext).Es7Client diff --git a/src/gota-poc/main.go b/src/gota-poc/main.go deleted file mode 100644 index ae812f4b..00000000 --- a/src/gota-poc/main.go +++ /dev/null @@ -1,260 +0,0 @@ -package main - -import ( - "api/models" - "api/models/constants" - assemblyId "api/models/constants/assembly-id" - "api/models/constants/chromosome" - "api/models/ingest/structs" - "api/services" - "api/utils" - "bufio" - "compress/gzip" - "crypto/tls" - "fmt" - "io" - "log" - "net/http" - "net/url" - "os" - "strconv" - "strings" - "sync" - - "github.com/kelseyhightower/envconfig" -) - -func main() { - - // Gather environment variables - var cfg models.Config - err := envconfig.Process("", &cfg) - if err != nil { - fmt.Println(err) - os.Exit(2) - } - - // TEMP: SECURITY RISK - http.DefaultTransport.(*http.Transport).TLSClientConfig = &tls.Config{InsecureSkipVerify: true} - // - - // Service Connections: - // -- Elasticsearch - es := utils.CreateEsConnection(&cfg) - iz := services.NewIngestionService(es) - iz.Init() - - assemblyIdMap := map[constants.AssemblyId]string{ - assemblyId.GRCh38: "gencode.v38.annotation.gtf", - assemblyId.GRCh37: "gencode.v19.annotation.gtf", - // SKIP - // assemblyId.NCBI36: "hg18", - // assemblyId.NCBI35: "hg17", - // assemblyId.NCBI34: "hg16", - } - assemblyIdGTFUrlMap := map[constants.AssemblyId]string{ - assemblyId.GRCh38: "http://ftp.ebi.ac.uk/pub/databases/gencode/Gencode_human/release_38/gencode.v38.annotation.gtf.gz", - assemblyId.GRCh37: "http://ftp.ebi.ac.uk/pub/databases/gencode/Gencode_human/release_19/gencode.v19.annotation.gtf.gz", - // SKIP - // assemblyId.NCBI36: "", - // assemblyId.NCBI35: "", - // assemblyId.NCBI34: "", - } - - var geneWg sync.WaitGroup - - for assId, fileName := range assemblyIdMap { - // Read one file at a time - - gtfFile, err := os.Open(fileName) - if err != nil { - // log.Fatalf("failed to open file: %s", err) - // Download the file - fullURLFile := assemblyIdGTFUrlMap[assId] - - // Build fileName from fullPath - fileURL, err := url.Parse(fullURLFile) - if err != nil { - log.Fatal(err) - } - path := fileURL.Path - segments := strings.Split(path, "/") - fileName = segments[len(segments)-1] - - // Create blank file - file, err := os.Create(fileName) - if err != nil { - log.Fatal(err) - } - client := http.Client{ - CheckRedirect: func(r *http.Request, via []*http.Request) error { - r.URL.Opaque = r.URL.Path - return nil - }, - } - fmt.Printf("Downloading file %s ...\n", fileName) - - // Put content on file - resp, err := client.Get(fullURLFile) - if err != nil { - log.Fatal(err) - } - defer resp.Body.Close() - - size, err := io.Copy(file, resp.Body) - if err != nil { - log.Fatal(err) - } - defer file.Close() - - fmt.Printf("Downloaded a file %s with size %d\n", fileName, size) - - fmt.Printf("Unzipping %s...\n", fileName) - gzipfile, err := os.Open(fileName) - if err != nil { - fmt.Println(err) - os.Exit(1) - } - - reader, err := gzip.NewReader(gzipfile) - if err != nil { - fmt.Println(err) - os.Exit(1) - } - defer reader.Close() - - newfilename := strings.TrimSuffix(fileName, ".gz") - - writer, err := os.Create(newfilename) - - if err != nil { - fmt.Println(err) - os.Exit(1) - } - - defer writer.Close() - - if _, err = io.Copy(writer, reader); err != nil { - fmt.Println(err) - os.Exit(1) - } - - fmt.Printf("Opening %s\n", newfilename) - gtfFile, _ = os.Open(newfilename) - - fmt.Printf("Deleting %s\n", fileName) - err = os.Remove(fileName) - if err != nil { - fmt.Println(err) - } - } - - defer gtfFile.Close() - - fileScanner := bufio.NewScanner(gtfFile) - fileScanner.Split(bufio.ScanLines) - - fmt.Printf("Ingesting %s\n", string(assId)) - - var ( - chromHeaderKey = 0 - startKey = 3 - endKey = 4 - nameHeaderKeys = []int{3} - geneNameHeaderKeys []int - ) - - var columnsToPrint []string - if assId == assemblyId.GRCh38 { - // GRCh38 dataset has multiple name fields (name, name2) and - // also includes gene name fields (geneName, geneName2) - columnsToPrint = append(columnsToPrint, "#chrom", "chromStart", "chromEnd", "name", "name2", "geneName", "geneName2") - nameHeaderKeys = append(nameHeaderKeys, 4) - geneNameHeaderKeys = append(geneNameHeaderKeys, 5, 6) - } else { - columnsToPrint = append(columnsToPrint, "chrom", "txStart", "txEnd", "#name") - } - - for fileScanner.Scan() { - rowText := fileScanner.Text() - if rowText[:2] == "##" { - // Skip header rows - continue - } - - geneWg.Add(1) - go func(rowText string, _chromHeaderKey int, - _startKey int, _endKey int, - _nameHeaderKeys []int, _geneNameHeaderKeys []int, - _assId constants.AssemblyId, - _gwg *sync.WaitGroup) { - // fmt.Printf("row : %s\n", row) - - var ( - start int - end int - geneName string - ) - - rowSplits := strings.Split(rowText, "\t") - - // skip this row if it's not a gene row - // i.e, if it's an exon or transcript - if rowSplits[2] != "gene" { - defer _gwg.Done() - return - } - - //clean chromosome - chromosomeClean := strings.ReplaceAll(rowSplits[_chromHeaderKey], "chr", "") - - if !chromosome.IsValidHumanChromosome(chromosomeClean) { - defer _gwg.Done() - return - } - // http://ftp.ebi.ac.uk/pub/databases/gencode/Gencode_human/release_38/gencode.v38.annotation.gtf.gz - - // clean start/end - chromStartClean := strings.ReplaceAll(strings.ReplaceAll(rowSplits[_startKey], ",", ""), " ", "") - start, _ = strconv.Atoi(chromStartClean) - - chromEndClean := strings.ReplaceAll(strings.ReplaceAll(rowSplits[_endKey], ",", ""), " ", "") - end, _ = strconv.Atoi(chromEndClean) - - dataClumpSplits := strings.Split(rowSplits[len(rowSplits)-1], ";") - for _, v := range dataClumpSplits { - if strings.Contains(v, "gene_name") { - cleanedItemSplits := strings.Split(strings.TrimSpace(strings.ReplaceAll(v, "\"", "")), " ") - if len(cleanedItemSplits) > 0 { - geneName = cleanedItemSplits[len(cleanedItemSplits)-1] - } - break - } - } - if len(geneName) == 0 { - fmt.Printf("No gene found in row %s\n", rowText) - return - } - - discoveredGene := &models.Gene{ - Name: geneName, - Chrom: chromosomeClean, - Start: start, - End: end, - AssemblyId: _assId, - } - - //fmt.Printf("Keys :%d, %d, %d, %d, %d -- %s\n", _chromHeaderKey, _startKey, _endKey, _nameHeaderKeys, _geneNameHeaderKeys, discoveredGene) - - iz.GeneIngestionBulkIndexingQueue <- &structs.GeneIngestionQueueStructure{ - Gene: discoveredGene, - WaitGroup: _gwg, - } - }(rowText, chromHeaderKey, startKey, endKey, nameHeaderKeys, geneNameHeaderKeys, assId, &geneWg) - - // fmt.Printf("Stats : %d\n", iz.GeneIngestionBulkIndexer.Stats()) - } - geneWg.Wait() - - } -} From 8cc66e144956fd463637b2c51030935430074783 Mon Sep 17 00:00:00 2001 From: brouillette Date: Tue, 5 Oct 2021 12:18:43 -0400 Subject: [PATCH 2/3] oops, forgot the docker-compose update --- docker-compose.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/docker-compose.yaml b/docker-compose.yaml index 102ba8b5..e0165a65 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -33,6 +33,7 @@ services: # API - GOHAN_DEBUG=${GOHAN_DEBUG} - GOHAN_API_VCF_PATH=${GOHAN_API_CONTAINERIZED_VCF_PATH} + - GOHAN_API_GTF_PATH=${GOHAN_API_CONTAINERIZED_GTF_PATH} # Elasticsearch - GOHAN_ES_URL=${GOHAN_PRIVATE_ES_URL} From 10ec7196f29af2caad61e5a60730b898b4aa207f Mon Sep 17 00:00:00 2001 From: brouillette Date: Tue, 5 Oct 2021 13:18:44 -0400 Subject: [PATCH 3/3] ingest genes api --- docker-compose.yaml | 1 + src/api/main.go | 3 +- src/api/models/ingest/ingest.go | 19 ++++-- src/api/mvc/genes.go | 105 ++++++++++++++++++++++++++------ src/api/mvc/variants.go | 6 +- src/api/services/ingestion.go | 26 ++++++-- 6 files changed, 128 insertions(+), 32 deletions(-) diff --git a/docker-compose.yaml b/docker-compose.yaml index e0165a65..066bb44e 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -52,6 +52,7 @@ services: - GOHAN_DRS_BASIC_AUTH_PASSWORD=${GOHAN_DRS_BASIC_AUTH_PASSWORD} volumes: - ${GOHAN_API_VCF_PATH}:${GOHAN_API_CONTAINERIZED_VCF_PATH} + - ${GOHAN_API_GTF_PATH}:${GOHAN_API_CONTAINERIZED_GTF_PATH} elasticsearch: image: ${GOHAN_ES_BASE_IMAGE}:${GOHAN_ES_BASE_VERSION} diff --git a/src/api/main.go b/src/api/main.go index d491eefa..1b067a39 100644 --- a/src/api/main.go +++ b/src/api/main.go @@ -159,10 +159,11 @@ func main() { // -- Genes e.GET("/genes/overview", mvc.GetGenesOverview) - e.GET("/genes/ingest", mvc.GenesIngest) e.GET("/genes/search", mvc.GenesGetByNomenclatureWildcard, // middleware gam.ValidateOptionalChromosomeAttribute) + e.GET("/genes/ingestion/requests", mvc.GetAllGeneIngestionRequests) + e.GET("/genes/ingestion/run", mvc.GenesIngest) // Run e.Logger.Fatal(e.Start(":" + cfg.Api.Port)) diff --git a/src/api/models/ingest/ingest.go b/src/api/models/ingest/ingest.go index 13c5e6cd..bc419a42 100644 --- a/src/api/models/ingest/ingest.go +++ b/src/api/models/ingest/ingest.go @@ -7,13 +7,14 @@ import ( type State string const ( - Queued State = "Queued" - Running = "Running" - Done = "Done" - Error = "Error" + Queued State = "Queued" + Downloading = "Downloading" + Running = "Running" + Done = "Done" + Error = "Error" ) -type IngestRequest struct { +type VariantIngestRequest struct { Id uuid.UUID `json:"id"` Filename string `json:"filename"` State State `json:"state"` @@ -22,6 +23,14 @@ type IngestRequest struct { UpdatedAt string `json:"updatedAt"` } +type GeneIngestRequest struct { + Filename string `json:"filename"` + State State `json:"state"` + Message string `json:"message"` + CreatedAt string `json:"createdAt"` + UpdatedAt string `json:"updatedAt"` +} + type IngestResponseDTO struct { Id uuid.UUID `json:"id"` Filename string `json:"filename"` diff --git a/src/api/mvc/genes.go b/src/api/mvc/genes.go index 60ba965f..a6009d79 100644 --- a/src/api/mvc/genes.go +++ b/src/api/mvc/genes.go @@ -6,6 +6,7 @@ import ( "api/models/constants" assemblyId "api/models/constants/assembly-id" "api/models/constants/chromosome" + "api/models/ingest" "api/models/ingest/structs" esRepo "api/repositories/elasticsearch" "bufio" @@ -20,6 +21,7 @@ import ( "strconv" "strings" "sync" + "time" "github.com/labstack/echo" "github.com/mitchellh/mapstructure" @@ -58,10 +60,20 @@ func GenesIngest(c echo.Context) error { for assId, fileName := range assemblyIdMap { assemblyWg.Add(1) - go func(_assId constants.AssemblyId, _fileName string, _assemblyWg *sync.WaitGroup) { + + newRequestState := ingest.GeneIngestRequest{ + Filename: fileName, + State: ingest.Queued, + CreatedAt: fmt.Sprintf("%s", time.Now()), + } + + go func(_assId constants.AssemblyId, _fileName string, _assemblyWg *sync.WaitGroup, reqStat *ingest.GeneIngestRequest) { defer _assemblyWg.Done() - var geneWg sync.WaitGroup + var ( + unzippedFileName string + geneWg sync.WaitGroup + ) gtfFile, err := os.Open(fmt.Sprintf("%s/%s", gtfPath, _fileName)) if err != nil { // log.Fatalf("failed to open file: %s", err) @@ -80,7 +92,12 @@ func GenesIngest(c echo.Context) error { // Create blank file file, err := os.Create(fmt.Sprintf("%s/%s", gtfPath, _fileName)) if err != nil { - log.Fatal(err) + msg := "Something went wrong: " + err.Error() + fmt.Println(msg) + + reqStat.State = ingest.Error + reqStat.Message = msg + iz.GeneIngestRequestChan <- reqStat } client := http.Client{ CheckRedirect: func(r *http.Request, via []*http.Request) error { @@ -88,69 +105,99 @@ func GenesIngest(c echo.Context) error { return nil }, } + fmt.Printf("Downloading file %s ...\n", _fileName) + reqStat.State = ingest.Downloading + iz.GeneIngestRequestChan <- reqStat // Put content on file resp, err := client.Get(fullURLFile) if err != nil { - log.Fatal(err) + msg := "Something went wrong: " + err.Error() + fmt.Println(msg) + + reqStat.State = ingest.Error + reqStat.Message = msg + iz.GeneIngestRequestChan <- reqStat } defer resp.Body.Close() size, err := io.Copy(file, resp.Body) if err != nil { - log.Fatal(err) + msg := "Something went wrong: " + err.Error() + fmt.Println(msg) + + reqStat.State = ingest.Error + reqStat.Message = msg + iz.GeneIngestRequestChan <- reqStat } defer file.Close() fmt.Printf("Downloaded a file %s with size %d\n", _fileName, size) fmt.Printf("Unzipping %s...\n", _fileName) - gzipfile, err := os.Open(fmt.Sprintf("%s/%s", gtfPath, _fileName)) + unzippedFile, err := os.Open(fmt.Sprintf("%s/%s", gtfPath, _fileName)) if err != nil { fmt.Println(err) os.Exit(1) } - reader, err := gzip.NewReader(gzipfile) + reader, err := gzip.NewReader(unzippedFile) if err != nil { - fmt.Println(err) - os.Exit(1) + msg := "Something went wrong: " + err.Error() + fmt.Println(msg) + + reqStat.State = ingest.Error + reqStat.Message = msg + iz.GeneIngestRequestChan <- reqStat } defer reader.Close() - newfilename := strings.TrimSuffix(_fileName, ".gz") + unzippedFileName = strings.TrimSuffix(_fileName, ".gz") - writer, err := os.Create(fmt.Sprintf("%s/%s", gtfPath, newfilename)) + writer, err := os.Create(fmt.Sprintf("%s/%s", gtfPath, unzippedFileName)) if err != nil { - fmt.Println(err) - os.Exit(1) + msg := "Something went wrong: " + err.Error() + fmt.Println(msg) + + reqStat.State = ingest.Error + reqStat.Message = msg + iz.GeneIngestRequestChan <- reqStat } defer writer.Close() if _, err = io.Copy(writer, reader); err != nil { - fmt.Println(err) - os.Exit(1) + msg := "Something went wrong: " + err.Error() + fmt.Println(msg) + + reqStat.State = ingest.Error + reqStat.Message = msg + iz.GeneIngestRequestChan <- reqStat } - fmt.Printf("Opening %s\n", newfilename) - gtfFile, _ = os.Open(fmt.Sprintf("%s/%s", gtfPath, newfilename)) + fmt.Printf("Opening %s\n", unzippedFileName) + gtfFile, _ = os.Open(fmt.Sprintf("%s/%s", gtfPath, unzippedFileName)) fmt.Printf("Deleting %s\n", _fileName) err = os.Remove(fmt.Sprintf("%s/%s", gtfPath, _fileName)) if err != nil { fmt.Println(err) } + } else { + // for the rare occurences where the file wasn't deleted + // after ingestion (i.e. some kind of interruption), this ensures it does + unzippedFileName = _fileName } defer gtfFile.Close() - fileScanner := bufio.NewScanner(gtfFile) fileScanner.Split(bufio.ScanLines) fmt.Printf("Ingesting %s\n", string(_assId)) + reqStat.State = ingest.Running + iz.GeneIngestRequestChan <- reqStat var ( chromHeaderKey = 0 @@ -249,7 +296,16 @@ func GenesIngest(c echo.Context) error { geneWg.Wait() fmt.Printf("%s ingestion done!\n", _assId) - }(assId, fileName, &assemblyWg) + fmt.Printf("Deleting %s\n", unzippedFileName) + err = os.Remove(fmt.Sprintf("%s/%s", gtfPath, unzippedFileName)) + if err != nil { + fmt.Println(err) + } + + reqStat.State = ingest.Done + iz.GeneIngestRequestChan <- reqStat + + }(assId, fileName, &assemblyWg, &newRequestState) } assemblyWg.Wait() @@ -258,6 +314,17 @@ func GenesIngest(c echo.Context) error { return c.JSON(http.StatusOK, "{\"message\":\"please check in with /genes/overview !\"}") } +func GetAllGeneIngestionRequests(c echo.Context) error { + izMap := c.(*contexts.GohanContext).IngestionService.GeneIngestRequestMap + + // transform map of it-to-ingestRequests to an array + m := make([]*ingest.GeneIngestRequest, 0, len(izMap)) + for _, val := range izMap { + m = append(m, val) + } + return c.JSON(http.StatusOK, m) +} + func GenesGetByNomenclatureWildcard(c echo.Context) error { cfg := c.(*contexts.GohanContext).Config es := c.(*contexts.GohanContext).Es7Client diff --git a/src/api/mvc/variants.go b/src/api/mvc/variants.go index 8eb312f6..3d17e08d 100644 --- a/src/api/mvc/variants.go +++ b/src/api/mvc/variants.go @@ -157,7 +157,7 @@ func VariantsIngest(c echo.Context) error { } // if not, execute - newRequestState := &ingest.IngestRequest{ + newRequestState := &ingest.VariantIngestRequest{ Id: uuid.New(), Filename: fileName, State: ingest.Queued, @@ -171,7 +171,7 @@ func VariantsIngest(c echo.Context) error { Message: "Successfully queued..", }) - go func(file string, reqStat *ingest.IngestRequest) { + go func(file string, reqStat *ingest.VariantIngestRequest) { reqStat.State = ingest.Running ingestionService.IngestRequestChan <- reqStat @@ -304,7 +304,7 @@ func GetAllVariantIngestionRequests(c echo.Context) error { izMap := c.(*contexts.GohanContext).IngestionService.IngestRequestMap // transform map of it-to-ingestRequests to an array - m := make([]*ingest.IngestRequest, 0, len(izMap)) + m := make([]*ingest.VariantIngestRequest, 0, len(izMap)) for _, val := range izMap { m = append(m, val) } diff --git a/src/api/services/ingestion.go b/src/api/services/ingestion.go index 0ed969e3..252786a5 100644 --- a/src/api/services/ingestion.go +++ b/src/api/services/ingestion.go @@ -36,8 +36,10 @@ import ( type ( IngestionService struct { Initialized bool - IngestRequestChan chan *ingest.IngestRequest - IngestRequestMap map[string]*ingest.IngestRequest + IngestRequestChan chan *ingest.VariantIngestRequest + IngestRequestMap map[string]*ingest.VariantIngestRequest + GeneIngestRequestChan chan *ingest.GeneIngestRequest + GeneIngestRequestMap map[string]*ingest.GeneIngestRequest IngestionBulkIndexingCapacity int ElasticsearchClient *elasticsearch.Client IngestionBulkIndexingQueue chan *structs.IngestionQueueStructure @@ -53,8 +55,10 @@ func NewIngestionService(es *elasticsearch.Client) *IngestionService { iz := &IngestionService{ Initialized: false, - IngestRequestChan: make(chan *ingest.IngestRequest), - IngestRequestMap: map[string]*ingest.IngestRequest{}, + IngestRequestChan: make(chan *ingest.VariantIngestRequest), + IngestRequestMap: map[string]*ingest.VariantIngestRequest{}, + GeneIngestRequestChan: make(chan *ingest.GeneIngestRequest), + GeneIngestRequestMap: map[string]*ingest.GeneIngestRequest{}, IngestionBulkIndexingCapacity: defaultBulkIndexingCap, IngestionBulkIndexingQueue: make(chan *structs.IngestionQueueStructure, defaultBulkIndexingCap), GeneIngestionBulkIndexingQueue: make(chan *structs.GeneIngestionQueueStructure, 10), @@ -107,6 +111,20 @@ func (i *IngestionService) Init() { } }() + go func() { + for { + select { + case newRequest := <-i.GeneIngestRequestChan: + if newRequest.State == ingest.Queued { + fmt.Printf("Received new request for %s", newRequest.Filename) + } + + newRequest.UpdatedAt = fmt.Sprintf("%s", time.Now()) + i.GeneIngestRequestMap[newRequest.Filename] = newRequest + } + } + }() + // spin up a listener for each bulk indexing go func() { for {