PTFM-12951 Move EE dependency installation code into dxpy #78
Changes from 2 commits
@@ -14,15 +14,14 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
-'''
+"""
 Utilities shared by dxpy modules.
-'''
+"""
 
 from __future__ import (print_function, unicode_literals)
 
 import os, json, collections, concurrent.futures, traceback, sys, time, gc
 import dateutil.parser
-from .exec_utils import run, convert_handlers_to_dxlinks, parse_args_as_job_input, entry_point, DXJSONEncoder
 from .thread_pool import PrioritizingThreadPool
 from .. import logger
 from ..compat import basestring

@@ -41,11 +40,11 @@ def get_futures_threadpool(max_workers):
     return PrioritizingThreadPool(max_workers=max_workers)
 
 def wait_for_a_future(futures, print_traceback=False):
-    '''
+    """
     Return the next future that completes. If a KeyboardInterrupt is
     received, then the entire process is exited immediately. See
     wait_for_all_futures for more notes.
-    '''
+    """
     while True:
         try:
             future = next(concurrent.futures.as_completed(futures, timeout=10000000000))

@@ -62,7 +61,7 @@ def wait_for_a_future(futures, print_traceback=False):
     return future
 
 def wait_for_all_futures(futures, print_traceback=False):
-    '''
+    """
     Wait indefinitely for all futures in the input iterable to complete.
     Use a timeout to enable interrupt handling.
     Call os._exit() in case of KeyboardInterrupt. Otherwise, the atexit registered handler in concurrent.futures.thread

@@ -74,7 +73,7 @@ def wait_for_all_futures(futures, print_traceback=False):
 
     Note: os._exit() doesn't work well with interactive mode (e.g. ipython). This may help:
     import __main__ as main; if hasattr(main, '__file__'): os._exit() else: os.exit()
-    '''
+    """
     try:
         while True:
             waited_futures = concurrent.futures.wait(futures, timeout=60)

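As a side note on these two helpers, here is a minimal usage sketch with ordinary futures; it assumes both functions are importable from dxpy.utils and accept any concurrent.futures future objects, which this diff does not itself confirm.

import concurrent.futures
import time

from dxpy.utils import wait_for_a_future, wait_for_all_futures

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(time.sleep, n) for n in (0.1, 0.2)]
    first_done = wait_for_a_future(futures)   # next future to complete
    wait_for_all_futures(futures)             # blocks until all finish; Ctrl-C exits via os._exit()
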
@@ -88,7 +87,7 @@ def wait_for_all_futures(futures, print_traceback=False):
         os._exit(os.EX_IOERR)
 
 def response_iterator(request_iterator, thread_pool, max_active_tasks=4, num_retries=0, retry_after=90, queue_id=''):
-    '''
+    """
     :param request_iterator: This is expected to be an iterator producing inputs for consumption by the worker pool.
     :type request_iterator: iterator of callable_, args, kwargs
     :param thread_pool: thread pool to submit the requests to

@@ -111,7 +110,7 @@ def response_iterator(request_iterator, thread_pool, max_active_tasks=4, num_retries=0, retry_after=90, queue_id=''):
     If there are 4 or more tasks in the queue, and all but the first one are done, the first task will be discarded
     after *retry_after* seconds and resubmitted with the same parameters. This will be done up to *num_retries* times.
     If retries are used, tasks should be idempotent.
-    '''
+    """
 
     # Debug fallback
     #for _callable, args, kwargs in request_iterator:

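For reviewers unfamiliar with this helper, a hypothetical usage sketch based only on the signature and docstring fragments above; the import path and the exact shape of the yielded results are assumptions, not confirmed by the diff.

from dxpy.utils import get_futures_threadpool, response_iterator

def work(x):
    # Stand-in task; idempotent, as the docstring recommends when retries are enabled.
    return x * x

requests = ((work, (i,), {}) for i in range(8))   # iterator of (callable_, args, kwargs)
pool = get_futures_threadpool(max_workers=4)

for result in response_iterator(requests, pool, max_active_tasks=4):
    print(result)
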
@@ -129,19 +128,19 @@ def make_priority_fn(request_index):
         return lambda: request_index - num_results_yielded
 
     def submit(callable_, args, kwargs, retries=num_retries):
-        """Submit the task.
+        """
+        Submit the task.
 
         Return (future, (callable_, args, kwargs), retries)
-
         """
         future = thread_pool.submit_to_queue(queue_id, make_priority_fn(next_request_index), callable_, *args, **kwargs)
         return (future, (callable_, args, kwargs), retries)
 
     def resubmit(callable_, args, kwargs, retries):
-        """Submit the task.
+        """
+        Submit the task.
 
         Return (future, (callable_, args, kwargs), retries)
-
         """
         logger.warn("{}: Retrying {} after timeout".format(__name__, callable_))
         # TODO: resubmitted tasks should be prioritized higher

@@ -205,12 +204,13 @@ def string_buffer_length(buf):
     return buf_len
 
 def normalize_time_input(t, future=False):
-    ''' Converts inputs such as:
+    """
+    Converts inputs such as:
        "2012-05-01"
       "-5d"
       1352863174
     to milliseconds since epoch. See http://labix.org/python-dateutil and :meth:`normalize_timedelta`.
-    '''
+    """
     error_msg = 'Error: Could not parse {t} as a timestamp or timedelta. Expected a date format or an integer with a single-letter suffix: s=seconds, m=minutes, h=hours, d=days, w=weeks, M=months, y=years, e.g. "-10d" indicates 10 days ago'
     if isinstance(t, basestring):
         try:

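Illustrative calls for the docstring above (not part of the diff); the import path is assumed, and concrete return values depend on the current time and local timezone, so none are asserted here.

from dxpy.utils import normalize_time_input

normalize_time_input("2012-05-01")   # calendar date, parsed by dateutil, to ms since epoch
normalize_time_input("-5d")          # relative offset: five days in the past
normalize_time_input(1352863174)     # bare integer timestamp (per the docstring, ms since epoch)
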
@@ -226,10 +226,10 @@ def normalize_time_input(t, future=False):
     return t
 
 def normalize_timedelta(timedelta):
-    '''
+    """
     Given a string like "1w" or "-5d", convert it to an integer in milliseconds.
     Note: not related to the datetime timedelta class.
-    '''
+    """
     try:
         return int(timedelta)
     except ValueError:

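To make the suffix arithmetic concrete, a small reimplementation sketch follows; it covers only the fixed-length suffixes (s, m, h, d, w) from the error message above, since the diff does not show how M (months) and y (years) are converted. The helper name is hypothetical.

UNIT_MS = {"s": 1000,
           "m": 60 * 1000,
           "h": 60 * 60 * 1000,
           "d": 24 * 60 * 60 * 1000,
           "w": 7 * 24 * 60 * 60 * 1000}

def normalize_timedelta_sketch(td):
    try:
        return int(td)                      # bare integers pass through, as in the code above
    except ValueError:
        value, suffix = int(td[:-1]), td[-1]
        return value * UNIT_MS[suffix]

assert normalize_timedelta_sketch("1w") == 604800000    # one week in milliseconds
assert normalize_timedelta_sketch("-5d") == -432000000  # five days ago, negative offset
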
@@ -272,9 +272,10 @@ def group_array_by_field(array, field='group'):
     return groups
 
 def merge(d, u):
-    ''' Recursively updates a dictionary.
+    """
+    Recursively updates a dictionary.
     Example: merge({"a": {"b": 1, "c": 2}}, {"a": {"b": 3}}) = {"a": {"b": 3, "c": 2}}
-    '''
+    """
     for k, v in u.items():
         if isinstance(v, collections.Mapping):
             r = merge(d.get(k, {}), v)

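The docstring's example, spelled out as a runnable check (import path assumed):

from dxpy.utils import merge

d = {"a": {"b": 1, "c": 2}}
u = {"a": {"b": 3}}
assert merge(d, u) == {"a": {"b": 3, "c": 2}}   # nested key updated, sibling "c" preserved
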
@@ -284,7 +285,9 @@ def merge(d, u):
     return d
 
 def _dict_raise_on_duplicates(ordered_pairs):
-    """Reject duplicate keys."""
+    """
+    Reject duplicate keys.
+    """
     d = {}
     for k, v in ordered_pairs:
         if k in d:

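For context, this is how a duplicate-rejecting pairs hook plugs into the json module, which is what the two wrappers in the next hunk do; the hook below is a standalone sketch, and the exact exception raised by _dict_raise_on_duplicates is not visible in this diff.

import json

def reject_duplicates(ordered_pairs):
    d = {}
    for k, v in ordered_pairs:
        if k in d:
            raise ValueError("duplicate key: {!r}".format(k))
        d[k] = v
    return d

json.loads('{"a": 1, "b": 2}', object_pairs_hook=reject_duplicates)   # parses normally
try:
    json.loads('{"a": 1, "a": 2}', object_pairs_hook=reject_duplicates)
except ValueError as e:
    print(e)   # duplicate key: 'a'
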
@@ -294,16 +297,20 @@ def _dict_raise_on_duplicates(ordered_pairs):
     return d
 
 def json_load_raise_on_duplicates(*args, **kwargs):
-    ''' Like json.load(), but raises an error on duplicate keys.
-    '''
+    """
+    Like json.load(), but raises an error on duplicate keys.
+    """
     kwargs['object_pairs_hook'] = _dict_raise_on_duplicates
     return json.load(*args, **kwargs)
 
 def json_loads_raise_on_duplicates(*args, **kwargs):
-    ''' Like json.loads(), but raises an error on duplicate keys.
-    '''
+    """
+    Like json.loads(), but raises an error on duplicate keys.
+    """
     kwargs['object_pairs_hook'] = _dict_raise_on_duplicates
     return json.loads(*args, **kwargs)
 
 def warn(*args, **kwargs):
     print(*args, file=sys.stderr, **kwargs)
+
+from .exec_utils import run, convert_handlers_to_dxlinks, parse_args_as_job_input, entry_point, DXJSONEncoder

Inline review thread on the import above:
Reviewer: Why does this go down here? Is there an issue with circular imports?
Author: Yes, there are circular imports that cause this to go here.
Reviewer: Thanks, please just add a comment to this effect.
Author: Updated.
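For readers following the thread above, a generic sketch of the pattern under discussion; the package layout and names below are illustrative, not dxpy's actual modules. The point is that a bottom-of-module import runs only after the module's own top-level names exist, so the submodule can import them even while the package is still initializing.

# pkg/__init__.py -- stands in for the module being diffed here
SHARED = 42                      # top-level names are defined first

def helper():
    return SHARED

# Deferred to the bottom of the file: by the time pkg.sub executes its
# "from . import SHARED", the names above already exist.
from .sub import run_sub


# pkg/sub.py -- stands in for the module imported at the bottom
from . import SHARED             # works even though pkg is mid-initialization

def run_sub():
    return SHARED + 1

If the import sat at the top of pkg/__init__.py instead, pkg.sub would try to pull SHARED out of a package that had not defined it yet, and the import would fail.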
General review comment:
In general it would be nice to break out cosmetic changes from the real substantive changes.