-
Notifications
You must be signed in to change notification settings - Fork 34
/
Copy pathfiletag
executable file
·399 lines (343 loc) · 15.4 KB
/
filetag
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
#!/usr/bin/env python
import itertools as it, operator as op, functools as ft
import pathlib as pl, collections as cs, contextlib as cl
import os, sys, stat, re, math, configparser, tempfile, time
def p_err(msg, *a, **k):
    '''Print msg (plus any extra print() args/keywords) to stderr, flushing right away.'''
    print(msg, *a, **k, file=sys.stderr, flush=True)
# Config file path that main() falls back to when no -c/--conf option is given.
conf_path_default = '~/.filetag.ini'

# Example configuration, printed (stripped of outer whitespace) for "--conf help".
# Raw string, so that backslashes in regexp examples are kept verbatim.
conf_example = r'''
;;;; This is an example INI file with specific sections/options.
;; For format description and examples, see:
;; https://docs.python.org/3/library/configparser.html#supported-ini-file-structure
;; Inline comments are not allowed. Multiline values are same as multiple same-key values.
[paths]
;;;; scan - paths to process files in, assigning tags to them.
;; Paths can contain spaces. Symlinks are not followed.
;; Can be specified multiple times for multiple paths.
;; All files outside of these will be removed from db on update.
;; Use separate configs/databases for multiple sets of scanned paths.
scan = ~/src
scan = ~/dev
;;;; db - tag database path to use.
db = ~/.filetag.db
[filter-regexps]
;; Regexps in python "re" module format to match scanned paths to include/skip.
;; Directives can be used to selectively
;; include/exclude files and skip scanning directories entirely.
;; All matched paths start with "/", which is scanned directory (from "scan=" above).
;; "include" directives override "skip" ones.
;; Scan this specific file from any ".git" directory.
include = /\.git/config$
;; And skip everything else in ".git" dirs
;; Note "." to match files in a dir, so that dir itself won't be skipped.
skip = /\.git/.
;; Skip these directories entirely - they won't be recursed into.
;; Slash at the end is the important part - without it,
;; "/\.hg" would also match files like "/.hgignore".
skip = /\.(hg|bzr|redo)/
;; Skip files with specific extension(s).
;; Note that this won't match directories (no trailing "/").
skip = \.py[co]$
;; Skip specific directories in the root of scanned path(s).
skip = ^/(tmp|games)/
;; Example of using python regexp flags (i - case-insensitive match).
skip = (?i)/\.?svn(/|ignore)$
;; This will match both files and dirs starting with "venv-" anywhere
skip = /venv-
'''
# Regexp -> tags mapping, compiled by FileTreeTagger.tag_map_compile().
# Dict keys are regexp fragments, values are space-separated tag lists.
#  bin_ext / code_ext - file extensions, matched as "\.(<key>)<*_ext_suff>$".
#  code_shebang - regexp to extract interpreter name ("bin" group) from first line.
#  code_bin - interpreter names from shebang, matched as "^<key>$".
#  code_path - regexps searched for anywhere in a full file path, used as-is.
tag_map = dict(
    # Useful binary formats
    bin_ext = {
        # Images
        'jpe?g': 'image jpg', 'png': 'image png', 'webp': 'image webp',
        'gif': 'image gif', 'svgz?': 'image svg', 'ico': 'image ico',
        'xpm': 'image xpm', 'bmp': 'image bmp', 'tiff': 'image tiff', 'tga': 'image tga',
        'xcf': 'image xcf', 'psd': 'image psd',
        # Crypto
        'ghg': 'crypt ghg', 'gpg': 'crypt gpg',
        # Fonts
        'ttf|woff2?|eot|otf|pcf': 'font' },
    bin_ext_suff = r'(\.(bak|old|tmp|\d+))*',
    # Code formats
    code_ext = {
        'py|tac': 'py', 'go': 'go', r'c(c|pp|xx|\+\+)?|hh?|lex|y(acc)?': 'c',
        r'js(o?n(\.txt)?)?|coffee': 'js', 'co?nf|cf|cfg|ini': 'conf',
        'nim': 'nim', 'ex|exs|erl|hrl': 'erlang',
        'unit|service|target|mount|desktop|rules': 'conf',
        '[sx]?htm(l[45]?)?|css|less|jade': 'html', 'x[ms]l|xsd|dbk': 'xml',
        'kml': 'kml', 'sgml|dtd': 'sgml',
        'patch|diff|pat': 'diff', r'(ba|z|k|c|fi)?sh|env|exheres-\d+|ebuild|initd?': 'sh',
        'sql': 'sql', 'p(l|m|erl|od)|al': 'perl', 'ph(p[s45t]?|tml)': 'php',
        '[cej]l|li?sp|rkt|sc[mh]|stk|ss': 'lisp', r'el(\.gz)?': 'el', 'ml': 'ml ocaml',
        'hs': 'haskell', 'rb': 'ruby', 'lua': 'lua', 'awk': 'awk', 'tcl': 'tcl', 'java': 'java',
        'mk?d|markdown': 'md', 're?st': 'rst', 'te?xt': 'txt', 'log': 'log',
        'rdf': 'rdf', 'xul': 'xul', 'po': 'po', 'csv': 'csv',
        'f(or)?': 'fortran', 'p(as)?': 'pascal', 'dpr': 'delphi', 'ad[abs]|ad[bs].dg': 'ada',
        'ya?ml': 'yaml', r'jso?n(\.txt)?': 'json', 'do': 'redo', 'm[k4c]|a[cm]|cmake': 'make' },
    code_ext_suff = r'(\.(in|tpl|(src-)?bak|old|tmp|\d+'
        r'|backup|default|example|sample|dist|\w+-new)|_t)*',
    code_shebang = r'(?i)^#!((/usr/bin/env)?\s+)?(/.*/)?(?P<bin>\S+)',
    code_bin = {
        # "*" so that plain "lua" matches too, as well as lua5.1, lua-5.3, etc
        r'lua[-\d.]*': 'lua', r'php\d?': 'php',
        r'j?ruby(\d\.\d)?|rbx': 'ruby',
        r'[jp]ython(\d(\.\d+)?)?': 'py',
        '[gnm]?awk': 'awk',
        r'(mini)?perl(\d(\.\d+)?)?': 'perl',
        'wishx?|tcl(sh)?': 'tcl',
        'scm|guile|clisp|racket|(sb)?cl|emacs': 'lisp',
        '([bdo]?a|t?c|k|z)?sh': 'sh' },
    code_path = {
        '/(Makefile|CMakeLists.txt|Imakefile|makepp|configure)$': 'make',
        r'/([Dd]ockerfile|[Dd]ocker(file)?\.[^/]+)$': 'docker',
        '/config$': 'conf', 'rakefile': 'ruby', '/zsh/_[^/]+$': 'sh', 'patch': 'diff' } )
class TagDB:
    '''Wrapper around sqlite3 db with a single "paths" table of (tag, path, mtime) rows.

    Connection is opened on first use, or right away if lazy=False is passed.
    Can be used as a context manager, which closes the connection on exit.'''

    # No migrations needed here, as db gets rebuilt on each run
    _db, _db_schema = None, '''
        create table if not exists paths (
            tag text not null, path text not null, mtime real not null );
        create index if not exists paths_tag on paths (tag);'''

    def __init__(self, path, lock_timeout=60, lazy=True):
        '''path - db file to open, lock_timeout - sqlite3 "timeout" value,
            lazy - whether to delay opening db until first query.'''
        import sqlite3
        self._sqlite, self._ts_activity = sqlite3, 0
        self._db_kws = dict( database=path,
            isolation_level='IMMEDIATE', timeout=lock_timeout )
        if not lazy: self._db_init()

    def close(self, inactive_timeout=None):
        '''Close db connection, if it is open.
            With inactive_timeout set, does nothing if db
                was accessed less than that many seconds ago.'''
        if ( inactive_timeout is not None
            and (time.monotonic() - self._ts_activity) < inactive_timeout ): return
        if self._db:
            self._db.close()
            self._db = None

    def __enter__(self): return self
    def __exit__(self, *err): self.close()

    def _db_init(self):
        'Open db connection and create table/index, if missing.'
        self._db = self._sqlite.connect(**self._db_kws)
        with self._db_cursor() as c:
            for stmt in self._db_schema.split(';'):
                if stmt.strip(): c.execute(stmt) # skips empty chunk after last ";"

    @cl.contextmanager
    def _db_cursor(self):
        '''Context manager yielding a cursor, opening db if necessary.
            Connection context commits on clean exit, rolls back on exception.'''
        self._ts_activity = time.monotonic()
        if not self._db: self._db_init()
        with self._db as conn, cl.closing(conn.cursor()) as c: yield c

    @cl.contextmanager
    def tagger_context(self):
        '''Context manager yielding tag_func(tag, path, mtime), which inserts one row.
            All inserts use the same cursor and are committed when context exits.'''
        with self._db_cursor() as c:
            tag_func = lambda tag, path, mtime: c.execute(
                'insert into paths (tag, path, mtime) values (?, ?, ?)', (tag, path, mtime) )
            yield tag_func

    def lookup_tags(self, tagsets):
        '''Return iterator of unique paths matching any of the tag-groups in tagsets.
            Path matches a group when it has all tags in it. Empty groups are skipped,
                and all stored paths are returned if there are no groups left.
            Paths are ordered by stored mtime, most recent first.'''
        q_where, q_args = list(), list()
        for tag_group in filter(None, tagsets):
            tag_group = list(tag_group)
            # Each row holds one tag, so "tag = a and tag = b" can never match -
            #  paths with all tags are found via intersection of per-tag selects instead
            q_where.append('path in (' + ' intersect '.join(
                ['select path from paths where tag = ?'] * len(tag_group) ) + ')')
            q_args.extend(tag_group)
        q_where = ('where ' + ' or '.join(q_where)) if q_where else ''
        with self._db_cursor() as c:
            # "group by" is to return each path once, regardless of how many tags matched
            c.execute( 'select path, max(mtime) as mtime_max from paths'
                f' {q_where} group by path order by mtime_max desc, path', q_args )
            rows = c.fetchall() or list()
        return (row[0] for row in rows)

    def lookup_paths(self, paths):
        '''Return list of (path, tag-set) tuples for specified path strings, sorted by path.
            Paths without any tags in db are omitted.
            No filtering is applied if paths is empty, i.e. all stored paths are returned.'''
        q_where, q_args = list(), list()
        for p_str in paths:
            q_where.append('path = ?')
            q_args.append(p_str)
        q_where = ('where ' + ' or '.join(q_where)) if q_where else ''
        with self._db_cursor() as c:
            c.execute(f'select path, tag from paths {q_where} order by path', q_args)
            # groupby relies on rows being sorted by path in the query above
            return list(
                (p_str, set(map(op.itemgetter(1), p_tags)))
                for p_str, p_tags in it.groupby(c.fetchall() or list(), key=op.itemgetter(0)) )
class PathFilter:
    '''Include/skip decision for scanned paths, based on two lists of compiled regexps.'''

    def __init__(self, res_inc, res_skip):
        self.res_inc = res_inc
        self.res_skip = res_skip

    def check(self, fn):
        '''Return True if path fn should be processed, False if it should be skipped.
            Any match in res_inc wins over res_skip, unmatched paths are processed.'''
        for rx_inc in self.res_inc:
            if rx_inc.search(fn): return True
        for rx_skip in self.res_skip:
            if rx_skip.search(fn): return False
        return True
class FileTreeTagger:
    '''Walks directory trees, picks tags for files there and passes these to tag_func.

    tag_func - callable(tag, path_str, mtime), called for every tag of every tagged file.
    tag_map - dict of regexp maps, see tag_map_compile() for its keys.
    p_filter - object with check(path) method, returning False for paths to skip.
    verbose - print a line to stdout for each processed file.'''

    def __init__(self, tag_func, tag_map, p_filter, verbose=False):
        self.tag_func, self.p_filter, self.verbose = tag_func, p_filter, verbose
        self.tags = self.tag_map_compile(tag_map)

    @staticmethod
    def tag_map_compile(tag_map):
        '''Return object with attribute per tag_map key, with all regexps compiled.
            String values become compiled regexps, dict values
                are turned into lists of (compiled-regexp, tag-list) tuples.
            code_ext_suff and bin_ext_suff keys are required and not
                present in the result - these get embedded into code_ext/bin_ext regexps.'''
        tag_map = tag_map.copy() # shallow copy, to pop keys without changing the original
        code_ext_suff, bin_ext_suff = (
            tag_map.pop(k) for k in ['code_ext_suff', 'bin_ext_suff'] )
        compiled = dict()
        for k, v in tag_map.items():
            if isinstance(v, str): v = re.compile(v)
            elif isinstance(v, dict):
                taggers = list()
                for rx, tags in v.items():
                    if k == 'code_ext': rx = rf'\.({rx}){code_ext_suff}$'
                    elif k == 'bin_ext': rx = rf'(?i)\.({rx}){bin_ext_suff}$'
                    elif k == 'code_bin': rx = rf'^{rx}$'
                    taggers.append((re.compile(rx), tags.split()))
                v = taggers
            compiled[k] = v
        return type('TagMap', (object,), compiled)

    def process_path(self, p):
        '''Recursively tag all files under pathlib path p.
            Paths passed to the filter are relative to p and start with "/",
                with directories also having a trailing "/".
            Directories rejected by the filter are not descended into.'''
        p, ts0 = str(p.resolve()), time.monotonic()
        p_len = len(p.rstrip('/')) # rstrip is for p="/", so that paths keep leading slash
        for root, dirs, files in os.walk(p):
            if not root.startswith(p): raise RuntimeError(p, root)
            fn_root, root = root[p_len:].rstrip('/') + '/', pl.Path(root)
            # List is replaced in-place to make os.walk skip filtered dirs.
            # Deleting its entries while iterating over it would skip checks for some.
            dirs[:] = [d for d in dirs if self.p_filter.check(fn_root + d + '/')]
            for fn in files:
                fn, p_file = fn_root + fn, root / fn
                if not self.p_filter.check(fn): continue
                tagset = self.tag_file(p_file)
                if self.verbose: print(f'{time.monotonic()-ts0:.3f} {len(tagset)} {fn.lstrip("/")}')

    def tag_file(self, p):
        '''Find tags for pathlib path p, pass each one to tag_func and return them as a set.
            Checks are tried in order until one of them produces any tags:
                code extension, binary extension, shebang line, path regexps.
            mtime=0 is passed to tag_func if file cannot be stat()-ed.'''
        p_str, tagset = str(p), set()
        try: p_stat = p.stat()
        except OSError: p_stat = None
        p_mtime = p_stat.st_mtime if p_stat is not None else 0
        for rx, tags in self.tags.code_ext: # code file extension
            if rx.search(p_str): tagset.update(tags)
        if not tagset:
            for rx, tags in self.tags.bin_ext: # bin file extension
                if rx.search(p_str): tagset.update(tags)
        if not tagset: # code shebang
            shebang = ''
            # Only regular files are opened - open() on e.g. fifo can block
            if p_stat is not None and stat.S_ISREG(p_stat.st_mode):
                try:
                    with p.open('rb', 255) as src:
                        if hasattr(os, 'posix_fadvise'): # not available on all platforms
                            os.posix_fadvise(src.fileno(), 0, 256, os.POSIX_FADV_NOREUSE)
                        shebang = src.readline(200).decode().strip()
                except (OSError, UnicodeDecodeError): shebang = '' # unreadable or binary file
            if m := self.tags.code_shebang.search(shebang):
                bin_name = m.group('bin')
                for rx, tags in self.tags.code_bin:
                    if rx.search(bin_name): tagset.update(tags)
        if not tagset: # code filenames/paths
            for rx, tags in self.tags.code_path:
                if rx.search(p_str): tagset.update(tags)
        if tagset: # commit path tags to db
            for tag in tagset: self.tag_func(tag, p_str, p_mtime)
        return tagset
class ConfDict(cs.UserDict):
    '''Dict that merges list values assigned to the same key, instead of replacing them.

    Passed as dict_type to ConfigParser in main(), together with strict=False,
        to have repeated same-key options combined into one multi-line value.'''

    def __setitem__(self, k, v):
        # Lists are only merged into lists - stored value can also be a string
        #  (e.g. from an earlier config file), and that gets replaced by the new value
        if isinstance(v, list) and isinstance(self.data.get(k), list): self.data[k].extend(v)
        else: super().__setitem__(k, v)
def _nice_parse(spec, error):
    '''Parse --nice "[prio:][io-class[.io-level]]" spec into (nice, ioprio) tuple.
        Either value is None if that part is not specified in the spec.
        error is a callable like ArgumentParser.error, used to report bad values.'''
    nice, _, ionice = spec.partition(':')
    if not ionice:
        # Spec without ":" is a nice value if it is an integer, and io-class otherwise
        try: int(nice)
        except ValueError: nice, ionice = '', nice
    if not nice: nice = None
    else:
        try: nice = int(nice)
        except ValueError: error(f'--nice prio value must be an integer: {nice!r}')
    ioprio, ionice = None, ionice.rstrip(':')
    if ionice:
        io_class, _, io_level = ionice.partition('.')
        try: io_class = dict(rt=1, be=2, idle=3)[io_class.lower()]
        except KeyError: error(f'--nice ionice class must be one of rt, be, idle: {io_class!r}')
        try: io_level = int(io_level or 0)
        except ValueError: error(f'--nice ionice prio level must be an integer: {io_level!r}')
        if io_class == 3: io_level = 0 # no levels are used with idle class here
        elif not 0 <= io_level <= 7: error('--nice ionice prio level must be in 0-7 range')
        ioprio = (io_class << 13) | io_level
    return nice, ioprio

def _ioprio_set(ioprio, error):
    '''Set I/O priority of the current process via raw ioprio_set syscall.
        Calls error() on unsupported architecture, raises OSError if syscall fails.'''
    # Syscall numbers: grep -r ioprio_set /usr/share/gdb/syscalls/
    arch = os.uname().machine
    try: scid = dict(x86_64=251, armv7l=314, aarch64=30)[arch]
    except KeyError: error(f'--nice ionice is not supported on arch: {arch}')
    import ctypes as ct
    err = ct.CDLL('libc.so.6', use_errno=True).syscall(scid, 1, os.getpid(), ioprio)
    if err != 0: raise OSError(e := ct.get_errno(), f'ionice_set failed - {os.strerror(e)}')

def main(args=None):
    '''Script entry point - parse command-line args (sys.argv by default) and config file(s),
        then rebuild tag database with -u/--update and/or run tag/path lookup.'''
    import argparse, textwrap
    dd = lambda text: (textwrap.dedent(text).strip('\n') + '\n').replace('\t', ' ')
    parser = argparse.ArgumentParser(
        usage='%(prog)s [options] [-u|--update] [ tags... ]',
        formatter_class=argparse.RawTextHelpFormatter,
        description='Query or update file tagging database.')
    parser.add_argument('tags', nargs='*', help=dd('''
        For default tag-lookup mode - list files for specified tag(s).
        "+" will combine tags via "and" logic, spaces and multiple args are "or".
        Use DNF logic calculator to convert complex human-readable expressions
        to simplified ones for use here, for example via
        https://www.dcode.fr/boolean-expressions-calculator
        Returned paths are ordered by stored modification time values.'''))
    parser.add_argument('-c', '--conf',
        action='append', metavar='path', default=list(), help=dd(f'''
        Path to configuration file, specifying db file and paths to process.
        Use "--conf help" to print example config with all supported options.
        Can be specified multiple times, with values in later files overriding earlier ones.
        Default config path, if none are specified: {conf_path_default}'''))

    group = parser.add_argument_group('Update tags mode')
    group.add_argument('-u', '--update', action='store_true',
        help='Build/update tag database for all configured paths.')
    group.add_argument('-v', '--verbose', action='store_true',
        help='Print all files being processed and precise timestamps for benchmarking.')
    group.add_argument('--nice',
        metavar='[prio:][io-class[.io-level]]', default='15:idle', help=dd('''
        Set "nice" and/or "ionice" (CFQ I/O) priorities.
        "nice" prio value, if specified, must be
        in -20-20 range, where lower = higher prio, and base=0.
        "ionice" value should be in class[.level] format, where
        "class" is one of [rt, be, idle] and "level" in 0-7 range (0=highest prio).
        See setpriority(2) / ioprio_set(2) for more info. Default: %(default)s'''))

    group = parser.add_argument_group('Query/lookup options')
    group.add_argument('-e', '--existing', action='store_true',
        help='Only print paths that are currently accessible for stat().')
    group.add_argument('-0', '-z', '--print0', action='store_true',
        help='Output NUL-separated (\\x00 byte) paths instead of newline-separated.')
    group.add_argument('-p', '--path',
        action='store_true', help='Print db tags for specified path(s).')

    opts = parser.parse_args(sys.argv[1:] if args is None else args)
    if 'help' in opts.conf: return print(conf_example.strip())

    conf = configparser.ConfigParser(strict=False, dict_type=ConfDict, interpolation=None)
    conf.read(list(pl.Path(p).expanduser() for p in (opts.conf or [conf_path_default])))
    try: conf_paths = conf['paths']
    except KeyError: parser.error('No [paths] section found in config file(s)')
    try: conf_filter = conf['filter-regexps']
    except KeyError: conf_filter = dict()

    # Repeated options end up as one multi-line value, one line per option
    conf_list = lambda v: list(filter(None, re.split(r'\n+', v)))
    db_values = conf_list(conf_paths.get('db', ''))
    if not db_values: parser.error('No "db" path specified in [paths] config section')
    p_db = pl.Path(db_values[-1]).expanduser() # last value overrides earlier ones
    # This must be a list - it is checked for emptiness and can be iterated more than once
    p_scan_list = [pl.Path(p).expanduser() for p in conf_list(conf_paths.get('scan', ''))]
    path_filter = PathFilter(*(
        (list(map(re.compile, conf_list(conf_filter[k]))) if k in conf_filter else [])
        for k in ['include', 'skip'] ))

    if opts.update:
        if opts.nice is not None:
            nice, ioprio = _nice_parse(opts.nice, parser.error)
            if nice is not None: os.setpriority(os.PRIO_PROCESS, os.getpid(), nice)
            if ioprio is not None: _ioprio_set(ioprio, parser.error)
        if not p_scan_list: parser.error('No paths to scan found in config file')

        # New db is built in a temp file next to the old one and renamed over it on success
        p_db_tmp = None
        try:
            with cl.ExitStack() as ctx:
                db_tmp = ctx.enter_context(tempfile.NamedTemporaryFile(
                    dir=p_db.parent, prefix=p_db.name+'.', delete=False ))
                p_db_tmp = pl.Path(db_tmp.name)
                db = ctx.enter_context(TagDB(p_db_tmp))
                tag_func = ctx.enter_context(db.tagger_context())
                tagger = FileTreeTagger(tag_func, tag_map, path_filter, verbose=opts.verbose)
                for p in p_scan_list: tagger.process_path(p)
        except BaseException: # cleanup on any failure or interrupt, always re-raised
            if p_db_tmp is not None: p_db_tmp.unlink(missing_ok=True)
            raise
        else: p_db_tmp.rename(p_db)

    if not opts.tags: return
    print_func = lambda *line: print(*line, end='\n' if not opts.print0 else '\0')

    with TagDB(p_db) as db:
        if not opts.path:
            tagsets = list( set(filter(None, ts.split('+')))
                for ts in ' '.join(opts.tags).split() )
            # Lookup without any tags would list the whole db, hence the check
            if not any(tagsets): return
            for p_str in db.lookup_tags(tagsets):
                if opts.existing and not os.path.exists(p_str): continue
                print_func(p_str)
            return

        paths = set()
        for p_str in opts.tags:
            p = pl.Path(p_str).resolve()
            if opts.existing and not p.exists(): continue
            paths.add(str(p))
        if not paths: return # lookup_paths returns everything for an empty list
        for p_str, tags in db.lookup_paths(paths):
            print_func(p_str, '::', ' '.join(sorted(tags)))
            paths.discard(p_str)

    if not paths: return
    scan_roots = list((p_scan, str(p_scan.resolve()).rstrip('/')) for p_scan in p_scan_list)
    for p_str in sorted(paths): # check if tags are missing due to filtering lists
        for p_scan, p_root in scan_roots:
            if not p_str.startswith(f'{p_root}/'): continue
            p_rel = p_str[len(p_root):]
            break
        else: continue # can still be a symlink, but whatever
        p_chunks = p_rel.split('/')
        for n in range(1, len(p_chunks)):
            p_dir = '/'.join(p_chunks[:n]) + '/'
            if not path_filter.check(p_dir):
                p_err( f'{p_rel}: parent dir {p_dir!r}'
                    f' is ignored due to filtering (scan root: {p_scan})' )
                break
        else:
            if not path_filter.check(p_rel):
                p_err(f'{p_rel}: path ignored due to filtering (scan root: {p_scan})')
# Script entry point - exit code is whatever main() returns, or 1 on closed stdout pipe.
if __name__ == '__main__':
    try:
        exit_code = main()
    except BrokenPipeError: # stdout pipe closed
        # Point stdout fd to /dev/null, so that flush on exit won't fail in the same way
        devnull_fd = os.open(os.devnull, os.O_WRONLY)
        os.dup2(devnull_fd, sys.stdout.fileno())
        exit_code = 1
    sys.exit(exit_code)