Skip to content

Commit

Permalink
backup: make wait_backup_feedback/handle_streams less ugly
Browse files Browse the repository at this point in the history
Have a generic function `handle_streams`, instead of
`wait_backup_feedback` with open coded process names and manual
iteration over them.

No functional change, besides minor logging change.
  • Loading branch information
marmarek committed Oct 28, 2016
1 parent 6ee2002 commit d7c355e
Showing 1 changed file with 76 additions and 122 deletions.
198 changes: 76 additions & 122 deletions qubes/backup.py
Original file line number Diff line number Diff line change
Expand Up @@ -680,21 +680,23 @@ def backup_do(self):
i += 1
chunkfile_p = open(chunkfile, 'wb')

common_args = {
'backup_target': chunkfile_p,
'hmac': hmac,
'vmproc': vmproc,
'addproc': tar_sparse,
'progress_callback': self._add_vm_progress,
'size_limit': self.chunk_size,
}
run_error = wait_backup_feedback(
in_stream=pipe, streamproc=encryptor,
**common_args)
run_error = handle_streams(
pipe,
{'hmac_data': hmac.stdin,
'backup_target': chunkfile_p,
},
{'hmac': hmac,
'vmproc': vmproc,
'addproc': tar_sparse,
'streamproc': encryptor,
},
self.chunk_size,
self._add_vm_progress
)
chunkfile_p.close()

self.log.debug(
"Wait_backup_feedback returned: {}".format(run_error))
"12 returned: {}".format(run_error))

if self.canceled:
try:
Expand Down Expand Up @@ -783,96 +785,52 @@ def backup_do(self):
self.app.save()




def wait_backup_feedback(progress_callback, in_stream, streamproc,
backup_target, hmac=None, vmproc=None,
addproc=None,
size_limit=None):
def handle_streams(stream_in, streams_out, processes, size_limit=None,
progress_callback=None):
'''
Wait for backup chunk to finish
- Monitor all the processes (streamproc, hmac, vmproc, addproc) for errors
- Copy stdout of streamproc to backup_target and hmac stdin if available
- Compute progress based on total_backup_sz and send progress to
progress_callback function
- Returns if
- one of the monitored processes error out (streamproc, hmac, vmproc,
addproc), along with the processe that failed
- all of the monitored processes except vmproc finished successfully
(vmproc termination is controlled by the python script)
- streamproc does not delivers any data anymore (return with the error
"")
- size_limit is provided and is about to be exceeded
Copy stream_in to all streams_out and monitor all mentioned processes.
If any of them terminate with non-zero code, interrupt the process. Copy
at most `size_limit` data (if given).
:param stream_in: file-like object to read data from
:param streams_out: dict of file-like objects to write data to
:param processes: dict of subprocess.Popen objects to monitor
:param size_limit: int maximum data amount to process
:param progress_callback: callable function to report progress, will be
given copied data size (it should accumulate internally)
:return: failed process name, failed stream name, "size_limit" or None (
no error)
'''

buffer_size = 409600
run_error = None
run_count = 1
bytes_copied = 0
log = logging.getLogger('qubes.backup')

while run_count > 0 and run_error is None:
if size_limit and bytes_copied + buffer_size > size_limit:
return "size_limit"
while True:
if size_limit:
to_copy = min(buffer_size, size_limit - bytes_copied)
if to_copy <= 0:
return "size_limit"
else:
to_copy = buffer_size
buf = stream_in.read(to_copy)
if not len(buf):
# done
return None

buf = in_stream.read(buffer_size)
if callable(progress_callback):
progress_callback(len(buf))
for name, stream in streams_out.items():
if stream is None:
continue
try:
stream.write(buf)
except IOError:
return name
bytes_copied += len(buf)

run_count = 0
if hmac:
retcode = hmac.poll()
if retcode is not None:
if retcode != 0:
run_error = "hmac"
else:
run_count += 1

if addproc:
retcode = addproc.poll()
if retcode is not None:
if retcode != 0:
run_error = "addproc"
else:
run_count += 1

if vmproc:
retcode = vmproc.poll()
if retcode is not None:
if retcode != 0:
run_error = "VM"
log.debug(vmproc.stdout.read())
else:
# VM should run until the end
pass

if streamproc:
retcode = streamproc.poll()
if retcode is not None:
if retcode != 0:
run_error = "streamproc"
break
elif retcode == 0 and len(buf) <= 0:
return ""
run_count += 1

else:
if len(buf) <= 0:
return ""

try:
backup_target.write(buf)
except IOError as e:
if e.errno == errno.EPIPE:
run_error = "target"
else:
raise

if hmac:
hmac.stdin.write(buf)

return run_error
for name, proc in processes.items():
if proc is None:
continue
if proc.poll():
return name


class ExtractWorker2(Process):
Expand Down Expand Up @@ -1127,6 +1085,10 @@ def __run__(self):
self.tar2_current_file = filename

pipe = open(self.restore_pipe, 'wb')
monitor_processes = {
'vmproc': self.vmproc,
'addproc': self.tar2_process,
}
common_args = {
'backup_target': pipe,
'hmac': None,
Expand All @@ -1144,28 +1106,23 @@ def __run__(self):
(["-z"] if self.compressed else []),
stdin=open(filename, 'rb'),
stdout=subprocess.PIPE)

run_error = wait_backup_feedback(
progress_callback=self.progress_callback,
in_stream=self.decryptor_process.stdout,
streamproc=self.decryptor_process,
**common_args)
in_stream = self.decryptor_process.stdout
monitor_processes['decryptor'] = self.decryptor_process
elif self.compressed:
self.decompressor_process = subprocess.Popen(
["gzip", "-d"],
stdin=open(filename, 'rb'),
stdout=subprocess.PIPE)

run_error = wait_backup_feedback(
progress_callback=self.progress_callback,
in_stream=self.decompressor_process.stdout,
streamproc=self.decompressor_process,
**common_args)
in_stream = self.decompressor_process.stdout
monitor_processes['decompresor'] = self.decompressor_process
else:
run_error = wait_backup_feedback(
progress_callback=self.progress_callback,
in_stream=open(filename, "rb"), streamproc=None,
**common_args)
in_stream = open(filename, 'rb')

run_error = handle_streams(
in_stream,
{'target': pipe},
monitor_processes,
progress_callback=self.progress_callback)

try:
pipe.close()
Expand All @@ -1177,7 +1134,7 @@ def __run__(self):
# ignore the error
else:
raise
if len(run_error):
if run_error:
if run_error == "target":
self.collect_tar_output()
details = "\n".join(self.tar2_stderr)
Expand Down Expand Up @@ -1310,19 +1267,16 @@ def __run__(self):
self.log.debug("Releasing next chunck")
self.tar2_current_file = filename

common_args = {
'backup_target': input_pipe,
'hmac': None,
'vmproc': self.vmproc,
'addproc': self.tar2_process
}

run_error = wait_backup_feedback(
progress_callback=self.progress_callback,
in_stream=open(filename, "rb"), streamproc=None,
**common_args)
run_error = handle_streams(
open(filename, 'rb'),
{'target': input_pipe},
{'vmproc': self.vmproc,
'addproc': self.tar2_process,
'decryptor': self.decryptor_process,
},
progress_callback=self.progress_callback)

if len(run_error):
if run_error:
if run_error == "target":
self.collect_tar_output()
details = "\n".join(self.tar2_stderr)
Expand Down

0 comments on commit d7c355e

Please sign in to comment.