Skip to content

Commit

Permalink
Add support for schedulers which can receive task state change updates (
Browse files Browse the repository at this point in the history
#290)

* Change scheduler API to include task removal and add tests

* Check if memorizing scheduler works with the whole system

* Spotless apply

* Expand function name and improve documentation
  • Loading branch information
sacheendra authored Jan 16, 2025
1 parent 39aeb8e commit 1fc2017
Show file tree
Hide file tree
Showing 14 changed files with 1,058 additions and 120 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand All @@ -49,6 +50,9 @@
import org.opendc.compute.simulator.host.HostState;
import org.opendc.compute.simulator.host.SimHost;
import org.opendc.compute.simulator.scheduler.ComputeScheduler;
import org.opendc.compute.simulator.scheduler.SchedulingRequest;
import org.opendc.compute.simulator.scheduler.SchedulingResult;
import org.opendc.compute.simulator.scheduler.SchedulingResultType;
import org.opendc.compute.simulator.telemetry.ComputeMetricReader;
import org.opendc.compute.simulator.telemetry.SchedulerStats;
import org.opendc.simulator.compute.power.SimPowerSource;
Expand Down Expand Up @@ -204,6 +208,7 @@ public void onStateChanged(@NotNull SimHost host, @NotNull ServiceTask task, @No
}

if (task.getState() == TaskState.COMPLETED || task.getState() == TaskState.TERMINATED) {
scheduler.removeTask(task, hv);
setTaskToBeRemoved(task);
}

Expand Down Expand Up @@ -430,38 +435,37 @@ private void requestSchedulingCycle() {
private void doSchedule() {
// reorder tasks

while (!taskQueue.isEmpty()) {
SchedulingRequest request = taskQueue.peek();

if (request.isCancelled) {
taskQueue.poll();
tasksPending--;
continue;
for (Iterator<SchedulingRequest> iterator = taskQueue.iterator();
iterator.hasNext();
iterator = taskQueue.iterator()) {
final SchedulingResult result = scheduler.select(iterator);
if (result.getResultType() == SchedulingResultType.EMPTY) {
break;
}

final ServiceTask task = request.task;
final HostView hv = result.getHost();
final SchedulingRequest req = result.getReq();
final ServiceTask task = req.getTask();
final ServiceFlavor flavor = task.getFlavor();

if (task.getNumFailures() >= maxNumFailures) {
LOGGER.warn("task {} has been terminated because it failed {} times", task, task.getNumFailures());

taskQueue.poll();
taskQueue.remove(req);
tasksPending--;
tasksTerminated++;
task.setState(TaskState.TERMINATED);

scheduler.removeTask(task, hv);
this.setTaskToBeRemoved(task);
continue;
}

final ServiceFlavor flavor = task.getFlavor();
final HostView hv = scheduler.select(request.task);

if (hv == null || !hv.getHost().canFit(task)) {
if (result.getResultType() == SchedulingResultType.FAILURE) {
LOGGER.trace("Task {} selected for scheduling but no capacity available for it at the moment", task);

if (flavor.getMemorySize() > maxMemory || flavor.getCoreCount() > maxCores) {
// Remove the incoming image
taskQueue.poll();
taskQueue.remove(req);
tasksPending--;
tasksTerminated++;

Expand All @@ -472,14 +476,15 @@ private void doSchedule() {
this.setTaskToBeRemoved(task);
continue;
} else {
// VM fits, but we don't have enough capacity
break;
}
}

SimHost host = hv.getHost();

// Remove request from queue
taskQueue.poll();
taskQueue.remove(req);
tasksPending--;

LOGGER.info("Assigned task {} to host {}", task, host);
Expand All @@ -488,7 +493,6 @@ private void doSchedule() {
task.host = host;

host.spawn(task);
// host.start(task);

tasksActive++;
attemptsSuccess++;
Expand All @@ -500,6 +504,7 @@ private void doSchedule() {
activeTasks.put(task, host);
} catch (Exception cause) {
LOGGER.error("Failed to deploy VM", cause);
scheduler.removeTask(task, hv);
attemptsFailure++;
}
}
Expand Down Expand Up @@ -679,19 +684,4 @@ public void rescheduleTask(@NotNull ServiceTask task, @NotNull Workload workload
internalTask.start();
}
}

/**
* A request to schedule a {@link ServiceTask} onto one of the {@link SimHost}s.
*/
static class SchedulingRequest {
final ServiceTask task;
final long submitTime;

boolean isCancelled;

SchedulingRequest(ServiceTask task, long submitTime) {
this.task = task;
this.submitTime = submitTime;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,22 @@ public class HostView {
long availableMemory;
int provisionedCores;

/**
* Scheduler bookkeeping.
* Used by schedulers which use a priority queue data structure
* to keep track of the order of hosts to schedule tasks on,
* {@link MemorizingScheduler} for example.
* MemorizingScheduler has an array of lists:
* the 0th index of the array has a list of hosts with 0 tasks,
* the 1st index of the array has hosts with 1 task, and so on.
* The priorityIndex points to the index of the list this host
* belongs to in the array.
* The listIndex is the position of this host in the list.
*/
public int priorityIndex;

public int listIndex;

/**
* Construct a {@link HostView} instance.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.opendc.compute.api.TaskState;
import org.opendc.compute.simulator.TaskWatcher;
import org.opendc.compute.simulator.host.SimHost;
import org.opendc.compute.simulator.scheduler.SchedulingRequest;
import org.opendc.simulator.compute.workload.Workload;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -59,7 +60,7 @@ public class ServiceTask {
Instant createdAt;
Instant finishedAt;
SimHost host = null;
private ComputeService.SchedulingRequest request = null;
private SchedulingRequest request = null;

private int numFailures = 0;

Expand Down Expand Up @@ -221,10 +222,10 @@ void setState(TaskState newState) {
* Cancel the provisioning request if active.
*/
private void cancelProvisioningRequest() {
final ComputeService.SchedulingRequest request = this.request;
final SchedulingRequest request = this.request;
if (request != null) {
this.request = null;
request.isCancelled = true;
request.setCancelled(true);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,43 @@ public interface ComputeScheduler {
public fun removeHost(host: HostView)

/**
* Select a host for the specified [task].
* Select a host for the specified [iter].
* We implicitly assume that the task has been scheduled onto the host.
*
* @param task The server to select a host for.
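* @param iter An iterator over the pending scheduling requests to select from.
* @return A [SchedulingResult] describing the outcome: the selected host and request on success, or a failure/empty result if no host or no request is available.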
*/
public fun select(task: ServiceTask): HostView?
public fun select(iter: MutableIterator<SchedulingRequest>): SchedulingResult

/**
 * Inform the scheduler that a [task] has been removed from the [host].
 * Could be due to completion or failure.
 *
 * @param task The task that has been removed.
 * @param host The host view associated with the task, or `null` if none is available
 * (presumably when the task was never placed on a host — confirm against callers).
 */
public fun removeTask(
task: ServiceTask,
host: HostView?,
)
}

/**
 * A request to schedule a [ServiceTask] onto one of the [SimHost]s.
 *
 * Only [task] and [submitTime] are declared in the primary constructor, so only they take part in
 * the generated `equals`/`hashCode`/`toString`/`copy`; [isCancelled] and [timesSkipped] do not.
 *
 * @property task The task that is waiting to be scheduled.
 * @property submitTime The submission time recorded for this request
 * (NOTE(review): unit and clock are not visible here — confirm with the code creating requests).
 */
public data class SchedulingRequest internal constructor(
public val task: ServiceTask,
public val submitTime: Long,
) {
// Marks the request as cancelled; FilterScheduler.select removes cancelled requests from the queue instead of scheduling them.
public var isCancelled: Boolean = false
// NOTE(review): presumably counts how often a scheduler passed over this request; no reader or writer is visible here — confirm.
public var timesSkipped: Int = 0
}

/**
 * The kind of outcome reported by [ComputeScheduler.select].
 */
public enum class SchedulingResultType {
/** A host was selected for a request. */
SUCCESS,
/** A request was considered, but no host could be selected for it. */
FAILURE,
/** There was no (non-cancelled) request left to consider. */
EMPTY,
}

/**
 * The outcome of a [ComputeScheduler.select] call.
 *
 * @property resultType The kind of outcome, see [SchedulingResultType].
 * @property host The selected host; defaults to `null` (FilterScheduler only supplies a host for
 * [SchedulingResultType.SUCCESS]).
 * @property req The request the outcome refers to; defaults to `null` (FilterScheduler leaves it unset
 * for [SchedulingResultType.EMPTY]).
 */
public data class SchedulingResult(
val resultType: SchedulingResultType,
val host: HostView? = null,
val req: SchedulingRequest? = null,
)
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,13 @@ public enum class ComputeSchedulerEnum {
ProvisionedCores,
ProvisionedCoresInv,
Random,
Replay,
}

public fun createComputeScheduler(
name: String,
seeder: RandomGenerator,
placements: Map<String, String> = emptyMap(),
): ComputeScheduler {
return createComputeScheduler(ComputeSchedulerEnum.valueOf(name.uppercase()), seeder, placements)
return createComputeScheduler(ComputeSchedulerEnum.valueOf(name.uppercase()), seeder)
}

/**
Expand All @@ -61,7 +59,6 @@ public fun createComputeScheduler(
public fun createComputeScheduler(
name: ComputeSchedulerEnum,
seeder: RandomGenerator,
placements: Map<String, String> = emptyMap(),
): ComputeScheduler {
val cpuAllocationRatio = 1.0
val ramAllocationRatio = 1.5
Expand Down Expand Up @@ -113,6 +110,5 @@ public fun createComputeScheduler(
subsetSize = Int.MAX_VALUE,
random = SplittableRandom(seeder.nextLong()),
)
ComputeSchedulerEnum.Replay -> ReplayScheduler(placements)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,20 @@ public class FilterScheduler(
hosts.remove(host)
}

override fun select(task: ServiceTask): HostView? {
override fun select(iter: MutableIterator<SchedulingRequest>): SchedulingResult {
var req = iter.next()

while (req.isCancelled) {
iter.remove()
if (iter.hasNext()) {
req = iter.next()
} else {
// No tasks in queue
return SchedulingResult(SchedulingResultType.EMPTY)
}
}

val task = req.task
val hosts = hosts
val filteredHosts = hosts.filter { host -> filters.all { filter -> filter.test(host, task) } }

Expand Down Expand Up @@ -102,10 +115,18 @@ public class FilterScheduler(
}

// fixme: currently finding no matching hosts can result in an error
return when (val maxSize = min(subsetSize, subset.size)) {
0 -> null
1 -> subset[0]
else -> subset[random.nextInt(maxSize)]
val maxSize = min(subsetSize, subset.size)
if (maxSize == 0) {
return SchedulingResult(SchedulingResultType.FAILURE, null, req)
} else {
iter.remove()
return SchedulingResult(SchedulingResultType.SUCCESS, subset[random.nextInt(maxSize)], req)
}
}

/**
 * Notification that [task] has been removed from [host].
 *
 * This implementation does nothing when a task is removed.
 */
override fun removeTask(
task: ServiceTask,
host: HostView?,
) {
// Deliberately empty: no bookkeeping is updated on task removal.
}
}
Loading

0 comments on commit 1fc2017

Please sign in to comment.