Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PTFM-12951 Move EE dependency installation code into dxpy #78

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 31 additions & 24 deletions src/python/dxpy/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,14 @@
# License for the specific language governing permissions and limitations
# under the License.

'''
"""
Utilities shared by dxpy modules.
'''
"""

from __future__ import (print_function, unicode_literals)

import os, json, collections, concurrent.futures, traceback, sys, time, gc
import dateutil.parser
from .exec_utils import run, convert_handlers_to_dxlinks, parse_args_as_job_input, entry_point, DXJSONEncoder
from .thread_pool import PrioritizingThreadPool
from .. import logger
from ..compat import basestring
Expand All @@ -41,11 +40,11 @@ def get_futures_threadpool(max_workers):
return PrioritizingThreadPool(max_workers=max_workers)

def wait_for_a_future(futures, print_traceback=False):
'''
"""
Return the next future that completes. If a KeyboardInterrupt is
received, then the entire process is exited immediately. See
wait_for_all_futures for more notes.
'''
"""
while True:
try:
future = next(concurrent.futures.as_completed(futures, timeout=10000000000))
Expand All @@ -62,7 +61,7 @@ def wait_for_a_future(futures, print_traceback=False):
return future

def wait_for_all_futures(futures, print_traceback=False):
'''
"""
Wait indefinitely for all futures in the input iterable to complete.
Use a timeout to enable interrupt handling.
Call os._exit() in case of KeyboardInterrupt. Otherwise, the atexit registered handler in concurrent.futures.thread
Expand All @@ -74,7 +73,7 @@ def wait_for_all_futures(futures, print_traceback=False):

Note: os._exit() doesn't work well with interactive mode (e.g. ipython). This may help:
import __main__ as main; if hasattr(main, '__file__'): os._exit() else: sys.exit()
'''
"""
try:
while True:
waited_futures = concurrent.futures.wait(futures, timeout=60)
Expand All @@ -88,7 +87,7 @@ def wait_for_all_futures(futures, print_traceback=False):
os._exit(os.EX_IOERR)

def response_iterator(request_iterator, thread_pool, max_active_tasks=4, num_retries=0, retry_after=90, queue_id=''):
'''
"""
:param request_iterator: This is expected to be an iterator producing inputs for consumption by the worker pool.
:type request_iterator: iterator of callable_, args, kwargs
:param thread_pool: thread pool to submit the requests to
Expand All @@ -111,7 +110,7 @@ def response_iterator(request_iterator, thread_pool, max_active_tasks=4, num_ret
If there are 4 or more tasks in the queue, and all but the first one are done, the first task will be discarded
after *retry_after* seconds and resubmitted with the same parameters. This will be done up to *num_retries* times.
If retries are used, tasks should be idempotent.
'''
"""

# Debug fallback
#for _callable, args, kwargs in request_iterator:
Expand All @@ -129,19 +128,19 @@ def make_priority_fn(request_index):
return lambda: request_index - num_results_yielded

def submit(callable_, args, kwargs, retries=num_retries):
"""Submit the task.
"""
Submit the task.

Return (future, (callable_, args, kwargs), retries)

"""
future = thread_pool.submit_to_queue(queue_id, make_priority_fn(next_request_index), callable_, *args, **kwargs)
return (future, (callable_, args, kwargs), retries)

def resubmit(callable_, args, kwargs, retries):
"""Submit the task.
"""
Resubmit the task after a timeout.

Return (future, (callable_, args, kwargs), retries)

"""
logger.warn("{}: Retrying {} after timeout".format(__name__, callable_))
# TODO: resubmitted tasks should be prioritized higher
Expand Down Expand Up @@ -205,12 +204,13 @@ def string_buffer_length(buf):
return buf_len

def normalize_time_input(t, future=False):
''' Converts inputs such as:
"""
Converts inputs such as:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In general it would be nice to break out cosmetic changes from the real substantive changes.

"2012-05-01"
"-5d"
1352863174
to milliseconds since epoch. See http://labix.org/python-dateutil and :meth:`normalize_timedelta`.
'''
"""
error_msg = 'Error: Could not parse {t} as a timestamp or timedelta. Expected a date format or an integer with a single-letter suffix: s=seconds, m=minutes, h=hours, d=days, w=weeks, M=months, y=years, e.g. "-10d" indicates 10 days ago'
if isinstance(t, basestring):
try:
Expand All @@ -226,10 +226,10 @@ def normalize_time_input(t, future=False):
return t

def normalize_timedelta(timedelta):
'''
"""
Given a string like "1w" or "-5d", convert it to an integer in milliseconds.
Note: not related to the datetime timedelta class.
'''
"""
try:
return int(timedelta)
except ValueError:
Expand Down Expand Up @@ -272,9 +272,10 @@ def group_array_by_field(array, field='group'):
return groups

def merge(d, u):
''' Recursively updates a dictionary.
"""
Recursively updates a dictionary.
Example: merge({"a": {"b": 1, "c": 2}}, {"a": {"b": 3}}) = {"a": {"b": 3, "c": 2}}
'''
"""
for k, v in u.items():
if isinstance(v, collections.Mapping):
r = merge(d.get(k, {}), v)
Expand All @@ -284,7 +285,9 @@ def merge(d, u):
return d

def _dict_raise_on_duplicates(ordered_pairs):
"""Reject duplicate keys."""
"""
Reject duplicate keys.
"""
d = {}
for k, v in ordered_pairs:
if k in d:
Expand All @@ -294,16 +297,20 @@ def _dict_raise_on_duplicates(ordered_pairs):
return d

def json_load_raise_on_duplicates(*args, **kwargs):
''' Like json.load(), but raises an error on duplicate keys.
'''
"""
Like json.load(), but raises an error on duplicate keys.
"""
kwargs['object_pairs_hook'] = _dict_raise_on_duplicates
return json.load(*args, **kwargs)

def json_loads_raise_on_duplicates(*args, **kwargs):
''' Like json.loads(), but raises an error on duplicate keys.
'''
"""
Like json.loads(), but raises an error on duplicate keys.
"""
kwargs['object_pairs_hook'] = _dict_raise_on_duplicates
return json.loads(*args, **kwargs)

def warn(*args, **kwargs):
    """
    Print a message to standard error.

    All positional and keyword arguments are forwarded unchanged to print();
    the destination stream is fixed to sys.stderr, so callers must not pass
    their own ``file`` keyword.
    """
    error_stream = sys.stderr
    print(*args, file=error_stream, **kwargs)

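# NOTE: this import is intentionally placed at the bottom of the module to avoid a circular import with .exec_utils.
from .exec_utils import run, convert_handlers_to_dxlinks, parse_args_as_job_input, entry_point, DXJSONEncoder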
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does this go down here? Is there an issue with circular imports?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, there are circular imports that cause this to go here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, please just add a comment to this effect.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated.

Loading