You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
332 lines
12 KiB
Python
332 lines
12 KiB
Python
8 years ago
|
# -*- coding: utf-8 -*-
|
||
|
|
||
|
import configparser
|
||
|
import os
|
||
|
import inspect
|
||
|
import subprocess
|
||
|
import threading
|
||
|
import shlex
|
||
|
import sys
|
||
|
import shutil
|
||
|
import unittest
|
||
|
|
||
|
|
||
|
if sys.platform == 'win32':
|
||
|
def _cmd_splitter(cmd):
|
||
|
return cmd
|
||
|
|
||
|
def _process_output_post(output):
|
||
|
return output.replace('\r\n', '\n')
|
||
|
|
||
|
else:
|
||
|
def _cmd_splitter(cmd):
|
||
|
return shlex.split(cmd)
|
||
|
|
||
|
def _process_output_post(output):
|
||
|
return output
|
||
|
|
||
|
|
||
|
def _disjoint_dict_merge(d1, d2):
|
||
|
"""
|
||
|
Merges two dictionaries with no common keys together and returns the result.
|
||
|
|
||
|
>>> d1 = {"a": 1}
|
||
|
>>> d2 = {"b": 2, "c": 3}
|
||
|
>>> _disjoint_dict_merge(d1, d2) == {"a": 1, "b": 2, "c": 3}
|
||
|
True
|
||
|
|
||
|
Calling this function with dictionaries that share keys raises a ValueError:
|
||
|
>>> _disjoint_dict_merge({"a": 1, "b": 6}, {"b": 2, "a": 3})
|
||
|
Traceback (most recent call last):
|
||
|
..
|
||
|
ValueError: Dictionaries have common keys.
|
||
|
|
||
|
"""
|
||
|
inter = set(d1.keys()).intersection(set(d2.keys()))
|
||
|
if len(inter) > 0:
|
||
|
raise ValueError("Dictionaries have common keys.")
|
||
|
res = d1.copy()
|
||
|
res.update(d2)
|
||
|
return res
|
||
|
|
||
|
|
||
|
_parameters = {}
|
||
|
|
||
|
|
||
|
def configure_suite(config_file):
|
||
|
"""
|
||
|
Populates a global datastructure with the parameters from the suite's
|
||
|
configuration file.
|
||
|
|
||
|
This function performs the following steps:
|
||
|
1. read in the file ``config_file`` via the ConfigParser module using
|
||
|
extended interpolation
|
||
|
2. check that the sections ``variables`` and ``paths`` are disjoint
|
||
|
3. extract the environment variables given in the ``ENV`` section
|
||
|
4. save all entries from the ``variables`` section in the global
|
||
|
datastructure
|
||
|
5. interpret all entries in the ``paths`` section as relative paths from the
|
||
|
configuration file, expand them to absolute paths and save them in the
|
||
|
global datastructure
|
||
|
|
||
|
For further information concerning the rationale behind this, please consult
|
||
|
the documentation in ``doc.md``.
|
||
|
"""
|
||
|
|
||
|
if not os.path.exists(config_file):
|
||
|
raise ValueError(
|
||
|
"Test suite config file {:s} does not exist"
|
||
|
.format(os.path.abspath(config_file))
|
||
|
)
|
||
|
|
||
|
config = configparser.ConfigParser(
|
||
|
interpolation=configparser.ExtendedInterpolation(),
|
||
|
delimiters=(':'),
|
||
|
comment_prefixes=('#')
|
||
|
)
|
||
|
config.read(config_file)
|
||
|
|
||
|
_parameters["suite_root"] = os.path.split(os.path.abspath(config_file))[0]
|
||
|
_parameters["timeout"] = config.getfloat("General", "timeout", fallback=1.0)
|
||
|
|
||
|
if 'variables' in config and 'paths' in config:
|
||
|
intersecting_keys = set(config["paths"].keys())\
|
||
|
.intersection(set(config["variables"].keys()))
|
||
|
if len(intersecting_keys) > 0:
|
||
|
raise ValueError(
|
||
|
"The sections 'paths' and 'variables' must not share keys, "
|
||
|
"but they have the following common key{:s}: {:s}"
|
||
|
.format(
|
||
|
's' if len(intersecting_keys) > 1 else '',
|
||
|
', '.join(k for k in intersecting_keys)
|
||
|
)
|
||
|
)
|
||
|
|
||
|
# extract variables from the environment
|
||
|
for key in config['ENV']:
|
||
|
if key in config['ENV fallback']:
|
||
|
fallback = config['ENV fallback'][key]
|
||
|
else:
|
||
|
fallback = ""
|
||
|
config['ENV'][key] = os.getenv(config['ENV'][key]) or fallback
|
||
|
|
||
|
if 'variables' in config:
|
||
|
for key in config['variables']:
|
||
|
_parameters[key] = config['variables'][key]
|
||
|
|
||
|
if 'paths' in config:
|
||
|
for key in config['paths']:
|
||
|
rel_path = config['paths'][key]
|
||
|
abs_path = os.path.abspath(
|
||
|
os.path.join(_parameters["suite_root"], rel_path)
|
||
|
)
|
||
|
if not os.path.exists(abs_path):
|
||
|
raise ValueError(
|
||
|
"Path replacement for {short}: {abspath} does not exist"
|
||
|
" (was expanded from {rel})".format(
|
||
|
short=key,
|
||
|
abspath=abs_path,
|
||
|
rel=rel_path)
|
||
|
)
|
||
|
_parameters[key] = abs_path
|
||
|
|
||
|
|
||
|
def _setUp_factory(old_setUp, *files):
|
||
|
"""
|
||
|
Factory function that returns a setUp function suitable to replace the
|
||
|
existing setUp of a unittest.TestCase. The returned setUp calls at first
|
||
|
old_setUp(self) and then creates a copy of all files in *files with the
|
||
|
name: fname.ext -> fname_copy.ext
|
||
|
|
||
|
All file names in *files are at first expanded using self.expand_variables()
|
||
|
and the path to the copy is saved in self._file_copies
|
||
|
"""
|
||
|
def setUp(self):
|
||
|
old_setUp(self)
|
||
|
self._file_copies = []
|
||
|
for f in files:
|
||
|
expanded_fname = self.expand_variables(f)
|
||
|
fname, ext = os.path.splitext(expanded_fname)
|
||
|
new_name = fname + '_copy' + ext
|
||
|
self._file_copies.append(
|
||
|
shutil.copyfile(expanded_fname, new_name)
|
||
|
)
|
||
|
return setUp
|
||
|
|
||
|
|
||
|
def _tearDown_factory(old_tearDown):
|
||
|
"""
|
||
|
Factory function that returns a new tearDown method to replace an existing
|
||
|
tearDown method. It at first deletes all files in self._file_copies and then
|
||
|
calls old_tearDown(self).
|
||
|
This factory is intended to be used in conjunction with _setUp_factory
|
||
|
"""
|
||
|
def tearDown(self):
|
||
|
for f in self._file_copies:
|
||
|
os.remove(f)
|
||
|
old_tearDown(self)
|
||
|
return tearDown
|
||
|
|
||
|
|
||
|
def CopyFiles(*files):
|
||
|
"""
|
||
|
Decorator for subclasses of system_test.Case that automatically creates a
|
||
|
copy of the files specified as the parameters to the decorator.
|
||
|
|
||
|
Example:
|
||
|
>>> @CopyFiles("{some_var}/file.txt", "{another_var}/other_file.png")
|
||
|
class Foo(Case):
|
||
|
pass
|
||
|
|
||
|
The decorator will inject new setUp method that at first calls the already
|
||
|
defined setUp(), then expands all supplied file names using
|
||
|
Case.expand_variables and then creates copies by appending '_copy' before
|
||
|
the file extension. The paths to the copies are stored in self._file_copies.
|
||
|
|
||
|
The decorator also injects a new tearDown method that deletes all files in
|
||
|
self._file_copies and then calls the original tearDown method.
|
||
|
|
||
|
This function will also complain if it is called without arguments or
|
||
|
without paranthesis, which is valid decorator syntax but is obviously a bug
|
||
|
in this case.
|
||
|
"""
|
||
|
if len(files) == 0:
|
||
|
raise ValueError("No files to copy supplied.")
|
||
|
elif len(files) == 1:
|
||
|
if isinstance(files[0], type):
|
||
|
raise UserWarning(
|
||
|
"Decorator used wrongly, must be called with filenames in paranthesis"
|
||
|
)
|
||
|
|
||
|
def wrapper(cls):
|
||
|
old_setUp = cls.setUp
|
||
|
cls.setUp = _setUp_factory(old_setUp, *files)
|
||
|
|
||
|
old_tearDown = cls.tearDown
|
||
|
cls.tearDown = _tearDown_factory(old_tearDown)
|
||
|
|
||
|
return cls
|
||
|
|
||
|
return wrapper
|
||
|
|
||
|
|
||
|
class Case(unittest.TestCase):
|
||
|
"""
|
||
|
System test case base class, provides the functionality to interpret static
|
||
|
class members as system tests and runs them.
|
||
|
|
||
|
This class reads in the members commands, retval, stdout, stderr and runs
|
||
|
the format function on each, where format is called with the kwargs being a
|
||
|
merged dictionary of all variables that were extracted from the suite's
|
||
|
configuration file and all static members of the current class.
|
||
|
|
||
|
The resulting commands are then run using the subprocess module and compared
|
||
|
against the expected values that were provided in the static
|
||
|
members. Furthermore a threading.Timer is used to abort the execution if a
|
||
|
configured timeout is reached.
|
||
|
|
||
|
The class itself must be inherited from, otherwise it is not useful at all,
|
||
|
as it does not provide any static members that could be used to run system
|
||
|
tests. However, a class that inherits from this class needn't provide any
|
||
|
member functions at all, the inherited test_run() function performs all
|
||
|
required functionality in child classes.
|
||
|
"""
|
||
|
|
||
|
""" maxDiff set so that arbitrarily large diffs will be shown """
|
||
|
maxDiff = None
|
||
|
|
||
|
@classmethod
|
||
|
def setUpClass(cls):
|
||
|
"""
|
||
|
This function adds the variables variable_dict & work_dir to the class.
|
||
|
|
||
|
work_dir - set to the file where the current class is defined
|
||
|
variable_dict - a merged dictionary of all static members of the current
|
||
|
class and all variables extracted from the suite's
|
||
|
configuration file
|
||
|
"""
|
||
|
cls.variable_dict = _disjoint_dict_merge(cls.__dict__, _parameters)
|
||
|
cls.work_dir = os.path.dirname(inspect.getfile(cls))
|
||
|
|
||
|
def compare_stdout(self, i, command, got_stdout, expected_stdout):
|
||
|
"""
|
||
|
Function to compare whether the expected & obtained stdout match.
|
||
|
|
||
|
This function is automatically invoked by test_run with the following
|
||
|
parameters:
|
||
|
i - the index of the current command that is run in self.commands
|
||
|
command - the command that was run
|
||
|
got_stdout - the obtained stdout, post-processed depending on the
|
||
|
platform so that lines always end with \n
|
||
|
expected_stdout - the expected stdout extracted from self.stdout
|
||
|
|
||
|
The default implementation simply uses assertMultiLineEqual from
|
||
|
unittest.TestCase. This function can be overridden in a child class to
|
||
|
implement a custom check.
|
||
|
"""
|
||
|
self.assertMultiLineEqual(expected_stdout, got_stdout)
|
||
|
|
||
|
def compare_stderr(self, i, command, got_stderr, expected_stderr):
|
||
|
"""
|
||
|
Same as compare_stdout only for standard-error.
|
||
|
"""
|
||
|
self.assertMultiLineEqual(expected_stderr, got_stderr)
|
||
|
|
||
|
def expand_variables(self, string):
|
||
|
"""
|
||
|
Expands all variables in curly braces in the given string using the
|
||
|
dictionary variable_dict.
|
||
|
|
||
|
The expansion itself is performed by the builtin string method format().
|
||
|
A KeyError indicates that the supplied string contains a variable
|
||
|
in curly braces that is missing from self.variable_dict
|
||
|
"""
|
||
|
return str(string).format(**self.variable_dict)
|
||
|
|
||
|
def test_run(self):
|
||
|
"""
|
||
|
Actual system test function which runs the provided commands,
|
||
|
pre-processes all variables and post processes the output before passing
|
||
|
it on to compare_stderr() & compare_stdout().
|
||
|
"""
|
||
|
|
||
|
for i, command, retval, stdout, stderr in zip(range(len(self.commands)),
|
||
|
self.commands,
|
||
|
self.retval,
|
||
|
self.stdout,
|
||
|
self.stderr):
|
||
|
command, retval, stdout, stderr = map(
|
||
|
self.expand_variables, [command, retval, stdout, stderr]
|
||
|
)
|
||
|
retval = int(retval)
|
||
|
timeout = {"flag": False}
|
||
|
|
||
|
proc = subprocess.Popen(
|
||
|
_cmd_splitter(command),
|
||
|
stdout=subprocess.PIPE,
|
||
|
stderr=subprocess.PIPE,
|
||
|
cwd=self.work_dir
|
||
|
)
|
||
|
|
||
|
def timeout_reached(timeout):
|
||
|
timeout["flag"] = True
|
||
|
proc.kill()
|
||
|
|
||
|
t = threading.Timer(
|
||
|
_parameters["timeout"], timeout_reached, args=[timeout]
|
||
|
)
|
||
|
t.start()
|
||
|
got_stdout, got_stderr = proc.communicate()
|
||
|
t.cancel()
|
||
|
|
||
|
self.assertFalse(timeout["flag"] and "Timeout reached")
|
||
|
self.compare_stdout(
|
||
|
i, command,
|
||
|
_process_output_post(got_stdout.decode('utf-8')), stdout
|
||
|
)
|
||
|
self.compare_stderr(
|
||
|
i, command,
|
||
|
_process_output_post(got_stderr.decode('utf-8')), stderr
|
||
|
)
|
||
|
self.assertEqual(retval, proc.returncode)
|