12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523 |
- # coding=utf-8
-
- import collections
- import functools
- import csv
- import difflib
- import hashlib
- import inspect
- import itertools
- import json
- import operator
- import time
- from copy import deepcopy
-
-
- # ########################################################################### #
- # ## The Motor ## #
- # ########################################################################### #
-
def regression_test(argsrc, tests, driver_settings=None, cleanup_hack=None,
                    apply_hacks=None, on_next=None):
    """Perform regression test with argsets from `argsrc`.

    For each argset pulled from source, performs one comparison
    per driver pair in `tests`, which is list of tuples with
    comparison function and pair of test driver classes: `(operator,
    oracle_class, result_class)`.  (The classes are assumed to
    be sub-classes of `hoover.BaseTestDriver`.)

    `driver_settings` is a dictionary supposed to hold environmental
    values for all the drivers, the keys having form "DriverName.
    settingName".  Each driver is then instantiated with this
    dict, and gets a copy of the dict with settings only intended
    for itself (and the "DriverName" part stripped).

    If comparison fails, report is generated using `hoover.jsDiff()`,
    and along with affected arguments stored in `hoover.Tracker`
    instance, which is finally used as a return value.  This instance
    then contains method for basic stats as well as method to format
    the final report and a helper method to export argument sets
    as a CSV files.

    Supports hacks, which are data transformations performed by
    `hoover.TinyCase` class and are intended to avoid known bugs
    and anomalies (`apply_hacks`) or clean up data structures of
    irrelevant data (`cleanup_hack`, performed only if the comparison
    function provided along with driver pair is not "equals").

    A function can be provided as `on_next` argument, that will be
    called after pulling each argument set, with last argument set
    (or `None`) as first argument and current one as second argument.
    """

    # TODO: do not parse driver_settings thousands of times (use a view class?)

    on_next = on_next if on_next else lambda a, b: None
    apply_hacks = apply_hacks if apply_hacks else []
    driver_settings = driver_settings if driver_settings else {}

    tracker = Tracker()
    last_argset = None

    # every driver class mentioned in any test triple, de-duplicated
    all_classes = set(itertools.chain.from_iterable(
        triple[1:] for triple in tests
    ))

    counter = StatCounter()

    for argset in argsrc:

        on_start = time.time()
        on_next(argset, last_argset)
        counter.add('on_next', time.time() - on_start)

        # # load the data first, only once for each driver
        #
        data = {}
        for aclass in all_classes:
            try:
                aclass.check_values(argset)
            except NotImplementedError:     # let them bail out
                counter.count_for(aclass, 'bailouts')
            else:
                data[aclass], duration, overhead = get_data_and_stats(
                    aclass, argset, driver_settings)
                counter.count_for(aclass, 'calls')
                counter.add_for(aclass, 'duration', duration)
                counter.add_for(aclass, 'overhead', overhead)

        for match_op, oclass, rclass in tests:

            # skip test if one of classes bailed out on the argset
            if oclass not in data or rclass not in data:
                continue

            diff = None

            case = TinyCase({
                'argset': argset,
                'oracle': deepcopy(data[oclass]),
                'result': deepcopy(data[rclass]),
                'oname': oclass.__name__,
                'rname': rclass.__name__
            })

            hacks_done = sum(case.hack(h) for h in apply_hacks)
            counter.add_for(oclass, 'ohacks', hacks_done)
            counter.add_for(rclass, 'rhacks', hacks_done)
            counter.add('hacks', hacks_done)
            counter.add('hacked_cases', (1 if hacks_done else 0))

            if not match_op(case['oracle'], case['result']):

                # try to clean up so that normally ignored items
                # do not clutter up the report; guard against the
                # default cleanup_hack=None (hack(None) would raise)
                if match_op is not operator.eq and cleanup_hack:
                    case.hack(cleanup_hack)
                    # but panic if that "removed" the error condition
                    if match_op(case['oracle'], case['result']):
                        raise RuntimeError("cleanup ate error")

                diff = jsDiff(dira=case['oracle'],
                              dirb=case['result'],
                              namea=case['oname'],
                              nameb=case['rname'],
                              chara='o',
                              charb='r')

            # diff stays None on match; Tracker.update() ignores falsy diffs
            tracker.update(diff, argset)

            counter.count('cases')

        tracker.argsets_done += 1
        last_argset = argset

        counter.count('argsets')

    tracker.driver_stats = counter.all_stats()
    return tracker
-
-
def get_data_and_stats(driverClass, argset, driver_settings, only_own=False):
    """Run a single test through *driverClass*, returning data and timing.

    Instantiates the driver, loads *driver_settings* into it (filtered
    to its own keys when *only_own* is true) and runs it on *argset*.

    Returns a 3-tuple `(data, duration, overhead)`: the driver's data,
    the driver-reported run duration, and the remaining wall-clock time
    spent on instantiation, setup and bookkeeping.
    """
    started = time.time()
    driver = driverClass()
    driver.setup(driver_settings, only_own=only_own)
    driver.run(argset)
    overhead = time.time() - driver.duration - started
    return (driver.data, driver.duration, overhead)
-
-
def get_data(driverClass, argset, driver_settings, only_own=False):
    """Run a single test through *driverClass* and return its data only.

    Same as `get_data_and_stats` but without the timing information.
    """
    driver = driverClass()
    driver.setup(driver_settings, only_own=only_own)
    driver.run(argset)
    return driver.data
-
-
- # ########################################################################### #
- # ## The Pattern ## #
- # ########################################################################### #
-
class _BaseRuleOp:
    """Common machinery for the logical operators in `RuleOp` patterns.

    Sub-classes (`RuleOp.ALL`, `RuleOp.ANY`) implement `_match()` using
    the inherited `_items` / `_eval` attributes; an operator instance is
    evaluated for truthiness via `bool()`.
    """

    def __init__(self, items, item_ok):
        # items: iterable of items, each either a plain value or a
        #        nested `(op, items)` pattern
        # item_ok: predicate applied to each plain (non-pattern) item
        self._items = items
        self._item_ok = item_ok

    def _eval(self, item):
        # RuleOp.Match raises ValueError when `item` is not a valid
        # pattern, which is the signal to treat it as a plain item.
        try:    # it's a pattern! (recurse)
            return RuleOp.Match(item, self._item_ok)
        except ValueError:  # no, it's something else...
            return self._item_ok(item)

    def __bool__(self):
        # `_match` iterates `_items`; a non-iterable surfaces as
        # TypeError, which we re-report as a pattern error.
        try:
            return self._match()
        except TypeError:
            raise ValueError("items must be an iterable: %r" % self._items)
-
-
class RuleOp:
    """Namespace bundling the pattern operators and their evaluator."""

    class ALL(_BaseRuleOp):
        """Operator: true when every item evaluates as true."""

        def _match(self):
            return all(map(self._eval, self._items))

    class ANY(_BaseRuleOp):
        """Operator: true when at least one item evaluates as true."""

        def _match(self):
            return any(map(self._eval, self._items))

    @staticmethod
    def Match(pattern, item_ok):
        """
        Evaluate set of logically structured patterns using passed function.

        *pattern* must be a tuple in form of `(op, items)` where *op* can be
        either `RuleOp.ALL` or `RuleOp.ANY` and *items* is a list of items
        to check using *item_ok* function.

        *item_ok* is a function that accepts single argument and its return
        value is evaluated for true-ness.

        Final result is True or False and is computed by combining results
        of individual *item_ok* calls: either all must be true (when `op
        == RuleOp.ALL`) or at least one must be true (when `op == RuleOp.ANY`).

        The evaluation is done recursively, that is, if an item in the pattern
        is also a pattern itself, it will be evaluated by calling RuleOp.Match
        and passing the same *item_ok* function.

        Note that there is no data to evaluate "against", you can use closure
        if you need to do that.
        """

        # a malformed pattern is always reported as ValueError, which
        # _BaseRuleOp._eval relies upon for its recursion fallback
        try:
            op, items = pattern
        except TypeError:
            raise ValueError("pattern is not a tuple: %r" % pattern)
        if type(op) is not type:
            raise ValueError("invalid operator: %r" % op)
        if not issubclass(op, _BaseRuleOp):
            raise ValueError("invalid operator class: %s" % op.__name__)
        return bool(op(items, item_ok))
-
-
- # ########################################################################### #
- # ## The Path ## #
- # ########################################################################### #
-
class DictPath:
    """Mixin that adds "path-like" behavior to the top dict of dicts.

    Use this class as a mixin for a deep dictionary-like structure in order to
    access the elements using a Unix-like path.  For example:

        MyData(dict, DictPath):
            pass

        d = MyData({
            'name': 'Joe',
            'age': 34,
            'ssn': {
                'number': '012 345 678',
                'expires': '10-01-16',
            },
        })

        print ("%s's ssn number %s will expire on %s"
               % (d.getpath('/name'),
                  d.getpath('/ssn/number'),
                  d.getpath('/ssn/expires')))
        # Joe's ssn number 012 345 678 will expire on 10-01-16
    """

    # path separator
    DIV = "/"

    class Path:
        """Thin wrapper around a path string."""

        def __init__(self, path, div):
            self.DIV = div
            self._path = path

        def _validate(self):
            # NOTE(review): defined but not called by the public
            # methods; paths without a leading DIV are accepted as-is
            try:
                has_root = self._path.startswith(self.DIV)
            except AttributeError:
                raise ValueError("invalid path: not a string: %r" % self._path)
            if not has_root:
                raise ValueError("invalid path: missing root: %r" % self._path)

        def stripped(self):
            # path without its leading separator(s)
            return self._path.lstrip(self.DIV)

    @classmethod
    def __s2path(cls, path):
        return cls.Path(path, cls.DIV)

    @classmethod
    def __err_path_not_found(cls, path):
        raise KeyError("path not found: %s" % path)

    @classmethod
    def __getitem(cls, dct, key):
        # descend one path fragment per recursion level
        if cls.DIV not in key:
            return dct[key]
        head, tail = key.split(cls.DIV, 1)
        return cls.__getitem(dct[head], tail)

    @classmethod
    def __setitem(cls, dct, key, value):
        if cls.DIV in key:
            head, tail = key.split(cls.DIV, 1)
            cls.__setitem(dct[head], tail, value)
        else:
            dct[key] = value

    @classmethod
    def __delitem(cls, dct, key):
        if cls.DIV in key:
            head, tail = key.split(cls.DIV, 1)
            cls.__delitem(dct[head], tail)
        else:
            del dct[key]

    # # public methods
    #

    def getpath(self, path):
        """Return the element at *path*; raise KeyError if absent."""
        try:
            return self.__getitem(self, self.__s2path(path).stripped())
        except (TypeError, KeyError):
            self.__err_path_not_found(path)

    def setpath(self, path, value):
        """Set the element at *path*; raise KeyError if parents are absent."""
        try:
            self.__setitem(self, self.__s2path(path).stripped(), value)
        except (TypeError, KeyError):
            self.__err_path_not_found(path)

    def delpath(self, path):
        """Delete the element at *path*; raise KeyError if absent."""
        try:
            self.__delitem(self, self.__s2path(path).stripped())
        except (TypeError, KeyError):
            self.__err_path_not_found(path)

    def ispath(self, path):
        """Return True if *path* resolves to an element, else False."""
        try:
            self.getpath(path)
            return True
        except KeyError:
            return False
-
-
- # ########################################################################### #
- # ## The Case ## #
- # ########################################################################### #
-
class TinyCase(dict, DictPath):
    """Test case for hoover.

    This class is used as an intermediary container for test parameters,
    oracles and test results.  This is to allow post-test transformations
    ("hacks") to happen before the result is evaluated for pass/fail.

    Instantiate TinyCase with data (self) in following format:

        {
            'argset': {},   # argset as fed into `BaseTestDriver.run`
            'oracle': {},   # data as returned from oracle driver's `run()`
            'result': {},   # data as returned from result driver's `run()`
            'oname': "",    # name of oracle driver's class
            'rname': ""     # name of result driver's class
        }

    Then call TinyCase.hack() with a set of rules which can alter oracles,
    results or both based on the data stored in TinyCase.

    Typical use cases for 'hacks' are:

    *   avoid known and tracked bugs,
    *   help normalize results (remove irrelevant details),
    *   solve certain limitations in oracle machines.

    Note that while for most tests, you should strive for zero hacks,
    sometimes they are inevitable.  In such cases, number of hacks can
    be a useful quality metric.  For that reason, 'hoover.regression_test'
    will count the applied hacks and return it in the test report.
    """

    def a_exchange(self, action):
        """Exchange value A for value B.

        Expects a dict, where key is a tuple of two values `(a, b)` and
        value is a list of paths.  For each key, it goes through the
        paths and if the value equals `a` it is set to `b`.

        Paths that do not resolve are silently skipped.
        """
        for (oldv, newv), paths in action.items():
            for path in paths:
                try:
                    curv = self.getpath(path)
                except KeyError:
                    continue
                else:
                    if curv == oldv:
                        self.setpath(path, newv)

    def a_format_str(self, action):
        """Convert value to a string using format string.

        Expects a dict, where key is a format string, and value is a list
        of paths.  For each record, the paths are traversed, and value is
        converted to string using the format string and the `%` operator.

        This is especially useful for floats which you may want to trim
        before comparison, since direct comparison of floats is unreliable
        on some architectures.
        """
        for fmt, paths in action.items():
            for path in paths:
                if self.ispath(path):
                    new = fmt % self.getpath(path)
                    self.setpath(path, new)

    def a_even_up(self, action):
        """Even up structure of both dictionaries.

        Expects a list of two-element tuples `('/dict/a', '/dict/b')`
        containing pairs of paths to simple dictionaries.

        Then the two dicts are altered towards the same structure: if a
        key is present on one side only and its value there is `None`,
        the key is added to the other side with value `None` as well.
        Odd keys whose value is *not* `None` are deliberately left
        untouched (see the bailout branch below).
        """
        for patha, pathb in action:
            try:
                a = self.getpath(patha)
                b = self.getpath(pathb)
            except KeyError:
                continue
            else:
                for key in set(a.keys()) | set(b.keys()):
                    if key in a and key in b:
                        pass        # nothing to do here
                    elif key in a and a[key] is None:
                        b[key] = None
                    elif key in b and b[key] is None:
                        a[key] = None
                    else:
                        pass        # bailout: odd key but value is *not* None

    def a_remove(self, action):
        """Remove elements from structure.

        Expects a simple list of paths that are simply deleted from the
        structure.  Paths that do not resolve are silently skipped.
        """
        for path in action:
            if self.ispath(path):
                self.delpath(path)

    def a_round(self, action):
        """Round a (presumably) float using the `round()` built-in.

        Expects dict with precision (ndigits, after the dot) as a key and
        list of paths as value.  Paths that do not resolve are skipped.
        """
        for ndigits, paths in action.items():
            for path in paths:
                try:
                    f = self.getpath(path)
                except KeyError:
                    pass
                else:
                    self.setpath(path, round(f, ndigits))

    # maps rule keys to the action methods implementing them
    known_actions = {'remove': a_remove,
                     'even_up': a_even_up,
                     'format_str': a_format_str,
                     'exchange': a_exchange,
                     'round': a_round}

    def hack(self, ruleset):
        """
        Run any matching actions in the *ruleset*.

        Each rule must be in the following form:

            {
                'drivers': [{}],        # list of structures to match
                                        # against self
                'argsets': [{}],        # -ditto-
                <action_name>: <Arg>    # an action name with argument
                <action_name>: <Arg>    # another action...
            }

        Each of the rules is first evaluated for match (does it apply to this
        TinyCase?), and if the rule applies, transformation is done.  The
        transformation is defined by `<action_name>: <Arg>` pairs and it can
        alter 'oracle', 'result' or both.

        The match evaluation is done using `hoover.dataMatch()` -- this is
        basically a recursive pattern match against 'drivers' and 'argsets'.
        Both 'drivers' and 'argsets' are optional, but when specified, all
        items must match in order for the rule to apply.  (If 'drivers'
        and 'argsets' are both missing or empty, rule will apply to each and
        all test cases.)

        If rule does not match, `TinyCase.hack()` moves on to next one.

        If a rule does match, `TinyCase.hack()` will look for actions defined
        in it.  Action consists of action name (key of the rule dictionary,
        <action_name>) and an argument (<Arg>).

        Action name must be one of: 'remove', 'even_up', 'format_str',
        'exchange' or 'round'.  Each action corresponds to a TinyCase method
        prefixed by 'a_'; for example 'even_up' action corresponds to
        TinyCase.a_even_up method.  Each action expects different argument
        so see the corresponding method docstrings.

        Because 'oracle' and 'result' can be relatively complex structures,
        actions accept Unix-like paths to specify elements inside them.
        The "root" of the path is the TinyCase instance, and "directories"
        are keys under it.  For example, following would be valid paths
        if test drivers work with dictionaries such as `{'temperature': 50,
        'stats': {'word_count': 15}}`:

            /oracle/temperature
            /result/stats/word_count

        Warning: All actions will silently ignore any paths that are invalid
        or leading to non-existent data!
        (This does not apply to a path leading to `None`.)

        Returns True if at least one rule matched, False otherwise.
        """

        def driver_matches(rule):
            # no 'drivers' key means "applies to every driver pair"
            if 'drivers' not in rule:
                return True
            else:
                return any(dataMatch(p, self)
                           for p in rule['drivers'])

        def argset_matches(rule):
            # no 'argsets' key means "applies to every argset"
            if 'argsets' not in rule:
                return True
            else:
                return any(dataMatch(p, self)
                           for p in rule['argsets'])

        matched = False
        cls = self.__class__
        for rule in ruleset:
            if driver_matches(rule) and argset_matches(rule):
                matched = True
                # run every known action the rule defines; note the
                # methods are stored unbound, hence the explicit self
                for action_name in cls.known_actions:
                    if action_name in rule:
                        cls.known_actions[action_name](self, rule[action_name])
        return matched
-
-
- # ########################################################################### #
- # ## Drivers ## #
- # ########################################################################### #
-
class DriverError(Exception):
    """Error encountered when obtaining driver data"""

    def __init__(self, message, driver):
        self.message = message
        self.driver = driver

    def __str__(self):
        # lay out the offending driver's state as an indented block
        template = ("\n\n"
                    "    type: %s\n"
                    "    message: %s\n"
                    "    driver: %s\n"
                    "    args: %s\n"
                    "    settings: %s\n")
        details = (self.message.__class__.__name__,
                   self.message,
                   self.driver.__class__.__name__,
                   self.driver._args,
                   self.driver._settings)
        return template % details
-
-
class DriverDataError(Exception):
    """Error encountered when decoding or normalizing driver data"""

    def __init__(self, exception, driver):
        self.exception = exception
        self.driver = driver

    def __str__(self):
        # pretty-print args and data so they are readable in reports
        args_json = json.dumps(self.driver._args, sort_keys=True, indent=4)
        data_json = json.dumps(self.driver.data, sort_keys=True, indent=4)
        return ("%s: %s\n"
                "    class: %s\n"
                "    args: %s\n"
                "    data: %s\n"
                % (self.exception.__class__.__name__, self.exception,
                   self.driver.__class__.__name__,
                   args_json,
                   data_json))
-
-
class BaseTestDriver:
    """Base class for test drivers used by `hoover.regression_test` and others.

    This class represents a test driver and can be used to:

    *   Wrap system under test (SUT).

        Provide simple interface to set up, sandbox and activate the system
        and collect any relevant results.  This can be merely return value
        (purely functional test) but also other characteristics such as
        time to complete.

    *   Mimic ("mock") the system under test.

        Also called an oracle machine, this can be used to predict expected
        behavior of SUT under given parameters.

    *   Wrap an alternative implementation of SUT.

        As a special case of the previous role, sometimes it's desirable to
        use an alternative implementation of SUT as oracle machine.  This
        can be a legacy implementation, reference implementation or other
        platform implementation.

    In either case, the driver makes sure that any input arguments are
    interpreted (and passed on) correctly and any results are returned in
    a consistent way.

    To use this class, sub-class it and implement `_get_data()` method.
    The `_get_data()` method must:

    *   Accept no arguments besides `self`; the argset is available as
        `self._args` (set by `run()`).

        If using `hoover.regression_test()`, this value will be retrieved
        from the *argsrc* iterator.

    *   Implement the test case defined by the argument set.

        The implementation can either be a wrapper to real SUT, alternative
        one, or can be an oracle machine -- i.e. it can figure out the result
        on its own.  Note that this can be much easier as it sounds, given
        that you can "cheat" by crafting the set of test cases so that the
        prediction is easy (but still effective at hitting bugs), or you
        can "hide the answer" in the *args* itself, and define set of
        test cases statically in form of "question, answer" pairs.

    *   Collect any relevant data and set it to `data` property.

        Optionally, you can also set `duration` property (in fractional
        seconds, as returned by standard time module).  If you don't,
        it will be automatically measured.

        Any exception from the *_get_data* method will be re-raised as
        DriverError.

    Optionally, you can:

    *   Implement *__init__* method calling base __init__ and setting more
        properties:

        *   `self._mandatory_args`, a list of keys that need to be present
            in `args` argument to `run()`

        *   and `self._mandatory_settings`, a list of keys that need to be
            present in the `settings` argument to `__init__`

    *   Implement methods

        *   `_decode_data` and `_normalize_data`, which are intended to decode
            the data from any raw format it is received, and to prepare it
            for comparison in test,

        *   and `_check_data`, to allow for early detection of failure,

        from which any exception is re-raised as a DriverDataError with
        some additional info

    *   set "bailouts", a list of functions which, when passed "args"
        argument, return true to indicate that driver is not able to
        process these values (see below for explanation).  If any of
        these functions returns true, NotImplementedError is raised.

    The expected workflow when using the driver is:

        # 1. sub-class hoover.BaseTestDriver
        # 2. prepare settings and args
        MyDriver.check_values(args)     # optional, to force bailouts ASAP
        d = MyDriver()
        d.setup(settings)
        d.run(args)
        assert d.data, "no data"        # evaluate the result...
        assert d.duration < 1           # duration of _get_data in seconds

    Note on bailouts: Typical strategy for which the driver is intended is
    that each possible combination of `args` is exhausted, and results from
    multiple drivers are compared to evaluate if driver, i.e. system in
    question is O.K.

    The bailouts mechanism is useful in cases, where for a certain system,
    a valid combination of arguments would bring the same result as another,
    so there is basically no value in testing both of them.

    Example might be a system that does not support a binary flag and
    behaves as if it was "on": you can simply make the test driver
    accept the option but "bail out" any time it is "off", therefore
    skipping the time-and-resource-consuming test.
    """

    # predicates over `args`; overridden by sub-classes as needed
    bailouts = []

    ##
    # internal methods
    #

    def __init__(self):
        self.data = {}
        self.duration = None            # filled by run() unless _get_data sets it
        self._args = {}
        self._mandatory_args = []
        self._mandatory_settings = []
        self._settings = {}
        self._setup_ok = False          # guards against run() before setup()

    def __check_mandatory(self):
        """validate before run()"""
        for key in self._mandatory_args:
            assert key in self._args, "missing arg: '%s'" % key
        for key in self._mandatory_settings:
            assert key in self._settings, "missing setting: '%s'" % key

    def __cleanup_data(self):
        """remove hidden data; e.g. what was only there for _check_data"""
        # iterate over a snapshot of the keys: deleting from a dict
        # while iterating it directly raises RuntimeError on Python 3
        for key in list(self.data):
            if key.startswith("_"):
                del self.data[key]

    ##
    # virtual methods
    #

    def _check_data(self):
        """Early check for failure"""
        pass

    def _decode_data(self):
        """Decode from raw data as brought by _get_data"""
        pass

    def _normalize_data(self):
        """Prepare data for comparison (e.g. sort, split, trim...)"""
        pass

    ##
    # public methods
    #

    @classmethod
    def check_values(cls, args=None):
        """
        Check args in advance before running or setting up anything.

        Raises NotImplementedError (carrying the predicate's source) as
        soon as any of the class's bailout predicates accepts *args*.
        """
        for fn in cls.bailouts:
            if fn(args):
                raise NotImplementedError(inspect.getsource(fn))

    def setup(self, settings, only_own=False):
        """
        Load settings.

        If *only_own* is false, *settings* are merely assigned to
        settings attribute.

        if *only_own* is true, settings are filtered: Any keys that don't
        begin with the prefix of driver class name and period are ignored.
        Settings that do start with this prefix are assigned to settings
        attribute with the prefix removed.
        """
        if only_own:
            for ckey in settings:
                # split on the first dot only, so that setting names
                # may themselves contain dots ("Driver.a.b" -> "a.b")
                driver_class_name, setting_name = ckey.split(".", 1)
                if self.__class__.__name__ == driver_class_name:
                    self._settings[setting_name] = settings[ckey]
        else:
            self._settings = settings
        self._setup_ok = True

    def run(self, args):
        """
        Validate args, run SUT and store data.

        Raises DriverError if _get_data fails, DriverDataError if any of
        the post-processing steps (_decode_data, _normalize_data,
        _check_data) fails.
        """

        self._args = args
        assert self._setup_ok, "run() before setup()?"
        self.__class__.check_values(self._args)
        self.__check_mandatory()
        start = time.time()
        try:
            self._get_data()            # run the test, i.e. obtain raw data
        except Exception as e:
            raise DriverError(e, self)
        # honor a duration the driver measured itself, otherwise measure
        self.duration = (time.time() - start if self.duration is None
                         else self.duration)
        try:
            self._decode_data()         # decode raw data
            self._normalize_data()      # normalize decoded data
            self._check_data()          # perform arbitrary checking
        except Exception as e:
            raise DriverDataError(e, self)
        self.__cleanup_data()           # cleanup (remove data['_*'])
-
-
class MockDriverTrue(BaseTestDriver):
    """A simple mock driver, always returning True"""

    def _get_data(self):
        # BaseTestDriver.run() invokes _get_data() with no arguments
        # (the argset is available as self._args), so the override must
        # not declare an extra parameter.
        self.data = True
-
-
- # ########################################################################### #
- # ## Helpers ## #
- # ########################################################################### #
-
- class StatCounter:
- """
- A simple counter with support for custom formulas.
- """
-
- def __init__(self):
- self.generic_stats = {}
- self.driver_stats = {}
- self.formulas = {}
- self._born = time.time()
-
- def _register(self, dname):
- self.driver_stats[dname] = {
- 'calls': 0,
- 'rhacks': 0,
- 'ohacks': 0,
- 'duration': 0,
- 'overhead': 0
- }
-
- ##
- # Formulas
- #
-
- # cumulative duration/overhead; just round to ms
- self.add_formula(dname + '_overhead',
- lambda g, d: int(1000 * d[dname]['overhead']))
- self.add_formula(dname + '_duration',
- lambda g, d: int(1000 * d[dname]['duration']))
-
- # average (per driver call) overhead/duration
- self.add_formula(
- dname + '_overhead_per_call',
- lambda g, d: int(1000 * d[dname]['overhead'] / d[dname]['calls'])
- )
- self.add_formula(
- dname + '_duration_per_call',
- lambda g, d: int(1000 * d[dname]['duration'] / d[dname]['calls'])
- )
-
- def gtotal_drivertime(g, d):
- driver_time = (sum(s['overhead'] for s in d.values())
- + sum(s['duration'] for s in d.values()))
- return int(1000 * driver_time)
-
- def gtotal_loop_overhead(g, d):
- driver_time = gtotal_drivertime(g, d)
- onnext_time = int(1000 * g['on_next'])
- age = int(1000 * (time.time() - self._born))
- return age - driver_time - onnext_time
-
- # grand totals in times: driver time, loop overhead
- self.add_formula('gtotal_drivertime', gtotal_drivertime)
- self.add_formula('gtotal_loop_overhead', gtotal_loop_overhead)
- self.add_formula('gtotal_loop_onnext',
- lambda g, d: int(1000 * g['on_next']))
-
- # average (per driver call) overhead/duration
- self.add_formula(
- 'cases_hacked',
- lambda g, d: round(100 * float(g['hacked_cases']) / g['cases'], 2)
- )
-
- def _computed_stats(self):
- computed = dict.fromkeys(self.formulas)
- for fname, fml in self.formulas.items():
- try:
- v = fml(self.generic_stats, self.driver_stats)
- except ZeroDivisionError:
- v = None
- computed[fname] = v
- return computed
-
- def add_formula(self, vname, formula):
- """
- Add a function to work with generic_stats, driver_stats.
- """
- self.formulas[vname] = formula
-
- def add(self, vname, value):
- """
- Add a value to generic stat counter.
- """
- if vname in self.generic_stats:
- self.generic_stats[vname] += value
- else:
- self.generic_stats[vname] = value
-
- def add_for(self, dclass, vname, value):
- """
- Add a value to driver stat counter.
- """
- dname = dclass.__name__
- if dname not in self.driver_stats:
- self._register(dname)
- if vname in self.driver_stats[dname]:
- self.driver_stats[dname][vname] += value
- else:
- self.driver_stats[dname][vname] = value
-
def count(self, vname):
    """
    Increment a generic stat counter by one; alias to add(vname, 1).
    """
    self.add(vname, 1)
-
def count_for(self, dclass, vname):
    """
    Increment a driver stat counter by one;
    alias to add_for(dclass, vname, 1).
    """
    self.add_for(dclass, vname, 1)
-
def all_stats(self):
    """
    Compute stats from formulas and add them to collected data.

    Returns a single flat dict holding generic counters, per-driver
    counters (each key prefixed with the driver name) and values
    computed by the registered formulas.

    The internal counters are left untouched.
    """
    # Work on a copy: the original code aliased self.generic_stats,
    # so every call permanently polluted the generic counters with
    # driver- and formula-derived entries.
    stats = dict(self.generic_stats)
    for dname, dstats in self.driver_stats.items():
        for key, value in dstats.items():
            stats[dname + "_" + key] = value
    stats.update(self._computed_stats())
    return stats
-
-
class Tracker(dict):
    """
    Error tracker to allow for usable reports from huge regression tests.

    Best used as a result bearer from `regression_test`, this class keeps
    a simple in-memory "database" of errors seen during the regression
    test, and implements a few methods to access the data.

    The basic usage is:

    1. Instantiate (no parameters)

    2. Each time you have a result of a test, you pass it to `update()`
       method along with the argument set (as a single object, typically
       a dict) that caused the error.

       If boolean value of the result is False, the object is thrown away
       and nothing happens. Otherwise, its string value is used as a key
       under which the argument set is saved.

       The string interpretation of the result is supposed to be
       "as deterministic as possible", i.e. it should provide only
       necessary information about the error: do not include any
       timestamps or "volatile" values such as PID's, version numbers
       or tempfile names.

    3. At final stage, you can retrieve statistics as how many (distinct)
       errors have been recorded, what was the duration of the whole test,
       how many times `update()` was called, etc.

    4. Optionally, you can also call `format_report()` to get a nicely
       formatted report with list of arguments for each error string.

    5. Since in bigger tests, argument lists can grow really large,
       complete lists are not normally printed. Instead, you can use
       `write_args_csv()`, which will create one CSV per each error,
       named as first 7 chars of its SHA1 (inspired by Git).

       Note that you need to pass an existing writable folder path.
    """

    ##
    # internal methods
    #

    def __init__(self):
        self._start = time.time()        # wall-clock start, used by getstats()
        self._db = {}                    # errstr -> list of affected argsets
        self.tests_done = 0
        self.tests_passed = 0
        self.argsets_done = 0
        self.driver_stats = {}

    def _csv_fname(self, errstr, prefix):
        """
        Format name of CSV file for this error string.
        """
        return '%s/%s.csv' % (prefix, self._eid(errstr))

    def _eid(self, errstr):
        """
        Return EID for the error string (first 7 chars of SHA1).
        """
        # hashlib requires bytes on Python 3; the original passed str,
        # which raised TypeError there
        return hashlib.sha1(errstr.encode('utf-8')).hexdigest()[:7]

    def _insert(self, errstr, argset):
        """
        Insert the argset into DB under its error string.
        """
        self._db.setdefault(errstr, []).append(argset)

    def _format_error(self, errstr, max_aa=0):
        """
        Format a single error for output.

        If max_aa is non-zero and there are more affected argsets than
        that, the list is trimmed and a pointer to the per-error CSV
        is appended instead.
        """
        argsets_affected = self._db[errstr]
        num_aa = len(argsets_affected)

        # trim if list is too long for Jenkins
        argsets_shown = argsets_affected
        if max_aa and (num_aa > max_aa):
            div = ["[...] not showing %s cases, see %s.csv for full list"
                   % (num_aa - max_aa, self._eid(errstr))]
            argsets_shown = argsets_affected[0:max_aa] + div

        # format error
        formatted_aa = "\n".join([str(arg) for arg in argsets_shown])
        return ("~~~ ERROR FOUND (%s) ~~~~~~~~~~~~~~~~~~~~~~~~~\n"
                "--- error string: -----------------------------------\n%s\n"
                "--- argsets affected (%d) ---------------------------\n%s\n"
                % (self._eid(errstr), errstr, num_aa, formatted_aa))

    ##
    # public methods
    #

    def errors_found(self):
        """
        Return True if at least one error has been recorded.
        """
        return bool(self._db)

    def format_report(self, max_aa=0):
        """
        Return complete report formatted as string.
        """
        error_list = "\n".join([self._format_error(e, max_aa=max_aa)
                                for e in self._db])
        return ("Found %(total_errors)s (%(distinct_errors)s distinct) errors"
                " in %(tests_done)s tests with %(argsets)s argsets"
                " (duration: %(time)ss):"
                % self.getstats()
                + "\n\n" + error_list)

    def getstats(self):
        """
        Return basic and driver stats

        Returns dictionary with following values:

            'tests_done' - how many times Tracker.update() was called

            'distinct_errors' - how many distinct errors (same `str(error)`)
                were seen by Tracker.update()

            'total_errors' - how many times `Tracker.update()` saw an
                error, i.e. how many argsets are in DB

            'time' - how long since init (seconds)
        """
        # plain sum() replaces the original functools.reduce() call,
        # which passed the initializer as keyword `initial=` -- reduce()
        # only accepts it positionally, so getstats() always raised
        # TypeError
        stats = {
            "argsets": self.argsets_done,
            "tests_done": self.tests_done,
            "distinct_errors": len(self._db),
            "total_errors": sum(len(a) for a in self._db.values()),
            "time": int(time.time() - self._start)
        }
        stats.update(self.driver_stats)
        return stats

    def update(self, error, argset):
        """
        Update tracker with test result.

        If `bool(error)` is true, it is considered error and argset
        is inserted to DB with `str(error)` as key. This allows for later
        sorting and analysis.
        """
        self.tests_done += 1
        if error:
            errstr = str(error)
            self._insert(errstr, argset)

    def write_stats_csv(self, fname):
        """
        Write stats to a simple one row (plus header) CSV.
        """
        stats = self.getstats()
        colnames = sorted(stats.keys())
        with open(fname, 'a') as fh:
            cw = csv.DictWriter(fh, colnames)
            cw.writeheader()
            cw.writerow(stats)

    def write_args_csv(self, prefix=''):
        """
        Write out a set of CSV files, one per distinctive error.

        Each CSV is named with error EID (first 7 chars of SHA1) and lists
        all argument sets affected by this error. This is supposed to make
        it easier to further analyse impact and triggering values of errors,
        perhaps using a table processor software.
        """

        def get_all_colnames():
            # union of keys across all argsets, sorted for stable columns
            cn = {}
            for affected in self._db.values():
                for argset in affected:
                    cn.update(dict.fromkeys(argset))
            return sorted(cn.keys())

        all_colnames = get_all_colnames()

        for errstr in self._db:
            with open(self._csv_fname(errstr, prefix), 'a') as fh:
                cw = csv.DictWriter(fh, all_colnames)
                cw.writeheader()
                for argset in self._db[errstr]:
                    cw.writerow(argset)
-
-
def dataMatch(pattern, data):
    """Check if data structure matches a pattern data structure.

    Supports lists, dictionaries and scalars (int, float, string).

    For scalars, simple `==` is used.

    A list pattern matches if each of its items matches at least one
    item of the data list (e.g. data `[1, 2, 3, 4]` matches pattern
    `[3, 2]`).

    Both lists and dictionaries are matched recursively.
    """

    def listMatch(pattern, data):
        """
        Match list-like objects: every pattern value must match
        at least one data value.
        """
        # duck-typing guard; AssertionError tells the caller to try
        # another handler
        assert all(hasattr(o, 'append') for o in (pattern, data))
        return all(
            any(dataMatch(pv, dv) for dv in data)
            for pv in pattern
        )

    def dictMatch(pattern, data):
        """
        Match dict-like objects: every pattern key must be present in
        data and its value must match.
        """
        # was hasattr(o, 'iteritems'): Python-2-only, so on Python 3
        # dict patterns were never matched recursively
        assert all(hasattr(o, 'items') for o in (pattern, data))
        try:
            return all(dataMatch(pv, data[pk])
                       for pk, pv in pattern.items())
        except KeyError:
            # pattern key missing from data
            return False

    result = None
    if pattern == data:
        result = True
    else:
        for handler in (dictMatch, listMatch):
            try:
                result = handler(pattern, data)
            except AssertionError:
                # handler does not apply to these types; try the next one
                continue
            break
    return result
-
-
def jsDump(data):
    """
    Serialize *data* as pretty-printed, key-sorted JSON.
    """
    opts = {'sort_keys': True, 'indent': 4, 'separators': (',', ': ')}
    return json.dumps(data, **opts)
-
-
def jsDiff(dira, dirb, namea="A", nameb="B", chara="a", charb="b"):
    """
    JSON-based human-readable diff of two data structures.

    '''BETA''' version.

    jsDiff is based on unified diff of two human-readable JSON dumps except
    that instead of showing line numbers and context based on proximity to
    the changed lines, it prints only context important from the data
    structure point.

    The goal is to be able to quickly tell the story of what has changed
    where in the structure, no matter size and complexity of the data set.

    For example:

        a = {
            'w': {1: 2, 3: 4},
            'x': [1, 2, 3],
            'y': [3, 1, 2]
        }
        b = {
            'w': {1: 2, 3: 4},
            'x': [1, 1, 3],
            'y': [3, 1, 3]
        }
        print(jsDiff(a, b))

    will output:

        aaa ~/A
          "x": [
        a     2,
          "y": [
        a     2
        bbb ~/B
          "x": [
        b     1,
          "y": [
        b     3

    Notice that the final output somehow resembles the traditional unified
    diff, so to avoid confusion, +/- is changed to a/b (the characters can
    be provided as well as the names A/B).
    """

    def compress(lines):
        # Walk the unified diff once, splitting it into an "A" buffer and
        # a "B" buffer while inserting structural context hints (the
        # nearest enclosing JSON line) before each run of changes.

        # -- line classifiers over raw unified-diff text ------------------

        def is_body(line):
            return line.startswith(("-", "+", " "))

        def is_diff(line):
            return line.startswith(("-", "+"))

        def is_diffA(line):
            return line.startswith("-")

        def is_diffB(line):
            return line.startswith("+")

        # NOTE(review): is_context and is_hdr are currently unused; kept
        # for completeness of the classifier set.
        def is_context(line):
            return line.startswith(" ")

        def is_hdr(line):
            return line.startswith(("@@", "---", "+++"))

        def is_hdr_hunk(line):
            return line.startswith("@@")

        def is_hdr_A(line):
            return line.startswith("---")

        def is_hdr_B(line):
            return line.startswith("+++")

        class Level:
            # One indentation level; remembers its context line (the
            # "hint") and hands it out at most once, so a hint is not
            # repeated for several changes under the same parent.

            def __init__(self, hint):
                self.hint = hint        # context line that opened this level
                self.hinted = False     # True once the hint was emitted

            def __str__(self):
                return str(self.hint)

            def get_hint(self):
                # Return the hint only on first request; afterwards None.
                if not self.hinted:
                    self.hinted = True
                    return self.hint

        class ContextTracker:
            # Tracks the stack of enclosing JSON lines based on the
            # indentation of the diff body lines seen so far.

            def __init__(self):
                self.trace = []          # stack of Level objects
                self.last_line = None    # previously seen body line
                self.last_indent = -1    # its indentation depth

            def indent_of(self, line):
                # Indentation of the JSON payload, ignoring the leading
                # one-character diff marker ("-", "+" or " ").
                meat = line[1:].lstrip(" ")
                ind = len(line) - len(meat) - 1
                return ind

            def check(self, line):
                # Push a level when indentation grows (the previous line
                # opened a nested structure); pop when it shrinks.
                indent = self.indent_of(line)
                if indent > self.last_indent:
                    self.trace.append(Level(self.last_line))
                elif indent < self.last_indent:
                    self.trace.pop()
                self.last_line = line
                self.last_indent = indent

            def get_hint(self):
                # Hint of the innermost (current) level, once per level.
                return self.trace[-1].get_hint()

        buffa = []
        buffb = []
        ct = ContextTracker()

        for line in lines:

            if is_hdr_hunk(line):
                # hunk headers ("@@ ... @@") carry line numbers we
                # deliberately drop
                continue
            elif is_hdr_A(line):
                # file header for A: re-mark and keep at the top of buffer A
                line = line.replace("---", chara * 3, 1)
                buffa.insert(0, line)
            elif is_hdr_B(line):
                line = line.replace("+++", charb * 3, 1)
                buffb.insert(0, line)

            elif is_body(line):

                ct.check(line)

                if is_diff(line):
                    # first change under a structure: emit its context
                    # hint into both buffers
                    hint = ct.get_hint()
                    if hint:
                        buffa.append(hint)
                        buffb.append(hint)

                if is_diffA(line):
                    # re-mark "-" as the A character
                    line = line.replace("-", chara, 1)
                    buffa.append(line)

                elif is_diffB(line):
                    line = line.replace("+", charb, 1)
                    buffb.append(line)

            else:
                raise AssertionError("difflib.unified_diff emitted"
                                     " unknown format (%s chars):\n%s"
                                     % (len(line), line))

        return buffa + buffb

    dumpa = jsDump(dira)
    dumpb = jsDump(dirb)
    # n=10000 asks for (practically) full context so that every change
    # still has its structural parent lines available to compress()
    udiff = difflib.unified_diff(dumpa.split("\n"), dumpb.split("\n"),
                                 "~/" + namea, "~/" + nameb,
                                 n=10000, lineterm='')

    return "\n".join(compress([line for line in udiff]))
-
-
class Cartman:
    """
    Create argument sets from ranges (or any iterables) of values.

    This class is to enable easy definition and generation of dictionary
    argument sets using Cartesian product.

    To use Cartman iterator, you need to define structure of an argument
    set. Argument set--typically a dictionary--is a set of values that
    together constitute a test case. Within the argument set, values
    will change from test case to test case, so for each changing value,
    you will also need to define range of values you want to test on.

    Cartman initializer expects following arguments:

     *  *scheme*, which is a "prototype" of a final argument set, except
        that values are replaced by either `Cartman.Iterable` if the
        value is changing from test case to another, and `Cartman.Scalar`
        if the value is constant.

     *  *source*, which has the same structure, except that where in scheme
        is `Cartman.Iterable`, the source has an iterable. Where scheme has
        `Cartman.Scalar`, the source can have any value.

    Finally, when Cartman instance is used in loop, it uses Cartesian product
    in order to generate argument sets.

    Consider this example:

        You have a system (wrapped up in test driver) that takes ''size''
        argument, that is supposed to be ''width'', ''height'' and ''depth'',
        each an integer ranging from 1 to 100, and ''color'' that can
        be "white", "black" or "yellow".

        For a test using all-combinations strategy, you will need to generate
        100 * 100 * 100 * 3 argument sets, i.e. 3M tests.

        All you need to do is:

            scheme = {
                'size': {
                    'width': Cartman.Iterable,
                    'height': Cartman.Iterable,
                    'depth': Cartman.Iterable,
                }
                'color': Cartman.Iterable,
            }

            source = {
                'size': {
                    'width': range(1, 100),
                    'height': range(1, 100),
                    'depth': range(1, 100),
                }
                'color': ['white', 'black', 'yellow'],
            }

            c = Cartman(source, scheme)

            for argset in c:
                result = my_test(argset)
                # assert ...

    The main advantage is that you can separate the definition from
    the code, and you can keep your iterators as big or as small as
    needed, and add / remove values.

    Also in case your parameters vary in structure over time, or from
    one test to another, it gets much easier to keep up with changes
    without much jumping through hoops.

    Note: `Cartman.Scalar` is provided mainly to make your definitions
    more readable. Following constructions are functionally equal:

        c = Cartman({'a': 1}, {'a': Cartman.Scalar})
        c = Cartman({'a': [1]}, {'a': Cartman.Iterable})

    In future, however, this might change, though, mainly in case
    optimization became possible based on what was used.
    """

    # TODO: support for arbitrary ordering (profile / nginx)
    # TODO: implement getstats and fmtstats
    # TODO: N-wise

    class _BaseMark:
        # common ancestor so that marks are recognizable by issubclass()
        pass

    class Scalar(_BaseMark):
        # marks a value that stays constant across argsets
        pass

    class Iterable(_BaseMark):
        # marks a value that iterates; source must hold an iterable here
        pass

    def __init__(self, source, scheme):
        self.source = source
        self.scheme = scheme

        # collections.Mapping was removed in Python 3.10; the ABCs live
        # in collections.abc since Python 3.3
        try:
            from collections.abc import Mapping
        except ImportError:         # Python 2 fallback
            from collections import Mapping

        # validate scheme + source and throw useful error
        if not isinstance(self.scheme, Mapping):
            raise ValueError("scheme must be a mapping (e.g. dict)")
        if not isinstance(self.source, Mapping):
            raise ValueError("scheme vs. source mismatch")

    def __deepcopy__(self, memo):
        return Cartman(deepcopy(self.source, memo),
                       deepcopy(self.scheme, memo))

    def _is_mark(self, subscheme):
        """
        True if subscheme is one of the Cartman mark classes.
        """
        try:
            return issubclass(subscheme, Cartman._BaseMark)
        except TypeError:
            # subscheme is not a class at all (e.g. a nested dict)
            return False

    def _means_scalar(self, subscheme):
        if self._is_mark(subscheme):
            return issubclass(subscheme, Cartman.Scalar)

    def _means_iterable(self, subscheme):
        if self._is_mark(subscheme):
            return issubclass(subscheme, Cartman.Iterable)

    def _get_iterable_for(self, key):
        """
        Return the iterable of values for *key*: a one-item list for
        a Scalar, the source iterable for an Iterable, or a nested
        Cartman iterator for a sub-scheme.
        """
        subscheme = self.scheme[key]
        subsource = self.source[key]
        if self._means_scalar(subscheme):
            return [subsource]
        elif self._means_iterable(subscheme):
            return subsource
        else:   # try to use it as scheme
            return iter(Cartman(subsource, subscheme))

    def __iter__(self):
        """
        Yield argument-set dicts, one per element of the Cartesian
        product of all per-key iterables.
        """
        names = []
        iterables = []

        for key in self.scheme:
            try:
                iterables.append(self._get_iterable_for(key))
            except KeyError:
                pass    # ignore that subsource mentioned by scheme is missing
            else:
                names.append(key)

        for values in itertools.product(*iterables):
            yield dict(zip(names, values))

    def getstats(self):
        return {}

    def fmtstats(self):
        return ""
|