-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathsearch.py
117 lines (101 loc) · 3.47 KB
/
search.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
import os, re, traceback
from dirtree_node import get_file_info
from util import is_text_file
class SearchAborted(Exception):
    """Signal used to unwind a running search once its quit flag is set."""
def null_filter(info):
    """Accept every entry: the filter used when the caller supplies none.

    ``info`` is ignored; the result is always ``True``.
    """
    return True
class Search(object):
    """Walk a file or directory tree and report lines accepted by a matcher.

    Parameters:
        path: file or directory to search; it is split with
            ``os.path.split`` into a parent directory and a name.
        match: callable taking one decoded line and returning a truthy
            value when the line should be reported.
        output: object receiving the results through ``begin_file``,
            ``add_file``, ``add_line``, ``end_file``, ``end_find`` and
            ``abort_find``; each is called with this ``Search`` instance
            as first argument.
        file_filter: callable taking the info object returned by
            ``get_file_info``; only files for which it is truthy are
            searched.
        dir_filter: same, deciding which directories are descended into.

    Lines are decoded with ``self.encoding`` (``"utf-8"`` by default) and
    fall back to latin-1 when that fails.
    """

    def __init__(self, path, match, output, file_filter=null_filter, dir_filter=null_filter):
        self.path = path
        self.match = match
        self.output = output
        self.file_filter = file_filter
        self.dir_filter = dir_filter
        self.encoding = "utf-8"
        # Set by stop(); polled throughout the walk to abort promptly.
        self.quit = False

    def _check_abort(self):
        """Raise SearchAborted if stop() has been called."""
        if self.quit:
            raise SearchAborted()

    def _decode(self, raw):
        """Decode one raw line, falling back to latin-1 on invalid input."""
        try:
            return raw.decode(self.encoding)
        except UnicodeDecodeError:
            # latin-1 maps every byte, so this fallback cannot fail.
            return raw.decode("latin-1")

    def _search_file(self, filepath):
        """Report every matching line of one file to the output.

        ``begin_file`` is sent for every visited file, before the text-file
        check. ``add_file`` is sent once, just before the first matching
        line, and ``end_file`` only if at least one line matched.

        Raises:
            SearchAborted: if stop() was called before or during the scan.
        """
        self._check_abort()
        self.output.begin_file(self, filepath)
        try:
            if not is_text_file(filepath):
                return
            # Binary mode: the bytes are decoded explicitly below, which
            # behaves the same on Python 2 and Python 3.
            f = open(filepath, "rb")
        except (IOError, OSError):
            # On Python 2, IOError is not an OSError, so without this guard a
            # single unreadable file would terminate the whole search.
            return
        matched_file = False
        with f:
            for line_num, raw in enumerate(f, 1):
                line = self._decode(raw.rstrip(b"\r\n"))
                if self.match(line):
                    if not matched_file:
                        self.output.add_file(self, filepath)
                        matched_file = True
                    self.output.add_line(self, line_num, line)
                self._check_abort()
        if matched_file:
            self.output.end_file(self)

    def _search_dir(self, dirpath):
        """Search every entry of a directory, in sorted name order.

        Directories that cannot be listed are skipped silently.
        """
        self._check_abort()
        try:
            dirlist = os.listdir(dirpath)
        except OSError:
            return
        dirlist.sort()
        for name in dirlist:
            self._search(dirpath, name)

    def _search(self, dirpath, name):
        """Dispatch one directory entry to the file or directory search.

        The entry is described by ``get_file_info(dirpath, name)``; files and
        directories rejected by their filter are ignored, as are entries
        that raise ``OSError``.
        """
        self._check_abort()
        try:
            info = get_file_info(dirpath, name)
            if info.is_file and self.file_filter(info):
                self._search_file(info.path)
            elif info.is_dir and self.dir_filter(info):
                self._search_dir(info.path)
        except OSError:
            pass

    def search(self):
        """Run the search and notify the output how it finished.

        Calls ``output.abort_find`` when the search was stopped and
        ``output.end_find`` otherwise, including after an unexpected
        error. Errors other than ``OSError``/``IOError`` additionally have
        their traceback printed to standard output.
        """
        self.quit = False
        try:
            self._search(*os.path.split(self.path))
        except SearchAborted:
            self.output.abort_find(self)
        except Exception as e:
            self.output.end_find(self)
            if not isinstance(e, (OSError, IOError)):
                # Single-argument print(...) behaves the same on Python 2 and 3.
                print(traceback.format_exc())
        else:
            self.output.end_find(self)

    def stop(self):
        """Ask a running search to abort at its next checkpoint."""
        self.quit = True
class SearchFileOutput(object):
    """Search output that writes results as plain text to a file-like object.

    Each matching file is written as its path on one line, followed by one
    line per match (line number and text), followed by a blank line.

    Parameters:
        file: object with a ``write`` method that receives the text.

    Lines longer than ``max_line_length`` (100 by default) are truncated
    and suffixed with ``"..."``.
    """

    def __init__(self, file):
        self.file = file
        self.max_line_length = 100

    def add_file(self, finder, filepath):
        """Write the path of a file that has at least one match."""
        self.file.write(filepath + "\n")

    def add_line(self, finder, line_num, line):
        """Write one matching line, truncated to ``max_line_length``."""
        if len(line) > self.max_line_length:
            line = line[:self.max_line_length] + "..."
        self.file.write(" %d: %s\n" % (line_num, line))

    def begin_file(self, finder, filepath):
        """Called before a file is examined; nothing is written."""
        pass

    def end_file(self, finder):
        """Write the blank line that closes a file's group of matches."""
        self.file.write("\n")

    def end_find(self, finder):
        """Called when the search finishes; nothing is written."""
        pass

    def abort_find(self, finder):
        """Called when the search is stopped early; nothing is written.

        ``Search.search()`` calls this instead of ``end_find`` after an
        abort, so it must exist or stopping a search raises AttributeError.
        """
        pass
def make_matcher(pattern, case_sensitive=True, is_regexp=False):
    """Build a line-matching callable from a search pattern.

    Parameters:
        pattern: text to look for, or a regular expression when
            ``is_regexp`` is true.
        case_sensitive: when false, matching ignores case.
        is_regexp: when false, ``pattern`` is escaped and matched
            literally, prefixed with ``^.*`` so a match starts at the
            beginning of the line and runs through the last occurrence.

    Returns:
        The bound ``search`` method of the compiled pattern: it returns a
        match object for a matching line and ``None`` otherwise.
    """
    regex_source = pattern if is_regexp else "^.*" + re.escape(pattern)
    regex_flags = re.UNICODE if case_sensitive else re.UNICODE | re.IGNORECASE
    compiled = re.compile(regex_source, regex_flags)
    return compiled.search
if __name__ == "__main__":
    import sys

    # Demo run: search the current directory for the literal text "class"
    # and write the results to standard output.
    matcher = make_matcher("class")
    stdout_output = SearchFileOutput(sys.stdout)
    Search(".", matcher, stdout_output).search()