Skip to content

Commit

Permalink
Merge pull request #277 from usc-isi-i2/development
Browse files Browse the repository at this point in the history
Development
  • Loading branch information
kyao authored Jul 19, 2019
2 parents 2fcc8a0 + 85222e1 commit 2cce3d7
Show file tree
Hide file tree
Showing 41 changed files with 7,628 additions and 4,569 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -103,3 +103,6 @@ ENV/

# Mac
.DS_Store

# VS Code
.vscode
14 changes: 8 additions & 6 deletions python/dsbox/JobManager/DistributedJobManager.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ def _internal_worker_process(args: typing.Tuple[Queue, Queue, typing.Callable])
# '[DJM] Eval does not have runtime'
except:
_logger.exception(
f'{current_process()} > Target evaluation failed {hash(str(kwargs))}')
f'{current_process()} > Target evaluation failed {hash(str(kwargs))}', exc_info=True)
# print(f'[INFO] {current_process()} > Target evaluation failed {hash(str(kwargs))}')
traceback.print_exc()
# _logger.error(traceback.format_exc())
Expand All @@ -123,8 +123,8 @@ def _internal_worker_process(args: typing.Tuple[Queue, Queue, typing.Callable])
if "ensemble_tunning_result" in result:
result_simplified.pop("ensemble_tunning_result")

print(f"Pushing Results {current_process()} > {result}")
_logger.info(f"{current_process()} Pushing Results > {result}")
_logger.info(f"{current_process()} Pushing Results: {result['id'] if result and 'id' in result else 'NONE'}")
_logger.debug(f"{current_process()} Pushing Results > {result}")

pushed = False
# while not pushed:
Expand Down Expand Up @@ -259,7 +259,8 @@ def reset(self):
Cancel timer and clear the job queue.
'''
self._timeout_sec = -1
self.timer.cancel()
if self.timer:
self.timer.cancel()
self._clear_jobs()

def kill_job_manager(self):
Expand All @@ -282,8 +283,9 @@ def kill_job_manager(self):
self.kill_timer()

def kill_timer(self):
_logger.warning(f"timer killed")
self.timer.cancel()
if self.timer:
_logger.warning(f"timer killed")
self.timer.cancel()

def _setup_timeout_timer(self):
self.start_time = time.perf_counter()
Expand Down
30 changes: 24 additions & 6 deletions python/dsbox/JobManager/cache.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
import traceback
import typing
import copy
import logging
import pprint
import traceback
import typing

from multiprocessing import Manager
from dsbox.template.configuration_space import ConfigurationPoint

from d3m.metadata.pipeline import PrimitiveStep
from d3m.container.dataset import Dataset
from d3m.container.pandas import DataFrame
from d3m.container.numpy import ndarray as d3m_ndarray
from d3m.primitive_interfaces.base import PrimitiveBase

from dsbox.combinatorial_search.search_utils import comparison_metrics
from dsbox.template.configuration_space import ConfigurationPoint

T = typing.TypeVar("T")
_logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -222,6 +227,7 @@ def push_key(self, prim_hash: int, prim_name: int, model: PrimitiveBase,
# print("[WARN] Double-push in Primitives Cache")
return 1
except:
_logger.warning('Caching model failed. Most likely the primitive does not pickle properly.')
traceback.print_exc()
finally:
# print("[INFO] released")
Expand Down Expand Up @@ -257,9 +263,11 @@ def is_hit_key(self, prim_hash: int, prim_name: int) -> bool:

@staticmethod
def _get_hash(pipe_step: PrimitiveStep, primitive_arguments: typing.Dict,
primitive_hyperparams: typing.Dict, # 2019-7-11: must pass in hyperparams
hash_prefix: int=None) -> typing.Tuple[int, int]:
prim_name = str(pipe_step.primitive)
hyperparam_hash = hash(str(pipe_step.hyperparams.items()))
hyperparam_hash = hash(str(primitive_hyperparams.items()))

dataset_id = ""
dataset_digest = ""
try:
Expand All @@ -275,11 +283,21 @@ def _get_hash(pipe_step: PrimitiveStep, primitive_arguments: typing.Dict,
isinstance(primitive_arguments['inputs'], typing.List)), \
f"inputs type not valid {type(primitive_arguments['inputs'])}"

# v2019.6.30
# This added part checks whether the input DataFrame has any column whose first-row value is an ndarray.
# Hashing a large ndarray is very slow, so such columns are dropped from the copy that gets hashed.
hash_part = copy.copy(primitive_arguments['inputs'])
if type(hash_part) is DataFrame:
for i in range(primitive_arguments['inputs'].shape[1]):
if type(primitive_arguments['inputs'].iloc[0, i]) is d3m_ndarray:
drop_column_name = hash_part.columns[i]
hash_part = hash_part.drop(columns=drop_column_name)

if hash_prefix is None:
_logger.debug("Primtive cache, hash computed in prefix mode")
dataset_value_hash = hash(str(primitive_arguments['inputs']))
dataset_value_hash = hash(str(hash_part))
else:
dataset_value_hash = hash(primitive_arguments['inputs'].values.tobytes())
dataset_value_hash = hash(hash_part.values.tobytes())

dataset_hash = hash(str(dataset_value_hash) + dataset_id + dataset_digest)
prim_hash = hash(str([hyperparam_hash, dataset_hash, hash_prefix]))
Expand Down
4 changes: 3 additions & 1 deletion python/dsbox/combinatorial_search/BanditDimensionalSearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def __init__(self, template_list: typing.List[DSBoxTemplate],
problem: Metadata, train_dataset1: Dataset,
train_dataset2: typing.List[Dataset], test_dataset1: Dataset,
test_dataset2: typing.List[Dataset], all_dataset: Dataset,
ensemble_tuning_dataset:Dataset,
ensemble_tuning_dataset:Dataset, extra_primitive:typing.Set[str],
output_directory: str, log_dir: str, timeout: int = 55, num_proc: int = 4) -> None:

# Use first metric from test
Expand All @@ -68,6 +68,7 @@ def __init__(self, template_list: typing.List[DSBoxTemplate],
train_dataset2=train_dataset2, test_dataset1=test_dataset1,
test_dataset2=test_dataset2, all_dataset=all_dataset,
ensemble_tuning_dataset = ensemble_tuning_dataset,
extra_primitive = extra_primitive,
log_dir=log_dir, output_directory=output_directory, timeout=timeout, num_proc=num_proc)

MultiBanditSearch.__init__(
Expand All @@ -78,6 +79,7 @@ def __init__(self, template_list: typing.List[DSBoxTemplate],
train_dataset2=train_dataset2, test_dataset1=test_dataset1,
test_dataset2=test_dataset2, all_dataset=all_dataset,
ensemble_tuning_dataset = ensemble_tuning_dataset,
extra_primitive = extra_primitive,
log_dir=log_dir, output_directory=output_directory, timeout=timeout, num_proc=num_proc)

# def search(self, num_iter: int = 2) -> typing.Dict:
Expand Down
Loading

0 comments on commit 2cce3d7

Please sign in to comment.