
Commit

cleanup, minor fixes
xy committed Jan 12, 2024
1 parent 75d2f51 commit 545762a
Showing 3 changed files with 177 additions and 239 deletions.
143 changes: 40 additions & 103 deletions src/core.py
@@ -71,7 +71,7 @@

SCAN_DAT_FILE = 'scaninfo'
SEARCH_DAT_FILE = 'searchinfo'
SIGINT_FILE = 'signal'
SIGNAL_FILE = 'signal'

def get_dev_labes_dict():
lsblk = subprocess_run(['lsblk','-fJ'],capture_output = True,text = True)
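# 'lsblk -f -J' prints filesystem info (including labels) as JSON; judging by the
# function name, the output is presumably parsed into a device -> label dict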
@@ -211,8 +211,7 @@ def popen_lin(command,shell,stdin=DEVNULL):

def send_signal(subproc,temp_dir,kind=0):
try:
signal_file = sep.join([temp_dir,SIGINT_FILE])
#print(f'sending signal in file {signal_file}')
signal_file = sep.join([temp_dir,SIGNAL_FILE])

temp_signal_file = signal_file+ '_temp'
with open(temp_signal_file,'w') as tsf:
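# the rest of this block is cut off by the hunk, but the '_temp' suffix suggests a
# write-then-rename scheme: the payload lands in the temp file first and is renamed
# onto SIGNAL_FILE only once complete, so the child never reads a partially written file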
@@ -304,19 +303,15 @@ def __init__(self,label='',scan_path=''):

#######################################################################
class LibrerRecord:
def __init__(self,log,label=None,scan_path=None,file_path=None):
def __init__(self,label=None,scan_path=None,file_path=None):
self.header = Header(label,scan_path)

self.filestructure = ()
self.customdata = []
self.filenames = []

self.log = log
self.find_results = []

self.info_line = ''
#self.info_line_current = ''

self.abort_action = False

self.file_name = ''
@@ -335,9 +330,6 @@ def load(self,file_path):
self.file_path = file_path
self.file_name = basename(normpath(file_path))

#self.log.info('loading %s' % file_name)
#TODO - problem in the subprocess

try:
with ZipFile(file_path, "r") as zip_file:
header_ser_compr = zip_file.read('header')
@@ -346,16 +338,12 @@
self.header.zipinfo["header"]=(asizeof(header_ser_compr),asizeof(header_ser),asizeof(self.header))

if self.header.data_format_version != DATA_FORMAT_VERSION:
message = f'loading "{file_path}" error: incompatible data format version: {self.header.data_format_version} vs {DATA_FORMAT_VERSION}'
self.log.error(message)
return message
return f'loading "{file_path}" error: incompatible data format version: {self.header.data_format_version} vs {DATA_FORMAT_VERSION}'

self.prepare_info()

except Exception as e:
message = f'loading "{file_path}" error: "{e}"'
#self.log.error(message)
return message
return f'loading "{file_path}" error: "{e}"'

return False

@@ -368,18 +356,12 @@ def save(self,print_func,file_path=None,compression_level=9):

self.file_path = file_path

#self.info_line = f'saving {filename}'

#self.log.info('saving %s' % file_path)
#print_func(['save',f'saving {file_path}'])

self_header = self.header

self.header.compression_level = compression_level

with ZipFile(file_path, "w") as zip_file:
def compress_with_header_update_wrapp(data,datalabel):
#self.info_line = f'compressing {datalabel}'
print_func(['save',f'compressing {datalabel}'],True)
compress_with_header_update(self.header,data,compression_level,datalabel,zip_file)

@@ -434,16 +416,11 @@ def scan_rec(self,print_func,abort_list,path, scan_like_data,filenames_set,check
if is_file:
self_header_ext_stats[ext]+=1

#self.info_line_current = entry_name

#print_func(('scan-line',entry_name))

try:
stat_res = stat(entry)
mtime = int(stat_res.st_mtime)
dev=stat_res.st_dev
except Exception as e:
#self.log.error('stat error:%s', e )
print_func( ('error',f'stat {entry_name} error:{e}') )
#size == -1 means a stat error; dev and inode are zeroed
is_bind = False
@@ -501,16 +478,10 @@ def scan_rec(self,print_func,abort_list,path, scan_like_data,filenames_set,check
self_header.quant_folders += local_folder_folders_count

print_func( ('scan',self_header.sum_size,self_header.quant_files,self_header.quant_folders,path) )
#t_now = perf_counter()
#if t_now>self.progress_update_time+1.0:
#self.progress_update_time = t_now

except Exception as e:
#self.log.error('scandir error:%s',e )
print_func( ('error', f'scandir {path} error:{e}') )

#self.info_line_current = ''

return (local_folder_size_with_subtree+local_folder_size,subitems)

def scan(self,print_func,abort_list,cde_list,check_dev=True):
@@ -608,18 +579,19 @@ def extract_customdata(self,print_func,abort_list):
self_header = self.header
scan_path = self_header.scan_path

#self.info_line = 'custom data extraction ...'
print_func( ('info','custom data extraction ...'),True)

self_header.files_cde_quant = 0
self_header.files_cde_size = 0
self_header.files_cde_size_extracted = 0
self_header.files_cde_errors_quant = defaultdict(int)
self_header.files_cde_errors_quant_all = 0
self_header.files_cde_quant_sum = len(self.customdata_pool)

files_cde_quant_sum = self_header.files_cde_quant_sum = len(self.customdata_pool)
files_cde_size_sum = self_header.files_cde_size_sum
cde_list = self.header.cde_list

print_func( ('cdeinit',files_cde_quant_sum,files_cde_size_sum),True)
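# the quant/size totals are now sent once up front in this 'cdeinit' message,
# so the per-file 'cde' messages emitted from threaded_cde below no longer carry them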

customdata_helper={}

customdata_stats_size=defaultdict(int)
@@ -643,10 +615,10 @@ def threaded_cde(timeout_semi_list):
files_cde_size = 0
files_cde_size_extracted = 0

files_cde_errors_quant_all = 0
for (scan_like_list,subpath,rule_nr,size) in self.customdata_pool.values():

self.killed=False
#self.abort_action_single=False

time_start = perf_counter()
if abort_list[0] : #abort everything
@@ -661,8 +633,8 @@
full_file_path = normpath(abspath(sep.join([scan_path,subpath]))).replace('/',sep)
command,command_info = get_command(executable,parameters,full_file_path,shell)

info_line = f'{full_file_path} ({bytes_to_str(size)})'
print_func( ('cde',info_line,size, files_cde_size_extracted,self_header.files_cde_errors_quant_all,files_cde_quant,self_header.files_cde_quant_sum,files_cde_size,self_header.files_cde_size_sum) )
#print_func( ('cde',f'{full_file_path} ({bytes_to_str(size)})',size,files_cde_size_extracted,files_cde_errors_quant_all,files_cde_quant,files_cde_quant_sum,files_cde_size,files_cde_size_sum) )
print_func( ('cde',f'{full_file_path} ({bytes_to_str(size)})',size,files_cde_size_extracted,files_cde_errors_quant_all,files_cde_quant,files_cde_size) )

timeout_val=time()+timeout if timeout else None
#####################################
@@ -706,6 +678,7 @@

if returncode or self.killed or aborted:
files_cde_errors_quant[rule_nr]+=1
files_cde_errors_quant_all+=1
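# kept as a running total so the header update after the loop no longer needs
# sum() over the per-rule defaultdict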

if not aborted:
files_cde_quant += 1
@@ -737,7 +710,7 @@ def threaded_cde(timeout_semi_list):
time_end_all = perf_counter()

self_header.files_cde_errors_quant=files_cde_errors_quant
self_header.files_cde_errors_quant_all = sum(files_cde_errors_quant.values())
self_header.files_cde_errors_quant_all = files_cde_errors_quant_all

self_header.files_cde_quant = files_cde_quant
self_header.files_cde_size = files_cde_size
@@ -910,8 +883,6 @@ def find_items(self,
filestructure = self.filestructure

search_progress = 0
#search_progress_update_quant = 0
#progress_update_time = perf_counter()

if cd_search_kind!='dont':
self.decompress_customdata()
@@ -937,8 +908,6 @@

self_customdata = self.customdata

#results_queue_put = results_queue.append

while search_list:
filestructure,parent_path_components = search_list_pop()

@@ -979,8 +948,6 @@
if name_func_to_call:
if name_func_to_call(name):
print_func( (search_progress,size,mtime,*next_level) )
#search_progress_update_quant=0
#progress_update_time = perf_counter()

if sub_data:
search_list_append( (sub_data,next_level) )
@@ -1047,22 +1014,8 @@ def find_items(self,
continue

print_func( (search_progress,size,mtime,*next_level) )
#search_progress_update_quant=0
#progress_update_time = perf_counter()

#print_func((search_progress))

#t_now = perf_counter()
#if t_now>progress_update_time+1.0:
# progress_update_time = t_now

#if search_progress_update_quant>1024:
#search_progress_update_quant=0
#else:
# search_progress_update_quant+=1

print_func( [search_progress] )
#print_func(True)

def find_items_sort(self,what,reverse):
if what=='data':
Expand Down Expand Up @@ -1288,7 +1241,6 @@ def __init__(self,db_dir,log):
self.db_dir = db_dir
self.log=log
self.info_line = 'init'
#self.info_line_current = ''

self.records_to_show=[]
self.abort_action=False
@@ -1304,7 +1256,7 @@ def update_sorted(self):
self.records_sorted = sorted(self.records,key = lambda x : x.header.creation_time)

def create(self,label='',scan_path=''):
new_record = LibrerRecord(self.log,label=label,scan_path=scan_path)
new_record = LibrerRecord(label=label,scan_path=scan_path)
new_record.db_dir = self.db_dir

self.records.add(new_record)
@@ -1573,14 +1525,14 @@ def find_results_clean(self):
########################################################################################################################
def create_new_record(self,temp_dir,update_callback):
self.log.info(f'create_new_record')
self_log_info = self.log.info


new_file_path = sep.join([self.db_dir,f'rep.{int(time())}.dat'])

#new_record_filename = str(int(time()) + .dat
command = self.record_exe()
command.append('create')
command.append(new_file_path)
#command.append(settings_file)
command.append(temp_dir)

self.abort_action=False
@@ -1602,8 +1554,6 @@
self.stdout_files_cde_size_sum=0

def threaded_run(command,results_semi_list,info_semi_list,processes_semi_list):
#results_list_append = results_semi_list[0].find_results.append

try:
subprocess = uni_popen(command,stdin=PIPE)
except Exception as re:
@@ -1617,12 +1567,12 @@ def threaded_run(command,results_semi_list,info_semi_list,processes_semi_list):

while True:
if line := subprocess_stdout_readline():
line_strip = line.strip()
self_log_info(f'rec:{line_strip}')
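# protocol, as read from this parser: every line from the child is a JSON-encoded
# list, except lines starting with '#', which are passed through as plain status text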
try:
#print(line)
if line[0]!='#':
val = json_loads(line.strip())
val = json_loads(line_strip)

self.info_line = val
kind = val[0]
if kind == 'stage':
self.stage = val[1]
@@ -1637,25 +1587,20 @@ def threaded_run(command,results_semi_list,info_semi_list,processes_semi_list):
self.stdout_sum_size,self.stdout_quant_files,self.stdout_quant_folders,self.stdout_info_line_current = val[1:5]

elif self.stage==1: #cde
self.stdout_info_line_current = val[1]
self.stdout_cde_size = val[2]

self.stdout_files_cde_size_extracted=val[3]
self.stdout_files_cde_errors_quant_all=val[4]
self.stdout_files_cde_quant=val[5]
self.stdout_files_cde_quant_sum=val[6]
self.stdout_files_cde_size=val[7]
self.stdout_files_cde_size_sum=val[8]

self.stdout_info_line_current
self.stdout_cde_size

#print(type(self.stdout_files_cde_size_extracted))
#print(type(self.stdout_files_cde_errors_quant_all))
#print(type(self.stdout_files_cde_quant))
#print(type(self.stdout_files_cde_quant_sum))
#print(type(self.stdout_files_cde_size))
#print(type(self.stdout_files_cde_size_sum))
if val[0]=='cdeinit':
#files_cde_quant_sum,files_cde_size_sum
self.stdout_files_cde_quant_sum = val[1]
self.stdout_files_cde_size_sum = val[2]
elif val[0]=='cde':
self.stdout_info_line_current = val[1]
self.stdout_cde_size = val[2]

self.stdout_files_cde_size_extracted=val[3]
self.stdout_files_cde_errors_quant_all=val[4]
self.stdout_files_cde_quant=val[5]
self.stdout_files_cde_size=val[6]
else:
self_log_info('ERROR UNRECOGNIZED LINE')

elif self.stage==2: #pack
self.stdout_info_line_current = val[1]
@@ -1666,10 +1611,11 @@ def threaded_run(command,results_semi_list,info_semi_list,processes_semi_list):
elif self.stage==4: #end
pass
else:
info_semi_list[0]=line.strip()
info_semi_list[0]=line_strip
except Exception as e:
print(f'threaded_run work error:{e} line:{line}')
info_semi_list[0]=f'threaded_run work error:{e} line:{line}'
self_log_info(f'threaded_run work error:{e} line:{line}')
else:
if subprocess_poll() is not None:
break
@@ -1683,32 +1629,24 @@ def threaded_run(command,results_semi_list,info_semi_list,processes_semi_list):
job.start()
job_is_alive = job.is_alive

aborted=False
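# a dedicated flag: self.abort_action is cleared right after the signal is sent
# below, so only 'aborted' can still be tested once the loop exits (see 'if not aborted:')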
###########################################
while job_is_alive():
subprocess=processes_semi_list[0]
if subprocess:
if self.abort_action:
self.info_line = 'Aborting ...'
send_signal(subprocess,temp_dir,0)
self.abort_action=False
aborted=True
if self.abort_action_single:
self.info_line = 'Aborting single ...'
send_signal(subprocess,temp_dir,1)
self.abort_action_single=False
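# kind 0 presumably tells the child to abort the whole run, kind 1 only the item
# currently being processed (cf. the abort_list checks in threaded_cde)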
sleep(0.1)

#try:
# subprocess.kill()
#except Exception as ke:
# print('killing error:',ke)

#break
sleep(0.01)

self.info_line = 'scanning or something'
job.join()
###########################################

if not self.abort_action:
if not aborted:
new_record = self.create()

if res:=new_record.load(new_file_path) :
Expand All @@ -1717,7 +1655,6 @@ def threaded_run(command,results_semi_list,info_semi_list,processes_semi_list):
print(res)
else:
update_callback(new_record)
#self.records_to_show.append( (new_record,info_curr_quant,info_curr_size) )

return True

