forked from abrignoni/ALEAPP
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsearch_files.py
108 lines (95 loc) · 4.2 KB
/
search_files.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
import fnmatch
import os
import tarfile
import time
from pathlib import Path
from scripts.ilapfuncs import *
from zipfile import ZipFile
class FileSeekerBase:
    """Abstract base class shared by the file seekers.

    It only fixes the method names and signatures; both methods here are
    no-op placeholders that return ``None``.
    """

    def search(self, filepattern_to_search, return_on_first_hit=False):
        """Return a list of paths for the files/folders that matched.

        The base implementation performs no search and returns ``None``.
        """
        return None

    def cleanup(self):
        """Close any open handles (nothing to close in the base class)."""
        return None
class FileSeekerDir(FileSeekerBase):
    """Searches a directory tree on the local filesystem.

    The tree below ``directory`` is enumerated once when the object is
    created; every later search is matched against that cached listing.
    """

    def __init__(self, directory):
        """Record ``directory`` and build the listing of everything below it."""
        FileSeekerBase.__init__(self)
        self.directory = directory
        # Paths of every file AND folder found below `directory`.
        self._all_files = []
        logfunc('Building files listing...')
        self.build_files_list(directory)
        logfunc(f'File listing complete - {len(self._all_files)} files')

    def build_files_list(self, directory):
        """Append the path of every entry below ``directory`` to ``_all_files``.

        Entries are added depth-first: a folder's path is appended and its
        contents follow immediately. Symlinked folders are listed but not
        descended into. Errors while reading a directory are logged and the
        rest of that directory is skipped; nothing is raised.
        """
        try:
            # The context manager closes the directory handle even when an
            # entry raises part-way through the iteration.
            with os.scandir(directory) as entries:
                for item in entries:
                    self._all_files.append(item.path)
                    if item.is_dir(follow_symlinks=False):
                        self.build_files_list(item.path)
        except Exception as ex:
            logfunc(f'Error reading {directory} ' + str(ex))

    def search(self, filepattern, return_on_first_hit=False):
        """Return the listed paths that match the fnmatch-style ``filepattern``.

        With ``return_on_first_hit`` set, the result holds only the first
        match in listing order (or is empty when nothing matches).
        """
        if return_on_first_hit:
            for item in self._all_files:
                if fnmatch.fnmatch(item, filepattern):
                    return [item]
            return []
        return fnmatch.filter(self._all_files, filepattern)
class FileSeekerTar(FileSeekerBase):
    """Searches a tar archive and extracts the matching members.

    Member names are matched with a ``root/`` prefix prepended. Matches are
    written below ``temp_folder`` and the paths of the written files and
    folders are returned.
    """

    def __init__(self, tar_file_path, temp_folder):
        """Open ``tar_file_path`` for reading; hits are extracted into ``temp_folder``.

        A path ending in ``gz`` (case-insensitive) is opened as a gzip
        compressed archive.
        """
        FileSeekerBase.__init__(self)
        self.is_gzip = tar_file_path.lower().endswith('gz')
        mode = 'r:gz' if self.is_gzip else 'r'
        self.tar_file = tarfile.open(tar_file_path, mode)
        self.temp_folder = temp_folder
        self.directory = temp_folder

    def search(self, filepattern, return_on_first_hit=False):
        """Extract members matching ``filepattern`` and return their paths.

        ``filepattern`` is an fnmatch-style pattern tested against
        ``'root/' + member.name``. With ``return_on_first_hit`` set, the
        search stops after the first member that was extracted successfully,
        so the result holds at most one path. Members that cannot be written
        are logged and skipped; nothing is raised.
        """
        pathlist = []
        base_dir = os.path.abspath(self.temp_folder)
        for member in self.tar_file.getmembers():
            if not fnmatch.fnmatch('root/' + member.name, filepattern):
                continue
            try:
                full_path = self._destination_for(member.name, base_dir)
                if member.isdir():
                    os.makedirs(full_path, exist_ok=True)
                else:
                    os.makedirs(os.path.dirname(full_path), exist_ok=True)
                    self._write_member(member, full_path)
                    # Carry the archived modification time over to the copy.
                    os.utime(full_path, (member.mtime, member.mtime))
                pathlist.append(full_path)
            except Exception as ex:
                logfunc(f'Could not write file to filesystem, path was {member.name} ' + str(ex))
                continue
            if return_on_first_hit:
                break
        return pathlist

    def _destination_for(self, member_name, base_dir):
        """Return the path below ``temp_folder`` to write ``member_name`` to.

        Raises ValueError when the resulting path would fall outside
        ``base_dir`` (the absolute form of ``temp_folder``).
        """
        clean_name = sanitize_file_path(member_name)
        # A leading separator would make os.path.join() throw temp_folder
        # away and write to an absolute location on the host.
        separators = '/' + os.sep + (os.altsep or '')
        relative_name = clean_name.lstrip(separators)
        full_path = os.path.join(self.temp_folder, Path(relative_name))
        # '..' segments in an archive name must not climb out of temp_folder.
        resolved = os.path.abspath(full_path)
        if os.path.commonpath([base_dir, resolved]) != base_dir:
            raise ValueError('member path is outside of the extraction folder')
        return full_path

    def _write_member(self, member, full_path):
        """Copy the data of ``member`` into a new file at ``full_path``."""
        chunk_size = 1024 * 1024
        # Only regular members are read; for any other non-directory member
        # (links, devices, ...) an empty file is created.
        source = self.tar_file.extractfile(member) if member.isfile() else None
        with open(full_path, "wb") as fout:
            if source is None:
                return
            with source:
                # Copy in chunks so large members are never held in memory whole.
                while True:
                    chunk = source.read(chunk_size)
                    if not chunk:
                        break
                    fout.write(chunk)

    def cleanup(self):
        """Close the tar archive."""
        self.tar_file.close()
class FileSeekerZip(FileSeekerBase):
    """Searches a zip archive and extracts the matching members.

    Member names are matched with a ``root/`` prefix prepended. Matches are
    extracted below ``temp_folder`` and the extracted paths are returned.
    """

    def __init__(self, zip_file_path, temp_folder):
        """Open ``zip_file_path`` and cache its member names."""
        FileSeekerBase.__init__(self)
        self.zip_file = ZipFile(zip_file_path)
        self.name_list = self.zip_file.namelist()
        self.temp_folder = temp_folder
        self.directory = temp_folder

    def search(self, filepattern, return_on_first_hit=False):
        """Extract members matching ``filepattern`` and return their paths.

        ``filepattern`` is an fnmatch-style pattern tested against
        ``'root/' + member``. With ``return_on_first_hit`` set, the search
        stops after the first member that was extracted successfully, so the
        result holds at most one path. Members that cannot be extracted are
        logged and skipped; nothing is raised.
        """
        pathlist = []
        for member in self.name_list:
            if not fnmatch.fnmatch('root/' + member, filepattern):
                continue
            try:
                # extract() already replaces illegal chars with _ when exporting
                extracted_path = self.zip_file.extract(member, path=self.temp_folder)
            except Exception as ex:
                shown_name = member.lstrip("/")
                logfunc(f'Could not write file to filesystem, path was {shown_name} ' + str(ex))
                continue
            # A timestamp problem must not hide a file that was extracted fine.
            self._restore_mtime(member, extracted_path)
            pathlist.append(extracted_path)
            if return_on_first_hit:
                break
        return pathlist

    def _restore_mtime(self, member, extracted_path):
        """Set the times of ``extracted_path`` from the archive entry's date.

        The stored date is interpreted as local time (DST flag -1 lets
        mktime decide). Failures are logged and otherwise ignored.
        """
        try:
            date_time = self.zip_file.getinfo(member).date_time
            timestamp = time.mktime(date_time + (0, 0, -1))
            os.utime(extracted_path, (timestamp, timestamp))
        except (KeyError, OverflowError, ValueError, OSError) as ex:
            logfunc(f'Could not set modification time for {extracted_path} ' + str(ex))

    def cleanup(self):
        """Close the zip archive."""
        self.zip_file.close()