From c1b365f24459f8b69033b1c2d4632f322beab73a Mon Sep 17 00:00:00 2001 From: Anthony Sena Date: Wed, 28 Aug 2024 15:40:57 -0400 Subject: [PATCH] Add maxCores to settings (#160) --- R/Module-Characterization.R | 2 +- R/Module-CohortDiagnostics.R | 2 +- R/Module-CohortGenerator.R | 2 +- R/Module-CohortIncidence.R | 2 +- R/Module-CohortMethod.R | 6 +++--- R/Module-EvidenceSynthesis.R | 4 ++-- R/Module-PatientLevelPrediction.R | 2 +- R/Module-SelfControlledCaseSeries.R | 4 ++-- R/Module-StrategusModule.R | 10 ++++++++++ R/Settings.R | 13 ++++++++++--- man/createCdmExecutionSettings.Rd | 6 +++++- man/createResultsExecutionSettings.Rd | 6 +++++- 12 files changed, 42 insertions(+), 17 deletions(-) diff --git a/R/Module-Characterization.R b/R/Module-Characterization.R index 1c91f5ac..7a34e7f2 100644 --- a/R/Module-Characterization.R +++ b/R/Module-Characterization.R @@ -19,8 +19,8 @@ CharacterizationModule <- R6::R6Class( #' @template analysisSpecifications #' @template executionSettings execute = function(connectionDetails, analysisSpecifications, executionSettings) { + super$.validateCdmExecutionSettings(executionSettings) super$execute(connectionDetails, analysisSpecifications, executionSettings) - checkmate::assertClass(executionSettings, "CdmExecutionSettings") jobContext <- private$jobContext workFolder <- jobContext$moduleExecutionSettings$workSubFolder diff --git a/R/Module-CohortDiagnostics.R b/R/Module-CohortDiagnostics.R index 8a16b5c6..0f9acf06 100644 --- a/R/Module-CohortDiagnostics.R +++ b/R/Module-CohortDiagnostics.R @@ -19,8 +19,8 @@ CohortDiagnosticsModule <- R6::R6Class( #' @template analysisSpecifications #' @template executionSettings execute = function(connectionDetails, analysisSpecifications, executionSettings) { + super$.validateCdmExecutionSettings(executionSettings) super$execute(connectionDetails, analysisSpecifications, executionSettings) - checkmate::assertClass(executionSettings, "CdmExecutionSettings") jobContext <- private$jobContext cohortDefinitionSet <- super$.createCohortDefinitionSetFromJobContext() diff --git a/R/Module-CohortGenerator.R b/R/Module-CohortGenerator.R index cccd4f09..2ea332e9 100644 --- a/R/Module-CohortGenerator.R +++ b/R/Module-CohortGenerator.R @@ -24,8 +24,8 @@ CohortGeneratorModule <- R6::R6Class( #' @template analysisSpecifications #' @template executionSettings execute = function(connectionDetails, analysisSpecifications, executionSettings) { + super$.validateCdmExecutionSettings(executionSettings) super$execute(connectionDetails, analysisSpecifications, executionSettings) - checkmate::assertClass(executionSettings, "CdmExecutionSettings") jobContext <- private$jobContext cohortDefinitionSet <- super$.createCohortDefinitionSetFromJobContext() diff --git a/R/Module-CohortIncidence.R b/R/Module-CohortIncidence.R index f262f173..7596d34e 100644 --- a/R/Module-CohortIncidence.R +++ b/R/Module-CohortIncidence.R @@ -18,8 +18,8 @@ CohortIncidenceModule <- R6::R6Class( #' @template analysisSpecifications #' @template executionSettings execute = function(connectionDetails, analysisSpecifications, executionSettings) { + super$.validateCdmExecutionSettings(executionSettings) super$execute(connectionDetails, analysisSpecifications, executionSettings) - checkmate::assertClass(executionSettings, "CdmExecutionSettings") refId <- 1 # this should be part of execution settings resultsFolder <- private$jobContext$moduleExecutionSettings$resultsSubFolder diff --git a/R/Module-CohortMethod.R b/R/Module-CohortMethod.R index d22f5735..533d2ccf 100644 --- a/R/Module-CohortMethod.R +++ b/R/Module-CohortMethod.R @@ -17,11 +17,11 @@ CohortMethodModule <- R6::R6Class( #' @param analysisSpecifications The analysis specifications for the study #' @template executionSettings execute = function(connectionDetails, analysisSpecifications, executionSettings) { + super$.validateCdmExecutionSettings(executionSettings) super$execute(connectionDetails, analysisSpecifications, executionSettings) - checkmate::assertClass(executionSettings, "CdmExecutionSettings") jobContext <- private$jobContext - multiThreadingSettings <- CohortMethod::createDefaultMultiThreadingSettings(parallel::detectCores()) + multiThreadingSettings <- CohortMethod::createDefaultMultiThreadingSettings(jobContext$moduleExecutionSettings$maxCores) args <- jobContext$settings args$connectionDetails <- connectionDetails @@ -41,7 +41,7 @@ CohortMethodModule <- R6::R6Class( exportFolder = exportFolder, databaseId = jobContext$moduleExecutionSettings$cdmDatabaseMetaData$databaseId, minCellCount = jobContext$moduleExecutionSettings$minCellCount, - maxCores = parallel::detectCores(), + maxCores = jobContext$moduleExecutionSettings$maxCores, cmDiagnosticThresholds = jobContext$settings$cmDiagnosticThresholds ) # TODO: Removing this to make the upload easier diff --git a/R/Module-EvidenceSynthesis.R b/R/Module-EvidenceSynthesis.R index a4e71453..547dc1e4 100644 --- a/R/Module-EvidenceSynthesis.R +++ b/R/Module-EvidenceSynthesis.R @@ -20,8 +20,8 @@ EvidenceSynthesisModule <- R6::R6Class( #' @template analysisSpecifications #' @template executionSettings execute = function(connectionDetails, analysisSpecifications, executionSettings) { + super$.validateResultsExecutionSettings(executionSettings) super$execute(connectionDetails, analysisSpecifications, executionSettings) - checkmate::assertClass(executionSettings, "ResultsExecutionSettings") jobContext <- private$jobContext resultsFolder <- jobContext$moduleExecutionSettings$resultsSubFolder @@ -357,7 +357,7 @@ EvidenceSynthesisModule <- R6::R6Class( fullKeys <- perDbEstimates$estimates[, c(perDbEstimates$key, "analysisId")] |> distinct() - cluster <- ParallelLogger::makeCluster(min(10, parallel::detectCores())) + cluster <- ParallelLogger::makeCluster(min(10, jobContext$moduleExecutionSettings$maxCores)) ParallelLogger::clusterRequire(cluster, "dplyr") on.exit(ParallelLogger::stopCluster(cluster)) diff --git a/R/Module-PatientLevelPrediction.R b/R/Module-PatientLevelPrediction.R index 05a2f8ff..cfeefcef 100644 --- a/R/Module-PatientLevelPrediction.R +++ b/R/Module-PatientLevelPrediction.R @@ -19,8 +19,8 @@ PatientLevelPredictionModule <- R6::R6Class( #' @template analysisSpecifications #' @template executionSettings execute = function(connectionDetails, analysisSpecifications, executionSettings) { + super$.validateCdmExecutionSettings(executionSettings) super$execute(connectionDetails, analysisSpecifications, executionSettings) - checkmate::assertClass(executionSettings, "CdmExecutionSettings") jobContext <- private$jobContext cohortDefinitionSet <- super$.createCohortDefinitionSetFromJobContext() diff --git a/R/Module-SelfControlledCaseSeries.R b/R/Module-SelfControlledCaseSeries.R index 37eba49d..fafaaec8 100644 --- a/R/Module-SelfControlledCaseSeries.R +++ b/R/Module-SelfControlledCaseSeries.R @@ -20,11 +20,11 @@ SelfControlledCaseSeriesModule <- R6::R6Class( #' @template analysisSpecifications #' @template executionSettings execute = function(connectionDetails, analysisSpecifications, executionSettings) { + super$.validateCdmExecutionSettings(executionSettings) super$execute(connectionDetails, analysisSpecifications, executionSettings) - checkmate::assertClass(executionSettings, "CdmExecutionSettings") jobContext <- private$jobContext - sccsMultiThreadingSettings <- SelfControlledCaseSeries::createDefaultSccsMultiThreadingSettings(parallel::detectCores()) + sccsMultiThreadingSettings <- SelfControlledCaseSeries::createDefaultSccsMultiThreadingSettings(jobContext$moduleExecutionSettings$maxCores) args <- jobContext$settings args$connectionDetails <- connectionDetails diff --git a/R/Module-StrategusModule.R b/R/Module-StrategusModule.R index 82231721..2b0df6cd 100644 --- a/R/Module-StrategusModule.R +++ b/R/Module-StrategusModule.R @@ -272,6 +272,16 @@ StrategusModule <- R6::R6Class( detectOnDescendants = FALSE )) } + }, + .validateCdmExecutionSettings = function(executionSettings) { + errorMessages <- checkmate::makeAssertCollection() + checkmate::assertClass(executionSettings, "CdmExecutionSettings", add = errorMessages) + checkmate::assertInt(executionSettings$maxCores, lower = 1, upper = parallel::detectCores()) + }, + .validateResultsExecutionSettings = function(executionSettings) { + errorMessages <- checkmate::makeAssertCollection() + checkmate::assertClass(executionSettings, "ResultsExecutionSettings", add = errorMessages) + checkmate::assertInt(executionSettings$maxCores, lower = 1, upper = parallel::detectCores()) } ) ) diff --git a/R/Settings.R b/R/Settings.R index 81545ac2..5a9d63ec 100644 --- a/R/Settings.R +++ b/R/Settings.R @@ -256,6 +256,8 @@ createEmptyAnalysisSpecificiations <- function() { #' @param incremental This value will be passed to each module that supports execution in an incremental manner. Modules #' and their underlying packages may use the `workFolder` contents to determine their state of execution #' and attempt to pick up where they left off when this value is set to TRUE. +#' @param maxCores The maximum number of processing cores to use for execution. The default is to +#' use all available cores on the machine. #' #' @return #' An object of type `ExecutionSettings`. @@ -269,7 +271,8 @@ createCdmExecutionSettings <- function(workDatabaseSchema, resultsFolder, logFileName = file.path(resultsFolder, "strategus-log.txt"), minCellCount = 5, - incremental = TRUE) { + incremental = TRUE, + maxCores = parallel::detectCores()) { errorMessages <- checkmate::makeAssertCollection() checkmate::assertCharacter(workDatabaseSchema, len = 1, add = errorMessages) checkmate::assertCharacter(cdmDatabaseSchema, len = 1, add = errorMessages) @@ -279,6 +282,7 @@ createCdmExecutionSettings <- function(workDatabaseSchema, checkmate::assertCharacter(logFileName, len = 1, add = errorMessages) checkmate::assertInt(minCellCount, add = errorMessages) checkmate::assertLogical(incremental, add = errorMessages) + checkmate::assertInt(maxCores, add = errorMessages) checkmate::reportAssertions(collection = errorMessages) # Normalize paths to convert relative paths to absolute paths @@ -302,7 +306,8 @@ createCdmExecutionSettings <- function(workDatabaseSchema, #' @param logFileName Logging information from Strategus and all modules will be located in this file. Individual modules will continue to have their own module-specific logs. By default this will be written to the root of the `resultsFolder` #' @param minCellCount The minimum number of subjects contributing to a count before it can be included #' in results. -#' +#' @param maxCores The maximum number of processing cores to use for execution. The default is to +#' use all available cores on the machine. #' @return #' An object of type `ExecutionSettings`. #' @@ -311,13 +316,15 @@ createResultsExecutionSettings <- function(resultsDatabaseSchema, workFolder, resultsFolder, logFileName = file.path(resultsFolder, "strategus-log.txt"), - minCellCount = 5) { + minCellCount = 5, + maxCores = parallel::detectCores()) { errorMessages <- checkmate::makeAssertCollection() checkmate::assertCharacter(resultsDatabaseSchema, len = 1, add = errorMessages) checkmate::assertCharacter(workFolder, len = 1, add = errorMessages) checkmate::assertCharacter(resultsFolder, len = 1, add = errorMessages) checkmate::assertCharacter(logFileName, len = 1, add = errorMessages) checkmate::assertInt(minCellCount, add = errorMessages) + checkmate::assertInt(maxCores, add = errorMessages) checkmate::reportAssertions(collection = errorMessages) # Normalize paths to convert relative paths to absolute paths diff --git a/man/createCdmExecutionSettings.Rd b/man/createCdmExecutionSettings.Rd index 36bf9829..e54e6544 100644 --- a/man/createCdmExecutionSettings.Rd +++ b/man/createCdmExecutionSettings.Rd @@ -13,7 +13,8 @@ createCdmExecutionSettings( resultsFolder, logFileName = file.path(resultsFolder, "strategus-log.txt"), minCellCount = 5, - incremental = TRUE + incremental = TRUE, + maxCores = parallel::detectCores() ) } \arguments{ @@ -41,6 +42,9 @@ in results.} \item{incremental}{This value will be passed to each module that supports execution in an incremental manner. Modules and their underlying packages may use the \code{workFolder} contents to determine their state of execution and attempt to pick up where they left off when this value is set to TRUE.} + +\item{maxCores}{The maximum number of processing cores to use for execution. The default is to +use all available cores on the machine.} } \value{ An object of type \code{ExecutionSettings}. diff --git a/man/createResultsExecutionSettings.Rd b/man/createResultsExecutionSettings.Rd index 4a108827..883e0487 100644 --- a/man/createResultsExecutionSettings.Rd +++ b/man/createResultsExecutionSettings.Rd @@ -9,7 +9,8 @@ createResultsExecutionSettings( workFolder, resultsFolder, logFileName = file.path(resultsFolder, "strategus-log.txt"), - minCellCount = 5 + minCellCount = 5, + maxCores = parallel::detectCores() ) } \arguments{ @@ -23,6 +24,9 @@ createResultsExecutionSettings( \item{minCellCount}{The minimum number of subjects contributing to a count before it can be included in results.} + +\item{maxCores}{The maximum number of processing cores to use for execution. The default is to +use all available cores on the machine.} } \value{ An object of type \code{ExecutionSettings}.