Merge pull request #419 from D4N/testsuite_api

Testsuite improvements
v0.27.3
D4N 7 years ago committed by GitHub
commit 7812043b7d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -6,7 +6,15 @@ matrix:
dist: trusty
sudo: required
compiler: gcc
env: COVERAGE=1 CMAKE_OPTIONS="-DCMAKE_BUILD_TYPE=Release -DEXIV2_ENABLE_VIDEO=ON -DEXIV2_ENABLE_WEBREADY=ON -DEXIV2_BUILD_UNIT_TESTS=ON -DBUILD_WITH_COVERAGE=ON -DEXIV2_TEAM_USE_SANITIZERS=ON" # All enabled + Coverage
env: COVERAGE=1 CMAKE_OPTIONS="-DCMAKE_BUILD_TYPE=Release -DEXIV2_ENABLE_VIDEO=ON -DEXIV2_ENABLE_WEBREADY=ON -DEXIV2_BUILD_UNIT_TESTS=ON -DBUILD_WITH_COVERAGE=ON" # All enabled + Coverage
- os: linux
dist: trusty
sudo: required
compiler: gcc
env:
- WITH_VALGRIND=1
- CMAKE_OPTIONS="-DCMAKE_BUILD_TYPE=Release -DEXIV2_ENABLE_VIDEO=ON -DEXIV2_ENABLE_WEBREADY=ON -DEXIV2_BUILD_UNIT_TESTS=ON"
- os: linux
dist: trusty
@ -19,7 +27,7 @@ matrix:
- sourceline: 'ppa:ubuntu-toolchain-r/test'
packages:
- g++-8
env: CC=gcc-8 CXX=g++-8 CMAKE_OPTIONS="-DCMAKE_BUILD_TYPE=Release -DEXIV2_ENABLE_VIDEO=ON -DEXIV2_ENABLE_WEBREADY=ON -DEXIV2_BUILD_UNIT_TESTS=ON -DEXIV2_TEAM_USE_SANITIZERS=ON"
env: CC=gcc-8 CXX=g++-8 CMAKE_OPTIONS="-DCMAKE_BUILD_TYPE=Release -DEXIV2_ENABLE_VIDEO=ON -DEXIV2_ENABLE_WEBREADY=ON -DEXIV2_BUILD_UNIT_TESTS=ON"
- os: linux
dist: trusty
@ -29,7 +37,7 @@ matrix:
- os: osx
osx_image: xcode9
compiler: clang
env: PYTHON=3.6.2 CMAKE_OPTIONS="-DCMAKE_BUILD_TYPE=Release -DEXIV2_ENABLE_VIDEO=ON -DEXIV2_ENABLE_WEBREADY=ON -DEXIV2_BUILD_UNIT_TESTS=ON -DEXIV2_TEAM_USE_SANITIZERS=ON" # All enabled
env: PYTHON=3.6.2 CMAKE_OPTIONS="-DCMAKE_BUILD_TYPE=Release -DEXIV2_ENABLE_VIDEO=ON -DEXIV2_ENABLE_WEBREADY=ON -DEXIV2_BUILD_UNIT_TESTS=ON" # All enabled
env:
#- CMAKE_OPTIONS="-DCMAKE_BUILD_TYPE=Release" # Default

@ -6,6 +6,9 @@ if [[ "$(uname -s)" == 'Linux' ]]; then
sudo apt-get update
sudo apt-get install cmake zlib1g-dev libssh-dev gettext
sudo apt-get install python-pip libxml2-utils
if [ -n "$WITH_VALGRIND" ]; then
sudo apt-get install valgrind
fi
sudo pip install virtualenv
virtualenv conan
source conan/bin/activate

@ -5,7 +5,17 @@ set -x
if [[ "$(uname -s)" == 'Linux' ]]; then
source conan/bin/activate
if [ "$CC" == "clang" ]; then
# clang + Ubuntu don't like to run with UBSAN, but ASAN works
export CMAKE_OPTIONS="$CMAKE_OPTIONS -DCMAKE_CXX_FLAGS=\"-fsanitize=address\" -DCMAKE_C_FLAGS=\"-fsanitize=address\" -DCMAKE_EXE_LINKER_FLAGS=\"-fsanitize=address\" -DCMAKE_MODULE_LINKER_FLAGS=\"-fsanitize=address\""
elif [ -n "$WITH_VALGRIND" ]; then
export EXIV2_VALGRIND="valgrind --quiet"
else
export CMAKE_OPTIONS="$CMAKE_OPTIONS -DEXIV2_TEAM_USE_SANITIZERS=ON"
fi
else
export CMAKE_OPTIONS="$CMAKE_OPTIONS -DEXIV2_TEAM_USE_SANITIZERS=ON"
export PYENV_VERSION=$PYTHON
export PATH="/Users/travis/.pyenv/shims:${PATH}"
eval "$(pyenv init -)"
@ -16,18 +26,17 @@ fi
mkdir build && cd build
conan install .. --build missing --profile release
cmake ${CMAKE_OPTIONS} -DCMAKE_INSTALL_PREFIX=install ..
make -j2 VERBOSE=1
#On most systems, you can set the TZ environment variable to set the timezone for a process. It's a POSIX feature.
export TZ=UTC
make tests
make install
pushd .
cd bin
./unit_tests
$EXIV2_VALGRIND ./unit_tests
popd
if [ -n "$COVERAGE" ]; then
cd ..
bash <(curl -s https://codecov.io/bash)
fi

@ -404,6 +404,7 @@ namespace Exiv2 {
throw Error(kerInvalidMalloc);
}
DataBuf buf((long)allocate); // allocate a buffer
std::memset(buf.pData_, 0, buf.size_);
std::memcpy(buf.pData_,dir.pData_+8,4); // copy dir[8:11] into buffer (short strings)
const bool bOffsetIsPointer = count*size > 4;

@ -0,0 +1,55 @@
# -*- coding: utf-8 -*-
import os
import shutil
import string
import system_tests
class FailureOnCifsShares(metaclass=system_tests.CaseMeta):
url = "http://dev.exiv2.org/issues/1043"
num = 1043
original_file = system_tests.path("$data_path/exiv2-bug884c.jpg")
files = [
system_tests.path("$data_path/bug$num-" + char + ".jpg")
for char in string.ascii_uppercase
]
def setUp(self):
for fname in self.files:
shutil.copyfile(self.original_file, fname)
def tearDown(self):
for fname in self.files:
os.remove(fname)
commands = [
"""$exiv2 -u -v -M"set Exif.Photo.UserComment Test Bug $num my filename is {fname_short}" {fname}"""\
.format(fname=fname, fname_short=os.path.split(fname)[1])
for fname in files
] + [
# workaround for * wildcard in bash:
"""$exiv2 -PE -g UserComment {!s}""".format(" ".join(files))
]
retval = [0] * (len(files) + 1)
stdout = [
"""File 1/1: {fname}
Set Exif.Photo.UserComment "Test Bug $num my filename is {fname_short}" (Comment)
"""
.format(fname=fname, fname_short=os.path.split(fname)[1])
for fname in files
] + [
"""""".join(
"""{fname} Exif.Photo.UserComment Undefined 50 Test Bug $num my filename is {fname_short}
"""
.format(fname=fname, fname_short=os.path.split(fname)[1])
for fname in files
)
]
stderr= [""] * (len(files) + 1)

@ -1,14 +1,16 @@
# -*- coding: utf-8 -*-
import unittest
import os
import system_tests
@unittest.skipUnless(os.getenv('TZ') == 'UTC', "Testcase only works with the timezone set to UTC")
class Exiv2jsonRecursiveJsonTreeWithXMP(metaclass=system_tests.CaseMeta):
url = "http://dev.exiv2.org/issues/1054"
env = {
'TZ': 'UTC'
}
filename1 = system_tests.path("$data_path/BlueSquare.xmp")
filename2 = system_tests.path("$data_path/exiv2-bug784.jpg")

@ -0,0 +1,38 @@
# -*- coding: utf-8 -*-
import hashlib
import system_tests
class IccProfileInApp2Segment(metaclass=system_tests.CaseMeta):
url = "http://dev.exiv2.org/issues/1074"
num = 1074
encodings = [bytes]
filenames = [
system_tests.path("$data_path/" + fname)
for fname in (
"exiv2-bug$num.png", "imagemagick.png", "Reagan.tiff", "Reagan.jpg"
)
]
commands = [
"$exiv2 -pC " + fname for fname in filenames
]
def compare_stdout(self, i, command, got_stdout, expected_stdout):
self.assertEqual(
hashlib.md5(got_stdout).hexdigest(), expected_stdout
)
stderr = [bytes()] * len(filenames)
stdout = [
"5c02432934195866147d8cbfa49f3fcf",
"cf0aeee7fdc11b20ad8a19d65628488e",
"1d3fda2edb4a89ab60a23c5f7c7d81dd",
"50b9125494306a6fc1b7c4f2a1a8d49d"
]
retval = [0] * len(filenames)

@ -0,0 +1,43 @@
# -*- coding: utf-8 -*-
import system_tests
@system_tests.CopyFiles("$data_path/exiv2-empty.jpg")
class MetadataPiping(metaclass=system_tests.CaseMeta):
url = "http://dev.exiv2.org/issues/1137"
filename = system_tests.path("$data_path/exiv2-empty_copy.jpg")
Stonehenge = system_tests.path("$data_path/Stonehenge.exv")
commands = [
"""$exiv2 -pa $filename""",
"""$exiv2 -PkV --grep GPSL $Stonehenge""",
"""$exiv2 -m- $filename""",
"""$exiv2 -pa --grep GPSL $filename"""
]
output_grep_GPSL = """set Exif.GPSInfo.GPSLatitudeRef N
set Exif.GPSInfo.GPSLatitude 51/1 106969/10000 0/1
set Exif.GPSInfo.GPSLongitudeRef W
set Exif.GPSInfo.GPSLongitude 1/1 495984/10000 0/1
"""
stdin = [
None,
None,
output_grep_GPSL,
None
]
stdout = [
"",
output_grep_GPSL,
"",
"""Exif.GPSInfo.GPSLatitudeRef Ascii 2 North
Exif.GPSInfo.GPSLatitude Rational 3 51deg 10.69690'
Exif.GPSInfo.GPSLongitudeRef Ascii 2 West
Exif.GPSInfo.GPSLongitude Rational 3 1deg 49.59840'
"""
]

@ -312,6 +312,48 @@ This section describes more advanced features that are probably not necessary
the "standard" usage of the test suite.
### Providing standard input to commands
The test suite supports providing a standard input to commands in a similar
fashion as the standard output and error are specified: it expects a list (with
the length equal to the number of commands) of standard inputs (either strings
or `bytes`). For commands that expect no standard input, simply set the
respective entry to `None`:
``` python
# -*- coding: utf-8 -*-
import system_tests
class AnInformativeName(metaclass=system_tests.CaseMeta):
commands = [
"$binary -c $import_file --",
"$binary -c $import_file --"
]
retval = [1, 1]
stdin = [
"read file a",
None
]
stdout = [
"Reading...",
""
]
stderr = [
"Error",
"No input provided"
]
```
In this example, the command `$binary -c $import_file --` would be run twice,
first with the standard input `read file a` and second without any input
(resulting in the error `No input provided`).
If all commands don't expect any standard input, omit the attribute `stdin`, the
test suite will implicitly assume `None` for every command.
### Using a different output encoding
The test suite will try to interpret the program's output as utf-8 encoded
@ -351,6 +393,70 @@ encodings can be found
[here](https://docs.python.org/3/library/codecs.html#standard-encodings).
### Working with binary output
Some programs output binary data directly to stdout or stderr. Such programs can
be also tested by specifying the type `bytes` as the only member in the
`encodings` list and supplying `stdout` and/or `stderr` as `bytes` and not as a
string.
An example test case would look like this:
``` python
# -*- coding: utf-8 -*-
import system_tests
class AnInformativeName(metaclass=system_tests.CaseMeta):
encodings = [bytes]
commands = ["$prog --dump-binary"]
retval = [1]
stdout = [bytes([1, 2, 3, 4, 16, 42])]
stderr = [bytes()]
```
Using the bytes encoding has the following limitations:
- variables of the form `$some_var` cannot be expanded in `stdout` and `stderr`
- if the `bytes` encoding is specified, then both `stderr` and `stdout` must be
valid `bytes`
### Setting and modifying environment variables
The test suite supports setting or modifying environment variables for
individual test cases. This can be accomplished by adding a member dictionary
named `env` with the appropriate variable names and keys:
``` python
# -*- coding: utf-8 -*-
from system_tests import CaseMeta, path
class AnInformativeName(metaclass=CaseMeta):
env = {
"MYVAR": 26,
"USER": "foobar"
}
# if you want a pristine environment, consisting only of MYVAR & USER,
# uncomment the following line:
# inherit_env = False
# the rest of the test case follows
```
All commands belonging to this test case will be run with a modified environment
where the variables `MYVAR` and `USER` will be set to the specified
values. By default the environment is inherited from the user's environment and
the specified variables in `env` take precedence over the variables in the
user's environment (in the above example the variable `$USER` would be
overridden). If no variables should be inherited set `inherit_env` to `False`
and your test case will get only the specified environment variables.
### Creating file copies
For tests that modify their input file it is useful to run these with a
@ -444,6 +550,43 @@ class AnInformativeName(metaclass=system_tests.CaseMeta):
```
### Running all commands under valgrind
The test suite can run all commands under a memory checker like
[valgrind](http://valgrind.org/) or [dr. memory](http://drmemory.org/). This
option can be enabled by adding the entry `memcheck` in the `General` section of
the configuration file, which specifies the command to invoke the memory
checking tool. The test suite will then prefix **all** commands with the
specified command.
For example this configuration file:
``` ini
[General]
timeout: 0.1
memcheck: valgrind --quiet
```
will result in every command specified in the test cases being run as `valgrind
--quiet $command`.
When running your test cases under a memory checker, please take the following
into account:
- valgrind and dr. memory slow the program execution down by a factor of
10-20. Therefore the test suite will increase the timeout value by a factor of
20 or by the value specified in the option `memcheck_timeout_penalty` in the
`General` section.
- valgrind reports by default on success to stderr, be sure to run it with
`--quiet`. Otherwise successful tests will fail under valgrind, as unexpected
output is present on stderr
- valgrind and ASAN cannot be used together
- Although the option is called `memcheck`, it can be used to execute all
commands via a wrapper that has a completely different purpose (e.g. to
collect test coverage).
### Manually expanding variables in strings
In case completely custom checks have to be run but one still wants to access
@ -562,7 +705,9 @@ python3 runner.py
One can supply the script with a directory where the suite should look for the
tests (it will search the directory recursively). If omitted, the runner will
look in the directory where the configuration file is located.
look in the directory where the configuration file is located. It is also
possible to instead pass a file as the parameter, the test suite will then only
run the tests from this file.
The runner script also supports the optional arguments `--config_file` which
allows to provide a different test suite configuration file than the default

@ -31,9 +31,10 @@ if __name__ == '__main__':
)
parser.add_argument(
"dir",
help="directory where the test are searched for (defaults to the config"
"file's location)",
"dir_or_file",
help="root directory under which the testsuite searches for tests or a"
"single file which tests are run (defaults to the config file's"
"location)",
default=None,
type=str,
nargs='?'
@ -41,12 +42,30 @@ if __name__ == '__main__':
args = parser.parse_args()
conf_file = args.config_file[0]
discovery_root = os.path.dirname(conf_file if args.dir is None else args.dir)
system_tests.set_debug_mode(args.debug)
DEFAULT_ROOT = os.path.abspath(os.path.dirname(conf_file))
system_tests.set_debug_mode(args.debug)
system_tests.configure_suite(conf_file)
discovered_tests = unittest.TestLoader().discover(discovery_root)
if args.dir_or_file is None or os.path.isdir(args.dir_or_file):
discovered_tests = unittest.defaultTestLoader.discover(
args.dir_or_file or DEFAULT_ROOT
)
elif os.path.isfile(args.dir_or_file):
discovered_tests = unittest.defaultTestLoader.discover(
os.path.dirname(args.dir_or_file),
pattern=os.path.split(args.dir_or_file)[1],
)
else:
print(
"WARNING: Invalid search location, falling back to {!s}"
.format(DEFAULT_ROOT),
file=sys.stderr
)
discovered_tests = unittest.defaultTestLoader.discover(
DEFAULT_ROOT
)
test_res = unittest.runner.TextTestRunner(verbosity=args.verbose)\
.run(discovered_tests)

@ -1,9 +1,11 @@
[General]
timeout: 1
memcheck: ${ENV:valgrind}
[ENV]
exiv2_path: EXIV2_BINDIR
binary_extension: EXIV2_EXT
valgrind: EXIV2_VALGRIND
[ENV fallback]
exiv2_path: ../build/bin

@ -35,14 +35,15 @@ else:
def _disjoint_dict_merge(d1, d2):
"""
Merges two dictionaries with no common keys together and returns the result.
Merges two dictionaries whose keys are disjoint sets and returns the
resulting dictionary:
>>> d1 = {"a": 1}
>>> d2 = {"b": 2, "c": 3}
>>> _disjoint_dict_merge(d1, d2) == {"a": 1, "b": 2, "c": 3}
True
Calling this function with dictionaries that share keys raises a ValueError:
Passing dictionaries that share keys raises a ValueError:
>>> _disjoint_dict_merge({"a": 1, "b": 6}, {"b": 2, "a": 3})
Traceback (most recent call last):
..
@ -85,6 +86,8 @@ class CasePreservingConfigParser(configparser.ConfigParser):
#: global parameters extracted from the test suite's configuration file
_parameters = {}
#: variables extracted from the test suite's configuration file
_config_variables = {}
#: setting whether debug mode is enabled or not
_debug_mode = False
@ -112,12 +115,12 @@ def configure_suite(config_file):
3. extract the environment variables given in the ``ENV`` section
4. save all entries from the ``variables`` section in the global
datastructure
5. interpret all entries in the ``paths`` section as relative paths from the
configuration file, expand them to absolute paths and save them in the
global datastructure
5. interpret all entries in the ``paths`` section as relative paths from
the configuration file, expand them to absolute paths and save them in
the global datastructure
For further information concerning the rationale behind this, please consult
the documentation in ``doc.md``.
For further information concerning the rationale behind this, please
consult the documentation in ``doc.md``.
"""
if not os.path.exists(config_file):
@ -134,11 +137,10 @@ def configure_suite(config_file):
config.read(config_file)
_parameters["suite_root"] = os.path.split(os.path.abspath(config_file))[0]
_parameters["timeout"] = config.getfloat("General", "timeout", fallback=1.0)
if 'variables' in config and 'paths' in config:
intersecting_keys = set(config["paths"].keys())\
.intersection(set(config["variables"].keys()))
intersecting_keys = set(config["paths"].keys()) \
.intersection(set(config["variables"].keys()))
if len(intersecting_keys) > 0:
raise ValueError(
"The sections 'paths' and 'variables' must not share keys, "
@ -159,7 +161,7 @@ def configure_suite(config_file):
if 'variables' in config:
for key in config['variables']:
_parameters[key] = config['variables'][key]
_config_variables[key] = config['variables'][key]
if 'paths' in config:
for key in config['paths']:
@ -175,13 +177,24 @@ def configure_suite(config_file):
abspath=abs_path,
rel=rel_path)
)
_parameters[key] = abs_path
_config_variables[key] = abs_path
for key in _parameters:
for key in _config_variables:
if key in globals():
raise ValueError("Variable name {!s} already used.")
globals()[key] = _parameters[key]
globals()[key] = _config_variables[key]
_parameters["timeout"] = config.getfloat(
"General", "timeout", fallback=1.0
)
if 'memcheck' in config['General']:
if config['General']['memcheck'] != '':
_parameters['memcheck'] = config['General']['memcheck']
_parameters["timeout"] *= config.getfloat(
"General", "memcheck_timeout_penalty", fallback=20.0
)
class FileDecoratorBase(object):
@ -494,32 +507,40 @@ def path(path_string):
def test_run(self):
"""
This function reads in the members commands, retval, stdout, stderr and runs
the `expand_variables` function on each. The resulting commands are then run
using the subprocess module and compared against the expected values that
were provided in the static members via `compare_stdout` and
`compare_stderr`. Furthermore a threading.Timer is used to abort the
This function reads in the attributes commands, retval, stdout, stderr,
stdin and runs the `expand_variables` function on each. The resulting
commands are then run using the subprocess module and compared against the
expected values that were provided in the attributes via `compare_stdout`
and `compare_stderr`. Furthermore a threading.Timer is used to abort the
execution if a configured timeout is reached.
It is automatically added as a member function to each system test by the
CaseMeta metaclass. This ensures that it is run by each system test
**after** setUp() and setUpClass() were run.
This function is automatically added as a member function to each system
test by the CaseMeta metaclass. This ensures that it is run by each system
test **after** setUp() and setUpClass() were run.
"""
if not (len(self.commands) == len(self.retval)
== len(self.stdout) == len(self.stderr)):
== len(self.stdout) == len(self.stderr) == len(self.stdin)):
raise ValueError(
"commands, retval, stdout and stderr don't have the same length"
)
for i, command, retval, stdout, stderr in zip(range(len(self.commands)),
self.commands,
self.retval,
self.stdout,
self.stderr):
command, retval, stdout, stderr = map(
self.expand_variables, [command, retval, stdout, stderr]
"commands, retval, stdout, stderr and stdin don't have the same "
"length"
)
for i, command, retval, stdout, stderr, stdin in \
zip(range(len(self.commands)),
self.commands,
self.retval,
self.stdout,
self.stderr,
self.stdin):
command, retval, stdout, stderr, stdin = [
self.expand_variables(var) for var in
(command, retval, stdout, stderr, stdin)
]
retval = int(retval)
timeout = {"flag": False}
if "memcheck" in _parameters:
command = _parameters["memcheck"] + " " + command
if _debug_mode:
print(
@ -533,41 +554,71 @@ def test_run(self):
_cmd_splitter(command),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
stdin=subprocess.PIPE if stdin is not None else None,
env=self._get_env(),
cwd=self.work_dir,
shell=_SUBPROCESS_SHELL
)
def timeout_reached(timeout):
timeout["flag"] = True
# Setup a threading.Timer which will terminate the command if it takes
# too long. Don't use the timeout parameter in subprocess.Popen, since
# that is not available for all Python 3 versions.
# Use a dictionary to indicate a timeout, as booleans get passed by
# value and the changes made timeout_reached function will not be
# visible once it exits (the command will still be terminated once the
# timeout expires).
timeout = {"flag": False}
def timeout_reached(tmout):
tmout["flag"] = True
proc.kill()
t = threading.Timer(
_parameters["timeout"], timeout_reached, args=[timeout]
)
def get_encode_err():
""" Return an error message indicating that the encoding of stdin
failed.
"""
return "Could not encode stdin {!s} for the command {!s} with the"\
" following encodings: {!s}"\
.format(stdin, command, ','.join(self.encodings))
# Prepare stdin: try to encode it or keep it at None if it was not
# provided
encoded_stdin = None
if stdin is not None:
encoded_stdin = self._encode(
stdin, lambda data_in, encoding: data_in.encode(encoding),
get_encode_err
)
if _debug_mode:
print('', "stdin:", stdin or "", sep='\n')
t.start()
got_stdout, got_stderr = proc.communicate()
got_stdout, got_stderr = proc.communicate(input=encoded_stdin)
t.cancel()
processed_stdout = None
processed_stderr = None
for encoding in self.encodings:
try:
processed_stdout = _process_output_post(
got_stdout.decode(encoding)
)
processed_stderr = _process_output_post(
got_stderr.decode(encoding)
)
except UnicodeError:
pass
else:
break
if processed_stdout is None or processed_stderr is None:
raise ValueError(
"Could not decode the output of the command '{!s}' with the "
"following encodings: {!s}"
def get_decode_error():
""" Return an error indicating the the decoding of stdout/stderr
failed.
"""
return "Could not decode the output of the command '{!s}' with "\
"the following encodings: {!s}"\
.format(command, ','.join(self.encodings))
)
def decode_output(data_in, encoding):
""" Decode stdout/stderr, consider platform dependent line
endings.
"""
return _process_output_post(data_in.decode(encoding))
processed_stdout, processed_stderr = [
self._encode(output, decode_output, get_decode_error)
for output in (got_stdout, got_stderr)
]
if _debug_mode:
print(
@ -605,14 +656,76 @@ class Case(unittest.TestCase):
#: the first encoding that does not raise a UnicodeError is used
encodings = ['utf-8', 'iso-8859-1']
inherit_env = True
@classmethod
def setUpClass(cls):
"""
This function adds the variable work_dir to the class, which is the path
to the directory where the python source file is located.
This function adds the variable work_dir to the class, which is the
path to the directory where the python source file is located.
"""
cls.work_dir = os.path.dirname(inspect.getfile(cls))
def _get_env(self):
""" Return an appropriate env value for subprocess.Popen.
This function returns either an appropriately populated dictionary or
None (the latter if this class has no attribute env). If a dictionary
is returned, then it will be either exactly self.env (when inherit_env
is False) or a copy of the current environment merged with self.env
(the values from self.env take precedence).
"""
if not hasattr(self, "env"):
return None
if not self.inherit_env:
return self.env
env_copy = os.environ.copy()
for key in self.env:
env_copy[key] = self.env[key]
return env_copy
def _encode(self, data_in, encode_action, get_err):
"""
Try to convert data_in via encode_action using the encodings in
self.encodings.
This function tries all encodings in self.encodings to run
encode_action with the parameters (data_in, encoding). If encode_action
raises a UnicodeError, the next encoding is used, otherwise the result
of encode_action is returned. If an encoding is equal to the type
bytes, then data_in is returned unmodified.
If all encodings result in a UnicodeError, then the conversion is
considered unsuccessful and get_err() is called to obtain an error
string which is raised as a ValueError.
"""
result = None
for encoding in self.encodings:
if encoding == bytes:
return data_in
try:
result = encode_action(data_in, encoding)
except UnicodeError:
pass
else:
break
if result is None:
raise ValueError(get_err())
return result
def _compare_output(self, i, command, got, expected, msg=None):
""" Compares the expected and actual output of a test case. """
if isinstance(got, bytes):
self.assertEqual(got, expected, msg=msg)
else:
self.assertMultiLineEqual(
expected, got, msg=msg
)
def compare_stdout(self, i, command, got_stdout, expected_stdout):
"""
Function to compare whether the expected & obtained stdout match.
@ -625,28 +738,37 @@ class Case(unittest.TestCase):
platform so that lines always end with \n
expected_stdout - the expected stdout extracted from self.stdout
The default implementation simply uses assertMultiLineEqual from
unittest.TestCase. This function can be overridden in a child class to
implement a custom check.
The default implementation uses assertMultiLineEqual from
unittest.TestCase for ordinary strings and assertEqual for binary
output. This function can be overridden in a child class to implement a
custom check.
"""
self.assertMultiLineEqual(
expected_stdout, got_stdout, msg="Standard output does not match"
self._compare_output(
i, command, expected_stdout, got_stdout,
msg="Standard output does not match"
)
def compare_stderr(self, i, command, got_stderr, expected_stderr):
""" Same as compare_stdout only for standard-error. """
self.assertMultiLineEqual(
expected_stderr, got_stderr, msg="Standard error does not match"
self._compare_output(
i, command, expected_stderr, got_stderr,
msg="Standard error does not match"
)
def expand_variables(self, unexpanded_string):
"""
Expands all variables of the form ``$var`` in the given string using the
dictionary `variable_dict`.
Expands all variables of the form ``$var`` in the given string using
the dictionary `variable_dict`.
The expansion itself is performed by the string's template module using
via `safe_substitute`.
the function `safe_substitute`.
If unexpanded_string is of the type bytes, then no expansion is
performed.
"""
if isinstance(unexpanded_string, bytes) or unexpanded_string is None:
return unexpanded_string
return string.Template(str(unexpanded_string))\
.safe_substitute(**self.variable_dict)
@ -686,21 +808,24 @@ class CaseMeta(type):
1. Add the `test_run` function as a member of the test class
2. Add the `Case` class as the parent class
3. Expand all variables already defined in the class, so that any additional
code does not have to perform this task
3. Expand all variables already defined in the class, so that any
additional code does not have to perform this task
Using a metaclass instead of inheriting from case has the advantage, that we
can expand all variables in the strings before any test case or even the
Using a metaclass instead of inheriting from Case has the advantage, that
we can expand all variables in the strings before any test case or even the
class constructor is run! Thus users will immediately see the expanded
result. Also adding the `test_run` function as a direct member and not via
inheritance enforces that it is being run **after** the test cases setUp &
setUpClass (which oddly enough seems not to be the case in the unittest
module where test functions of the parent class run before setUpClass of the
child class).
module where test functions of the parent class run before setUpClass of
the child class).
"""
def __new__(mcs, clsname, bases, dct):
assert len(_parameters) != 0, \
"Internal error: substitution dictionary not populated"
changed = True
# expand all non-private variables by brute force
@ -717,13 +842,13 @@ class CaseMeta(type):
# only try expanding strings and lists
if isinstance(old_value, str):
new_value = string.Template(old_value).safe_substitute(
**_disjoint_dict_merge(dct, _parameters)
**_disjoint_dict_merge(dct, _config_variables)
)
elif isinstance(old_value, list):
# do not try to expand anything but strings in the list
new_value = [
string.Template(elem).safe_substitute(
**_disjoint_dict_merge(dct, _parameters)
**_disjoint_dict_merge(dct, _config_variables)
)
if isinstance(elem, str) else elem
for elem in old_value
@ -735,14 +860,31 @@ class CaseMeta(type):
changed = True
dct[key] = new_value
dct['variable_dict'] = _disjoint_dict_merge(dct, _parameters)
dct['variable_dict'] = _disjoint_dict_merge(dct, _config_variables)
dct['test_run'] = test_run
if Case not in bases:
bases += (Case,)
CaseMeta.add_default_values(clsname, dct)
return super(CaseMeta, mcs).__new__(mcs, clsname, bases, dct)
@staticmethod
def add_default_values(clsname, dct):
if 'commands' not in dct:
raise ValueError(
"No member 'commands' in class {!s}.".format(clsname)
)
cmd_length = len(dct['commands'])
for member, default in zip(
('stderr', 'stdout', 'stdin', 'retval'),
('', '', None, 0)):
if member not in dct:
dct[member] = [default] * cmd_length
def check_no_ASAN_UBSAN_errors(self, i, command, got_stderr, expected_stderr):
"""

Loading…
Cancel
Save