diff --git a/qubes/backup.py b/qubes/backup.py index aeda2ddeb..637f24c52 100644 --- a/qubes/backup.py +++ b/qubes/backup.py @@ -680,21 +680,23 @@ def backup_do(self): i += 1 chunkfile_p = open(chunkfile, 'wb') - common_args = { - 'backup_target': chunkfile_p, - 'hmac': hmac, - 'vmproc': vmproc, - 'addproc': tar_sparse, - 'progress_callback': self._add_vm_progress, - 'size_limit': self.chunk_size, - } - run_error = wait_backup_feedback( - in_stream=pipe, streamproc=encryptor, - **common_args) + run_error = handle_streams( + pipe, + {'hmac_data': hmac.stdin, + 'backup_target': chunkfile_p, + }, + {'hmac': hmac, + 'vmproc': vmproc, + 'addproc': tar_sparse, + 'streamproc': encryptor, + }, + self.chunk_size, + self._add_vm_progress + ) chunkfile_p.close() self.log.debug( - "Wait_backup_feedback returned: {}".format(run_error)) + "12 returned: {}".format(run_error)) if self.canceled: try: @@ -783,96 +785,52 @@ def backup_do(self): self.app.save() - - -def wait_backup_feedback(progress_callback, in_stream, streamproc, - backup_target, hmac=None, vmproc=None, - addproc=None, - size_limit=None): +def handle_streams(stream_in, streams_out, processes, size_limit=None, + progress_callback=None): ''' - Wait for backup chunk to finish - - Monitor all the processes (streamproc, hmac, vmproc, addproc) for errors - - Copy stdout of streamproc to backup_target and hmac stdin if available - - Compute progress based on total_backup_sz and send progress to - progress_callback function - - Returns if - - one of the monitored processes error out (streamproc, hmac, vmproc, - addproc), along with the processe that failed - - all of the monitored processes except vmproc finished successfully - (vmproc termination is controlled by the python script) - - streamproc does not delivers any data anymore (return with the error - "") - - size_limit is provided and is about to be exceeded + Copy stream_in to all streams_out and monitor all mentioned processes. + If any of them terminate with non-zero code, interrupt the process. Copy + at most `size_limit` data (if given). + + :param stream_in: file-like object to read data from + :param streams_out: dict of file-like objects to write data to + :param processes: dict of subprocess.Popen objects to monitor + :param size_limit: int maximum data amount to process + :param progress_callback: callable function to report progress, will be + given copied data size (it should accumulate internally) + :return: failed process name, failed stream name, "size_limit" or None ( + no error) ''' - buffer_size = 409600 - run_error = None - run_count = 1 bytes_copied = 0 - log = logging.getLogger('qubes.backup') - - while run_count > 0 and run_error is None: - if size_limit and bytes_copied + buffer_size > size_limit: - return "size_limit" + while True: + if size_limit: + to_copy = min(buffer_size, size_limit - bytes_copied) + if to_copy <= 0: + return "size_limit" + else: + to_copy = buffer_size + buf = stream_in.read(to_copy) + if not len(buf): + # done + return None - buf = in_stream.read(buffer_size) if callable(progress_callback): progress_callback(len(buf)) + for name, stream in streams_out.items(): + if stream is None: + continue + try: + stream.write(buf) + except IOError: + return name bytes_copied += len(buf) - run_count = 0 - if hmac: - retcode = hmac.poll() - if retcode is not None: - if retcode != 0: - run_error = "hmac" - else: - run_count += 1 - - if addproc: - retcode = addproc.poll() - if retcode is not None: - if retcode != 0: - run_error = "addproc" - else: - run_count += 1 - - if vmproc: - retcode = vmproc.poll() - if retcode is not None: - if retcode != 0: - run_error = "VM" - log.debug(vmproc.stdout.read()) - else: - # VM should run until the end - pass - - if streamproc: - retcode = streamproc.poll() - if retcode is not None: - if retcode != 0: - run_error = "streamproc" - break - elif retcode == 0 and len(buf) <= 0: - return "" - run_count += 1 - - else: - if len(buf) <= 0: - return "" - - try: - backup_target.write(buf) - except IOError as e: - if e.errno == errno.EPIPE: - run_error = "target" - else: - raise - - if hmac: - hmac.stdin.write(buf) - - return run_error + for name, proc in processes.items(): + if proc is None: + continue + if proc.poll(): + return name class ExtractWorker2(Process): @@ -1127,6 +1085,10 @@ def __run__(self): self.tar2_current_file = filename pipe = open(self.restore_pipe, 'wb') + monitor_processes = { + 'vmproc': self.vmproc, + 'addproc': self.tar2_process, + } common_args = { 'backup_target': pipe, 'hmac': None, @@ -1144,28 +1106,23 @@ def __run__(self): (["-z"] if self.compressed else []), stdin=open(filename, 'rb'), stdout=subprocess.PIPE) - - run_error = wait_backup_feedback( - progress_callback=self.progress_callback, - in_stream=self.decryptor_process.stdout, - streamproc=self.decryptor_process, - **common_args) + in_stream = self.decryptor_process.stdout + monitor_processes['decryptor'] = self.decryptor_process elif self.compressed: self.decompressor_process = subprocess.Popen( ["gzip", "-d"], stdin=open(filename, 'rb'), stdout=subprocess.PIPE) - - run_error = wait_backup_feedback( - progress_callback=self.progress_callback, - in_stream=self.decompressor_process.stdout, - streamproc=self.decompressor_process, - **common_args) + in_stream = self.decompressor_process.stdout + monitor_processes['decompresor'] = self.decompressor_process else: - run_error = wait_backup_feedback( - progress_callback=self.progress_callback, - in_stream=open(filename, "rb"), streamproc=None, - **common_args) + in_stream = open(filename, 'rb') + + run_error = handle_streams( + in_stream, + {'target': pipe}, + monitor_processes, + progress_callback=self.progress_callback) try: pipe.close() @@ -1177,7 +1134,7 @@ def __run__(self): # ignore the error else: raise - if len(run_error): + if run_error: if run_error == "target": self.collect_tar_output() details = "\n".join(self.tar2_stderr) @@ -1310,19 +1267,16 @@ def __run__(self): self.log.debug("Releasing next chunck") self.tar2_current_file = filename - common_args = { - 'backup_target': input_pipe, - 'hmac': None, - 'vmproc': self.vmproc, - 'addproc': self.tar2_process - } - - run_error = wait_backup_feedback( - progress_callback=self.progress_callback, - in_stream=open(filename, "rb"), streamproc=None, - **common_args) + run_error = handle_streams( + open(filename, 'rb'), + {'target': input_pipe}, + {'vmproc': self.vmproc, + 'addproc': self.tar2_process, + 'decryptor': self.decryptor_process, + }, + progress_callback=self.progress_callback) - if len(run_error): + if run_error: if run_error == "target": self.collect_tar_output() details = "\n".join(self.tar2_stderr)