diff --git a/src/core.py b/src/core.py
index d64a51e..abd85ae 100644
--- a/src/core.py
+++ b/src/core.py
@@ -82,7 +82,6 @@
 CD_INDEX_ID = 1
 CD_DATA_ID = 2
 
-
 def get_dev_labes_dict():
     lsblk = subprocess_run(['lsblk','-fJ'],capture_output = True,text = True)
     lsblk.dict = json_loads(lsblk.stdout)
@@ -232,20 +231,20 @@ def send_signal(subproc,temp_dir,kind=0):
     except Exception as se:
         print(f'subprocess signal error: {se}')
 
-def kill_subprocess(subproc,print_func=print):
+def kill_subprocess( subproc,print_func=lambda x,force=None : print(x) ):
     try:
         pid = subproc.pid
 
         if windows:
             kill_cmd = ['taskkill', '/F', '/T', '/PID', str(pid)]
-            print_func( ('info',f'killing pid: {pid}') )
+            print_func( ('info',f'killing pid: {pid}'),True )
             subprocess_run(kill_cmd)
         else:
-            print_func( ('info',f'killing process group of pid {pid}') )
+            print_func( ('info',f'killing process group of pid {pid}'),True )
            killpg(getpgid(pid), SIGTERM)
 
     except Exception as ke:
-        print_func( ('error',f'kill_subprocess error: {ke}') )
+        print_func( ('error',f'kill_subprocess error: {ke}'),True )
 
 def compress_with_header_update(header,data,compression,datalabel,zip_file):
     t0 = perf_counter()
@@ -283,7 +282,7 @@ def __init__(self,label='',scan_path=''):
         self.files_cde_quant = 0
         self.files_cde_quant_sum = 0
-        self.files_cde_size_extracted = 0
+        self.cde_size_extracted = 0
 
         self.items_names=0
         self.items_cd=0
@@ -293,7 +292,7 @@ def __init__(self,label='',scan_path=''):
         self.cde_list = []
         self.files_cde_errors_quant = {}
-        self.files_cde_errors_quant_all = 0
+        self.cde_errors_quant_all = 0
         self.cde_stats_time_all = 0
 
         self.zipinfo = {}
@@ -358,6 +357,7 @@ def load(self,file_path):
 
             return False
 
+    label_of_datalabel = {'filestructure':'Filestructure','filenames':'Filenames','customdata':'Custom Data','header':'Header'}
     def save(self,print_func,file_path=None,compression_level=9):
         if file_path:
             filename = basename(normpath(file_path))
@@ -371,9 +371,10 @@ def save(self,print_func,file_path=None,compression_level=9):
 
         self.header.compression_level = compression_level
 
+        self_label_of_datalabel = self.label_of_datalabel
         with ZipFile(file_path, "w") as zip_file:
             def compress_with_header_update_wrapp(data,datalabel):
-                print_func(['save',f'compressing {datalabel}'],True)
+                print_func(('save',f'Compressing {self_label_of_datalabel[datalabel]} ({bytes_to_str(asizeof(data))})'),True)
                 compress_with_header_update(self.header,data,compression_level,datalabel,zip_file)
 
             compress_with_header_update_wrapp(self.filestructure,'filestructure')
@@ -405,7 +406,7 @@ def compress_with_header_update_wrapp(data,datalabel):
 
         self.prepare_info()
 
-        print_func(['save','finished'],True)
+        print_func(('save','finished'),True)
 
     def scan_rec(self,print_func,abort_list,path, scan_like_data,filenames_set,check_dev=True,dev_call=None) :
         if any(abort_list) :
@@ -448,7 +449,7 @@ def scan_rec(self,print_func,abort_list,path, scan_like_data,filenames_set,check
                         mtime = int(stat_res.st_mtime)
                         dev=stat_res.st_dev
                     except Exception as e:
-                        print_func( ('error',f'stat {entry_name} error:{e}') )
+                        print_func( ('error',f'stat {entry_name} error:{e}'),True )
                         #size -1 <=> error, dev,in ==0
                         is_bind = False
                         size=-1
@@ -462,7 +463,7 @@ def scan_rec(self,print_func,abort_list,path, scan_like_data,filenames_set,check
                     if dev_call:
                         if dev_call!=dev:
                             #self.log.info('devices mismatch:%s %s %s %s' % (path,entry_name,dev_call,dev) )
-                            print_func( ('info',f'devices mismatch:{path},{entry_name},{dev_call},{dev}') )
+                            print_func( ('info',f'devices mismatch:{path},{entry_name},{dev_call},{dev}'),True )
                            is_bind=True
                    else:
                        dev_call=dev
@@ -507,7 +508,7 @@ def scan_rec(self,print_func,abort_list,path, scan_like_data,filenames_set,check
 
             print_func( ('scan',self_header.sum_size,self_header.quant_files,self_header.quant_folders,path) )
         except Exception as e:
-            print_func( ('error', f'scandir {path} error:{e}') )
+            print_func( ('error', f'scandir {path} error:{e}'),True )
 
         return (local_folder_size_with_subtree+local_folder_size,subitems)
@@ -537,7 +538,7 @@ def scan(self,print_func,abort_list,cde_list,check_dev=True):
         self.customdata_pool_index = 0
 
         if cde_list:
-            print_func( ('info','estimating files pool for custom data extraction') )
+            print_func( ('info','Estimating files pool for custom data extraction.'),True )
             self.prepare_customdata_pool_rec(print_func,abort_list,self.scan_data,[])
 
    def prepare_customdata_pool_rec(self,print_func,abort_list,scan_like_data,parent_path):
@@ -600,84 +601,74 @@ def prepare_customdata_pool_rec(self,print_func,abort_list,scan_like_data,parent
            except Exception as e:
                #self.log.error('prepare_customdata_pool_rec error::%s',e )
                #print('prepare_customdata_pool_rec',e,entry_name,size,is_dir,is_file,is_symlink,is_bind,has_files,mtime)
-                print_func( ('error','prepare_customdata_pool_rec:{e},{entry_name},{size},{is_dir},{is_file},{is_symlink},{is_bind},{has_files},{mtime}') )
+                print_func( ('error',f'prepare_customdata_pool_rec:{e},{entry_name},{size},{is_dir},{is_file},{is_symlink},{is_bind},{has_files},{mtime}'),True )
 
-    def extract_customdata(self,print_func,abort_list,threads_quant=0):
+    def extract_customdata(self,print_func,abort_list,threads=0):
        self_header = self.header
        scan_path = self_header.scan_path
 
-        print_func( ('info',f'custom data extraction {threads_quant=}...'),True)
+        print_func( ('info',f'custom data extraction {threads=}...'),True)
 
        self_header.files_cde_quant = 0
        self_header.files_cde_size = 0
-        self_header.files_cde_size_extracted = 0
+        self_header.cde_size_extracted = 0
        self_header.files_cde_errors_quant = defaultdict(int)
-        self_header.files_cde_errors_quant_all = 0
+        self_header.cde_errors_quant_all = 0
+        self_header.threads = threads
+
        files_cde_quant_sum = self_header.files_cde_quant_sum = len(self.customdata_pool)
        files_cde_size_sum = self_header.files_cde_size_sum
 
        cde_list = self.header.cde_list
 
        print_func( ('cdeinit',files_cde_quant_sum,files_cde_size_sum),True)
 
-        customdata_stats_size=defaultdict(int)
-        customdata_stats_uniq=defaultdict(int)
-        customdata_stats_refs=defaultdict(int)
-        customdata_stats_time=defaultdict(float)
-
-        customdata_stats_time_all=[0]
-
-        if threads_quant==0:
-            threads_quant = cpu_count()
+        if threads==0:
+            threads = cpu_count()
 
        customdata_pool_per_thread = defaultdict(list)
-        timeout_semi_list_per_thread = { thread_index:[None] for thread_index in range(threads_quant) }
-        self.killed = { thread_index:False for thread_index in range(threads_quant) }
+        timeout_semi_list_per_thread = { thread_index:[None] for thread_index in range(threads) }
+        self.killed = { thread_index:False for thread_index in range(threads) }
 
-        #per_thread_customdata_dict={}
        thread_index = 0
        for val_tuple in self.customdata_pool.values():
            customdata_pool_per_thread[thread_index].append(val_tuple)
            thread_index+=1
-            thread_index %= threads_quant
-            #per_thread_customdata_dict[thread_index]={}
-
-        #print(f'{thread_index=}')
+            thread_index %= threads
 
        CD_OK_ID_loc = CD_OK_ID
        CD_DATA_ID_loc = CD_DATA_ID
 
        all_threads_data_list={}
-        #files_cde_errors_quant={}
        all_threads_files_cde_errors_quant = {}
+        all_threads_customdata_stats_time = {}
 
-        for thread_index in range(threads_quant):
+        for thread_index in range(threads):
            all_threads_data_list[thread_index]=[0,0,0,0]
-            #files_cde_errors_quant[thread_index]=defaultdict(int)
            all_threads_files_cde_errors_quant[thread_index]=defaultdict(int)
+            all_threads_customdata_stats_time[thread_index]=defaultdict(float)
 
        time_start_all = perf_counter()
 
+        single_thread = bool(threads==1)
        #############################################################
-        def threaded_cde(timeout_semi_list,thread_index,thread_data_list,files_cde_errors_quant):
-
-            #curr_per_thread_customdata_dict = per_thread_customdata_dict[thread_index]
-            #cd_index_per_thread = 0
+        def threaded_cde(timeout_semi_list,thread_index,thread_data_list,cde_errors_quant,cde_stats_time):
            aborted_string = 'Custom data extraction was aborted.'
 
-            #files_cde_errors_quant = defaultdict(int)
-
            files_cde_quant = 0
            files_cde_size = 0
-            files_cde_size_extracted = 0
+            cde_size_extracted = 0
+
+            cde_errors_quant_all = 0
+
+            perf_counter_loc = perf_counter
+            self_killed = self.killed
 
-            files_cde_errors_quant_all = 0
-
-            #for (scan_like_list,subpath,rule_nr,size) in self.customdata_pool.values():
            for (scan_like_list,subpath,rule_nr,size) in customdata_pool_per_thread[thread_index]:
-                self.killed[thread_index]=False
+                self_killed[thread_index]=False
 
-                time_start = perf_counter()
+                time_start = perf_counter_loc()
 
                if abort_list[0] : #wszystko
                    returncode=200
                    output = aborted_string
@@ -690,8 +681,8 @@ def threaded_cde(timeout_semi_list,thread_index,thread_data_list,files_cde_error
                    full_file_path = normpath(abspath(sep.join([scan_path,subpath]))).replace('/',sep)
                    command,command_info = get_command(executable,parameters,full_file_path,shell)
 
-                    #print_func( ('cde',f'{full_file_path} ({bytes_to_str(size)})',size,files_cde_size_extracted,files_cde_errors_quant_all,files_cde_quant,files_cde_quant_sum,files_cde_size,files_cde_size_sum) )
-                    #print_func( ('cde',f'{full_file_path} ({bytes_to_str(size)})',size,files_cde_size_extracted,files_cde_errors_quant_all,files_cde_quant,files_cde_size) )
+                    if single_thread:
+                        print_func( ('cde',f'{full_file_path} ({bytes_to_str(size)})',size,cde_size_extracted,cde_errors_quant_all,files_cde_quant,files_cde_size) )
 
                    timeout_val=time()+timeout if timeout else None
                    #####################################
@@ -720,7 +711,7 @@ def threaded_cde(timeout_semi_list,thread_index,thread_data_list,files_cde_error
                                timeout_semi_list[0] = None
                                break
 
-                        if self.killed[thread_index]:
+                        if self_killed[thread_index]:
                            output_list_append('Killed.')
 
                        output = '\n'.join(output_list).strip()
@@ -730,25 +721,21 @@ def threaded_cde(timeout_semi_list,thread_index,thread_data_list,files_cde_error
 
                #####################################
 
-                time_end = perf_counter()
-                customdata_stats_time[rule_nr]+=time_end-time_start
+                cde_stats_time[rule_nr]+=perf_counter_loc()-time_start
 
-                if returncode or self.killed[thread_index] or aborted:
-                    files_cde_errors_quant[rule_nr]+=1
-                    files_cde_errors_quant_all+=1
+                if returncode or self_killed[thread_index] or aborted:
+                    cde_errors_quant[rule_nr]+=1
+                    cde_errors_quant_all+=1
 
                if not aborted:
                    files_cde_quant += 1
                    files_cde_size += size
 
-                files_cde_size_extracted += asizeof(output)
+                cde_size_extracted += asizeof(output)
 
-                thread_data_list[0]=files_cde_size_extracted
-                thread_data_list[1]=files_cde_errors_quant_all
-                thread_data_list[2]=files_cde_quant
-                thread_data_list[3]=files_cde_size
+                thread_data_list[0:4]=[cde_size_extracted,cde_errors_quant_all,files_cde_quant,files_cde_size]
 
                new_elem={
-                    CD_OK_ID_loc:bool(returncode==0 and not self.killed[thread_index] and not aborted),
+                    CD_OK_ID_loc:bool(returncode==0 and not self_killed[thread_index] and not aborted),
                    CD_DATA_ID_loc:(rule_nr,returncode,output)
                }
@@ -756,88 +743,74 @@ def threaded_cde(timeout_semi_list,thread_index,thread_data_list,files_cde_error
            sys.exit() #thread
 
-        #timeout_semi_list = [None]
-
        cde_threads = {}
        cde_thread_is_alive = {}
        any_thread_alive = True
 
-        for thread_index in range(threads_quant):
-            cde_threads[thread_index] = cde_thread = Thread(target = lambda : threaded_cde(timeout_semi_list_per_thread[thread_index],thread_index,all_threads_data_list[thread_index],all_threads_files_cde_errors_quant[thread_index]),daemon=True)
+        for thread_index in range(threads):
+            cde_threads[thread_index] = cde_thread = Thread(target = lambda : threaded_cde(timeout_semi_list_per_thread[thread_index],thread_index,all_threads_data_list[thread_index],all_threads_files_cde_errors_quant[thread_index],all_threads_customdata_stats_time[thread_index]),daemon=True)
            cde_thread.start()
 
-        #rules
-        files_cde_errors_quant = defaultdict(int)
-
        while any_thread_alive:
            any_thread_alive = False
            now = time()
-            for thread_index in range(threads_quant):
-                #cde_thread = cde_threads[thread_index]
-                #cde_threads[thread_index] = cde_thread = Thread(target = lambda : threaded_cde(timeout_semi_list_per_thread[thread_index]),daemon=True)
-                #cde_thread.start()
-                #cde_thread_is_alive[thread_index] = cde_thread.is_alive
-
+            for thread_index in range(threads):
                if cde_threads[thread_index].is_alive():
                    any_thread_alive = True
 
                if timeout_semi_list_per_thread[thread_index][0]:
                    timeout_val,subprocess = timeout_semi_list_per_thread[thread_index][0]
                    if any(abort_list) or (timeout_val and now>timeout_val):
                        kill_subprocess(subprocess,print_func)
-                        self.killed[thread_index]=True
+                        self_killed[thread_index]=True
                        abort_list[1]=False
-                        #sleep(0.2)
-                #else:
-                    #sleep(0.4)
-
-            #print_func( ('cde',f'{full_file_path} ({bytes_to_str(size)})',size,files_cde_size_extracted,files_cde_errors_quant_all,files_cde_quant,files_cde_size) )
-            files_cde_size_extracted=0
-            files_cde_errors_quant_all=0
+            cde_size_extracted=0
+            cde_errors_quant_all=0
            files_cde_quant=0
            files_cde_size=0
 
-            for thread_index in range(threads_quant):
+            for thread_index in range(threads):
                thread_data_list = all_threads_data_list[thread_index]
-                files_cde_size_extracted+=thread_data_list[0]
-                files_cde_errors_quant_all+=thread_data_list[1]
+                cde_size_extracted+=thread_data_list[0]
+                cde_errors_quant_all+=thread_data_list[1]
                files_cde_quant+=thread_data_list[2]
                files_cde_size+=thread_data_list[3]
 
-                for rule_nr,val in all_threads_files_cde_errors_quant[thread_index].items():
-                    files_cde_errors_quant[rule_nr] += val
-
-
-            print_func( ('cde',f'(multithread run)',0,files_cde_size_extracted,files_cde_errors_quant_all,files_cde_quant,files_cde_size) )
-
-            sleep(0.4)
-
+            if threads!=1:
+                print_func( ('cde','(multithread run)',0,cde_size_extracted,cde_errors_quant_all,files_cde_quant,files_cde_size) )
+            sleep(0.1)
 
-        time_end_all = perf_counter()
-
-        self_header.files_cde_errors_quant=files_cde_errors_quant
-        self_header.files_cde_errors_quant_all = files_cde_errors_quant_all
+        self_header.cde_errors_quant_all = cde_errors_quant_all
        self_header.files_cde_quant = files_cde_quant
        self_header.files_cde_size = files_cde_size
-        self_header.files_cde_size_extracted = files_cde_size_extracted
-
-        customdata_stats_time_all[0]=time_end_all-time_start_all
+        self_header.cde_size_extracted = cde_size_extracted
+        self_header.cde_stats_time_all = perf_counter()-time_start_all
-
-
-
-        print_func( ('info','custom data extraction finished.'),True)
+        print_func( ('info','Custom data extraction finished. Merging ...'),True)
 
        customdata_helper={}
 
        cd_index=0
        self_customdata_append = self.customdata.append
 
+        files_cde_errors_quant = defaultdict(int)
+
+        customdata_stats_size=defaultdict(int)
+        customdata_stats_uniq=defaultdict(int)
+        customdata_stats_refs=defaultdict(int)
+        customdata_stats_time=defaultdict(float)
+
        CD_INDEX_ID_loc = CD_INDEX_ID
 
-        for thread_index in range(threads_quant):
+        for thread_index in range(threads):
+
+            for rule_nr,val in all_threads_files_cde_errors_quant[thread_index].items():
+                files_cde_errors_quant[rule_nr] += val
+
+            for rule_nr,val in all_threads_customdata_stats_time[thread_index].items():
+                customdata_stats_time[rule_nr] += val
+
            for (scan_like_list,subpath,rule_nr,size) in customdata_pool_per_thread[thread_index]:
                new_elem = scan_like_list[-1]
 
                cd_field = new_elem[CD_DATA_ID_loc]
@@ -857,39 +830,20 @@ def threaded_cde(timeout_semi_list,thread_index,thread_data_list,files_cde_error
                    customdata_stats_refs[rule_nr]+=1
 
-                #if cd_field not in customdata_helper:
-                #    customdata_helper[cd_field]=cd_index
-                #    new_elem[CD_INDEX_ID_loc] = cd_index
-                #    new_elem['cd_index']=cd_index
-                #    cd_index+=1
-
-                #    self_customdata_append(cd_field)
-
-                #    customdata_stats_size[rule_nr]+=asizeof(cd_field)
-                #    customdata_stats_uniq[rule_nr]+=1
-                #    customdata_stats_refs[rule_nr]+=1
-                #else:
-                #    new_elem['cd_index']=customdata_helper[cd_field]
-                #    new_elem[CD_INDEX_ID_loc]=customdata_helper[cd_field]
-                #    customdata_stats_refs[rule_nr]+=1
-
+        print_func( ('info','Custom data post-processing finished.'),True)
 
-        print_func( ('info','custom data post-processing finished.'),True)
-
-        for thread_index in range(threads_quant):
+        for thread_index in range(threads):
            cde_threads[thread_index].join()
 
-        #print(f'{customdata_helper=}')
-        #print(f'{self.customdata=}')
-
        del self.customdata_pool
-
        del customdata_helper
 
-        self.header.cde_stats_size=customdata_stats_size
-        self.header.cde_stats_uniq=customdata_stats_uniq
-        self.header.cde_stats_refs=customdata_stats_refs
-        self.header.cde_stats_time=customdata_stats_time
-        self.header.cde_stats_time_all=customdata_stats_time_all[0]
+        self_header.files_cde_errors_quant=files_cde_errors_quant
+        self_header.cde_stats_size=customdata_stats_size
+        self_header.cde_stats_uniq=customdata_stats_uniq
+        self_header.cde_stats_refs=customdata_stats_refs
+        self_header.cde_stats_time=customdata_stats_time
+
+
    #############################################################
 
    def sld_recalc_rec(self,scan_like_data):
@@ -1250,15 +1204,6 @@ def prepare_info(self):
 
        local_time = strftime('%Y/%m/%d %H:%M:%S',localtime_catched(self_header.creation_time))
        info_list.append(f'record label : {self_header.label}')
-        #if file_name in self.aliases:
-        #    info_list.append(f'record label : {self_header.label} alias:{file_name}')
-        #else:
-
-        info_list.append('')
-        info_list.append(f'scanned path : {self_header.scan_path}')
-        info_list.append(f'scanned space : {bytes_to_str(self_header.sum_size)}')
-        info_list.append(f'scanned files : {fnumber(self_header.quant_files)}')
-        info_list.append(f'scanned folders : {fnumber(self_header.quant_folders)}')
 
        info_list.append('')
        info_list.append(f'creation host : {self_header.creation_host} ({self_header.creation_os})')
@@ -1267,50 +1212,75 @@ def prepare_info(self):
 
        self.txtinfo_short = '\n'.join(info_list)
        self.txtinfo_basic = '\n'.join(info_list)
 
-        info_list.append('')
-        info_list.append(f'record file : {file_name} ({bytes_to_str(file_size)}, compression level:{self.header.compression_level})')
-        info_list.append('')
-        info_list.append( 'data collection times:')
-        info_list.append(f'filesystem : {str(round(self_header.scanning_time,2))}s')
-        if self_header.cde_stats_time_all:
-            info_list.append(f'custom data : {str(round(self_header.cde_stats_time_all,2))}s')
+        threads_str = None
+        try:
+            threads_str= str(self_header.threads)
+        except:
+            pass
 
+        info_list.append(f'record file : {file_name} ({bytes_to_str(file_size)}, compression level:{self.header.compression_level}, cde threads:{threads_str})')
        info_list.append('')
-        info_list.append( 'serializing and compression times:')
+        info_list.append(f'scanned path : {self_header.scan_path}')
+        info_list.append(f'scanned space : {bytes_to_str(self_header.sum_size)}')
+        info_list.append(f'scanned files : {fnumber(self_header.quant_files)}')
+        info_list.append(f'scanned folders : {fnumber(self_header.quant_folders)}')
+
+        scanning_time_str = f'{str(round(self_header.scanning_time,2))}'
+
+        cde_stats_time_all_str = ''
+        if self_header.cde_stats_time_all:
+            cde_stats_time_all_str = f'{str(round(self_header.cde_stats_time_all,2))}'
 
        filestructure_time = self.header.compression_time['filestructure']
        filenames_time = self.header.compression_time['filenames']
        customdata_time = self.header.compression_time['customdata']
 
-        info_list.append(f'file structure : {str(round(filestructure_time,2))}s')
-        info_list.append(f'file names : {str(round(filenames_time,2))}s')
-        info_list.append(f'custom data : {str(round(customdata_time,2))}s')
-
-        info_list.append('')
-        info_list.append(f'custom data extraction errors : {fnumber(self_header.files_cde_errors_quant_all)}')
+        cde_errors = 0
+        try:
+            #obsolete
+            cde_errors = self_header.files_cde_errors_quant_all
+        except:
+            pass
+
+        try:
+            cde_errors = self_header.cde_errors_quant_all
+        except:
+            pass
 
        info_list.append('')
-        info_list.append( 'internal sizes : compressed serialized original items references CDE time CDE errors')
-        info_list.append('')
+        info_list.append('----------------+------------------------------------------------------------------------------------------------')
+        info_list.append('Internals | compressed serialized original items references read time compr.time CDE errors')
+        info_list.append('----------------+------------------------------------------------------------------------------------------------')
 
        h_data = self_header.zipinfo["header"]
        fs_data = self_header.zipinfo["filestructure"]
        fn_data = self_header.zipinfo["filenames"]
        cd_data = self_header.zipinfo["customdata"]
 
-        info_list.append(f'header :{bytes_to_str_mod(h_data[0]).rjust(12) }{bytes_to_str_mod(h_data[1]).rjust(12) }{bytes_to_str_mod(h_data[2]).rjust(12) }')
-        info_list.append(f'filestructure :{bytes_to_str_mod(fs_data[0]).rjust(12) }{bytes_to_str_mod(fs_data[1]).rjust(12) }{bytes_to_str_mod(fs_data[2]).rjust(12) }')
-        info_list.append(f'file names :{bytes_to_str_mod(fn_data[0]).rjust(12) }{bytes_to_str_mod(fn_data[1]).rjust(12) }{bytes_to_str_mod(fn_data[2]).rjust(12) }{fnumber(self_header.items_names).rjust(12) }{fnumber(self_header.references_names).rjust(12)}')
+        info_list.append(f'Header |{bytes_to_str_mod(h_data[0]).rjust(12) }{bytes_to_str_mod(h_data[1]).rjust(12) }{bytes_to_str_mod(h_data[2]).rjust(12) }')
+        info_list.append(f'Filestructure |{bytes_to_str_mod(fs_data[0]).rjust(12) }{bytes_to_str_mod(fs_data[1]).rjust(12) }{bytes_to_str_mod(fs_data[2]).rjust(12) }{"".rjust(12)}{"".rjust(12)}{scanning_time_str.rjust(11)}s{str(round(filestructure_time,2)).rjust(11)}s')
+        info_list.append(f'File Names |{bytes_to_str_mod(fn_data[0]).rjust(12) }{bytes_to_str_mod(fn_data[1]).rjust(12) }{bytes_to_str_mod(fn_data[2]).rjust(12) }{fnumber(self_header.items_names).rjust(12) }{fnumber(self_header.references_names).rjust(12)}{"".rjust(12)}{str(round(filenames_time,2)).rjust(11)}s')
 
        if cd_data[0]:
-            info_list.append(f'custom data :{bytes_to_str_mod(cd_data[0]).rjust(12) }{bytes_to_str_mod(cd_data[1]).rjust(12) }{bytes_to_str_mod(cd_data[2]).rjust(12) }{fnumber(self_header.items_cd).rjust(12) }{fnumber(self_header.references_cd).rjust(12)}')
+            info_list.append(f'Custom Data |{bytes_to_str_mod(cd_data[0]).rjust(12) }{bytes_to_str_mod(cd_data[1]).rjust(12) }{bytes_to_str_mod(cd_data[2]).rjust(12) }{fnumber(self_header.items_cd).rjust(12) }{fnumber(self_header.references_cd).rjust(12)}{cde_stats_time_all_str.rjust(11)}s{str(round(customdata_time,2)).rjust(11)}s{fnumber(cde_errors).rjust(12)}')
+
+        try:
+            if self_header.cde_list:
+                info_list.append('----------------+------------------------------------------------------------------------------------------------')
+                for nr,(expressions,use_smin,smin_int,use_smax,smax_int,executable,parameters,shell,timeout,crc) in enumerate(self_header.cde_list):
+                    info_list.append(f'rule nr {str(nr).rjust(2)} | {bytes_to_str(self_header.cde_stats_size[nr]).rjust(12)}{fnumber(self_header.cde_stats_uniq[nr]).rjust(12)}{fnumber(self_header.cde_stats_refs[nr]).rjust(12)}{str(round(self_header.cde_stats_time[nr],2)).rjust(11)}s{"".rjust(12)}{fnumber(self_header.files_cde_errors_quant[nr]).rjust(12)}')
+                info_list.append('----------------+------------------------------------------------------------------------------------------------')
+        except Exception as EE:
+            info_list.append(str(EE))
 
        info_list.append('')
 
        try:
            if self_header.cde_list:
-                info_list.append('\nCustom data with details about the rules:')
+                info_list.append('Custom Data Extractors and rules:')
 
                for nr,(expressions,use_smin,smin_int,use_smax,smax_int,executable,parameters,shell,timeout,crc) in enumerate(self_header.cde_list):
-                    info_list.append(f'\nrule nr : {nr} {bytes_to_str(self_header.cde_stats_size[nr]).rjust(12)}{fnumber(self_header.cde_stats_uniq[nr]).rjust(12)}{fnumber(self_header.cde_stats_refs[nr]).rjust(12)}{str(round(self_header.cde_stats_time[nr],2)).rjust(12)}s{fnumber(self_header.files_cde_errors_quant[nr]).rjust(11)}')
+                    info_list.append(f'\nrule nr : {nr}')
 
                    expressions_expanded = ','.join(list(expressions))
                    info_list.append(f'files : {expressions_expanded}')
@@ -1324,11 +1294,9 @@ def prepare_info(self):
 
                    if timeout:
                        info_list.append(f'timeout : {timeout}s')
-
        except Exception as EE:
            info_list.append(str(EE))
-
        info_list.append('')
 
        loaded_fs_info = 'filesystem - ' + ('loaded' if self.decompressed_filestructure else 'not loaded yet')
        loaded_cd_info = 'custom data - ' + ('not present' if not bool(cd_data[0]) else 'loaded' if self.decompressed_customdata else 'not loaded yet')
@@ -2184,7 +2152,7 @@ def repack_record(self,record,new_label,new_compression,keep_cd,update_callback,
 
                if compression_change:
                    data_filenames = loads(dec_dec(src_zip_file.read('filenames')))
-                    self.info_line = f'compressing filenames'
+                    self.info_line = f'Compressing Filenames ({bytes_to_str(asizeof(data_filenames))})'
                    compress_with_header_update(new_header,data_filenames,new_compression,'filenames',zip_file)
                else:
                    zip_file.writestr('filenames',src_zip_file.read('filenames'))
@@ -2192,16 +2160,16 @@ def repack_record(self,record,new_label,new_compression,keep_cd,update_callback,
 
                if keep_cd!=bool(record.header.items_cd):
                    data_filestructure = record.remove_cd_rec(loads(dec_dec(src_zip_file.read('filestructure'))))
-                    self.info_line = f'compressing filestructure'
+                    self.info_line = f'Compressing Filestructure ({bytes_to_str(asizeof(data_filestructure))})'
                    compress_with_header_update(new_header,data_filestructure,new_compression,'filestructure',zip_file)
 
                    new_header.zipinfo["customdata"]=(0,0,0)
-                    new_header.files_cde_size_extracted = 0
+                    new_header.cde_size_extracted = 0
                    new_header.items_cd=0
                    new_header.references_cd = 0
                    new_header.cde_list = []
                    new_header.files_cde_errors_quant = {}
-                    new_header.files_cde_errors_quant_all = 0
+                    new_header.cde_errors_quant_all = 0
                    new_header.cde_stats_time_all = 0
 
                    new_header.compression_time['customdata']=0
@@ -2213,13 +2181,13 @@ def repack_record(self,record,new_label,new_compression,keep_cd,update_callback,
                else:
                    data_filestructure = loads(dec_dec(src_zip_file.read('filestructure')))
-                    self.info_line = f'compressing filestructure'
+                    self.info_line = f'Compressing Filestructure ({bytes_to_str(asizeof(data_filestructure))})'
                    compress_with_header_update(new_header,data_filestructure,new_compression,'filestructure',zip_file)
 
                    if header.items_cd:
                        data_customdata = loads(dec_dec(src_zip_file.read('customdata')))
-                        self.info_line = f'compressing customdata'
+                        self.info_line = f'Compressing Custom Data ({bytes_to_str(asizeof(data_customdata))})'
                        compress_with_header_update(new_header,data_customdata,new_compression,'customdata',zip_file)
 
                header_ser = dumps(new_header)
@@ -2296,8 +2264,8 @@ def find_results_clean(self):
 
    stdout_info_line_current = ''
 
    stdout_cde_size = 0
-    stdout_files_cde_size_extracted=0
-    stdout_files_cde_errors_quant_all=0
+    stdout_cde_size_extracted=0
+    stdout_cde_errors_quant_all=0
    stdout_files_cde_quant=0
    stdout_files_cde_quant_sum=0
    stdout_files_cde_size=0
@@ -2327,8 +2295,8 @@ def create_new_record(self,temp_dir,update_callback,group=None):
 
        self.stage = 0
 
-        self.stdout_files_cde_size_extracted=0
-        self.stdout_files_cde_errors_quant_all=0
+        self.stdout_cde_size_extracted=0
+        self.stdout_cde_errors_quant_all=0
        self.stdout_files_cde_quant=0
        self.stdout_files_cde_quant_sum=0
        self.stdout_files_cde_size=0
@@ -2336,7 +2304,7 @@ def create_new_record(self,temp_dir,update_callback,group=None):
 
        def threaded_run(command,results_semi_list,info_semi_list,processes_semi_list):
            command_str = ' '.join(command)
-            print(f'create_new_record - threaded_run {command_str=}')
+
            try:
                subprocess = uni_popen(command,stdin=PIPE)
            except Exception as re:
@@ -2378,8 +2346,8 @@ def threaded_run(command,results_semi_list,info_semi_list,processes_semi_list):
                            self.stdout_info_line_current = val[1]
 
                            self.stdout_cde_size = val[2]
-                            self.stdout_files_cde_size_extracted=val[3]
-                            self.stdout_files_cde_errors_quant_all=val[4]
+                            self.stdout_cde_size_extracted=val[3]
+                            self.stdout_cde_errors_quant_all=val[4]
                            self.stdout_files_cde_quant=val[5]
                            self.stdout_files_cde_size=val[6]
                        else:
diff --git a/src/record.py b/src/record.py
index 0915b0e..4616af8 100644
--- a/src/record.py
+++ b/src/record.py
@@ -126,17 +126,21 @@ def caretaker(signal_file):
    sys_stdout_flush = sys.stdout.flush
    lines_non_stop=0
 
+    json_dumps_loc = json_dumps
+    stdout_data_queue_loc = stdout_data_queue
+    path_exists_loc = path_exists
+
    def flush_last_data_not_printed(flush):
        nonlocal last_data_not_printed
        if last_data_not_printed:
-            print(json_dumps(last_data_not_printed),flush=flush)
+            print(json_dumps_loc(last_data_not_printed),flush=flush)
            last_data_not_printed=None
 
    while True:
        now=perf_counter()
        now_grater_than_next_time_print = bool(now>next_time_print)
 
-        if stdout_data_queue:
+        if stdout_data_queue_loc:
            data,always=stdout_data_queue_get()
 
            if data==True:
@@ -146,7 +150,7 @@ def flush_last_data_not_printed(flush):
            flush_last_data_not_printed(False)
 
            if always or now_grater_than_next_time_print:
-                print(json_dumps(data),flush=True)
+                print(json_dumps_loc(data),flush=True)
                next_time_print=now+print_min_time_period
                lines_non_stop+=1
                last_data_not_printed=None
@@ -163,7 +167,7 @@ def flush_last_data_not_printed(flush):
        if now>next_signal_file_check:
            next_signal_file_check=now+signal_file_check_period
 
-            if path_exists(signal_file):
+            if path_exists_loc(signal_file):
                try:
                    with open(signal_file,'r') as sf:
                        got_int = int(sf.read().strip())
@@ -174,7 +178,7 @@ def flush_last_data_not_printed(flush):
                except Exception as pe:
                    print_info(f'check_abort error:{pe}')
 
-        sleep(0.01)
+        sleep(0.001)
 
    sys.exit(0) #thread
 
@@ -342,7 +346,7 @@ def proper_exit(code):
        new_record = LibrerRecord(label=label,scan_path=path_to_scan)
 
        try:
-            print_func(['stage',0],True)
+            print_func(('stage',0),True)
            new_record.scan(print_func,abort_list,tuple(cde_list),check_dev)
        except Exception as fe:
            print_info(f'scan error:{fe}')
@@ -350,16 +354,16 @@ def proper_exit(code):
 
        if not abort_list[0]:
            if cde_list :
                try:
-                    print_func(['stage',1],True)
-                    new_record.extract_customdata(print_func,abort_list,threads_quant=threads)
+                    print_func(('stage',1),True)
+                    new_record.extract_customdata(print_func,abort_list,threads=threads)
                except Exception as cde:
                    print_info(f'cde error:{cde}')
 
-            print_func(['stage',2],True)
+            print_func(('stage',2),True)
            new_record.pack_data(print_func)
-            print_func(['stage',3],True)
+            print_func(('stage',3),True)
            new_record.save(print_func,file_path=args.file,compression_level=compression_level)
-            print_func(['stage',4],True)
+            print_func(('stage',4),True)
 
    #####################################################################
    else: