Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reject time losses mid-run, take 2 #1571

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions server/fishtest/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ def strip_run(run):
run = copy.deepcopy(run)
if "tasks" in run:
run["tasks"] = []
if "bad_tasks" in run:
run["bad_tasks"] = []
if "purged_tasks" in run:
run["purged_tasks"] = []
if "spsa" in run["args"] and "param_history" in run["args"]["spsa"]:
run["args"]["spsa"]["param_history"] = []
run["_id"] = str(run["_id"])
Expand Down Expand Up @@ -338,9 +338,9 @@ def get_task(self):
try:
run = self.request.rundb.get_run(self.request.matchdict["id"])
task_id = self.request.matchdict["task_id"]
if task_id.endswith("bad"):
if task_id.endswith("bad"): # Where in the code is this made so?
task_id = int(task_id[:-3])
task = copy.deepcopy(run["bad_tasks"][task_id])
task = copy.deepcopy(run["purged_tasks"][task_id])
else:
task_id = int(task_id)
task = copy.deepcopy(run["tasks"][task_id])
Expand Down
122 changes: 51 additions & 71 deletions server/fishtest/rundb.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@
from fishtest.stats.stat_util import SPRT_elo
from fishtest.userdb import UserDb
from fishtest.util import (
BinaryHistory,
crash_or_time,
estimate_game_duration,
format_bounds,
format_results,
get_bad_workers,
get_bad_workers_by_residual,
get_chi2,
get_hash,
get_tc_ratio,
Expand Down Expand Up @@ -573,7 +574,7 @@ def get_results(self, run, save_run=True):
has_pentanomial = True
pentanomial = 5 * [0]
for task in run["tasks"]:
if "bad" in task:
if "purged" in task:
continue
if "stats" in task:
stats = task["stats"]
Expand Down Expand Up @@ -994,14 +995,21 @@ def finished_run_message(self, run):
return ret

def handle_crash_or_time(self, run, task_id):
purged = False
task = run["tasks"][task_id]
worker = self.worker_runs.get(task["worker_info"]["unique_key"], None)
if worker is None:
worker = self.worker_runs[task["worker_info"]["unique_key"]] = {}
history = worker.get("crash_time_history", None) # Caution, we don't always have the proper lock for this!
if history is None:
history = worker["crash_time_history"] = BinaryHistory(8)
if crash_or_time(task):
stats = task.get("stats", {})
total = (
stats.get("wins", 0) + stats.get("losses", 0) + stats.get("draws", 0)
)
if not total:
return
return False
crashes = stats.get("crashes", 0)
time_losses = stats.get("time_losses", 0)
message = f"Time losses:{time_losses}({time_losses/total:.1%}) Crashes:{crashes}({crashes/total:.1%})"
Expand All @@ -1011,6 +1019,19 @@ def handle_crash_or_time(self, run, task_id):
task_id=task_id,
message=message,
)
history.update(True)
# We have to guard against bad patches causing problems vs bad workers.
if history.sum() >= 3: # 3 or more bad tasks in the last 8
# If we're confident that the worker is bad (not the patch), then
# insta-purge this task in the middle of the run.
# Autopurge, *after* stopping a run, will purge crash_or_time regardless of history.
run["purged_tasks"].append(task_purge_and_copy(task))
# Having purged this task, we must recalculate the results without this task
run["results_stale"] = purged = True
self.buffer(run, False)
else:
history.update(False)
return purged

def update_task(self, worker_info, run_id, task_id, stats, spsa):
lock = self.active_run_lock(str(run_id))
Expand Down Expand Up @@ -1087,7 +1108,7 @@ def count_games(d):
task["active"] = False
return {"task_alive": False, "error": error}

# The update seems fine. Update run["tasks"][task_id] (=task).
# The update seems fine, or at least half-sane. Update run["tasks"][task_id] (=task).

task["stats"] = stats
task["last_updated"] = update_time
Expand All @@ -1103,46 +1124,40 @@ def count_games(d):

run["last_updated"] = update_time

purged = False
if task_finished:
# run["cores"] is also updated in request_task().
# We use the same lock.
with self.task_lock:
run["workers"] -= 1
run["cores"] -= task["worker_info"]["concurrency"]
assert run["cores"] >= 0
purged = self.handle_crash_or_time(run, task_id)

run["results_stale"] = True # force recalculation of results
updated_results = self.get_results(
run, False
) # computed from run["tasks"] which
# has just been updated. Sets run["results_stale"]=False.
updated_results = self.get_results(run, purged) # computed from
# run["tasks"] which has just been updated. Sets run["results_stale"]=False.

if "sprt" in run["args"]:
sprt = run["args"]["sprt"]
fishtest.stats.stat_util.update_SPRT(updated_results, sprt)
if sprt["state"] != "":
task_finished = True
task["active"] = False

if "spsa" in run["args"] and spsa_games == spsa["num_games"]:
self.update_spsa(task["worker_info"]["unique_key"], run, spsa)

# Record tasks with an excessive amount of crashes or time losses in the event log

if task_finished:
self.handle_crash_or_time(run, task_id)

# Check if the run is finished.

run_finished = False
if count_games(updated_results) >= run["args"]["num_games"]:
run_finished = True
elif "sprt" in run["args"] and sprt["state"] != "":
run_finished = True
run_finished = (
count_games(updated_results) >= run["args"]["num_games"]
or "sprt" in run["args"] and sprt["state"] != ""
)

# Return.

if run_finished:
if not task_finished:
task["active"] = False
self.handle_crash_or_time(run, task_id)
self.buffer(run, True)
self.stop_run(run_id)
# stop run may not actually stop a run because of autopurging!
Expand Down Expand Up @@ -1245,75 +1260,40 @@ def purge_run(self, run, p=0.001, res=7.0, iters=1):
now = datetime.utcnow()
if "start_time" not in run or (now - run["start_time"]).days > 30:
return "Run too old to be purged"
# Do not revive failed runs
if run.get("failed", False):
return "You cannot purge a failed run"
message = "No bad workers"
# Transfer bad tasks to run["bad_tasks"]
if "bad_tasks" not in run:
run["bad_tasks"] = []
return "You cannot purge, and thus revive, a failed run"
message = "No purged workers"
if "purged_tasks" not in run:
run["purged_tasks"] = []

# First, purge tasks by crashes/time losses
tasks = copy.copy(run["tasks"])
zero_stats = {
"wins": 0,
"losses": 0,
"draws": 0,
"crashes": 0,
"time_losses": 0,
"pentanomial": 5 * [0],
}

for task_id, task in enumerate(tasks):
if "bad" in task:
if "purged" in task:
continue
# Special cases: crashes or time losses.
if crash_or_time(task):
message = ""
bad_task = copy.deepcopy(task)
# The next two lines are a bit hacky but
# the correct residual and color may not have
# been set yet.
bad_task["residual"] = 10.0
bad_task["residual_color"] = "#FF6A6A"
bad_task["task_id"] = task_id
bad_task["bad"] = True
run["bad_tasks"].append(bad_task)
# Rather than removing the task, we mark
# it as bad.
# In this way the numbering of tasks
# does not change.
# For safety we also set the stats
# to zero.
task["bad"] = True
task["active"] = False
task["stats"] = copy.deepcopy(zero_stats)
run["purged_tasks"].append(task_purge_and_copy(task))

chi2 = get_chi2(run["tasks"])
# Make sure the residuals are up to date.
# Once a task is moved to run["bad_tasks"] its
# residual will no longer change.
update_residuals(run["tasks"], cached_chi2=chi2)
bad_workers = get_bad_workers(
# Make sure the residuals are up to date. Once a task is purged to
# run["purged_tasks"] its residual will no longer change.
update_residuals(run["tasks"], chi2=chi2) # Marks residual color (and calls crash_or_time, marking such failures not already caught)
bad_workers = get_bad_workers_by_residual(
run["tasks"],
cached_chi2=chi2,
first_chi2=chi2,
p=p,
res=res,
iters=iters - 1 if message == "" else iters,
)
# Second, purge tasks by residual
tasks = copy.copy(run["tasks"])
for task_id, task in enumerate(tasks):
if "bad" in task:
if "purged" in task:
continue
if task["worker_info"]["unique_key"] in bad_workers:
message = ""
purged = True
bad_task = copy.deepcopy(task)
bad_task["task_id"] = task_id
bad_task["bad"] = True
run["bad_tasks"].append(bad_task)
task["bad"] = True
task["active"] = False
task["stats"] = copy.deepcopy(zero_stats)
run["purged_tasks"].append(task_purge_and_copy(task))

if message == "":
run["results_stale"] = True
Expand Down
6 changes: 3 additions & 3 deletions server/fishtest/templates/tests_view.mak
Original file line number Diff line number Diff line change
Expand Up @@ -389,9 +389,9 @@
</tr>
</thead>
<tbody>
% for idx, task in enumerate(run['tasks'] + run.get('bad_tasks', [])):
% for idx, task in enumerate(run['tasks'] + run.get('purged_tasks', [])):
<%
if task in run["tasks"] and "bad" in task:
if task in run["tasks"] and "purged" in task:
continue
if "task_id" in task:
task_id = task["task_id"]
Expand All @@ -412,7 +412,7 @@
%>
<tr class="${active_style}" id=task${task_id}>
<td><a href=${f"/api/pgn/{run['_id']}-{task_id:d}.pgn"}>${task_id}</a></td>
% if 'bad' in task:
% if 'purged' in task:
<td style="text-decoration:line-through; background-color:#ffebeb">
% else:
<td>
Expand Down
Loading