Skip to content

Commit

Permalink
Generalized in_bufsize handling to deal with sh 1.13's increased pick…
Browse files Browse the repository at this point in the history
…iness.

Added some preliminary debug option handling, still need to link that in properly.
  • Loading branch information
Hans Chalupsky committed May 21, 2020
1 parent 1ae5393 commit 3b20ba1
Showing 1 changed file with 15 additions and 8 deletions.
23 changes: 15 additions & 8 deletions kgtk/cli/zconcat.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,24 @@ def add_arguments(parser):

### general command utilities (some of these should make it into a more central location):

tmp_dir = '/tmp' # this should be configurable
tmp_dir = '/tmp' # this should be configurable
kgtk_encoding = 'utf8' # this should be configurable

def make_temp_file(prefix='kgtk.'):
return tempfile.mkstemp(dir=tmp_dir, prefix=prefix)[1]

def get_buf_sizes(output=None, _tty_out=True, _piped=False):
def get_buf_sizes(input=None, output=None, _tty_out=True, _piped=False):
"""Determine stream buffer sizes to use. Since sh has complex rules for this depending on
what streams are used and flags are set, we simply try this here and see if it lets us do it.
We want to make sure to use large output buffers whenever possible for speed.
This should probably go into cli_entry.py.
"""
in_bufsize = 2**16
out_bufsize = in_bufsize
try:
sh.ls.bake(_in=input, _in_bufsize=in_bufsize)
except:
in_bufsize = None
try:
sh.ls.bake(_out=output, _out_bufsize=out_bufsize, _tty_out=_tty_out, _piped=_piped)
except:
Expand Down Expand Up @@ -68,18 +73,20 @@ def get_stream_header(stream, n=1, unit='line', preserve=False):
with open(temp, 'wb') as out:
out.write(header.getvalue())
cleanup = lambda cmd, status, exit_code: sh.rm('-f', temp)
in_bufsize, out_bufsize = get_buf_sizes(_tty_out=False, _piped=True)
in_bufsize, out_bufsize = get_buf_sizes(input=stream, _tty_out=False, _piped=True)
return header.getvalue(), [sh.cat.bake(temp, '-', _in=stream, _in_bufsize=in_bufsize, _piped=True, _done=cleanup)]
else:
return header.getvalue()

def run_sh_commands(commands):
def run_sh_commands(commands, debug=False):
"""Run a single or list of prebaked sh `commands', compose them with pipes when they
are marked with piped=True or bg=True. Return the last run command which can be used
to access the final exit_code and other state.
"""
if not hasattr(commands, "__iter__"):
commands = [commands]
if debug:
sys.stderr.write('zconcat.run_sh_commands: %s\n' % commands)
piped_output = None
last_cmd = None
for cmd in commands:
Expand Down Expand Up @@ -146,7 +153,7 @@ def build_command_1(input=None, output=None, gz=False, bz2=False, xz=False, _pip
outfile = open(output, _out_mode)
output = outfile
compress = get_compress_command(compress_switch_to_file_type(gz, bz2, xz))
in_bufsize, out_bufsize = get_buf_sizes(output, not compress, _piped)
in_bufsize, out_bufsize = get_buf_sizes(output=output, _tty_out=not compress, _piped=_piped)

if input == '-':
# process input piped in from stdin, possibly compressed in different ways:
Expand Down Expand Up @@ -191,13 +198,13 @@ def build_command(inputs=[], output=None, gz=False, bz2=False, xz=False):
out_mode='ab'
return command

def run(inputs=[], output=None, gz=False, bz2=False, xz=False):
def run(inputs=[], output=None, gz=False, bz2=False, xz=False, _debug=False):
"""Run zconcat according to the provided command-line arguments.
"""
# TO DO: figure out how to properly access shared --debug option
try:
commands = build_command(inputs=inputs, output=output, gz=gz, bz2=bz2, xz=xz)
#print(commands)
return run_sh_commands(commands).exit_code
return run_sh_commands(commands, debug=_debug).exit_code
except sh.SignalException_SIGPIPE:
# cleanup in case we piped and terminated prematurely:
sys.stdout.flush()
Expand Down

0 comments on commit 3b20ba1

Please sign in to comment.