add compress arg to functools.RotatingFileWriter
lidong committed Nov 18, 2024
1 parent e5875de commit 2043e0e
Showing 2 changed files with 86 additions and 17 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -3,6 +3,7 @@
2. fix `utils.FileDict.save`
3. add `ipc.QueueManager` based on BaseManager, add JSON listener
4. add `compress`, `ensure_dir` args to `functools.SizedTimedRotatingFileHandler`
5. add `compress` arg to `functools.RotatingFileWriter`

### 1.1.6 (2024-09-09)
1. add filename_filter to utils.format_error
102 changes: 85 additions & 17 deletions morebuiltins/functools.py
@@ -13,7 +13,7 @@
from itertools import chain
from logging.handlers import TimedRotatingFileHandler
from pathlib import Path
from threading import Lock, Semaphore, Thread
from threading import Lock, RLock, Semaphore, Thread
from typing import (
    Any,
    Callable,
@@ -901,7 +901,23 @@ class RotatingFileWriter:
    >>> len(writer.backup_path_list())
    0
    >>> writer.clean_backups(writer.max_backups)
    >>> writer.unlink_file()
    >>> len(writer.backup_path_list())
    0
    >>> writer = RotatingFileWriter("test.log", max_size=20, max_backups=3)
    >>> writer.print("1" * 100)
    >>> writer.unlink(rotate=False)
    >>> len(writer.backup_path_list())
    1
    >>> writer.unlink(rotate=True)
    >>> len(writer.backup_path_list())
    0
    >>> writer = RotatingFileWriter("test.log", max_size=20, max_backups=3, compress=True)
    >>> writer.print("1" * 100)
    >>> len(writer.backup_path_list())
    1
    >>> writer.unlink(rotate=True)
    >>> len(writer.backup_path_list())
    0
    """

    check_exist_every = 100
@@ -915,9 +931,12 @@ def __init__(
        errors=None,
        buffering=-1,
        newline=None,
        compress=False,
    ):
        if max_backups < 0:
            raise ValueError("max_backups must be greater than -1, 0 for itself.")
        self._compress_threads: WeakSet = WeakSet()
        self._rotate_lock = RLock()
        self.path = Path(path)
        self.path.parent.mkdir(parents=True, exist_ok=True)
        self.max_size = max_size
@@ -926,17 +945,42 @@ def __init__(
        self.errors = errors
        self.buffering = buffering
        self.newline = newline
        self.compress = compress
        self.file = self.reopen_file()
        self._check_exist_count = self.check_exist_every + 1
        self._rotate_lock = Lock()

    def get_suffix(self):
        return time.strftime("%Y%m%d%H%M%S")

    def unlink_file(self):
        return self.unlink(rotate=False)

    def unlink(self, rotate=True, parent=False):
        self.close_file()
        self.path.unlink(missing_ok=True)
        if rotate:
            self.clean_backups(count=self.max_backups + 1)
        if parent:
            # for/else: if the parent dir still has any entry, keep it;
            # only an empty parent dir is removed
            for _ in self.path.parent.iterdir():
                return
            else:
                self.path.parent.rmdir()

    def close(self):
        return self.close_file()

    def __enter__(self):
        return self

    def __exit__(self, *_):
        self.shutdown()

    def shutdown(self):
        self.close_file()
        if self.compress:
            for t in self._compress_threads:
                t.join()

    def close_file(self):
        file_obj = getattr(self, "file", None)
        if file_obj and not file_obj.closed:
@@ -964,9 +1008,12 @@ def rotate(self, new_length):
        if self.need_rotate(new_length):
            if self.max_backups > 0:
                self.close_file()
                now = time.strftime("%Y%m%d%H%M%S")
                _suffix = self.get_suffix()
                for index in range(self.max_backups):
                    suffix = f"{now}_{index}" if index else now
                    if index:
                        suffix = f"{_suffix}_{index}"
                    else:
                        suffix = _suffix
                    target_path = self.path.with_name(f"{self.path.name}.{suffix}")
                    if target_path.is_file():
                        # already rotated
@@ -979,11 +1026,27 @@ def rotate(self, new_length):
                )
                self.path.rename(target_path)
                self.reopen_file()
                self.clean_backups(count=None)
                if not self.compress:
                    self.clean_backups(count=None)
            else:
                self.file.seek(0)
                self.file.truncate()

    def do_compress(self):
        with self._rotate_lock:
            for path in self.path.parent.glob(f"{self.path.name}.*"):
                if path.name == self.path.name:
                    continue
                elif path.suffix == ".gz":
                    continue
                temp_path = path.with_name(path.name + ".gz")
                with GzipFile(temp_path, "wb") as gzip_file:
                    with path.open("rb") as src_file:
                        for line in src_file:
                            gzip_file.write(line)
                path.unlink(missing_ok=True)
            self.clean_backups()

    def need_rotate(self, new_length):
        return self.max_size and self.file.tell() + new_length > self.max_size

@@ -994,22 +1057,27 @@ def ensure_file(self, new_length=0):
            self.reopen_file()
        elif self.need_rotate(new_length):
            self.rotate(new_length)
            if self.compress:
                t = Thread(target=self.do_compress)
                t.start()
                self._compress_threads.add(t)

    def backup_path_list(self):
        return list(self.path.parent.glob(f"{self.path.name}.*"))

    def clean_backups(self, count=None):
        """Clean the oldest {count} backups; if count is None, clean down to max_backups."""
        path_list = self.backup_path_list()
        if path_list:
            if count is None:
                count = len(path_list) - self.max_backups
            if count > 0:
                path_list.sort(key=lambda x: x.stat().st_mtime)
                for deleted, path in enumerate(path_list, 1):
                    path.unlink(missing_ok=True)
                    if deleted >= count:
                        break
        with self._rotate_lock:
            path_list = self.backup_path_list()
            if path_list:
                if count is None:
                    count = len(path_list) - self.max_backups
                if count > 0:
                    path_list.sort(key=lambda x: x.stat().st_mtime)
                    for deleted, path in enumerate(path_list, 1):
                        path.unlink(missing_ok=True)
                        if deleted >= count:
                            break

    def flush(self):
        self.file.flush()
@@ -1026,7 +1094,7 @@ def print(self, *strings, end="\n", sep=" ", flush=False):
        self.write(text, flush=flush)

    def __del__(self):
        self.close_file()
        self.shutdown()


def get_function(entrypoint: str):
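For context, a minimal usage sketch of the `compress` argument added by this commit, pieced together from the doctest above. The import path `morebuiltins.functools` is assumed from the changed file's location, and the backup count can lag briefly while the background compression thread runs.

from morebuiltins.functools import RotatingFileWriter

# Rotate once a write would push the file past 20 bytes, keep at most 3
# backups, and gzip each rotated backup in a background thread.
writer = RotatingFileWriter("test.log", max_size=20, max_backups=3, compress=True)
writer.print("1" * 100)                # exceeds max_size, so the current file is rotated first
print(len(writer.backup_path_list()))  # 1 backup (a .gz file once compression finishes)
writer.shutdown()                      # close the file and join any compression threads
writer.unlink(rotate=True)             # remove the log file and all remaining backups

# The commit also adds __enter__/__exit__, so the writer works as a context
# manager; __exit__ calls shutdown(), which joins the compression threads.
with RotatingFileWriter("test.log", max_size=20, max_backups=3, compress=True) as w:
    w.print("hello", "world")
w.unlink(rotate=True)  # clean up the files created by this example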
