-
Notifications
You must be signed in to change notification settings - Fork 85
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Returning success count from the .populate()
call
#1050
Changes from 21 commits
29357fe
b737003
d1011fb
6f7a0c0
1f358a9
eb827e6
1b4806e
0abd3c0
6bf2afc
37801d6
7a258d4
9480435
0f84560
02127a0
c061f8a
9ef2046
c66ff04
ff6b81c
45938aa
e143ce8
291a468
008a723
18fd619
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -180,6 +180,9 @@ def populate( | |||||||||||||||||||||||||||
to be passed down to each ``make()`` call. Computation arguments should be | ||||||||||||||||||||||||||||
specified within the pipeline e.g. using a `dj.Lookup` table. | ||||||||||||||||||||||||||||
:type make_kwargs: dict, optional | ||||||||||||||||||||||||||||
:return: a dict with two keys | ||||||||||||||||||||||||||||
"success_count": the count of successful ``make()`` calls in this ``populate()`` call | ||||||||||||||||||||||||||||
"error_list": the error list that is filled if `suppress_errors` is True | ||||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||||
if self.connection.in_transaction: | ||||||||||||||||||||||||||||
raise DataJointError("Populate cannot be called during a transaction.") | ||||||||||||||||||||||||||||
|
@@ -222,49 +225,60 @@ def handler(signum, frame): | |||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
keys = keys[:max_calls] | ||||||||||||||||||||||||||||
nkeys = len(keys) | ||||||||||||||||||||||||||||
if not nkeys: | ||||||||||||||||||||||||||||
return | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
processes = min(_ for _ in (processes, nkeys, mp.cpu_count()) if _) | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
error_list = [] | ||||||||||||||||||||||||||||
populate_kwargs = dict( | ||||||||||||||||||||||||||||
suppress_errors=suppress_errors, | ||||||||||||||||||||||||||||
return_exception_objects=return_exception_objects, | ||||||||||||||||||||||||||||
make_kwargs=make_kwargs, | ||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||
success_list = [] | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
if processes == 1: | ||||||||||||||||||||||||||||
for key in ( | ||||||||||||||||||||||||||||
tqdm(keys, desc=self.__class__.__name__) if display_progress else keys | ||||||||||||||||||||||||||||
): | ||||||||||||||||||||||||||||
error = self._populate1(key, jobs, **populate_kwargs) | ||||||||||||||||||||||||||||
if error is not None: | ||||||||||||||||||||||||||||
error_list.append(error) | ||||||||||||||||||||||||||||
else: | ||||||||||||||||||||||||||||
# spawn multiple processes | ||||||||||||||||||||||||||||
self.connection.close() # disconnect parent process from MySQL server | ||||||||||||||||||||||||||||
del self.connection._conn.ctx # SSLContext is not pickleable | ||||||||||||||||||||||||||||
with mp.Pool( | ||||||||||||||||||||||||||||
processes, _initialize_populate, (self, jobs, populate_kwargs) | ||||||||||||||||||||||||||||
) as pool, ( | ||||||||||||||||||||||||||||
tqdm(desc="Processes: ", total=nkeys) | ||||||||||||||||||||||||||||
if display_progress | ||||||||||||||||||||||||||||
else contextlib.nullcontext() | ||||||||||||||||||||||||||||
) as progress_bar: | ||||||||||||||||||||||||||||
for error in pool.imap(_call_populate1, keys, chunksize=1): | ||||||||||||||||||||||||||||
if error is not None: | ||||||||||||||||||||||||||||
error_list.append(error) | ||||||||||||||||||||||||||||
if display_progress: | ||||||||||||||||||||||||||||
progress_bar.update() | ||||||||||||||||||||||||||||
self.connection.connect() # reconnect parent process to MySQL server | ||||||||||||||||||||||||||||
if nkeys: | ||||||||||||||||||||||||||||
processes = min(_ for _ in (processes, nkeys, mp.cpu_count()) if _) | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
populate_kwargs = dict( | ||||||||||||||||||||||||||||
suppress_errors=suppress_errors, | ||||||||||||||||||||||||||||
return_exception_objects=return_exception_objects, | ||||||||||||||||||||||||||||
make_kwargs=make_kwargs, | ||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
if processes == 1: | ||||||||||||||||||||||||||||
for key in ( | ||||||||||||||||||||||||||||
tqdm(keys, desc=self.__class__.__name__) | ||||||||||||||||||||||||||||
if display_progress | ||||||||||||||||||||||||||||
else keys | ||||||||||||||||||||||||||||
): | ||||||||||||||||||||||||||||
status = self._populate1(key, jobs, **populate_kwargs) | ||||||||||||||||||||||||||||
if status is not None: | ||||||||||||||||||||||||||||
if isinstance(status, tuple): | ||||||||||||||||||||||||||||
error_list.append(status) | ||||||||||||||||||||||||||||
elif status: | ||||||||||||||||||||||||||||
success_list.append(1) | ||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we change
Suggested change
|
||||||||||||||||||||||||||||
else: | ||||||||||||||||||||||||||||
# spawn multiple processes | ||||||||||||||||||||||||||||
self.connection.close() # disconnect parent process from MySQL server | ||||||||||||||||||||||||||||
del self.connection._conn.ctx # SSLContext is not pickleable | ||||||||||||||||||||||||||||
with mp.Pool( | ||||||||||||||||||||||||||||
processes, _initialize_populate, (self, jobs, populate_kwargs) | ||||||||||||||||||||||||||||
) as pool, ( | ||||||||||||||||||||||||||||
tqdm(desc="Processes: ", total=nkeys) | ||||||||||||||||||||||||||||
if display_progress | ||||||||||||||||||||||||||||
else contextlib.nullcontext() | ||||||||||||||||||||||||||||
) as progress_bar: | ||||||||||||||||||||||||||||
for status in pool.imap(_call_populate1, keys, chunksize=1): | ||||||||||||||||||||||||||||
if status is not None: | ||||||||||||||||||||||||||||
if isinstance(status, tuple): | ||||||||||||||||||||||||||||
error_list.append(status) | ||||||||||||||||||||||||||||
elif status: | ||||||||||||||||||||||||||||
success_list.append(1) | ||||||||||||||||||||||||||||
if display_progress: | ||||||||||||||||||||||||||||
progress_bar.update() | ||||||||||||||||||||||||||||
self.connection.connect() # reconnect parent process to MySQL server | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
# restore original signal handler: | ||||||||||||||||||||||||||||
if reserve_jobs: | ||||||||||||||||||||||||||||
signal.signal(signal.SIGTERM, old_handler) | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
if suppress_errors: | ||||||||||||||||||||||||||||
return error_list | ||||||||||||||||||||||||||||
return { | ||||||||||||||||||||||||||||
"success_count": sum(success_list), | ||||||||||||||||||||||||||||
"error_list": error_list, | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
def _populate1( | ||||||||||||||||||||||||||||
self, key, jobs, suppress_errors, return_exception_objects, make_kwargs=None | ||||||||||||||||||||||||||||
|
@@ -275,7 +289,8 @@ def _populate1( | |||||||||||||||||||||||||||
:param key: dict specifying job to populate | ||||||||||||||||||||||||||||
:param suppress_errors: bool if errors should be suppressed and returned | ||||||||||||||||||||||||||||
:param return_exception_objects: if True, errors must be returned as objects | ||||||||||||||||||||||||||||
:return: (key, error) when suppress_errors=True, otherwise None | ||||||||||||||||||||||||||||
:return: (key, error) when suppress_errors=True, | ||||||||||||||||||||||||||||
True if successfully invoke one `make()` call, otherwise False | ||||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||||
make = self._make_tuples if hasattr(self, "_make_tuples") else self.make | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
|
@@ -322,9 +337,12 @@ def _populate1( | |||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||
if jobs is not None: | ||||||||||||||||||||||||||||
jobs.complete(self.target.table_name, self._job_key(key)) | ||||||||||||||||||||||||||||
return True | ||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not sure about this There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What about returning There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When does it ever return There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Then you should add There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does not populating anything constitute a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The current function produces There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Agreed, I'll update accordingly |
||||||||||||||||||||||||||||
finally: | ||||||||||||||||||||||||||||
self.__class__._allow_insert = False | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
return False | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
def progress(self, *restrictions, display=False): | ||||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||||
Report the progress of populating the table. | ||||||||||||||||||||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is
processes == 0
handled? PerhapsThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is handled in the
else
block