diff --git a/FunctionWorker/python/SessionHelperThread.py b/FunctionWorker/python/SessionHelperThread.py index e1094baf..aca9c060 100644 --- a/FunctionWorker/python/SessionHelperThread.py +++ b/FunctionWorker/python/SessionHelperThread.py @@ -235,6 +235,7 @@ def _store_message(self, msg): self._message_queue.put(msg) def _handle_special_message(self, msg): + #self._logger.debug("[SessionHelperThread] Special message: " + str(msg)) action = msg["action"] if action == "--stop": diff --git a/FunctionWorker/python/StateUtils.py b/FunctionWorker/python/StateUtils.py index 7718b872..804162ee 100644 --- a/FunctionWorker/python/StateUtils.py +++ b/FunctionWorker/python/StateUtils.py @@ -419,15 +419,21 @@ def evaluateMapState(self, function_input, key, metadata, sapi): self._logger.debug("[StateUtils] evaluateMapState, maxConcurrency: " + str(maxConcurrency)) self._logger.debug("[StateUtils] evaluateMapState metadata: " + str(metadata)) + self._logger.info("[StateUtils] evaluateMapState, maxConcurrency: " + str(maxConcurrency)) + self._logger.info("[StateUtils] evaluateMapState metadata: " + str(metadata)) + counter_name_topic = self.functionstatename + "-" + self.sandboxid total_branch_count = len(function_input) # all branches executed concurrently - + #sapi.put(name_prefix + "_" + "mapInputCount", str(len(function_input))) + klist = [total_branch_count] self.parsedfunctionstateinfo["BranchCount"] = int(total_branch_count) # overwrite parsed BranchCount with new value self._logger.debug("[StateUtils] evaluateMapState, total_branch_count: " + str(total_branch_count)) + self._logger.info("[StateUtils] evaluateMapState, total_branch_count: " + str(total_branch_count)) + # prepare counter metadata counter_metadata = {} counter_metadata["__state_action"] = "post_map_processing" @@ -459,6 +465,7 @@ def evaluateMapState(self, function_input, key, metadata, sapi): counter_name_value_metadata["__state_action"] = "post_map_processing" counter_name_value_metadata["state_counter"] = metadata["state_counter"] self._logger.debug("[StateUtils] evaluateMapState, metadata[state_counter]: " + str(metadata["state_counter"])) + self._logger.info("[StateUtils] evaluateMapState, metadata[state_counter]: " + str(metadata["state_counter"])) self.mapStateCounter = int(metadata["state_counter"]) counter_name_value = {"__mfnmetadata": counter_name_value_metadata, "__mfnuserdata": '{}'} @@ -506,6 +513,8 @@ def evaluateMapState(self, function_input, key, metadata, sapi): assert py3utils.is_string(workflow_instance_metadata_storage_key) self._logger.debug("[StateUtils] full_metadata_encoded put key: " + str(workflow_instance_metadata_storage_key)) + self._logger.info("[StateUtils] full_metadata_encoded put key: " + str(workflow_instance_metadata_storage_key)) + sapi.put(workflow_instance_metadata_storage_key, json.dumps(metadata)) # Now provide each branch with its own input @@ -523,9 +532,14 @@ def evaluateMapState(self, function_input, key, metadata, sapi): self._logger.debug("\t Map State StartAt:" + startat) self._logger.debug("\t Map State input:" + str(function_input[i])) + self._logger.info("\t Map State StartAt:" + startat) + self._logger.info("\t Map State input:" + str(function_input[i])) + return function_input, metadata def evaluatePostMap(self, function_input, key, metadata, sapi): + self._logger.info("\t inside evaluatePostMap: " + str(function_input)+ " " + str(metadata) + " " + str(sapi)) + name_prefix = self.functiontopic + "_" + key @@ -542,11 +556,15 @@ def evaluatePostMap(self, function_input, key, 
metadata, sapi): self._logger.debug("\t metadata:" + json.dumps(metadata)) + self._logger.info("\t metadata:" + json.dumps(metadata)) + workflow_instance_metadata_storage_key = str(function_input["WorkflowInstanceMetadataStorageKey"]) assert py3utils.is_string(workflow_instance_metadata_storage_key) full_metadata_encoded = sapi.get(workflow_instance_metadata_storage_key) self._logger.debug("[StateUtils] full_metadata_encoded get: " + str(full_metadata_encoded)) + self._logger.info("[StateUtils] full_metadata_encoded get: " + str(full_metadata_encoded)) + full_metadata = json.loads(full_metadata_encoded) full_metadata["state_counter"] = state_counter @@ -557,6 +575,8 @@ def evaluatePostMap(self, function_input, key, metadata, sapi): branchOutputKeysSet = sapi.retrieveSet(branchOutputKeysSetKey) self._logger.debug("\t branchOutputKeysSet: " + str(branchOutputKeysSet)) + self._logger.info("\t branchOutputKeysSet: " + str(branchOutputKeysSet)) + if not branchOutputKeysSet: self._logger.error("[StateUtils] branchOutputKeysSet is empty") raise Exception("[StateUtils] branchOutputKeysSet is empty") @@ -576,6 +596,8 @@ def evaluatePostMap(self, function_input, key, metadata, sapi): NumBranchesFinished = abs(counterValue) self._logger.debug("\t NumBranchesFinished:" + str(NumBranchesFinished)) + self._logger.info("\t NumBranchesFinished:" + str(NumBranchesFinished)) + do_cleanup = False if klist[-1] == NumBranchesFinished: @@ -583,6 +605,8 @@ def evaluatePostMap(self, function_input, key, metadata, sapi): self._logger.debug("\t do_cleanup:" + str(do_cleanup)) + self._logger.info("\t do_cleanup:" + str(do_cleanup)) + counterName = str(mapInfo["CounterName"]) counter_metadata_key_name = counterName + "_metadata" assert py3utils.is_string(counterName) @@ -610,6 +634,10 @@ def evaluatePostMap(self, function_input, key, metadata, sapi): self._logger.debug("\t mapInfo_BranchOutputKeys length: " + str(len(mapInfo["BranchOutputKeys"]))) + self._logger.info("\t mapInfo_BranchOutputKeys:" + str(mapInfo["BranchOutputKeys"])) + + self._logger.info("\t mapInfo_BranchOutputKeys length: " + str(len(mapInfo["BranchOutputKeys"]))) + for outputkey in mapInfo["BranchOutputKeys"]: outputkey = str(outputkey) if outputkey in branchOutputKeysSet: # mapInfo["BranchOutputKeys"]: @@ -623,15 +651,23 @@ def evaluatePostMap(self, function_input, key, metadata, sapi): self._logger.debug("\t branchOutput:" + branchOutput) self._logger.debug("\t branchOutput_decoded(type):" + str(type(branchOutput_decoded))) self._logger.debug("\t branchOutput_decoded:" + str(branchOutput_decoded)) + self._logger.info("\t branchOutput(type):" + str(type(branchOutput))) + self._logger.info("\t branchOutput:" + branchOutput) + self._logger.info("\t branchOutput_decoded(type):" + str(type(branchOutput_decoded))) + self._logger.info("\t branchOutput_decoded:" + str(branchOutput_decoded)) post_map_output_values = post_map_output_values + [branchOutput_decoded] if do_cleanup: sapi.delete(outputkey) # cleanup the key from data layer self._logger.debug("\t cleaned output key:" + outputkey) + self._logger.info("\t cleaned output key:" + outputkey) else: post_map_output_values = post_map_output_values + [None] self._logger.debug("\t this_BranchOutputKeys is not contained: " + str(outputkey)) + self._logger.info("\t this_BranchOutputKeys is not contained: " + str(outputkey)) + self._logger.debug("\t post_map_output_values:" + str(post_map_output_values)) + self._logger.info("\t post_map_output_values:" + str(post_map_output_values)) while 
(sapi.get(name_prefix + "_" + "mapStatePartialResult")) == "": time.sleep(0.1) # wait until value is available @@ -640,18 +676,25 @@ def evaluatePostMap(self, function_input, key, metadata, sapi): mapStatePartialResult += post_map_output_values sapi.put(name_prefix + "_" + "mapStatePartialResult", str(mapStatePartialResult)) + time.sleep(5.0) + # now apply ResultPath and OutputPath if do_cleanup: sapi.deleteSet(branchOutputKeysSetKey) - while sapi.get(name_prefix + "_" + "mapStatePartialResult") != str(mapStatePartialResult): - time.sleep(0.1) + while (sapi.get(name_prefix + "_" + "mapInputCount") == ""): + time.sleep(0.1) # wait until value is available if ast.literal_eval(sapi.get(name_prefix + "_" + "mapInputCount")) == len(mapStatePartialResult): + #time.sleep(0.5) # we are ready to publish but need to honour ResultPath and OutputPath + while (sapi.get(name_prefix + "_" +"mapStatePartialResult") == ""): + time.sleep(0.1) res_raw = ast.literal_eval(sapi.get(name_prefix + "_" +"mapStatePartialResult")) + self._logger.info("[StateUtils] evaluatePostMap: res_raw" + str(res_raw) + " vs. " + sapi.get(name_prefix + "_" + "mapInputCount")) + # remove unwanted keys from input before publishing function_input = {} @@ -671,6 +714,9 @@ def evaluatePostMap(self, function_input, key, metadata, sapi): sapi.delete(name_prefix + "_" + "mapStatePartialResult") sapi.delete(name_prefix + "_" + "tobeProcessedlater") post_map_output_values = function_input_post_output + else: + #raise Exception("mapInputCount" + str(sapi.get(name_prefix + "_" + "mapInputCount")) + " does not match mapStatePartialResult: " + str(mapStatePartialResult)) + print("mapInputCount" + str(sapi.get(name_prefix + "_" + "mapInputCount")) + " does not match mapStatePartialResult: " + str(mapStatePartialResult)) return post_map_output_values, full_metadata def evaluateParallelState(self, function_input, key, metadata, sapi): @@ -967,7 +1013,7 @@ def evaluatePostParallel(self, function_input, key, metadata, sapi): def evaluateNonTaskState(self, function_input, key, metadata, sapi): # 3. Evaluate Non Task states - #self._logger.debug("[StateUtils] NonTask state type: " + str(self.functionstatetype)) + self._logger.info("[StateUtils] NonTask state type: " + str(self.functionstatetype)) #self._logger.debug("[StateUtils] Welcome to evaluateNonTaskState! 
Current key:" + str(key)) function_output = None if self.functionstatetype == StateUtils.choiceStateType: @@ -1094,6 +1140,9 @@ def evaluateNonTaskState(self, function_input, key, metadata, sapi): self._logger.debug("[StateUtils] Map state maxConcurrency: " + str(maxConcurrency)) self._logger.debug("[StateUtils] Map state handling") + self._logger.info("[StateUtils] Map state maxConcurrency: " + str(maxConcurrency)) + self._logger.info("[StateUtils] Map state handling metadata: " + str(metadata) ) + if "__state_action" not in metadata or metadata["__state_action"] != "post_map_processing": # here we start the iteration process on a first batch if maxConcurrency != 0: @@ -1103,32 +1152,41 @@ def evaluateNonTaskState(self, function_input, key, metadata, sapi): tobeProcessednow = function_input tobeProcessedlater = [] self._logger.debug("[StateUtils] Map state function_input split:" + str(tobeProcessednow) + " " + str(tobeProcessedlater)) + self._logger.info("[StateUtils] Map state function_input split:" + str(tobeProcessednow) + " " + str(tobeProcessedlater)) sapi.put(name_prefix + "_" + "tobeProcessedlater", str(tobeProcessedlater)) # store elements to be processed on DL sapi.put(name_prefix + "_" + "mapStatePartialResult", "[]") # initialise the collector variable sapi.put(name_prefix + "_" + "mapInputCount", str(len(function_input))) - #if "__client_origin_frontend" in metadata and metadata["__client_origin_frontend"] != "": - # sapi.put(name_prefix + "_" + "mapInputOriginFE", str(metadata["__client_origin_frontend"])) + #metadata["__state_action"] = "" function_output, metadata = self.evaluateMapState(tobeProcessednow, key, metadata, sapi) + #metadata["__state_action"] = "" + elif metadata["__state_action"] == "post_map_processing": tobeProcessedlater = ast.literal_eval(sapi.get(name_prefix + "_" + "tobeProcessedlater")) # get all elements that have not yet been processed self._logger.debug("[StateUtils] Map state post_map processing input:" + str(tobeProcessedlater)) + self._logger.info("[StateUtils] Map state post_map processing input:" + str(tobeProcessedlater)) # we need to decide at this point if there is a need for more batches. 
if so:
                if len(tobeProcessedlater) > 0: # we need to start another batch
-                    function_output, metadata = self.evaluatePostMap(function_input, key, metadata, sapi) # take care not to overwrite metadata
-                    metadata["__client_origin_frontend"] = self._internal_endpoint
-                    time.sleep(5.0) # allow cleanup of previous execution before launching new
-                    #metadata["__client_origin_frontend"] = sapi.get(name_prefix + "_" + "mapInputOriginFE")
-                    self._logger.debug("[StateUtils] Map state metadata between calls: " + str(metadata))
+                    self._logger.info("[StateUtils] tobeProcessedlater: " + str(tobeProcessedlater) + ", function_input: " + str(function_input))
+                    function_output, metadata2 = self.evaluatePostMap(function_input, key, metadata, sapi) # take care not to overwrite metadata
+                    self._logger.info("[StateUtils] after evaluatePostMap: " + str(function_output))
                     function_output, metadata = self.evaluateMapState(tobeProcessedlater[:maxConcurrency], key, metadata, sapi) # start a new batch
+                    self._logger.info("[StateUtils] after evaluateMapState: " + str(function_output))
+                    self._logger.info("[StateUtils] after evaluateMapState, metadata: " + str(metadata))
                     sapi.put(name_prefix + "_" + "tobeProcessedlater", str(tobeProcessedlater[maxConcurrency:])) # store remaining elements to be processed on DL
+                    self._logger.info("[StateUtils] after sapi.put: " + str(tobeProcessedlater[maxConcurrency:]))
+
                 else: # no more batches required. we are at the iteration end, publish the final result
                     self._logger.debug("[StateUtils] Map state input final stage: " + str(function_input))
+                    self._logger.info("[StateUtils] Map state input final stage: " + str(function_input))
                     function_output, metadata = self.evaluatePostMap(function_input, key, metadata, sapi)
+            elif metadata["__state_action"] == '':
+                raise Exception("Unknown state action in map state")
+
             else:
                 raise Exception("Unknown action type in map state")
diff --git a/GUI/app/pages/functions/FunctionTableCtrl.js b/GUI/app/pages/functions/FunctionTableCtrl.js
index 64e1ff52..376d6f18 100644
--- a/GUI/app/pages/functions/FunctionTableCtrl.js
+++ b/GUI/app/pages/functions/FunctionTableCtrl.js
@@ -171,7 +171,6 @@
         return selected.length ?
mFunction.runtime : 'Not set'; }; - function createTemporaryWorkflow(functionIndex) { var req = { method: 'POST', @@ -636,7 +635,7 @@ }, - data: JSON.stringify({ "action" : "addFunction", "data" : { "user" : { "token" : token } , "function" : { "name" : mFunction.name, "runtime" : mFunction.runtime } } }) + data: JSON.stringify({ "action" : "addFunction", "data" : { "user" : { "token" : token } , "function" : { "name" : mFunction.name, "runtime" : mFunction.runtime, "gpu_usage": mFunction.gpu_usage, "gpu_mem_usage": mFunction.gpu_mem_usage } } }) } $http(req).then(function successCallback(response) { @@ -646,7 +645,7 @@ console.log('new function id:' + response.data.data.function.id); toastr.success('Your function has been created successfully!'); $scope.reloadFunctions(); - $scope.open('app/pages/functions/modals/codeEditorModal.html', 'lg', mFunction.id, mFunction.name, mFunction.status, mFunction.runtime); + $scope.open('app/pages/functions/modals/codeEditorModal.html', 'lg', mFunction.id, mFunction.name, mFunction.status, mFunction.runtime, mFunction.gpu_usage, mFunction.gpu_mem_usage); } else { console.log("Failure status returned by addFunction"); @@ -692,7 +691,7 @@ }, - data: JSON.stringify({ "action" : "modifyFunction", "data" : { "user" : { "token" : token } , "function" : { "id": mFunction.id, "name" : mFunction.name, "runtime" : mFunction.runtime } } }) + data: JSON.stringify({ "action" : "modifyFunction", "data" : { "user" : { "token" : token } , "function" : { "id": mFunction.id, "name" : mFunction.name, "runtime" : mFunction.runtime, "gpu_usage" : mFunction.gpu_usage, "gpu_mem_usage": mFunction.gpu_mem_usage } } }) } $http(req).then(function successCallback(response) { @@ -738,6 +737,8 @@ name: '', status: 'undeployed', runtime: 'Python 3.6', + gpu_usage: '0', + gpu_mem_usage: '0', modified: '0' }; $scope.functions.push($scope.inserted); diff --git a/GUI/app/pages/functions/widgets/editableRowTable.html b/GUI/app/pages/functions/widgets/editableRowTable.html index 8712e8ab..920d1087 100644 --- a/GUI/app/pages/functions/widgets/editableRowTable.html +++ b/GUI/app/pages/functions/widgets/editableRowTable.html @@ -24,6 +24,8 @@ Name Runtime + GPU Cores (%) + GPU Memory (GB) Last Modified Actions @@ -45,6 +47,16 @@ {{ showRuntime(mFunction) }} + + + {{ mFunction.gpu_usage || 'Not set' }} + + + + + {{ mFunction.gpu_mem_usage || 'Not set' }} + + {{ showLastModified(mFunction) }} @@ -66,7 +78,7 @@
-
+
diff --git a/GUI/app/pages/workflows/WorkflowImportCtrl.js b/GUI/app/pages/workflows/WorkflowImportCtrl.js
index 5c5f43e0..6fdc0aa7 100644
--- a/GUI/app/pages/workflows/WorkflowImportCtrl.js
+++ b/GUI/app/pages/workflows/WorkflowImportCtrl.js
@@ -658,7 +658,7 @@
             },
-            data: JSON.stringify({ "action" : "addFunction", "data" : { "user" : { "token" : token } , "function" : { "name" : functionName, "runtime" : functionRuntime } } })
+            data: JSON.stringify({ "action" : "addFunction", "data" : { "user" : { "token" : token } , "function" : { "name" : functionName, "runtime" : functionRuntime, "gpu_usage": functionGpuusage, "gpu_mem_usage": functionGpuMemusage } } })
         }
         $http(req).then(function successCallback(response) {
diff --git a/ManagementService/management_init.py b/ManagementService/management_init.py
index 76446f73..59f0c1ec 100755
--- a/ManagementService/management_init.py
+++ b/ManagementService/management_init.py
@@ -419,7 +419,7 @@ def printUsage():
     sys.path.append(workflowdir)
     if os.getenv("KUBERNETES_PORT", None) != None:
         import deployWorkflow
-        url, endpoint_key = deployWorkflow.create_k8s_deployment(email, workflow_info, "Python", management=True)
+        url, endpoint_key = deployWorkflow.create_k8s_deployment(email, workflow_info, "Python", 0, 0, management=True)
         DLCLIENT_MANAGEMENT.putMapEntry("Management_workflow_endpoint_map", endpoint_key, url)
         # Kubernetes mode only has one url
         endpoint_list = [url]
diff --git a/ManagementService/python/addFunction.py b/ManagementService/python/addFunction.py
index 8e7c52ae..ec36e54c 100644
--- a/ManagementService/python/addFunction.py
+++ b/ManagementService/python/addFunction.py
@@ -42,6 +42,8 @@ def handle(value, sapi):
             else:
                 f["name"] = function["name"]
                 f["runtime"] = function["runtime"]
+                f["gpu_usage"] = function["gpu_usage"]
+                f["gpu_mem_usage"] = function["gpu_mem_usage"]
                 f["modified"] = time.time()
                 f["id"] = hashlib.md5(str(uuid.uuid4()).encode()).hexdigest()
diff --git a/ManagementService/python/addWorkflow.py b/ManagementService/python/addWorkflow.py
index 31cf4ea9..7d5eadfa 100644
--- a/ManagementService/python/addWorkflow.py
+++ b/ManagementService/python/addWorkflow.py
@@ -78,7 +78,7 @@ def initialize_storage(sapi, wid):
 def handle(value, sapi):
     assert isinstance(value, dict)
-    data = value
+    data = value # data may contain "null" values
     response = {}
     response_data = {}
@@ -87,6 +87,7 @@
     email = data["email"]
+
     if "workflow" in data:
         workflow = data["workflow"]
@@ -97,6 +98,11 @@
         wf["status"] = "undeployed"
         wf["modified"] = time.time()
         wf["endpoints"] = []
+        #wf["gpu_usage"] = None
+        if "gpu_usage" in workflow:
+            wf["gpu_usage"] = str(workflow["gpu_usage"])
+        if "gpu_mem_usage" in workflow:
+            wf["gpu_mem_usage"] = str(workflow["gpu_mem_usage"])
         wf['associatedTriggerableTables'] = {}
         wf['associatedTriggers'] = {}
         wf["id"] = hashlib.md5(str(uuid.uuid4()).encode()).hexdigest().lower()
@@ -104,6 +110,8 @@
         # make a request to elasticsearch to create the workflow index
         create_workflow_index("mfnwf-" + wf["id"])
+        #wf["on_gpu"] = True # add metadata on GPU requirements for this workflow.
ToDo: make this configurable via GUI + # initialize global workflow related storage (workflow-private and mfn internal tables) initialize_storage(sapi, wf["id"]) diff --git a/ManagementService/python/deployWorkflow.py b/ManagementService/python/deployWorkflow.py index d9ae5243..c537fe9d 100644 --- a/ManagementService/python/deployWorkflow.py +++ b/ManagementService/python/deployWorkflow.py @@ -28,6 +28,23 @@ WF_TYPE_SAND = 0 WF_TYPE_ASL = 1 +def get_kv_pairs(testdict, keys, dicts=None): + # find and return kv pairs with particular keys in testdict + if not dicts: + dicts = [testdict] + testdict = [testdict] + data = testdict.pop(0) + if isinstance(data, dict): + data = data.values() + for d in data: + if isinstance(d, dict) or isinstance(d, list): # check d for type + testdict.append(d) + if isinstance(d, dict): + dicts.append(d) + if testdict: # no more data to search + return get_kv_pairs(testdict, keys, dicts) + return [(k, v) for d in dicts for k, v in d.items() if k in keys] + def is_asl_workflow(wfobj): return 'StartAt' in wfobj and 'States' in wfobj and isinstance(wfobj['States'], dict) @@ -135,6 +152,12 @@ def compile_resource_info_map(resource_names, uploaded_resources, email, sapi, d if "runtime" in resource_metadata: resource_info["runtime"] = resource_metadata["runtime"] + if "gpu_usage" in resource_metadata: + resource_info["gpu_usage"] = resource_metadata["gpu_usage"] + + if "gpu_mem_usage" in resource_metadata: + resource_info["gpu_mem_usage"] = resource_metadata["gpu_mem_usage"] + num_chunks_str = dlc.get("grain_source_zip_num_chunks_" + resource_id) try: num_chunks = int(num_chunks_str) @@ -204,9 +227,11 @@ def start_docker_sandbox(host_to_deploy, uid, sid, wid, wname, sandbox_image_nam try: print("Starting sandbox docker container for: " + uid + " " + sid + " " + wid + " " + sandbox_image_name) print("Docker daemon: " + "tcp://" + host_to_deploy[1] + ":2375" + ", environment variables: " + str(env_vars)) - client.containers.run(sandbox_image_name, init=True, detach=True, ports={"8080/tcp": None}, ulimits=ulimit_list, auto_remove=True, name=sid, environment=env_vars, extra_hosts={host_to_deploy[0]:host_to_deploy[1]}, log_config=lc) + if sandbox_image_name.endswith("gpu"): + client.containers.run(sandbox_image_name, init=True, detach=True, ports={"8080/tcp": None}, ulimits=ulimit_list, auto_remove=True, name=sid, environment=env_vars, extra_hosts={host_to_deploy[0]:host_to_deploy[1]}, log_config=lc, runtime="nvidia") + else: + client.containers.run(sandbox_image_name, init=True, detach=True, ports={"8080/tcp": None}, ulimits=ulimit_list, auto_remove=True, name=sid, environment=env_vars, extra_hosts={host_to_deploy[0]:host_to_deploy[1]}, log_config=lc) # TEST/DEVELOPMENT: no auto_remove to access sandbox logs - #client.containers.run(sandbox_image_name, init=True, detach=True, ports={"8080/tcp": None}, ulimits=ulimit_list, name=sid, environment=env_vars, extra_hosts={host_to_deploy[0]:host_to_deploy[1]}, log_config=lc) except Exception as exc: print("Error launching sandbox: " + str(host_to_deploy) + " " + uid + " " + sid + " " + wid) print(traceback.format_exc()) @@ -243,7 +268,7 @@ def get_workflow_host_port(host_to_deploy, sid): return success, host_port -def create_k8s_deployment(email, workflow_info, runtime, management=False): +def create_k8s_deployment(email, workflow_info, runtime, gpu_usage, gpu_mem_usage, management=False): # KUBERNETES MODE new_workflow_conf = {} conf_file = '/opt/mfn/SandboxAgent/conf/new_workflow.conf' @@ -296,7 +321,95 @@ def 
create_k8s_deployment(email, workflow_info, runtime, management=False): env.append({'name': 'WORKFLOWID', 'value': workflow_info["workflowId"]}) env.append({'name': 'WORKFLOWNAME', 'value': workflow_info["workflowName"]}) - # Special handling for the management container + # apply gpu_usage fraction to k8s deployment configuration + print("GPU usage in create_k8s_service: "+ str(gpu_usage)) + print("GPU mem usage in create_k8s_service: "+ str(gpu_mem_usage)) + + use_gpus = gpu_usage + use_mem_gpus = gpu_mem_usage + + if runtime=="Java": # non gpu python function + # overwrite values from values.yaml for new workflows + # only change the image name + imageName = kservice['spec']['template']['spec']['containers'][0]['image'] + imageRepoName = imageName.split("/")[0] + + kservice['spec']['template']['spec']['containers'][0]['image'] = imageRepoName+"/microfn/sandbox_java" + + if not management and use_gpus == 0. and runtime=="Python": # non gpu python function + # overwrite values from values.yaml for new workflows + #kservice['spec']['template']['spec']['containers'][0]['resources']['limits'].pop('nvidia.com/gpu', None) # ['nvidia.com/gpu'] = str(use_gpus) + #kservice['spec']['template']['spec']['containers'][0]['resources']['requests'].pop('nvidia.com/gpu', None) # ['nvidia.com/gpu'] = str(use_gpus) + imageName = kservice['spec']['template']['spec']['containers'][0]['image'] + imageRepoName = imageName.split("/")[0] + + kservice['spec']['template']['spec']['containers'][0]['image'] = imageRepoName+"/microfn/sandbox" + + if not management and use_gpus > 0. and runtime=="Python": # gpu using python function + + # first set default values + vcore = 100 + vmemory = 31 + # use token obtained from kubernetes master to update cluster node properties + + if os.getenv("API_TOKEN") is not None: + new_token=os.getenv("API_TOKEN") + print('getting cluster node capacity info with token' + str(new_token)) + else: + new_token="default" + + try: + resp = requests.get( + "https://kubernetes.default:"+os.getenv("KUBERNETES_SERVICE_PORT_HTTPS")+"/api/v1/nodes", + headers={"Authorization": "Bearer "+new_token, "Accept": "application/json"}, + verify="/var/run/secrets/kubernetes.io/serviceaccount/ca.crt", + proxies={"https":""}) + if resp.status_code == 200: + data = json.loads(resp.text) + vmemory = 0 + vcore = 0 + + for d in data["items"]: # iterate over the cluster nodes + res_capacity = d["status"]["capacity"] + #print("res_capacity: " + str(res_capacity)) + if "tencent.com/vcuda-memory" in res_capacity.keys(): + vmemory += int(d["status"]["capacity"]["tencent.com/vcuda-memory"]) + vcore += int(d["status"]["capacity"]["tencent.com/vcuda-core"]) + print("found vcuda capability: " + str(vmemory) + " " + str(vcore)) + else: + print("this node has no vcuda capability, skipping") + print('queried cluster node capacities: vcuda-memory: %s, vcuda-core: %s' % (str(vmemory), str(vcore))) + except requests.exceptions.HTTPError as e: + print("Error: could not get cluster node vcuda capacities!") + print(e) + print(resp.text) + + # overwrite values from values.yaml for new workflows + imageName = kservice['spec']['template']['spec']['containers'][0]['image'] + imageRepoName = imageName.split("/")[0] + + # gpu_total_memory = 7800 # hardcoded info (gtx1070), should give free GPU memory + gpu_core_request = str(int(use_gpus)) # derived from GUI float input parameter, yielding core percentage as required by gpu-manager + #gpu_memory_request = str(int(vmemory * use_gpus)) # adapted to gpu-manager memory parameter definition 
+        gpu_memory_request = str(int(use_mem_gpus*4.0)) # gpu-manager requires the gpu memory parameter in units of 256 MB
+        print("memory request set to %s vcuda units " % gpu_memory_request)
+
+        if int(gpu_memory_request) > int(vmemory):
+            print("only up to %s vcuda-memory units (256 MB each) available on the cluster nodes!" % str(int(vmemory)))
+            gpu_memory_request = str(int(vmemory)) # limit to max available memory
+            print("memory request capped at %s vcuda units " % gpu_memory_request)
+
+        kservice['spec']['template']['spec']['containers'][0]['image'] = imageRepoName+"/microfn/sandbox_gpu"
+        kservice['spec']['template']['spec']['containers'][0]['resources']['requests']['tencent.com/vcuda-core'] = gpu_core_request #str(use_gpus)
+        kservice['spec']['template']['spec']['containers'][0]['resources']['requests']['tencent.com/vcuda-memory'] = gpu_memory_request #str(use_gpus)
+        # set the limits resource parameters for gpu-manager; these need to be identical to the requests parameters
+        kservice['spec']['template']['spec']['containers'][0]['resources']['limits']['tencent.com/vcuda-core'] = gpu_core_request #str(use_gpus)
+        kservice['spec']['template']['spec']['containers'][0]['resources']['limits']['tencent.com/vcuda-memory'] = gpu_memory_request #str(use_gpus)
+        kservice['spec']['template']['metadata']['annotations']['tencent.com/vcuda-core-limit'] = str(int(vmemory)) #gpu_core_request #ToDo: check value
+        #kservice['spec']['template']['spec']['containers'][0]['resources']['limits']['aliyun.com/gpu-mem'] = "2" #str(use_gpus)
+
+
+    # Special handling for the management container: never run on gpu
     if management:
         management_workflow_conf = {}
         conf_file = '/opt/mfn/SandboxAgent/conf/management_workflow.conf'
@@ -310,6 +423,17 @@ def create_k8s_deployment(email, workflow_info, runtime, management=False):
         kservice['spec']['template']['spec']['containers'][0]['volumeMounts'] = [{'name': 'new-workflow-conf', 'mountPath': '/opt/mfn/SandboxAgent/conf'}]
         kservice['spec']['template']['spec']['containers'][0]['resources'] = management_workflow_conf['resources']
         kservice['spec']['template']['spec']['serviceAccountName'] = new_workflow_conf['mgmtserviceaccount']
+
+        # the management container should not consume a GPU and should use the standard sandbox image
+        if (labels['workflowid'] == "Management"):
+            ###kservice['spec']['template']['spec']['containers'][0]['resources']['limits']['nvidia.com/gpu'] = "0"
+            ###kservice['spec']['template']['spec']['containers'][0]['resources']['requests']['nvidia.com/gpu'] = "0"
+            imageName = kservice['spec']['template']['spec']['containers'][0]['image']
+            imageRepoName = imageName.split("/")[0]
+            # kservice['spec']['template']['spec']['containers'][0]['image'] = "192.168.8.161:5000/microfn/sandbox"
+
+            kservice['spec']['template']['spec']['containers'][0]['image'] = imageRepoName+"/microfn/sandbox"
+
     if 'HTTP_GATEWAYPORT' in new_workflow_conf:
         env.append({'name': 'HTTP_GATEWAYPORT', 'value': new_workflow_conf['HTTP_GATEWAYPORT']})
     if 'HTTPS_GATEWAYPORT' in new_workflow_conf:
@@ -378,6 +502,7 @@ def create_k8s_deployment(email, workflow_info, runtime, management=False):
         print("ERROR deleting existing kservice")
         print(resp.text)
+    # no change needed for Java functions
     print('Creating new kservice')
     resp = requests.post(
         "https://"+os.getenv("KUBERNETES_SERVICE_HOST")+":"+os.getenv("KUBERNETES_SERVICE_PORT_HTTPS")+"/apis/serving.knative.dev/v1/namespaces/"+namespace+"/services",
@@ -438,6 +563,8 @@ def handle(value, sapi):
         raise Exception("malformed input")
     sapi.log(json.dumps(workflow))
     wfmeta = sapi.get(email + "_workflow_" + workflow["id"], True)
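+    # wfmeta is the stored workflow metadata; with this patch it may also carry
+    # the gpu_usage / gpu_mem_usage strings written by addWorkflow.py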
+    print("WFMETA in deployWorkflow: " + str(wfmeta))
+
     if wfmeta is None or wfmeta == "":
         raise Exception("workflow metadata is not valid.")
     try:
@@ -466,6 +593,8 @@ def handle(value, sapi):
     if is_asl_workflow(wfobj):
         wf_type = WF_TYPE_ASL

+    #use_gpus = int(wfmeta._gpu_usage)
+
     success, errmsg, resource_names, uploaded_resources = check_workflow_functions(wf_type, wfobj, email, sapi)
     if not success:
         raise Exception("Couldn't deploy workflow; " + errmsg)
@@ -499,7 +628,37 @@ def handle(value, sapi):
     #dlc.put("deployment_info_workflow_" + workflow["id"], json.dumps(deployment_info)) # _XXX_: important!
     # put must not be queued as the function currently waits for the container to become ready
-    sapi.put("deployment_info_workflow_" + workflow["id"], json.dumps(deployment_info), True)
+    # case 1: gpu_usage is explicitly set in the workflow metadata
+    if "gpu_usage" in wfmeta and wfmeta["gpu_usage"] != "None":
+        gpu_usage = float(wfmeta["gpu_usage"])
+    else:
+        gpu_usage = 0.
+
+    if "gpu_mem_usage" in wfmeta and wfmeta["gpu_mem_usage"] != "None":
+        gpu_mem_usage = float(wfmeta["gpu_mem_usage"])
+    else:
+        gpu_mem_usage = 0.
+
+    print("deduced gpu_usage from workflow metadata: " + str(gpu_usage))
+    print("deduced gpu_mem_usage from workflow metadata: " + str(gpu_mem_usage))
+
+    print("deployment_info[resources] to evaluate: " + str(deployment_info["resources"]))
+    # case 2: gpu_usage is set in the deployment info
+    for res in deployment_info["resources"]:
+        if "gpu_usage" in deployment_info["resources"][res].keys():
+            result_gpu = float(deployment_info["resources"][res]["gpu_usage"])
+            print("gpu_usage in loop: " + str(result_gpu))
+            if result_gpu > 0.:
+                gpu_usage += result_gpu
+
+        if "gpu_mem_usage" in deployment_info["resources"][res].keys():
+            result_mem_gpu = float(deployment_info["resources"][res]["gpu_mem_usage"])
+            if result_mem_gpu > 0.:
+                gpu_mem_usage += result_mem_gpu
+                print("gpu_mem_usage in loop: " + str(result_mem_gpu))
+
+    print("GPUINFO: " + str(gpu_usage) + " " + str(gpu_mem_usage))
+    sapi.put("deployment_info_workflow_" + workflow["id"], json.dumps(deployment_info), True, False)

     status = "deploying"
@@ -510,7 +669,8 @@ def handle(value, sapi):
             runtime = "Java"
         else:
             runtime = "Python"
-        url, endpoint_key = create_k8s_deployment(email, workflow_info, runtime)
+
+        url, endpoint_key = create_k8s_deployment(email, workflow_info, runtime, gpu_usage, gpu_mem_usage)
         if url is not None and len(url) > 0:
             status = "deploying"
             sapi.addSetEntry(workflow_info["workflowId"] + "_workflow_endpoints", str(url), is_private=True)
@@ -522,21 +682,50 @@ def handle(value, sapi):
             status = "failed"
     else:
         # We're running BARE METAL mode
-        # _XXX_: due to the queue service still being in java in the sandbox
-        sandbox_image_name = "microfn/sandbox"
-        if any(resource_info_map[res_name]["runtime"] == "Java" for res_name in resource_info_map):
+        print("gpu_usage before decision:" + str(gpu_usage))
+        if gpu_usage > 0:
+            sandbox_image_name = "microfn/sandbox_gpu" # sandbox uses GPU
+        elif any(resource_info_map[res_name]["runtime"] == "Java" for res_name in resource_info_map):
             sandbox_image_name = "microfn/sandbox_java"
+        else:
+            sandbox_image_name = "microfn/sandbox" # default value

         # TODO: intelligence on how to pick hosts
         hosts = sapi.get("available_hosts", True)
-        print("available_hosts: " + str(hosts))
-        if hosts is not None and hosts != "":
-            hosts = json.loads(hosts)
-            deployed_hosts = {}
+        # hosts is a string representation of a list or dict
+        print("available_hosts: " + str(hosts))
+        hosts = json.loads(hosts)
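+        # note: each available_hosts entry now maps hostname -> {"ip": ..., "has_gpu": ...},
+        # as written by deploy/ansible/scripts/available_hosts.py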
+        deployed_hosts = {}
+        if hosts is not None and hosts != "" and isinstance(hosts, dict):
+            host_has_gpu = False
+            gpu_hosts = {}
+            picked_hosts = None
+            plain_hosts = {}
+            for hostname in hosts: # individual host dict
+                host_has_gpu = hosts[hostname]["has_gpu"] # check if host has a GPU
+                hostip = hosts[hostname]["ip"]
+                plain_hosts[hostname] = hostip # add to general hosts
+                if host_has_gpu:
+                    gpu_hosts[hostname] = hostip # add to GPU hosts

             # instruct hosts to start the sandbox and deploy workflow
-            for hostname in hosts:
-                hostip = hosts[hostname]
+                print("selected host:" + str(hostname) + " " + str(hostip))
+                #print("found hosts:" + str(gpu_hosts) + " " + str(plain_hosts))
+            if sandbox_image_name == "microfn/sandbox_gpu" and gpu_hosts:
+                picked_hosts = gpu_hosts
+            elif sandbox_image_name == "microfn/sandbox_gpu":
+                # can't deploy; no gpu hosts available.
+                picked_hosts = {}
+            elif sandbox_image_name == "microfn/sandbox" or sandbox_image_name == "microfn/sandbox_java": # can use any host
+                picked_hosts = plain_hosts
+
+            print("picked_hosts: " + str(picked_hosts))
+
+            for hostname in picked_hosts: # loop over the picked hosts; gpu workflows must pick gpu hosts
+                hostip = hosts[hostname]["ip"]
                 host_to_deploy = (hostname, hostip)
+                print("host_to_deploy: " + str(host_to_deploy))
+                #host_to_deploy = ("userslfu99", "192.168.8.99")
                 success, endpoint_key = start_docker_sandbox(host_to_deploy, email, workflow_info["sandboxId"], workflow_info["workflowId"], workflow_info["workflowName"], sandbox_image_name)
                 if success:
                     deployed_hosts[hostname] = hostip
@@ -558,15 +747,14 @@ def handle(value, sapi):
                     sapi.putMapEntry(workflow_info["workflowId"] + "_sandbox_status_map", endpoint_key, json.dumps(sbinfo), is_private=True)
             #endpoints = sapi.retrieveMap(workflow_info["workflowId"] + "_workflow_endpoints", True)
             #sapi.log(str(endpoints))
-
-            if not bool(deployed_hosts):
-                status = "failed"
-            else:
-                #sapi.log("deployed on hosts: " + json.dumps(deployed_hosts))
-                sapi.put(email + "_workflow_hosts_" + workflow["id"], json.dumps(deployed_hosts), True)
         else:
-            print("available_hosts is empty.
Not deploying") + print("available_hosts is empty or not a dictionary; not deploying...") + + if not bool(deployed_hosts): status = "failed" + else: + #sapi.log("deployed on hosts: " + json.dumps(deployed_hosts)) + sapi.put(email + "_workflow_hosts_" + workflow["id"], json.dumps(deployed_hosts), True) # Update workflow status wfmeta["status"] = status diff --git a/ManagementService/python/modifyFunction.py b/ManagementService/python/modifyFunction.py index 17d03fbe..a10d5eac 100644 --- a/ManagementService/python/modifyFunction.py +++ b/ManagementService/python/modifyFunction.py @@ -49,7 +49,9 @@ def handle(value, sapi): f["name"] = function["name"] f["runtime"] = function["runtime"] f["modified"] = time.time() - + f["gpu_usage"] = function["gpu_usage"] + f["gpu_mem_usage"] = function["gpu_mem_usage"] + sapi.put(email + "_grain_" + function["id"], json.dumps(f), True, True) sapi.put(email + "_list_grains", json.dumps(functions), True, True) diff --git a/ManagementService/schema/mfndata-workflow-example.json b/ManagementService/schema/mfndata-workflow-example.json index 6f0a0933..7de810a7 100644 --- a/ManagementService/schema/mfndata-workflow-example.json +++ b/ManagementService/schema/mfndata-workflow-example.json @@ -52,6 +52,8 @@ "addFunction": { "name": "addFunction", "runtime": "Python 3.7", + "gpu_usage": "0.", + "gpu_mem_usage": "0.", "id": "00000000-0000-0000-0000-222222222222", "modified": 1581498771.096527 } @@ -92,6 +94,8 @@ "name":"addFunction", "id":"00000000-0000-0000-0000-222222222222", "runtime":"Python 3.7", + "gpu_usage":"0.", + "gpu_mem_usage":"0.", "type": "code", "ref": "grain_source_00000000-0000-0000-0000-222222222222" } @@ -160,4 +164,4 @@ "sbox_11111111-1111-1111-1111-111111111111;wf_maps_11111111-1111-1111-1111-111111111111": {}, "sbox_11111111-1111-1111-1111-111111111111;wf_sets_11111111-1111-1111-1111-111111111111": {}, "sbox_11111111-1111-1111-1111-111111111111;wf_counters_11111111-1111-1111-1111-111111111111": {} -} \ No newline at end of file +} diff --git a/Sandbox/Dockerfile_gpu b/Sandbox/Dockerfile_gpu new file mode 100644 index 00000000..8111c016 --- /dev/null +++ b/Sandbox/Dockerfile_gpu @@ -0,0 +1,74 @@ +# Copyright 2020 The KNIX Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
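+
+# Usage note (illustrative): this image is built as microfn/sandbox_gpu via the
+# image_gpu target added to Sandbox/Makefile; in bare-metal mode deployWorkflow.py
+# starts images whose name ends in "gpu" under the NVIDIA container runtime, e.g.
+#   docker run --runtime=nvidia ... microfn/sandbox_gpu
+# (the actual ports and environment are supplied by deployWorkflow.py).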
+ +#FROM ubuntu:18.04 +FROM nvidia/cuda:10.1-cudnn7-devel-ubuntu18.04 + +# Install (as root) +# Base +RUN apt-get update --fix-missing +RUN apt-get -y --no-install-recommends install build-essential +RUN apt-get -y --no-install-recommends install netbase unzip file libmagic1 + +# Python +RUN apt-get update --fix-missing && apt-get -y --no-install-recommends install python3 python3-dev +RUN apt-get -y --no-install-recommends install python3-pip +RUN apt-get -y --no-install-recommends install zlib1g libssl1.0 libsasl2-2 ca-certificates + +RUN /usr/bin/python3 -m pip install --upgrade pip + +RUN /usr/bin/python3 -m pip install setuptools +RUN /usr/bin/python3 -m pip install thrift>=0.12.0 +RUN /usr/bin/python3 -m pip install anytree +RUN /usr/bin/python3 -m pip install ujsonpath +RUN /usr/bin/python3 -m pip install requests +RUN /usr/bin/python3 -m pip install retry +# remove warnings from anytree package +RUN /usr/bin/python3 -m pip install fastcache +# Needed for multi-language support (currently just Java) +RUN /usr/bin/python3 -m pip install thriftpy2 + +# Add components (as mfn) +RUN groupadd -o -g 1000 -r mfn && useradd -d /opt/mfn -u 1000 -m -r -g mfn mfn +RUN mkdir /opt/mfn/logs + +RUN /usr/bin/python3 -m pip install redis +ADD build/redis-server.tar.gz /opt/mfn/ +ADD frontend/frontend /opt/mfn/frontend +ADD build/SandboxAgent.tar.gz /opt/mfn/ +ADD build/FunctionWorker.tar.gz /opt/mfn/ +ADD build/LoggingService.tar.gz /opt/mfn/ + +# CUDA 10.1 dependencies and tools to build dlib +RUN apt-get -y --no-install-recommends install libsm6 libxrender1 libxrender-dev libxext6 libglib2.0-0 git cmake +RUN apt-get install -y --no-install-recommends libnvinfer6=6.0.1-1+cuda10.1 libnvinfer-dev=6.0.1-1+cuda10.1 libnvinfer-plugin6=6.0.1-1+cuda10.1 + +# Install dlib for CUDA +RUN git clone https://github.com/davisking/dlib.git +RUN mkdir -p /dlib/build + +RUN cmake -H/dlib -B/dlib/build -DDLIB_USE_CUDA=1 -DUSE_AVX_INSTRUCTIONS=1 +RUN cmake --build /dlib/build + +RUN cd /dlib; python3 /dlib/setup.py install + +# Install the face recognition package and tensorflow +#RUN /usr/bin/python3 -m pip install face_recognition +#RUN /usr/bin/python3 -m pip install tensorflow==2.1.0 + + +RUN chown mfn:mfn -R /opt/mfn +USER mfn +WORKDIR /opt/mfn +CMD ["python3", "/opt/mfn/SandboxAgent/sandboxagent.py"] diff --git a/Sandbox/Dockerfile_gpu_opencv b/Sandbox/Dockerfile_gpu_opencv new file mode 100644 index 00000000..588e75f5 --- /dev/null +++ b/Sandbox/Dockerfile_gpu_opencv @@ -0,0 +1,172 @@ +# Copyright 2020 The KNIX Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
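+
+# Usage note (illustrative): GPU_ARCH below selects the CUDA compute capability
+# OpenCV is compiled for; the default 6.1 matches Pascal cards (GTX 10x0 series).
+# It can be overridden at build time, e.g.:
+#   docker build --build-arg GPU_ARCH=7.5 -f Sandbox/Dockerfile_gpu_opencv .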
+ +#FROM ubuntu:18.04 +FROM nvidia/cuda:10.1-cudnn7-devel-ubuntu18.04 + +# Install (as root) +# Base +RUN apt-get update --fix-missing +RUN apt-get -y --no-install-recommends install build-essential +RUN apt-get -y --no-install-recommends install netbase unzip file libmagic1 + + +# Python +RUN apt-get update --fix-missing && apt-get -y --no-install-recommends install python3 python3-dev +RUN apt-get -y --no-install-recommends install python3-pip +RUN apt-get -y --no-install-recommends install zlib1g libssl1.0 libsasl2-2 ca-certificates + +RUN /usr/bin/python3 -m pip install --upgrade pip + +RUN /usr/bin/python3 -m pip install setuptools +RUN /usr/bin/python3 -m pip install thrift>=0.12.0 +RUN /usr/bin/python3 -m pip install anytree +RUN /usr/bin/python3 -m pip install ujsonpath +RUN /usr/bin/python3 -m pip install requests +RUN /usr/bin/python3 -m pip install retry +# remove warnings from anytree package +RUN /usr/bin/python3 -m pip install fastcache +# Needed for multi-language support (currently just Java) +RUN /usr/bin/python3 -m pip install thriftpy2 + +# Add components (as mfn) +RUN groupadd -o -g 1000 -r mfn && useradd -d /opt/mfn -u 1000 -m -r -g mfn mfn +RUN mkdir /opt/mfn/logs + +RUN /usr/bin/python3 -m pip install redis +ADD build/redis-server.tar.gz /opt/mfn/ +ADD frontend/frontend /opt/mfn/frontend +ADD build/SandboxAgent.tar.gz /opt/mfn/ +ADD build/FunctionWorker.tar.gz /opt/mfn/ +ADD build/LoggingService.tar.gz /opt/mfn/ + +# CUDA 10.1 dependencies and tools to build dlib +RUN apt-get -y --no-install-recommends install libsm6 libxrender1 libxrender-dev libxext6 libglib2.0-0 git cmake +RUN apt-get install -y --no-install-recommends libnvinfer6=6.0.1-1+cuda10.1 libnvinfer-dev=6.0.1-1+cuda10.1 libnvinfer-plugin6=6.0.1-1+cuda10.1 + +# Install dlib for CUDA +#RUN git clone https://github.com/davisking/dlib.git +#RUN mkdir -p /dlib/build + +#RUN cmake -H/dlib -B/dlib/build -DDLIB_USE_CUDA=1 -DUSE_AVX_INSTRUCTIONS=1 +#RUN cmake --build /dlib/build + +#RUN cd /dlib; python3 /dlib/setup.py install + + +ENV DEBIAN_FRONTEND noninteractive + +ARG OPENCV_VERSION='4.4.0' +ARG GPU_ARCH='6.1' +WORKDIR /opt + +# Build tools +RUN apt update && \ + apt install -y \ + sudo \ + tzdata \ + git \ + cmake \ + wget \ + unzip \ + build-essential + +# Media I/O: +RUN apt update && apt install -y \ + zlib1g-dev \ + libjpeg-dev \ + libwebp-dev \ + libpng-dev \ + libtiff5-dev \ + libopenexr-dev \ + libgdal-dev \ + libgtk2.0-dev + +# Video I/O: +RUN apt update && apt install -y \ + libdc1394-22-dev \ + libavcodec-dev \ + libavformat-dev \ + libswscale-dev \ + libtheora-dev \ + libvorbis-dev \ + libxvidcore-dev \ + libx264-dev \ + yasm \ + libopencore-amrnb-dev \ + libopencore-amrwb-dev \ + libv4l-dev \ + libxine2-dev \ + libgstreamer1.0-dev \ + libgstreamer-plugins-base1.0-dev \ + libopencv-highgui-dev \ + ffmpeg + +# Parallelism and linear algebra libraries: +RUN apt update && apt install -y \ + libtbb-dev \ + libeigen3-dev + +# Python: +RUN apt update && apt install -y \ + python3-dev \ + python3-tk \ + python3-numpy + +# Build OpenCV +RUN wget https://github.com/opencv/opencv/archive/${OPENCV_VERSION}.zip && \ + unzip ${OPENCV_VERSION}.zip && rm ${OPENCV_VERSION}.zip && \ + mv opencv-${OPENCV_VERSION} OpenCV && \ + cd OpenCV && \ + wget https://github.com/opencv/opencv_contrib/archive/${OPENCV_VERSION}.zip && \ + unzip ${OPENCV_VERSION}.zip && \ + mkdir build && \ + cd build && \ + cmake \ + -D WITH_TBB=ON \ + -D CMAKE_BUILD_TYPE=RELEASE \ + -D BUILD_EXAMPLES=ON \ + -D WITH_FFMPEG=ON \ + -D WITH_V4L=ON 
\ + -D WITH_OPENGL=ON \ + -D WITH_CUDA=ON \ + -D WITH_CUDNN=ON \ + -D OPENCV_DNN_CUDA=ON \ + -D ENABLE_FAST_MATH=1 \ + -D CUDA_FAST_MATH=1 \ + -D CUDA_ARCH_BIN=7.0 \ + -D CUDA_ARCH_BIN=${GPU_ARCH} \ + -D CUDA_ARCH_PTX=${GPU_ARCH} \ + -D WITH_CUBLAS=ON \ + -D WITH_CUFFT=ON \ + -D WITH_EIGEN=ON \ + -D EIGEN_INCLUDE_PATH=/usr/include/eigen3 \ + -D HAVE_opencv_python3=ON \ + -D OPENCV_EXTRA_MODULES_PATH=../opencv_contrib-${OPENCV_VERSION}/modules/ \ + .. && \ + make all -j$(nproc) && \ + make install + +#RUN apt-get update && apt-get install -y python3-opencv +#RUN pip install opencv-python + +# Install the face recognition package and tensorflow +#RUN /usr/bin/python3 -m pip install face_recognition +#RUN /usr/bin/python3 -m pip install tensorflow==2.1.0 + + +RUN chown mfn:mfn -R /opt/mfn +USER mfn +WORKDIR /opt/mfn +CMD ["python3", "/opt/mfn/SandboxAgent/sandboxagent.py"] diff --git a/Sandbox/Makefile b/Sandbox/Makefile index 8fff8dfe..753eaa64 100644 --- a/Sandbox/Makefile +++ b/Sandbox/Makefile @@ -20,6 +20,8 @@ include ../build_env.mk default: build_thrift \ image \ + image_gpu \ + image_gpu_opencv \ image_java clean: @@ -93,6 +95,25 @@ image: \ build/SandboxAgent.tar.gz $(call build_image,Dockerfile,microfn/sandbox) +image_gpu: \ + Dockerfile_gpu \ + build/redis-server.tar.gz \ + frontend/frontend \ + build/LoggingService.tar.gz \ + build/FunctionWorker.tar.gz \ + build/SandboxAgent.tar.gz + $(call build_image,Dockerfile_gpu,microfn/sandbox_gpu) + +image_gpu_opencv: \ + Dockerfile_gpu_opencv \ + build/redis-server.tar.gz \ + frontend/frontend \ + build/LoggingService.tar.gz \ + build/FunctionWorker.tar.gz \ + build/SandboxAgent.tar.gz + $(call build_image,Dockerfile_gpu_opencv,microfn/sandbox_gpu_opencv) + + image_java: \ Dockerfile_java \ build/redis-server.tar.gz \ @@ -103,8 +124,10 @@ image_java: \ build/SandboxAgent.tar.gz $(call build_image,Dockerfile_java,microfn/sandbox_java) -push: image image_java +push: image image_gpu image_java image_gpu_opencv $(call push_image,microfn/sandbox) + $(call push_image,microfn/sandbox_gpu) + $(call push_image,microfn/sandbox_gpu_opencv) $(call push_image,microfn/sandbox_java) diff --git a/SandboxAgent/deployment.py b/SandboxAgent/deployment.py index 8accc9ef..19658334 100644 --- a/SandboxAgent/deployment.py +++ b/SandboxAgent/deployment.py @@ -378,7 +378,7 @@ def _install_sandbox_requirements(self, parameters): else: cmd = "python " cmd = cmd + "-m pip install --user" - cmd += " --no-compile --no-clean" + cmd += " --no-compile --no-clean --no-cache-dir" for opt in additional_installer_options: cmd = cmd + " " + opt + " " + additional_installer_options[opt] diff --git a/deploy/ansible/Makefile b/deploy/ansible/Makefile index 07b32080..141dd369 100644 --- a/deploy/ansible/Makefile +++ b/deploy/ansible/Makefile @@ -21,7 +21,7 @@ NAMES := $(YAML:%.yaml=%) .PHONY: $(NAMES) default: prepare_packages install -install: init_once riak elasticsearch fluentbit datalayer sandbox management nginx triggers_frontend +install: init_once installnvidiadocker riak elasticsearch fluentbit datalayer sandbox management nginx triggers_frontend echo "Installed KNIX MicroFunctions" clean: diff --git a/deploy/ansible/README.md b/deploy/ansible/README.md index 1eb144ce..375e47a0 100644 --- a/deploy/ansible/README.md +++ b/deploy/ansible/README.md @@ -125,6 +125,7 @@ Note that for a local installation, your public key should be put into `~/.ssh/a # For a single remote host installation, the hostname should be added to all groups. 
# For a cluster of hosts (preferably 3 or more), all host names must be added to [riak] group. + # Hosts with a NVIDIA GPU should be added to group [gpu_workstations] so that they can be used by KNIX workflow deployments. # [nginx] and [elasticsearch] group should contain a single host. # At least one host should be in [management] and [triggers_frontend] group. ``` diff --git a/deploy/ansible/init_once.yaml b/deploy/ansible/init_once.yaml index 85b42da7..7ec11baa 100644 --- a/deploy/ansible/init_once.yaml +++ b/deploy/ansible/init_once.yaml @@ -153,13 +153,13 @@ ExecStart= ExecStart=/usr/bin/dockerd -H unix:// -H tcp://0.0.0.0:2375 - - name: override configuration for docker daemon to disable tls - copy: - dest: /etc/docker/daemon.json - content: | - { - "tls": false - } +# - name: override configuration for docker daemon to disable tls +# copy: +# dest: /etc/docker/daemon.json +# content: | +# { +# "tls": false +# } - name: configure docker proxy copy: diff --git a/deploy/ansible/installnvidiadocker.yaml b/deploy/ansible/installnvidiadocker.yaml new file mode 100644 index 00000000..865da857 --- /dev/null +++ b/deploy/ansible/installnvidiadocker.yaml @@ -0,0 +1,10 @@ +--- +- hosts: all + vars: + proxy_env: + http_proxy: "{{ lookup('env','http_proxy') }}" + https_proxy: "{{ lookup('env','https_proxy') }}" + become: true + roles: + - ../.. + diff --git a/deploy/ansible/inventory.cfg.sample b/deploy/ansible/inventory.cfg.sample index 014d5583..b6944a46 100644 --- a/deploy/ansible/inventory.cfg.sample +++ b/deploy/ansible/inventory.cfg.sample @@ -10,5 +10,8 @@ knix-test [nginx] knix-test +[gpu_workstations] +knix-test + [triggers_frontend] knix-test diff --git a/deploy/ansible/sandbox.yaml b/deploy/ansible/sandbox.yaml index 8c1ce570..b3f25e53 100644 --- a/deploy/ansible/sandbox.yaml +++ b/deploy/ansible/sandbox.yaml @@ -43,6 +43,11 @@ - sandbox installation directory {{ install_dir }} - riak_connect = {{ riak_connect }} + - name: get GPU environment + shell: "su - -c 'echo $KNIX_node_hasGPU'" + become: true + register: KNIX_node_hasGPU + - name: get http_proxy shell: "su - -c 'echo $http_proxy'" become: true @@ -62,12 +67,14 @@ http_proxy: "{{ http_proxy.stdout }}" https_proxy: "{{ https_proxy.stdout }}" no_proxy: "{{ no_proxy.stdout }}" + KNIX_node_hasGPU: "{{ KNIX_node_hasGPU.stdout }}" - debug: msg: - http_proxy = {{ http_proxy }} - https_proxy = {{ https_proxy }} - no_proxy = {{ no_proxy }} + - KNIX_node_hasGPU = {{KNIX_node_hasGPU }} ################# - name: create sandbox folder @@ -161,6 +168,7 @@ no_proxy={{ no_proxy }} HTTP_PROXY={{ http_proxy }} HTTPS_PROXY={{ https_proxy }} + KNIX_node_hasGPU={{ KNIX_node_hasGPU }} mode: '0755' - name: create available_hosts.sh @@ -194,6 +202,7 @@ no_proxy: "{{ no_proxy }}" HTTP_PROXY: "{{ http_proxy }}" HTTPS_PROXY: "{{ https_proxy }}" + KNIX_node_hasGPU: "{{ KNIX_node_hasGPU }}" register: availablehosts - debug: diff --git a/deploy/ansible/scripts/available_hosts.py b/deploy/ansible/scripts/available_hosts.py index df94cb00..375086ef 100755 --- a/deploy/ansible/scripts/available_hosts.py +++ b/deploy/ansible/scripts/available_hosts.py @@ -21,6 +21,8 @@ import sys import riak import socket +import subprocess +#import platform ### global variables set at runtime DLCLIENT=None @@ -61,6 +63,13 @@ def add_host(hostname,hostip=None): if hostip is None: hostip = socket.gethostbyname(hostname) print("Adding host: " + str(hostname)) + + hasGPU = False + # get environment of current hostname + if os.environ['KNIX_node_hasGPU'] == "True": + print("found 
GPU environment: " + str(os.environ['KNIX_node_hasGPU']))
+        hasGPU = True
+
     v = dl_get("available_hosts")
     if v.encoded_data is not None and len(v.encoded_data) > 0:
         hosts = json.loads((v.encoded_data).decode())
@@ -69,10 +78,26 @@ def add_host(hostname,hostip=None):
             hosts = {host: socket.gethostbyname(host) for host in hosts}
     else:
         hosts = {}
-    if hostname != None and hostname not in hosts:
-        hosts[hostname] = hostip
-        v.encoded_data = json.dumps(hosts).encode()
-        v.store()
+
+    cur_entry2 = {}
+
+    if hostname is not None and hostname in hosts:
+        cur_entry = hosts[hostname]
+        if isinstance(cur_entry, str):
+            hostip = cur_entry
+            del hosts[hostname]
+        elif isinstance(cur_entry, dict):
+            cur_entry2 = cur_entry
+
+    cur_entry2["ip"] = hostip
+    cur_entry2["has_gpu"] = hasGPU
+
+    hosts[hostname] = cur_entry2
+
+    v.encoded_data = json.dumps(hosts).encode()
+    v.store()
+
+    print("found hosts: " + str(hosts))
     return hosts

@@ -85,10 +110,13 @@ def remove_host(hostname):
             hosts = {host: socket.gethostbyname(host) for host in hosts}
     else:
         hosts = {}
+
     if hostname != None and hostname in hosts:
         del hosts[hostname]
         v.encoded_data = json.dumps(hosts).encode()
         v.store()
+
+    print("found hosts: " + str(hosts))
     return hosts

@@ -99,22 +127,24 @@
     workflowid = "Management"
     hosts = []
     set_bucket_name(sandboxid,workflowid)
-    try:
-        host=defaulthost
-        if len(sys.argv) > 2:
-            host = sys.argv[2]
-        if sys.argv[1] == "add":
-            hosts = add_host(host)
-        elif sys.argv[1] == "remove":
-            hosts = remove_host(host)
-        else:
-            raise Exception()
-    except Exception as e:
-        print(e)
+
+    host=defaulthost
+    if len(sys.argv) > 2:
+        host = sys.argv[2]
+
+    if len(sys.argv) <= 1:
         print("usage: python "+sys.argv[0]+" [add|remove] (<hostname>)")
         print("       <hostname> optional, defaults to %s" % defaulthost)
+        sys.exit(1)
+
+    if sys.argv[1] == "add":
+        hosts = add_host(host)
+    elif sys.argv[1] == "remove":
+        hosts = remove_host(host)
+    else:
         v = dl_get("available_hosts")
         if v.encoded_data is not None and len(v.encoded_data) > 0:
             hosts = json.loads((v.encoded_data).decode())
-    print("Current available_hosts=" + str(hosts))
+
+    print("Current available_hosts=" + str(hosts))
diff --git a/deploy/ansible/scripts/install-nvidia-docker.sh b/deploy/ansible/scripts/install-nvidia-docker.sh
new file mode 100755
index 00000000..c60d6e89
--- /dev/null
+++ b/deploy/ansible/scripts/install-nvidia-docker.sh
@@ -0,0 +1,22 @@
+#!/bin/bash
+# Copyright 2020 The KNIX Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+git clone https://github.com/NVIDIA/ansible-role-nvidia-docker.git ansible-role-nvidia-docker
+cd ansible-role-nvidia-docker
+cp ../installnvidiadocker.yaml tests/installnvidiadocker.yaml
+ansible-playbook --limit gpu_workstations --inventory ../inventory.cfg tests/installnvidiadocker.yaml
+cd ..
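+# remove the cloned role once the playbook has run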
+rm -r -f ansible-role-nvidia-docker + diff --git a/deploy/helm/Makefile b/deploy/helm/Makefile index 1811f0e1..1b06e2ec 100644 --- a/deploy/helm/Makefile +++ b/deploy/helm/Makefile @@ -39,4 +39,4 @@ push: make -C ../../TriggersFrontend push deploy: push - helm install --name mfn microfunctions/ \ No newline at end of file + helm install mfn microfunctions/ diff --git a/deploy/helm/helm_deploy_with_gpu.sh b/deploy/helm/helm_deploy_with_gpu.sh new file mode 100755 index 00000000..e37cb3ce --- /dev/null +++ b/deploy/helm/helm_deploy_with_gpu.sh @@ -0,0 +1,22 @@ +# Check all possible clusters, as your .KUBECONFIG may have multiple contexts: +#kubectl config view -o jsonpath='{"Cluster name\tServer\n"}{range .clusters[*]}{.name}{"\t"}{.cluster.server}{"\n"}{end}' + +# Select name of cluster you want to interact with from above output: +#export CLUSTER_NAME="cluster.local" +CLUSTER_NAME=$(kubectl config view -o jsonpath="{.clusters[0].name}") + +# Point to the API server referring the cluster name +APISERVER=$(kubectl config view -o jsonpath="{.clusters[?(@.name==\"$CLUSTER_NAME\")].cluster.server}") + +# Gets the token value a access api/v1/nodes +TOKEN=$(kubectl get secrets -o jsonpath="{.items[?(@.metadata.annotations['kubernetes\.io/service-account\.name']=='default')].data.token}"|base64 --decode) +echo "This is the found api access token:" +echo $TOKEN + +#REGISTRY=192.168.8.181:5000 + +helm install mfn $PWD/microfunctions --set apiKey=$TOKEN --set persistence.storageClass=manual + + +helm upgrade mfn $PWD/microfunctions --set apiKey=$TOKEN --set persistence.storageClass=manual + diff --git a/deploy/helm/microfunctions/Chart.yaml b/deploy/helm/microfunctions/Chart.yaml index 324f562b..47e75606 100644 --- a/deploy/helm/microfunctions/Chart.yaml +++ b/deploy/helm/microfunctions/Chart.yaml @@ -16,4 +16,4 @@ apiVersion: v1 appVersion: "latest" description: High Performance Serverless system name: MicroFunctions -version: 0.0.1 \ No newline at end of file +version: 0.0.1 diff --git a/deploy/helm/microfunctions/README_GPU_Installation.md b/deploy/helm/microfunctions/README_GPU_Installation.md new file mode 100644 index 00000000..6ff132dc --- /dev/null +++ b/deploy/helm/microfunctions/README_GPU_Installation.md @@ -0,0 +1,311 @@ +# Installing GPU node and adding it to a KNIX cluster + + + +This is a guide on how to install a GPU node and join it in a running Kubernetes cluster deployed with kubeadm. The guide was tested on a Kubernetes cluster v1.16.6 installed with kubespray, where cluster nodes can be depoyed as VMs using vagrant. VMs in this configuration are running Ubuntu 16.04.4 LTS. + +The node with GPU has a single NVIDIA GTX1050 GPU card. + + +## Step-by-step guide + +1. We start with a blank node with a GPU. This is the node, we would like to join in our Kubernetes cluster. First, update the node and install graphic drivers. The version of the drivers has to be at least 361.93\. We have installed version 450.51.05 and CUDA Version 11.0\. Drivers and CUDA installation is not a part of this guide. + + **NVIDIA drivers information** + +```bat +ksatzke@gpuhost:~$ nvidia-smi +Thu Jul 23 10:57:05 2020 ++-----------------------------------------------------------------------------+ +| NVIDIA-SMI 450.51.05 Driver Version: 450.51.05 CUDA Version: 11.0 | +|-------------------------------+----------------------+----------------------+ +| GPU Name Persistence-M| Bus-Id Disp.A | Volatile Uncorr. ECC | +| Fan Temp Perf Pwr:Usage/Cap| Memory-Usage | GPU-Util Compute M. | +| | | MIG M. 
| +|===============================+======================+======================| +| 0 GeForce GTX 1050 On | 00000000:01:00.0 On | N/A | +| 30% 46C P0 N/A / 65W | 604MiB / 1992MiB | 2% Default | +| | | N/A | ++-------------------------------+----------------------+----------------------+ ++-----------------------------------------------------------------------------+ +| Processes: | +| GPU GI CI PID Type Process name GPU Memory | +| ID ID Usage | +|=============================================================================| +| 0 N/A N/A 2163 G /usr/lib/xorg/Xorg 369MiB | +| 0 N/A N/A 2904 G /usr/bin/gnome-shell 182MiB | +| 0 N/A N/A 3000 G /usr/lib/firefox/firefox 1MiB | +| 0 N/A N/A 8757 G /usr/lib/firefox/firefox 1MiB | +| 0 N/A N/A 11670 C ...ffice/program/soffice.bin 41MiB | +| 0 N/A N/A 16245 G /usr/lib/firefox/firefox 1MiB | ++-----------------------------------------------------------------------------+ +``` +**CUDA information** + +```bat +ksatzke@gpuhost:~$ cat /usr/local/cuda-10.1/version.txt +CUDA Version 10.1.243 +``` + +2. The next step is to install Docker on the GPU node. Install Docker CE 19.03 from Docker’s repositories for Ubuntu. Proceed with the following commands as a root user. +```bat +sudo apt-get update +sudo apt-get install -y \ + apt-transport-https \ + ca-certificates \ + curl \ + software-properties-common +curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo apt-key add - +sudo add-apt-repository \ + "deb https://download.docker.com/linux/$(. /etc/os-release; echo "$ID") \ + $(lsb_release -cs) \ + stable" +sudo apt-get update && sudo apt-get install -y docker-ce=$(apt-cache madison docker-ce | grep 19.03 | head -1 | awk '{print $3}') +``` + +**Docker installation test** +```bat +ksatzke@gpuhost:~$ docker --version + +Docker version 19.03.11, build 42e35e61f3 +``` + +3. On the GPU node, add the nvidia-docker package repositories, install the NVIDIA container toolkit, and reload the Docker daemon configuration, which might be altered by the nvidia-docker installation. + Note that with the release of Docker 19.03, usage of the nvidia-docker2 packages is deprecated, since NVIDIA GPUs are now natively supported as devices in the Docker runtime. + +```bat +# Add the package repositories +distribution=$(. /etc/os-release;echo $ID$VERSION_ID) +curl -s -L https://nvidia.github.io/nvidia-docker/gpgkey | sudo apt-key add - +curl -s -L https://nvidia.github.io/nvidia-docker/$distribution/nvidia-docker.list | sudo tee /etc/apt/sources.list.d/nvidia-docker.list + +sudo apt-get update && sudo apt-get install -y nvidia-container-toolkit + +sudo systemctl restart docker +``` + +**nvidia-docker GPU test** + +```bat +ksatzke@gpuhost:~$ docker run --runtime=nvidia --rm nvidia/cuda nvidia-smi +Thu Jul 23 09:17:18 2020 ++-----------------------------------------------------------------------------+ +| NVIDIA-SMI 450.51.05 Driver Version: 450.51.05 CUDA Version: 11.0 | +|-------------------------------+----------------------+----------------------+ +| GPU Name Persistence-M| Bus-Id Disp.A | Volatile Uncorr. ECC | +| Fan Temp Perf Pwr:Usage/Cap| Memory-Usage | GPU-Util Compute M. | +| | | MIG M.
| +|===============================+======================+======================| +| 0 GeForce GTX 1050 On | 00000000:01:00.0 On | N/A | +| 30% 44C P0 N/A / 65W | 749MiB / 1992MiB | 1% Default | +| | | N/A | ++-------------------------------+----------------------+----------------------+ ++-----------------------------------------------------------------------------+ +| Processes: | +| GPU GI CI PID Type Process name GPU Memory | +| ID ID Usage | +|=============================================================================| ++-----------------------------------------------------------------------------+ +``` + +4. Set the NVIDIA runtime as the default runtime for Docker on the GPU node. Edit the `/etc/docker/daemon.json` configuration file and set the `default-runtime` parameter to nvidia. This also allows us to omit the `--runtime=nvidia` parameter for Docker. +```bat +{ + "default-runtime": "nvidia", + "runtimes": { + "nvidia": { + "path": "/usr/bin/nvidia-container-runtime", + "runtimeArgs": [] + } + } + } +``` + +5. As a root user on the GPU node, add the Kubernetes package repositories and install kubeadm, kubectl and kubelet. Then turn the swap off as it is not supported by Kubernetes. + +```bat +apt-get update && apt-get install -y apt-transport-https +curl -s https://packages.cloud.google.com/apt/doc/apt-key.gpg | apt-key add - +cat <<EOF >/etc/apt/sources.list.d/kubernetes.list +deb http://apt.kubernetes.io/ kubernetes-xenial main +EOF +apt-get update +apt-get install -y kubelet kubeadm kubectl +# turn off swap or comment the swap line in /etc/fstab +sudo swapoff -a +``` +**Specific version installation; e.g., 1.16.6-00** + +```bat +# install aptitude, an interface to the package manager +ksatzke@gpuhost:~$ apt install aptitude -y + +# show available kubeadm versions in the repositories +ksatzke@gpuhost:~$ aptitude versions kubeadm +Package kubeadm: +p 1.5.7-00 kubernetes-xenial 500 +p 1.6.1-00 kubernetes-xenial 500 +p 1.6.2-00 kubernetes-xenial 500 +... +p 1.16.5-00 kubernetes-xenial 500 +p 1.16.6-00 kubernetes-xenial 500 +... + +# install a specific version of kubelet, kubeadm and kubectl +ksatzke@gpuhost:~$ apt-get install -y kubelet=1.16.6-00 kubeadm=1.16.6-00 kubectl=1.16.6-00 +``` + +6. On the GPU node, edit the /etc/systemd/system/kubelet.service.d/10-kubeadm.conf file and add the following environment argument to enable the DevicePlugins feature gate. If the Accelerators feature gate is already set, remove it. +```bat +Environment="KUBELET_EXTRA_ARGS=--feature-gates=DevicePlugins=true" +``` + +**/etc/systemd/system/kubelet.service.d/10-kubeadm.conf** + +Note: This drop-in only works with kubeadm and kubelet v1.11+ + +```bat +[Service] +Environment="KUBELET_KUBECONFIG_ARGS=--bootstrap-kubeconfig=/etc/kubernetes/bootstrap-kubelet.conf --kubeconfig=/etc/kubernetes/kubelet.conf" + +Environment="KUBELET_CONFIG_ARGS=--config=/var/lib/kubelet/config.yaml" +Environment="KUBELET_EXTRA_ARGS=--feature-gates=DevicePlugins=true" + +# This is a file that "kubeadm init" and "kubeadm join" generates at runtime, populating the KUBELET_KUBEADM_ARGS variable dynamically + +EnvironmentFile=-/var/lib/kubelet/kubeadm-flags.env +# This is a file that the user can use for overrides of the kubelet args as a last resort. Preferably, the user should use +# the .NodeRegistration.KubeletExtraArgs object in the configuration files instead. KUBELET_EXTRA_ARGS should be sourced from this file.
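+# A leading '-' tells systemd to ignore an EnvironmentFile that does not exist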
+ +EnvironmentFile=-/etc/default/kubelet +ExecStart= +ExecStart=/usr/local/bin/kubelet $KUBELET_KUBECONFIG_ARGS $KUBELET_CONFIG_ARGS $KUBELET_KUBEADM_ARGS $KUBELET_EXTRA_ARGS + +``` + +On the GPU node, reload and restart the kubelet to apply the previous configuration changes. + +```bat +sudo systemctl daemon-reload +sudo systemctl restart kubelet +``` + +7. If not already done, enable GPU support on the Kubernetes master by deploying the following DaemonSet. + +```bat +kubectl create -f https://raw.githubusercontent.com/NVIDIA/k8s-device-plugin/v0.6.0/nvidia-device-plugin.yml +``` + +8. For simplicity, generate a new token on the Kubernetes master and print the join command. + +```bat +ksatzke@node1:~$ sudo kubeadm token create --print-join-command + +kubeadm join 192.168.1.161:6443 --token gxzpmv.hzqw4q0xxrw8zai7 --discovery-token-ca-cert-hash sha256:696c21540f4de7bd600be843dddc1b362582f4a378547c2cb0d37f3be40d5699 +``` + +9. Go back to the GPU node and use the printed join command to add the GPU node to the cluster. + +```bat +ksatzke@gpuhost:~$ sudo kubeadm join 192.168.1.159:6443 --token gxzpmv.hzqw4q0xxrw8zai7 --discovery-token-ca-cert-hash + sha256:696c21540f4de7bd600be843dddc1b362582f4a378547c2cb0d37f3be40d5699 +[preflight] Running pre-flight checks + [WARNING IsDockerSystemdCheck]: detected "cgroupfs" as the Docker cgroup driver. The recommended driver is "systemd". Please follow the guide at https://kubernetes.io/docs/setup/cri/ + [WARNING SystemVerification]: this Docker version is not on the list of validated versions: 19.03.11. Latest validated version: 18.09 +[preflight] Reading configuration from the cluster... +[preflight] FYI: You can look at this config file with 'kubectl -n kube-system get cm kubeadm-config -oyaml' +W0723 13:19:02.377909 27185 defaults.go:199] The recommended value for "clusterDNS" in "KubeletConfiguration" is: [10.233.0.10]; the provided value is: [169.254.25.10] +[kubelet-start] Downloading configuration for the kubelet from the "kubelet-config-1.16" ConfigMap in the kube-system namespace +[kubelet-start] Writing kubelet configuration to file "/var/lib/kubelet/config.yaml" +[kubelet-start] Writing kubelet environment file with flags to file "/var/lib/kubelet/kubeadm-flags.env" +[kubelet-start] Activating the kubelet service +[kubelet-start] Waiting for the kubelet to perform the TLS Bootstrap... + +This node has joined the cluster: +* Certificate signing request was sent to apiserver and a response was received. +* The Kubelet was informed of the new secure connection details. + +Run 'kubectl get nodes' on the control-plane to see this node join the cluster. +``` + +10. Run the following command on the master to see the status of the GPU node (gpuhost) in the cluster. + +```bat + ksatzke@node1:~$ kubectl get nodes + +NAME STATUS ROLES AGE VERSION +gpuhost NotReady <none> 2m12s v1.16.6 +node1 Ready master 19h v1.16.6 +node2 Ready <none> 19h v1.16.6 +node3 Ready <none> 19h v1.16.6 +node4 Ready <none> 19h v1.16.6 +``` + +11. After a while, the node is ready. + +```bat +gpuhost Ready <none> 7m v1.16.6 +``` + +12. Now we have a GPU node ready in our KNIX Kubernetes cluster. We can label this recently added node (gpuhost) with the "accelerator" type by running the following command on the master. + +```bat +kubectl label nodes gpuhost accelerator=nvidia-gtx-1050 +``` + +13. To check the nodes for the accelerator label, run +```bat +kubectl get nodes -L accelerator +``` +on the Kubernetes master.
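+Sample output from the test cluster is shown below; only the labeled GPU node has a value in the ACCELERATOR column.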
+ +```bat +ksatzke@gpuhost:~/kubernetes$ kubectl get nodes -L accelerator + +NAME STATUS ROLES AGE VERSION ACCELERATOR + +gpuhost Ready <none> 18m v1.16.6 nvidia-gtx-1050 +node1 Ready master 19h v1.16.6 +node2 Ready <none> 19h v1.16.6 +node3 Ready <none> 19h v1.16.6 +node4 Ready <none> 19h v1.16.6 +``` + +14. To test the GPU node, go to the master, create a YAML file with the following content, and create the pod from it. + +**gpu-test.yml** + +```yaml +apiVersion: v1 +kind: Pod +metadata: + name: cuda-vector-add +spec: + restartPolicy: OnFailure + containers: + - name: cuda-vector-add + # https://github.com/kubernetes/kubernetes/blob/v1.7.11/test/images/nvidia-cuda/Dockerfile + image: "k8s.gcr.io/cuda-vector-add:v0.1" + resources: + limits: + nvidia.com/gpu: 1 # requesting 1 GPU per container + nodeSelector: + accelerator: nvidia-gtx-1050 # or other nvidia GPU type etc. +``` + +```bat +ksatzke@node1:~/kubernetes$ kubectl create -f gpu-test.yml +pod "cuda-vector-add" created + +ksatzke@node1:~/kubernetes$ kubectl get pods +NAME READY STATUS RESTARTS AGE +cuda-vector-add 0/1 Completed 0 19s + +ksatzke@node1:~/kubernetes$ kubectl logs cuda-vector-add +[Vector addition of 50000 elements] +Copy input data from the host memory to the CUDA device +CUDA kernel launch with 196 blocks of 256 threads +Copy output data from the CUDA device to the host memory +Test PASSED +Done +``` diff --git a/deploy/helm/microfunctions/templates/management.yaml b/deploy/helm/microfunctions/templates/management.yaml index 127838ab..127c417e 100644 --- a/deploy/helm/microfunctions/templates/management.yaml +++ b/deploy/helm/microfunctions/templates/management.yaml @@ -27,14 +27,14 @@ metadata: name: "{{ template "manager.fullname" . }}-role" namespace: {{ .Release.Namespace }} rules: - # Allow reading/writing "knative services" + # Allow reading/writing "knative services" - apiGroups: ["serving.knative.dev"] resources: ["services"] verbs: ["get", "list", "create", "update", "delete"] - apiGroups: ["serving.knative.dev"] resources: ["configurations", "routes", "revisions"] verbs: ["get", "list"] - # Allow reading/writing "pods" + # Allow reading/writing "pods" - apiGroups: [""] resources: ["pods", "pods/status", "pods/log"] verbs: ["*"] @@ -67,10 +67,10 @@ metadata: name: "wf-{{ .Release.Name }}-role" namespace: {{ .Release.Namespace }} rules: - # Allow reading "endpoints" + # Allow reading "endpoints" and "nodes" - apiGroups: [""] resources: ["endpoints","pods"] - verbs: ["get","list"] + verbs: ["get", "list"] --- apiVersion: rbac.authorization.k8s.io/v1 kind: RoleBinding @@ -136,16 +136,16 @@ data: "workflowid": null } }, - "spec": { + "spec": { {{/* not allowed in KNative - "nodeSelector": {{ toJson .Values.manager.nodeSelector }}, - "tolerations": {{ toJson .Values.manager.tolerations }}, + "nodeSelector": {{ toJson .Values.manager.nodeSelector }}, + "tolerations": {{ toJson .Values.manager.tolerations }}, */}} {{/* not allowed in KNative - "affinity": { - "podAffinity": { + "affinity": { + "podAffinity": { "preferredDuringSchedulingIgnoredDuringExecution": [{ - "weight": 100, + "weight": 100, "podAffinityTerm": { "topologyKey":"kubernetes.io/hostname", "labelSelector": { @@ -162,8 +162,8 @@ data: "serviceAccountName": "wf-{{ .Release.Name }}", {{- else -}} "serviceAccountName": "default", - {{- end -}} - "imagePullSecrets": {{ toJson .Values.manager.setup.imagePullSecrets }}, + {{- end -}} + "imagePullSecrets": {{ toJson .Values.manager.setup.imagePullSecrets }}, "containers": [ { "securityContext": { "runAsUser": 1000 }, @@ -173,16 +173,17 @@ data:
"env": [ {{/* not allowed in KNative {"name": "MFN_HOSTNAME", "valueFrom": { "fieldRef": { "fieldPath": "spec.nodeName" }}}, - */}} + */}} {"name": "MFN_DATALAYER", "value": "datalayer.{{ .Release.Namespace }}.svc:{{ .Values.datalayer.port }}" }, {"name": "MFN_ELASTICSEARCH", "value": {{ include "esConnect.url" . | quote }} }, {"name": "MFN_MANAGEMENT", "value": "http://wf-{{ .Release.Name }}-management.{{ .Release.Namespace }}.svc" }, + {"name": "API_TOKEN", "value": "{{.Values.apiKey}}" }, {"name": "LOG_LEVEL", "value": "{{ .Values.manager.sandbox.logLevel }}" } ], {{/* not allowed in KNative "lifecycle": {"preStop": {"exec":{"command":["python3","/sand/SandboxAgent/shutdown.py"]}}}, */}} - "resources": {{ toJson .Values.manager.sandbox.resources }}, + "resources": {{ toJson .Values.manager.sandbox.resources }}, "ports": [ {"containerPort":8080} ] @@ -195,6 +196,7 @@ data: new_workflow.conf: |- { "app.fullname.prefix": "wf-{{ .Release.Name }}", + "image.PythonGPU": "{{ .Values.imageRepo }}{{ .Values.manager.sandbox.imagePathPythonGPU }}:{{ .Values.manager.sandbox.imageTag | default .Values.imageTag | default .Chart.AppVersion}}", "image.Python": "{{ .Values.imageRepo }}{{ .Values.manager.sandbox.imagePathPython }}:{{ .Values.manager.sandbox.imageTag | default .Values.imageTag | default .Chart.AppVersion }}", "image.Java": "{{ .Values.imageRepo }}{{ .Values.manager.sandbox.imagePathJava }}:{{ .Values.manager.sandbox.imageTag | default .Values.imageTag | default .Chart.AppVersion }}", {{- if (.Values.manager.createServiceAccounts) -}} @@ -250,13 +252,13 @@ spec: tolerations: {{- toYaml . | nindent 8 }} {{- end }} - imagePullSecrets: + imagePullSecrets: {{- toYaml .Values.manager.setup.imagePullSecrets | nindent 8 }} containers: - name: management image: "{{ .Values.imageRepo }}{{ .Values.manager.setup.imagePath }}:{{ .Values.manager.setup.imageTag | default .Values.imageTag | default .Chart.AppVersion }}" imagePullPolicy: "{{ .Values.manager.setup.imagePullPolicy }}" - + env: - name: MFN_HOSTNAME valueFrom: diff --git a/deploy/helm/microfunctions/values.yaml b/deploy/helm/microfunctions/values.yaml index eef1ba44..0f254fe5 100644 --- a/deploy/helm/microfunctions/values.yaml +++ b/deploy/helm/microfunctions/values.yaml @@ -19,13 +19,13 @@ #------------------------------------------------------------------------------ # MicroFunction management workflow #------------------------------------------------------------------------------ -imageRepo: "registry.kube-system.svc.cluster.local" -# imageTag: "latest" <- set all KNIX image tags to latest +imageRepo: "registry.kube-system.svc.cluster.local:5000" +apiKey: "abcdef" +#imageRepo: "localhost:5000" # kubespray deployment manager: - #httpProxy: "http://:" - #httpsProxy: "http://:" - #httpGatewayPort: 80 - #httpsGatewayPort: 443 + #httpProxy: "http://192.109.76.93:8080" + httpsProxy: "http://192.109.76.93:8080" + #httpGatewayPort: 31380 # kubespray deployment nameOverride: "microfunctions" newWorkflow: hpa: @@ -51,14 +51,17 @@ manager: logLevel: "info" # imageTag: "latest" (default .Chart.AppVersion) imagePullPolicy: "Always" + env: + name: API_TOKEN + value: "123test" imagePullSecrets: [] resources: limits: cpu: 1 - memory: 2Gi + memory: 4Gi requests: cpu: 1 - memory: 1Gi + memory: 2Gi managementSandbox: resources: limits: @@ -75,7 +78,8 @@ manager: # MicroFunction Datalayer #------------------------------------------------------------------------------ datalayer: - replicas: 3 + replicas: 1 # kubespray deployment + #replicas: 3 imagePath: 
"/microfn/datalayer" # imageTag: "latest" (default .Chart.AppVersion) imagePullPolicy: "Always" @@ -85,20 +89,21 @@ datalayer: javaOpts: "-Xmx4096m" resources: limits: - cpu: 4 - memory: 4Gi + cpu: 1 + memory: 8Gi requests: cpu: 1 - memory: 2Gi + memory: 8Gi nodeSelector: {} tolerations: {} - + #------------------------------------------------------------------------------ # Riak global data storage #------------------------------------------------------------------------------ riak: - replicas: 3 + replicas: 1 # kubespray deployment + #replicas: 3 imagePath: "/microfn/riak" # imageTag: "latest" (default .Chart.AppVersion) imagePullPolicy: "Always" @@ -108,13 +113,13 @@ riak: ClientPortProtobuf: 8087 resources: limits: - cpu: 4 + cpu: 1 memory: 8Gi requests: - cpu: 4 + cpu: 1 memory: 8Gi DataStorage: 8Gi - #DataStorageClass: local-volume + DataStorageClass: manual ClusterName: "riak" WaitForErlang: 180 ErlangDistributionPortRangeMinimum: 6000 @@ -125,8 +130,8 @@ riak: LevelDbMaximumMemory: 4294967296 AntiEntropy: "passive" # Should be <= resources.limits.cpu - ErlangSchedulersTotal: 4 - ErlangSchedulersOnline: 4 + ErlangSchedulersTotal: 2 + ErlangSchedulersOnline: 2 ErlangSchedulersForceWakeupInterval: 500 ErlangSchedulersCompactionOfLoad: "false" nodeSelector: {} @@ -158,6 +163,12 @@ elastic: nginx: Replicas: 1 nameOverride: "microfunctions-nginx" + # Knative Serving creates an FQDN for the management kservice "wf-{{ .Release.Name }}-management". + # In most installations, it would be {ksvc}.{namespace}.example.com + # Please use --set nginx.managementService to specify the FQDN that the service will get from your Knative setup + # _helpers.tpl would default it to wf-mfn1-management.knix.example.com + #managementService: wf-mfn1-management.knix.example.com + managementService: wf-mfn-management.default.192.168.8.161.xip.io imagePath: "/microfn/nginx" # imageTag: "latest" (default .Chart.AppVersion) httpPort: 32180 @@ -180,7 +191,7 @@ nginx: tolerations: {} #------------------------------------------------------------------------------ -# TriggersFrontend +# TriggersFrontend #------------------------------------------------------------------------------ triggersFrontend: imagePath: "/microfn/triggers_frontend" @@ -193,10 +204,10 @@ triggersFrontend: statusReportIntervalSec: 30 resources: limits: - cpu: 4 + cpu: 1 memory: 8Gi requests: - cpu: 2 + cpu: 1 memory: 1Gi nodeSelector: {} tolerations: {} \ No newline at end of file diff --git a/mfn_sdk/mfn_sdk/mfnclient.py b/mfn_sdk/mfn_sdk/mfnclient.py index a78ca230..ab989092 100644 --- a/mfn_sdk/mfn_sdk/mfnclient.py +++ b/mfn_sdk/mfn_sdk/mfnclient.py @@ -346,14 +346,14 @@ def find_function(self,name): return res[0] @deprecated(reason="Grains have been renamed to functions, use add_function(..) instead") - def addGrain(self,name,runtime='Python 3.6'): - return self.add_function(name,runtime) + def addGrain(self,name,runtime='Python 3.6',gpu_usage="0.",gpu_mem_usage="0."): + return self.add_function(name,runtime,gpu_usage,gpu_mem_usage) @deprecated(reason="Grains have been renamed to functions, use add_function(..) 
instead") - def add_grain(self,name,runtime='Python 3.6'): - return self.add_function(name, runtime) + def add_grain(self,name,runtime='Python 3.6',gpu_usage="0.",gpu_mem_usage="0."): + return self.add_function(name, runtime, gpu_usage,gpu_mem_usage) - def add_function(self,name,runtime='Python 3.6'): + def add_function(self,name,runtime='Python 3.6',gpu_usage="0.",gpu_mem_usage="0."): """ add a function returns an existing function if the name exists, registers a new function name if it doesn't exist @@ -362,7 +362,7 @@ def add_function(self,name,runtime='Python 3.6'): for f in self.functions: if f._name == name: return f - data = self.action('addFunction',{'function':{'name':name,'runtime':runtime}}) + data = self.action('addFunction',{'function':{'name':name,'runtime':runtime, 'gpu_usage': gpu_usage, 'gpu_mem_usage': gpu_mem_usage}}) gd = data['function'] f = Function(self,gd) self._functions.append(f) @@ -465,7 +465,7 @@ def _get_state_names_and_resource(self, desired_state_type, wf_dict): return state_list - def add_workflow(self,name,filename=None): + def add_workflow(self,name,filename=None, gpu_usage=None, gpu_mem_usage=None): """ add a workflow returns an existing workflow if the name exists, registers a new workflow name if it doesn't exist @@ -474,7 +474,7 @@ def add_workflow(self,name,filename=None): for wf in self._workflows: if wf._name == name: return wf - data = self.action('addWorkflow',{'workflow':{'name':name}}) + data = self.action('addWorkflow',{'workflow':{'name':name, "gpu_usage":gpu_usage, "gpu_mem_usage":gpu_mem_usage}}) wfd = data['workflow'] wf = Workflow(self,wfd) self._workflows.append(wf) @@ -521,7 +521,6 @@ def add_workflow(self,name,filename=None): with open(fpyname, 'r') as f: fcode = f.read() f.code = fcode - return wf diff --git a/mfn_sdk/mfn_sdk/workflow.py b/mfn_sdk/mfn_sdk/workflow.py index 45b9acd7..231ad031 100644 --- a/mfn_sdk/mfn_sdk/workflow.py +++ b/mfn_sdk/mfn_sdk/workflow.py @@ -55,6 +55,9 @@ def __init__(self,client,wf): self.client=client self.id=wf["id"] self._name=wf["name"] + self._gpu_usage=None + if "gpu_usage" in wf: + self._gpu_usage=wf["gpu_usage"] self._modified=wf["modified"] self._status=wf.get("status",None) self._endpoints=wf.get("endpoints",None) @@ -68,6 +71,19 @@ def __str__(self): else: return f"{self.id} ({self._name}, status: {self._status})" + @property + def gpu_usage(self): + # TODO: workflow GPU usage could have been updated, decide if we should fetch workflow status + return self._gpu_usage + + """ + @gpu_usage.setter + def gpu_usage(self,gpu_usage): + # TODO: workflow GPU could have been updated, decide if we should fetch workflow status + res = self.client.action('modifyWorkflow',{'workflow':{'id':self.id,'name':name,'gpu_usage':self._gpu_usage}}) + self.gpu_usage = gpu_usage + """ + @property def name(self): # TODO: workflow name could have been updated, decide if we should fetch workflow status @@ -124,15 +140,17 @@ def json(self): def json(self,json): if json != self.json: self._json = json + #print ("uploaded workflow JSON"+ str( json)) self.client.action('uploadWorkflowJSON',{'workflow':{'id':self.id,'json':base64.b64encode(self._json.encode()).decode()}}) - def deploy(self, timeout=None): + def deploy(self, timeout=None): """ deploy a workflow and optionally wait in linearly increasing multiples of 1000ms :timeout: By default returns after calling deploy on the workflow without waiting for it to be actually deployed. 
If timeout is set to a numeric <= 0, it waits indefinitely in intervals of 1000ms, 2000ms, 3000ms, ... If timeout is set to a numeric > 0, it waits for the workflow to be deployed in increasing multiples of 100ms, but no longer than the timeout. When the timeout expires and the workflow is not deployed, the function raises an Exception """ + s = self.status if s == 'deployed': log.debug("deploy: wf %s already deployed",self.name) @@ -145,6 +163,7 @@ def deploy(self, timeout=None): else: self.client.action('deployWorkflow',{'workflow':{'id':self.id}}) + # if timeout is None, do not wait but return immediately even if it's not yet deployed if timeout is None: return diff --git a/riak/src/workflow_triggers.erl b/riak/src/workflow_triggers.erl old mode 100755 new mode 100644 index acbdf9b8..9895c83b --- a/riak/src/workflow_triggers.erl +++ b/riak/src/workflow_triggers.erl @@ -242,7 +242,7 @@ handle_nometadata() -> generate_trigger_message(Key, Value, Table) -> Message = {[{<<"trigger_type">>, <<"storage">>}, - {<<"key">>, Key}, {<<"value">>, Value}, + {<<"key">>, Key}, {<<"source">>, list_to_binary(Table)}]}, MessageEncoded = jiffy:encode(Message), MessageEncoded. @@ -305,3 +305,4 @@ test() -> io:format((?LOG_PREFIX) ++ " ~p~n", [Message]), io:format((?LOG_PREFIX) ++ " ~p~n", [MessageEncoded]), io:format((?LOG_PREFIX) ++ "Execid ~p~n", [Execid]). + diff --git a/tests/asl_DLIB/python/dlibtest.py b/tests/asl_DLIB/python/dlibtest.py new file mode 100644 index 00000000..97115360 --- /dev/null +++ b/tests/asl_DLIB/python/dlibtest.py @@ -0,0 +1,24 @@ +# Copyright 2020 The KNIX Authors + +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +#import json + +def handle(event, context): + import dlib + # Simple CUDA availability check using dlib + + #return "Hello from Tensorflow " + str(tf.__version__) + #return "GPU available: " + str(tf.test.is_gpu_available(cuda_only=False, min_cuda_compute_capability=None)) + return "GPU available: " + str(dlib.DLIB_USE_CUDA) + diff --git a/tests/asl_DLIB/settings.json b/tests/asl_DLIB/settings.json new file mode 100644 index 00000000..02108534 --- /dev/null +++ b/tests/asl_DLIB/settings.json @@ -0,0 +1,4 @@ +{ + "workflow_name": "__dlib_", + "workflow_description_file": "workflow_dlib_test.json" +} diff --git a/tests/asl_DLIB/test.py b/tests/asl_DLIB/test.py new file mode 100644 index 00000000..9705075c --- /dev/null +++ b/tests/asl_DLIB/test.py @@ -0,0 +1,39 @@ +# Copyright 2020 The KNIX Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License.
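+# Deploys the __dlib_ workflow with GPU resources (gpu_usage="1") and expects dlib to report CUDA support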
+ +import unittest +import os, sys +import json + +sys.path.append("../") +from mfn_test_utils import MFNTest + +class DlibTest(unittest.TestCase): + + """ Example ASL state test with Dlib + + """ + def test_dlib(self): + """ testing dlib """ + + inp1 = '"abc"' + #res1 = '"Hello from Tensorflow 2.1.0"' + + res1 = '"GPU available: True"' + + testtuplelist =[(inp1, res1)] + + test = MFNTest(test_name = "Dlib_Test", gpu_usage = "1", gpu_mem_usage="6") + test.exec_tests(testtuplelist) + diff --git a/tests/asl_DLIB/workflow_dlib_test.json b/tests/asl_DLIB/workflow_dlib_test.json new file mode 100644 index 00000000..6dadd070 --- /dev/null +++ b/tests/asl_DLIB/workflow_dlib_test.json @@ -0,0 +1,11 @@ +{ + "Comment": "Dlib State Machine Example", + "StartAt": "dlibtest", + "States": { + "dlibtest":{ + "Resource":"dlibtest", + "Type":"Task", + "End":true + } + } +} diff --git a/tests/asl_Face_Recognition/requirements/facer_requirements.txt b/tests/asl_Face_Recognition/requirements/facer_requirements.txt new file mode 100644 index 00000000..23d75aa7 --- /dev/null +++ b/tests/asl_Face_Recognition/requirements/facer_requirements.txt @@ -0,0 +1 @@ +face_recognition diff --git a/tests/asl_Face_Recognition/settings.json b/tests/asl_Face_Recognition/settings.json new file mode 100644 index 00000000..5934407d --- /dev/null +++ b/tests/asl_Face_Recognition/settings.json @@ -0,0 +1,4 @@ +{ + "workflow_name": "__face_recognition_", + "workflow_description_file": "workflow_face_recognition_test.json" +} diff --git a/tests/asl_Face_Recognition/test.py b/tests/asl_Face_Recognition/test.py new file mode 100644 index 00000000..414c2a1d --- /dev/null +++ b/tests/asl_Face_Recognition/test.py @@ -0,0 +1,40 @@ +# Copyright 2020 The KNIX Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
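+# Deploys the __face_recognition_ workflow with GPU shares (gpu_usage="50") and checks the face location returned by the facer function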
+ +import unittest +import os, sys +import json +#import time + +sys.path.append("../") +from mfn_test_utils import MFNTest + +class FaceRecognitionTest(unittest.TestCase): + + """ Example ASL state test with face_recognition + + """ + def test_face_recognition(self): + """ testing face_recognition """ + + inp1 = '"abc"' + + #res1 = '"1.2.3"' # version number + res1 = '"[(68, 497, 175, 390)]"' + testtuplelist =[(inp1, res1)] + + test = MFNTest(test_name = "FaceRecognition__Test", gpu_usage="50", gpu_mem_usage="10") + #time.sleep(10) # wait for deployment + test.exec_tests(testtuplelist) + diff --git a/tests/asl_Face_Recognition/workflow_face_recognition_test.json b/tests/asl_Face_Recognition/workflow_face_recognition_test.json new file mode 100644 index 00000000..88d3318b --- /dev/null +++ b/tests/asl_Face_Recognition/workflow_face_recognition_test.json @@ -0,0 +1,11 @@ +{ + "Comment": "Face Recognition State Machine Example", + "StartAt": "tensorf", + "States": { + "tensorf":{ + "Resource":"facer", + "Type":"Task", + "End":true + } + } +} diff --git a/tests/asl_Face_Recognition/zips/facer.zip b/tests/asl_Face_Recognition/zips/facer.zip new file mode 100644 index 00000000..d7e98255 Binary files /dev/null and b/tests/asl_Face_Recognition/zips/facer.zip differ diff --git a/tests/asl_Map/test.py b/tests/asl_Map/test.py index 84e982f6..9d9bdffd 100644 --- a/tests/asl_Map/test.py +++ b/tests/asl_Map/test.py @@ -25,18 +25,19 @@ class MapStateTest(unittest.TestCase): """ event = '[{"who": "bob"},{"who": "meg"},{"who": "joe"}]' expectedResponse = '[{"ContextValue": {"who": "bob"}, "ContextIndex": 0},{"ContextValue": {"who": "meg"}, "ContextIndex": 1}, {"ContextValue": {"who": "joe"}, "ContextIndex": 2 }]' - test_map = [("asl_Map_State_Context_Data", "workflow_map_state_context_test/workflow_map_state_context_test.json", [(event, expectedResponse)])] + test_map = [("asl_Map_State_Context_Data", "wfms_context_test/wfms_context_test.json", [(event, expectedResponse)])] """ def test_map_state(self): - - file_list = ["workflow_map_state_delivery_test.data", "workflow_map_state_context_test.data", "workflow_map_state_example_test.data", "workflow_map_state_parameters_test.data", "workflow_map_state_thingspiratessay_test.data", "workflow_map_state_iro_paths_processing_test.data", "workflow_map_state_hardcoded_test.data"] + + file_list = ["wfms_delivery_test.data", "wfms_context_test.data", "wfms_example_test.data", "wfms_parameters_test.data", "wfms_thingspiratessay_test.data", "wfms_iro_paths_processing_test.data", "wfms_hardcoded_test.data" + ] for file in file_list: with open(file) as json_input: @@ -49,7 +50,8 @@ def test_map_state(self): et = time.time() print ("test duration (s): %s" % str(et-st)) - + + for mc in range(1,4): # set maxConcurrency parameter """ creates and executes the Map state test workflow from the ASL description """ testtuplelist = [] @@ -66,16 +68,18 @@ def test_map_state(self): expectedResponse = ["Hello, joe!", "Hello, bob!", "Hello, meg!"] testtuplelist.append((json.dumps(event), json.dumps(expectedResponse))) + event = [{"who": "joe"}, {"who": "bob"}, {"who": "meg"}, {"who":"dave"}, {"who":"tom"}, {"who":"ray"}] expectedResponse = ["Hello, joe!", "Hello, bob!", "Hello, meg!", "Hello, dave!", "Hello, tom!", "Hello, ray!"] testtuplelist.append((json.dumps(event), json.dumps(expectedResponse))) + - test =
MFNTest(test_name="Map State Test", workflow_filename=("wfms_test_mc%s.json" % mc)) print("MaxConcurrency level: %i " % mc) st = time.time() - test.exec_tests(testtuplelist) + test.exec_tests(testtuplelist, should_undeploy=False) et = time.time() print ("test duration (s): %s" % str(et-st)) diff --git a/tests/asl_Map/workflow_map_state_context_test.data b/tests/asl_Map/wfms_context_test.data similarity index 76% rename from tests/asl_Map/workflow_map_state_context_test.data rename to tests/asl_Map/wfms_context_test.data index 7efbb4e7..1c4c1116 100644 --- a/tests/asl_Map/workflow_map_state_context_test.data +++ b/tests/asl_Map/wfms_context_test.data @@ -2,7 +2,7 @@ { "test_name": "Map State Context Test", -"workflow_name": "workflow_map_state_context_test/workflow_map_state_context_test.json", +"workflow_name": "wfms_context_test/wfms_context.json", "event": [{"who": "bob"},{"who": "meg"},{"who": "joe"}], diff --git a/tests/asl_Map/workflow_map_state_context_test/workflow_map_state_context_test.json b/tests/asl_Map/wfms_context_test/wfms_context.json similarity index 100% rename from tests/asl_Map/workflow_map_state_context_test/workflow_map_state_context_test.json rename to tests/asl_Map/wfms_context_test/wfms_context.json diff --git a/tests/asl_Map/workflow_map_state_delivery_test.data b/tests/asl_Map/wfms_delivery_test.data similarity index 94% rename from tests/asl_Map/workflow_map_state_delivery_test.data rename to tests/asl_Map/wfms_delivery_test.data index 7f8e7c9a..6cb77f4a 100644 --- a/tests/asl_Map/workflow_map_state_delivery_test.data +++ b/tests/asl_Map/wfms_delivery_test.data @@ -1,7 +1,7 @@ {"test_name": "Map State Delivery Test", -"workflow_name": "workflow_map_state_delivery_test/workflow_map_state_delivery_test.json", +"workflow_name": "wfms_delivery_test/wfms_delivery_test.json", "event": {"orderId": "12345678", diff --git a/tests/asl_Map/workflow_map_state_delivery_test/checkAvailability.py b/tests/asl_Map/wfms_delivery_test/checkAvailability.py similarity index 100% rename from tests/asl_Map/workflow_map_state_delivery_test/checkAvailability.py rename to tests/asl_Map/wfms_delivery_test/checkAvailability.py diff --git a/tests/asl_Map/workflow_map_state_delivery_test/workflow_map_state_delivery_test.json b/tests/asl_Map/wfms_delivery_test/wfms_delivery_test.json similarity index 100% rename from tests/asl_Map/workflow_map_state_delivery_test/workflow_map_state_delivery_test.json rename to tests/asl_Map/wfms_delivery_test/wfms_delivery_test.json diff --git a/tests/asl_Map/workflow_map_state_example_test.data b/tests/asl_Map/wfms_example_test.data similarity index 90% rename from tests/asl_Map/workflow_map_state_example_test.data rename to tests/asl_Map/wfms_example_test.data index 7253d596..0a94747d 100644 --- a/tests/asl_Map/workflow_map_state_example_test.data +++ b/tests/asl_Map/wfms_example_test.data @@ -2,7 +2,7 @@ { "test_name": "Map State Example Test", -"workflow_name": "workflow_map_state_example_test/workflow_map_state_example_test.json", +"workflow_name": "wfms_example_test/wfms_example_test.json", "event": {"ship-date": "2016-03-14T01:59:00Z", "detail": diff --git a/tests/asl_Map/workflow_map_state_example_test/ship-val.py b/tests/asl_Map/wfms_example_test/ship-val.py similarity index 100% rename from tests/asl_Map/workflow_map_state_example_test/ship-val.py rename to tests/asl_Map/wfms_example_test/ship-val.py diff --git a/tests/asl_Map/workflow_map_state_example_test/workflow_map_state_example_test.json 
b/tests/asl_Map/wfms_example_test/wfms_example_test.json similarity index 100% rename from tests/asl_Map/workflow_map_state_example_test/workflow_map_state_example_test.json rename to tests/asl_Map/wfms_example_test/wfms_example_test.json diff --git a/tests/asl_Map/workflow_map_state_hardcoded_test.data b/tests/asl_Map/wfms_hardcoded_test.data similarity index 72% rename from tests/asl_Map/workflow_map_state_hardcoded_test.data rename to tests/asl_Map/wfms_hardcoded_test.data index d3e24b18..5f8bb520 100644 --- a/tests/asl_Map/workflow_map_state_hardcoded_test.data +++ b/tests/asl_Map/wfms_hardcoded_test.data @@ -1,7 +1,6 @@ - { "test_name": "Map State Hardcoded Input Test", -"workflow_name": "workflow_map_state_hardcoded_test/workflow_map_state_hardcoded_test.json", +"workflow_name": "wfms_hardcoded_test/wfms_hardcoded_test.json", "event" : "trigger map state input", "expectedResponse" : [{"Value": "Iterate", "Index": 0}, {"Value": "Over", "Index": 1}, {"Value": "This", "Index": 2}, {"Value": "Array", "Index": 3}] } diff --git a/tests/asl_Map/workflow_map_state_hardcoded_test/workflow_map_state_hardcoded_test.json b/tests/asl_Map/wfms_hardcoded_test/wfms_hardcoded_test.json similarity index 100% rename from tests/asl_Map/workflow_map_state_hardcoded_test/workflow_map_state_hardcoded_test.json rename to tests/asl_Map/wfms_hardcoded_test/wfms_hardcoded_test.json diff --git a/tests/asl_Map/workflow_map_state_iro_paths_processing_test.data b/tests/asl_Map/wfms_iro_paths_processing_test.data similarity index 94% rename from tests/asl_Map/workflow_map_state_iro_paths_processing_test.data rename to tests/asl_Map/wfms_iro_paths_processing_test.data index 456045bf..3894a118 100644 --- a/tests/asl_Map/workflow_map_state_iro_paths_processing_test.data +++ b/tests/asl_Map/wfms_iro_paths_processing_test.data @@ -1,7 +1,7 @@ { "test_name": "Map State IRO Paths Processing Test", -"workflow_name": "workflow_map_state_iro_paths_processing_test/workflow_map_state_iro_paths_processing_test.json", +"workflow_name": "wfms_iro_paths_test/wfms_iro_paths_processing_test.json", "event": { "orderId": "12345678", diff --git a/tests/asl_Map/workflow_map_state_iro_paths_processing_test/checkAvailability.py b/tests/asl_Map/wfms_iro_paths_test/checkAvailability.py similarity index 100% rename from tests/asl_Map/workflow_map_state_iro_paths_processing_test/checkAvailability.py rename to tests/asl_Map/wfms_iro_paths_test/checkAvailability.py diff --git a/tests/asl_Map/workflow_map_state_iro_paths_processing_test/paymentFailed.py b/tests/asl_Map/wfms_iro_paths_test/paymentFailed.py similarity index 100% rename from tests/asl_Map/workflow_map_state_iro_paths_processing_test/paymentFailed.py rename to tests/asl_Map/wfms_iro_paths_test/paymentFailed.py diff --git a/tests/asl_Map/workflow_map_state_iro_paths_processing_test/prepareForDelivery.py b/tests/asl_Map/wfms_iro_paths_test/prepareForDelivery.py similarity index 100% rename from tests/asl_Map/workflow_map_state_iro_paths_processing_test/prepareForDelivery.py rename to tests/asl_Map/wfms_iro_paths_test/prepareForDelivery.py diff --git a/tests/asl_Map/workflow_map_state_iro_paths_processing_test/sendOrderSummary.py b/tests/asl_Map/wfms_iro_paths_test/sendOrderSummary.py similarity index 100% rename from tests/asl_Map/workflow_map_state_iro_paths_processing_test/sendOrderSummary.py rename to tests/asl_Map/wfms_iro_paths_test/sendOrderSummary.py diff --git a/tests/asl_Map/workflow_map_state_iro_paths_processing_test/startDelivery.py 
b/tests/asl_Map/wfms_iro_paths_test/startDelivery.py similarity index 100% rename from tests/asl_Map/workflow_map_state_iro_paths_processing_test/startDelivery.py rename to tests/asl_Map/wfms_iro_paths_test/startDelivery.py diff --git a/tests/asl_Map/workflow_map_state_iro_paths_processing_test/test.py b/tests/asl_Map/wfms_iro_paths_test/test.py similarity index 100% rename from tests/asl_Map/workflow_map_state_iro_paths_processing_test/test.py rename to tests/asl_Map/wfms_iro_paths_test/test.py diff --git a/tests/asl_Map/workflow_map_state_iro_paths_processing_test/validatePayment.py b/tests/asl_Map/wfms_iro_paths_test/validatePayment.py similarity index 100% rename from tests/asl_Map/workflow_map_state_iro_paths_processing_test/validatePayment.py rename to tests/asl_Map/wfms_iro_paths_test/validatePayment.py diff --git a/tests/asl_Map/workflow_map_state_iro_paths_processing_test/workflow_map_state_iro_paths_processing_test.json b/tests/asl_Map/wfms_iro_paths_test/wfms_iro_paths_processing_test.json similarity index 100% rename from tests/asl_Map/workflow_map_state_iro_paths_processing_test/workflow_map_state_iro_paths_processing_test.json rename to tests/asl_Map/wfms_iro_paths_test/wfms_iro_paths_processing_test.json diff --git a/tests/asl_Map/workflow_map_state_parameters_test.data b/tests/asl_Map/wfms_parameters_test.data similarity index 87% rename from tests/asl_Map/workflow_map_state_parameters_test.data rename to tests/asl_Map/wfms_parameters_test.data index e26d5f41..4dbed3e9 100644 --- a/tests/asl_Map/workflow_map_state_parameters_test.data +++ b/tests/asl_Map/wfms_parameters_test.data @@ -2,7 +2,7 @@ { "test_name": "Map State Parameters Test", -"workflow_name": "workflow_map_state_parameters_test/workflow_map_state_parameters_test.json", +"workflow_name": "wfms_parameters_test/wfms_parameters_test.json", "event": [{"who": "bob"},{"who": "meg"},{"who": "joe"}], diff --git a/tests/asl_Map/workflow_map_state_parameters_test/workflow_map_state_parameters_test.json b/tests/asl_Map/wfms_parameters_test/wfms_parameters_test.json similarity index 100% rename from tests/asl_Map/workflow_map_state_parameters_test/workflow_map_state_parameters_test.json rename to tests/asl_Map/wfms_parameters_test/wfms_parameters_test.json diff --git a/tests/asl_Map/workflow_map_state_test_mc0.json b/tests/asl_Map/wfms_test_mc0.json similarity index 100% rename from tests/asl_Map/workflow_map_state_test_mc0.json rename to tests/asl_Map/wfms_test_mc0.json diff --git a/tests/asl_Map/workflow_map_state_test_mc1.json b/tests/asl_Map/wfms_test_mc1.json similarity index 100% rename from tests/asl_Map/workflow_map_state_test_mc1.json rename to tests/asl_Map/wfms_test_mc1.json diff --git a/tests/asl_Map/workflow_map_state_test_mc2.json b/tests/asl_Map/wfms_test_mc2.json similarity index 100% rename from tests/asl_Map/workflow_map_state_test_mc2.json rename to tests/asl_Map/wfms_test_mc2.json diff --git a/tests/asl_Map/workflow_map_state_test_mc3.json b/tests/asl_Map/wfms_test_mc3.json similarity index 100% rename from tests/asl_Map/workflow_map_state_test_mc3.json rename to tests/asl_Map/wfms_test_mc3.json diff --git a/tests/asl_Map/workflow_map_state_thingspiratessay_test.data b/tests/asl_Map/wfms_thingspiratessay_test.data similarity index 82% rename from tests/asl_Map/workflow_map_state_thingspiratessay_test.data rename to tests/asl_Map/wfms_thingspiratessay_test.data index e40ae570..9af524e0 100644 --- a/tests/asl_Map/workflow_map_state_thingspiratessay_test.data +++ 
b/tests/asl_Map/wfms_thingspiratessay_test.data @@ -1,7 +1,7 @@ { "test_name": "Map State Pirates Say Test", -"workflow_name": "workflow_map_state_thingspiratessay_test/workflow_map_state_thingspiratessay_test.json", +"workflow_name": "wfms_thingspiratessay_test/wfms_thingspiratessay_test.json", "event" : {"ThingsPiratesSay": [{ "say": "Avast!"},{ diff --git a/tests/asl_Map/workflow_map_state_thingspiratessay_test/workflow_map_state_thingspiratessay_test.json b/tests/asl_Map/wfms_thingspiratessay_test/wfms_thingspiratessay_test.json similarity index 100% rename from tests/asl_Map/workflow_map_state_thingspiratessay_test/workflow_map_state_thingspiratessay_test.json rename to tests/asl_Map/wfms_thingspiratessay_test/wfms_thingspiratessay_test.json diff --git a/tests/asl_Tensorflow_HelloWorld/python/tensorf.py b/tests/asl_Tensorflow_HelloWorld/python/tensorf.py new file mode 100644 index 00000000..731e510b --- /dev/null +++ b/tests/asl_Tensorflow_HelloWorld/python/tensorf.py @@ -0,0 +1,29 @@ +# Copyright 2020 The KNIX Authors + +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import tensorflow as tf + +def handle(event, context): + # Simple hello world using TensorFlow + + x = [[2.]] + hello = tf.constant('Hello, TensorFlow!') + print('tensorflow version', tf.__version__) + print('hello, {}'.format(tf.matmul(x, x))) + + #return "Hello from Tensorflow " + str(tf.__version__) + #return "GPU available: " + str(tf.test.is_gpu_available(cuda_only=False, min_cuda_compute_capability=None)) + return "GPU available: " + str(tf.test.is_built_with_cuda()) + diff --git a/tests/asl_Tensorflow_HelloWorld/requirements/tensorf_requirements.txt b/tests/asl_Tensorflow_HelloWorld/requirements/tensorf_requirements.txt new file mode 100644 index 00000000..0f571440 --- /dev/null +++ b/tests/asl_Tensorflow_HelloWorld/requirements/tensorf_requirements.txt @@ -0,0 +1 @@ +tensorflow diff --git a/tests/asl_Tensorflow_HelloWorld/settings.json b/tests/asl_Tensorflow_HelloWorld/settings.json new file mode 100644 index 00000000..e6560eec --- /dev/null +++ b/tests/asl_Tensorflow_HelloWorld/settings.json @@ -0,0 +1,4 @@ +{ + "workflow_name": "__tensorflow_", + "workflow_description_file": "workflow_tensorflow_test.json" +} diff --git a/tests/asl_Tensorflow_HelloWorld/test.py b/tests/asl_Tensorflow_HelloWorld/test.py new file mode 100644 index 00000000..eeb730eb --- /dev/null +++ b/tests/asl_Tensorflow_HelloWorld/test.py @@ -0,0 +1,41 @@ +# Copyright 2020 The KNIX Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
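+# Deploys the __tensorflow_ workflow with GPU shares (gpu_usage="50") and checks that TensorFlow reports CUDA support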
+ +import unittest +import os, sys +import json +#import time + +sys.path.append("../") +from mfn_test_utils import MFNTest + +class TensorFlowTest(unittest.TestCase): + + """ Example ASL state test with Tensorflow + + """ + def test_tensorflow(self): + """ testing tensorflow """ + + inp1 = '"abc"' + #res1 = '"Hello from Tensorflow 2.1.0"' + + res1 = '"GPU available: True"' + + testtuplelist =[(inp1, res1)] + test = MFNTest(test_name = "Tensorflow__Test", gpu_usage = "50", gpu_mem_usage="10") + + #time.sleep(10) # wait for deployment + test.exec_tests(testtuplelist) + diff --git a/tests/asl_Tensorflow_HelloWorld/workflow_tensorflow_test.json b/tests/asl_Tensorflow_HelloWorld/workflow_tensorflow_test.json new file mode 100644 index 00000000..bfbf0bc9 --- /dev/null +++ b/tests/asl_Tensorflow_HelloWorld/workflow_tensorflow_test.json @@ -0,0 +1,11 @@ +{ + "Comment": "Tensorflow State Machine Example", + "StartAt": "tensorf", + "States": { + "tensorf":{ + "Resource":"tensorf", + "Type":"Task", + "End":true + } + } +} diff --git a/tests/mfn_test_utils.py b/tests/mfn_test_utils.py index ddc85178..e2655cb0 100644 --- a/tests/mfn_test_utils.py +++ b/tests/mfn_test_utils.py @@ -43,7 +43,7 @@ class MfnAppTextFormat(): mfntestfailed = MfnAppTextFormat.STYLE_BOLD + MfnAppTextFormat.COLOR_RED + 'FAILED' + MfnAppTextFormat.END + MfnAppTextFormat.END class MFNTest(): - def __init__(self, test_name=None, timeout=None, workflow_filename=None, new_user=False, delete_user=False): + def __init__(self, test_name=None, timeout=None, workflow_filename=None, new_user=False, delete_user=False, gpu_usage=None, gpu_mem_usage=None): self._settings = self._get_settings() @@ -67,7 +67,6 @@ def __init__(self, test_name=None, timeout=None, workflow_filename=None, new_use self._workflow_folder = self._workflow_filename[:ind+1] else: self._workflow_folder = "./" - #print("Workflow folder: " + self._workflow_folder) self._workflow_description = self._get_json_file(self._workflow_filename) @@ -84,6 +83,14 @@ def __init__(self, test_name=None, timeout=None, workflow_filename=None, new_use if timeout is not None: self._settings["timeout"] = timeout + if gpu_usage is not None: + self._settings["gpu_usage"] = gpu_usage + + if gpu_mem_usage is not None: + self._settings["gpu_mem_usage"] = gpu_mem_usage + + self._log_clear_timestamp = int(time.time() * 1000.0 * 1000.0) + # will be the deployed workflow object in self._client self._workflow = None self._deployment_error = "" @@ -92,7 +99,7 @@ def __init__(self, test_name=None, timeout=None, workflow_filename=None, new_use self.upload_workflow() self.deploy_workflow() - time.sleep(5) + time.sleep(15) def _get_json_file(self, filename): json_data = {} @@ -114,6 +121,9 @@ def _get_settings(self): # Defaults settings.setdefault("timeout", 60) + settings.setdefault("gpu_usage", "None") + + settings.setdefault("gpu_mem_usage", "None") return settings @@ -189,6 +199,9 @@ def _get_resource_info_map(self, workflow_description=None, resource_info_map=No resource_info["resource_req_filename"] = "requirements/" + resource_ref + "_requirements.txt" resource_info["resource_env_filename"] = "environment_variables/" + resource_ref + "_environment_variables.txt" resource_info_map[resource_ref] = resource_info + #resource_info_map[resource_ref]['num_gpu'] = self._settings['num_gpu'] + #resource_info_map['num_gpu'] = self._settings['num_gpu'] + #print("resource_info: " + json.dumps(resource_info)) elif "States" in workflow_description: states = workflow_description["States"] @@ -202,6 +215,9 @@ def 
_get_resource_info_map(self, workflow_description=None, resource_info_map=No resource_info["resource_req_filename"] = "requirements/" + resource_name + "_requirements.txt" resource_info["resource_env_filename"] = "environment_variables/" + resource_name + "_environment_variables.txt" resource_info_map[resource_name] = resource_info + #resource_info_map[resource_name]['num_gpu'] = self._settings['num_gpu'] + #resource_info_map['num_gpu'] = self._settings['num_gpu'] + #print("resource_info: " + json.dumps(resource_info)) if "Type" in state and state["Type"] == "Parallel": branches = state['Branches'] @@ -210,14 +226,12 @@ def _get_resource_info_map(self, workflow_description=None, resource_info_map=No if "Type" in state and state["Type"] == "Map": branch = state['Iterator'] - #print(str(branch)) resource_info_map = self._get_resource_info_map(branch, resource_info_map) - #print(str(resource_info_map)) else: print("ERROR: invalid workflow description.") assert False - + #print("RESOURCE_INFO_MAP: " + json.dumps(resource_info_map)) return resource_info_map def _delete_resource_if_existing(self, existing_resources, resource_name): @@ -281,6 +295,7 @@ def upload_workflow(self): existing_resources = self._client.functions for resource_name in resource_info_map.keys(): + #if not resource_name == 'num_gpu': self._delete_resource_if_existing(existing_resources, resource_name) resource_info = resource_info_map[resource_name] @@ -292,9 +307,11 @@ def get_deployment_error(self): def deploy_workflow(self): try: - wf = self._client.add_workflow(self._workflow_name) + gpu_usage=self._settings["gpu_usage"] + gpu_mem_usage=self._settings["gpu_mem_usage"] + wf = self._client.add_workflow(self._workflow_name, None, gpu_usage, gpu_mem_usage) wf.json = json.dumps(self._workflow_description) - wf.deploy(self._settings["timeout"]) + wf.deploy(self._settings["timeout"]) self._workflow = wf if self._workflow.status != "failed": print("MFN workflow " + self._workflow_name + " deployed; workflow id: " + self._workflow.id + " endpoints: " + str(self._workflow._endpoints))