diff --git a/closest_location.py b/closest_location.py index 515d56e..e4f88f7 100644 --- a/closest_location.py +++ b/closest_location.py @@ -76,34 +76,39 @@ def get_sorted_destinations(job_requirements, destinations: list, objectstores, cpu_required = job_requirements.cores memory_required = job_requirements.mem - # Calculate the distance to the input data location for all destinations - for dest in destinations: - dest['distance_to_data'] = closest_destination(dest, objectstores, dataset_attributes) - - # Filter out destinations that can't meet basic requirements based on the "real-time" data - viable_destinations = [ + # Filter out destinations that are offline + online_destinations = [ dest for dest in destinations if dest['dest_status'] == 'online' - and dest['dest_unconsumed_cpu'] > cpu_required - and dest['dest_unconsumed_mem'] > memory_required ] - # Fallback case if no viable destinations are found (e.g. no destination has enough resources) - if not viable_destinations: - sorted_destinations = sorted(destinations, key=lambda x: x['distance_to_data']) - return [dest['destination_id'] for dest in sorted_destinations] - - # Sort by distance to input data location (ascending) - viable_destinations.sort(key=lambda x: x['distance_to_data']) - - print("viable dest: ", viable_destinations) + if not online_destinations: + raise ValueError("All destinations are offline, job can't be scheduled!") - # Calculate matching scores for each viable destination - for dest in viable_destinations: + # For each destination that is online, compute: + # 1. the distance to the input data location + # 2. the matching score + for dest in online_destinations: + dest['distance_to_data'] = closest_destination(dest, objectstores, dataset_attributes) dest['matching_score'] = calculate_matching_score(dest) - # Sort by matching score (descending) - viable_destinations.sort(key=lambda x: x['matching_score'], reverse=True) - - sorted_destinations = [dest['destination_id'] for dest in viable_destinations] - print("sorted dest: ", sorted_destinations) - return sorted_destinations + # first separate out preferred destinations (those with enough cpu and ram) + # from fallback destinations (the rest) + preferred_destinations = [] + fallback_destinations = [] + for dest in online_destinations: + if dest['dest_unconsumed_cpu'] > cpu_required and dest['dest_unconsumed_mem'] > memory_required: + preferred_destinations.append(dest) + else: + fallback_destinations.append(dest) + + # sort destinations by + # - matching score (higher is better?) + # - distance to the data (less is better) + if preferred_destinations: + # sort and return + return preferred_list.sort(key=lambda x: (x['matching_score'],x['distance_to_data'])) + elif fallback_destinations: + # sort and return + return fallback_destinations.sort(key=lambda x: (x['matching_score'],x['distance_to_data'])) + else: + raise ValueError("Not available destinations, job can't be scheduled!")