diff --git a/CHANGES.md b/CHANGES.md
index b179ee3..3cbdcce 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -1,5 +1,9 @@
 # Changes
 
+v1.1.27 (2020.06.01)
+
+* Fix #100 - use correct property to determine timeout when running `qc` in parallel
+
 v1.1.26 (2020.05.04)
 
 * Fix #97 - handle single-end data in QC report
diff --git a/atropos/commands/multicore.py b/atropos/commands/multicore.py
index 39c6699..f4f3995 100644
--- a/atropos/commands/multicore.py
+++ b/atropos/commands/multicore.py
@@ -26,36 +26,36 @@ class MulticoreError(AtroposError):
 class Control(object):
     """Shared (long) value for passing control information between main and
     worker threads.
-    
+
     Args:
         initial_value: Initial value of the shared control variable.
     """
     def __init__(self, initial_value=CONTROL_ACTIVE):
         self.control = Value('l', initial_value)
-    
+
     def check_value(self, value, lock=False):
         """Check that the current control value == `value`.
-        
+
         Args:
             value: The value to check.
             lock: Whether to lock the shared variable before checking.
-        
+
         Returns:
             True if the values are equal.
         """
         return self.get_value(lock=lock) == value
-    
+
     def check_value_positive(self, lock=False):
         """Check that the current control value is positive.
-        
+
         Args:
             lock: Whether to lock the shared variable before checking.
         """
         return self.get_value(lock=lock) > 0
-    
+
     def get_value(self, lock=True):
         """Returns the current control value.
-        
+
         Args:
             lock: Whether to lock the shared variable before checking.
         """
@@ -64,10 +64,10 @@ def get_value(self, lock=True):
                 return self.control.value
         else:
             return self.control.value
-    
+
     def set_value(self, value):
         """Set the control value. The shared variable is always locked.
-        
+
         Args:
             value: The value to set.
         """
@@ -78,7 +78,7 @@ class PendingQueue(object):
     """Queue for items with sequentially increasing priority. An item whose
     priority is below the current level is queued. Pop returns the item with
     the current priority and increments the current priority.
-    
+
     Args:
         max_size: Maximum queue size; None == infinite.
     """
@@ -86,15 +86,15 @@ def __init__(self, max_size=None):
         self.queue = {}
         self.max_size = max_size
         self.min_priority = None
-    
+
     def push(self, priority, value):
         """Add an item to the queue with priority.
-        
+
         Args:
             priority: An integer that determines the placement of `value` in
                 the queue. Must be unique.
            value: The value to queue.
-        
+
         Raises:
             Full if queue is full.
         """
@@ -105,10 +105,10 @@ def push(self, priority, value):
         self.queue[priority] = value
         if self.min_priority is None or priority < self.min_priority:
             self.min_priority = priority
-    
+
     def pop(self):
         """Remove and return the item in the queue with lowest priority.
-        
+
         Raises:
             Empty if the queue is empty.
         """
@@ -120,13 +120,13 @@ def pop(self):
         else:
             self.min_priority = min(self.queue.keys())
         return value
-    
+
     @property
     def full(self):
         """Whether the queue is full.
         """
         return self.max_size and len(self.queue) >= self.max_size
-    
+
     @property
     def empty(self):
         """Whether the queue is empty.
         """
@@ -140,11 +140,11 @@ class ParallelPipelineMixin(object):
     def start(self, **kwargs):
         super().start(**kwargs)
         self.seen_batches = set()
-    
+
     def process_batch(self, batch):
         self.seen_batches.add(batch[0]['index'])
         super().process_batch(batch)
-    
+
     def finish(self, summary, worker=None):
         super().finish(summary, worker=worker)
         logging.getLogger().debug(
@@ -154,7 +154,7 @@ class WorkerProcess(Process):
     """Parent class for worker processes that execute Pipelines.
-    
+
     Args:
         index: A unique ID for the process.
         input_queue: Queue with batches of records to process.
@@ -169,13 +169,13 @@ def __init__(self, index, input_queue, pipeline, summary_queue, timeout):
         self.pipeline = pipeline
         self.summary_queue = summary_queue
         self.timeout = timeout
-    
+
     def run(self):
         logging.getLogger().debug(
             "%s running under pid %d", self.name, os.getpid())
-        
+
         summary = {}
-        
+
         def iter_batches():
             """Dequeue and yield batches.
             """
@@ -185,7 +185,7 @@ def iter_batches():
                 wait_message="{} waiting on batch {{}}".format(self.name),
                 timeout=self.timeout)
             yield batch
-        
+
         def enqueue_summary():
             """Enqueue a summary dict.
             """
@@ -196,10 +196,10 @@ def enqueue_summary():
                     self.name),
                 timeout=self.timeout
             )
-        
+
         try:
             self.pipeline.start(worker=self)
-            
+
             try:
                 for batch in iter_batches():
                     if batch is None:
@@ -210,19 +210,19 @@ def enqueue_summary():
                     self.pipeline.process_batch(batch)
             finally:
                 self.pipeline.finish(summary, worker=self)
-            
+
             logging.getLogger().debug("%s finished normally", self.name)
         except Exception as err:
             logging.getLogger().error(
                 "Unexpected error in %s", self.name, exc_info=True)
             summary['exception'] = err
-        
+
         logging.getLogger().debug("%s sending summary", self.name)
         enqueue_summary()
 
 class ParallelPipelineRunner(object):
     """Run a pipeline in parallel.
-    
+
     Args:
         command_runner: A :class:`BaseCommandRunner`.
         pipeline: A :class:`Pipeline`.
@@ -230,9 +230,11 @@ class ParallelPipelineRunner(object):
         from command_runner.
     """
     def __init__(self, command_runner, pipeline, threads=None):
+        self.threads = threads or command_runner.threads
+        if self.threads < 2:
+            raise ValueError("'threads' must be >= 2")
         self.command_runner = command_runner
         self.pipeline = pipeline
-        self.threads = threads or command_runner.threads
         self.timeout = max(command_runner.process_timeout, RETRY_INTERVAL)
         # Queue by which batches of reads are sent to worker processes
         self.input_queue = Queue(command_runner.read_queue_size)
@@ -242,35 +244,35 @@ def __init__(self, command_runner, pipeline, threads=None):
         self.num_batches = None
         self.seen_summaries = None
         self.seen_batches = None
-    
+
     def ensure_alive(self):
         """Callback when enqueue times out.
         """
         ensure_processes(self.worker_processes)
-    
+
     def after_enqueue(self):
         """Called after all batches are queued.
         """
         pass
-    
+
     def finish(self):
         """Called at the end of the run.
        """
         pass
-    
+
     def run(self):
         """Run the pipeline.
-        
+
         Returns:
             The return code.
         """
         retcode = run_interruptible(self)
         self.terminate(retcode)
         return retcode
-    
+
     def terminate(self, retcode):
         """Terminates all processes.
-        
+
         Args:
             retcode: The return code.
""" @@ -278,33 +280,33 @@ def terminate(self, retcode): logging.getLogger().debug("Exiting all processes") for process in self.worker_processes: kill(process, retcode, self.timeout) - + def __call__(self): # Start worker processes, reserve a thread for the reader process, # which we will get back after it completes worker_args = ( self.input_queue, self.pipeline, self.summary_queue, self.timeout) self.worker_processes = launch_workers(self.threads - 1, worker_args) - + self.num_batches = enqueue_all( self.command_runner.iterator(), self.input_queue, self.timeout, self.ensure_alive) - + logging.getLogger().debug( "Main loop complete; saw %d batches", self.num_batches) - + # Tell the worker processes no more input is coming enqueue_all( (None,) * self.threads, self.input_queue, self.timeout, self.ensure_alive) - + self.after_enqueue() - + # Now that the reader process is done, it essentially # frees up another thread to use for a worker self.worker_processes.extend( launch_workers(1, worker_args, offset=self.threads-1)) - + # Wait for all summaries to be available on queue def summary_timeout_callback(): """Ensure that workers are still alive. @@ -316,21 +318,21 @@ def summary_timeout_callback(): alive=False) except Exception as err: logging.getLogger().error(err) - + wait_on( self.summary_queue.full, wait_message="Waiting on worker summaries {}", timeout=self.timeout, wait=True, timeout_callback=summary_timeout_callback) - + # Process summary information from worker processes logging.getLogger().debug( "Processing summary information from worker processes") - + self.seen_summaries = set() self.seen_batches = set() - + def summary_fail_callback(): """Raises AtroposError with workers that did not report summaries. """ @@ -339,7 +341,7 @@ def summary_fail_callback(): raise AtroposError( "Missing summaries from processes %s", ",".join(str(summ) for summ in missing_summaries)) - + for _ in range(1, self.threads+1): batch = dequeue( self.summary_queue, fail_callback=summary_fail_callback) @@ -359,7 +361,7 @@ def summary_fail_callback(): self.seen_summaries.add(worker_index) self.seen_batches |= worker_batches self.command_runner.summary.merge(worker_summary) - + # Check if any batches were missed if self.num_batches > 0: missing_batches = ( @@ -368,7 +370,7 @@ def summary_fail_callback(): raise AtroposError( "Workers did not process batches {}".format( ",".join(str(batch) for batch in missing_batches))) - + self.finish() def launch_workers(num_workers, args=(), offset=0, worker_class=WorkerProcess): @@ -397,7 +399,7 @@ def wait_on( condition, *args, wait_message="Waiting {}", timeout=None, fail_callback=None, wait=None, timeout_callback=None): """Wait on a condition to be non-False. - + Args: condition: Function that returns either False or a non-False value. args: Args with which to call `condition`. @@ -445,7 +447,7 @@ def wait_on( def wait_on_process(process, timeout, terminate=False): """Wait on a process to terminate. - + Args: process: The process on which to wait. timeout: Number of seconds to wait for process to terminate. @@ -464,7 +466,7 @@ def enqueue( queue, item, wait_message="Waiting to enqueue item {}", block_timeout=RETRY_INTERVAL, **kwargs): """Enqueue an item, using `wait_on` to wait while `queue` is full. - + Args: queue: The queue to which to add the item. item: The item to queue. @@ -485,13 +487,13 @@ def condition(item): def enqueue_all(iterable, queue, timeout, fail_callback): """Enqueue all items in `iterable`, using `wait_on` to wait while `queue` is full. 
-    
+
     Args:
         iterable: Iterable of items to queue.
         queue: The queue to which to add the item.
         timeout: Number of seconds to wait after each `queue.put` attempt.
         fail_callback: Function called (or Exception raised) after timeout.
-    
+
     Returns:
         The number of items queued.
     """
diff --git a/atropos/commands/qc/__init__.py b/atropos/commands/qc/__init__.py
index c08f58d..0b2888c 100644
--- a/atropos/commands/qc/__init__.py
+++ b/atropos/commands/qc/__init__.py
@@ -16,15 +16,15 @@ def __init__(self, read_statistics_class, **kwargs):
         self.read_statistics_class = read_statistics_class
         self.stats = {}
         self.stats_kwargs = kwargs
-    
+
     def _get_stats(self, source):
         if source not in self.stats:
             self.stats[source] = self.read_statistics_class(**self.stats_kwargs)
         return self.stats[source]
-    
+
     def handle_reads(self, context, read1, read2=None):
         self._get_stats(context['source']).collect(read1, read2)
-    
+
     def finish(self, summary, **kwargs):
         super().finish(summary)
         summary['pre'] = dict(
@@ -45,7 +45,7 @@ def __init__(self, **kwargs):
 
 class CommandRunner(BaseCommandRunner):
     name = 'qc'
-    
+
     def __call__(self):
         if self.paired:
             pipeline_class = PairedEndQcPipeline
@@ -56,7 +56,7 @@ def __call__(self):
             quality_base=self.quality_base)
         if self.stats:
             pipeline_args.update(self.stats)
-        
+
         if self.threads is None:
             self.summary.update(mode='serial', threads=1)
             pipeline = pipeline_class(**pipeline_args)
@@ -64,31 +64,27 @@ def __call__(self):
         else:
             self.summary.update(mode='parallel', threads=self.threads)
             return self.run_parallel(pipeline_class, pipeline_args)
-    
+
     def run_parallel(self, pipeline_class, pipeline_args):
         """Execute qc in parallel mode.
-        
+
         Args:
             pipeline_class: Pipeline class to instantiate.
             pipeline_args: Arguments to pass to Pipeline constructor.
-        
+
         Returns:
             The return code.
         """
         from atropos.commands.multicore import (
-            ParallelPipelineMixin, ParallelPipelineRunner)
-
-        logging.getLogger().debug(
-            "Starting atropos qc in parallel mode with threads=%d, timeout=%d",
-            self.threads, self.timeout)
-
-        if self.threads < 2:
-            raise ValueError("'threads' must be >= 2")
-
-        # Start worker processes, reserve a thread for the reader process,
-        # which we will get back after it completes
+            ParallelPipelineMixin, ParallelPipelineRunner, RETRY_INTERVAL)
+
         pipeline_class = type(
             'QcPipelineImpl', (ParallelPipelineMixin, pipeline_class), {})
         pipeline = pipeline_class(**pipeline_args)
         runner = ParallelPipelineRunner(self, pipeline)
+
+        logging.getLogger().debug(
+            "Starting atropos qc in parallel mode with threads=%d, timeout=%d",
+            runner.threads, runner.timeout)
+
+        return runner.run()
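
For context, here is a minimal, self-contained sketch of the two patterns the patch relies on: resolving `threads` against the command runner *before* validating it, and logging `threads`/`timeout` from the runner object, which is guaranteed to have both (the old code read `self.timeout` on the command runner, the cause of #100). The `FakeCommandRunner` and `Runner` names are hypothetical stand-ins, not atropos code.

```python
import logging

RETRY_INTERVAL = 5  # plays the same role as the constant in multicore.py


class FakeCommandRunner:
    """Hypothetical stand-in; only the attributes used below."""
    threads = 4
    process_timeout = 60


class Runner:
    """Hypothetical stand-in for ParallelPipelineRunner's constructor logic."""
    def __init__(self, command_runner, threads=None):
        # Resolve the value first, falling back to the command runner...
        self.threads = threads or command_runner.threads
        # ...then validate the *resolved* value; checking the raw `threads`
        # argument would raise TypeError when the caller leaves it as None.
        if self.threads < 2:
            raise ValueError("'threads' must be >= 2")
        self.timeout = max(command_runner.process_timeout, RETRY_INTERVAL)


runner = Runner(FakeCommandRunner())
# Log from the runner, not the command runner, so both fields always exist.
logging.getLogger().debug(
    "parallel mode with threads=%d, timeout=%d", runner.threads, runner.timeout)
print(runner.threads, runner.timeout)  # -> 4 60
```

The same resolve-then-validate order is why the diff moves the debug logging in `run_parallel` to after `ParallelPipelineRunner` is constructed: only then are `runner.threads` and `runner.timeout` known-good values. (Note also that dynamic mixin composition via `type()` requires the three-argument form `type(name, bases, namespace)`.)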