From 5db7fbcc27ab703e90637c93cf1078a2835fe147 Mon Sep 17 00:00:00 2001 From: Nico Trummer Date: Fri, 5 Apr 2024 13:19:50 +0200 Subject: [PATCH 01/20] Restructure workflows --- example/nextflow.config | 1 + main.nf | 92 +++++--------------- nextflow.config | 1 + {workflows => subworkflows}/benchmarking.nf | 0 {workflows => subworkflows}/clustering.nf | 0 {workflows => subworkflows}/counts.nf | 0 {workflows => subworkflows}/doublets.nf | 0 {workflows => subworkflows}/integration.nf | 0 {workflows => subworkflows}/preprocessing.nf | 0 workflows/build.nf | 77 ++++++++++++++++ workflows/extend.nf | 10 +++ sub-atlas.nf => workflows/sub.nf | 25 +++--- 12 files changed, 122 insertions(+), 84 deletions(-) rename {workflows => subworkflows}/benchmarking.nf (100%) rename {workflows => subworkflows}/clustering.nf (100%) rename {workflows => subworkflows}/counts.nf (100%) rename {workflows => subworkflows}/doublets.nf (100%) rename {workflows => subworkflows}/integration.nf (100%) rename {workflows => subworkflows}/preprocessing.nf (100%) create mode 100644 workflows/build.nf create mode 100644 workflows/extend.nf rename sub-atlas.nf => workflows/sub.nf (58%) diff --git a/example/nextflow.config b/example/nextflow.config index dafa4e4..d3900ce 100644 --- a/example/nextflow.config +++ b/example/nextflow.config @@ -1,5 +1,6 @@ params { samplesheet = "samplesheet.csv" + mode = "build" benchmark_hvgs = 100 scshc = false diff --git a/main.nf b/main.nf index 21be7c3..54b872a 100644 --- a/main.nf +++ b/main.nf @@ -5,84 +5,34 @@ import static com.xlson.groovycsv.CsvParser.parseCsv nextflow.enable.dsl = 2 -// Modules -include { CELLTYPIST } from "./modules/celltypist.nf" -include { CELL_CYCLE } from "./modules/cell_cycle.nf" -include { CELL_QC } from "./modules/cell_qc.nf" -include { MERGE } from "./modules/merge.nf" -include { RENAME_INTEGRATIONS } from "./modules/rename_integrations.nf" - // Workflows -include { PREPROCESSING } from "./workflows/preprocessing.nf" -include { COUNTS } from "./workflows/counts.nf" -include { INTEGRATION } from "./workflows/integration.nf" -include { DOUBLETS } from "./workflows/doublets.nf" -include { CLUSTERING } from "./workflows/clustering.nf" +include { BUILD } from "./workflows/build" +include { EXTEND } from "./workflows/extend" +include { SUB } from "./workflows/sub" -if (params.samplesheet) { ch_samplesheet = file(params.samplesheet) } else { exit 1, 'Samplesheet not specified!' } +// Modules +include { MERGE } from "./modules/merge.nf" +include { RENAME_INTEGRATIONS } from "./modules/rename_integrations.nf" workflow { - PREPROCESSING(ch_samplesheet) - - ch_adata_integration = PREPROCESSING.out.integration - ch_adata_intersection = PREPROCESSING.out.intersection - ch_adata_counts = PREPROCESSING.out.counts - ch_adata_transfer = PREPROCESSING.out.transfer - ch_hvgs = PREPROCESSING.out.hvgs - - COUNTS(ch_adata_counts, params.normalization_method) - - CELLTYPIST( - ch_adata_intersection, - params.celltypist_model ?: "" - ) - - CELL_CYCLE( - ch_adata_intersection, - "human" - ) + if (params.mode == "build") { + BUILD() + subworkflow = BUILD.out + } else if (params.mode == "extend") { + EXTEND() + subworkflow = EXTEND.out + } else if (params.mode == "sub") { + SUB() + subworkflow = SUB.out + } else { + error "Invalid mode: ${params.mode}. 
Must be one of 'build', 'extend', or 'sub'" + } - CELL_QC(ch_adata_intersection) - - INTEGRATION( - ch_adata_integration, - ch_hvgs, - Channel.from(params.integration_methods).mix(Channel.value("unintegrated")), - ch_adata_transfer, - Channel.value(params.benchmark_hvgs) - ) - - DOUBLETS( - ch_adata_intersection, - ch_adata_integration, - ch_hvgs, - INTEGRATION.out.model, - INTEGRATION.out.integrated, - COUNTS.out - ) - - CLUSTERING( - DOUBLETS.out.integrations, - Channel.from(params.leiden_resolutions), - CELLTYPIST.out, - Channel.value(params.entropy_initial_smoothness) - ) - - ch_obs = CLUSTERING.out.obs.mix( - CELL_CYCLE.out, - CELLTYPIST.out, - INTEGRATION.out.scanvi_labels, - CELL_QC.out - ) - - ch_obsm = CLUSTERING.out.obsm.mix( - DOUBLETS.out.obsm - ) MERGE ( - DOUBLETS.out.counts, - ch_obsm.map{ meta, obsm -> obsm}.collect(), - ch_obs.map{ meta, obs -> obs}.collect() + subworkflow.adata, + subworkflow.obsm.map{ meta, obsm -> obsm}.collect(), + subworkflow.obs.map{ meta, obs -> obs}.collect() ) RENAME_INTEGRATIONS(MERGE.out.adata) diff --git a/nextflow.config b/nextflow.config index 894f10c..337fad5 100644 --- a/nextflow.config +++ b/nextflow.config @@ -2,6 +2,7 @@ params { samplesheet = null outdir = "output" + mode = null celltypist_model = null diff --git a/workflows/benchmarking.nf b/subworkflows/benchmarking.nf similarity index 100% rename from workflows/benchmarking.nf rename to subworkflows/benchmarking.nf diff --git a/workflows/clustering.nf b/subworkflows/clustering.nf similarity index 100% rename from workflows/clustering.nf rename to subworkflows/clustering.nf diff --git a/workflows/counts.nf b/subworkflows/counts.nf similarity index 100% rename from workflows/counts.nf rename to subworkflows/counts.nf diff --git a/workflows/doublets.nf b/subworkflows/doublets.nf similarity index 100% rename from workflows/doublets.nf rename to subworkflows/doublets.nf diff --git a/workflows/integration.nf b/subworkflows/integration.nf similarity index 100% rename from workflows/integration.nf rename to subworkflows/integration.nf diff --git a/workflows/preprocessing.nf b/subworkflows/preprocessing.nf similarity index 100% rename from workflows/preprocessing.nf rename to subworkflows/preprocessing.nf diff --git a/workflows/build.nf b/workflows/build.nf new file mode 100644 index 0000000..366c296 --- /dev/null +++ b/workflows/build.nf @@ -0,0 +1,77 @@ +// Modules +include { CELLTYPIST } from "../modules/celltypist.nf" +include { CELL_CYCLE } from "../modules/cell_cycle.nf" +include { CELL_QC } from "../modules/cell_qc.nf" + +// Workflows +include { PREPROCESSING } from "../subworkflows/preprocessing.nf" +include { COUNTS } from "../subworkflows/counts.nf" +include { INTEGRATION } from "../subworkflows/integration.nf" +include { DOUBLETS } from "../subworkflows/doublets.nf" +include { CLUSTERING } from "../subworkflows/clustering.nf" + +if (params.samplesheet) { ch_samplesheet = file(params.samplesheet) } else { exit 1, 'Samplesheet not specified!' 
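The dispatch above is the heart of the restructure: every top-level workflow (BUILD, EXTEND, SUB) emits the same `adata`/`obsm`/`obs` channels, so `MERGE` and `RENAME_INTEGRATIONS` can consume `subworkflow.out` without knowing which mode produced it. A minimal, self-contained sketch of the same pattern, using hypothetical stub workflows rather than the pipeline's real ones:

```nextflow
nextflow.enable.dsl = 2

// Hypothetical stubs standing in for BUILD/EXTEND/SUB; what matters is
// that every mode emits identically named channels.
workflow BUILD_DEMO {
    main:
    ch = Channel.of([[id: 'build'], 'build.h5ad'])
    emit:
    adata = ch
}

workflow SUB_DEMO {
    main:
    ch = Channel.of([[id: 'sub'], 'sub.h5ad'])
    emit:
    adata = ch
}

workflow {
    if (params.mode == 'build') {
        BUILD_DEMO()
        subworkflow = BUILD_DEMO.out
    } else if (params.mode == 'sub') {
        SUB_DEMO()
        subworkflow = SUB_DEMO.out
    } else {
        error "Invalid mode: ${params.mode}"
    }

    // Mode-agnostic from here on, as in main.nf above.
    subworkflow.adata.view{ meta, adata -> "${meta.id}: ${adata}" }
}
```

Run as, e.g., `nextflow run demo.nf --mode build` (hypothetical file name).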
} + +workflow BUILD { + PREPROCESSING(ch_samplesheet) + + ch_adata_integration = PREPROCESSING.out.integration + ch_adata_intersection = PREPROCESSING.out.intersection + ch_adata_counts = PREPROCESSING.out.counts + ch_adata_transfer = PREPROCESSING.out.transfer + ch_hvgs = PREPROCESSING.out.hvgs + + COUNTS(ch_adata_counts, params.normalization_method) + + CELLTYPIST( + ch_adata_intersection, + params.celltypist_model ?: "" + ) + + CELL_CYCLE( + ch_adata_intersection, + "human" + ) + + CELL_QC(ch_adata_intersection) + + INTEGRATION( + ch_adata_integration, + ch_hvgs, + Channel.from(params.integration_methods).mix(Channel.value("unintegrated")), + ch_adata_transfer, + Channel.value(params.benchmark_hvgs) + ) + + DOUBLETS( + ch_adata_intersection, + ch_adata_integration, + ch_hvgs, + INTEGRATION.out.model, + INTEGRATION.out.integrated, + COUNTS.out + ) + + CLUSTERING( + DOUBLETS.out.integrations, + Channel.from(params.leiden_resolutions), + CELLTYPIST.out, + Channel.value(params.entropy_initial_smoothness) + ) + + ch_obs = CLUSTERING.out.obs.mix( + CELL_CYCLE.out, + CELLTYPIST.out, + INTEGRATION.out.scanvi_labels, + CELL_QC.out + ) + + ch_obsm = CLUSTERING.out.obsm.mix( + DOUBLETS.out.obsm + ) + + emit: + adata = DOUBLETS.out.counts + obsm = ch_obsm + obs = ch_obs +} diff --git a/workflows/extend.nf b/workflows/extend.nf new file mode 100644 index 0000000..c1491fd --- /dev/null +++ b/workflows/extend.nf @@ -0,0 +1,10 @@ +workflow EXTEND { + ch_adata = Channel.empty() + ch_obsm = Channel.empty() + ch_obs = Channel.empty() + + emit: + adata = ch_adata + obsm = ch_obsm + obs = ch_obs +} \ No newline at end of file diff --git a/sub-atlas.nf b/workflows/sub.nf similarity index 58% rename from sub-atlas.nf rename to workflows/sub.nf index 648aef6..7b08d37 100644 --- a/sub-atlas.nf +++ b/workflows/sub.nf @@ -1,13 +1,13 @@ -include { CLEAN_ADATA } from './modules/clean_adata.nf' -include { SPLIT_CATEGORIES } from './modules/split_categories.nf' -include { CLUSTERING } from './workflows/clustering.nf' -include { MERGE } from './modules/merge.nf' +include { CLEAN_ADATA } from '../modules/clean_adata.nf' +include { SPLIT_CATEGORIES } from '../modules/split_categories.nf' +include { CLUSTERING } from '../subworkflows/clustering.nf' +include { MERGE } from '../modules/merge.nf' -ch_adata_input = Channel.fromPath(params.input) - .map{ adata -> [[id: 'input'], adata]} -ch_categories = params.category_annotation ? Channel.fromPath(params.category_annotation) : [] +workflow SUB { + ch_adata_input = Channel.fromPath(params.input) + .map{ adata -> [[id: 'input'], adata]} + ch_categories = params.category_annotation ? 
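The EXTEND stub above leans on `Channel.empty()` being a neutral element for `.mix()`: downstream steps that mix and collect the mode's `obs`/`obsm` outputs still resolve even while one branch emits nothing. A small sketch of that behavior with hypothetical channels:

```nextflow
nextflow.enable.dsl = 2

workflow {
    ch_obs       = Channel.of([[id: 'a'], 'a.obs.pkl'], [[id: 'b'], 'b.obs.pkl'])
    ch_not_ready = Channel.empty()   // stands in for an unimplemented branch

    // Mixing an empty channel adds nothing, so collect() still sees
    // exactly the two real items and the run completes normally.
    ch_obs
        .mix(ch_not_ready)
        .map{ meta, obs -> obs }
        .collect()
        .view()   // => [a.obs.pkl, b.obs.pkl]
}
```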
Channel.fromPath(params.category_annotation) : [] -workflow { CLEAN_ADATA(ch_adata_input, params.integration) ch_adata = CLEAN_ADATA.out.adata @@ -35,9 +35,8 @@ workflow { .map{ meta, obs -> obs} .collect() - MERGE( - ch_adata, - ch_obsm, - ch_obs - ) + emit: + adata = ch_adata + obsm = ch_obsm + obs = ch_obs } \ No newline at end of file From fd289080acc6034c1ff5f5dac5bb2ef89bf0b977 Mon Sep 17 00:00:00 2001 From: Nico Trummer Date: Fri, 5 Apr 2024 14:41:06 +0200 Subject: [PATCH 02/20] Implement mode-specific configs --- conf/build-extend.config | 57 ++++++++++++++++++++++++++++++++ conf/modes.config | 71 ++++++++++++++++++++++++++++++++++++++++ example/nextflow.config | 1 - example/run.sh | 2 +- main.nf | 2 +- nextflow.config | 28 ++-------------- 6 files changed, 133 insertions(+), 28 deletions(-) create mode 100644 conf/build-extend.config create mode 100644 conf/modes.config diff --git a/conf/build-extend.config b/conf/build-extend.config new file mode 100644 index 0000000..0d73853 --- /dev/null +++ b/conf/build-extend.config @@ -0,0 +1,57 @@ +params { + if (mode == "build" || mode == "extend") { + samplesheet = null + celltypist_model = null + + leiden_resolutions = [0.25, 0.5, 0.75, 1, 1.5, 2] + integration_methods = ["scvi", "scanvi", "harmony", "scgen", "scanorama", "bbknn", "desc", "combat", "trvaep"] + + min_cells = 50 + scshc = false + entropy = false + entropy_initial_smoothness = 0.5 + cell_cycle = true + benchmark_hvgs = 0 + integration_hvgs = 10000 + normalization_method = "log_total" + upset_only = false + decontX = true + + has_extended = false + has_celltypes = true + custom_metadata = [] + custom_hvgs = [] + } +} + +process { + if (params.mode == "build" || params.mode == "extend") { + withName: CELLTYPIST { + ext.when = { params.celltypist_model != null } + } + + withName: CELL_CYCLE { + ext.when = { params.cell_cycle } + } + + withName: SCSHC_CLUSTERING { + ext.when = { params.scshc } + } + + withName: SCSHC_CLUSTERING_QC { + ext.when = { params.scshc } + } + + withName: ENTROPY { + ext.when = { params.entropy } + } + + withName: BENCHMARK_INTEGRATIONS { + ext.when = { params.benchmark_hvgs > 0 } + } + + withName: MERGE_DATASETS { + ext.when = { !params.upset_only } + } + } +} \ No newline at end of file diff --git a/conf/modes.config b/conf/modes.config new file mode 100644 index 0000000..7b42c6e --- /dev/null +++ b/conf/modes.config @@ -0,0 +1,71 @@ +profiles { + build { + params { + mode = "build" + + samplesheet = null + celltypist_model = null + + leiden_resolutions = [0.25, 0.5, 0.75, 1, 1.5, 2] + integration_methods = ["scvi", "scanvi", "harmony", "scgen", "scanorama", "bbknn", "desc", "combat", "trvaep"] + + min_cells = 50 + scshc = false + entropy = false + entropy_initial_smoothness = 0.5 + cell_cycle = true + benchmark_hvgs = 0 + integration_hvgs = 10000 + normalization_method = "log_total" + upset_only = false + decontX = true + + has_extended = false + has_celltypes = true + custom_metadata = [] + custom_hvgs = [] + } + + process { + withName: CELLTYPIST { + ext.when = { params.celltypist_model != null } + } + + withName: CELL_CYCLE { + ext.when = { params.cell_cycle } + } + + withName: SCSHC_CLUSTERING { + ext.when = { params.scshc } + } + + withName: SCSHC_CLUSTERING_QC { + ext.when = { params.scshc } + } + + withName: ENTROPY { + ext.when = { params.entropy } + } + + withName: BENCHMARK_INTEGRATIONS { + ext.when = { params.benchmark_hvgs > 0 } + } + + withName: MERGE_DATASETS { + ext.when = { !params.upset_only } + } + } + } + + extend { + params { + 
mode = "extend" + } + } + + sub { + params { + mode = "sub" + } + } +} \ No newline at end of file diff --git a/example/nextflow.config b/example/nextflow.config index d3900ce..dafa4e4 100644 --- a/example/nextflow.config +++ b/example/nextflow.config @@ -1,6 +1,5 @@ params { samplesheet = "samplesheet.csv" - mode = "build" benchmark_hvgs = 100 scshc = false diff --git a/example/run.sh b/example/run.sh index 4b38b28..aeda69f 100755 --- a/example/run.sh +++ b/example/run.sh @@ -1,3 +1,3 @@ #!/bin/bash -nextflow run .. -resume -profile apptainer \ No newline at end of file +nextflow run .. -resume -profile apptainer,build \ No newline at end of file diff --git a/main.nf b/main.nf index 54b872a..2d1fab6 100644 --- a/main.nf +++ b/main.nf @@ -25,7 +25,7 @@ workflow { SUB() subworkflow = SUB.out } else { - error "Invalid mode: ${params.mode}. Must be one of 'build', 'extend', or 'sub'" + error "Invalid mode: ${params.mode}. Please enable one of the following profiles: 'build', 'extend', or 'sub'" } diff --git a/nextflow.config b/nextflow.config index 337fad5..18d6c64 100644 --- a/nextflow.config +++ b/nextflow.config @@ -1,32 +1,10 @@ // default parameters params { - samplesheet = null outdir = "output" - mode = null - - celltypist_model = null - - leiden_resolutions = [0.25, 0.5, 0.75, 1, 1.5, 2] - integration_methods = ["scvi", "scanvi", "harmony", "scgen", "scanorama", "bbknn", "desc", "combat", "trvaep"] - - min_cells = 50 - scshc = false - entropy = false - entropy_initial_smoothness = 0.5 - cell_cycle = true - benchmark_hvgs = 0 - integration_hvgs = 10000 - normalization_method = "log_total" - upset_only = false - decontX = true - - has_extended = false - has_celltypes = true - custom_metadata = [] - custom_hvgs = [] - publish_mode = "symlink" + mode = null + resource_scale = 1 max_cpus = 60 max_memory = 300.GB @@ -108,7 +86,7 @@ profiles { } includeConfig "conf/base.config" -includeConfig "conf/modules.config" +includeConfig "conf/modes.config" def check_max(obj, type) { if (type == 'memory') { From cdd7beb3636464fddfede063e4166c5edda17b62 Mon Sep 17 00:00:00 2001 From: Nico Trummer Date: Fri, 5 Apr 2024 14:52:26 +0200 Subject: [PATCH 03/20] Add mode-specific config files --- conf/build-extend.config | 57 ---------------------------- conf/modes.config | 69 ++++------------------------------ conf/modes/build-extend.config | 53 ++++++++++++++++++++++++++ conf/modes/build.config | 0 conf/modes/extend.config | 0 conf/modes/sub.config | 0 conf/modules.config | 29 -------------- 7 files changed, 61 insertions(+), 147 deletions(-) delete mode 100644 conf/build-extend.config create mode 100644 conf/modes/build-extend.config create mode 100644 conf/modes/build.config create mode 100644 conf/modes/extend.config create mode 100644 conf/modes/sub.config delete mode 100644 conf/modules.config diff --git a/conf/build-extend.config b/conf/build-extend.config deleted file mode 100644 index 0d73853..0000000 --- a/conf/build-extend.config +++ /dev/null @@ -1,57 +0,0 @@ -params { - if (mode == "build" || mode == "extend") { - samplesheet = null - celltypist_model = null - - leiden_resolutions = [0.25, 0.5, 0.75, 1, 1.5, 2] - integration_methods = ["scvi", "scanvi", "harmony", "scgen", "scanorama", "bbknn", "desc", "combat", "trvaep"] - - min_cells = 50 - scshc = false - entropy = false - entropy_initial_smoothness = 0.5 - cell_cycle = true - benchmark_hvgs = 0 - integration_hvgs = 10000 - normalization_method = "log_total" - upset_only = false - decontX = true - - has_extended = false - has_celltypes = 
true - custom_metadata = [] - custom_hvgs = [] - } -} - -process { - if (params.mode == "build" || params.mode == "extend") { - withName: CELLTYPIST { - ext.when = { params.celltypist_model != null } - } - - withName: CELL_CYCLE { - ext.when = { params.cell_cycle } - } - - withName: SCSHC_CLUSTERING { - ext.when = { params.scshc } - } - - withName: SCSHC_CLUSTERING_QC { - ext.when = { params.scshc } - } - - withName: ENTROPY { - ext.when = { params.entropy } - } - - withName: BENCHMARK_INTEGRATIONS { - ext.when = { params.benchmark_hvgs > 0 } - } - - withName: MERGE_DATASETS { - ext.when = { !params.upset_only } - } - } -} \ No newline at end of file diff --git a/conf/modes.config b/conf/modes.config index 7b42c6e..a8705e7 100644 --- a/conf/modes.config +++ b/conf/modes.config @@ -1,71 +1,18 @@ profiles { build { - params { - mode = "build" - - samplesheet = null - celltypist_model = null - - leiden_resolutions = [0.25, 0.5, 0.75, 1, 1.5, 2] - integration_methods = ["scvi", "scanvi", "harmony", "scgen", "scanorama", "bbknn", "desc", "combat", "trvaep"] - - min_cells = 50 - scshc = false - entropy = false - entropy_initial_smoothness = 0.5 - cell_cycle = true - benchmark_hvgs = 0 - integration_hvgs = 10000 - normalization_method = "log_total" - upset_only = false - decontX = true - - has_extended = false - has_celltypes = true - custom_metadata = [] - custom_hvgs = [] - } - - process { - withName: CELLTYPIST { - ext.when = { params.celltypist_model != null } - } - - withName: CELL_CYCLE { - ext.when = { params.cell_cycle } - } - - withName: SCSHC_CLUSTERING { - ext.when = { params.scshc } - } - - withName: SCSHC_CLUSTERING_QC { - ext.when = { params.scshc } - } - - withName: ENTROPY { - ext.when = { params.entropy } - } - - withName: BENCHMARK_INTEGRATIONS { - ext.when = { params.benchmark_hvgs > 0 } - } - - withName: MERGE_DATASETS { - ext.when = { !params.upset_only } - } - } + params.mode = "build" + includeConfig "modes/build.config" + includeConfig "modes/build-extend.config" } extend { - params { - mode = "extend" - } + params.mode = "extend" + includeConfig "modes/extend.config" + includeConfig "modes/build-extend.config" } sub { - params { - mode = "sub" - } + params.mode = "sub" + includeConfig "modes/sub.config" } } \ No newline at end of file diff --git a/conf/modes/build-extend.config b/conf/modes/build-extend.config new file mode 100644 index 0000000..a33ae85 --- /dev/null +++ b/conf/modes/build-extend.config @@ -0,0 +1,53 @@ +params { + samplesheet = null + celltypist_model = null + + leiden_resolutions = [0.25, 0.5, 0.75, 1, 1.5, 2] + integration_methods = ["scvi", "scanvi", "harmony", "scgen", "scanorama", "bbknn", "desc", "combat", "trvaep"] + + min_cells = 50 + scshc = false + entropy = false + entropy_initial_smoothness = 0.5 + cell_cycle = true + benchmark_hvgs = 0 + integration_hvgs = 10000 + normalization_method = "log_total" + upset_only = false + decontX = true + + has_extended = false + has_celltypes = true + custom_metadata = [] + custom_hvgs = [] +} + +process { + withName: CELLTYPIST { + ext.when = { params.celltypist_model != null } + } + + withName: CELL_CYCLE { + ext.when = { params.cell_cycle } + } + + withName: SCSHC_CLUSTERING { + ext.when = { params.scshc } + } + + withName: SCSHC_CLUSTERING_QC { + ext.when = { params.scshc } + } + + withName: ENTROPY { + ext.when = { params.entropy } + } + + withName: BENCHMARK_INTEGRATIONS { + ext.when = { params.benchmark_hvgs > 0 } + } + + withName: MERGE_DATASETS { + ext.when = { !params.upset_only } + } +} \ No 
newline at end of file diff --git a/conf/modes/build.config b/conf/modes/build.config new file mode 100644 index 0000000..e69de29 diff --git a/conf/modes/extend.config b/conf/modes/extend.config new file mode 100644 index 0000000..e69de29 diff --git a/conf/modes/sub.config b/conf/modes/sub.config new file mode 100644 index 0000000..e69de29 diff --git a/conf/modules.config b/conf/modules.config deleted file mode 100644 index 3f9b522..0000000 --- a/conf/modules.config +++ /dev/null @@ -1,29 +0,0 @@ -process { - withName: CELLTYPIST { - ext.when = { params.celltypist_model != null } - } - - withName: CELL_CYCLE { - ext.when = { params.cell_cycle } - } - - withName: SCSHC_CLUSTERING { - ext.when = { params.scshc } - } - - withName: SCSHC_CLUSTERING_QC { - ext.when = { params.scshc } - } - - withName: ENTROPY { - ext.when = { params.entropy } - } - - withName: BENCHMARK_INTEGRATIONS { - ext.when = { params.benchmark_hvgs > 0 } - } - - withName: MERGE_DATASETS { - ext.when = { !params.upset_only } - } -} \ No newline at end of file From a2c73ab8dc1c0cefd8fd545d9da853450d437e67 Mon Sep 17 00:00:00 2001 From: Nico Trummer Date: Mon, 8 Apr 2024 08:47:21 +0200 Subject: [PATCH 04/20] Improve code style --- workflows/build.nf | 8 ++++++-- workflows/sub.nf | 1 + 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/workflows/build.nf b/workflows/build.nf index 366c296..53f32ce 100644 --- a/workflows/build.nf +++ b/workflows/build.nf @@ -10,9 +10,13 @@ include { INTEGRATION } from "../subworkflows/integration.nf" include { DOUBLETS } from "../subworkflows/doublets.nf" include { CLUSTERING } from "../subworkflows/clustering.nf" -if (params.samplesheet) { ch_samplesheet = file(params.samplesheet) } else { exit 1, 'Samplesheet not specified!' } - workflow BUILD { + if (!params.samplesheet) { + exit 1, 'Samplesheet not specified!' + } + + ch_samplesheet = file(params.samplesheet) + PREPROCESSING(ch_samplesheet) ch_adata_integration = PREPROCESSING.out.integration diff --git a/workflows/sub.nf b/workflows/sub.nf index 7b08d37..70a26be 100644 --- a/workflows/sub.nf +++ b/workflows/sub.nf @@ -6,6 +6,7 @@ include { MERGE } from '../modules/merge.nf' workflow SUB { ch_adata_input = Channel.fromPath(params.input) .map{ adata -> [[id: 'input'], adata]} + ch_categories = params.category_annotation ? 
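Patch 03 drops the conditional `params` blocks of patch 02 in favor of plain composition: each profile sets `params.mode` and pulls in its config files via `includeConfig`, and everything shared by build and extend lives once in `modes/build-extend.config`. The layering, reduced to its skeleton (paths as in the repo):

```nextflow
// conf/modes.config -- one profile per mode; includeConfig composes
// mode-specific and shared defaults instead of conditional params blocks.
profiles {
    build {
        params.mode = "build"
        includeConfig "modes/build.config"         // build-only defaults
        includeConfig "modes/build-extend.config"  // shared with extend
    }
    extend {
        params.mode = "extend"
        includeConfig "modes/extend.config"
        includeConfig "modes/build-extend.config"
    }
    sub {
        params.mode = "sub"
        includeConfig "modes/sub.config"
    }
}
```

A mode is then selected purely through profiles, and profiles compose with infrastructure ones on the command line, e.g. `nextflow run . -profile apptainer,build` as in the updated `example/run.sh`.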
Channel.fromPath(params.category_annotation) : [] CLEAN_ADATA(ch_adata_input, params.integration) From e99c909f8497113e87022a82a59fca9079d5c6c2 Mon Sep 17 00:00:00 2001 From: Nico Trummer Date: Mon, 8 Apr 2024 09:14:26 +0200 Subject: [PATCH 05/20] Relocate build example run --- example/{ => build}/.gitignore | 0 example/{ => build}/minimal_example.h5ad | Bin example/{ => build}/nextflow.config | 0 example/build/run.sh | 3 +++ example/{ => build}/samplesheet.csv | 0 example/run.sh | 3 --- 6 files changed, 3 insertions(+), 3 deletions(-) rename example/{ => build}/.gitignore (100%) rename example/{ => build}/minimal_example.h5ad (100%) rename example/{ => build}/nextflow.config (100%) create mode 100755 example/build/run.sh rename example/{ => build}/samplesheet.csv (100%) delete mode 100755 example/run.sh diff --git a/example/.gitignore b/example/build/.gitignore similarity index 100% rename from example/.gitignore rename to example/build/.gitignore diff --git a/example/minimal_example.h5ad b/example/build/minimal_example.h5ad similarity index 100% rename from example/minimal_example.h5ad rename to example/build/minimal_example.h5ad diff --git a/example/nextflow.config b/example/build/nextflow.config similarity index 100% rename from example/nextflow.config rename to example/build/nextflow.config diff --git a/example/build/run.sh b/example/build/run.sh new file mode 100755 index 0000000..78192ec --- /dev/null +++ b/example/build/run.sh @@ -0,0 +1,3 @@ +#!/bin/bash + +nextflow run ../.. -resume -profile apptainer,build \ No newline at end of file diff --git a/example/samplesheet.csv b/example/build/samplesheet.csv similarity index 100% rename from example/samplesheet.csv rename to example/build/samplesheet.csv diff --git a/example/run.sh b/example/run.sh deleted file mode 100755 index aeda69f..0000000 --- a/example/run.sh +++ /dev/null @@ -1,3 +0,0 @@ -#!/bin/bash - -nextflow run .. 
-resume -profile apptainer,build \ No newline at end of file From 490f6933437c0b5c20e357485301323caea545ed Mon Sep 17 00:00:00 2001 From: Nico Trummer Date: Mon, 8 Apr 2024 10:09:29 +0200 Subject: [PATCH 06/20] Add test configuration for sub workflow --- conf/modes.config | 21 +++++++++++++++++++++ conf/modes/build-extend.config | 17 +---------------- conf/modes/sub.config | 6 ++++++ example/build/nextflow.config | 13 ++----------- example/sub/.gitignore | 6 ++++++ example/sub/nextflow.config | 8 ++++++++ example/sub/run.sh | 3 +++ example/test.config | 13 +++++++++++++ modules/split_categories.nf | 2 +- nextflow.config | 2 +- workflows/sub.nf | 8 ++------ 11 files changed, 64 insertions(+), 35 deletions(-) create mode 100644 example/sub/.gitignore create mode 100644 example/sub/nextflow.config create mode 100755 example/sub/run.sh create mode 100644 example/test.config diff --git a/conf/modes.config b/conf/modes.config index a8705e7..bc83b93 100644 --- a/conf/modes.config +++ b/conf/modes.config @@ -1,3 +1,24 @@ +params { + leiden_resolutions = [0.25, 0.5, 0.75, 1, 1.5, 2] + entropy = false + entropy_initial_smoothness = 0.5 + scshc = false +} + +process { + withName: SCSHC_CLUSTERING { + ext.when = { params.scshc } + } + + withName: SCSHC_CLUSTERING_QC { + ext.when = { params.scshc } + } + + withName: ENTROPY { + ext.when = { params.entropy } + } +} + profiles { build { params.mode = "build" diff --git a/conf/modes/build-extend.config b/conf/modes/build-extend.config index a33ae85..207fdf9 100644 --- a/conf/modes/build-extend.config +++ b/conf/modes/build-extend.config @@ -2,13 +2,10 @@ params { samplesheet = null celltypist_model = null - leiden_resolutions = [0.25, 0.5, 0.75, 1, 1.5, 2] integration_methods = ["scvi", "scanvi", "harmony", "scgen", "scanorama", "bbknn", "desc", "combat", "trvaep"] min_cells = 50 - scshc = false - entropy = false - entropy_initial_smoothness = 0.5 + cell_cycle = true benchmark_hvgs = 0 integration_hvgs = 10000 @@ -31,18 +28,6 @@ process { ext.when = { params.cell_cycle } } - withName: SCSHC_CLUSTERING { - ext.when = { params.scshc } - } - - withName: SCSHC_CLUSTERING_QC { - ext.when = { params.scshc } - } - - withName: ENTROPY { - ext.when = { params.entropy } - } - withName: BENCHMARK_INTEGRATIONS { ext.when = { params.benchmark_hvgs > 0 } } diff --git a/conf/modes/sub.config b/conf/modes/sub.config index e69de29..87a1efe 100644 --- a/conf/modes/sub.config +++ b/conf/modes/sub.config @@ -0,0 +1,6 @@ +params { + input = null + integration = null + annotation = null + split_on = null +} \ No newline at end of file diff --git a/example/build/nextflow.config b/example/build/nextflow.config index dafa4e4..1545a54 100644 --- a/example/build/nextflow.config +++ b/example/build/nextflow.config @@ -1,20 +1,11 @@ +includeConfig "../test.config" + params { samplesheet = "samplesheet.csv" benchmark_hvgs = 100 - scshc = false - entropy = false cell_cycle = true - leiden_resolutions = [0.5, 1] celltypist_model = "Cells_Intestinal_Tract.pkl" integration_methods = ["scvi", "scanvi", "harmony", "desc", "combat"] - - max_cpus = 4 - max_memory = "12G" - max_time = "6.h" -} - -process { - executor = "local" } \ No newline at end of file diff --git a/example/sub/.gitignore b/example/sub/.gitignore new file mode 100644 index 0000000..c981920 --- /dev/null +++ b/example/sub/.gitignore @@ -0,0 +1,6 @@ +* +!.gitignore +!minimal_example.h5ad +!nextflow.config +!run.sh +!samplesheet.csv \ No newline at end of file diff --git a/example/sub/nextflow.config 
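The `ext.when` entries above only work because each guarded module carries the matching guard `when: task.ext.when == null || task.ext.when`: with no config set, the process runs; a profile can switch it off without editing the module. A minimal pair, with a hypothetical process name:

```nextflow
nextflow.enable.dsl = 2

process ENTROPY_DEMO {
    input:
    val x

    output:
    stdout

    // Default-on: runs unless config sets ext.when to something falsy.
    when:
    task.ext.when == null || task.ext.when

    script:
    """
    echo "entropy for ${x}"
    """
}

workflow {
    ENTROPY_DEMO(Channel.of('leiden_0.5')) | view
}

// Paired config (e.g. in conf/modes.config):
// process {
//     withName: ENTROPY_DEMO {
//         ext.when = { params.entropy }
//     }
// }
```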
b/example/sub/nextflow.config new file mode 100644 index 0000000..9ece30e --- /dev/null +++ b/example/sub/nextflow.config @@ -0,0 +1,8 @@ +includeConfig "../test.config" + +params { + input = "atlas.h5ad" + annotation = "annotation.csv" + integration = 'scanvi' + split_on = 'scanvi_leiden_0.5' +} \ No newline at end of file diff --git a/example/sub/run.sh b/example/sub/run.sh new file mode 100755 index 0000000..a15175b --- /dev/null +++ b/example/sub/run.sh @@ -0,0 +1,3 @@ +#!/bin/bash + +nextflow run ../.. -resume -profile apptainer,sub \ No newline at end of file diff --git a/example/test.config b/example/test.config new file mode 100644 index 0000000..e8fa6a4 --- /dev/null +++ b/example/test.config @@ -0,0 +1,13 @@ +params { + scshc = false + entropy = false + leiden_resolutions = [0.5, 1] + + max_cpus = 4 + max_memory = "12G" + max_time = "6.h" +} + +process { + executor = "local" +} \ No newline at end of file diff --git a/modules/split_categories.nf b/modules/split_categories.nf index af946eb..1a7afe6 100644 --- a/modules/split_categories.nf +++ b/modules/split_categories.nf @@ -27,7 +27,7 @@ process SPLIT_CATEGORIES { for category in adata.obs["category"].unique(): adata_category = adata[adata.obs["category"] == category] - adata_category.write_h5ad(f"{category.replace(' ', '_')}.category.h5ad") + adata_category.write_h5ad(f"{category.replace(' ', '_') if type(category) == str else category}.category.h5ad") """ else """ diff --git a/nextflow.config b/nextflow.config index 18d6c64..069f592 100644 --- a/nextflow.config +++ b/nextflow.config @@ -1,7 +1,7 @@ // default parameters params { outdir = "output" - publish_mode = "symlink" + publish_mode = "copy" mode = null diff --git a/workflows/sub.nf b/workflows/sub.nf index 70a26be..f16e0ed 100644 --- a/workflows/sub.nf +++ b/workflows/sub.nf @@ -7,12 +7,12 @@ workflow SUB { ch_adata_input = Channel.fromPath(params.input) .map{ adata -> [[id: 'input'], adata]} - ch_categories = params.category_annotation ? Channel.fromPath(params.category_annotation) : [] + ch_categories = params.annotation ? 
Channel.fromPath(params.annotation) : [] CLEAN_ADATA(ch_adata_input, params.integration) ch_adata = CLEAN_ADATA.out.adata - SPLIT_CATEGORIES(ch_adata, ch_categories, params.category) + SPLIT_CATEGORIES(ch_adata, ch_categories, params.split_on) ch_categoric_adata = SPLIT_CATEGORIES.out .map{ meta, adatas -> adatas } @@ -29,12 +29,8 @@ workflow SUB { ch_obsm = CLUSTERING.out.obsm .mix(CLEAN_ADATA.out.embedding) .mix(CLEAN_ADATA.out.umap) - .map{ meta, obsm -> obsm} - .collect() ch_obs = CLUSTERING.out.obs - .map{ meta, obs -> obs} - .collect() emit: adata = ch_adata From aa9e22b0ea4c5a282b4c84565dbc7ac06e01b80c Mon Sep 17 00:00:00 2001 From: Nico Trummer Date: Mon, 8 Apr 2024 10:34:21 +0200 Subject: [PATCH 07/20] Add basic extend workflow --- conf/modes/extend.config | 3 +++ example/extend/.gitignore | 6 ++++++ example/extend/nextflow.config | 11 +++++++++++ example/extend/run.sh | 3 +++ example/extend/samplesheet.csv | 2 ++ subworkflows/preprocessing.nf | 1 + workflows/build.nf | 2 +- workflows/extend.nf | 18 ++++++++++++++++++ 8 files changed, 45 insertions(+), 1 deletion(-) create mode 100644 example/extend/.gitignore create mode 100644 example/extend/nextflow.config create mode 100755 example/extend/run.sh create mode 100644 example/extend/samplesheet.csv diff --git a/conf/modes/extend.config b/conf/modes/extend.config index e69de29..f3d7845 100644 --- a/conf/modes/extend.config +++ b/conf/modes/extend.config @@ -0,0 +1,3 @@ +params { + base = null +} \ No newline at end of file diff --git a/example/extend/.gitignore b/example/extend/.gitignore new file mode 100644 index 0000000..c981920 --- /dev/null +++ b/example/extend/.gitignore @@ -0,0 +1,6 @@ +* +!.gitignore +!minimal_example.h5ad +!nextflow.config +!run.sh +!samplesheet.csv \ No newline at end of file diff --git a/example/extend/nextflow.config b/example/extend/nextflow.config new file mode 100644 index 0000000..37f56e4 --- /dev/null +++ b/example/extend/nextflow.config @@ -0,0 +1,11 @@ +includeConfig "../test.config" + +params { + samplesheet = "samplesheet.csv" + base = "atlas.h5ad" + + benchmark_hvgs = 100 + cell_cycle = true + + celltypist_model = "Cells_Intestinal_Tract.pkl" +} \ No newline at end of file diff --git a/example/extend/run.sh b/example/extend/run.sh new file mode 100755 index 0000000..93ef62b --- /dev/null +++ b/example/extend/run.sh @@ -0,0 +1,3 @@ +#!/bin/bash + +nextflow run ../.. 
-resume -profile apptainer,extend \ No newline at end of file diff --git a/example/extend/samplesheet.csv b/example/extend/samplesheet.csv new file mode 100644 index 0000000..8c80dff --- /dev/null +++ b/example/extend/samplesheet.csv @@ -0,0 +1,2 @@ +id,input_adata +transfer,transfer.h5ad diff --git a/subworkflows/preprocessing.nf b/subworkflows/preprocessing.nf index 256bf21..c9cbf30 100644 --- a/subworkflows/preprocessing.nf +++ b/subworkflows/preprocessing.nf @@ -13,6 +13,7 @@ include { IDENTIFY_HVGS } from "../modules/identify_hvgs.nf" workflow PREPROCESSING { take: ch_samplesheet + ch_base main: ch_samples = Channel.from(check_samplesheet(ch_samplesheet.toString())) diff --git a/workflows/build.nf b/workflows/build.nf index 53f32ce..e9ee844 100644 --- a/workflows/build.nf +++ b/workflows/build.nf @@ -17,7 +17,7 @@ workflow BUILD { ch_samplesheet = file(params.samplesheet) - PREPROCESSING(ch_samplesheet) + PREPROCESSING(ch_samplesheet, []) ch_adata_integration = PREPROCESSING.out.integration ch_adata_intersection = PREPROCESSING.out.intersection diff --git a/workflows/extend.nf b/workflows/extend.nf index c1491fd..b92783e 100644 --- a/workflows/extend.nf +++ b/workflows/extend.nf @@ -1,4 +1,22 @@ +// Workflows +include { PREPROCESSING } from "../subworkflows/preprocessing.nf" + + workflow EXTEND { + if (!params.samplesheet) { + exit 1, 'Samplesheet not specified!' + } + + ch_samplesheet = file(params.samplesheet) + + if (!params.base) { + exit 1, 'Base path not specified!' + } + + ch_base = Channel.value(file(params.base)).map{ base -> [[id: "base"], base]} + + PREPROCESSING(ch_samplesheet, ch_base) + ch_adata = Channel.empty() ch_obsm = Channel.empty() ch_obs = Channel.empty() From cb3cd07f6fdf7fa7b10f02b607a5302445af2825 Mon Sep 17 00:00:00 2001 From: Nico Trummer Date: Mon, 8 Apr 2024 10:39:34 +0200 Subject: [PATCH 08/20] Simplify preprocessing --- subworkflows/preprocessing.nf | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/subworkflows/preprocessing.nf b/subworkflows/preprocessing.nf index c9cbf30..21f710d 100644 --- a/subworkflows/preprocessing.nf +++ b/subworkflows/preprocessing.nf @@ -32,9 +32,12 @@ workflow PREPROCESSING { COLLECT_PROBLEMS(PREPROCESS.out.problems.map{ meta, problems -> problems}.collect()) STOP_IF_PROBLEMS(COLLECT_PROBLEMS.out) - GENES_UPSET(PREPROCESS.out.adata.map{ meta, adata -> adata }.collect()) + adatas = PREPROCESS.out.adata.map{ meta, adata -> adata }.collect() + + GENES_UPSET(adatas) + MERGE_DATASETS( - PREPROCESS.out.adata.flatMap{ meta, adata -> adata }.collect(), + adatas, params.min_cells, params.custom_hvgs ) From 4855391449cc7d027bf7729e9e15fe03060dd843 Mon Sep 17 00:00:00 2001 From: Nico Trummer Date: Mon, 8 Apr 2024 11:37:40 +0200 Subject: [PATCH 09/20] Remove transfer from build workflow --- bin/merge_datasets.py | 66 +++++++++++++--------------------- bin/preprocess.py | 3 -- conf/modes/extend.config | 1 + example/extend/nextflow.config | 1 + modules/merge_datasets.nf | 6 ++-- modules/prepare_solo.nf | 5 --- modules/preprocess.nf | 3 +- subworkflows/doublets.nf | 2 -- subworkflows/integration.nf | 27 ++++---------- subworkflows/preprocessing.nf | 24 ++++--------- subworkflows/transfer.nf | 22 ++++++++++++ workflows/build.nf | 12 +++---- workflows/extend.nf | 6 ++++ 13 files changed, 74 insertions(+), 104 deletions(-) create mode 100644 subworkflows/transfer.nf diff --git a/bin/merge_datasets.py b/bin/merge_datasets.py index ad3a1b4..9d70aeb 100755 --- a/bin/merge_datasets.py +++ b/bin/merge_datasets.py @@ -4,16 +4,11 
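Moving the samplesheet check inside `BUILD`/`EXTEND` and giving `PREPROCESSING` a second `take:` slot makes the call sites explicit: every caller must pass something for the base atlas, with `[]` as the "absent" value in build mode. A sketch of that convention (hypothetical workflow names):

```nextflow
nextflow.enable.dsl = 2

workflow PREP_DEMO {
    take:
    ch_samplesheet   // required input
    ch_base          // [meta, file] value channel, or [] when absent

    main:
    ch_samplesheet.view{ "samplesheet: ${it}" }
}

workflow {
    // Fail fast on missing required params, as BUILD and EXTEND do above.
    if (!params.samplesheet) {
        exit 1, 'Samplesheet not specified!'
    }

    ch_base = params.base
        ? Channel.value(file(params.base)).map{ f -> [[id: 'base'], f] }
        : []

    PREP_DEMO(Channel.fromPath(params.samplesheet), ch_base)
}
```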
@@ import anndata as ad import scanpy as sc from scipy.sparse import csr_matrix -import numpy as np - - parser = argparse.ArgumentParser(description="Merge datasets") parser.add_argument("--input", help="Input file", type=str, nargs="+") -parser.add_argument("--output_integration", help="Output file containing only cells which do not require transfer learning", type=str) parser.add_argument("--output_intersection", help="Output file containing all cells but gene intersection", type=str) -parser.add_argument("--output_transfer", help="Output file containing all cells which require transfer learning", type=str) -parser.add_argument("--output_counts", help="Output file, outer join of cells and genes", type=str) +parser.add_argument("--output_union", help="Output file, outer join of cells and genes", type=str) parser.add_argument("--min_cells", help='Minimum number of cells to keep a gene', type=int, required=False, default=50) parser.add_argument("--custom_genes", help="Additional genes to include", type=str, nargs="*") @@ -21,36 +16,32 @@ datasets = [ad.read_h5ad(f) for f in args.input] -adata = ad.concat(datasets) -adata_outer = ad.concat(datasets, join='outer') +adata_intersection = ad.concat(datasets) +adata_union = ad.concat(datasets, join='outer') -additional_genes = [gene for gene in args.custom_genes if gene not in adata.var_names and gene in adata_outer.var_names] +additional_genes = [gene for gene in args.custom_genes if gene not in adata_intersection.var_names and gene in adata_union.var_names] # Add custom genes from outer join to the intersection if additional_genes: - adata_additional = adata_outer[adata.obs_names, additional_genes] - adata_concatenated = ad.concat([adata, adata_additional], join="outer", axis=1) - adata_concatenated.obs, adata_concatenated.obsm = adata.obs, adata.obsm - adata = adata_concatenated + adata_additional = adata_union[adata_intersection.obs_names, additional_genes] + adata_concatenated = ad.concat([adata_intersection, adata_additional], join="outer", axis=1) + adata_concatenated.obs, adata_concatenated.obsm = adata_intersection.obs, adata_intersection.obsm + adata_intersection = adata_concatenated # Convert to CSR matrix -adata.X = csr_matrix(adata.X) -adata_outer.X = csr_matrix(adata_outer.X) - -# Filter genes with no counts in core atlas -gene_mask, _ = sc.pp.filter_genes(adata[~adata.obs["transfer"]], min_cells=1, inplace=False) -adata = adata[:, gene_mask] +adata_intersection.X = csr_matrix(adata_intersection.X) +adata_union.X = csr_matrix(adata_union.X) # Filter cells with no counts -cell_mask, _ = sc.pp.filter_cells(adata, min_genes=1, inplace=False) -adata = adata[cell_mask, :] -adata_outer = adata_outer[cell_mask, :] +cell_mask, _ = sc.pp.filter_cells(adata_intersection, min_genes=1, inplace=False) +adata_intersection = adata_intersection[cell_mask, :] +adata_union = adata_union[cell_mask, :] # Filter genes with too few occurrences in outer join -sc.pp.filter_genes(adata_outer, min_cells=args.min_cells) +sc.pp.filter_genes(adata_union, min_cells=args.min_cells) -adata.obs["batch"] = adata.obs["dataset"].astype(str) + "_" + adata.obs["batch"].astype(str) -adata.obs["patient"] = adata.obs["dataset"].astype(str) + "_" + adata.obs["patient"].astype(str) +adata_intersection.obs["batch"] = adata_intersection.obs["dataset"].astype(str) + "_" + adata_intersection.obs["batch"].astype(str) +adata_intersection.obs["patient"] = adata_intersection.obs["dataset"].astype(str) + "_" + adata_intersection.obs["patient"].astype(str) def to_Florent_case(s: str): 
corrected = s.lower().strip() @@ -77,25 +68,16 @@ def to_Florent_case(s: str): return corrected[0].upper() + corrected[1:] -for column in adata.obs.columns: - if column == "transfer": - continue - if not adata.obs[column].dtype.name == "category" and not adata.obs[column].dtype.name == "object": +for column in adata_intersection.obs.columns: + if not adata_intersection.obs[column].dtype.name == "category" and not adata_intersection.obs[column].dtype.name == "object": continue # Convert first to string and then to category - adata.obs[column] = adata.obs[column].astype(str).fillna("Unknown").apply(to_Florent_case).astype("category") - -adata_outer.obs = adata.obs - -adata.layers["counts"] = adata.X -adata_outer.layers["counts"] = adata_outer.X + adata_intersection.obs[column] = adata_intersection.obs[column].astype(str).fillna("Unknown").apply(to_Florent_case).astype("category") -if any(adata.obs["transfer"]): - adata_transfer = adata[adata.obs["transfer"]] - adata_transfer.write_h5ad(args.output_transfer) +adata_union.obs = adata_intersection.obs -adata_notransfer = adata[~adata.obs["transfer"]] -adata_notransfer.write_h5ad(args.output_integration) +adata_intersection.layers["counts"] = adata_intersection.X +adata_union.layers["counts"] = adata_union.X -adata.write_h5ad(args.output_intersection) -adata_outer.write_h5ad(args.output_counts) \ No newline at end of file +adata_intersection.write_h5ad(args.output_intersection) +adata_union.write_h5ad(args.output_union) diff --git a/bin/preprocess.py b/bin/preprocess.py index aa17a93..5bb7f09 100755 --- a/bin/preprocess.py +++ b/bin/preprocess.py @@ -13,7 +13,6 @@ "patient": True, "tissue": True, "dataset": True, - "transfer": True } parser = argparse.ArgumentParser(description="Filter dataset") @@ -22,7 +21,6 @@ parser.add_argument("--output", help="Output file", type=str) parser.add_argument("--problems", help="Problems file", type=str) parser.add_argument("--no-symbols", help="Convert varnames to gene symbols", action="store_true") -parser.add_argument("--transfer", help="Apply transfer leanring on dataset", action="store_true") parser.add_argument("--sure_raw", help="Skip check for raw counts", action="store_true") parser.add_argument("--custom_metadata", help="Additional metadata columns to include", type=str, nargs="*") @@ -57,7 +55,6 @@ def aggregate_duplicate_var(adata, aggr_fun=np.mean): print("Reading input") adata = sc.read_h5ad(args.input) adata.obs["dataset"] = args.id -adata.obs["transfer"] = args.transfer if adata.__dict__["_raw"] and "_index" in adata.__dict__["_raw"].__dict__["_var"]: adata.__dict__["_raw"].__dict__["_var"] = ( diff --git a/conf/modes/extend.config b/conf/modes/extend.config index f3d7845..4075880 100644 --- a/conf/modes/extend.config +++ b/conf/modes/extend.config @@ -1,3 +1,4 @@ params { base = null + model = null } \ No newline at end of file diff --git a/example/extend/nextflow.config b/example/extend/nextflow.config index 37f56e4..3e3dcdb 100644 --- a/example/extend/nextflow.config +++ b/example/extend/nextflow.config @@ -3,6 +3,7 @@ includeConfig "../test.config" params { samplesheet = "samplesheet.csv" base = "atlas.h5ad" + model = "model" benchmark_hvgs = 100 cell_cycle = true diff --git a/modules/merge_datasets.nf b/modules/merge_datasets.nf index d49c4b1..f01e130 100644 --- a/modules/merge_datasets.nf +++ b/modules/merge_datasets.nf @@ -9,16 +9,14 @@ process MERGE_DATASETS { val(custom_genes) output: - path("datasets.integration.h5ad"), emit: integration - path("datasets.counts.h5ad"), emit: counts + 
path("datasets.union.h5ad") , emit: union path("datasets.intersection.h5ad"), emit: intersection - path("datasets.transfer.h5ad"), emit: transfer, optional: true when: task.ext.when == null || task.ext.when script: """ - merge_datasets.py --input ${adatas} --custom_genes ${custom_genes.join(" ")} --min_cells ${min_cells} --output_transfer datasets.transfer.h5ad --output_intersection datasets.intersection.h5ad --output_integration datasets.integration.h5ad --output_counts datasets.counts.h5ad + merge_datasets.py --input ${adatas} --custom_genes ${custom_genes.join(" ")} --min_cells ${min_cells} --output_intersection datasets.intersection.h5ad --output_union datasets.union.h5ad """ } \ No newline at end of file diff --git a/modules/prepare_solo.nf b/modules/prepare_solo.nf index e0b3c11..b995d18 100644 --- a/modules/prepare_solo.nf +++ b/modules/prepare_solo.nf @@ -7,7 +7,6 @@ process PREPARE_SOLO { input: tuple val(meta), path(adata) - tuple val(meta2), path(adata_core) tuple val(meta3), path(hvgs) output: @@ -22,12 +21,8 @@ process PREPARE_SOLO { import pandas as pd adata = ad.read_h5ad("${adata}") - adata_core = ad.read_h5ad("${adata_core}") df_hvgs = pd.read_pickle("${hvgs}") - known_cell_types = adata_core.obs["cell_type"].unique() - adata.obs["cell_type"] = adata.obs["cell_type"].map(lambda original: original if original in known_cell_types else "Unknown") - adata_hvg = adata[:, df_hvgs[df_hvgs["highly_variable"]].index] with open("batches.txt", "w") as f: diff --git a/modules/preprocess.nf b/modules/preprocess.nf index 37a7507..028b35e 100644 --- a/modules/preprocess.nf +++ b/modules/preprocess.nf @@ -19,9 +19,8 @@ process PREPROCESS { max_genes = meta.max_genes ? "--max_genes ${meta.max_genes}" : "" max_pct_mito = meta.max_pct_mito ? "--max_pct_mito ${meta.max_pct_mito}" : "" no_symbols = meta.no_symbols && meta.no_symbols.toLowerCase() == "true" ? "--no-symbols" : "" - transfer = meta.transfer && meta.transfer.toLowerCase() == "true" ? "--transfer" : "" sure_raw = meta.sure_raw && meta.sure_raw.toLowerCase() == "true" ? 
"--sure_raw" : "" """ - preprocess.py --input ${input} --custom_metadata ${custom_metadata.join(" ")} --id ${meta.id} ${sure_raw} ${transfer} ${no_symbols} ${min_counts} ${max_counts} ${min_genes} ${max_genes} ${max_pct_mito} --output ${meta.id}.preprocessed.h5ad --problems ${meta.id}.problems.txt + preprocess.py --input ${input} --custom_metadata ${custom_metadata.join(" ")} --id ${meta.id} ${sure_raw} ${no_symbols} ${min_counts} ${max_counts} ${min_genes} ${max_genes} ${max_pct_mito} --output ${meta.id}.preprocessed.h5ad --problems ${meta.id}.problems.txt """ } \ No newline at end of file diff --git a/subworkflows/doublets.nf b/subworkflows/doublets.nf index 476a55e..4dd133a 100644 --- a/subworkflows/doublets.nf +++ b/subworkflows/doublets.nf @@ -8,7 +8,6 @@ include { EXTRACT_EMBEDDING } from "../modules/extract_embedding.nf" workflow DOUBLETS { take: ch_adata - ch_adata_core ch_hvgs ch_scanvi_model ch_integrations @@ -17,7 +16,6 @@ workflow DOUBLETS { main: PREPARE_SOLO( ch_adata, - ch_adata_core, ch_hvgs ) diff --git a/subworkflows/integration.nf b/subworkflows/integration.nf index 78f93d5..68d1488 100644 --- a/subworkflows/integration.nf +++ b/subworkflows/integration.nf @@ -32,10 +32,9 @@ gpu_integrations = ["scgen"] */ workflow INTEGRATION { take: - ch_adata_core + ch_adata ch_hvgs ch_integration_methods - ch_adata_extended benchmark_hvgs main: @@ -47,19 +46,19 @@ workflow INTEGRATION { } INTEGRATE( - ch_adata_core, + ch_adata, ch_hvgs, ch_integration_methods.cpu ) INTEGRATE_GPU( - ch_adata_core, + ch_adata, ch_hvgs, ch_integration_methods.gpu ) INTEGRATE_SCVI( - ch_adata_core, + ch_adata, ch_hvgs, "scvi" ) @@ -70,7 +69,7 @@ workflow INTEGRATION { if (params.has_celltypes) { INTEGRATE_SCANVI( - ch_adata_core, + ch_adata, ch_hvgs, INTEGRATE_SCVI.out.model ) @@ -86,25 +85,11 @@ workflow INTEGRATION { ch_core_integrated = INTEGRATE_SCVI.out.integrated } - INTEGRATE_SCARCHES( - ch_adata_extended, - ch_core_integrated.join(ch_arches_basemodel), - params.has_celltypes, - ch_hvgs - ) - - MERGE_EXTENDED( - INTEGRATE_SCARCHES.out.integrated, - ch_core_integrated - ) - - ch_integrated = ch_integrated.mix(MERGE_EXTENDED.out) - ch_integrated_types = ch_integrated .map{ meta, adata -> [meta, adata, integration_types[meta.integration]] } BENCHMARKING( - ch_adata_core, + ch_adata, ch_integrated_types, benchmark_hvgs ) diff --git a/subworkflows/preprocessing.nf b/subworkflows/preprocessing.nf index 21f710d..e26c5a9 100644 --- a/subworkflows/preprocessing.nf +++ b/subworkflows/preprocessing.nf @@ -32,41 +32,31 @@ workflow PREPROCESSING { COLLECT_PROBLEMS(PREPROCESS.out.problems.map{ meta, problems -> problems}.collect()) STOP_IF_PROBLEMS(COLLECT_PROBLEMS.out) - adatas = PREPROCESS.out.adata.map{ meta, adata -> adata }.collect() - - GENES_UPSET(adatas) + GENES_UPSET(PREPROCESS.out.adata.mix(ch_base).map{ meta, adata -> adata }.collect()) MERGE_DATASETS( - adatas, + PREPROCESS.out.adata.map{ meta, adata -> adata }.collect(), params.min_cells, params.custom_hvgs ) - - ch_adata_integration = MERGE_DATASETS.out.integration - .map{ adata -> [[id: "integration"], adata] } ch_adata_intersection = MERGE_DATASETS.out.intersection .map{ adata -> [[id: "intersection"], adata] } - ch_adata_counts = MERGE_DATASETS.out.counts - .map{ adata -> [[id: "counts"], adata] } - - ch_transfer = MERGE_DATASETS.out.transfer.flatten() - .map{ adata -> [[id: adata.simpleName], adata]} + ch_adata_union = MERGE_DATASETS.out.union + .map{ adata -> [[id: "union"], adata] } COMPOSITION(ch_adata_intersection) 
DISTRIBUTION(ch_adata_intersection) IDENTIFY_HVGS( - ch_adata_integration, + ch_adata_intersection, params.integration_hvgs, params.custom_hvgs ) emit: - integration = ch_adata_integration intersection = ch_adata_intersection - counts = ch_adata_counts - transfer = ch_transfer - hvgs = IDENTIFY_HVGS.out + union = ch_adata_union + hvgs = IDENTIFY_HVGS.out } \ No newline at end of file diff --git a/subworkflows/transfer.nf b/subworkflows/transfer.nf new file mode 100644 index 0000000..1b2c2b0 --- /dev/null +++ b/subworkflows/transfer.nf @@ -0,0 +1,22 @@ +workflow TRANSFER { + take: + ch_base + ch_model + ch_extension + + main: + INTEGRATE_SCARCHES( + ch_extension, + ch_base.join(ch_model), + params.has_celltypes, + ch_hvgs + ) + + MERGE_EXTENDED( + INTEGRATE_SCARCHES.out.integrated, + ch_core_integrated + ) + + emit: + +} \ No newline at end of file diff --git a/workflows/build.nf b/workflows/build.nf index e9ee844..84b2acf 100644 --- a/workflows/build.nf +++ b/workflows/build.nf @@ -17,15 +17,13 @@ workflow BUILD { ch_samplesheet = file(params.samplesheet) - PREPROCESSING(ch_samplesheet, []) + PREPROCESSING(ch_samplesheet, Channel.empty()) - ch_adata_integration = PREPROCESSING.out.integration ch_adata_intersection = PREPROCESSING.out.intersection - ch_adata_counts = PREPROCESSING.out.counts - ch_adata_transfer = PREPROCESSING.out.transfer + ch_adata_union = PREPROCESSING.out.union ch_hvgs = PREPROCESSING.out.hvgs - COUNTS(ch_adata_counts, params.normalization_method) + COUNTS(ch_adata_union, params.normalization_method) CELLTYPIST( ch_adata_intersection, @@ -40,16 +38,14 @@ workflow BUILD { CELL_QC(ch_adata_intersection) INTEGRATION( - ch_adata_integration, + ch_adata_intersection, ch_hvgs, Channel.from(params.integration_methods).mix(Channel.value("unintegrated")), - ch_adata_transfer, Channel.value(params.benchmark_hvgs) ) DOUBLETS( ch_adata_intersection, - ch_adata_integration, ch_hvgs, INTEGRATION.out.model, INTEGRATION.out.integrated, diff --git a/workflows/extend.nf b/workflows/extend.nf index b92783e..c571ddf 100644 --- a/workflows/extend.nf +++ b/workflows/extend.nf @@ -15,6 +15,12 @@ workflow EXTEND { ch_base = Channel.value(file(params.base)).map{ base -> [[id: "base"], base]} + if (!params.model) { + exit 1, 'Model not specified!' 
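transfer.nf is committed as a skeleton (`ch_hvgs` and `ch_core_integrated` are not wired in yet, and its `emit:` block is empty), but the `join` it carries over from integration.nf is the load-bearing operator: it pairs items from two channels whose first element (the meta map) matches. A small sketch, assuming both channels key on the same meta:

```nextflow
nextflow.enable.dsl = 2

workflow {
    ch_integrated = Channel.of(
        [[integration: 'scvi'],   'scvi.integrated.h5ad'],
        [[integration: 'scanvi'], 'scanvi.integrated.h5ad'])

    ch_models = Channel.of(
        [[integration: 'scanvi'], 'scanvi.model'])

    // join matches on the first element; only keys present in both
    // channels are emitted.
    ch_integrated
        .join(ch_models)
        .view()   // => [[integration:scanvi], scanvi.integrated.h5ad, scanvi.model]
}
```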
+ } + + ch_model = Channel.value(file(params.model)).map{ model -> [[id: "model"], model]} + PREPROCESSING(ch_samplesheet, ch_base) ch_adata = Channel.empty() From ea4cff11815d8b63871b302cb03fbd37ae59c9a2 Mon Sep 17 00:00:00 2001 From: Nico Trummer Date: Mon, 8 Apr 2024 12:02:30 +0200 Subject: [PATCH 10/20] Implement support for base dataset in dataset merging --- bin/merge_datasets.py | 17 +++++++++++++++-- modules/merge_datasets.nf | 24 +++++++++++++++++++++--- subworkflows/preprocessing.nf | 1 + workflows/build.nf | 2 +- 4 files changed, 38 insertions(+), 6 deletions(-) diff --git a/bin/merge_datasets.py b/bin/merge_datasets.py index 9d70aeb..6af50ff 100755 --- a/bin/merge_datasets.py +++ b/bin/merge_datasets.py @@ -7,8 +7,10 @@ parser = argparse.ArgumentParser(description="Merge datasets") parser.add_argument("--input", help="Input file", type=str, nargs="+") -parser.add_argument("--output_intersection", help="Output file containing all cells but gene intersection", type=str) -parser.add_argument("--output_union", help="Output file, outer join of cells and genes", type=str) +parser.add_argument("--base", help="Base dataset to use as reference", type=str, required=False) +parser.add_argument("--output_intersection", help="Output file containing all cells but gene intersection", type=str, required=True) +parser.add_argument("--output_union", help="Output file, outer join of cells and genes", type=str, required=True) +parser.add_argument("--output_transfer", help="Output file, cells to project onto base", type=str, required=False) parser.add_argument("--min_cells", help='Minimum number of cells to keep a gene', type=int, required=False, default=50) parser.add_argument("--custom_genes", help="Additional genes to include", type=str, nargs="*") @@ -16,6 +18,13 @@ datasets = [ad.read_h5ad(f) for f in args.input] +if args.base: + if not args.output_transfer: + raise ValueError("Transfer file required when using base dataset") + + adata_base = ad.read_h5ad(args.base) + datasets = [adata_base] + datasets + adata_intersection = ad.concat(datasets) adata_union = ad.concat(datasets, join='outer') @@ -81,3 +90,7 @@ def to_Florent_case(s: str): adata_intersection.write_h5ad(args.output_intersection) adata_union.write_h5ad(args.output_union) + +if args.base: + adata_transfer = adata_intersection[~adata_intersection.obs.index.isin(adata_base.obs.index)] + adata_transfer.write_h5ad(args.output_transfer) \ No newline at end of file diff --git a/modules/merge_datasets.nf b/modules/merge_datasets.nf index f01e130..d062363 100644 --- a/modules/merge_datasets.nf +++ b/modules/merge_datasets.nf @@ -5,18 +5,36 @@ process MERGE_DATASETS { input: path(adatas) + tuple val(meta), path(base) val(min_cells) val(custom_genes) output: path("datasets.union.h5ad") , emit: union path("datasets.intersection.h5ad"), emit: intersection + path("datasets.transfer.h5ad") , emit: transfer, optional: true when: task.ext.when == null || task.ext.when script: - """ - merge_datasets.py --input ${adatas} --custom_genes ${custom_genes.join(" ")} --min_cells ${min_cells} --output_intersection datasets.intersection.h5ad --output_union datasets.union.h5ad - """ + if (base) { + """ + merge_datasets.py --input ${adatas} \\ + --base ${base} \\ + --custom_genes ${custom_genes.join(" ")} \\ + --min_cells ${min_cells} \\ + --output_intersection datasets.intersection.h5ad \\ + --output_union datasets.union.h5ad \\ + --output_transfer datasets.transfer.h5ad + """ + } else { + """ + merge_datasets.py --input ${adatas} \\ + --custom_genes 
${custom_genes.join(" ")} \\ + --min_cells ${min_cells} \\ + --output_intersection datasets.intersection.h5ad \\ + --output_union datasets.union.h5ad + """ + } } \ No newline at end of file diff --git a/subworkflows/preprocessing.nf b/subworkflows/preprocessing.nf index e26c5a9..dea3081 100644 --- a/subworkflows/preprocessing.nf +++ b/subworkflows/preprocessing.nf @@ -36,6 +36,7 @@ workflow PREPROCESSING { MERGE_DATASETS( PREPROCESS.out.adata.map{ meta, adata -> adata }.collect(), + ch_base.collect(), params.min_cells, params.custom_hvgs ) diff --git a/workflows/build.nf b/workflows/build.nf index 84b2acf..9cb139a 100644 --- a/workflows/build.nf +++ b/workflows/build.nf @@ -17,7 +17,7 @@ workflow BUILD { ch_samplesheet = file(params.samplesheet) - PREPROCESSING(ch_samplesheet, Channel.empty()) + PREPROCESSING(ch_samplesheet, Channel.value([[], []])) ch_adata_intersection = PREPROCESSING.out.intersection ch_adata_union = PREPROCESSING.out.union From 157a39a780ec937f3261a09334eb9a99967a6cfd Mon Sep 17 00:00:00 2001 From: Nico Trummer Date: Mon, 8 Apr 2024 12:10:10 +0200 Subject: [PATCH 11/20] Disable HVG identification for extend workflow --- conf/modes/extend.config | 6 ++++++ modules/identify_hvgs.nf | 3 +++ subworkflows/preprocessing.nf | 4 ++++ 3 files changed, 13 insertions(+) diff --git a/conf/modes/extend.config b/conf/modes/extend.config index 4075880..70529aa 100644 --- a/conf/modes/extend.config +++ b/conf/modes/extend.config @@ -1,4 +1,10 @@ params { base = null model = null +} + +process { + withName: IDENTIFY_HVGS { + ext.when = false + } } \ No newline at end of file diff --git a/modules/identify_hvgs.nf b/modules/identify_hvgs.nf index caece9f..621a5e9 100644 --- a/modules/identify_hvgs.nf +++ b/modules/identify_hvgs.nf @@ -12,6 +12,9 @@ process IDENTIFY_HVGS { output: tuple val(meta), path("${meta.id}.hvgs.pkl") + + when: + task.ext.when == null || task.ext.when script: """ diff --git a/subworkflows/preprocessing.nf b/subworkflows/preprocessing.nf index dea3081..0f0dcb6 100644 --- a/subworkflows/preprocessing.nf +++ b/subworkflows/preprocessing.nf @@ -47,6 +47,9 @@ workflow PREPROCESSING { ch_adata_union = MERGE_DATASETS.out.union .map{ adata -> [[id: "union"], adata] } + ch_adata_transfer = MERGE_DATASETS.out.transfer + .map{ adata -> [[id: "transfer"], adata] } + COMPOSITION(ch_adata_intersection) DISTRIBUTION(ch_adata_intersection) @@ -59,5 +62,6 @@ workflow PREPROCESSING { emit: intersection = ch_adata_intersection union = ch_adata_union + transfer = ch_adata_transfer hvgs = IDENTIFY_HVGS.out } \ No newline at end of file From 8c0b709d2f783777932027e486778e0cff5ce1ef Mon Sep 17 00:00:00 2001 From: Nico Trummer Date: Mon, 8 Apr 2024 12:24:50 +0200 Subject: [PATCH 12/20] Add more functionalities to extend workflow --- conf/modes/build-extend.config | 9 ++------- conf/modes/build.config | 4 ++++ conf/modes/extend.config | 3 --- subworkflows/integration.nf | 6 +++--- subworkflows/preprocessing.nf | 18 ++++++++++++------ workflows/extend.nf | 24 ++++++++++++++++++++++++ 6 files changed, 45 insertions(+), 19 deletions(-) diff --git a/conf/modes/build-extend.config b/conf/modes/build-extend.config index 207fdf9..2557e54 100644 --- a/conf/modes/build-extend.config +++ b/conf/modes/build-extend.config @@ -1,22 +1,17 @@ params { samplesheet = null celltypist_model = null - - integration_methods = ["scvi", "scanvi", "harmony", "scgen", "scanorama", "bbknn", "desc", "combat", "trvaep"] - min_cells = 50 - cell_cycle = true + custom_hvgs = [] benchmark_hvgs = 0 - integration_hvgs = 
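The base atlas enters `MERGE_DATASETS` as an optional input: build mode satisfies the `tuple val(meta), path(base)` slot with the empty placeholder `Channel.value([[], []])`, the script branches on whether `base` is set, and the transfer output is marked `optional: true` so a plain build emits nothing there. A condensed sketch of the pattern (hypothetical commands):

```nextflow
nextflow.enable.dsl = 2

process MERGE_DEMO {
    input:
    val(adatas)
    tuple val(meta), path(base)

    output:
    path('merged.txt'),   emit: merged
    path('transfer.txt'), emit: transfer, optional: true   // extend-style runs only

    script:
    if (base) {
        """
        echo "merge ${adatas} onto base ${base}" > merged.txt
        echo "cells to project onto ${base}" > transfer.txt
        """
    } else {
        """
        echo "merge ${adatas}" > merged.txt
        """
    }
}

workflow {
    // [[], []] fills the tuple slot when no base atlas is given.
    ch_base = params.base
        ? Channel.value(file(params.base)).map{ f -> [[id: 'base'], f] }
        : Channel.value([[], []])

    MERGE_DEMO(Channel.of(['d1.h5ad', 'd2.h5ad']), ch_base)
    MERGE_DEMO.out.merged.view()
}
```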
From 157a39a780ec937f3261a09334eb9a99967a6cfd Mon Sep 17 00:00:00 2001
From: Nico Trummer
Date: Mon, 8 Apr 2024 12:10:10 +0200
Subject: [PATCH 11/20] Disable HVG identification for extend workflow

---
 conf/modes/extend.config      | 6 ++++++
 modules/identify_hvgs.nf      | 3 +++
 subworkflows/preprocessing.nf | 4 ++++
 3 files changed, 13 insertions(+)

diff --git a/conf/modes/extend.config b/conf/modes/extend.config
index 4075880..70529aa 100644
--- a/conf/modes/extend.config
+++ b/conf/modes/extend.config
@@ -1,4 +1,10 @@
 params {
     base = null
     model = null
+}
+
+process {
+    withName: IDENTIFY_HVGS {
+        ext.when = false
+    }
 }
\ No newline at end of file
diff --git a/modules/identify_hvgs.nf b/modules/identify_hvgs.nf
index caece9f..621a5e9 100644
--- a/modules/identify_hvgs.nf
+++ b/modules/identify_hvgs.nf
@@ -12,6 +12,9 @@ process IDENTIFY_HVGS {
 
     output:
     tuple val(meta), path("${meta.id}.hvgs.pkl")
+
+    when:
+    task.ext.when == null || task.ext.when
 
     script:
     """
diff --git a/subworkflows/preprocessing.nf b/subworkflows/preprocessing.nf
index dea3081..0f0dcb6 100644
--- a/subworkflows/preprocessing.nf
+++ b/subworkflows/preprocessing.nf
@@ -47,6 +47,9 @@ workflow PREPROCESSING {
     ch_adata_union = MERGE_DATASETS.out.union
         .map{ adata -> [[id: "union"], adata] }
 
+    ch_adata_transfer = MERGE_DATASETS.out.transfer
+        .map{ adata -> [[id: "transfer"], adata] }
+
     COMPOSITION(ch_adata_intersection)
     DISTRIBUTION(ch_adata_intersection)
 
@@ -59,5 +62,6 @@ workflow PREPROCESSING {
     emit:
     intersection = ch_adata_intersection
     union = ch_adata_union
+    transfer = ch_adata_transfer
     hvgs = IDENTIFY_HVGS.out
 }
\ No newline at end of file

From 8c0b709d2f783777932027e486778e0cff5ce1ef Mon Sep 17 00:00:00 2001
From: Nico Trummer
Date: Mon, 8 Apr 2024 12:24:50 +0200
Subject: [PATCH 12/20] Add more functionalities to extend workflow

---
 conf/modes/build-extend.config |  9 ++-------
 conf/modes/build.config        |  4 ++++
 conf/modes/extend.config       |  3 ---
 subworkflows/integration.nf    |  6 +++---
 subworkflows/preprocessing.nf  | 18 ++++++++++++------
 workflows/extend.nf            | 24 ++++++++++++++++++++++++
 6 files changed, 45 insertions(+), 19 deletions(-)

diff --git a/conf/modes/build-extend.config b/conf/modes/build-extend.config
index 207fdf9..2557e54 100644
--- a/conf/modes/build-extend.config
+++ b/conf/modes/build-extend.config
@@ -1,22 +1,17 @@
 params {
     samplesheet = null
     celltypist_model = null
-
-    integration_methods = ["scvi", "scanvi", "harmony", "scgen", "scanorama", "bbknn", "desc", "combat", "trvaep"]
-
     min_cells = 50
     cell_cycle = true
+    custom_hvgs = []
 
     benchmark_hvgs = 0
-    integration_hvgs = 10000
 
     normalization_method = "log_total"
 
     upset_only = false
     decontX = true
-    has_extended = false
     has_celltypes = true
 
     custom_metadata = []
-    custom_hvgs = []
 }
 
 process {
diff --git a/conf/modes/build.config b/conf/modes/build.config
index e69de29..34e3e53 100644
--- a/conf/modes/build.config
+++ b/conf/modes/build.config
@@ -0,0 +1,4 @@
+params {
+    integration_hvgs = 10000
+    integration_methods = ["scvi", "scanvi", "harmony", "scgen", "scanorama", "bbknn", "desc", "combat", "trvaep"]
+}
\ No newline at end of file
diff --git a/conf/modes/extend.config b/conf/modes/extend.config
index 70529aa..9c3debc 100644
--- a/conf/modes/extend.config
+++ b/conf/modes/extend.config
@@ -4,7 +4,4 @@ params {
 }
 
 process {
-    withName: IDENTIFY_HVGS {
-        ext.when = false
-    }
 }
\ No newline at end of file
diff --git a/subworkflows/integration.nf b/subworkflows/integration.nf
index 68d1488..c266980 100644
--- a/subworkflows/integration.nf
+++ b/subworkflows/integration.nf
@@ -76,11 +76,11 @@ workflow INTEGRATION {
 
         ch_integrated = ch_integrated.mix(INTEGRATE_SCANVI.out.integrated)
 
-        ch_arches_basemodel = INTEGRATE_SCANVI.out.model
+        ch_model = INTEGRATE_SCANVI.out.model
         ch_scanvi_labels = INTEGRATE_SCANVI.out.labels
         ch_core_integrated = INTEGRATE_SCANVI.out.integrated
     } else {
-        ch_arches_basemodel = INTEGRATE_SCVI.out.model
+        ch_model = INTEGRATE_SCVI.out.model
         ch_scanvi_labels = Channel.empty()
         ch_core_integrated = INTEGRATE_SCVI.out.integrated
     }
@@ -96,6 +96,6 @@ workflow INTEGRATION {
 
     emit:
     integrated = ch_integrated
-    model = params.has_extended ? INTEGRATE_SCARCHES.out.model : ch_arches_basemodel
+    model = ch_model
     scanvi_labels = ch_scanvi_labels
 }
diff --git a/subworkflows/preprocessing.nf b/subworkflows/preprocessing.nf
index 0f0dcb6..5c05b0c 100644
--- a/subworkflows/preprocessing.nf
+++ b/subworkflows/preprocessing.nf
@@ -53,15 +53,21 @@ workflow PREPROCESSING {
     COMPOSITION(ch_adata_intersection)
     DISTRIBUTION(ch_adata_intersection)
 
-    IDENTIFY_HVGS(
-        ch_adata_intersection,
-        params.integration_hvgs,
-        params.custom_hvgs
-    )
+    if (params.mode != "extend") {
+        IDENTIFY_HVGS(
+            ch_adata_intersection,
+            params.integration_hvgs,
+            params.custom_hvgs
+        )
+        ch_hvgs = IDENTIFY_HVGS.out
+    } else {
+        ch_hvgs = Channel.empty()
+    }
+
 
     emit:
     intersection = ch_adata_intersection
     union = ch_adata_union
     transfer = ch_adata_transfer
-    hvgs = IDENTIFY_HVGS.out
+    hvgs = ch_hvgs
 }
\ No newline at end of file
diff --git a/workflows/extend.nf b/workflows/extend.nf
index c571ddf..7ed6af6 100644
--- a/workflows/extend.nf
+++ b/workflows/extend.nf
@@ -1,5 +1,11 @@
+// Modules
+include { CELLTYPIST } from "../modules/celltypist.nf"
+include { CELL_CYCLE } from "../modules/cell_cycle.nf"
+include { CELL_QC } from "../modules/cell_qc.nf"
+
 // Workflows
 include { PREPROCESSING } from "../subworkflows/preprocessing.nf"
+include { COUNTS } from "../subworkflows/counts.nf"
 
 workflow EXTEND {
 
@@ -23,6 +29,24 @@ workflow EXTEND {
 
     PREPROCESSING(ch_samplesheet, ch_base)
 
+    ch_adata_intersection = PREPROCESSING.out.intersection
+    ch_adata_union = PREPROCESSING.out.union
+    ch_adata_transfer = PREPROCESSING.out.transfer
+
+    COUNTS(ch_adata_union, params.normalization_method)
+
+    CELLTYPIST(
+        ch_adata_intersection,
+        params.celltypist_model ?: ""
+    )
+
+    CELL_CYCLE(
+        ch_adata_intersection,
+        "human"
+    )
+
+    CELL_QC(ch_adata_intersection)
+
     ch_adata = Channel.empty()
     ch_obsm = Channel.empty()
     ch_obs = Channel.empty()
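[Reviewer note on PATCH 12/20] With IDENTIFY_HVGS now skipped in extend mode, it helps to recall what the gated step does: seurat_v3 HVG selection can fail on batches with few cells, so the module widens the loess span until the fit succeeds. The same loop as a standalone script (input path illustrative):

    import scanpy as sc

    adata = sc.read_h5ad("datasets.intersection.h5ad")  # illustrative path

    # widen the loess span until seurat_v3 converges, as in identify_hvgs.nf
    span = 0.3  # scanpy's default
    while span <= 1:
        try:
            sc.pp.highly_variable_genes(adata, n_top_genes=10000,
                                        flavor="seurat_v3", span=span,
                                        batch_key="batch")
            break
        except Exception:
            span += 0.1
            print(f"Increased span to {span}")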
From 8fb6846bf60747920682fdd8431973c91984e86e Mon Sep 17 00:00:00 2001
From: Nico Trummer
Date: Mon, 8 Apr 2024 14:28:29 +0200
Subject: [PATCH 13/20] Start working on SOLO problems

---
 conf/modes/build-extend.config |  1 +
 conf/modes/build.config        |  1 -
 modules/identify_hvgs.nf       | 73 ++++++++++++++++++++--------------
 modules/integrate_scarches.nf  | 10 +----
 subworkflows/doublets.nf       |  4 +-
 subworkflows/integration.nf    |  2 -
 subworkflows/preprocessing.nf  | 19 ++++-----
 subworkflows/transfer.nf       | 22 ----------
 workflows/build.nf             |  2 +-
 workflows/extend.nf            | 30 ++++++++++----
 10 files changed, 79 insertions(+), 85 deletions(-)
 delete mode 100644 subworkflows/transfer.nf

diff --git a/conf/modes/build-extend.config b/conf/modes/build-extend.config
index 2557e54..e021090 100644
--- a/conf/modes/build-extend.config
+++ b/conf/modes/build-extend.config
@@ -4,6 +4,7 @@ params {
     min_cells = 50
     cell_cycle = true
     custom_hvgs = []
+    integration_hvgs = 10000
 
     benchmark_hvgs = 0
     normalization_method = "log_total"
diff --git a/conf/modes/build.config b/conf/modes/build.config
index 34e3e53..9092048 100644
--- a/conf/modes/build.config
+++ b/conf/modes/build.config
@@ -1,4 +1,3 @@
 params {
-    integration_hvgs = 10000
     integration_methods = ["scvi", "scanvi", "harmony", "scgen", "scanorama", "bbknn", "desc", "combat", "trvaep"]
 }
\ No newline at end of file
diff --git a/modules/identify_hvgs.nf b/modules/identify_hvgs.nf
index 621a5e9..5bf79fc 100644
--- a/modules/identify_hvgs.nf
+++ b/modules/identify_hvgs.nf
@@ -9,6 +9,7 @@ process IDENTIFY_HVGS {
     tuple val(meta), path(adata)
     val(n_hvgs)
     val(custom_hvgs)
+    val(hvgs_existing)
 
     output:
     tuple val(meta), path("${meta.id}.hvgs.pkl")
@@ -17,36 +18,48 @@ process IDENTIFY_HVGS {
     task.ext.when == null || task.ext.when
 
     script:
-    """
-    #!/opt/conda/bin/python
-
-    import scanpy as sc
-
-    adata = sc.read_h5ad("${adata}")
-
-    span = 0.3 # default
-    worked = False
-
-    while not worked and span <= 1:
-        try:
-            sc.pp.highly_variable_genes(adata,
-                n_top_genes=10000,
-                flavor="seurat_v3",
-                span=span,
-                batch_key="batch")
-            worked = True
-        except:
-            span += 0.1
-            print(f"Increased span to {span}")
-
-    custom_hvgs = "${custom_hvgs.join(" ")}".split(" ")
-    custom_hvgs = [x.upper().replace("_", "-").replace(".", "-") for x in custom_hvgs]
-    custom_hvgs = [x for x in custom_hvgs if x in adata.var_names]
-
-    if len(custom_hvgs) > 0:
-        # Set "highly_variable" to True for custom HVGs
-        adata.var.loc[custom_hvgs, "highly_variable"] = True
-
-    adata.var[["highly_variable"]].to_pickle("${meta.id}.hvgs.pkl")
-    """
+    if (hvgs_existing) {
+        """
+        #!/opt/conda/bin/python
+
+        import scanpy as sc
+
+        adata = sc.read_h5ad("${adata}")
+
+        adata.var[["highly_variable"]].to_pickle("${meta.id}.hvgs.pkl")
+        """
+    } else {
+        """
+        #!/opt/conda/bin/python
+
+        import scanpy as sc
+
+        adata = sc.read_h5ad("${adata}")
+
+        span = 0.3 # default
+        worked = False
+
+        while not worked and span <= 1:
+            try:
+                sc.pp.highly_variable_genes(adata,
+                    n_top_genes=10000,
+                    flavor="seurat_v3",
+                    span=span,
+                    batch_key="batch")
+                worked = True
+            except:
+                span += 0.1
+                print(f"Increased span to {span}")
+
+        custom_hvgs = "${custom_hvgs.join(" ")}".split(" ")
+        custom_hvgs = [x.upper().replace("_", "-").replace(".", "-") for x in custom_hvgs]
+        custom_hvgs = [x for x in custom_hvgs if x in adata.var_names]
+
+        if len(custom_hvgs) > 0:
+            # Set "highly_variable" to True for custom HVGs
+            adata.var.loc[custom_hvgs, "highly_variable"] = True
+
+        adata.var[["highly_variable"]].to_pickle("${meta.id}.hvgs.pkl")
+        """
+    }
 }
\ No newline at end of file
diff --git a/modules/integrate_scarches.nf b/modules/integrate_scarches.nf
index c53bb8e..a02f3b7 100644
--- a/modules/integrate_scarches.nf
+++ b/modules/integrate_scarches.nf
@@ -6,9 +6,8 @@ process INTEGRATE_SCARCHES {
 
     input:
     tuple val(meta), path(query)
-    tuple val(meta2), path(reference), path(base_model, stageAs: "base_model")
+    tuple val(meta2), path(base_model, stageAs: "base_model")
     val(has_celltypes)
-    tuple val(meta3), path(hvgs)
 
     output:
     tuple val(meta_out), path("${method}.integrated.h5ad"), emit: integrated
@@ -27,20 +26,13 @@ process INTEGRATE_SCARCHES {
 
     reference_model_path = "${base_model}"
     surgery_model_path = "${model}"
-    reference_path = "${reference}"
     query_path = "${query}"
 
-    adata_reference = ad.read_h5ad(reference_path)
     adata_query = ad.read_h5ad(query_path)
-    df_hvgs = pd.read_pickle("${hvgs}")
-    adata_query = adata_query[:, df_hvgs[df_hvgs["highly_variable"]].index.to_list()].copy()
 
     adata_output = adata_query.copy()
 
     if ${has_celltypes ? "True" : "False"}:
-        known_cell_types = adata_reference.obs["cell_type"].unique()
-        adata_query.obs["cell_type"] = adata_query.obs["cell_type"].map(lambda original: original if original in known_cell_types else "Unknown")
-
         sca.models.SCANVI.prepare_query_anndata(
             adata=adata_query, reference_model=reference_model_path
         )
diff --git a/subworkflows/doublets.nf b/subworkflows/doublets.nf
index 4dd133a..27c1257 100644
--- a/subworkflows/doublets.nf
+++ b/subworkflows/doublets.nf
@@ -9,7 +9,7 @@ workflow DOUBLETS {
     take:
         ch_adata
         ch_hvgs
-        ch_scanvi_model
+        ch_model
         ch_integrations
         ch_raw
 
@@ -25,7 +25,7 @@ workflow DOUBLETS {
 
         SOLO(
             PREPARE_SOLO.out.adata.collect(),
-            ch_scanvi_model.collect(),
+            ch_model.collect(),
             params.has_celltypes,
             ch_batches
         )
diff --git a/subworkflows/integration.nf b/subworkflows/integration.nf
index c266980..3bdc8cc 100644
--- a/subworkflows/integration.nf
+++ b/subworkflows/integration.nf
@@ -2,8 +2,6 @@ include { INTEGRATE } from "../modules/integrate.nf"
 include { INTEGRATE as INTEGRATE_GPU } from "../modules/integrate.nf"
 include { INTEGRATE as INTEGRATE_SCVI } from "../modules/integrate.nf"
 include { INTEGRATE_SCANVI } from "../modules/integrate_scanvi.nf"
-include { INTEGRATE_SCARCHES } from "../modules/integrate_scarches.nf"
-include { MERGE_EXTENDED } from "../modules/merge_extended.nf"
 
 include { BENCHMARKING } from "./benchmarking.nf"
diff --git a/subworkflows/preprocessing.nf b/subworkflows/preprocessing.nf
index 5c05b0c..44ab265 100644
--- a/subworkflows/preprocessing.nf
+++ b/subworkflows/preprocessing.nf
@@ -14,6 +14,7 @@ workflow PREPROCESSING {
     take:
         ch_samplesheet
         ch_base
+        hvgs_existing
 
     main:
     ch_samples = Channel.from(check_samplesheet(ch_samplesheet.toString()))
@@ -53,21 +54,17 @@ workflow PREPROCESSING {
     COMPOSITION(ch_adata_intersection)
     DISTRIBUTION(ch_adata_intersection)
 
-    if (params.mode != "extend") {
-        IDENTIFY_HVGS(
-            ch_adata_intersection,
-            params.integration_hvgs,
-            params.custom_hvgs
-        )
-        ch_hvgs = IDENTIFY_HVGS.out
-    } else {
-        ch_hvgs = Channel.empty()
-    }
+    IDENTIFY_HVGS(
+        ch_adata_intersection,
+        params.integration_hvgs,
+        params.custom_hvgs,
+        hvgs_existing
+    )
 
     emit:
     intersection = ch_adata_intersection
     union = ch_adata_union
     transfer = ch_adata_transfer
-    hvgs = ch_hvgs
+    hvgs = IDENTIFY_HVGS.out
 }
\ No newline at end of file
diff --git a/subworkflows/transfer.nf b/subworkflows/transfer.nf
deleted file mode 100644
index 1b2c2b0..0000000
--- a/subworkflows/transfer.nf
+++ /dev/null
@@ -1,22 +0,0 @@
-workflow TRANSFER {
-    take:
-        ch_base
-        ch_model
-        ch_extension
-
-    main:
-        INTEGRATE_SCARCHES(
-            ch_extension,
-            ch_base.join(ch_model),
-            params.has_celltypes,
-            ch_hvgs
-        )
-
-        MERGE_EXTENDED(
-            INTEGRATE_SCARCHES.out.integrated,
-            ch_core_integrated
-        )
-
-    emit:
-
-}
\ No newline at end of file
diff --git a/workflows/build.nf b/workflows/build.nf
index 9cb139a..7b2bb90 100644
--- a/workflows/build.nf
+++ b/workflows/build.nf
@@ -17,7 +17,7 @@ workflow BUILD {
 
     ch_samplesheet = file(params.samplesheet)
 
-    PREPROCESSING(ch_samplesheet, Channel.value([[], []]))
+    PREPROCESSING(ch_samplesheet, Channel.value([[], []]), false)
 
     ch_adata_intersection = PREPROCESSING.out.intersection
     ch_adata_union = PREPROCESSING.out.union
diff --git a/workflows/extend.nf b/workflows/extend.nf
index 7ed6af6..1523f7c 100644
--- a/workflows/extend.nf
+++ b/workflows/extend.nf
@@ -1,12 +1,13 @@
 // Modules
-include { CELLTYPIST } from "../modules/celltypist.nf"
-include { CELL_CYCLE } from "../modules/cell_cycle.nf"
-include { CELL_QC } from "../modules/cell_qc.nf"
+include { CELLTYPIST } from "../modules/celltypist"
+include { CELL_CYCLE } from "../modules/cell_cycle"
+include { CELL_QC } from "../modules/cell_qc"
+include { INTEGRATE_SCARCHES } from "../modules/integrate_scarches"
 
 // Workflows
-include { PREPROCESSING } from "../subworkflows/preprocessing.nf"
-include { COUNTS } from "../subworkflows/counts.nf"
-
+include { PREPROCESSING } from "../subworkflows/preprocessing"
+include { COUNTS } from "../subworkflows/counts"
+include { DOUBLETS } from "../subworkflows/doublets.nf"
 
 workflow EXTEND {
     if (!params.samplesheet) {
@@ -27,11 +28,12 @@ workflow EXTEND {
 
     ch_model = Channel.value(file(params.model)).map{ model -> [[id: "model"], model]}
 
-    PREPROCESSING(ch_samplesheet, ch_base)
+    PREPROCESSING(ch_samplesheet, ch_base, true)
 
     ch_adata_intersection = PREPROCESSING.out.intersection
     ch_adata_union = PREPROCESSING.out.union
     ch_adata_transfer = PREPROCESSING.out.transfer
+    ch_hvgs = PREPROCESSING.out.hvgs
 
     COUNTS(ch_adata_union, params.normalization_method)
 
@@ -47,6 +49,20 @@ workflow EXTEND {
 
     CELL_QC(ch_adata_intersection)
 
+    INTEGRATE_SCARCHES(
+        ch_adata_transfer,
+        ch_model,
+        params.has_celltypes
+    )
+
+    DOUBLETS(
+        ch_adata_transfer,
+        ch_hvgs,
+        INTEGRATE_SCARCHES.out.model,
+        INTEGRATE_SCARCHES.out.integrated,
+        COUNTS.out
+    )
+
     ch_adata = Channel.empty()
     ch_obsm = Channel.empty()
     ch_obs = Channel.empty()
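[Reviewer note on PATCH 13/20] INTEGRATE_SCARCHES now receives only the query cells and the serialized base model. The prepare/load pair it relies on follows the standard scvi-tools query-mapping ("surgery") pattern; a minimal sketch under that assumption (paths illustrative, and the load_query_data/train calls show the generic API, not necessarily the module's exact settings):

    import anndata as ad
    import scarches as sca

    adata_query = ad.read_h5ad("datasets.transfer.h5ad")  # illustrative path

    # subset/pad the query to the gene set the reference model was trained on
    sca.models.SCANVI.prepare_query_anndata(
        adata=adata_query, reference_model="base_model"
    )

    # load the reference weights and fine-tune them on the query
    model = sca.models.SCANVI.load_query_data(adata_query, "base_model")
    model.train(max_epochs=100)
    adata_query.obsm["X_scanvi"] = model.get_latent_representation()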
From bdb28ded6c682e874675ee71f4e46cb62396e275 Mon Sep 17 00:00:00 2001
From: Nico Trummer
Date: Mon, 8 Apr 2024 14:34:19 +0200
Subject: [PATCH 14/20] Add HVGs to build output

---
 main.nf             |  3 ++-
 modules/merge.nf    | 12 ++++++++++++
 workflows/build.nf  |  3 +++
 workflows/extend.nf |  2 ++
 workflows/sub.nf    |  3 +++
 5 files changed, 22 insertions(+), 1 deletion(-)

diff --git a/main.nf b/main.nf
index 2d1fab6..ae9aaa4 100644
--- a/main.nf
+++ b/main.nf
@@ -32,7 +32,8 @@ workflow {
     MERGE (
         subworkflow.adata,
         subworkflow.obsm.map{ meta, obsm -> obsm}.collect(),
-        subworkflow.obs.map{ meta, obs -> obs}.collect()
+        subworkflow.obs.map{ meta, obs -> obs}.collect(),
+        subworkflow.var.map{ meta, var -> var}.collect()
     )
 
     RENAME_INTEGRATIONS(MERGE.out.adata)
diff --git a/modules/merge.nf b/modules/merge.nf
index 78b016c..f3516ca 100644
--- a/modules/merge.nf
+++ b/modules/merge.nf
@@ -10,6 +10,7 @@ process MERGE {
     tuple val(meta), path(base, stageAs: 'base.h5ad')
     path(obsm)
     path(obs)
+    path(var)
 
     output:
     tuple val(meta), file("merged.h5ad"), emit: adata
@@ -51,6 +52,17 @@ process MERGE {
         if adata.obs[col].dtype == np.float64:
             adata.obs[col] = adata.obs[col].astype(np.float32)
 
+    var_paths = "${var}".split(' ')
+
+    for var_path in var_paths:
+        df = pd.read_pickle(var_path)
+        df = df.reindex(adata.var_names)
+        adata.var = pd.concat([adata.var, df], axis=1)
+
+    for col in adata.var.columns:
+        if adata.var[col].dtype == np.float64:
+            adata.var[col] = adata.var[col].astype(np.float32)
+
     adata.write('merged.h5ad')
     adata.obs.to_pickle('metadata.pkl')
     """
diff --git a/workflows/build.nf b/workflows/build.nf
index 7b2bb90..6121a3b 100644
--- a/workflows/build.nf
+++ b/workflows/build.nf
@@ -70,8 +70,11 @@ workflow BUILD {
         DOUBLETS.out.obsm
     )
 
+    ch_var = ch_hvgs
+
     emit:
     adata = DOUBLETS.out.counts
     obsm = ch_obsm
     obs = ch_obs
+    var = ch_var
 }
diff --git a/workflows/extend.nf b/workflows/extend.nf
index 1523f7c..eec1a05 100644
--- a/workflows/extend.nf
+++ b/workflows/extend.nf
@@ -66,9 +66,11 @@ workflow EXTEND {
     ch_adata = Channel.empty()
     ch_obsm = Channel.empty()
     ch_obs = Channel.empty()
+    ch_var = Channel.empty()
 
     emit:
     adata = ch_adata
     obsm = ch_obsm
     obs = ch_obs
+    var = ch_var
 }
\ No newline at end of file
diff --git a/workflows/sub.nf b/workflows/sub.nf
index f16e0ed..f2addbb 100644
--- a/workflows/sub.nf
+++ b/workflows/sub.nf
@@ -32,8 +32,11 @@ workflow SUB {
 
     ch_obs = CLUSTERING.out.obs
 
+    ch_var = Channel.empty()
+
     emit:
     adata = ch_adata
     obsm = ch_obsm
     obs = ch_obs
+    var = ch_var
 }
\ No newline at end of file
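[Reviewer note on PATCH 14/20] MERGE now folds per-gene tables into adata.var by aligning each pickle on the gene index first. The alignment step in isolation (toy frames, not taken from the patch):

    import pandas as pd

    var = pd.DataFrame(index=["geneA", "geneB", "geneC"])  # adata.var stand-in
    hvgs = pd.DataFrame({"highly_variable": [True, False]}, index=["geneC", "geneA"])

    # reindex puts the pickle into adata.var_names order; genes missing from the
    # pickle become NaN, which is why the module downcasts float64 columns after
    var = pd.concat([var, hvgs.reindex(var.index)], axis=1)
    print(var)
    #        highly_variable
    # geneA            False
    # geneB              NaN
    # geneC             True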
From c561298e422879e6fc8c1f742aaec649800d91be Mon Sep 17 00:00:00 2001
From: Nico Trummer
Date: Mon, 8 Apr 2024 15:47:38 +0200
Subject: [PATCH 15/20] Fix gene problems in SOLO

---
 bin/merge_datasets.py    | 12 +++++++++---
 modules/get_batches.nf   | 26 ++++++++++++++++++++++++++
 modules/prepare_solo.nf  | 33 ---------------------------------
 modules/solo.nf          | 17 ++++++++++++-----
 subworkflows/doublets.nf | 20 +++++++++-----------
 workflows/build.nf       |  1 -
 workflows/extend.nf      |  1 -
 7 files changed, 56 insertions(+), 54 deletions(-)
 create mode 100644 modules/get_batches.nf
 delete mode 100644 modules/prepare_solo.nf

diff --git a/bin/merge_datasets.py b/bin/merge_datasets.py
index 6af50ff..a7b4708 100755
--- a/bin/merge_datasets.py
+++ b/bin/merge_datasets.py
@@ -88,9 +88,15 @@ def to_Florent_case(s: str):
 adata_intersection.layers["counts"] = adata_intersection.X
 adata_union.layers["counts"] = adata_union.X
 
+if args.base:
+    adata_transfer = adata_intersection[~adata_intersection.obs.index.isin(adata_base.obs.index)]
+    adata_transfer.write_h5ad(args.output_transfer)
+
+    highly_variable_genes_set = set(adata_base.var[adata_base.var["highly_variable"]].index)
+
+    adata_intersection.var["highly_variable"] = adata_intersection.var.index.isin(highly_variable_genes_set)
+    adata_union.var["highly_variable"] = adata_union.var.index.isin(highly_variable_genes_set)
+
 adata_intersection.write_h5ad(args.output_intersection)
 adata_union.write_h5ad(args.output_union)
-
-if args.base:
-    adata_transfer = adata_intersection[~adata_intersection.obs.index.isin(adata_base.obs.index)]
-    adata_transfer.write_h5ad(args.output_transfer)
\ No newline at end of file
diff --git a/modules/get_batches.nf b/modules/get_batches.nf
new file mode 100644
index 0000000..52c5634
--- /dev/null
+++ b/modules/get_batches.nf
@@ -0,0 +1,26 @@
+process GET_BATCHES {
+    tag "$meta.id"
+
+    container "bigdatainbiomedicine/sc-rpy:1.0"
+
+    label "process_medium"
+
+    input:
+    tuple val(meta), path(adata)
+
+    output:
+    tuple val(meta), path("batches.txt")
+
+    script:
+    """
+    #!/opt/conda/bin/python
+
+    import anndata as ad
+    import pandas as pd
+
+    adata = ad.read_h5ad("${adata}")
+
+    with open("batches.txt", "w") as f:
+        f.write("\\n".join(adata.obs["batch"].unique()))
+    """
+}
\ No newline at end of file
diff --git a/modules/prepare_solo.nf b/modules/prepare_solo.nf
deleted file mode 100644
index b995d18..0000000
--- a/modules/prepare_solo.nf
+++ /dev/null
@@ -1,33 +0,0 @@
-process PREPARE_SOLO {
-    tag "$meta.id"
-
-    container "bigdatainbiomedicine/sc-rpy:1.0"
-
-    label "process_medium"
-
-    input:
-    tuple val(meta), path(adata)
-    tuple val(meta3), path(hvgs)
-
-    output:
-    tuple val(meta), path("${meta.id}.hvg.h5ad"), emit: adata
-    tuple val(meta), path("batches.txt"), emit: batches
-
-    script:
-    """
-    #!/opt/conda/bin/python
-
-    import anndata as ad
-    import pandas as pd
-
-    adata = ad.read_h5ad("${adata}")
-    df_hvgs = pd.read_pickle("${hvgs}")
-
-    adata_hvg = adata[:, df_hvgs[df_hvgs["highly_variable"]].index]
-
-    with open("batches.txt", "w") as f:
-        f.write("\\n".join(adata_hvg.obs["batch"].unique()))
-
-    adata_hvg.write_h5ad("${meta.id}.hvg.h5ad")
-    """
-}
\ No newline at end of file
diff --git a/modules/solo.nf b/modules/solo.nf
index 9e085ee..3b8bd09 100644
--- a/modules/solo.nf
+++ b/modules/solo.nf
@@ -26,15 +26,22 @@ process SOLO {
     threadpool_limits(${task.cpus})
 
     adata = sc.read_h5ad("${adata}")
-    adata_batch = adata[adata.obs.batch == "${batch}"]
+    model_path = "${scvi_model}"
 
     if ${has_celltypes ? "True" : "False"}:
-        scvi.model.SCANVI.setup_anndata(adata, batch_key="batch", labels_key="cell_type", unlabeled_category="Unknown")
-        scvi_model = scvi.model.SCANVI.load("${scvi_model}", adata=adata)
+        # scvi.model.SCANVI.setup_anndata(adata, batch_key="batch", labels_key="cell_type", unlabeled_category="Unknown")
+        scvi.model.SCANVI.prepare_query_anndata(
+            adata=adata, reference_model=model_path
+        )
+        scvi_model = scvi.model.SCANVI.load(model_path, adata=adata)
     else:
-        scvi.model.SCVI.setup_anndata(adata, batch_key="batch")
-        scvi_model = scvi.model.SCVI.load("${scvi_model}", adata=adata)
+        # scvi.model.SCVI.setup_anndata(adata, batch_key="batch")
+        scvi.model.SCVI.prepare_query_anndata(
+            adata=adata, reference_model=model_path
+        )
+        scvi_model = scvi.model.SCVI.load(model_path, adata=adata)
 
     solo = scvi.external.SOLO.from_scvi_model(scvi_model, restrict_to_batch="${batch}")
diff --git a/subworkflows/doublets.nf b/subworkflows/doublets.nf
index 27c1257..3a4900d 100644
--- a/subworkflows/doublets.nf
+++ b/subworkflows/doublets.nf
@@ -1,30 +1,28 @@
-include { PREPARE_SOLO } from "../modules/prepare_solo.nf"
-include { SOLO } from "../modules/solo.nf"
-include { DEDOUBLET_ADATA as DEDOUBLET_INTEGRATIONS } from "../modules/dedoublet_adata.nf"
-include { DEDOUBLET_ADATA as DEDOUBLET_COUNTS } from "../modules/dedoublet_adata.nf"
-include { EXTRACT_EMBEDDING } from "../modules/extract_embedding.nf"
+include { GET_BATCHES } from "../modules/get_batches"
+include { SOLO } from "../modules/solo"
+include { DEDOUBLET_ADATA as DEDOUBLET_INTEGRATIONS } from "../modules/dedoublet_adata"
+include { DEDOUBLET_ADATA as DEDOUBLET_COUNTS } from "../modules/dedoublet_adata"
+include { EXTRACT_EMBEDDING } from "../modules/extract_embedding"
 
 workflow DOUBLETS {
     take:
         ch_adata
-        ch_hvgs
         ch_model
         ch_integrations
         ch_raw
 
     main:
-        PREPARE_SOLO(
-            ch_adata,
-            ch_hvgs
+        GET_BATCHES(
+            ch_adata
         )
 
-        ch_batches = PREPARE_SOLO.out.batches
+        ch_batches = GET_BATCHES.out
             .splitText().flatten()
            .map{ batch -> batch.replace("\n", "") }
 
         SOLO(
-            PREPARE_SOLO.out.adata.collect(),
+            ch_adata,
             ch_model.collect(),
             params.has_celltypes,
             ch_batches
diff --git a/workflows/build.nf b/workflows/build.nf
index 6121a3b..0b5a1f8 100644
--- a/workflows/build.nf
+++ b/workflows/build.nf
@@ -46,7 +46,6 @@ workflow BUILD {
 
     DOUBLETS(
         ch_adata_intersection,
-        ch_hvgs,
         INTEGRATION.out.model,
         INTEGRATION.out.integrated,
         COUNTS.out
diff --git a/workflows/extend.nf b/workflows/extend.nf
index eec1a05..b5d8a24 100644
--- a/workflows/extend.nf
+++ b/workflows/extend.nf
@@ -57,7 +57,6 @@ workflow EXTEND {
 
     DOUBLETS(
         ch_adata_transfer,
-        ch_hvgs,
         INTEGRATE_SCARCHES.out.model,
         INTEGRATE_SCARCHES.out.integrated,
         COUNTS.out
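[Reviewer note on PATCH 15/20] SOLO now runs on the full gene space and aligns it to the trained model via prepare_query_anndata instead of pre-subsetting to HVGs. Condensed, the per-batch doublet scoring looks like this (paths illustrative):

    import scanpy as sc
    import scvi

    adata = sc.read_h5ad("datasets.intersection.h5ad")  # illustrative path
    model_path = "scvi_model"                           # illustrative path

    # pad/reorder genes to match the reference model, then load it
    scvi.model.SCVI.prepare_query_anndata(adata=adata, reference_model=model_path)
    scvi_model = scvi.model.SCVI.load(model_path, adata=adata)

    # one SOLO classifier per batch, mirroring the fan-out over batches.txt
    for batch in adata.obs["batch"].unique():
        solo = scvi.external.SOLO.from_scvi_model(scvi_model, restrict_to_batch=batch)
        solo.train()
        print(solo.predict())  # doublet/singlet probabilities per cell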
From 7d395c3912969706459c1f6717247bee60733ace Mon Sep 17 00:00:00 2001
From: Nico Trummer
Date: Mon, 8 Apr 2024 16:26:21 +0200
Subject: [PATCH 16/20] Remove HVG identification from extend workflow

---
 bin/merge_datasets.py         |  5 ---
 conf/modes/extend.config      |  3 --
 modules/identify_hvgs.nf      | 73 ++++++++++++++---------------------
 modules/merge_extended.nf     |  3 ++
 subworkflows/preprocessing.nf | 20 ++++++----
 workflows/build.nf            |  2 +-
 workflows/extend.nf           | 34 +++++++++++++---
 7 files changed, 74 insertions(+), 66 deletions(-)

diff --git a/bin/merge_datasets.py b/bin/merge_datasets.py
index a7b4708..155fdb1 100755
--- a/bin/merge_datasets.py
+++ b/bin/merge_datasets.py
@@ -92,11 +92,6 @@ def to_Florent_case(s: str):
     adata_transfer = adata_intersection[~adata_intersection.obs.index.isin(adata_base.obs.index)]
     adata_transfer.write_h5ad(args.output_transfer)
 
-    highly_variable_genes_set = set(adata_base.var[adata_base.var["highly_variable"]].index)
-
-    adata_intersection.var["highly_variable"] = adata_intersection.var.index.isin(highly_variable_genes_set)
-    adata_union.var["highly_variable"] = adata_union.var.index.isin(highly_variable_genes_set)
-
 adata_intersection.write_h5ad(args.output_intersection)
 adata_union.write_h5ad(args.output_union)
diff --git a/conf/modes/extend.config b/conf/modes/extend.config
index 9c3debc..4075880 100644
--- a/conf/modes/extend.config
+++ b/conf/modes/extend.config
@@ -1,7 +1,4 @@
 params {
     base = null
     model = null
-}
-
-process {
 }
\ No newline at end of file
diff --git a/modules/identify_hvgs.nf b/modules/identify_hvgs.nf
index 5bf79fc..621a5e9 100644
--- a/modules/identify_hvgs.nf
+++ b/modules/identify_hvgs.nf
@@ -9,7 +9,6 @@ process IDENTIFY_HVGS {
     tuple val(meta), path(adata)
     val(n_hvgs)
     val(custom_hvgs)
-    val(hvgs_existing)
 
     output:
     tuple val(meta), path("${meta.id}.hvgs.pkl")
@@ -18,48 +17,36 @@ process IDENTIFY_HVGS {
     task.ext.when == null || task.ext.when
 
     script:
-    if (hvgs_existing) {
-        """
-        #!/opt/conda/bin/python
-
-        import scanpy as sc
-
-        adata = sc.read_h5ad("${adata}")
-
-        adata.var[["highly_variable"]].to_pickle("${meta.id}.hvgs.pkl")
-        """
-    } else {
-        """
-        #!/opt/conda/bin/python
-
-        import scanpy as sc
-
-        adata = sc.read_h5ad("${adata}")
-
-        span = 0.3 # default
-        worked = False
-
-        while not worked and span <= 1:
-            try:
-                sc.pp.highly_variable_genes(adata,
-                    n_top_genes=10000,
-                    flavor="seurat_v3",
-                    span=span,
-                    batch_key="batch")
-                worked = True
-            except:
-                span += 0.1
-                print(f"Increased span to {span}")
-
-        custom_hvgs = "${custom_hvgs.join(" ")}".split(" ")
-        custom_hvgs = [x.upper().replace("_", "-").replace(".", "-") for x in custom_hvgs]
-        custom_hvgs = [x for x in custom_hvgs if x in adata.var_names]
-
-        if len(custom_hvgs) > 0:
-            # Set "highly_variable" to True for custom HVGs
-            adata.var.loc[custom_hvgs, "highly_variable"] = True
-
-        adata.var[["highly_variable"]].to_pickle("${meta.id}.hvgs.pkl")
-        """
-    }
+    """
+    #!/opt/conda/bin/python
+
+    import scanpy as sc
+
+    adata = sc.read_h5ad("${adata}")
+
+    span = 0.3 # default
+    worked = False
+
+    while not worked and span <= 1:
+        try:
+            sc.pp.highly_variable_genes(adata,
+                n_top_genes=10000,
+                flavor="seurat_v3",
+                span=span,
+                batch_key="batch")
+            worked = True
+        except:
+            span += 0.1
+            print(f"Increased span to {span}")
+
+    custom_hvgs = "${custom_hvgs.join(" ")}".split(" ")
+    custom_hvgs = [x.upper().replace("_", "-").replace(".", "-") for x in custom_hvgs]
+    custom_hvgs = [x for x in custom_hvgs if x in adata.var_names]
+
+    if len(custom_hvgs) > 0:
+        # Set "highly_variable" to True for custom HVGs
+        adata.var.loc[custom_hvgs, "highly_variable"] = True
+
+    adata.var[["highly_variable"]].to_pickle("${meta.id}.hvgs.pkl")
+    """
 }
\ No newline at end of file
diff --git a/modules/merge_extended.nf b/modules/merge_extended.nf
index 6b029ac..56c26ff 100644
--- a/modules/merge_extended.nf
+++ b/modules/merge_extended.nf
@@ -8,6 +8,7 @@ process MERGE_EXTENDED {
     input:
     tuple val(meta), path(extension)
     tuple val(meta2), path(core)
+    val(integration_key)
 
     output:
     tuple val(meta), path("${meta.id}.merged.h5ad")
@@ -19,6 +20,8 @@ process MERGE_EXTENDED {
     import anndata as ad
 
     adata_core = ad.read_h5ad("${core}")
+    adata_core.obsm["X_emb"] = adata_core.obsm["${integration_key}"]
+
     adata_extension = ad.read_h5ad("${extension}")
 
     adata_extended = ad.concat([adata_core, adata_extension])
diff --git a/subworkflows/preprocessing.nf b/subworkflows/preprocessing.nf
index 44ab265..720980c 100644
--- a/subworkflows/preprocessing.nf
+++ b/subworkflows/preprocessing.nf
@@ -14,7 +14,6 @@ workflow PREPROCESSING {
     take:
         ch_samplesheet
         ch_base
-        hvgs_existing
 
     main:
     ch_samples = Channel.from(check_samplesheet(ch_samplesheet.toString()))
@@ -54,17 +53,22 @@ workflow PREPROCESSING {
     COMPOSITION(ch_adata_intersection)
     DISTRIBUTION(ch_adata_intersection)
 
-    IDENTIFY_HVGS(
-        ch_adata_intersection,
-        params.integration_hvgs,
-        params.custom_hvgs,
-        hvgs_existing
-    )
+    if (params.mode == "build") {
+        IDENTIFY_HVGS(
+            ch_adata_intersection,
+            params.integration_hvgs,
+            params.custom_hvgs
+        )
+        ch_hvgs = IDENTIFY_HVGS.out
+    } else {
+        ch_hvgs = Channel.empty()
+    }
+
 
     emit:
     intersection = ch_adata_intersection
     union = ch_adata_union
     transfer = ch_adata_transfer
-    hvgs = IDENTIFY_HVGS.out
+    hvgs = ch_hvgs
 }
\ No newline at end of file
diff --git a/workflows/build.nf b/workflows/build.nf
index 0b5a1f8..f6fa791 100644
--- a/workflows/build.nf
+++ b/workflows/build.nf
@@ -17,7 +17,7 @@ workflow BUILD {
 
     ch_samplesheet = file(params.samplesheet)
 
-    PREPROCESSING(ch_samplesheet, Channel.value([[], []]), false)
+    PREPROCESSING(ch_samplesheet, Channel.value([[], []]))
 
     ch_adata_intersection = PREPROCESSING.out.intersection
     ch_adata_union = PREPROCESSING.out.union
diff --git a/workflows/extend.nf b/workflows/extend.nf
index b5d8a24..45c07f1 100644
--- a/workflows/extend.nf
+++ b/workflows/extend.nf
@@ -3,11 +3,14 @@ include { CELLTYPIST } from "../modules/celltypist"
 include { CELL_CYCLE } from "../modules/cell_cycle"
 include { CELL_QC } from "../modules/cell_qc"
 include { INTEGRATE_SCARCHES } from "../modules/integrate_scarches"
+include { MERGE_EXTENDED } from "../modules/merge_extended"
 
 // Workflows
 include { PREPROCESSING } from "../subworkflows/preprocessing"
 include { COUNTS } from "../subworkflows/counts"
 include { DOUBLETS } from "../subworkflows/doublets.nf"
+include { CLUSTERING } from "../subworkflows/clustering.nf"
+
 
 workflow EXTEND {
     if (!params.samplesheet) {
@@ -28,12 +31,11 @@ workflow EXTEND {
 
     ch_model = Channel.value(file(params.model)).map{ model -> [[id: "model"], model]}
 
-    PREPROCESSING(ch_samplesheet, ch_base, true)
+    PREPROCESSING(ch_samplesheet, ch_base)
 
     ch_adata_intersection = PREPROCESSING.out.intersection
     ch_adata_union = PREPROCESSING.out.union
     ch_adata_transfer = PREPROCESSING.out.transfer
-    ch_hvgs = PREPROCESSING.out.hvgs
 
     COUNTS(ch_adata_union, params.normalization_method)
 
@@ -62,13 +64,33 @@ workflow EXTEND {
         COUNTS.out
     )
 
-    ch_adata = Channel.empty()
-    ch_obsm = Channel.empty()
-    ch_obs = Channel.empty()
+    MERGE_EXTENDED(
+        DOUBLETS.out.integrations.collect(),
+        ch_base,
+        params.has_celltypes ? "scanvi" : "scvi"
+    )
+
+    CLUSTERING(
+        MERGE_EXTENDED.out,
+        Channel.from(params.leiden_resolutions),
+        CELLTYPIST.out,
+        Channel.value(params.entropy_initial_smoothness)
+    )
+
+    ch_obs = CLUSTERING.out.obs.mix(
+        CELL_CYCLE.out,
+        CELLTYPIST.out,
+        CELL_QC.out
+    )
+
+    ch_obsm = CLUSTERING.out.obsm.mix(
+        DOUBLETS.out.obsm
+    )
+
+    ch_var = Channel.empty()
 
     emit:
-    adata = ch_adata
+    adata = DOUBLETS.out.counts
     obsm = ch_obsm
     obs = ch_obs
     var = ch_var

From 555a610424e94eff5809a75048192ed5c0047322 Mon Sep 17 00:00:00 2001
From: Nico Trummer
Date: Mon, 8 Apr 2024 16:39:02 +0200
Subject: [PATCH 17/20] Improve config structure

---
 conf/modes/build-extend.config | 3 ---
 conf/modes/build.config        | 3 +++
 subworkflows/preprocessing.nf  | 2 +-
 3 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/conf/modes/build-extend.config b/conf/modes/build-extend.config
index e021090..11c2187 100644
--- a/conf/modes/build-extend.config
+++ b/conf/modes/build-extend.config
@@ -3,9 +3,6 @@ params {
     celltypist_model = null
     min_cells = 50
     cell_cycle = true
-    custom_hvgs = []
-    integration_hvgs = 10000
-    benchmark_hvgs = 0
 
     normalization_method = "log_total"
     upset_only = false
diff --git a/conf/modes/build.config b/conf/modes/build.config
index 9092048..1ba4724 100644
--- a/conf/modes/build.config
+++ b/conf/modes/build.config
@@ -1,3 +1,6 @@
 params {
     integration_methods = ["scvi", "scanvi", "harmony", "scgen", "scanorama", "bbknn", "desc", "combat", "trvaep"]
+    custom_hvgs = []
+    integration_hvgs = 10000
+    benchmark_hvgs = 0
 }
\ No newline at end of file
diff --git a/subworkflows/preprocessing.nf b/subworkflows/preprocessing.nf
index 720980c..bc6045a 100644
--- a/subworkflows/preprocessing.nf
+++ b/subworkflows/preprocessing.nf
@@ -38,7 +38,7 @@ workflow PREPROCESSING {
         PREPROCESS.out.adata.map{ meta, adata -> adata }.collect(),
         ch_base.collect(),
         params.min_cells,
-        params.custom_hvgs
+        params.mode == "build" ? params.custom_hvgs : []
     )
 
     ch_adata_intersection = MERGE_DATASETS.out.intersection
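[Reviewer note on PATCH 16/20] MERGE_EXTENDED copies the core atlas's chosen latent space to the shared obsm key "X_emb" before concatenation, so core and projected cells end up in one joint embedding. A sketch of that mechanism (the file names and the "X_scanvi" key are illustrative assumptions, not taken from the patch):

    import anndata as ad

    adata_core = ad.read_h5ad("core.h5ad")                       # illustrative
    adata_extension = ad.read_h5ad("scarches.integrated.h5ad")   # illustrative

    # expose the core's integration under the key the extension already uses;
    # anndata.concat keeps obsm keys that are shared by all inputs
    adata_core.obsm["X_emb"] = adata_core.obsm["X_scanvi"]
    adata_extended = ad.concat([adata_core, adata_extension])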
From 68712bf2cfcd5dd9ac074c6885bfb0e4295aff9a Mon Sep 17 00:00:00 2001
From: Nico Trummer
Date: Mon, 8 Apr 2024 16:42:40 +0200
Subject: [PATCH 18/20] Fix problems with non-existing var channels

---
 main.nf             |  1 -
 modules/merge.nf    | 12 ------------
 workflows/build.nf  |  3 ---
 workflows/extend.nf |  3 ---
 workflows/sub.nf    |  3 ---
 5 files changed, 22 deletions(-)

diff --git a/main.nf b/main.nf
index ae9aaa4..4be9b5c 100644
--- a/main.nf
+++ b/main.nf
@@ -33,7 +33,6 @@ workflow {
         subworkflow.adata,
         subworkflow.obsm.map{ meta, obsm -> obsm}.collect(),
         subworkflow.obs.map{ meta, obs -> obs}.collect(),
-        subworkflow.var.map{ meta, var -> var}.collect()
     )
 
     RENAME_INTEGRATIONS(MERGE.out.adata)
diff --git a/modules/merge.nf b/modules/merge.nf
index f3516ca..78b016c 100644
--- a/modules/merge.nf
+++ b/modules/merge.nf
@@ -10,7 +10,6 @@ process MERGE {
     tuple val(meta), path(base, stageAs: 'base.h5ad')
     path(obsm)
     path(obs)
-    path(var)
 
     output:
     tuple val(meta), file("merged.h5ad"), emit: adata
@@ -52,17 +51,6 @@ process MERGE {
         if adata.obs[col].dtype == np.float64:
             adata.obs[col] = adata.obs[col].astype(np.float32)
 
-    var_paths = "${var}".split(' ')
-
-    for var_path in var_paths:
-        df = pd.read_pickle(var_path)
-        df = df.reindex(adata.var_names)
-        adata.var = pd.concat([adata.var, df], axis=1)
-
-    for col in adata.var.columns:
-        if adata.var[col].dtype == np.float64:
-            adata.var[col] = adata.var[col].astype(np.float32)
-
     adata.write('merged.h5ad')
     adata.obs.to_pickle('metadata.pkl')
     """
diff --git a/workflows/build.nf b/workflows/build.nf
index f6fa791..d4d4031 100644
--- a/workflows/build.nf
+++ b/workflows/build.nf
@@ -69,11 +69,8 @@ workflow BUILD {
         DOUBLETS.out.obsm
     )
 
-    ch_var = ch_hvgs
-
     emit:
     adata = DOUBLETS.out.counts
     obsm = ch_obsm
     obs = ch_obs
-    var = ch_var
 }
diff --git a/workflows/extend.nf b/workflows/extend.nf
index 45c07f1..a32cd30 100644
--- a/workflows/extend.nf
+++ b/workflows/extend.nf
@@ -86,12 +86,9 @@ workflow EXTEND {
     ch_obsm = CLUSTERING.out.obsm.mix(
         DOUBLETS.out.obsm
     )
-
-    ch_var = Channel.empty()
 
     emit:
     adata = DOUBLETS.out.counts
     obsm = ch_obsm
     obs = ch_obs
-    var = ch_var
 }
\ No newline at end of file
diff --git a/workflows/sub.nf b/workflows/sub.nf
index f2addbb..f16e0ed 100644
--- a/workflows/sub.nf
+++ b/workflows/sub.nf
@@ -32,11 +32,8 @@ workflow SUB {
 
     ch_obs = CLUSTERING.out.obs
 
-    ch_var = Channel.empty()
-
     emit:
     adata = ch_adata
     obsm = ch_obsm
     obs = ch_obs
-    var = ch_var
 }
\ No newline at end of file

From 606ff29246c2d218881550d510ad0f43d01f37c6 Mon Sep 17 00:00:00 2001
From: Nico Trummer
Date: Mon, 8 Apr 2024 17:01:01 +0200
Subject: [PATCH 19/20] Fix cellQC too few genes bug

---
 modules/cell_qc.nf | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/modules/cell_qc.nf b/modules/cell_qc.nf
index 6501721..920fcaa 100644
--- a/modules/cell_qc.nf
+++ b/modules/cell_qc.nf
@@ -23,13 +23,13 @@ process CELL_QC {
     adata = ad.read_h5ad("${adata}")
 
     adata.var["mito"] = adata.var_names.str.lower().str.startswith("mt-")
-    sc.pp.calculate_qc_metrics(adata, qc_vars=["mito"], inplace=True)
+    n_genes = adata.X.shape[1]
+    top_genes = [n for n in [50, 100, 200, 500] if n <= n_genes]
+    sc.pp.calculate_qc_metrics(adata, qc_vars=["mito"], inplace=True, percent_top=top_genes)
 
     qc_columns = ['n_genes_by_counts', 'log1p_n_genes_by_counts', 'total_counts',
-                  'log1p_total_counts', 'pct_counts_in_top_50_genes',
-                  'pct_counts_in_top_100_genes', 'pct_counts_in_top_200_genes',
-                  'pct_counts_in_top_500_genes', 'total_counts_mito',
-                  'log1p_total_counts_mito', 'pct_counts_mito']
+                  'log1p_total_counts', 'total_counts_mito',
+                  'log1p_total_counts_mito', 'pct_counts_mito'] + [f"pct_counts_in_top_{n}_genes" for n in top_genes]
 
     qc = adata.obs[qc_columns].rename(columns={column: f"qc:{column}" for column in qc_columns})

From 7616379f7f1ab33347d88366f8d61953dc031f6f Mon Sep 17 00:00:00 2001
From: Nico Trummer
Date: Mon, 8 Apr 2024 18:11:00 +0200
Subject: [PATCH 20/20] Fix problem with new cell labels in extend workflow

---
 bin/merge_datasets.py | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/bin/merge_datasets.py b/bin/merge_datasets.py
index 155fdb1..bb8e12b 100755
--- a/bin/merge_datasets.py
+++ b/bin/merge_datasets.py
@@ -90,6 +90,10 @@ def to_Florent_case(s: str):
 
 if args.base:
     adata_transfer = adata_intersection[~adata_intersection.obs.index.isin(adata_base.obs.index)]
+
+    known_celltypes = adata_base.obs["cell_type"].unique()
+    adata_transfer.obs["cell_type"] = adata_transfer.obs["cell_type"].map(lambda x: x if x in known_celltypes else "Unknown")
+
     adata_transfer.write_h5ad(args.output_transfer)
 
 adata_intersection.write_h5ad(args.output_intersection)
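[Reviewer note on PATCH 20/20] The guard keeps scANVI's label set closed: any cell type the base model has never seen is collapsed to the unlabeled category before transfer ("Unknown" matches the unlabeled_category used elsewhere in the pipeline). In isolation, with toy labels:

    import pandas as pd

    known_celltypes = {"T cell", "B cell"}  # labels present in the base atlas
    new_labels = pd.Series(["T cell", "Mystery cell", "B cell"])

    # unseen labels map to "Unknown" so the fixed scANVI label set still covers them
    safe = new_labels.map(lambda x: x if x in known_celltypes else "Unknown")
    print(safe.tolist())  # ['T cell', 'Unknown', 'B cell']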