123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251 |
- #!/usr/bin/python2
-
- import yaml
- import sys
-
-
class ParsingError(ValueError):
    """
    Error raised when the log cannot be parsed (eg. unsupported log
    format or unknown event type)
    """
-
-
class _EventGroup(object):
    """
    Non-empty group of events with several filtered views

    Raises ValueError if constructed with no events.
    """

    def __init__(self, events):
        if not events:
            raise ValueError("no events in event group")
        self._events = events

    def _with_etype(self, wanted):
        """
        List events whose etype equals *wanted*
        """
        return [ev for ev in self._events if ev.etype == wanted]

    @property
    def all_events(self):
        """
        The very same sequence of events the group was created with
        """
        return self._events

    # `events` is just another name for `all_events`
    events = all_events

    @property
    def asserts(self):
        """
        List of events flagged as asserts
        """
        found = []
        for ev in self._events:
            if ev.is_assert:
                found.append(ev)
        return found

    @property
    def fails(self):
        """
        List of events of type 'ASSERT.FAIL'
        """
        return self._with_etype('ASSERT.FAIL')

    @property
    def passes(self):
        """
        List of events of type 'ASSERT.PASS'
        """
        return self._with_etype('ASSERT.PASS')

    @property
    def lints(self):
        """
        Lints of all events, concatenated in event order
        """
        return [lint for ev in self._events for lint in ev.lints]
-
-
class EventTree(object):
    """
    Tree view of a flat event sequence

    Events between 'PHASE.START' and 'PHASE.END' are grouped under a Phase
    child; 'MESSAGE.*' and 'ASSERT.*' events become direct children and
    'SESSION.*' events are dropped.  Structural problems found on the way
    (unclosed phase, event that does not fit anywhere) are reported as
    lints instead of exceptions.
    """

    # Same type flags as events have, so that a mixed list of children
    # (events and nested phases) can be inspected uniformly.
    is_assert = False
    is_message = False
    is_phase = False

    def __init__(self, events, head=None):
        """
        Build the tree from *events*

        *head* is the event that opened this branch (None if there is
        no such event, eg. for the top-level tree).
        """
        self.children = []
        self.rest = list(events)
        self.head = head
        # lints found while building this very node; `lints` property is
        # read-only and computed, so it cannot be used for storage
        self._lints = []
        while self.rest:
            etype = self.peek()
            if etype.startswith('SESSION.'):
                self.pop()
            elif etype == 'PHASE.START':
                phead = self.pop()
                pbody = self.pull_branch('PHASE.END')
                self.children.append(Phase(pbody, phead))
            elif etype.startswith(('MESSAGE.', 'ASSERT.')):
                self.children.append(self.pop())
            else:
                # Eg. 'PHASE.END' without matching 'PHASE.START'.  The event
                # must be consumed, otherwise this loop would never end.
                self._lints.append(
                    Lint('unexpected event: %s' % etype, self.pop())
                )

    def peek(self):
        """
        Return etype of the next unprocessed event (without consuming it)
        """
        return self.rest[0].etype

    def pop(self):
        """
        Consume and return the next unprocessed event
        """
        return self.rest.pop(0)

    def pull_branch(self, final_etype):
        """
        Consume events up to and including first event of *final_etype*

        Return list of consumed events, without the final one.  If the
        final event is never found, all remaining events are consumed and
        returned, and a lint is recorded on this node.
        """
        out = []
        while self.rest:
            if self.peek() == final_etype:
                self.pop()
                return out
            out.append(self.pop())
        self._lints.append(Lint('could not find: %s' % final_etype, out))
        return out

    @property
    def lints(self):
        """
        Lints of this node followed by lints of all children
        """
        lints = list(self._lints)
        for e in self.children:
            lints += e.lints
        return lints


class Phase(EventTree):
    """
    Branch of event tree opened by a 'PHASE.START' event (kept as `head`)
    """

    is_phase = True

    @property
    def id(self):
        """
        Phase id as recorded in the head event
        """
        return self.head._rawdata['phase']['id']

    @property
    def name(self):
        """
        Phase name as recorded in the head event
        """
        return self.head._rawdata['phase']['name']

    @property
    def type(self):
        """
        Phase type as recorded in the head event
        """
        return self.head._rawdata['phase']['type']

    @property
    def verdict(self):
        """
        Verdict based on asserts that are direct children of this phase

        'FAIL' if any of them failed, 'PASS' if at least one passed and
        none failed, 'UNKNOWN' otherwise.  Children that are not asserts
        (messages, nested phases) are ignored.
        """
        out = 'UNKNOWN'
        for e in self.children:
            if not e.is_assert:
                continue
            if e.verdict == 'PASS':
                out = 'PASS'
            elif e.verdict == 'FAIL':
                return 'FAIL'
        return out
-
-
class Event(object):
    """
    Single log event wrapping its raw data (a mapping)

    Raw data must contain 'etype' and 'stamp' keys; 'data' is optional
    and defaults to an empty dict.
    """

    is_assert = False
    is_message = False
    is_phase = False

    def __init__(self, rawdata):
        self._rawdata = rawdata
        self.etype = rawdata['etype']
        self.stamp = rawdata['stamp']
        self.data = rawdata.get('data', {})

    def __str__(self):
        raw = self._rawdata
        return str(raw)

    @staticmethod
    def from_data(data):
        """
        Create event of class matching data['etype']

        Raises ParsingError if the event type is not known.
        """
        etype = data['etype']
        if etype in ('ASSERT.PASS', 'ASSERT.FAIL'):
            return AssertEvent(data)
        if etype in ('PHASE.START', 'PHASE.END'):
            return StructureInfoEvent(data)
        if etype in ('SESSION.START', 'SESSION.RELOAD', 'SESSION.END'):
            return StructureInfoEvent(data)
        if etype in ('MESSAGE.INFO', 'MESSAGE.WARNING', 'MESSAGE.ERROR'):
            return MessageEvent(data)
        if etype == 'MESSAGE.PROMISE':
            return PromiseEvent(data)
        raise ParsingError("unknown event type: %s" % etype)

    @property
    def lints(self):
        """
        Lints related to this event; plain event has none
        """
        return []
-
-
class MessageEvent(Event):
    """
    Event carrying a message; its severity is part of the event type
    """

    is_message = True

    @property
    def message(self):
        """
        Value of 'message' key of the raw event data
        """
        raw = self._rawdata
        return raw['message']

    @property
    def severity(self):
        """
        Second dot-separated field of event type (eg. 'WARNING')
        """
        fields = self.etype.split('.')
        return fields[1]

    @property
    def lints(self):
        """
        One lint for a warning or an error message, nothing otherwise
        """
        reasons = {'WARNING': "test warning", 'ERROR': "test error"}
        level = self.severity
        if level not in reasons:
            return []
        return [Lint(reasons[level], self._rawdata)]
-
-
class PromiseEvent(Event):
    """
    Promise event; always reported by a lint as unsupported
    """

    @property
    def lints(self):
        """
        Single lint saying that promises are not supported
        """
        unsupported = Lint("promises are not supported (yet)", self._rawdata)
        return [unsupported]
-
-
class StructureInfoEvent(Event):
    """
    Event marking log structure (Event.from_data() uses this class for
    'PHASE.*' and 'SESSION.*' event types)
    """
-
-
class AssertEvent(Event):
    """
    Assert event; the verdict is part of the event type
    """

    is_assert = True

    @property
    def caseid(self):
        """
        Value of 'caseid' key of the raw event data
        """
        raw = self._rawdata
        return raw['caseid']

    @property
    def hint(self):
        """
        Value of 'hint' key of the raw event data
        """
        raw = self._rawdata
        return raw['hint']

    @property
    def verdict(self):
        """
        Second dot-separated field of event type (eg. 'PASS')
        """
        fields = self.etype.split('.')
        return fields[1]
-
-
class EventLog(_EventGroup):
    """
    Session event log (all events from session)
    """

    @classmethod
    def from_data(cls, data):
        """
        Create the log from iterable of raw event data items

        Each item is converted using Event.from_data().
        """
        parsed = []
        for item in data:
            parsed.append(Event.from_data(item))
        return cls(parsed)
-
-
class Lint(object):
    """
    Lint-like warning coming from log parsing (eg. unclosed phase)

    Holds the warning text in `msg` and the related data in `_data`.
    """

    def __init__(self, msg, data):
        self.msg, self._data = msg, data
-
-
class Session(object):
    """
    One test session, created from a single parsed log document

    Only formats starting with 'jat/0.' are accepted; ParsingError is
    raised for anything else.  Lints from the event log, from the event
    tree and from the session itself are collected (in this order) in
    `lints`.
    """

    def __init__(self, doc):
        fmt = doc['format']
        self._doc = doc
        self._format = fmt
        if not fmt.startswith('jat/0.'):
            raise ParsingError('unsupported log format: %s' % fmt)

        # mandatory fields
        self.jat_version = doc['jat_version']
        self.start = doc['start']

        # optional fields
        self.sessid = doc.get('sessid', 'noid')
        self.finalized = doc.get('finalized', False)
        self.end = doc.get('end')

        log = EventLog.from_data(doc['events'])
        self.eventlog = log
        found = list(log.lints)

        tree = EventTree(log.events)
        self.eventtree = tree
        found.extend(tree.lints)

        if not self.finalized:
            found.append(Lint("session not finalized", self.sessid))
        self.lints = found
-
def load(fpath):
    """
    Load sessions from YAML log file at *fpath*

    Each YAML document in the file is turned into a Session; list of
    these sessions is returned.
    """
    # NOTE(review): yaml.load_all() is called without explicit Loader.
    # Depending on PyYAML version this may construct arbitrary Python
    # objects from tagged input (or be rejected by the library); consider
    # yaml.safe_load_all() if logs can come from untrusted sources.
    with open(fpath) as ylog:
        # documents are parsed lazily, so consume them while file is open
        return [Session(doc) for doc in yaml.load_all(ylog)]
|