diff --git a/apyfal/_pool_executor.py b/apyfal/_pool_executor.py index 37de651..7272882 100644 --- a/apyfal/_pool_executor.py +++ b/apyfal/_pool_executor.py @@ -24,7 +24,8 @@ def process_submit(self, src=None, dst=None, info_dict=None, See "apyfal.Accelerator.process" for more information. """ - def process_map(self, srcs=None, dsts=None, timeout=None, **parameters): + def process_map(self, srcs=None, dsts=None, timeout=None, info_list=None, + **parameters): """ Map process execution on multiples files. @@ -48,6 +49,9 @@ def process_map(self, srcs=None, dsts=None, timeout=None, **parameters): dictionary values. Take a look to accelerator documentation for more information on possible parameters. Path-like object can be path, URL or cloud object URL. + info_list (list): If a list is passed, this list is updated + with "info_dict" extra information dicts for each process + operation. Returns: generator: Results. @@ -87,7 +91,9 @@ def process_map(self, srcs=None, dsts=None, timeout=None, **parameters): if size_dst: dst = dsts[index] - futures.append(self.process_submit(src=src, dst=dst, **parameters)) + futures.append(self.process_submit( + src=src, dst=dst, info_dict=self._get_info_dict(info_list), + **parameters)) def result_iterator(): """ @@ -109,6 +115,23 @@ def result_iterator(): return result_iterator() + @staticmethod + def _get_info_dict(info_list): + """ + Return info dict and append it in info list. + + Args: + info_list (list or None): info list + + Returns: + dict or None: info_dict + """ + if info_list is not None: + info_dict = dict() + info_list.append(info_dict) + return info_dict + return None + class AcceleratorPoolExecutor(_AbstractAsyncAccelerator): """ @@ -207,7 +230,7 @@ def hosts(self): return [worker.host for worker in self._workers] def start(self, stop_mode=None, src=None, host_env=None, reload=None, - reset=None, **parameters): + reset=None, info_list=None, **parameters): """ Starts and/or configure all accelerators in the pool. 
@@ -231,6 +254,8 @@ def start(self, stop_mode=None, src=None, host_env=None, reload=None, reload (bool): Force reload of FPGA bitstream. reset (bool): Force reset of FPGA logic. host_env (dict): Overrides Accelerator "env". + info_list (list): If a list is passed, this list is updated + with "info_dict" extra information dicts for each accelerator. Returns: list: List of "Accelerator.start" results. @@ -238,7 +263,8 @@ def start(self, stop_mode=None, src=None, host_env=None, reload=None, with ThreadPoolExecutor(max_workers=self._workers_count) as executor: futures = [executor.submit( worker.start, stop_mode=stop_mode, src=src, - host_env=host_env, reload=reload, reset=reset, **parameters) + host_env=host_env, reload=reload, reset=reset, + info_dict=self._get_info_dict(info_list), **parameters) for worker in self._workers] return [future.result() for future in as_completed(futures)] @@ -284,7 +310,7 @@ def process_submit(self, src=None, dst=None, info_dict=None, return self._workers[index].process_submit( src=src, dst=dst, info_dict=info_dict, **parameters) - def stop(self, stop_mode=None, wait=True): + def stop(self, stop_mode=None, wait=True, info_list=None): """ Signal the executor that it should free any resources that it is using when the currently pending futures are done executing. Calls to @@ -299,6 +325,8 @@ def stop(self, stop_mode=None, wait=True): current "stop_mode" value. See "apyfal.host.Host.stop_mode" property for more information and possible values. wait (bool): Waits stop completion before return. + info_list (list): If a list is passed, this list is updated + with "info_dict" extra information dicts for each accelerator. 
Returns: list: List of "Accelerator.stop" results if "info_dict", else @@ -310,7 +338,9 @@ def stop(self, stop_mode=None, wait=True): with ThreadPoolExecutor(max_workers=self._workers_count) as executor: futures = [executor.submit( - worker.stop, stop_mode=stop_mode) for worker in self._workers] + worker.stop, stop_mode=stop_mode, + info_dict=self._get_info_dict(info_list)) + for worker in self._workers] if wait: return [future.result() for future in as_completed(futures)] diff --git a/docs/changes.rst b/docs/changes.rst index d19274e..275a941 100644 --- a/docs/changes.rst +++ b/docs/changes.rst @@ -1,7 +1,7 @@ Changelog ========= -1.2.3 (2018/??) +1.2.3 (2018/10) --------------- Backward incompatible changes: @@ -34,9 +34,10 @@ Backward incompatible changes: info = dict() my_accel.process(dst='data.dat', info_dict=info) -- The ``info_dict`` argument was removed from ``AcceleratorPoolExecutor.start``, +- The ``info_dict`` argument from ``AcceleratorPoolExecutor.start``, ``AcceleratorPoolExecutor.stop``, ``AcceleratorPoolExecutor.process_map`` and - ``Accelerator.process_map`` methods. + ``Accelerator.process_map`` methods is replaced by ``info_list`` and expects a + ``list`` to populate instead of a ``dict``. 
1.2.2 (2018/10) --------------- diff --git a/tests/test_pool_executor.py b/tests/test_pool_executor.py index de0fd24..555e497 100644 --- a/tests/test_pool_executor.py +++ b/tests/test_pool_executor.py @@ -14,7 +14,7 @@ def test_abstract_async_accelerator_process_map(): # Mocks sub class files_in = [] files_out = [] - process_kwargs = dict(arg='arg', info_dict=None, parameters='parameters') + process_kwargs = dict(arg='arg', parameters='parameters') process_duration = 0.0 class AsyncAccelerator(_AbstractAsyncAccelerator): @@ -31,7 +31,8 @@ def run_task(): def process_submit(self, src=None, dst=None, **kwargs): """Checks arguments and returns fake result""" - assert kwargs == process_kwargs + for key in process_kwargs: + assert key in kwargs assert src in files_in or (src is None and not files_in) assert dst in files_out or (dst is None and not files_out) return self._executor.submit(self.run_task) @@ -92,12 +93,12 @@ def test_accelerator_pool_executor(): accelerator = 'accelerator' workers_count = 4 - start_kwargs = dict(src='src', - host_env='env', stop_mode='term', reset=None, + start_kwargs = dict(src='src', host_env='env', stop_mode='term', reset=None, reload=None) stop_kwargs = dict(stop_mode=None) - process_kwargs = dict(arg='arg', info_dict=None, parameters='parameters', - src='src', dst='dst') + process_kwargs = dict( + arg='arg', parameters='parameters', src='src', dst='dst') + excepted_info_dict = None # Mocks Accelerator @@ -125,19 +126,29 @@ def _wait_completed(self): def start(self, **kwargs): """Checks arguments and return fake result""" self.running = True + kwargs = kwargs.copy() + assert kwargs.pop('info_dict') == excepted_info_dict assert kwargs == start_kwargs return True def process_submit(self, **kwargs): """Checks arguments and return fake result""" - assert kwargs == process_kwargs + kwargs = kwargs.copy() + assert kwargs.pop('info_dict') == excepted_info_dict + for key in process_kwargs: + assert key in kwargs self.process_running_count += 1 
return Future() def stop(self, **kwargs): """Checks arguments and return fake result""" self.running = False - assert kwargs == stop_kwargs + kwargs = kwargs.copy() + info_dict = kwargs.pop('info_dict') + if kwargs.get('stop_mode') == 'check_info_dict': + assert info_dict == excepted_info_dict + else: + assert kwargs == stop_kwargs return True apyfal_accelerator = apyfal.Accelerator @@ -203,6 +214,26 @@ def stop(self, **kwargs): for acc in pool.accelerators: assert not acc.running + # Info dict + excepted_info_dict = dict() + pool = apyfal.AcceleratorPoolExecutor(workers_count=workers_count) + + info_list = [] + pool.start(info_list=info_list, **start_kwargs) + assert info_list == workers_count * [dict()] + + info_list = [] + del process_kwargs['src'] + del process_kwargs['dst'] + pool.process_map(srcs=[''] * 10, info_list=info_list, **process_kwargs) + assert info_list == 10 * [dict()] + + info_list = [] + pool.stop(info_list=info_list, + stop_mode='check_info_dict', # avoid check on __del__ + ) + assert info_list == workers_count * [dict()] + # Restores mocked class finally: apyfal.Accelerator = apyfal_accelerator