diff --git a/server/fishtest/api.py b/server/fishtest/api.py index b136fa3bd..59e8ab8b6 100644 --- a/server/fishtest/api.py +++ b/server/fishtest/api.py @@ -78,8 +78,8 @@ def strip_run(run): run = copy.deepcopy(run) if "tasks" in run: run["tasks"] = [] - if "bad_tasks" in run: - run["bad_tasks"] = [] + if "purged_tasks" in run: + run["purged_tasks"] = [] if "spsa" in run["args"] and "param_history" in run["args"]["spsa"]: run["args"]["spsa"]["param_history"] = [] run["_id"] = str(run["_id"]) @@ -338,9 +338,9 @@ def get_task(self): try: run = self.request.rundb.get_run(self.request.matchdict["id"]) task_id = self.request.matchdict["task_id"] - if task_id.endswith("bad"): + if task_id.endswith("bad"): # Where in the code is this made so? task_id = int(task_id[:-3]) - task = copy.deepcopy(run["bad_tasks"][task_id]) + task = copy.deepcopy(run["purged_tasks"][task_id]) else: task_id = int(task_id) task = copy.deepcopy(run["tasks"][task_id]) diff --git a/server/fishtest/rundb.py b/server/fishtest/rundb.py index 20d4da77c..2a79560be 100644 --- a/server/fishtest/rundb.py +++ b/server/fishtest/rundb.py @@ -18,11 +18,12 @@ from fishtest.stats.stat_util import SPRT_elo from fishtest.userdb import UserDb from fishtest.util import ( + BinaryHistory, crash_or_time, estimate_game_duration, format_bounds, format_results, - get_bad_workers, + get_bad_workers_by_residual, get_chi2, get_hash, get_tc_ratio, @@ -573,7 +574,7 @@ def get_results(self, run, save_run=True): has_pentanomial = True pentanomial = 5 * [0] for task in run["tasks"]: - if "bad" in task: + if "purged" in task: continue if "stats" in task: stats = task["stats"] @@ -994,14 +995,21 @@ def finished_run_message(self, run): return ret def handle_crash_or_time(self, run, task_id): + purged = False task = run["tasks"][task_id] + worker = self.worker_runs.get(task["worker_info"]["unique_key"], None) + if worker is None: + worker = self.worker_runs[task["worker_info"]["unique_key"]] = {} + history = worker.get("crash_time_history", None) # Caution, we don't always have the proper lock for this! + if history is None: + history = worker["crash_time_history"] = BinaryHistory(8) if crash_or_time(task): stats = task.get("stats", {}) total = ( stats.get("wins", 0) + stats.get("losses", 0) + stats.get("draws", 0) ) if not total: - return + return False crashes = stats.get("crashes", 0) time_losses = stats.get("time_losses", 0) message = f"Time losses:{time_losses}({time_losses/total:.1%}) Crashes:{crashes}({crashes/total:.1%})" @@ -1011,6 +1019,19 @@ def handle_crash_or_time(self, run, task_id): task_id=task_id, message=message, ) + history.update(True) + # We have to guard against bad patches causing problems vs bad workers. + if history.sum() >= 3: # 3 or more bad tasks in the last 8 + # If we're confident that the worker is bad (not the patch), then + # insta-purge this task in the middle of the run. + # Autopurge, *after* stopping a run, will purge crash_or_time regardless of history. + run["purged_tasks"].append(task_purge_and_copy(task)) + # Having purged this task, we must recalculate the results without this task + run["results_stale"] = purged = True + self.buffer(run, False) + else: + history.update(False) + return purged def update_task(self, worker_info, run_id, task_id, stats, spsa): lock = self.active_run_lock(str(run_id)) @@ -1087,7 +1108,7 @@ def count_games(d): task["active"] = False return {"task_alive": False, "error": error} - # The update seems fine. Update run["tasks"][task_id] (=task). + # The update seems fine, or at least half-sane. Update run["tasks"][task_id] (=task). task["stats"] = stats task["last_updated"] = update_time @@ -1103,6 +1124,7 @@ def count_games(d): run["last_updated"] = update_time + purged = False if task_finished: # run["cores"] is also updated in request_task(). # We use the same lock. @@ -1110,39 +1132,32 @@ def count_games(d): run["workers"] -= 1 run["cores"] -= task["worker_info"]["concurrency"] assert run["cores"] >= 0 + purged = self.handle_crash_or_time(run, task_id) run["results_stale"] = True # force recalculation of results - updated_results = self.get_results( - run, False - ) # computed from run["tasks"] which - # has just been updated. Sets run["results_stale"]=False. + updated_results = self.get_results(run, purged) # computed from + # run["tasks"] which has just been updated. Sets run["results_stale"]=False. if "sprt" in run["args"]: sprt = run["args"]["sprt"] fishtest.stats.stat_util.update_SPRT(updated_results, sprt) - if sprt["state"] != "": - task_finished = True - task["active"] = False if "spsa" in run["args"] and spsa_games == spsa["num_games"]: self.update_spsa(task["worker_info"]["unique_key"], run, spsa) - # Record tasks with an excessive amount of crashes or time losses in the event log - - if task_finished: - self.handle_crash_or_time(run, task_id) - # Check if the run is finished. - run_finished = False - if count_games(updated_results) >= run["args"]["num_games"]: - run_finished = True - elif "sprt" in run["args"] and sprt["state"] != "": - run_finished = True + run_finished = ( + count_games(updated_results) >= run["args"]["num_games"] + or "sprt" in run["args"] and sprt["state"] != "" + ) # Return. if run_finished: + if not task_finished: + task["active"] = False + self.handle_crash_or_time(run, task_id) self.buffer(run, True) self.stop_run(run_id) # stop run may not actually stop a run because of autopurging! @@ -1245,75 +1260,40 @@ def purge_run(self, run, p=0.001, res=7.0, iters=1): now = datetime.utcnow() if "start_time" not in run or (now - run["start_time"]).days > 30: return "Run too old to be purged" - # Do not revive failed runs if run.get("failed", False): - return "You cannot purge a failed run" - message = "No bad workers" - # Transfer bad tasks to run["bad_tasks"] - if "bad_tasks" not in run: - run["bad_tasks"] = [] + return "You cannot purge, and thus revive, a failed run" + message = "No purged workers" + if "purged_tasks" not in run: + run["purged_tasks"] = [] + # First, purge tasks by crashes/time losses tasks = copy.copy(run["tasks"]) - zero_stats = { - "wins": 0, - "losses": 0, - "draws": 0, - "crashes": 0, - "time_losses": 0, - "pentanomial": 5 * [0], - } - for task_id, task in enumerate(tasks): - if "bad" in task: + if "purged" in task: continue - # Special cases: crashes or time losses. if crash_or_time(task): message = "" - bad_task = copy.deepcopy(task) - # The next two lines are a bit hacky but - # the correct residual and color may not have - # been set yet. - bad_task["residual"] = 10.0 - bad_task["residual_color"] = "#FF6A6A" - bad_task["task_id"] = task_id - bad_task["bad"] = True - run["bad_tasks"].append(bad_task) - # Rather than removing the task, we mark - # it as bad. - # In this way the numbering of tasks - # does not change. - # For safety we also set the stats - # to zero. - task["bad"] = True - task["active"] = False - task["stats"] = copy.deepcopy(zero_stats) + run["purged_tasks"].append(task_purge_and_copy(task)) chi2 = get_chi2(run["tasks"]) - # Make sure the residuals are up to date. - # Once a task is moved to run["bad_tasks"] its - # residual will no longer change. - update_residuals(run["tasks"], cached_chi2=chi2) - bad_workers = get_bad_workers( + # Make sure the residuals are up to date. Once a task is purged to + # run["purged_tasks"] its residual will no longer change. + update_residuals(run["tasks"], chi2=chi2) # Marks residual color (and calls crash_or_time, marking such failures not already caught) + bad_workers = get_bad_workers_by_residual( run["tasks"], - cached_chi2=chi2, + first_chi2=chi2, p=p, res=res, iters=iters - 1 if message == "" else iters, ) + # Second, purge tasks by residual tasks = copy.copy(run["tasks"]) for task_id, task in enumerate(tasks): - if "bad" in task: + if "purged" in task: continue if task["worker_info"]["unique_key"] in bad_workers: message = "" - purged = True - bad_task = copy.deepcopy(task) - bad_task["task_id"] = task_id - bad_task["bad"] = True - run["bad_tasks"].append(bad_task) - task["bad"] = True - task["active"] = False - task["stats"] = copy.deepcopy(zero_stats) + run["purged_tasks"].append(task_purge_and_copy(task)) if message == "": run["results_stale"] = True diff --git a/server/fishtest/templates/tests_view.mak b/server/fishtest/templates/tests_view.mak index 1cfd9af3f..039a80f3f 100644 --- a/server/fishtest/templates/tests_view.mak +++ b/server/fishtest/templates/tests_view.mak @@ -389,9 +389,9 @@
- % for idx, task in enumerate(run['tasks'] + run.get('bad_tasks', [])): + % for idx, task in enumerate(run['tasks'] + run.get('purged_tasks', [])): <% - if task in run["tasks"] and "bad" in task: + if task in run["tasks"] and "purged" in task: continue if "task_id" in task: task_id = task["task_id"] @@ -412,7 +412,7 @@ %>