-
Notifications
You must be signed in to change notification settings - Fork 34
/
Copy pathdir-snapper
executable file
·130 lines (109 loc) · 5.01 KB
/
dir-snapper
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
#!/usr/bin/env python3
import itertools as it, operator as op, functools as ft
import traceback as tb, pathlib as pl, datetime as dt
import os, sys, re, shutil
def bak_ret_parse(s):
ret_map, ret_re = dict(), dict((k, rf'(\d+|\*){k}') for k in 'hdwmy')
ret_re.update(n=r'(\d+)')
for sv in s.split():
for k, pat in ret_re.items():
if not (pat and (m := re.search(rf'(?i)^{pat}$', sv))): continue
m = m.group(1)
ret_re[k], ret_map[k] = None, int(m) if m != '*' else 2**30
break
else: raise ValueError(f'Unrecognized/extra retention-pattern component: {sv!r} in {s!r}')
for k, unused in ret_re.items():
if unused: ret_map[k] = 0
return ret_map
def bak_ret_keys(bak_dt_map, ret_str):
'''Returns list of preserved backups from key-datetime
bak_dt_map, according to specified retention string.'''
ret_map = bak_ret_parse(ret_str)
bak_dt_list = sorted(bak_dt_map.items(), key=op.itemgetter(1))
dt_tpl_map = dict( y='%Y', w='%Y%W',
m='%Y%m', d='%Y%m%d', h='%Y%m%d%H' )
dt_buckets = dict((k, dict()) for k in dt_tpl_map)
for bak, bak_dt in bak_dt_list:
for k, bb_map in dt_buckets.items():
bn = ret_map[k]
if bn <= 0: continue
bk = bak_dt.strftime(dt_tpl_map[k])
if bk in bb_map: continue
bb_map[bk] = bak
while len(bb_map) > bn: del bb_map[next(iter(bb_map))]
bak_ret_set = set(it.chain.from_iterable(
bb_map.values() for bb_map in dt_buckets.values() ))
bn = max(1, ret_map['n'] - len(bak_ret_set))
for bak, bak_dt in reversed(bak_dt_list):
if bn <= 0: break
if bak in bak_ret_set: continue
bak_ret_set.add(bak)
bn -= 1
return bak_ret_set
def run_src_backup(p_snap, bak_fmt, src, ret_policy, rn, rv):
dt_fmt, dt_re = '%Y%m%d_%H%M%S', r'(\d{8}_\d{6})'
assert '\uf320' not in bak_fmt, bak_fmt
snaps, snap_re = dict(), re.compile( r'^' +
re.escape(bak_fmt.format(src=src, dt='\uf320')).replace('\uf320', dt_re) + '$' )
for snap in sorted(s.resolve() for s in p_snap.iterdir()):
if not (m := snap_re.search(snap.name)): continue
snaps[snap] = dt.datetime.strptime(m.group(1), dt_fmt)
snap_new_dt = dt.datetime.now()
snap_new = (p_snap / bak_fmt.format(
src=src, dt=snap_new_dt.strftime(dt_fmt) )).resolve()
if not rn: src.rename(snap_new)
if rv: print(f' Creating new dir snapshot: {snap_new}')
snaps[snap_new] = snap_new_dt
snaps_keep = bak_ret_keys(snaps, ret_policy)
if rv: print(' Managed old dir list/status:')
for snap in snaps:
s, rm = snap.name, snap not in snaps_keep
if rv:
if snap == snap_new: print(' ' + '[new] ' + s)
else: print(' ' + (' [rm] ' if rm else ' '*6) + s)
if rm and not rn: shutil.rmtree(snap)
def main(args=None):
import argparse, textwrap
dd = lambda text: re.sub( r' \t+', ' ',
textwrap.dedent(text).strip('\n') + '\n' ).replace('\t', ' ')
parser = argparse.ArgumentParser(
formatter_class=argparse.RawTextHelpFormatter,
description='Script to move/rename/rotate directory'
' to add date suffix and keep a configurable number of earlier ones.')
parser.add_argument('snap_dir',
help='Directory where all rotated dirs will be stored and managed.')
parser.add_argument('src', nargs='+', help='Dir(s) to rotate.')
parser.add_argument('-f', '--fmt', metavar='fmt', default='{src}.{dt}', help=dd('''
Format for resulting dir basename. Default: "%(default)s".
Supported formatting (str.format) keywords: src - src dir basename,
dt - datetime in roughly iso8601 format with tz (e.g. 20210925_081451).
All other dirs not matching same pattern exactly will be ignored.'''))
parser.add_argument('-p', '--ret-policy',
default='5 5d 5w 10m *y', metavar='retention', help=dd('''
Retention settings for how many and older directories to keep.
Should be a line using following format:
[<n>] [<hourly>h] [<daily>d] [<weekly>w] [<monthly>m] [<yearly>y]
Default value: %(default)s
Values are integers, or "*" (asterisk) for "all", "n" value is for total min.
See more in-depth description of these values in btrbk.conf(5):
https://digint.ch/btrbk/doc/btrbk.conf.5.html#_retention_policy'''))
parser.add_argument('-v', '--verbose', action='store_true', help='Verbose operation mode.')
parser.add_argument('-n', '--dry-run', action='store_true',
help='Print all actions instead of executing'
' them and exit without doing anything. Implies --verbose.')
opts = parser.parse_args(sys.argv[1:] if args is None else args)
p_snap, rn = pl.Path(opts.snap_dir), opts.dry_run
fmt, policy, rv = opts.fmt, opts.ret_policy, rn or opts.verbose
if not p_snap.is_dir(): parser.error(f'Failed to find snapshot dir: {p_snap}')
err, src_list = None, list(map(pl.Path, opts.src))
for src in src_list:
if not src.is_dir(): parser.error(f'Failed to access src dir [ {src} ]')
for src in src_list:
if rv: print(f'Src: {src.name}' + (f' [ {src} ]' if str(src).startswith('/') else ''))
try: run_src_backup(p_snap, opts.fmt, src, opts.ret_policy, rn, rv)
except Exception:
print(f'ERROR: rotate/cleanup failed for src-dir - {src}', file=sys.stderr)
tb.print_exc(file=sys.stderr)
err = 1
return err
if __name__ == '__main__': sys.exit(main())