Proposed update for metascheduling #22

Open · wants to merge 1 commit into main
55 changes: 30 additions & 25 deletions closest_location.py
@@ -76,34 +76,39 @@ def get_sorted_destinations(job_requirements, destinations: list, objectstores,
     cpu_required = job_requirements.cores
     memory_required = job_requirements.mem
 
-    # Calculate the distance to the input data location for all destinations
-    for dest in destinations:
-        dest['distance_to_data'] = closest_destination(dest, objectstores, dataset_attributes)
-
-    # Filter out destinations that can't meet basic requirements based on the "real-time" data
-    viable_destinations = [
+    # Filter out destinations that are offline
+    online_destinations = [
         dest for dest in destinations if dest['dest_status'] == 'online'
-        and dest['dest_unconsumed_cpu'] > cpu_required
-        and dest['dest_unconsumed_mem'] > memory_required
     ]
 
-    # Fallback case if no viable destinations are found (e.g. no destination has enough resources)
-    if not viable_destinations:
-        sorted_destinations = sorted(destinations, key=lambda x: x['distance_to_data'])
-        return [dest['destination_id'] for dest in sorted_destinations]
-
-    # Sort by distance to input data location (ascending)
-    viable_destinations.sort(key=lambda x: x['distance_to_data'])
-
-    print("viable dest: ", viable_destinations)
+    if not online_destinations:
+        raise ValueError("All destinations are offline, job can't be scheduled!")
 
-    # Calculate matching scores for each viable destination
-    for dest in viable_destinations:
+    # For each destination that is online, compute:
+    # 1. the distance to the input data location
+    # 2. the matching score
+    for dest in online_destinations:
+        dest['distance_to_data'] = closest_destination(dest, objectstores, dataset_attributes)
         dest['matching_score'] = calculate_matching_score(dest)
 
-    # Sort by matching score (descending)
-    viable_destinations.sort(key=lambda x: x['matching_score'], reverse=True)
-
-    sorted_destinations = [dest['destination_id'] for dest in viable_destinations]
-    print("sorted dest: ", sorted_destinations)
-    return sorted_destinations
+    # first separate out preferred destinations (those with enough cpu and ram)
+    # from fallback destinations (the rest)
+    preferred_destinations = []
+    fallback_destinations = []
+    for dest in online_destinations:
+        if dest['dest_unconsumed_cpu'] > cpu_required and dest['dest_unconsumed_mem'] > memory_required:
+            preferred_destinations.append(dest)
+        else:
+            fallback_destinations.append(dest)
+
+    # sort destinations by
+    # - matching score (higher is better, so descending)
+    # - distance to the data (less is better, so ascending)
+    if preferred_destinations:
+        # sort in place, then return the destination ids
+        preferred_destinations.sort(key=lambda x: (-x['matching_score'], x['distance_to_data']))
+        return [dest['destination_id'] for dest in preferred_destinations]
+    elif fallback_destinations:
+        # sort in place, then return the destination ids
+        fallback_destinations.sort(key=lambda x: (-x['matching_score'], x['distance_to_data']))
+        return [dest['destination_id'] for dest in fallback_destinations]
+    else:
+        raise ValueError("No available destinations, job can't be scheduled!")