aosp12/external/autotest/client/bin/job.py

1335 lines
48 KiB
Python
Raw Permalink Normal View History

2023-01-09 17:11:35 +08:00
# Lint as: python2, python3
"""The main job wrapper
This is the core infrastructure.
Copyright Andy Whitcroft, Martin J. Bligh 2006
"""
# pylint: disable=missing-docstring
import copy
from datetime import datetime
import getpass
import glob
import logging
import os
import re
import shutil
import sys
import time
import traceback
import types
import weakref
import six
import common
from autotest_lib.client.bin import client_logging_config
from autotest_lib.client.bin import harness
from autotest_lib.client.bin import local_host
from autotest_lib.client.bin import parallel
from autotest_lib.client.bin import partition as partition_lib
from autotest_lib.client.bin import profilers
from autotest_lib.client.bin import sysinfo
from autotest_lib.client.bin import test
from autotest_lib.client.bin import utils
from autotest_lib.client.common_lib import barrier
from autotest_lib.client.common_lib import base_job
from autotest_lib.client.common_lib import control_data
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib import logging_manager
from autotest_lib.client.common_lib import packages
from autotest_lib.client.cros import cros_logging
from autotest_lib.client.tools import html_report
GLOBAL_CONFIG = global_config.global_config
LAST_BOOT_TAG = object()
JOB_PREAMBLE = """
from autotest_lib.client.common_lib.error import *
from autotest_lib.client.bin.utils import *
"""
class StepError(error.AutotestError):
pass
class NotAvailableError(error.AutotestError):
pass
def _run_test_complete_on_exit(f):
"""Decorator for job methods that automatically calls
self.harness.run_test_complete when the method exits, if appropriate."""
def wrapped(self, *args, **dargs):
try:
return f(self, *args, **dargs)
finally:
if self._logger.global_filename == 'status':
self.harness.run_test_complete()
if self.drop_caches:
utils.drop_caches()
wrapped.__name__ = f.__name__
wrapped.__doc__ = f.__doc__
wrapped.__dict__.update(f.__dict__)
return wrapped
class status_indenter(base_job.status_indenter):
"""Provide a status indenter that is backed by job._record_prefix."""
def __init__(self, job_):
self._job = weakref.proxy(job_) # avoid a circular reference
@property
def indent(self):
return self._job._record_indent
def increment(self):
self._job._record_indent += 1
def decrement(self):
self._job._record_indent -= 1
class base_client_job(base_job.base_job):
"""The client-side concrete implementation of base_job.
Optional properties provided by this implementation:
control
harness
"""
_WARNING_DISABLE_DELAY = 5
# _record_indent is a persistent property, but only on the client
_job_state = base_job.base_job._job_state
_record_indent = _job_state.property_factory(
'_state', '_record_indent', 0, namespace='client')
_max_disk_usage_rate = _job_state.property_factory(
'_state', '_max_disk_usage_rate', 0.0, namespace='client')
def __init__(self, control, options, drop_caches=True):
"""
Prepare a client side job object.
@param control: The control file (pathname of).
@param options: an object which includes:
jobtag: The job tag string (eg "default").
cont: If this is the continuation of this job.
harness_type: An alternative server harness. [None]
use_external_logging: If true, the enable_external_logging
method will be called during construction. [False]
@param drop_caches: If true, utils.drop_caches() is called before and
between all tests. [True]
"""
super(base_client_job, self).__init__(options=options)
self._pre_record_init(control, options)
try:
self._post_record_init(control, options, drop_caches)
except Exception as err:
self.record(
'ABORT', None, None,'client.bin.job.__init__ failed: %s' %
str(err))
raise
@classmethod
def _get_environ_autodir(cls):
return os.environ['AUTODIR']
@classmethod
def _find_base_directories(cls):
"""
Determine locations of autodir and clientdir (which are the same)
using os.environ. Serverdir does not exist in this context.
"""
autodir = clientdir = cls._get_environ_autodir()
return autodir, clientdir, None
@classmethod
def _parse_args(cls, args):
return re.findall("[^\s]*?['|\"].*?['|\"]|[^\s]+", args)
def _find_resultdir(self, options):
"""
Determine the directory for storing results. On a client this is
always <autodir>/results/<tag>, where tag is passed in on the command
line as an option.
"""
output_dir_config = GLOBAL_CONFIG.get_config_value('CLIENT',
'output_dir',
default="")
if options.output_dir:
basedir = options.output_dir
elif output_dir_config:
basedir = output_dir_config
else:
basedir = self.autodir
return os.path.join(basedir, 'results', options.tag)
def _get_status_logger(self):
"""Return a reference to the status logger."""
return self._logger
def _pre_record_init(self, control, options):
"""
Initialization function that should peform ONLY the required
setup so that the self.record() method works.
As of now self.record() needs self.resultdir, self._group_level,
self.harness and of course self._logger.
"""
if not options.cont:
self._cleanup_debugdir_files()
self._cleanup_results_dir()
logging_manager.configure_logging(
client_logging_config.ClientLoggingConfig(),
results_dir=self.resultdir,
verbose=options.verbose)
logging.info('Writing results to %s', self.resultdir)
# init_group_level needs the state
self.control = os.path.realpath(control)
self._is_continuation = options.cont
self._current_step_ancestry = []
self._next_step_index = 0
self._load_state()
_harness = self.handle_persistent_option(options, 'harness')
_harness_args = self.handle_persistent_option(options, 'harness_args')
self.harness = harness.select(_harness, self, _harness_args)
if self.control:
parsed_control = control_data.parse_control(
self.control, raise_warnings=False)
self.fast = parsed_control.fast
# set up the status logger
def client_job_record_hook(entry):
msg_tag = ''
if '.' in self._logger.global_filename:
msg_tag = self._logger.global_filename.split('.', 1)[1]
# send the entry to the job harness
message = '\n'.join([entry.message] + entry.extra_message_lines)
rendered_entry = self._logger.render_entry(entry)
self.harness.test_status_detail(entry.status_code, entry.subdir,
entry.operation, message, msg_tag,
entry.fields)
self.harness.test_status(rendered_entry, msg_tag)
# send the entry to stdout, if it's enabled
logging.info(rendered_entry)
self._logger = base_job.status_logger(
self, status_indenter(self), record_hook=client_job_record_hook)
def _post_record_init(self, control, options, drop_caches):
"""
Perform job initialization not required by self.record().
"""
self._init_drop_caches(drop_caches)
self._init_packages()
self.sysinfo = sysinfo.sysinfo(self.resultdir)
self._load_sysinfo_state()
if not options.cont:
download = os.path.join(self.testdir, 'download')
if not os.path.exists(download):
os.mkdir(download)
shutil.copyfile(self.control,
os.path.join(self.resultdir, 'control'))
self.control = control
self.logging = logging_manager.get_logging_manager(
manage_stdout_and_stderr=True, redirect_fds=True)
self.logging.start_logging()
self.profilers = profilers.profilers(self)
self.machines = [options.hostname]
self.machine_dict_list = [{'hostname' : options.hostname}]
# Client side tests should always run the same whether or not they are
# running in the lab.
self.in_lab = False
self.hosts = set([local_host.LocalHost(hostname=options.hostname)])
self.args = []
if options.args:
self.args = self._parse_args(options.args)
if options.user:
self.user = options.user
else:
self.user = getpass.getuser()
self.sysinfo.log_per_reboot_data()
if not options.cont:
self.record('START', None, None)
self.harness.run_start()
if options.log:
self.enable_external_logging()
self.num_tests_run = None
self.num_tests_failed = None
self.warning_loggers = None
self.warning_manager = None
def _init_drop_caches(self, drop_caches):
"""
Perform the drop caches initialization.
"""
self.drop_caches_between_iterations = (
GLOBAL_CONFIG.get_config_value('CLIENT',
'drop_caches_between_iterations',
type=bool, default=True))
self.drop_caches = drop_caches
if self.drop_caches:
utils.drop_caches()
def _init_packages(self):
"""
Perform the packages support initialization.
"""
self.pkgmgr = packages.PackageManager(
self.autodir, run_function_dargs={'timeout':3600})
def _cleanup_results_dir(self):
"""Delete everything in resultsdir"""
assert os.path.exists(self.resultdir)
list_files = glob.glob('%s/*' % self.resultdir)
for f in list_files:
if os.path.isdir(f):
shutil.rmtree(f)
elif os.path.isfile(f):
os.remove(f)
def _cleanup_debugdir_files(self):
"""
Delete any leftover debugdir files
"""
list_files = glob.glob("/tmp/autotest_results_dir.*")
for f in list_files:
os.remove(f)
def disable_warnings(self, warning_type):
self.record("INFO", None, None,
"disabling %s warnings" % warning_type,
{"warnings.disable": warning_type})
time.sleep(self._WARNING_DISABLE_DELAY)
def enable_warnings(self, warning_type):
time.sleep(self._WARNING_DISABLE_DELAY)
self.record("INFO", None, None,
"enabling %s warnings" % warning_type,
{"warnings.enable": warning_type})
def monitor_disk_usage(self, max_rate):
"""\
Signal that the job should monitor disk space usage on /
and generate a warning if a test uses up disk space at a
rate exceeding 'max_rate'.
Parameters:
max_rate - the maximium allowed rate of disk consumption
during a test, in MB/hour, or 0 to indicate
no limit.
"""
self._max_disk_usage_rate = max_rate
def control_get(self):
return self.control
def control_set(self, control):
self.control = os.path.abspath(control)
def harness_select(self, which, harness_args):
self.harness = harness.select(which, self, harness_args)
def setup_dirs(self, results_dir, tmp_dir):
if not tmp_dir:
tmp_dir = os.path.join(self.tmpdir, 'build')
if not os.path.exists(tmp_dir):
os.mkdir(tmp_dir)
if not os.path.isdir(tmp_dir):
e_msg = "Temp dir (%s) is not a dir - args backwards?" % self.tmpdir
raise ValueError(e_msg)
# We label the first build "build" and then subsequent ones
# as "build.2", "build.3", etc. Whilst this is a little bit
# inconsistent, 99.9% of jobs will only have one build
# (that's not done as kernbench, sparse, or buildtest),
# so it works out much cleaner. One of life's compromises.
if not results_dir:
results_dir = os.path.join(self.resultdir, 'build')
i = 2
while os.path.exists(results_dir):
results_dir = os.path.join(self.resultdir, 'build.%d' % i)
i += 1
if not os.path.exists(results_dir):
os.mkdir(results_dir)
return (results_dir, tmp_dir)
def barrier(self, *args, **kwds):
"""Create a barrier object"""
return barrier.barrier(*args, **kwds)
def install_pkg(self, name, pkg_type, install_dir):
'''
This method is a simple wrapper around the actual package
installation method in the Packager class. This is used
internally by the profilers, deps and tests code.
name : name of the package (ex: sleeptest, dbench etc.)
pkg_type : Type of the package (ex: test, dep etc.)
install_dir : The directory in which the source is actually
untarred into. (ex: client/profilers/<name> for profilers)
'''
if self.pkgmgr.repositories:
self.pkgmgr.install_pkg(name, pkg_type, self.pkgdir, install_dir)
def add_repository(self, repo_urls):
'''
Adds the repository locations to the job so that packages
can be fetched from them when needed. The repository list
needs to be a string list
Ex: job.add_repository(['http://blah1','http://blah2'])
'''
for repo_url in repo_urls:
self.pkgmgr.add_repository(repo_url)
# Fetch the packages' checksum file that contains the checksums
# of all the packages if it is not already fetched. The checksum
# is always fetched whenever a job is first started. This
# is not done in the job's constructor as we don't have the list of
# the repositories there (and obviously don't care about this file
# if we are not using the repos)
try:
checksum_file_path = os.path.join(self.pkgmgr.pkgmgr_dir,
packages.CHECKSUM_FILE)
self.pkgmgr.fetch_pkg(packages.CHECKSUM_FILE,
checksum_file_path, use_checksum=False)
except error.PackageFetchError:
# packaging system might not be working in this case
# Silently fall back to the normal case
pass
def require_gcc(self):
"""
Test whether gcc is installed on the machine.
"""
# check if gcc is installed on the system.
try:
utils.system('which gcc')
except error.CmdError:
raise NotAvailableError('gcc is required by this job and is '
'not available on the system')
def setup_dep(self, deps):
"""Set up the dependencies for this test.
deps is a list of libraries required for this test.
"""
# Fetch the deps from the repositories and set them up.
for dep in deps:
dep_dir = os.path.join(self.autodir, 'deps', dep)
# Search for the dependency in the repositories if specified,
# else check locally.
try:
self.install_pkg(dep, 'dep', dep_dir)
except error.PackageInstallError:
# see if the dep is there locally
pass
# dep_dir might not exist if it is not fetched from the repos
if not os.path.exists(dep_dir):
raise error.TestError("Dependency %s does not exist" % dep)
os.chdir(dep_dir)
# Run the dependency, as it could create more files needed for the
# tests.
# In future this might want to be changed, as this always returns
# None, unless the dep.py errors. In which case, it'll error rather
# than returning.
if eval(compile(open('%s.py' % dep, "rb").read(),
'%s.py' % dep, 'exec'), {}) is None:
logging.info('Dependency %s successfuly built', dep)
def _runtest(self, url, tag, timeout, args, dargs):
try:
l = lambda : test.runtest(self, url, tag, args, dargs)
pid = parallel.fork_start(self.resultdir, l)
self._forkwait(pid, timeout)
except error.TestBaseException:
# These are already classified with an error type (exit_status)
raise
except error.JobError:
raise # Caught further up and turned into an ABORT.
except Exception as e:
# Converts all other exceptions thrown by the test regardless
# of phase into a TestError(TestBaseException) subclass that
# reports them with their full stack trace.
raise error.UnhandledTestError(e)
def _forkwait(self, pid, timeout=None):
"""Wait for the given pid to complete
@param pid (int) process id to wait for
@param timeout (int) seconds to wait before timing out the process"""
if timeout:
logging.debug('Waiting for pid %d for %d seconds', pid, timeout)
parallel.fork_waitfor_timed(self.resultdir, pid, timeout)
else:
logging.debug('Waiting for pid %d', pid)
parallel.fork_waitfor(self.resultdir, pid)
logging.info('pid %d completed', pid)
def _run_test_base(self, url, *args, **dargs):
"""
Prepares arguments and run functions to run_test and run_test_detail.
@param url A url that identifies the test to run.
@param tag An optional keyword argument that will be added to the
test and subdir name.
@param subdir_tag An optional keyword argument that will be added
to the subdir name.
@returns:
subdir: Test subdirectory
testname: Test name
group_func: Actual test run function
timeout: Test timeout
"""
_group, testname = self.pkgmgr.get_package_name(url, 'test')
testname, subdir, tag = self._build_tagged_test_name(testname, dargs)
self._make_test_outputdir(subdir)
timeout = dargs.pop('timeout', None)
if timeout:
logging.debug('Test has timeout: %d sec.', timeout)
def log_warning(reason):
self.record("WARN", subdir, testname, reason)
@disk_usage_monitor.watch(log_warning, "/", self._max_disk_usage_rate)
def group_func():
try:
self._runtest(url, tag, timeout, args, dargs)
except error.TestBaseException as detail:
# The error is already classified, record it properly.
self.record(detail.exit_status, subdir, testname, str(detail))
raise
else:
self.record('GOOD', subdir, testname, 'completed successfully')
return (subdir, testname, group_func, timeout)
@_run_test_complete_on_exit
def run_test(self, url, *args, **dargs):
"""
Summon a test object and run it.
@param url A url that identifies the test to run.
@param tag An optional keyword argument that will be added to the
test and subdir name.
@param subdir_tag An optional keyword argument that will be added
to the subdir name.
@returns True if the test passes, False otherwise.
"""
(subdir, testname, group_func, timeout) = self._run_test_base(url,
*args,
**dargs)
try:
self._rungroup(subdir, testname, group_func, timeout)
return True
except error.TestBaseException:
return False
# Any other exception here will be given to the caller
#
# NOTE: The only exception possible from the control file here
# is error.JobError as _runtest() turns all others into an
# UnhandledTestError that is caught above.
def stage_control_file(self, url):
"""
Install the test package and return the control file path.
@param url The name of the test, e.g. login_LoginSuccess. This is the
string passed to run_test in the client test control file:
job.run_test('login_LoginSuccess')
This name can also be something like 'camera_HAL3.jea',
which corresponds to a test package containing multiple
control files, each with calls to:
job.run_test('camera_HAL3', **opts)
@returns Absolute path to the control file for the test.
"""
testname, _, _tag = url.partition('.')
bindir = os.path.join(self.testdir, testname)
self.install_pkg(testname, 'test', bindir)
return _locate_test_control_file(bindir, url)
@_run_test_complete_on_exit
def run_test_detail(self, url, *args, **dargs):
"""
Summon a test object and run it, returning test status.
@param url A url that identifies the test to run.
@param tag An optional keyword argument that will be added to the
test and subdir name.
@param subdir_tag An optional keyword argument that will be added
to the subdir name.
@returns Test status
@see: client/common_lib/error.py, exit_status
"""
(subdir, testname, group_func, timeout) = self._run_test_base(url,
*args,
**dargs)
try:
self._rungroup(subdir, testname, group_func, timeout)
return 'GOOD'
except error.TestBaseException as detail:
return detail.exit_status
def _rungroup(self, subdir, testname, function, timeout, *args, **dargs):
"""\
subdir:
name of the group
testname:
name of the test to run, or support step
function:
subroutine to run
*args:
arguments for the function
Returns the result of the passed in function
"""
try:
optional_fields = None
if timeout:
optional_fields = {}
optional_fields['timeout'] = timeout
self.record('START', subdir, testname,
optional_fields=optional_fields)
self._state.set('client', 'unexpected_reboot', (subdir, testname))
try:
result = function(*args, **dargs)
self.record('END GOOD', subdir, testname)
return result
except error.TestBaseException as e:
self.record('END %s' % e.exit_status, subdir, testname)
raise
except error.JobError as e:
self.record('END ABORT', subdir, testname)
raise
except Exception as e:
# This should only ever happen due to a bug in the given
# function's code. The common case of being called by
# run_test() will never reach this. If a control file called
# run_group() itself, bugs in its function will be caught
# here.
err_msg = str(e) + '\n' + traceback.format_exc()
self.record('END ERROR', subdir, testname, err_msg)
raise
finally:
self._state.discard('client', 'unexpected_reboot')
def run_group(self, function, tag=None, **dargs):
"""
Run a function nested within a group level.
function:
Callable to run.
tag:
An optional tag name for the group. If None (default)
function.__name__ will be used.
**dargs:
Named arguments for the function.
"""
if tag:
name = tag
else:
name = function.__name__
try:
return self._rungroup(subdir=None, testname=name,
function=function, timeout=None, **dargs)
except (SystemExit, error.TestBaseException):
raise
# If there was a different exception, turn it into a TestError.
# It will be caught by step_engine or _run_step_fn.
except Exception as e:
raise error.UnhandledTestError(e)
def cpu_count(self):
return utils.count_cpus() # use total system count
def start_reboot(self):
self.record('START', None, 'reboot')
self.record('GOOD', None, 'reboot.start')
def _record_reboot_failure(self, subdir, operation, status,
running_id=None):
self.record("ABORT", subdir, operation, status)
if not running_id:
running_id = utils.running_os_ident()
kernel = {"kernel": running_id.split("::")[0]}
self.record("END ABORT", subdir, 'reboot', optional_fields=kernel)
def _check_post_reboot(self, subdir, running_id=None):
"""
Function to perform post boot checks such as if the system configuration
has changed across reboots (specifically, CPUs and partitions).
@param subdir: The subdir to use in the job.record call.
@param running_id: An optional running_id to include in the reboot
failure log message
@raise JobError: Raised if the current configuration does not match the
pre-reboot configuration.
"""
# check to see if any partitions have changed
partition_list = partition_lib.get_partition_list(self,
exclude_swap=False)
mount_info = partition_lib.get_mount_info(partition_list)
old_mount_info = self._state.get('client', 'mount_info')
if mount_info != old_mount_info:
new_entries = mount_info - old_mount_info
old_entries = old_mount_info - mount_info
description = ("mounted partitions are different after reboot "
"(old entries: %s, new entries: %s)" %
(old_entries, new_entries))
self._record_reboot_failure(subdir, "reboot.verify_config",
description, running_id=running_id)
raise error.JobError("Reboot failed: %s" % description)
# check to see if any CPUs have changed
cpu_count = utils.count_cpus()
old_count = self._state.get('client', 'cpu_count')
if cpu_count != old_count:
description = ('Number of CPUs changed after reboot '
'(old count: %d, new count: %d)' %
(old_count, cpu_count))
self._record_reboot_failure(subdir, 'reboot.verify_config',
description, running_id=running_id)
raise error.JobError('Reboot failed: %s' % description)
def partition(self, device, loop_size=0, mountpoint=None):
"""
Work with a machine partition
@param device: e.g. /dev/sda2, /dev/sdb1 etc...
@param mountpoint: Specify a directory to mount to. If not specified
autotest tmp directory will be used.
@param loop_size: Size of loopback device (in MB). Defaults to 0.
@return: A L{client.bin.partition.partition} object
"""
if not mountpoint:
mountpoint = self.tmpdir
return partition_lib.partition(self, device, loop_size, mountpoint)
@utils.deprecated
def filesystem(self, device, mountpoint=None, loop_size=0):
""" Same as partition
@deprecated: Use partition method instead
"""
return self.partition(device, loop_size, mountpoint)
def enable_external_logging(self):
pass
def disable_external_logging(self):
pass
def reboot_setup(self):
# save the partition list and mount points, as well as the cpu count
partition_list = partition_lib.get_partition_list(self,
exclude_swap=False)
mount_info = partition_lib.get_mount_info(partition_list)
self._state.set('client', 'mount_info', mount_info)
self._state.set('client', 'cpu_count', utils.count_cpus())
def reboot(self):
self.reboot_setup()
self.harness.run_reboot()
# HACK: using this as a module sometimes hangs shutdown, so if it's
# installed unload it first
utils.system("modprobe -r netconsole", ignore_status=True)
# sync first, so that a sync during shutdown doesn't time out
utils.system("sync; sync", ignore_status=True)
utils.system("(sleep 5; reboot) </dev/null >/dev/null 2>&1 &")
self.quit()
def noop(self, text):
logging.info("job: noop: " + text)
@_run_test_complete_on_exit
def parallel(self, *tasklist, **kwargs):
"""Run tasks in parallel"""
pids = []
old_log_filename = self._logger.global_filename
for i, task in enumerate(tasklist):
assert isinstance(task, (tuple, list))
self._logger.global_filename = old_log_filename + (".%d" % i)
def task_func():
# stub out _record_indent with a process-local one
base_record_indent = self._record_indent
proc_local = self._job_state.property_factory(
'_state', '_record_indent.%d' % os.getpid(),
base_record_indent, namespace='client')
self.__class__._record_indent = proc_local
task[0](*task[1:])
forked_pid = parallel.fork_start(self.resultdir, task_func)
logging.info('Just forked pid %d', forked_pid)
pids.append(forked_pid)
old_log_path = os.path.join(self.resultdir, old_log_filename)
old_log = open(old_log_path, "a")
exceptions = []
for i, pid in enumerate(pids):
# wait for the task to finish
try:
self._forkwait(pid, kwargs.get('timeout'))
except Exception as e:
logging.info('pid %d completed with error', pid)
exceptions.append(e)
# copy the logs from the subtask into the main log
new_log_path = old_log_path + (".%d" % i)
if os.path.exists(new_log_path):
new_log = open(new_log_path)
old_log.write(new_log.read())
new_log.close()
old_log.flush()
os.remove(new_log_path)
old_log.close()
self._logger.global_filename = old_log_filename
# handle any exceptions raised by the parallel tasks
if exceptions:
msg = "%d task(s) failed in job.parallel" % len(exceptions)
raise error.JobError(msg)
def quit(self):
# XXX: should have a better name.
self.harness.run_pause()
raise error.JobContinue("more to come")
def complete(self, status):
"""Write pending reports, clean up, and exit"""
# We are about to exit 'complete' so clean up the control file.
dest = os.path.join(self.resultdir, os.path.basename(self._state_file))
shutil.move(self._state_file, dest)
self.harness.run_complete()
self.disable_external_logging()
sys.exit(status)
def _load_state(self):
# grab any initial state and set up $CONTROL.state as the backing file
init_state_file = self.control + '.init.state'
self._state_file = self.control + '.state'
if os.path.exists(init_state_file):
shutil.move(init_state_file, self._state_file)
self._state.set_backing_file(self._state_file)
# initialize the state engine, if necessary
has_steps = self._state.has('client', 'steps')
if not self._is_continuation and has_steps:
raise RuntimeError('Loaded state can only contain client.steps if '
'this is a continuation')
if not has_steps:
logging.debug('Initializing the state engine')
self._state.set('client', 'steps', [])
def handle_persistent_option(self, options, option_name):
"""
Select option from command line or persistent state.
Store selected option to allow standalone client to continue
after reboot with previously selected options.
Priority:
1. explicitly specified via command line
2. stored in state file (if continuing job '-c')
3. default == None
"""
option = None
cmd_line_option = getattr(options, option_name)
if cmd_line_option:
option = cmd_line_option
self._state.set('client', option_name, option)
else:
stored_option = self._state.get('client', option_name, None)
if stored_option:
option = stored_option
logging.debug('Persistent option %s now set to %s', option_name, option)
return option
def __create_step_tuple(self, fn, args, dargs):
# Legacy code passes in an array where the first arg is
# the function or its name.
if isinstance(fn, list):
assert(len(args) == 0)
assert(len(dargs) == 0)
args = fn[1:]
fn = fn[0]
# Pickling actual functions is hairy, thus we have to call
# them by name. Unfortunately, this means only functions
# defined globally can be used as a next step.
if callable(fn):
fn = fn.__name__
if not isinstance(fn, six.string_types):
raise StepError("Next steps must be functions or "
"strings containing the function name")
ancestry = copy.copy(self._current_step_ancestry)
return (ancestry, fn, args, dargs)
def next_step_append(self, fn, *args, **dargs):
"""Define the next step and place it at the end"""
steps = self._state.get('client', 'steps')
steps.append(self.__create_step_tuple(fn, args, dargs))
self._state.set('client', 'steps', steps)
def next_step(self, fn, *args, **dargs):
"""Create a new step and place it after any steps added
while running the current step but before any steps added in
previous steps"""
steps = self._state.get('client', 'steps')
steps.insert(self._next_step_index,
self.__create_step_tuple(fn, args, dargs))
self._next_step_index += 1
self._state.set('client', 'steps', steps)
def next_step_prepend(self, fn, *args, **dargs):
"""Insert a new step, executing first"""
steps = self._state.get('client', 'steps')
steps.insert(0, self.__create_step_tuple(fn, args, dargs))
self._next_step_index += 1
self._state.set('client', 'steps', steps)
def _run_step_fn(self, local_vars, fn, args, dargs):
"""Run a (step) function within the given context"""
local_vars['__args'] = args
local_vars['__dargs'] = dargs
try:
exec('__ret = %s(*__args, **__dargs)' % fn, local_vars, local_vars)
return local_vars['__ret']
except SystemExit:
raise # Send error.JobContinue and JobComplete on up to runjob.
except error.TestNAError as detail:
self.record(detail.exit_status, None, fn, str(detail))
except Exception as detail:
raise error.UnhandledJobError(detail)
def _create_frame(self, global_vars, ancestry, fn_name):
"""Set up the environment like it would have been when this
function was first defined.
Child step engine 'implementations' must have 'return locals()'
at end end of their steps. Because of this, we can call the
parent function and get back all child functions (i.e. those
defined within it).
Unfortunately, the call stack of the function calling
job.next_step might have been deeper than the function it
added. In order to make sure that the environment is what it
should be, we need to then pop off the frames we built until
we find the frame where the function was first defined."""
# The copies ensure that the parent frames are not modified
# while building child frames. This matters if we then
# pop some frames in the next part of this function.
current_frame = copy.copy(global_vars)
frames = [current_frame]
for steps_fn_name in ancestry:
ret = self._run_step_fn(current_frame, steps_fn_name, [], {})
current_frame = copy.copy(ret)
frames.append(current_frame)
# Walk up the stack frames until we find the place fn_name was defined.
while len(frames) > 2:
if fn_name not in frames[-2]:
break
if frames[-2][fn_name] != frames[-1][fn_name]:
break
frames.pop()
ancestry.pop()
return (frames[-1], ancestry)
def _add_step_init(self, local_vars, current_function):
"""If the function returned a dictionary that includes a
function named 'step_init', prepend it to our list of steps.
This will only get run the first time a function with a nested
use of the step engine is run."""
if (isinstance(local_vars, dict) and
'step_init' in local_vars and
callable(local_vars['step_init'])):
# The init step is a child of the function
# we were just running.
self._current_step_ancestry.append(current_function)
self.next_step_prepend('step_init')
def step_engine(self):
"""The multi-run engine used when the control file defines step_init.
Does the next step.
"""
# Set up the environment and then interpret the control file.
# Some control files will have code outside of functions,
# which means we need to have our state engine initialized
# before reading in the file.
global_control_vars = {'job': self,
'args': self.args}
exec(JOB_PREAMBLE, global_control_vars, global_control_vars)
try:
exec(compile(open(self.control, "rb").read(), self.control, 'exec'),
global_control_vars, global_control_vars)
except error.TestNAError as detail:
self.record(detail.exit_status, None, self.control, str(detail))
except SystemExit:
raise # Send error.JobContinue and JobComplete on up to runjob.
except Exception as detail:
# Syntax errors or other general Python exceptions coming out of
# the top level of the control file itself go through here.
raise error.UnhandledJobError(detail)
# If we loaded in a mid-job state file, then we presumably
# know what steps we have yet to run.
if not self._is_continuation:
if 'step_init' in global_control_vars:
self.next_step(global_control_vars['step_init'])
else:
# if last job failed due to unexpected reboot, record it as fail
# so harness gets called
last_job = self._state.get('client', 'unexpected_reboot', None)
if last_job:
subdir, testname = last_job
self.record('FAIL', subdir, testname, 'unexpected reboot')
self.record('END FAIL', subdir, testname)
# Iterate through the steps. If we reboot, we'll simply
# continue iterating on the next step.
while len(self._state.get('client', 'steps')) > 0:
steps = self._state.get('client', 'steps')
(ancestry, fn_name, args, dargs) = steps.pop(0)
self._state.set('client', 'steps', steps)
self._next_step_index = 0
ret = self._create_frame(global_control_vars, ancestry, fn_name)
local_vars, self._current_step_ancestry = ret
local_vars = self._run_step_fn(local_vars, fn_name, args, dargs)
self._add_step_init(local_vars, fn_name)
def add_sysinfo_command(self, command, logfile=None, on_every_test=False):
self._add_sysinfo_loggable(sysinfo.command(command, logf=logfile),
on_every_test)
def add_sysinfo_logfile(self, file, on_every_test=False):
self._add_sysinfo_loggable(sysinfo.logfile(file), on_every_test)
def _add_sysinfo_loggable(self, loggable, on_every_test):
if on_every_test:
self.sysinfo.test_loggables.add(loggable)
else:
self.sysinfo.boot_loggables.add(loggable)
self._save_sysinfo_state()
def _load_sysinfo_state(self):
state = self._state.get('client', 'sysinfo', None)
if state:
self.sysinfo.deserialize(state)
def _save_sysinfo_state(self):
state = self.sysinfo.serialize()
self._state.set('client', 'sysinfo', state)
class disk_usage_monitor:
def __init__(self, logging_func, device, max_mb_per_hour):
self.func = logging_func
self.device = device
self.max_mb_per_hour = max_mb_per_hour
def start(self):
self.initial_space = utils.freespace(self.device)
self.start_time = time.time()
def stop(self):
# if no maximum usage rate was set, we don't need to
# generate any warnings
if not self.max_mb_per_hour:
return
final_space = utils.freespace(self.device)
used_space = self.initial_space - final_space
stop_time = time.time()
total_time = stop_time - self.start_time
# round up the time to one minute, to keep extremely short
# tests from generating false positives due to short, badly
# timed bursts of activity
total_time = max(total_time, 60.0)
# determine the usage rate
bytes_per_sec = used_space / total_time
mb_per_sec = bytes_per_sec / 1024**2
mb_per_hour = mb_per_sec * 60 * 60
if mb_per_hour > self.max_mb_per_hour:
msg = ("disk space on %s was consumed at a rate of %.2f MB/hour")
msg %= (self.device, mb_per_hour)
self.func(msg)
@classmethod
def watch(cls, *monitor_args, **monitor_dargs):
""" Generic decorator to wrap a function call with the
standard create-monitor -> start -> call -> stop idiom."""
def decorator(func):
def watched_func(*args, **dargs):
monitor = cls(*monitor_args, **monitor_dargs)
monitor.start()
try:
func(*args, **dargs)
finally:
monitor.stop()
return watched_func
return decorator
def runjob(control, drop_caches, options):
"""
Run a job using the given control file.
This is the main interface to this module.
@see base_job.__init__ for parameter info.
"""
control = os.path.abspath(control)
state = control + '.state'
# Ensure state file is cleaned up before the job starts to run if autotest
# is not running with the --continue flag
if not options.cont and os.path.isfile(state):
logging.debug('Cleaning up previously found state file')
os.remove(state)
# instantiate the job object ready for the control file.
myjob = None
try:
# Check that the control file is valid
if not os.path.exists(control):
raise error.JobError(control + ": control file not found")
# When continuing, the job is complete when there is no
# state file, ensure we don't try and continue.
if options.cont and not os.path.exists(state):
raise error.JobComplete("all done")
myjob = job(control=control, drop_caches=drop_caches, options=options)
# Load in the users control file, may do any one of:
# 1) execute in toto
# 2) define steps, and select the first via next_step()
myjob.step_engine()
except error.JobContinue:
sys.exit(5)
except error.JobComplete:
sys.exit(1)
except error.JobError as instance:
logging.error("JOB ERROR: " + str(instance))
if myjob:
command = None
if len(instance.args) > 1:
command = instance.args[1]
myjob.record('ABORT', None, command, str(instance))
myjob.record('END ABORT', None, None, str(instance))
assert myjob._record_indent == 0
myjob.complete(1)
else:
sys.exit(1)
except Exception as e:
# NOTE: job._run_step_fn and job.step_engine will turn things into
# a JobError for us. If we get here, its likely an autotest bug.
msg = str(e) + '\n' + traceback.format_exc()
logging.critical("JOB ERROR (autotest bug?): " + msg)
if myjob:
myjob.record('END ABORT', None, None, msg)
assert myjob._record_indent == 0
myjob.complete(1)
else:
sys.exit(1)
# If we get here, then we assume the job is complete and good.
myjob.record('END GOOD', None, None)
assert myjob._record_indent == 0
myjob.complete(0)
class job(base_client_job):
def __init__(self, *args, **kwargs):
base_client_job.__init__(self, *args, **kwargs)
def run_test(self, url, *args, **dargs):
log_pauser = cros_logging.LogRotationPauser()
passed = False
try:
log_pauser.begin()
passed = base_client_job.run_test(self, url, *args, **dargs)
if not passed:
# Save the VM state immediately after the test failure.
# This is a NOOP if the the test isn't running in a VM or
# if the VM is not properly configured to save state.
_group, testname = self.pkgmgr.get_package_name(url, 'test')
now = datetime.now().strftime('%I:%M:%S.%f')
checkpoint_name = '%s-%s' % (testname, now)
utils.save_vm_state(checkpoint_name)
finally:
log_pauser.end()
return passed
def reboot(self):
self.reboot_setup()
self.harness.run_reboot()
# sync first, so that a sync during shutdown doesn't time out
utils.system('sync; sync', ignore_status=True)
utils.system('reboot </dev/null >/dev/null 2>&1 &')
self.quit()
def require_gcc(self):
return False
# TODO(ayatane): This logic should be deduplicated with
# server/cros/dynamic_suite/control_file_getter.py, but the server
# libraries are not available on clients.
def _locate_test_control_file(dirpath, testname):
"""
Locate the control file for the given test.
@param dirpath Root directory to search.
@param testname Name of test.
@returns Absolute path to the control file.
@raise JobError: Raised if control file not found.
"""
for dirpath, _dirnames, filenames in os.walk(dirpath):
for filename in filenames:
if 'control' not in filename:
continue
path = os.path.join(dirpath, filename)
if _is_control_file_for_test(path, testname):
return os.path.abspath(path)
raise error.JobError(
'could not find client test control file',
dirpath, testname)
_NAME_PATTERN = "NAME *= *['\"]([^'\"]+)['\"]"
def _is_control_file_for_test(path, testname):
with open(path) as f:
for line in f:
match = re.match(_NAME_PATTERN, line)
if match is not None:
return match.group(1) == testname