Skip to content

Commit

Permalink
Stop keeping old copies of eventbus files (#941)
Browse files Browse the repository at this point in the history
We've never restored from these "backups", and as long as we ensure that
the current file points to a fully written file, it's fine to only keep
at most 2 files around: the actual current file and a temporary "new"
file.

To ensure that we're not pointing at a half-written file, we continue
using the age-old pattern for atomic file updates: write to another file
and swap a symlink once the write is complete
  • Loading branch information
nemacysts authored Feb 20, 2024
1 parent 60e4884 commit e4d4afd
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 3 deletions.
5 changes: 4 additions & 1 deletion tests/eventbus_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,10 @@ def test_sync_save_log_time(self, time):
self.eventbus.sync_save_log("test")
new_link = os.readlink(self.eventbus.log_current)
assert_equal(new_link, os.path.join(self.log_dir.name, "2.pickle"))
assert os.path.exists(current_link)
# we clean up the previous link so as not to have a million pickles
# on disk
assert not os.path.exists(current_link)
# so at this point, we should only have the new link
assert os.path.exists(new_link)

@mock.patch("tron.eventbus.time", autospec=True)
Expand Down
23 changes: 21 additions & 2 deletions tron/eventbus.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import time
from collections import defaultdict
from collections import deque
from typing import Optional

from twisted.internet import reactor

Expand Down Expand Up @@ -144,9 +145,18 @@ def sync_load_log(self):
duration = time.time() - started
log.info(f"log read from disk, took {duration:.4}s")

def sync_save_log(self, reason):
def sync_save_log(self, reason: str) -> bool:
started = time.time()
new_file = os.path.join(self.log_dir, f"{int(started)}.pickle")
previous_file: Optional[str] = os.path.realpath(os.path.join(self.log_dir, "current"))
# if we're starting a fresh Tron server, there won't be a current symlink
# and the above line will give us the path to what will eventually be the current
# symlink...which is undesirable since we clean up whatever this points to :p
# we can tell if this is happening since this previous_file variable should
# always point to a file that ends in .pickle under normal operation
if previous_file and previous_file.endswith("current"):
previous_file = None

try:
with open(new_file, "xb") as f:
pickle.dump(self.event_log, f)
Expand All @@ -165,7 +175,16 @@ def sync_save_log(self, reason):
except FileNotFoundError:
pass
os.symlink(new_file, tmplink)
os.replace(tmplink, self.log_current)
os.replace(src=tmplink, dst=self.log_current)
# once we get here, `self.log_current` is now pointing to `new_file`
# so we can safely delete the previous `self.log_current` target without
# fear of losing data
if previous_file:
try:
os.remove(previous_file)
except Exception:
# this shouldn't happen - but we also shouldn't crash if the impossible happens
log.exception(f"unable to delete {previous_file} - continuing anyway.")

duration = time.time() - started
log.info(f"log dumped to disk because {reason}, took {duration:.4}s")
Expand Down

0 comments on commit e4d4afd

Please sign in to comment.