diff --git a/src/core.py b/src/core.py
index fc7a60c..f84a9c7 100644
--- a/src/core.py
+++ b/src/core.py
@@ -26,9 +26,14 @@
 #
 ####################################################################################
 
-from time import sleep
+from copy import copy, deepcopy
+
+from time import sleep, perf_counter
 #from threading import Thread
-from multiprocessing import Process, Manager
+from multiprocessing import Process, Manager, Queue
+from queue import Empty
+
+import multiprocessing
 
 from zstandard import ZstdCompressor,ZstdDecompressor
@@ -138,9 +143,9 @@ def test_regexp(expr):
 data_format_version='1.0010'
 
 class LibrerRecordHeader :
-    def __init__(self,label='',path=''):
+    def __init__(self,label='',scan_path=''):
         self.label=label
-        self.scan_path = path
+        self.scan_path = scan_path
         self.creation_time = int(time())
         self.rid = self.creation_time #record id
@@ -163,8 +168,8 @@ def __init__(self,label='',path=''):
 #######################################################################
 class LibrerRecord:
-    def __init__(self,label,path,log):
-        self.header = LibrerRecordHeader(label,path)
+    def __init__(self,label,scan_path,log):
+        self.header = LibrerRecordHeader(label,scan_path)
 
         self.filestructure = ()
         self.customdata = []
@@ -176,7 +181,7 @@ def __init__(self,label,path,log):
         self.info_line_current = ''
 
         self.abort_action = False
-        self.files_search_progress = 0
+        #self.files_search_progress = 0
 
         #self.crc_progress_info=0
@@ -661,22 +666,20 @@ def clone_record(self,file_path,keep_cd=True,keep_crc=True,compression_level=16)
 ########################################################################################
     def find_items(self,
-            record_nr,managed_progress,managed_results,managed_results_len,managed_abort,
+            results_queue,abort_queue,
+            record_nr,
             size_min,size_max,
             filename_search_kind,name_func_to_call,cd_search_kind,cd_func_to_call):
 
         self.find_results = []
-        managed_results[record_nr] = []
 
         self.decompress_filestructure()
 
-        results = set()
-        results_add = results.add
-
         filenames_loc = self.filenames
         filestructure = self.filestructure
 
-        self.files_search_progress = 0
+        files_search_progress = 0
+        files_search_progress_update_quant = 0
 
         if cd_search_kind!='dont':
             self.decompress_customdata()
@@ -699,19 +702,30 @@ def find_items(self,
 
         self_customdata = self.customdata
 
+        def check_abort():
+            try:
+                abort_queue.get(False)
+                return True
+            except Empty:
+                return False
+
         while search_list:
-            if managed_abort[record_nr]:
+            if check_abort():
                 break
 
             filestructure,parent_path_components = search_list_pop()
 
             for data_entry in filestructure:
-                if managed_abort[record_nr]:
+                if check_abort():
                     break
 
-                self.files_search_progress +=1
+                files_search_progress_update_quant+=1
+                if files_search_progress_update_quant>4096:
+                    results_queue.put((files_search_progress,None))
+
+                    files_search_progress_update_quant=0
 
-                managed_progress[record_nr]=self.files_search_progress
+                files_search_progress +=1
 
                 name_nr,code,size,mtime = data_entry[0:4]
@@ -739,8 +753,7 @@
                     #a directory may match the filename criteria but has no size or custom data
                     if name_func_to_call:
                         if name_func_to_call(name):
-                            results_add( tuple([tuple(next_level),size,mtime]) )
-                            managed_results_len[record_nr]+=1
+                            results_queue.put((files_search_progress,tuple([tuple(next_level),size,mtime])))
 
                     if sub_data:
                         search_list_append( (sub_data,next_level) )
@@ -798,12 +811,9 @@
                     else:
                         continue
 
-                    results_add( tuple([tuple(next_level),size,mtime ]) )
-                    managed_results_len[record_nr]+=1
+                    results_queue.put((files_search_progress,tuple([tuple(next_level),size,mtime ])))
 
-        managed_results[record_nr] = list(results)
-        #self.find_results
+        results_queue.put((files_search_progress,None))
 
     def find_items_sort(self,what,reverse):
         if what=='data':
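The hunks above replace the shared Manager dicts with a message stream: find_items now puts (files_search_progress, result) tuples into results_queue, a None payload is either a batched progress-only update (roughly every 4096 scanned entries) or, as the last message, the end-of-search marker, and an abort request arrives through a second queue that check_abort() polls with a non-blocking get. A minimal self-contained sketch of that producer protocol (illustrative only, not librer code; predicate stands in for name_func_to_call):

from queue import Empty

def worker(results_queue, abort_queue, items, predicate):
    # Streams (progress, hit) tuples; a None payload is either a batched
    # progress-only update or, as the last message, the end-of-search marker.
    progress = 0
    update_quant = 0
    for item in items:
        # Non-blocking abort poll: Empty simply means "no abort requested yet".
        try:
            abort_queue.get(False)
            break
        except Empty:
            pass

        update_quant += 1
        if update_quant > 4096:
            results_queue.put((progress, None))   # batched progress-only update
            update_quant = 0

        progress += 1
        if predicate(item):
            results_queue.put((progress, item))   # an actual hit

    results_queue.put((progress, None))           # final progress / end marker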
@@ -920,7 +930,8 @@ def load_wrap(self,db_dir,file_name):
     def load(self,file_path):
         self.file_path = file_path
         file_name = basename(normpath(file_path))
-        self.log.info('loading %s' % file_name)
+        #self.log.info('loading %s' % file_name)
+        #TODO - problem in a subprocess
 
         try:
             with ZipFile(file_path, "r") as zip_file:
@@ -985,13 +996,24 @@ def decompress_customdata(self):
         else:
             return False
 
-def global_find_items(record,record_nr,managed_progress,managed_results,managed_results_len,managed_abort,size_min,size_max,find_filename_search_kind,name_func_to_call,find_cd_search_kind,cd_func_to_call):
-    record.find_items(record_nr,managed_progress,managed_results,managed_results_len,managed_abort,
-            size_min,size_max,
-            find_filename_search_kind,name_func_to_call,
-            find_cd_search_kind,cd_func_to_call)
+#######################################################################
+def find_items_for_subprocess(results_queue,abort_queue,record_file_path,record_nr,size_min,size_max,find_filename_search_kind,name_func_to_call,find_cd_search_kind,cd_func_to_call):
+    t1 = perf_counter()
+
+    new_record = LibrerRecord('dummylabel','dummyscanpath','dummylog')
+
+    if new_record.load(record_file_path):
+        print('find_items_for_subprocess error:',record_file_path)
+    else:
+        new_record.find_items(results_queue,abort_queue,record_nr,
+                size_min,size_max,
+                find_filename_search_kind,name_func_to_call,
+                find_cd_search_kind,cd_func_to_call)
+    t2 = perf_counter()
+    print('timing record_nr:',record_nr,t2-t1)
 
 #######################################################################
+
 class LibrerCore:
     records = set()
     db_dir=''
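find_items_for_subprocess is the new Process entry point. It deliberately receives only record.file_path: the child builds a throwaway LibrerRecord (with dummy label, scan path and log values) and re-loads the record from disk itself, so nothing heavier than a path string is handed across the process boundary; note that load() returns a truthy value on failure, hence the inverted check. Passing the placeholder string 'dummylog' as the log object is presumably why the self.log.info call in load() is commented out above with a TODO. A runnable toy sketch of this load-in-the-child pattern (names are illustrative, not the librer API):

from multiprocessing import Process, Queue

def child(results_queue, path):
    # The child re-reads the data itself; only the path was sent over.
    with open(path, 'rb') as f:
        payload = f.read()
    results_queue.put((path, len(payload)))

if __name__ == '__main__':
    q = Queue()
    p = Process(target=child, args=(q, __file__))
    p.start()
    print(q.get())   # e.g. ('.../script.py', 1234)
    p.join()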
@@ -1030,8 +1052,8 @@ def read_records_pre(self):
                 size_sum=0
                 self.record_files_list=[]
                 for entry in res:
-                    ename = entry.name
-                    if ename.endswith('.dat'):
+                    filename = entry.name
+                    if filename.endswith('.dat'):
                         try:
                             stat_res = stat(entry)
                             size = int(stat_res.st_size)
@@ -1040,7 +1062,7 @@
                             continue
 
                         size_sum+=size
-                        self.record_files_list.append( (ename,size) )
+                        self.record_files_list.append( (filename,size) )
 
                 quant_sum=len(self.record_files_list)
                 return (quant_sum,size_sum)
             except Exception as e:
@@ -1057,26 +1079,26 @@ def read_records(self):
         info_curr_quant = 0
         info_curr_size = 0
 
-        for ename,size in sorted(self.record_files_list):
+        for filename,size in sorted(self.record_files_list):
             if self.abort_action:
                 break
 
-            self.log.info('db:%s',ename)
+            self.log.info('db:%s',filename)
             new_record = self.create()
 
-            self.info_line = f'loading {ename}'
+            self.info_line = f'loading {filename}'
 
             info_curr_quant+=1
             info_curr_size+=size
 
-            if new_record.load_wrap(self.db_dir,ename) :
-                self.log.warning('removing:%s',ename)
+            if new_record.load_wrap(self.db_dir,filename) :
+                self.log.warning('removing:%s',filename)
                 self.records.remove(new_record)
             else:
                 self.records_to_show.append( (new_record,info_curr_quant,info_curr_size) )
 
         self.update_sorted()
 
-    def find_items_in_all_records_check(self,
+    def find_items_in_records_check(self,
             range_par,
             size_min,size_max,
             find_filename_search_kind,name_expr,name_case_sens,
@@ -1123,90 +1145,8 @@ def find_results_clean(self):
         for record in self.records:
             record.find_results_clean()
 
-    def find_items_in_all_records_old(self,
-            range_par,
-            size_min,size_max,
-            find_filename_search_kind,name_expr,name_case_sens,
-            find_cd_search_kind,cd_expr,cd_case_sens,
-            filename_fuzzy_threshold,cd_fuzzy_threshold):
-
-        self.find_results_clean()
-
-        if name_expr:
-            filename_fuzzy_threshold_float=float(filename_fuzzy_threshold) if find_filename_search_kind == 'fuzzy' else 0
-
-            if find_filename_search_kind == 'regexp':
-                name_func_to_call = lambda x : search(name_expr,x)
-            elif find_filename_search_kind == 'glob':
-                if name_case_sens:
-                    #name_func_to_call = lambda x : fnmatch(x,name_expr)
-                    name_func_to_call = lambda x : re_compile(translate(name_expr)).match(x)
-                else:
-                    name_func_to_call = lambda x : re_compile(translate(name_expr), IGNORECASE).match(x)
-            elif find_filename_search_kind == 'fuzzy':
-                name_func_to_call = lambda x : bool(SequenceMatcher(None, name_expr, x).ratio()>filename_fuzzy_threshold_float)
-            else:
-                name_func_to_call = None
-        else:
-            name_func_to_call = None
-
-        if cd_expr:
-            cd_fuzzy_threshold_float = float(cd_fuzzy_threshold) if find_cd_search_kind == 'fuzzy' else 0
-
-            if find_cd_search_kind == 'regexp':
-                cd_func_to_call = lambda x : search(cd_expr,x)
-            elif find_cd_search_kind == 'glob':
-                if cd_case_sens:
-                    #cd_func_to_call = lambda x : fnmatch(x,cd_expr)
-                    cd_func_to_call = lambda x : re_compile(translate(cd_expr)).match(x)
-                else:
-                    cd_func_to_call = lambda x : re_compile(translate(cd_expr), IGNORECASE).match(x)
-            elif find_cd_search_kind == 'fuzzy':
-                cd_func_to_call = lambda x : bool(SequenceMatcher(None, name_expr, x).ratio()>cd_fuzzy_threshold_float)
-            else:
-                cd_func_to_call = None
-        else:
-            cd_func_to_call = None
-
-        self.find_res_quant = 0
-        sel_range = [range_par] if range_par else self.records
-
-        self.files_search_progress = 0
-
-        self.search_record_nr=0
-        self.search_record_ref=None
-
-        records_len = len(self.records)
-
-        ############################################################
-        self.abort_action = False
-        ############################################################
-        for record in sel_range:
-            self.search_record_ref = record
-
-            self.search_record_nr+=1
-            self.records_perc_info = self.search_record_nr * 100.0 / records_len
-
-            self.info_line = f'searching in record: {record.header.label}'
-
-            if self.abort_action:
-                break
-
-            try:
-                record.abort_action = False
-                record.find_items(
-                        size_min,size_max,
-                        find_filename_search_kind,name_func_to_call,
-                        find_cd_search_kind,cd_func_to_call)
-            except Exception as e:
-                print(e)
-
-            self.files_search_progress += record.header.quant_files
-            self.find_res_quant += len(record.find_results)
-        ############################################################
-
 ########################################################################################################################
-    def find_items_in_all_records(self,
+    def find_items_in_records(self,
             range_par,
             size_min,size_max,
             find_filename_search_kind,name_expr,name_case_sens,
@@ -1273,36 +1213,56 @@ def find_items_in_all_records(self,
         ############################################################
         max_processes = cpu_count()
-        #max_processes = 8
 
         manager = Manager()
-        managed_results = manager.dict()
-        managed_results_len = manager.dict()
-        managed_progress = manager.dict()
-        managed_abort = manager.dict()
-
-        for record_nr,record in enumerate(records_to_process):
-            managed_results[record_nr]=[]
-            managed_results_len[record_nr]=0
-            managed_progress[record_nr]=0
-            managed_abort[record_nr]=False
-
         self.info_line = 'Initializing subprocesses ...'
 
         jobs = {}
 
         records_to_process_len = len(records_to_process)
 
+        results_queues=[]
+        abort_queues=[]
+        total_progress=[]
         for record_nr,record in enumerate(records_to_process):
-            subprocess = Process(target=global_find_items, args=(record,record_nr,managed_progress,managed_results,managed_results_len,managed_abort,size_min,size_max,
+            total_progress.append(0)
+
+            results_queue = Queue()
+            results_queues.append(results_queue)
+
+            abort_queue = Queue()
+            abort_queues.append(abort_queue)
+
+            subprocess = Process(target=find_items_for_subprocess, args=(results_queue,abort_queue,record.file_path,record_nr,size_min,size_max,
                     find_filename_search_kind,name_func_to_call,
                     find_cd_search_kind,cd_func_to_call))
 
             jobs[record_nr] = [False,subprocess]
+            record.find_results=[]
 
         self.info_line = 'subprocesses run.'
 
         #####################################################
+
+        def suck_queue(q,l):
+            got=0
+            last_i=0
+            try:
+                q_get = q.get
+                l_append = l.append
+                while val := q_get(False): #processed,found_data
+                    (i,f)=val
+                    last_i=i
+                    got+=1
+                    if f: #None means a progress-only update of i
+                        l_append(f)
+            except Empty:
+                pass
+
+            return last_i
+
+        ############################################################
         while True:
             if self.abort_action:
-                break
+                _ = [abort_queues[record_nr].put(True) for record_nr in range(records_to_process_len)]
 
             need_to_run = [ record_nr for record_nr in range(records_to_process_len) if jobs[record_nr][0]==False ]
             need_to_run_len = len(need_to_run)
@@ -1314,9 +1274,6 @@
             self.info_line = f'Running threads: {running}'
 
-            #done = records_to_process_len-running-need_to_run_len
-            #print(f'{need_to_run_len=} {running=}')
-
             if need_to_run:
                 if running<max_processes:
                     record_nr = need_to_run[0]
                     jobs[record_nr][0] = True
                     jobs[record_nr][1].start()
 
+            for record_nr,record in enumerate(records_to_process):
+                i = suck_queue(results_queues[record_nr],record.find_results)
+                if i>total_progress[record_nr]:
+                    total_progress[record_nr] = i
 
-            self.files_search_progress = progress_sum
-            self.find_res_quant = found_sum
+            self.files_search_progress = sum(total_progress)
+            self.find_res_quant = sum([len(record.find_results) for record in records_to_process])
 
+            if running==0 and need_to_run_len==0 and all([results_queues[record_nr].empty() for record_nr in range(records_to_process_len)]):
+                break
 
-            sleep(0.01)
+            sleep(0.1)
 
         #####################################################
 
-        for record_nr,record in enumerate(records_to_process):
-            record.find_results=managed_results[record_nr]
-
-        for subprocess_combo in jobs.values():
+        for record_nr,subprocess_combo in jobs.items():
             if subprocess_combo[0]:
                 subprocess_combo[1].join()
 
         return True
-
-
-
-
-
-
-
-
-        #print('endu')
-
-        #subprocess.start()
-
-        self.info_line = 'Waiting for results ...'
-
-
-
-        #for key,val in managed_results.items():
-        #    print(key,val)
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-        self.info_line = 'Initializing threads ...'
-
-        threads_pool = {record:[False,Thread(daemon=True, target = lambda : record.find_items(
-                        size_min,size_max,
-                        find_filename_search_kind,name_func_to_call,
-                        find_cd_search_kind,cd_func_to_call) ) ] for record in sel_range }
-
-        global_find_items(record,size_min,size_max,
-                find_filename_search_kind,name_func_to_call,
-                find_cd_search_kind,cd_func_to_call,managed_results)
-
-
-        #queue = Queue()
-        #p = Process(target=global_record_find_items, args=(queue, 1))
-        ##p.start()
-        #p.join() # this blocks until the process terminates
-        #result = queue.get()
-
-
-
-        self.files_search_progress = 0
-        self.find_res_quant = 0
-
-        while True:
-            need_to_run = [record for record in sel_range if threads_pool[record][0]==False ]
-            need_to_run_len = len(need_to_run)
-
-            running = len([record for record in sel_range if threads_pool[record][0]==True and threads_pool[record][1].is_alive() ])
-
-            self.info_line = f'Running threads: {running}'
-
-            #done = sel_range_len-running-need_to_run_len
-            #print(f'{need_to_run_len=} {running=}')
-
-            if need_to_run:
-                if running
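On the supervisor side, suck_queue drains one worker's queue without blocking: it remembers the highest progress value seen and appends every non-None payload to that record's find_results, after which files_search_progress and find_res_quant are recomputed as sums over all records. Note that the loop no longer breaks as soon as abort_action is set; it only posts True into every abort queue and lets the workers wind down, exiting once nothing is left to start, nothing is alive, and every queue reads empty. Since multiprocessing.Queue.empty() on its own is documented as unreliable, it is the combination with the running and need_to_run counters that makes that exit check safe. A standalone sketch of the same drain step, assuming the (progress, payload) protocol sketched earlier:

from queue import Empty

def drain(results_queue, hits):
    # Non-blocking drain of one worker's queue: collect every pending
    # (progress, payload) tuple, keep real hits, and return the last
    # progress value seen (0 if nothing was pending).
    last_progress = 0
    try:
        while True:
            progress, payload = results_queue.get(False)
            last_progress = progress
            if payload is not None:   # None = progress-only update / end marker
                hits.append(payload)
    except Empty:
        pass
    return last_progress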