Compress test logs before sending them to the web server #117

Draft · wants to merge 4 commits into base: cl
9 changes: 9 additions & 0 deletions alts/shared/constants.py
@@ -1,3 +1,5 @@
+from enum import IntEnum
+
 __all__ = [
     'API_VERSION',
     'ARCHITECTURES',
@@ -60,3 +62,10 @@
     'hostbased',
     'publickey',
 ]
+
+
+class TapStatusEnum(IntEnum):
+    FAILED = 0
+    DONE = 1
+    TODO = 2
+    SKIPPED = 3
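
Presumably IntEnum (rather than a plain Enum) is used so the statuses serialize as bare integers in the JSON payload sent to the web server; a minimal sketch of that assumption:

import json
from alts.shared.constants import TapStatusEnum

# IntEnum members are int subclasses, so json can emit them directly
print(json.dumps({'status': TapStatusEnum.DONE}))  # {"status": 1}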
18 changes: 18 additions & 0 deletions alts/shared/utils/plumbum_utils.py
@@ -0,0 +1,18 @@
+import time
+
+from plumbum import ProcessTimedOut
+from plumbum.commands.modifiers import Future
+
+
+def wait_bg_process(future: Future, timeout: int):
+    # For some reason, plumbum.commands.modifiers.Future.wait method
+    # doesn't take timeout into account, so here is a workaround for it
+    start_time = time.time()
+    while not future.poll():
+        if time.time() - start_time > timeout:
+            future.proc.terminate()
+            raise ProcessTimedOut(
+                f"Process did not terminate within {timeout} seconds",
+                future.proc.argv,
+            )
+        time.sleep(0.1)

Review comment from a project member on the workaround comment above:

    Hm, interesting. Which version of plumbum? I'm on 1.8.2 and everything looks fine here, see:

    >>> runner = ( local['tail'].with_env(FOO='bar').run_bg(args=['-f', '/dev/null'], timeout=2, retcode=None,) )
    >>> runner.wait()
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "/home/jhernandez/sandbox/python/paramiko/env/lib64/python3.10/site-packages/plumbum/commands/modifiers.py", line 43, in wait
        self._returncode, self._stdout, self._stderr = run_proc(
      File "/home/jhernandez/sandbox/python/paramiko/env/lib64/python3.10/site-packages/plumbum/commands/processes.py", line 299, in run_proc
        return _check_process(proc, retcode, timeout, stdout, stderr)
      File "/home/jhernandez/sandbox/python/paramiko/env/lib64/python3.10/site-packages/plumbum/commands/processes.py", line 17, in _check_process
        proc.verify(retcode, timeout, stdout, stderr)
      File "/home/jhernandez/sandbox/python/paramiko/env/lib64/python3.10/site-packages/plumbum/machines/base.py", line 15, in verify
        raise ProcessTimedOut(
    plumbum.commands.processes.ProcessTimedOut: ('Process did not terminate within 2 seconds', ['/usr/bin/tail', '-f', '/dev/null'])
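
For context, a minimal usage sketch of this helper (the tail command is just an illustrative long-running process, mirroring the reviewer's example above):

from plumbum import local

from alts.shared.utils.plumbum_utils import wait_bg_process

# Start a long-running background process; no timeout is given to plumbum itself
future = local['tail'].run_bg(args=['-f', '/dev/null'], retcode=None)
# The helper polls, terminates the process, and raises ProcessTimedOut after ~2s
wait_bg_process(future, timeout=2)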
19 changes: 17 additions & 2 deletions alts/worker/executors/base.py
@@ -1,4 +1,6 @@
 import logging
+import os
+import tempfile
 from datetime import datetime
 from functools import wraps
 from traceback import format_exc
@@ -9,6 +11,7 @@

 from alts.shared.models import AsyncSSHParams, CommandResult
 from alts.shared.utils.asyncssh import AsyncSSHClient, LongRunSSHClient
+from alts.shared.utils.plumbum_utils import wait_bg_process


 def measure_stage(stage: str):
@@ -196,6 +199,9 @@ def run_docker_command(
         if env_vars:
             for var in env_vars:
                 additional_env_vars.extend(('-e', var))
+        bg_stdout = tempfile.NamedTemporaryFile(delete=False, mode='w+', prefix=self.container_name, suffix='.stdout.log')
+        bg_stderr = tempfile.NamedTemporaryFile(delete=False, mode='w+', prefix=self.container_name, suffix='.stderr.log')
+        stdout = stderr = ''
         try:
             runner = (
                 local['docker']
@@ -209,11 +215,12 @@
                         self.binary_name,
                         *cmd_args,
                     ],
-                    timeout=self.timeout,
                     retcode=None,
+                    stdout=bg_stdout,
+                    stderr=bg_stderr,
                 )
             )
-            runner.wait()
+            wait_bg_process(runner, self.timeout or 30)
             stdout = runner.stdout
             stderr = runner.stderr
             exit_code = runner.returncode
@@ -224,6 +231,14 @@
         except Exception:
             self.logger.exception('Cannot run docker command:')
             exit_code, stdout, stderr = 1, '', format_exc()
+        finally:
+            bg_stdout.seek(0)
+            bg_stderr.seek(0)
+            stdout += f'\n{bg_stdout.read()}'
+            stderr += f'\n{bg_stderr.read()}'
+            for file in (bg_stdout, bg_stderr):
+                file.close()
+                os.unlink(file.name)
         return CommandResult(
             exit_code=exit_code,
             stdout=stdout,
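A plausible reading of the change above (an assumption, not stated in the diff): redirecting run_bg output into named temporary files means a chatty container cannot block on a full pipe buffer, and the captured logs survive even if the timeout workaround terminates the process; the finally block then folds the file contents back into stdout/stderr. The same capture pattern in isolation, as a sketch:

import tempfile

from plumbum import local

with tempfile.NamedTemporaryFile(mode='w+', suffix='.stdout.log') as out:
    # Redirect the background process's stdout into a real file
    future = local['echo'].run_bg(args=['hello'], stdout=out, retcode=None)
    future.wait()
    out.seek(0)
    print(out.read())  # 'hello\n'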
36 changes: 24 additions & 12 deletions alts/worker/runners/base.py
@@ -52,6 +52,7 @@
     git_reset_hard,
     prepare_gerrit_command,
 )
+from alts.shared.utils.plumbum_utils import wait_bg_process
 from alts.worker import CONFIG, RESOURCES_DIR
 from alts.worker.executors.ansible import AnsibleExecutor
 from alts.worker.executors.bats import BatsExecutor
@@ -572,21 +573,23 @@ def run_ansible_command(
         self, args: Union[tuple, list], retcode_none: bool = False,
         timeout: int = CONFIG.provision_timeout
     ):
-        run_kwargs = {
-            'args': args,
-            'timeout': timeout
-        }
+        run_kwargs = {'args': args}
         if retcode_none:
             run_kwargs['retcode'] = None
         cmd = local[self.ansible_playbook_binary].with_cwd(self._work_dir)
         formulated_cmd = cmd.formulate(args=run_kwargs.get('args', ()))
         exception_happened = False
         cmd_pid = None
+        bg_stdout = tempfile.NamedTemporaryFile(delete=False, mode='w+', prefix=self._task_id, suffix='.stdout.log')
+        bg_stderr = tempfile.NamedTemporaryFile(delete=False, mode='w+', prefix=self._task_id, suffix='.stderr.log')
+        stdout = stderr = ''
         try:
-            future = cmd.run_bg(**run_kwargs)
+            future = cmd.run_bg(**run_kwargs, stdout=bg_stdout, stderr=bg_stderr)
             cmd_pid = future.proc.pid
-            future.wait()
-            exit_code, stdout, stderr = future.returncode, future.stdout, future.stderr
+            wait_bg_process(future, timeout)
+            exit_code, stdout, stderr = (
+                future.returncode, future.stdout, future.stderr
+            )
         except ProcessExecutionError as e:
             stdout = e.stdout
             stderr = e.stderr
@@ -598,13 +601,21 @@
             exit_code = COMMAND_TIMEOUT_EXIT_CODE
             exception_happened = True
         except Exception as e:
-            self._logger.error(
-                'Unknown error happened during %s execution: %s',
+            self._logger.exception(
+                'Unknown error happened during execution: %s',
                 formulated_cmd
             )
             stdout = ''
             stderr = str(e)
             exit_code = 255
+        finally:
+            bg_stdout.seek(0)
+            bg_stderr.seek(0)
+            stdout += f'\n{bg_stdout.read()}'
+            stderr += f'\n{bg_stderr.read()}'
+            for file in (bg_stdout, bg_stderr):
+                file.close()
+                os.unlink(file.name)

         if exception_happened and cmd_pid:
             try:
@@ -922,7 +933,7 @@ def install_package(
             module_stream=module_stream,
             module_version=module_version,
             semi_verbose=semi_verbose,
-            verbose=verbose,
+            verbose=self._verbose or verbose,
             allow_fail=allow_fail,
         )

@@ -1185,7 +1196,8 @@ def ensure_package_is_installed(
             package_name,
             package_version=package_version,
             package_epoch=package_epoch,
-            semi_verbose=True
+            semi_verbose=True,
+            verbose=self._verbose,
         )

     def get_init_script(self, tests_dir: Path) -> Optional[Path]:
@@ -1537,7 +1549,7 @@ def setup(self, skip_provision: bool = False):
         self.initialize_terraform()
         self.start_env()
         if not skip_provision:
-            self.initial_provision()
+            self.initial_provision(verbose=self._verbose)

     def teardown(self, publish_artifacts: bool = True):
         try:
2 changes: 1 addition & 1 deletion alts/worker/runners/docker.py
@@ -71,7 +71,7 @@ def __init__(
         vm_alive: bool = False,
         artifacts_uploader: Optional[BaseLogsUploader] = None,
         package_channel: Optional[str] = None,
-        verbose: bool = False,
+        verbose: bool = True,
     ):
         """
         Docker environment class initialization.
2 changes: 1 addition & 1 deletion alts/worker/runners/opennebula.py
@@ -50,7 +50,7 @@ def __init__(
         test_configuration: Optional[dict] = None,
         test_flavor: Optional[Dict[str, str]] = None,
         vm_alive: bool = False,
-        verbose: bool = False,
+        verbose: bool = True,
     ):
         super().__init__(
             task_id,
108 changes: 100 additions & 8 deletions alts/worker/tasks.py
@@ -10,6 +10,8 @@
 import random
 import time
 import urllib.parse
+import base64
+import gzip
 from celery.contrib.abortable import AbortableTask
 from collections import defaultdict
 from socket import timeout
@@ -26,7 +28,7 @@

 from urllib3 import Retry
 from urllib3.exceptions import TimeoutError

-from alts.shared.constants import API_VERSION, DEFAULT_REQUEST_TIMEOUT
+from alts.shared.constants import API_VERSION, DEFAULT_REQUEST_TIMEOUT, TapStatusEnum
 from alts.shared.exceptions import (
     InstallPackageError,
     PackageIntegrityTestsError,
@@ -89,6 +91,88 @@ def are_tap_tests_success(tests_output: str):
     return errors == 0


+def parse_tap_output(text):
+    """
+    Parses TAP test output and returns list of TAP-formatted entities.
+    Returns list of dicts with detailed status report for each test in file.
+
+    Parameters
+    ----------
+    text: str or unicode or bytes
+        Test output from VM
+
+    Returns
+    -------
+    list
+
+    """
+    def to_unicode(s):
+        if isinstance(s, bytes):
+            return s.decode('utf8')
+        if isinstance(s, str):
+            return s
+        return str(s)
+
+    def get_diagnostic(tap_item):
+        diagnostics = []
+        index = raw_data.index(tap_item) + 1
+        while (
+            index < len(raw_data)
+            and raw_data[index].category == 'diagnostic'
+        ):
+            diagnostics.append(raw_data[index].text)
+            index += 1
+        return "\n".join(diagnostics)
+
+    try:
+        prepared_text = to_unicode(text).replace('\r\n', '\n')
+    except TypeError:
+        prepared_text = to_unicode(text.replace(b'\r\n', b'\n'))
+    tap_parser = tap.parser.Parser()
+    try:
+        raw_data = list(tap_parser.parse_text(prepared_text))
+    except Exception:
+        return
+
+    tap_output = []
+    if not all((item.category == 'unknown' for item in raw_data)):
+        for test_result in raw_data:
+            if test_result.category != 'test':
+                continue
+            test_name = test_result.description
+            if not test_name:
+                test_name = test_result.directive.text
+            status = TapStatusEnum.FAILED
+            if test_result.todo:
+                status = TapStatusEnum.TODO
+            elif test_result.skip:
+                status = TapStatusEnum.SKIPPED
+            elif test_result.ok:
+                status = TapStatusEnum.DONE
+            tap_output.append({
+                'test_name': test_name,
+                'status': status,
+                'diagnostic': get_diagnostic(test_result),
+            })
+    return tap_output
+
+
+def parse_and_compress_stage_results(stage_data: dict, log_name: str = ''):
+    code = stage_data.get('exit_code')
+    out = stage_data.get('stdout', '')
+    err = stage_data.get('stderr', '')
+    log = f'Exit code: {code}\nStdout:\n{out}\nStderr:\n{err}'
+    result = {
+        'exit_code': code,
+        'compressed_log': base64.b64encode(
+            gzip.compress(log.encode()),
+        ).decode('utf-8'),
+    }
+    if out and 'bats' in log_name:
+        result['tap_results'] = parse_tap_output(out)
+    return result


 class RetryableTask(AbortableTask):
     autoretry_for = AUTO_RETRY_EXCEPTIONS
     max_retries = 5
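
To make the round trip concrete, here is a sketch (not part of the diff) that feeds a fake stage result through parse_and_compress_stage_results and then reverses the encoding the way the web server would; the sample TAP text and the 'bats' log name are illustrative:

import base64
import gzip

stage_data = {
    'exit_code': 0,
    'stdout': '1..1\nok 1 first test\n',
    'stderr': '',
}
result = parse_and_compress_stage_results(stage_data, log_name='bats_test')

# Reversing the base64 + gzip layers recovers the combined log verbatim
log = gzip.decompress(base64.b64decode(result['compressed_log'])).decode('utf-8')
assert log.startswith('Exit code: 0\nStdout:\n1..1\nok 1 first test')

# Because 'bats' appears in log_name, result['tap_results'] holds the
# parse_tap_output entries, e.g. one dict with status TapStatusEnum.DONE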
@@ -164,7 +248,8 @@ def set_artifacts_when_stage_has_unexpected_exception(
         'package_channel': task_params.get('package_channel', 'beta'),
         'test_configuration': task_params.get('test_configuration', {}),
         'test_flavor': task_params.get('test_flavor', {}),
-        'vm_alive': task_params.get('vm_alive')
+        'vm_alive': task_params.get('vm_alive'),
+        'verbose': task_params.get('verbose', False),
     }

     runner_class = RUNNER_MAPPING[task_params['runner_type']]
@@ -270,18 +355,25 @@
         if stage not in TESTS_SECTIONS_NAMES:
             stage_info = {'success': is_success(stage_data)}
             if CONFIG.logs_uploader_config.skip_artifacts_upload:
-                stage_info.update(stage_data)
+                stage_info.update(
+                    parse_and_compress_stage_results(
+                        stage_data,
+                        log_name=stage,
+                    ),
+                )
             summary[stage] = stage_info
             continue
         if stage not in summary:
             summary[stage] = {}
         for inner_stage, inner_data in stage_data.items():
-            stage_info = {
-                'success': is_success(inner_data),
-                'output': inner_data['stdout'],
-            }
+            stage_info = {'success': is_success(inner_data)}
             if CONFIG.logs_uploader_config.skip_artifacts_upload:
-                stage_info.update(inner_data)
+                stage_info.update(
+                    parse_and_compress_stage_results(
+                        inner_data,
+                        log_name=inner_stage,
+                    ),
+                )
             summary[stage][inner_stage] = stage_info
     if runner.uploaded_logs:
         summary['logs'] = runner.uploaded_logs