Skip to content

Commit

Permalink
Add maxCores to settings (#160)
Browse files Browse the repository at this point in the history
  • Loading branch information
anthonysena authored Aug 28, 2024
1 parent 34b315c commit c1b365f
Show file tree
Hide file tree
Showing 12 changed files with 42 additions and 17 deletions.
2 changes: 1 addition & 1 deletion R/Module-Characterization.R
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ CharacterizationModule <- R6::R6Class(
#' @template analysisSpecifications
#' @template executionSettings
execute = function(connectionDetails, analysisSpecifications, executionSettings) {
super$.validateCdmExecutionSettings(executionSettings)
super$execute(connectionDetails, analysisSpecifications, executionSettings)
checkmate::assertClass(executionSettings, "CdmExecutionSettings")

jobContext <- private$jobContext
workFolder <- jobContext$moduleExecutionSettings$workSubFolder
Expand Down
2 changes: 1 addition & 1 deletion R/Module-CohortDiagnostics.R
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ CohortDiagnosticsModule <- R6::R6Class(
#' @template analysisSpecifications
#' @template executionSettings
execute = function(connectionDetails, analysisSpecifications, executionSettings) {
super$.validateCdmExecutionSettings(executionSettings)
super$execute(connectionDetails, analysisSpecifications, executionSettings)
checkmate::assertClass(executionSettings, "CdmExecutionSettings")

jobContext <- private$jobContext
cohortDefinitionSet <- super$.createCohortDefinitionSetFromJobContext()
Expand Down
2 changes: 1 addition & 1 deletion R/Module-CohortGenerator.R
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ CohortGeneratorModule <- R6::R6Class(
#' @template analysisSpecifications
#' @template executionSettings
execute = function(connectionDetails, analysisSpecifications, executionSettings) {
super$.validateCdmExecutionSettings(executionSettings)
super$execute(connectionDetails, analysisSpecifications, executionSettings)
checkmate::assertClass(executionSettings, "CdmExecutionSettings")

jobContext <- private$jobContext
cohortDefinitionSet <- super$.createCohortDefinitionSetFromJobContext()
Expand Down
2 changes: 1 addition & 1 deletion R/Module-CohortIncidence.R
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ CohortIncidenceModule <- R6::R6Class(
#' @template analysisSpecifications
#' @template executionSettings
execute = function(connectionDetails, analysisSpecifications, executionSettings) {
super$.validateCdmExecutionSettings(executionSettings)
super$execute(connectionDetails, analysisSpecifications, executionSettings)
checkmate::assertClass(executionSettings, "CdmExecutionSettings")

refId <- 1 # this should be part of execution settings
resultsFolder <- private$jobContext$moduleExecutionSettings$resultsSubFolder
Expand Down
6 changes: 3 additions & 3 deletions R/Module-CohortMethod.R
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ CohortMethodModule <- R6::R6Class(
#' @param analysisSpecifications The analysis specifications for the study
#' @template executionSettings
execute = function(connectionDetails, analysisSpecifications, executionSettings) {
super$.validateCdmExecutionSettings(executionSettings)
super$execute(connectionDetails, analysisSpecifications, executionSettings)
checkmate::assertClass(executionSettings, "CdmExecutionSettings")

jobContext <- private$jobContext
multiThreadingSettings <- CohortMethod::createDefaultMultiThreadingSettings(parallel::detectCores())
multiThreadingSettings <- CohortMethod::createDefaultMultiThreadingSettings(jobContext$moduleExecutionSettings$maxCores)

args <- jobContext$settings
args$connectionDetails <- connectionDetails
Expand All @@ -41,7 +41,7 @@ CohortMethodModule <- R6::R6Class(
exportFolder = exportFolder,
databaseId = jobContext$moduleExecutionSettings$cdmDatabaseMetaData$databaseId,
minCellCount = jobContext$moduleExecutionSettings$minCellCount,
maxCores = parallel::detectCores(),
maxCores = jobContext$moduleExecutionSettings$maxCores,
cmDiagnosticThresholds = jobContext$settings$cmDiagnosticThresholds
)
# TODO: Removing this to make the upload easier
Expand Down
4 changes: 2 additions & 2 deletions R/Module-EvidenceSynthesis.R
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ EvidenceSynthesisModule <- R6::R6Class(
#' @template analysisSpecifications
#' @template executionSettings
execute = function(connectionDetails, analysisSpecifications, executionSettings) {
super$.validateResultsExecutionSettings(executionSettings)
super$execute(connectionDetails, analysisSpecifications, executionSettings)
checkmate::assertClass(executionSettings, "ResultsExecutionSettings")
jobContext <- private$jobContext

resultsFolder <- jobContext$moduleExecutionSettings$resultsSubFolder
Expand Down Expand Up @@ -357,7 +357,7 @@ EvidenceSynthesisModule <- R6::R6Class(
fullKeys <- perDbEstimates$estimates[, c(perDbEstimates$key, "analysisId")] |>
distinct()

cluster <- ParallelLogger::makeCluster(min(10, parallel::detectCores()))
cluster <- ParallelLogger::makeCluster(min(10, jobContext$moduleExecutionSettings$maxCores))
ParallelLogger::clusterRequire(cluster, "dplyr")
on.exit(ParallelLogger::stopCluster(cluster))

Expand Down
2 changes: 1 addition & 1 deletion R/Module-PatientLevelPrediction.R
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ PatientLevelPredictionModule <- R6::R6Class(
#' @template analysisSpecifications
#' @template executionSettings
execute = function(connectionDetails, analysisSpecifications, executionSettings) {
super$.validateCdmExecutionSettings(executionSettings)
super$execute(connectionDetails, analysisSpecifications, executionSettings)
checkmate::assertClass(executionSettings, "CdmExecutionSettings")

jobContext <- private$jobContext
cohortDefinitionSet <- super$.createCohortDefinitionSetFromJobContext()
Expand Down
4 changes: 2 additions & 2 deletions R/Module-SelfControlledCaseSeries.R
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ SelfControlledCaseSeriesModule <- R6::R6Class(
#' @template analysisSpecifications
#' @template executionSettings
execute = function(connectionDetails, analysisSpecifications, executionSettings) {
super$.validateCdmExecutionSettings(executionSettings)
super$execute(connectionDetails, analysisSpecifications, executionSettings)
checkmate::assertClass(executionSettings, "CdmExecutionSettings")

jobContext <- private$jobContext
sccsMultiThreadingSettings <- SelfControlledCaseSeries::createDefaultSccsMultiThreadingSettings(parallel::detectCores())
sccsMultiThreadingSettings <- SelfControlledCaseSeries::createDefaultSccsMultiThreadingSettings(jobContext$moduleExecutionSettings$maxCores)

args <- jobContext$settings
args$connectionDetails <- connectionDetails
Expand Down
10 changes: 10 additions & 0 deletions R/Module-StrategusModule.R
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,16 @@ StrategusModule <- R6::R6Class(
detectOnDescendants = FALSE
))
}
},
.validateCdmExecutionSettings = function(executionSettings) {
errorMessages <- checkmate::makeAssertCollection()
checkmate::assertClass(executionSettings, "CdmExecutionSettings", add = errorMessages)
checkmate::assertInt(executionSettings$maxCores, lower = 1, upper = parallel::detectCores())
},
.validateResultsExecutionSettings = function(executionSettings) {
errorMessages <- checkmate::makeAssertCollection()
checkmate::assertClass(executionSettings, "ResultsExecutionSettings", add = errorMessages)
checkmate::assertInt(executionSettings$maxCores, lower = 1, upper = parallel::detectCores())
}
)
)
13 changes: 10 additions & 3 deletions R/Settings.R
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,8 @@ createEmptyAnalysisSpecificiations <- function() {
#' @param incremental This value will be passed to each module that supports execution in an incremental manner. Modules
#' and their underlying packages may use the `workFolder` contents to determine their state of execution
#' and attempt to pick up where they left off when this value is set to TRUE.
#' @param maxCores The maximum number of processing cores to use for execution. The default is to
#' use all available cores on the machine.
#'
#' @return
#' An object of type `ExecutionSettings`.
Expand All @@ -269,7 +271,8 @@ createCdmExecutionSettings <- function(workDatabaseSchema,
resultsFolder,
logFileName = file.path(resultsFolder, "strategus-log.txt"),
minCellCount = 5,
incremental = TRUE) {
incremental = TRUE,
maxCores = parallel::detectCores()) {
errorMessages <- checkmate::makeAssertCollection()
checkmate::assertCharacter(workDatabaseSchema, len = 1, add = errorMessages)
checkmate::assertCharacter(cdmDatabaseSchema, len = 1, add = errorMessages)
Expand All @@ -279,6 +282,7 @@ createCdmExecutionSettings <- function(workDatabaseSchema,
checkmate::assertCharacter(logFileName, len = 1, add = errorMessages)
checkmate::assertInt(minCellCount, add = errorMessages)
checkmate::assertLogical(incremental, add = errorMessages)
checkmate::assertInt(maxCores, add = errorMessages)
checkmate::reportAssertions(collection = errorMessages)

# Normalize paths to convert relative paths to absolute paths
Expand All @@ -302,7 +306,8 @@ createCdmExecutionSettings <- function(workDatabaseSchema,
#' @param logFileName Logging information from Strategus and all modules will be located in this file. Individual modules will continue to have their own module-specific logs. By default this will be written to the root of the `resultsFolder`
#' @param minCellCount The minimum number of subjects contributing to a count before it can be included
#' in results.
#'
#' @param maxCores The maximum number of processing cores to use for execution. The default is to
#' use all available cores on the machine.
#' @return
#' An object of type `ExecutionSettings`.
#'
Expand All @@ -311,13 +316,15 @@ createResultsExecutionSettings <- function(resultsDatabaseSchema,
workFolder,
resultsFolder,
logFileName = file.path(resultsFolder, "strategus-log.txt"),
minCellCount = 5) {
minCellCount = 5,
maxCores = parallel::detectCores()) {
errorMessages <- checkmate::makeAssertCollection()
checkmate::assertCharacter(resultsDatabaseSchema, len = 1, add = errorMessages)
checkmate::assertCharacter(workFolder, len = 1, add = errorMessages)
checkmate::assertCharacter(resultsFolder, len = 1, add = errorMessages)
checkmate::assertCharacter(logFileName, len = 1, add = errorMessages)
checkmate::assertInt(minCellCount, add = errorMessages)
checkmate::assertInt(maxCores, add = errorMessages)
checkmate::reportAssertions(collection = errorMessages)

# Normalize paths to convert relative paths to absolute paths
Expand Down
6 changes: 5 additions & 1 deletion man/createCdmExecutionSettings.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion man/createResultsExecutionSettings.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit c1b365f

Please sign in to comment.