#!/usr/bin/python2 import yaml class ParsingError(ValueError): pass class _EventGroup(object): def __init__(self, events): if not events: raise ValueError("no events in event group") self._events = events @property def all_events(self): return self._events @property def asserts(self): return [e for e in self._events if e.is_assert] @property def fails(self): return [e for e in self._events if e.etype == 'FAIL'] @property def passes(self): return [e for e in self._events if e.etype == 'PASS'] @property def events(self): return self._events @property def lints(self): lints = [] for e in self._events: lints += e.lints return lints class Event(object): is_assert = False def __str__(self): return str(self._rawdata) def __init__(self, rawdata): self._rawdata = rawdata self.origin = rawdata['origin'] self.stamp = rawdata['stamp'] self.hint = rawdata['hint'] self.beids = [BeId(b) for b in rawdata['beids']] self.edata = rawdata['data'] self.etype = rawdata['etype'] self.phaseid = rawdata['phase']['id'] self.phasename = rawdata['phase']['name'] self.phasetype = rawdata['phase']['type'] @staticmethod def from_data(data): etype = data['etype'] if etype in ['PASS', 'FAIL']: cls = AssertEvent elif etype == 'SINFO': cls = StructureInfoEvent elif etype == 'TEST_ERROR': cls = TestErrorEvent elif etype == 'PROMISE': cls = PromiseEvent else: raise ParsingError("unknown event type: %s" % etype) return cls(data) @property def lints(self): return [] class StrayEvent(Event): pass class TestErrorEvent(Event): def lints(self): return [Lint("test error", self._rawdata)] class PromiseEvent(Event): pass class StructureInfoEvent(Event): pass class AssertEvent(Event): is_assert = True @property def lints(self): lints = [] if self.phaseid == '0000000000-000000000': lints.append(Lint('assert outside phase', self)) return lints class BeId(object): def __init__(self, data): self._data = data class Phase(_EventGroup): @property def name(self): return self.all_events[0].phasename @property def type(self): return self.all_events[0].phasetype class EventLog(_EventGroup): """ Session event log (all events from session) """ @classmethod def from_data(cls, data): return cls([Event.from_data(e) for e in data]) class Lint(object): """ Lint-like warning coming from log parsing (eg. unclosed phase) """ def __init__(self, msg, data): self.msg = msg self._data = data class Session(object): def __init__(self, doc): self._doc = doc self._format = doc['format'] if not self._format.startswith('jat/0.'): raise ParsingError('unsupported log format: %s' % self._format) self.jat_version = doc['jat_version'] self.start = doc['start'] self.sessid = 'noid' self.finalized = doc.get('finalized', False) self.end = doc.get('end') self.lints = [] self.eventlog = EventLog.from_data(doc['events']) self.lints += self.eventlog.lints self.phases, lints = self._es2ps(self.eventlog) self.lints += lints if not self.finalized: self.lints.append(Lint("session not finalized", self.sessid)) class _PhaseBuff: def __init__(self): self.phaseid = None self.events = [] def is_anon(self): return self.phaseid == '0000000000.000000000' def dump2phase(self): phase = Phase(self.events) self.phaseid = None self.events = [] return phase def match(self, event): if self.phaseid is None: return True else: return self.phaseid == event.phaseid def append(self, event): self.phaseid = event.phaseid self.events.append(event) def __nonzero__(self): return bool(self.events) def __bool__(self): return bool(self.events) def __len__(self): return len(self.events) @staticmethod def _es2ps(eventlog): def addlint(msg): lints.append(Lint(msg, event)) phases = [] lints = [] this_phaseid = None pbuff = Session._PhaseBuff() for event in eventlog.all_events: if event.origin == 'jat__sinit': continue if event.origin == 'jat__sfinish': if pbuff: phases.append(pbuff.dump2phase()) addlint('unfinished phase: %s' % pbuff.phaseid) continue if event.origin == 'jat__pend': # # Phase end # if pbuff: phases.append(pbuff.dump2phase()) else: addlint('dangling phase end') elif event.origin == '__jat__pstart': # # Phase start # if pbuff: phases.append(pbuff.dump2phase()) if not pbuff.is_anon(): addlint('phase starts before previous ends') elif pbuff: # # Second or later event # if not pbuff.match(event): phases.append(pbuff.dump2phase()) addlint('phase mismatch: %s =! %s'\ % (event.phaseid, pbuff.phaseid)) pbuff.append(event) else: # # First event # pbuff.append(event) return phases, lints def load(fpath): with open(fpath) as ylog: sessions = [Session(doc) for doc in yaml.load_all(ylog)] return sessions