Skip to content

Commit

Permalink
resolve a memory and file descriptor leak if multiprocessing is used;
Browse files Browse the repository at this point in the history
Processors can be pickled only if no multiprocessing is used, though
  • Loading branch information
Sebastian Böck committed Feb 25, 2015
1 parent fdb00c0 commit ccd7694
Show file tree
Hide file tree
Showing 18 changed files with 146 additions and 47 deletions.
7 changes: 5 additions & 2 deletions README
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,11 @@ same forever! If you need stable features, create a branch and set the needed
parameters accordingly.

If you get segfaults on Mac OS X systems, please check if numpy and scipy use
ATLAS instead of Apple's own Accelerate framework or use the '--threads 1'
switch where appropriate.
ATLAS instead of Apple's own Accelerate framework, or use a single thread
(i.e. use the "-j 1" switch).

The same applies if you want to pickle any Processor, which can only be
pickled if no multi-threading is used.

If you get errors pointing to any .pyx file, please recompile (see above)
before investigating this error further.
3 changes: 3 additions & 0 deletions bin/BeatDetector.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ def main():

# create a processor
processor = RNNBeatTracking(beat_method='BeatDetection', **vars(args))
# pickle the processor if needed
if args.pickle is not None:
processor.dump(args.pickle)
# process everything
processor.process(args.input, args.output)

Expand Down
3 changes: 3 additions & 0 deletions bin/BeatTracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ def main():

# create a processor
processor = RNNBeatTracking(beat_method='BeatTracking', **vars(args))
# pickle the processor if needed
if args.pickle is not None:
processor.dump(args.pickle)
# process everything
processor.process(args.input, args.output)

Expand Down
3 changes: 3 additions & 0 deletions bin/CRFBeatDetector.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ def main():

# create a processor
processor = RNNBeatTracking(beat_method='CRFBeatDetection', **vars(args))
# pickle the processor if needed
if args.pickle is not None:
processor.dump(args.pickle)
# process everything
processor.process(args.input, args.output)

Expand Down
3 changes: 3 additions & 0 deletions bin/ComplexFlux.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ def main():

# create a processor
processor = ComlpexFlux(onset_method='complex_flux', **vars(args))
# pickle the processor if needed
if args.pickle is not None:
processor.dump(args.pickle)
# process everything
processor.process(args.input, args.output)

Expand Down
3 changes: 3 additions & 0 deletions bin/DBNBeatTracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ def main():

# create a processor
processor = RNNBeatTracking(beat_method='DBNBeatTracking', **vars(args))
# pickle the processor if needed
if args.pickle is not None:
processor.dump(args.pickle)
# process everything
processor.process(args.input, args.output)

Expand Down
3 changes: 3 additions & 0 deletions bin/LogFiltSpecFlux.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ def main():

# create a processor
processor = LogFiltSpecFlux(onset_method='spectral_flux', **vars(args))
# pickle the processor if needed
if args.pickle is not None:
processor.dump(args.pickle)
# process everything
processor.process(args.input, args.output)

Expand Down
3 changes: 3 additions & 0 deletions bin/MMBeatTracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ def main():
# create a processor
processor = RNNBeatTracking(beat_method='DBNBeatTracking',
multi_model=True, **vars(args))
# pickle the processor if needed
if args.pickle is not None:
processor.dump(args.pickle)
# process everything
processor.process(args.input, args.output)

Expand Down
3 changes: 3 additions & 0 deletions bin/OnsetDetector.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ def main():

# create a processor
processor = RNNOnsetDetection(**vars(args))
# pickle the processor if needed
if args.pickle is not None:
processor.dump(args.pickle)
# process everything
processor.process(args.input, args.output)

Expand Down
3 changes: 3 additions & 0 deletions bin/OnsetDetectorLL.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ def main():

# create a processor
processor = RNNOnsetDetection(online=True, **vars(args))
# pickle the processor if needed
if args.pickle is not None:
processor.dump(args.pickle)
# process everything
processor.process(args.input, args.output)

Expand Down
3 changes: 3 additions & 0 deletions bin/PianoTranscriptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ def main():

# create a processor
processor = RNNNoteTranscription(**vars(args))
# pickle the processor if needed
if args.pickle is not None:
processor.dump(args.pickle)
# process everything
processor.process(args.input, args.output)

Expand Down
74 changes: 64 additions & 10 deletions bin/PickleProcessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
"""

import os
import cPickle
import multiprocessing as mp


def process_single(processor, input, output):
Expand All @@ -17,29 +19,73 @@ def process_single(processor, input, output):
:param output: output file
"""
processor = cPickle.load(processor)
processor.process(input, output)


def process_batch(processor, files, output_dir=None, output_suffix=None):
class ParallelProcess(mp.Process):
    """
    Worker process which executes processing tasks taken from a shared queue.

    """

    def __init__(self, task_queue):
        """
        Create a ParallelProcess, which processes the tasks.

        :param task_queue: queue with tasks; each task is a
                           (processor, input_file, output_file) tuple

        """
        mp.Process.__init__(self)
        self.task_queue = task_queue

    def run(self):
        """
        Process all tasks from the task queue.

        Note: this loop never exits on its own; the worker is expected to be
              run as a daemon process and be killed with its parent.

        """
        while True:
            # get the task tuple
            processor, input_file, output_file = self.task_queue.get()
            try:
                # process the Processor with the data
                processor.process(input_file, output_file)
            finally:
                # always signal that the task is done, even if processing
                # raised; otherwise JoinableQueue.join() would block forever
                self.task_queue.task_done()


def process_batch(processor, files, output_dir=None, output_suffix=None,
num_threads=mp.cpu_count()):
"""
Process a list of files with the given Processor in batch mode.
:param processor: pickled Processor
:param files: audio files [list]
:param output_dir: output directory
:param output_suffix: output suffix
:param num_threads: number of parallel threads
Note: Either `output_dir` or `output_suffix` must be set.
"""
import os

# either output_dir or output_suffix must be given
if output_dir is None and output_suffix is None:
raise ValueError('either output directory or suffix must be given')
# get the processor
processor = cPickle.load(processor)
# make sure the directory exists
if output_dir is not None:
try:
# create output directory
os.mkdir(output_dir)
except OSError:
# directory exists already
pass

# create task queue
tasks = mp.JoinableQueue()
# create working threads
processes = [ParallelProcess(tasks) for _ in range(num_threads)]
for p in processes:
p.daemon = True
p.start()

# process all the files
for input_file in files:
# set the output file name
Expand All @@ -50,8 +96,10 @@ def process_batch(processor, files, output_dir=None, output_suffix=None):
# append the suffix if needed
if output_suffix is not None:
output_file += output_suffix
# process the input file to the output file
processor.process(input_file, output_file)
# put processing tasks in the queue
tasks.put((processor, input_file, output_file))
# wait for all processing tasks to finish
tasks.join()


def main():
Expand All @@ -65,6 +113,9 @@ def main():
formatter_class=argparse.RawDescriptionHelpFormatter, description='''
This script runs previously pickled Processors in either batch or single
file mode.
To obtain a pickled Processor, call the 'dump(outfile)' method.
''')
# general options
parser.add_argument('processor', type=argparse.FileType('r'),
Expand Down Expand Up @@ -92,12 +143,15 @@ def main():
help='output directory [default=%(default)s]')
sp.add_argument('-s', dest='output_suffix', default='.txt',
help='suffix appended to the files [default=%(default)s]')
sp.add_argument('-j', dest='num_threads', type=int, default=mp.cpu_count(),
help='number of parallel threads [default=%(default)s]')

# parse arguments and call the processing function
args = parser.parse_args()
kwargs = vars(args)
function = kwargs.pop('func')
function(**kwargs)
processor = cPickle.load(kwargs.pop('processor'))
function(processor, **kwargs)


if __name__ == '__main__':
Expand Down
3 changes: 3 additions & 0 deletions bin/SuperFlux.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ def main():

# create a processor
processor = SuperFlux(**vars(args))
# pickle the processor if needed
if args.pickle is not None:
processor.dump(args.pickle)
# process everything
processor.process(args.input, args.output)

Expand Down
3 changes: 3 additions & 0 deletions bin/SuperFluxNN.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ def main():

# create a processor
processor = SuperFlux(peak_picking_method='nn', **vars(args))
# pickle the processor if needed
if args.pickle is not None:
processor.dump(args.pickle)
# process everything
processor.process(args.input, args.output)

Expand Down
3 changes: 3 additions & 0 deletions bin/TempoDetector.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ def main():

# create a processor
processor = RNNTempoEstimation(**vars(args))
# pickle the processor if needed
if args.pickle is not None:
processor.dump(args.pickle)
# process everything
processor.process(args.input, args.output)

Expand Down
47 changes: 25 additions & 22 deletions madmom/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,14 @@ def load(cls, infile):
"""
import cPickle
# instantiate a new object and return it
# close the open file if needed and use its name
if not isinstance(infile, basestring):
infile.close()
infile = infile.name
# instantiate a new Processor and return it
return cPickle.load(open(infile, 'rb'))

def save(self, outfile):
def dump(self, outfile):
"""
Save the Processor to a file.
Expand All @@ -58,10 +62,11 @@ def save(self, outfile):
"""
import cPickle
# close the file if needed and use its name
# close the open file if needed and use its name
if not isinstance(outfile, basestring):
outfile.close()
outfile = outfile.name
# dump the Processor to the given file
cPickle.dump(self, open(outfile, 'wb'),
protocol=cPickle.HIGHEST_PROTOCOL)

Expand All @@ -79,9 +84,9 @@ def process(self, data):
"""
return data

# def __call__(self, *args):
# """This magic method makes an instance callable."""
# return self.process(*args)
def __call__(self, *args):
    """
    Make the instance callable: calling it is equivalent to invoking its
    process() method with the same arguments.

    """
    return self.process(*args)


class OutputProcessor(Processor):
Expand Down Expand Up @@ -189,6 +194,7 @@ class ParallelProcessor(SequentialProcessor):
import multiprocessing as mp
NUM_THREADS = mp.cpu_count()

# works, but is not pickle-able
def __init__(self, processors, num_threads=NUM_THREADS):
"""
Instantiates a ParallelProcessor object.
Expand All @@ -197,34 +203,31 @@ def __init__(self, processors, num_threads=NUM_THREADS):
:param num_threads: number of parallel working threads
"""
# save the processing queue
# save the processing chain
super(ParallelProcessor, self).__init__(processors)
# number of threads
if num_threads is None:
num_threads = 1
self.num_threads = num_threads
# Note: we must define the map function here, otherwise it leaks both
# memory and file descriptors if we init the pool in the process
# method. Thus we must use only 1 thread if we want to pickle the
# Processor
self.map = map
if min(len(processors), max(1, num_threads)) != 1:
import multiprocessing as mp
self.map = mp.Pool(num_threads).map

def process(self, data, num_threads=None):
def process(self, data):
"""
Process the data in parallel.
:param data: data to be processed
:param num_threads: number of parallel working threads
:return: list with individually processed data
:param data: data to be processed
:return: list with individually processed data
"""
import multiprocessing as mp
import itertools as it
# number of threads
if num_threads is None:
num_threads = self.num_threads
# init a pool of workers (if needed)
map_ = map
if min(len(self.processors), max(1, num_threads)) != 1:
map_ = mp.Pool(num_threads).map
# process data in parallel and return a list with processed data
data = map_(_process, it.izip(self.processors, it.repeat(data)))
return data
return self.map(_process, it.izip(self.processors, it.repeat(data)))

@classmethod
def add_arguments(cls, parser, num_threads=NUM_THREADS):
Expand Down
Loading

0 comments on commit ccd7694

Please sign in to comment.