@ -8,6 +8,7 @@ import threading
import shlex
import sys
import shutil
import string
import unittest
@ -75,9 +76,24 @@ class CasePreservingConfigParser(configparser.ConfigParser):
return option
#: global parameters extracted from the test suite's configuration file
_parameters = { }
#: setting whether debug mode is enabled or not
_debug_mode = False
def set_debug_mode ( debug ) :
""" Enable or disable debug mode
In debug mode the test suite will print out all commands that it runs , the
expected output and the actually obtained output
"""
global _debug_mode
_debug_mode = debug
def configure_suite ( config_file ) :
"""
Populates a global datastructure with the parameters from the suite ' s
@ -406,42 +422,120 @@ class CopyFiles(FileDecoratorBase):
return shutil . copyfile ( expanded_file_name , new_name )
class Case ( unittest . TestCase ) :
def test_run ( self ) :
"""
System test case base class , provides the functionality to interpret static
class members as system tests and runs them .
This function reads in the members commands , retval , stdout , stderr and runs
the ` expand_variables ` function on each . The resulting commands are then run
using the subprocess module and compared against the expected values that
were provided in the static members via ` compare_stdout ` and
` compare_stderr ` . Furthermore a threading . Timer is used to abort the
execution if a configured timeout is reached .
This class reads in the members commands , retval , stdout , stderr and runs
the format function on each , where format is called with the kwargs being a
merged dictionary of all variables that were extracted from the suite ' s
configuration file and all static members of the current class .
It is automatically added as a member function to each system test by the
CaseMeta metaclass . This ensures that it is run by each system test
* * after * * setUp ( ) and setUpClass ( ) were run .
"""
if not ( len ( self . commands ) == len ( self . retval )
== len ( self . stdout ) == len ( self . stderr ) ) :
raise ValueError (
" commands, retval, stdout and stderr don ' t have the same length "
)
for i , command , retval , stdout , stderr in zip ( range ( len ( self . commands ) ) ,
self . commands ,
self . retval ,
self . stdout ,
self . stderr ) :
command , retval , stdout , stderr = map (
self . expand_variables , [ command , retval , stdout , stderr ]
)
retval = int ( retval )
timeout = { " flag " : False }
if _debug_mode :
print (
' ' , " = " * 80 , " will run: " + command , " expected stdout: " , stdout ,
" expected stderr: " , stderr ,
" expected return value: {:d} " . format ( retval ) ,
sep = ' \n '
)
The resulting commands are then run using the subprocess module and compared
against the expected values that were provided in the static
members . Furthermore a threading . Timer is used to abort the execution if a
configured timeout is reached .
proc = subprocess . Popen (
_cmd_splitter ( command ) ,
stdout = subprocess . PIPE ,
stderr = subprocess . PIPE ,
cwd = self . work_dir
)
def timeout_reached ( timeout ) :
timeout [ " flag " ] = True
proc . kill ( )
t = threading . Timer (
_parameters [ " timeout " ] , timeout_reached , args = [ timeout ]
)
t . start ( )
got_stdout , got_stderr = proc . communicate ( )
t . cancel ( )
processed_stdout = None
processed_stderr = None
for encoding in self . encodings :
try :
processed_stdout = _process_output_post (
got_stdout . decode ( encoding )
)
processed_stderr = _process_output_post (
got_stderr . decode ( encoding )
)
except UnicodeError :
pass
else :
break
if processed_stdout is None or processed_stderr is None :
raise ValueError (
" Could not decode the output of the command ' {!s} ' with the "
" following encodings: {!s} "
. format ( command , ' , ' . join ( self . encodings ) )
)
if _debug_mode :
print (
" got stdout: " , processed_stdout , " got stderr: " ,
processed_stderr , " got return value: {:d} "
. format ( proc . returncode ) ,
sep = ' \n '
)
The class itself must be inherited from , otherwise it is not useful at all ,
as it does not provide any static members that could be used to run system
tests . However , a class that inherits from this class needn ' t provide any
member functions at all , the inherited test_run ( ) function performs all
required functionality in child classes .
self . assertFalse ( timeout [ " flag " ] , msg = " Timeout reached " )
self . compare_stdout ( i , command , processed_stdout , stdout )
self . compare_stderr ( i , command , processed_stderr , stderr )
self . assertEqual (
retval , proc . returncode , msg = " Return value does not match "
)
class Case ( unittest . TestCase ) :
"""
System test case base class , provides the functionality to interpret static
class members as system tests .
The class itself only provides utility functions and system tests need not
inherit from it , as it is automatically added via the CaseMeta metaclass .
"""
""" maxDiff set so that arbitrarily large diffs will be shown """
#: maxDiff set so that arbitrarily large diffs will be shown
maxDiff = None
#: list of encodings that are used to decode the test program's output
#: the first encoding that does not raise a UnicodeError is used
encodings = [ ' utf-8 ' , ' iso-8859-1 ' ]
@classmethod
def setUpClass ( cls ) :
"""
This function adds the variables variable_dict & work_dir to the class .
work_dir - set to the file where the current class is defined
variable_dict - a merged dictionary of all static members of the current
class and all variables extracted from the suite ' s
configuration file
This function adds the variable work_dir to the class , which is the path
to the directory where the python source file is located .
"""
cls . variable_dict = _disjoint_dict_merge ( cls . __dict__ , _parameters )
cls . work_dir = os . path . dirname ( inspect . getfile ( cls ) )
def compare_stdout ( self , i , command , got_stdout , expected_stdout ) :
@ -460,71 +554,89 @@ class Case(unittest.TestCase):
unittest . TestCase . This function can be overridden in a child class to
implement a custom check .
"""
self . assertMultiLineEqual ( expected_stdout , got_stdout )
self . assertMultiLineEqual (
expected_stdout , got_stdout , msg = " Standard output does not match "
)
def compare_stderr ( self , i , command , got_stderr , expected_stderr ) :
"""
Same as compare_stdout only for standard - error .
"" "
self . assertMultiLineEqual ( expected_stderr , got_stderr )
""" Same as compare_stdout only for standard-error. """
self . assertMultiLineEqual (
expected_stderr , got_stderr , msg = " Standard error does not match "
)
def expand_variables ( self , string) :
def expand_variables ( self , unexpanded_ string) :
"""
Expands all variables in curly braces in the given string using the
dictionary variable_dict .
Expands all variables of the form ` ` $ var ` ` in the given string using the
dictionary ` variable_dict ` .
The expansion itself is performed by the builtin string method format ( ) .
A KeyError indicates that the supplied string contains a variable
in curly braces that is missing from self . variable_dict
The expansion itself is performed by the string ' s template module using
via ` safe_substitute ` .
"""
return str ( string ) . format ( * * self . variable_dict )
return string . Template ( str ( unexpanded_string ) ) \
. safe_substitute ( * * self . variable_dict )
def test_run ( self ) :
"""
Actual system test function which runs the provided commands ,
pre - processes all variables and post processes the output before passing
it on to compare_stderr ( ) & compare_stdout ( ) .
class CaseMeta ( type ) :
""" System tests generation metaclass.
This metaclass is performs the following tasks :
1. Add the ` test_run ` function as a member of the test class
2. Add the ` Case ` class as the parent class
3. Expand all variables already defined in the class , so that any additional
code does not have to perform this task
Using a metaclass instead of inheriting from case has the advantage , that we
can expand all variables in the strings before any test case or even the
class constructor is run ! Thus users will immediately see the expanded
result . Also adding the ` test_run ` function as a direct member and not via
inheritance enforces that it is being run * * after * * the test cases setUp &
setUpClass ( which oddly enough seems not to be the case in the unittest
module where test functions of the parent class run before setUpClass of the
child class ) .
"""
for i , command , retval , stdout , stderr in zip ( range ( len ( self . commands ) ) ,
self . commands ,
self . retval ,
self . stdout ,
self . stderr ) :
command , retval , stdout , stderr = map (
self . expand_variables , [ command , retval , stdout , stderr ]
)
retval = int ( retval )
timeout = { " flag " : False }
def __new__ ( mcs , clsname , bases , dct ) :
proc = subprocess . Popen (
_cmd_splitter ( command ) ,
stdout = subprocess . PIPE ,
stderr = subprocess . PIPE ,
cwd = self . work_dir
)
changed = False
def timeout_reached ( timeout ) :
timeout [ " flag " ] = True
proc . kill ( )
# expand all non-private variables by brute force
# => try all expanding all elements defined in the current class until
# there is no change in them any more
keys = [ key for key in list ( dct . keys ( ) ) if not key . startswith ( ' _ ' ) ]
while not changed :
for key in keys :
t = threading . Timer (
_parameters [ " timeout " ] , timeout_reached , args = [ timeout ]
)
t . start ( )
got_stdout , got_stderr = proc . communicate ( )
t . cancel ( )
old_value = dct [ key ]
self . assertFalse ( timeout [ " flag " ] and " Timeout reached " )
self . compare_stdout (
i , command ,
_process_output_post ( got_stdout . decode ( ' utf-8 ' ) ) , stdout
# only try expanding strings and lists
if isinstance ( old_value , str ) :
new_value = string . Template ( old_value ) . safe_substitute (
* * _disjoint_dict_merge ( dct , _parameters )
)
self . compare_stderr (
i , command ,
_process_output_post ( got_stderr . decode ( ' utf-8 ' ) ) , stderr
elif isinstance ( old_value , list ) :
# do not try to expand anything but strings in the list
new_value = [
string . Template ( elem ) . safe_substitute (
* * _disjoint_dict_merge ( dct , _parameters )
)
self . assertEqual ( retval , proc . returncode )
if isinstance ( elem , str ) else elem
for elem in old_value
]
else :
continue
if old_value != new_value :
changed = True
dct [ key ] = new_value
dct [ ' variable_dict ' ] = _disjoint_dict_merge ( dct , _parameters )
dct [ ' test_run ' ] = test_run
if Case not in bases :
bases + = ( Case , )
return super ( CaseMeta , mcs ) . __new__ ( mcs , clsname , bases , dct )
def check_no_ASAN_UBSAN_errors ( self , i , command , got_stderr , expected_stderr ) :