#!/usr/bin/python2 import yaml import sys reload(sys) sys.setdefaultencoding("utf-8") class ParsingError(ValueError): pass class _EventGroup(object): def __init__(self, events): if not events: raise ValueError("no events in event group") self._events = events @property def all_events(self): return self._events @property def asserts(self): return [e for e in self._events if e.is_assert] @property def fails(self): return [e for e in self._events if e.etype == 'ASSERT.FAIL'] @property def passes(self): return [e for e in self._events if e.etype == 'ASSERT.PASS'] @property def events(self): return self._events @property def lints(self): lints = [] for e in self._events: lints += e.lints return lints class EventTree(object): def __init__(self, events, head=None): self.children = [] self.rest = [e for e in events] self.head = head self._lints = [] while self.rest: if self.peek().startswith('SESSION.'): self.pop() elif self.peek() == 'PHASE.START': phead = self.pop() pbody = self.pull_branch('PHASE.END') self.children.append(Phase(pbody, phead)) elif self.peek().startswith('MESSAGE.'): self.children.append(self.pop()) elif self.peek().startswith('ASSERT.'): self.children.append(self.pop()) def peek(self): return self.rest[0].etype def pop(self): return self.rest.pop(0) def pull_branch(self, final_etype): out = [] while self.rest: if self.peek() == final_etype: self.pop() return out else: out.append(self.pop()) self._lints += [Lint('could not find: %s' % final_etype, out)] return out @property def lints(self): lints = [] for e in self.children: lints += e.lints return self._lints + lints class Phase(EventTree): is_phase = True @property def id(self): return self.head._rawdata['phase']['id'] @property def name(self): return self.head._rawdata['phase']['name'] @property def type(self): return self.head._rawdata['phase']['type'] @property def verdict(self): out = 'UNKNOWN' for e in self.children: if e.is_assert: if e.verdict == 'PASS': out = 'PASS' elif e.verdict == 'FAIL': return 'FAIL' return out class Event(object): is_assert = False is_message = False is_phase = False def __str__(self): return str(self._rawdata) def __init__(self, rawdata): self._rawdata = rawdata self.etype = rawdata['etype'] self.stamp = rawdata['stamp'] self.data = rawdata.get('data', dict()) @staticmethod def from_data(data): etype = data['etype'] if etype in ['ASSERT.PASS', 'ASSERT.FAIL']: cls = AssertEvent elif etype in ['PHASE.START', 'PHASE.END']: cls = StructureInfoEvent elif etype in ['SESSION.START', 'SESSION.RELOAD', 'SESSION.END']: cls = StructureInfoEvent elif etype == 'MESSAGE.INFO': cls = MessageEvent elif etype == 'MESSAGE.WARNING': cls = MessageEvent elif etype == 'MESSAGE.ERROR': cls = MessageEvent elif etype == 'MESSAGE.PROMISE': cls = PromiseEvent else: raise ParsingError("unknown event type: %s" % etype) return cls(data) @property def lints(self): return [] class MessageEvent(Event): is_message = True @property def message(self): return self._rawdata['message'] @property def severity(self): return self.etype.split('.')[1] @property def lints(self): if self.severity == 'WARNING': return [Lint("test warning", self._rawdata)] elif self.severity == 'ERROR': return [Lint("test error", self._rawdata)] else: return [] class PromiseEvent(Event): @property def lints(self): return [Lint("promises are not supported (yet)", self._rawdata)] class StructureInfoEvent(Event): pass class AssertEvent(Event): is_assert = True @property def caseid(self): return self._rawdata['caseid'] @property def hint(self): return self._rawdata['hint'] @property def verdict(self): return self.etype.split('.')[1] class EventLog(_EventGroup): """ Session event log (all events from session) """ @classmethod def from_data(cls, data): return cls([Event.from_data(e) for e in data]) class Lint(object): """ Lint-like warning coming from log parsing (eg. unclosed phase) """ def __init__(self, msg, data): self.msg = msg self._data = data class Session(object): def __init__(self, doc): self._doc = doc self._format = doc['format'] if not self._format.startswith('jat/0.'): raise ParsingError('unsupported log format: %s' % self._format) self.jat_version = doc['jat_version'] self.start = doc['start'] self.id = doc.get('id', 'noid') self.test = doc.get('test') self.finalized = doc.get('finalized', False) self.end = doc.get('end') self.lints = [] self.eventlog = EventLog.from_data(doc['events']) self.lints += self.eventlog.lints self.eventtree = EventTree(self.eventlog._events) self.lints += self.eventtree.lints if not self.finalized: self.lints.append(Lint("session not finalized", self.id)) def load(fpath): with open(fpath) as ylog: sessions = [Session(doc) for doc in yaml.load_all(ylog)] return sessions