#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Term based tool to view *colored*, *incremental* diff in a *Git/Mercurial/Svn*
workspace or from stdin, with *side by side* and *auto pager* support. Requires
python (>= 2.5.0) and ``less``.
"""
# Package metadata; 'description' and 'version' are also shown by the command
# line option parser.
META_INFO = {
    'version': '0.8',
    'license': 'BSD-3',
    'author': 'Matthew Wang',
    'email': 'mattwyl(@)gmail(.)com',
    'url': 'https://github.com/ymattw/cdiff',
    'keywords': 'colored incremental side-by-side diff',
    'description': (
        'View colored, incremental diff in a workspace or from '
        'stdin, with side by side and auto pager support'
    ),
}
import sys
if sys.hexversion < 0x02050000:
    raise SystemExit("*** Requires python >= 2.5.0")    # pragma: no cover
import re
import subprocess
import errno
import difflib
# ANSI escape sequences keyed by a symbolic name
COLORS = {
    # Attributes
    'reset': '\x1b[0m',
    'underline': '\x1b[4m',
    'reverse': '\x1b[7m',
    # Normal intensity foreground colors
    'red': '\x1b[31m',
    'green': '\x1b[32m',
    'yellow': '\x1b[33m',
    'blue': '\x1b[34m',
    'magenta': '\x1b[35m',
    'cyan': '\x1b[36m',
    # Bold ("light") foreground colors
    'lightred': '\x1b[1;31m',
    'lightgreen': '\x1b[1;32m',
    'lightyellow': '\x1b[1;33m',
    'lightblue': '\x1b[1;34m',
    'lightmagenta': '\x1b[1;35m',
    'lightcyan': '\x1b[1;36m',
}
# Per revision control system: the command used to probe whether the current
# directory is a workspace, the command that prints a diff, and the command
# that prints the log together with diffs
VCS_INFO = {
    'Git': {
        'probe': ['git', 'rev-parse'],
        'diff': ['git', 'diff'],
        'log': ['git', 'log', '--patch'],
    },
    'Mercurial': {
        'probe': ['hg', 'summary'],
        'diff': ['hg', 'diff'],
        'log': ['hg', 'log', '--patch'],
    },
    'Svn': {
        'probe': ['svn', 'info'],
        'diff': ['svn', 'diff'],
        'log': ['svn', 'log', '--diff', '--use-merge-history'],
    },
}
def colorize(text, start_color, end_color='reset'):
    """Wrap `text` between the escape codes named `start_color` and
    `end_color`, both looked up in COLORS.
    """
    pieces = [COLORS[start_color], text, COLORS[end_color]]
    return ''.join(pieces)
class Hunk(object):
    """One hunk of a diff: its leading header lines, its meta line, the old
    and new address ranges, and the tagged body lines.
    """

    def __init__(self, hunk_headers, hunk_meta, old_addr, new_addr):
        self._hunk_headers = hunk_headers
        self._hunk_meta = hunk_meta
        # Each address is a tuple (start, offset)
        self._old_addr = old_addr
        self._new_addr = new_addr
        # Body lines, a list of tuple (attr, line)
        self._hunk_list = []

    def append(self, hunk_line):
        """Add one body line, given as a 2-element tuple (attr, text) where
        attr is '-' for old, '+' for new and ' ' for common.
        """
        self._hunk_list.append(hunk_line)

    def mdiff(self):
        r"""Run difflib._mdiff() on the old and new text of this hunk.

        The result is an iterator of tuples
        (from line tuple, to line tuple, boolean flag):

        from/to line tuple -- (line num, line text)
            line num -- integer or None (to indicate a context separation)
            line text -- original line text with following markers inserted:
                '\0+' -- marks start of added text
                '\0-' -- marks start of deleted text
                '\0^' -- marks start of changed text
                '\1' -- marks end of added/deleted/changed text
        boolean flag -- None indicates context separation, True indicates
            either "from" or "to" line contains a change, otherwise False.
        """
        old_text = self._get_old_text()
        new_text = self._get_new_text()
        return difflib._mdiff(old_text, new_text)

    def _get_old_text(self):
        """Lines present on the old side: everything not tagged '+'."""
        return [text for (attr, text) in self._hunk_list if attr != '+']

    def _get_new_text(self):
        """Lines present on the new side: everything not tagged '-'."""
        return [text for (attr, text) in self._hunk_list if attr != '-']
class DiffOps(object):      # pragma: no cover
    """Methods in this class are supposed to be overwritten by derived class.
    No is_header() anymore, all non-recognized lines are considered as headers.

    Every predicate below answers False and every parser returns None, so a
    derived class only needs to override what its diff format supports.
    """

    def is_old_path(self, line):
        """Default: `line` is never the old path line."""
        return False

    def is_new_path(self, line):
        """Default: `line` is never the new path line."""
        return False

    def is_hunk_meta(self, line):
        """Default: `line` is never a hunk meta line."""
        return False

    def parse_hunk_meta(self, line):
        """Returns a 2-element tuple, each is a tuple of (start, offset).

        The default implementation returns None.
        """
        return None

    def parse_hunk_line(self, line):
        """Returns a 2-element tuple: (attr, text), where attr is:
                '-': old, '+': new, ' ': common

        The default implementation returns None.
        """
        return None

    def is_old(self, line):
        """Default: `line` is never an old (removed) line."""
        return False

    def is_new(self, line):
        """Default: `line` is never a new (added) line."""
        return False

    def is_common(self, line):
        """Default: `line` is never a common (context) line."""
        return False

    def is_eof(self, line):
        """Default: `line` is never an end-of-file marker line."""
        return False

    def is_only_in_dir(self, line):
        """Default: `line` is never an 'only in directory' line."""
        return False

    def is_binary_differ(self, line):
        """Default: `line` is never a 'binary files differ' line."""
        return False
class Diff(DiffOps):
    """The diff of a single file: header lines, old/new path lines and hunks.

    An instance renders itself as ANSI-colored text, either in the traditional
    layout (markup_traditional) or in two columns (markup_side_by_side).
    """

    def __init__(self, headers, old_path, new_path, hunks):
        """Keep the parsed pieces.

        headers  -- list of header lines emitted before the path lines
        old_path -- old path line
        new_path -- new path line
        hunks    -- list of hunk objects; each must provide _hunk_headers,
                    _hunk_meta, _old_addr, _new_addr and mdiff()
        """
        self._headers = headers
        self._old_path = old_path
        self._new_path = new_path
        self._hunks = hunks

    def markup_traditional(self):
        """Returns a generator of colored lines in the traditional layout"""
        for line in self._headers:
            yield self._markup_header(line)

        yield self._markup_old_path(self._old_path)
        yield self._markup_new_path(self._new_path)

        for hunk in self._hunks:
            for hunk_header in hunk._hunk_headers:
                yield self._markup_hunk_header(hunk_header)
            yield self._markup_hunk_meta(hunk._hunk_meta)
            for old, new, changed in hunk.mdiff():
                if changed:
                    if not old[0]:
                        # The '+' char after \x00 is kept
                        # DEBUG: yield 'NEW: %s %s\n' % (old, new)
                        line = new[1].strip('\x00\x01')
                        yield self._markup_new(line)
                    elif not new[0]:
                        # The '-' char after \x00 is kept
                        # DEBUG: yield 'OLD: %s %s\n' % (old, new)
                        line = old[1].strip('\x00\x01')
                        yield self._markup_old(line)
                    else:
                        # DEBUG: yield 'CHG: %s %s\n' % (old, new)
                        yield self._markup_old('-') + \
                            self._markup_mix(old[1], 'red')
                        yield self._markup_new('+') + \
                            self._markup_mix(new[1], 'green')
                else:
                    yield self._markup_common(' ' + old[1])

    def markup_side_by_side(self, width):
        """Returns a generator of colored lines in two-column layout.

        width -- text width of each column; a value <= 0 falls back to 80
        """
        wrap_char = colorize('>', 'lightmagenta')
        # Matches any mdiff tag; compiled once instead of once per line
        tag_re = re.compile(r'\x00[+^-]|\x01')

        def _normalize(line):
            return line.replace(
                '\t', ' ' * 8).replace('\n', '').replace('\r', '')

        def _strip_line_marker(text, start_marker):
            """Remove the mdiff markers wrapping a wholly added/deleted line.

            Only the exact `start_marker` prefix and a single trailing '\\x01'
            are removed.  str.lstrip() must not be used here: it strips a
            *set* of characters, so it would also eat leading '+' or '-'
            characters that belong to the line's own content.
            """
            if text.startswith(start_marker):
                text = text[len(start_marker):]
            if text.endswith('\x01'):
                text = text[:-1]
            return text

        def _fit_with_marker(text, markup_fn, width, pad=False):
            """Wrap or pad input pure text, then markup"""
            if len(text) > width:
                return markup_fn(text[:(width - 1)]) + wrap_char
            elif pad:
                pad_len = width - len(text)
                return '%s%*s' % (markup_fn(text), pad_len, '')
            else:
                return markup_fn(text)

        def _fit_with_marker_mix(text, base_color, width, pad=False):
            """Wrap or pad input text which contains mdiff tags, markup at the
            meantime, note only left side need to set `pad`
            """
            out = [COLORS[base_color]]
            count = 0

            while text and count < width:
                if text.startswith('\x00-'):    # del
                    out.append(COLORS['reverse'] + COLORS[base_color])
                    text = text[2:]
                elif text.startswith('\x00+'):  # add
                    out.append(COLORS['reverse'] + COLORS[base_color])
                    text = text[2:]
                elif text.startswith('\x00^'):  # change
                    out.append(COLORS['underline'] + COLORS[base_color])
                    text = text[2:]
                elif text.startswith('\x01'):   # reset
                    out.append(COLORS['reset'] + COLORS[base_color])
                    text = text[1:]
                else:
                    # FIXME: utf-8 wchar might break the rule here, e.g.
                    # u'\u554a' takes double width of a single letter, also
                    # this depends on your terminal font.  I guess audience of
                    # this tool never put that kind of symbol in their code :-)
                    #
                    out.append(text[0])
                    count += 1
                    text = text[1:]

            if count == width and tag_re.sub('', text):
                # Was stripped: output fulfil and still has normal char in text
                out[-1] = COLORS['reset'] + wrap_char
            elif count < width and pad:
                pad_len = width - count
                out.append('%s%*s' % (COLORS['reset'], pad_len, ''))
            else:
                out.append(COLORS['reset'])

            return ''.join(out)

        # Set up line width
        if width <= 0:
            width = 80

        # Set up number width, note last hunk might be empty
        try:
            (start, offset) = self._hunks[-1]._old_addr
            max1 = start + offset - 1
            (start, offset) = self._hunks[-1]._new_addr
            max2 = start + offset - 1
        except IndexError:
            max1 = max2 = 0
        num_width = max(len(str(max1)), len(str(max2)))

        # Setup lineno and line format
        left_num_fmt = colorize('%%(left_num)%ds' % num_width, 'yellow')
        right_num_fmt = colorize('%%(right_num)%ds' % num_width, 'yellow')
        line_fmt = left_num_fmt + ' %(left)s ' + COLORS['reset'] + \
            right_num_fmt + ' %(right)s\n'

        # yield header, old path and new path
        for line in self._headers:
            yield self._markup_header(line)
        yield self._markup_old_path(self._old_path)
        yield self._markup_new_path(self._new_path)

        # yield hunks
        for hunk in self._hunks:
            for hunk_header in hunk._hunk_headers:
                yield self._markup_hunk_header(hunk_header)
            yield self._markup_hunk_meta(hunk._hunk_meta)
            for old, new, changed in hunk.mdiff():
                # mdiff numbers lines from 1 within the hunk; shift them by
                # the hunk's start address to get file line numbers
                if old[0]:
                    left_num = str(hunk._old_addr[0] + int(old[0]) - 1)
                else:
                    left_num = ' '

                if new[0]:
                    right_num = str(hunk._new_addr[0] + int(new[0]) - 1)
                else:
                    right_num = ' '

                left = _normalize(old[1])
                right = _normalize(new[1])

                if changed:
                    if not old[0]:
                        # Line only exists on the right: blank left column
                        left = '%*s' % (width, ' ')
                        right = _strip_line_marker(right, '\x00+')
                        right = _fit_with_marker(
                            right, self._markup_new, width)
                    elif not new[0]:
                        # Line only exists on the left
                        left = _strip_line_marker(left, '\x00-')
                        left = _fit_with_marker(left, self._markup_old, width)
                        right = ''
                    else:
                        left = _fit_with_marker_mix(left, 'red', width, 1)
                        right = _fit_with_marker_mix(right, 'green', width)
                else:
                    left = _fit_with_marker(
                        left, self._markup_common, width, 1)
                    right = _fit_with_marker(right, self._markup_common, width)
                yield line_fmt % {
                    'left_num': left_num,
                    'left': left,
                    'right_num': right_num,
                    'right': right
                }

    def _markup_header(self, line):
        return colorize(line, 'cyan')

    def _markup_old_path(self, line):
        return colorize(line, 'yellow')

    def _markup_new_path(self, line):
        return colorize(line, 'yellow')

    def _markup_hunk_header(self, line):
        return colorize(line, 'lightcyan')

    def _markup_hunk_meta(self, line):
        return colorize(line, 'lightblue')

    def _markup_common(self, line):
        return colorize(line, 'reset')

    def _markup_old(self, line):
        return colorize(line, 'lightred')

    def _markup_new(self, line):
        return colorize(line, 'lightgreen')

    def _markup_mix(self, line, base_color):
        """Replace the mdiff tags inside `line` with escape codes that
        highlight the tagged spans on top of `base_color`.
        """
        del_code = COLORS['reverse'] + COLORS[base_color]
        add_code = COLORS['reverse'] + COLORS[base_color]
        chg_code = COLORS['underline'] + COLORS[base_color]
        rst_code = COLORS['reset'] + COLORS[base_color]
        line = line.replace('\x00-', del_code)
        line = line.replace('\x00+', add_code)
        line = line.replace('\x00^', chg_code)
        line = line.replace('\x01', rst_code)
        return colorize(line, base_color)
class UnifiedDiff(Diff):
    """Line classifiers and parsers for the unified diff format."""

    def is_old_path(self, line):
        return line.startswith('--- ')

    def is_new_path(self, line):
        return line.startswith('+++ ')

    def is_hunk_meta(self, line):
        """Minimal valid hunk meta is like '@@ -1 +1 @@', note extra chars
        might occur after the ending @@, e.g. in git log.  '## ' usually
        indicates svn property changes in output from `svn log --diff`
        """
        return (line.startswith('@@ -') and line.find(' @@') >= 8) or \
               (line.startswith('## -') and line.find(' ##') >= 8)

    def parse_hunk_meta(self, hunk_meta):
        """Parse a hunk meta line such as '@@ -3,7 +3,6 @@'.

        Returns a 2-element tuple (old_addr, new_addr), each being a tuple
        (start, offset).  Raises IndexError when a range field is missing and
        ValueError when a number cannot be parsed.
        """
        fields = hunk_meta.split()
        old_addr = self._parse_hunk_range(fields[1])    # e.g. '-3,7'
        new_addr = self._parse_hunk_range(fields[2])    # e.g. '+3,6'
        return (old_addr, new_addr)

    def _parse_hunk_range(self, field):
        """Turn one range field ('-3,7', '+3,6', '-1') into (start, offset).

        The unified format omits the line count when the hunk side holds
        exactly one line, so a missing count means 1 (not 0), e.g. the old
        side of '@@ -1 +1,2 @@' and the new side of '@@ -0,0 +1 @@'.
        """
        parts = field.split(',')
        start = int(parts[0][1:])       # drop the leading '-' or '+'
        if len(parts) > 1:
            offset = int(parts[1])
        else:
            offset = 1
        return (start, offset)

    def parse_hunk_line(self, line):
        """Split a hunk body line into (attr, text): first char and the rest"""
        return (line[0], line[1:])

    def is_old(self, line):
        """Exclude old path and header line from svn log --diff output, allow
        '----' likely to see in diff from yaml file
        """
        return line.startswith('-') and not self.is_old_path(line) and \
            not re.match(r'^-{72}$', line.rstrip())

    def is_new(self, line):
        return line.startswith('+') and not self.is_new_path(line)

    def is_common(self, line):
        return line.startswith(' ')

    def is_eof(self, line):
        # \ No newline at end of file
        # \ No newline at end of property
        return line.startswith(r'\ No newline at end of')

    def is_only_in_dir(self, line):
        return line.startswith('Only in ')

    def is_binary_differ(self, line):
        # Returns a match object (truthy) or None rather than a strict bool
        return re.match('^Binary files .* differ$', line.rstrip())
class ContextDiff(Diff):
    """Placeholder subclass that currently adds nothing on top of Diff."""
class PatchStream(object):
    """Wrap a line-oriented diff handle and buffer its first lines.

    The buffered lines (the "stream header") can be inspected to detect the
    patch type and are replayed first when the stream is iterated, so no
    input is lost by peeking.
    """

    def __init__(self, diff_hdl):
        """diff_hdl -- object providing readline() and line iteration"""
        self._diff_hdl = diff_hdl
        self._stream_header_size = 0
        self._stream_header = []

        # Test whether stream is empty by reading 1 line
        line = self._diff_hdl.readline()
        if not line:
            self._is_empty = True
        else:
            self._stream_header.append(line)
            self._stream_header_size += 1
            self._is_empty = False

    def is_empty(self):
        """Return True when the very first read hit end of input"""
        return self._is_empty

    def read_stream_header(self, stream_header_size):
        """Returns a small chunk for patch type detect.

        Buffers lines until `stream_header_size` lines are held or the input
        is exhausted, then returns the buffered list.  Lines already buffered
        are counted, so calling this again with the same size reads nothing
        more.
        """
        while self._stream_header_size < stream_header_size:
            line = self._diff_hdl.readline()
            if not line:
                break
            self._stream_header.append(line)
            self._stream_header_size += 1
        return self._stream_header

    def __iter__(self):
        """Yield the buffered header lines, then the rest of the handle"""
        for line in self._stream_header:
            yield line
        for line in self._diff_hdl:
            yield line
class DiffParser(object):
    """Detect the diff type of a patch stream and split it into Diff objects"""
    def __init__(self, stream):
        """Probe the diff type from the first lines of `stream`.

        Up to 100 lines are requested through stream.read_stream_header() and
        decoded.  Sets self._type to 'context' or 'unified'; raises
        RuntimeError('unknown diff type') when neither format is recognized
        and at least 4 lines were available.
        """
        self._stream = stream
        header = [decode(line) for line in
                  self._stream.read_stream_header(100)]
        size = len(header)
        # Context diff: the first four lines have a fixed shape
        if size >= 4 and (header[0].startswith('*** ') and
                          header[1].startswith('--- ') and
                          header[2] == '***************\n' and
                          header[3].startswith('*** ') and
                          header[3].endswith(' ****\n')):
            self._type = 'context'
            return
        # Unified diff: a '--- ' line immediately followed by a '+++ ' line
        for n in range(size):
            if header[n].startswith('--- ') and (n < size - 1) and \
                    header[n+1].startswith('+++ '):
                self._type = 'unified'
                break
        else:
            # Reached only when the loop above did not break
            if size < 4:
                # It's safe to consider as udiff if patch stream contains no
                # more than 3 lines, happens with `git diff` on a file that
                # only has perm bits changes
                #
                self._type = 'unified'
            else:
                raise RuntimeError('unknown diff type')
    def get_diff_generator(self):
        """Parse all diff lines and yield Diff objects one at a time.

        This is a generator.  Raises RuntimeError when the detected type is
        not 'unified' or when a hunk meta line cannot be parsed.
        """
        if self._type == 'unified':
            # Used only for its line classifier/parser methods
            difflet = UnifiedDiff(None, None, None, None)
        else:
            raise RuntimeError('unsupported diff format')
        # `diff` is the Diff under construction; `headers` collects
        # non-recognized lines until they are attached to a Diff or a Hunk
        diff = Diff([], None, None, [])
        headers = []
        for line in self._stream:
            line = decode(line)
            if difflet.is_old_path(line):
                # FIXME: '--- ' breaks here, need to probe next 3 lines,
                # reproducible with github raw patch
                #
                if diff._old_path and diff._new_path and len(diff._hunks) > 0:
                    # One diff constructed
                    yield diff
                    diff = Diff([], None, None, [])
                diff = Diff(headers, line, None, [])
                headers = []
            elif difflet.is_new_path(line):
                diff._new_path = line
            elif difflet.is_hunk_meta(line):
                hunk_meta = line
                try:
                    old_addr, new_addr = difflet.parse_hunk_meta(hunk_meta)
                except (IndexError, ValueError):
                    raise RuntimeError('invalid hunk meta: %s' % hunk_meta)
                # Lines collected since the previous hunk become this hunk's
                # headers
                hunk = Hunk(headers, hunk_meta, old_addr, new_addr)
                headers = []
                diff._hunks.append(hunk)
            elif len(diff._hunks) > 0 and (difflet.is_old(line) or
                                           difflet.is_new(line) or
                                           difflet.is_common(line)):
                diff._hunks[-1].append(difflet.parse_hunk_line(line))
            elif difflet.is_eof(line):
                # ignore
                pass
            elif difflet.is_only_in_dir(line) or \
                    difflet.is_binary_differ(line):
                # 'Only in foo:' and 'Binary files ... differ' are considered
                # as separate diffs, so yield current diff, then this line
                #
                if diff._old_path and diff._new_path and len(diff._hunks) > 0:
                    # Current diff is completely constructed
                    yield diff
                headers.append(line)
                yield Diff(headers, '', '', [])
                headers = []
                diff = Diff([], None, None, [])
            else:
                # All other non-recognized lines are considered as headers or
                # hunk headers respectively
                #
                headers.append(line)
        # Validate and yield the last patch set if it is not yielded yet
        if diff._old_path:
            assert diff._new_path is not None
            if diff._hunks:
                assert len(diff._hunks[-1]._hunk_meta) > 0
                assert len(diff._hunks[-1]._hunk_list) > 0
            yield diff
        if headers:
            # Tolerate dangling headers, just yield a Diff object with only
            # header lines
            #
            yield Diff(headers, '', '', [])
class DiffMarkup(object):
    """Render every diff parsed from a patch stream as colored lines."""

    def __init__(self, stream):
        parser = DiffParser(stream)
        self._diffs = parser.get_diff_generator()

    def markup(self, side_by_side=False, width=0):
        """Returns a generator of colored lines, in side-by-side layout of
        the given width when `side_by_side` is true, traditional otherwise.
        """
        if not side_by_side:
            return self._markup_traditional()
        return self._markup_side_by_side(width)

    def _markup_traditional(self):
        """Generator chaining markup_traditional() of each diff."""
        for each_diff in self._diffs:
            for colored in each_diff.markup_traditional():
                yield colored

    def _markup_side_by_side(self, width):
        """Generator chaining markup_side_by_side(width) of each diff."""
        for each_diff in self._diffs:
            for colored in each_diff.markup_side_by_side(width):
                yield colored
def markup_to_pager(stream, opts):
    """Mark up `stream` according to `opts` (side_by_side, width) and feed
    the UTF-8 encoded result to a `less` child process, waiting for it to
    exit.  A KeyboardInterrupt while writing just stops the feeding.
    """
    colored_lines = DiffMarkup(stream).markup(
        side_by_side=opts.side_by_side, width=opts.width)

    # Args stolen from git source: github.com/git/git/blob/master/pager.c
    pager_cmd = ['less', '-FRSX']
    pager = subprocess.Popen(
        pager_cmd, stdin=subprocess.PIPE, stdout=sys.stdout)
    pager_input = pager.stdin
    try:
        for colored in colored_lines:
            pager_input.write(colored.encode('utf-8'))
    except KeyboardInterrupt:
        pass

    pager_input.close()
    pager.wait()
def check_command_status(arguments):
    """Return True if command returns 0.

    arguments -- the command as an argument list, run without a shell.

    The command's output is discarded.  Returns False when the command exits
    with a non-zero status or cannot be started (OSError).
    """
    try:
        child = subprocess.Popen(
            arguments, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        # communicate() drains both pipes while waiting.  Waiting without
        # reading (as subprocess.call() does) can block forever once the
        # child has filled a pipe buffer.
        child.communicate()
    except OSError:
        return False
    return child.returncode == 0
def revision_control_diff(args):
    """Return the stdout pipe of the diff command of the first revision
    control system in VCS_INFO whose probe succeeds, with `args` appended to
    the command; None when no probe succeeds.
    """
    for ops in VCS_INFO.values():
        if not check_command_status(ops['probe']):
            continue
        child = subprocess.Popen(ops['diff'] + args, stdout=subprocess.PIPE)
        return child.stdout
    return None
def revision_control_log(args):
    """Return the stdout pipe of the log command of the first revision
    control system in VCS_INFO whose probe succeeds, with `args` appended to
    the command; None when no probe succeeds.
    """
    for ops in VCS_INFO.values():
        if not check_command_status(ops['probe']):
            continue
        child = subprocess.Popen(ops['log'] + args, stdout=subprocess.PIPE)
        return child.stdout
    return None
def decode(line):
    """Decode UTF-8 if necessary.

    Objects without a decode() method (already text) are returned untouched.
    Bytes that are not valid UTF-8 are decoded as latin-1 instead, which maps
    every byte to a character, so a diff of a non-UTF-8 file does not abort
    the whole run.
    """
    try:
        return line.decode('utf-8')
    except AttributeError:
        return line
    except UnicodeDecodeError:
        return line.decode('latin-1')
def main():
    """Command line entry point, returns the process exit status.

    The diff is taken from the revision control log when --log is given,
    from the revision control diff when stdin is a terminal, and from stdin
    otherwise.  It is colored and sent to the pager when --color is 'always',
    or 'auto' with stdout being a terminal; otherwise it is written to stdout
    as is.

    Returns 1 when a revision control workspace is needed but none is
    detected, 0 otherwise.
    """
    import optparse

    supported_vcs = sorted(VCS_INFO.keys())

    usage = """%prog [options] [file|dir ...]"""
    parser = optparse.OptionParser(
        usage=usage, description=META_INFO['description'],
        version='%%prog %s' % META_INFO['version'])
    parser.add_option(
        '-s', '--side-by-side', action='store_true',
        help='enable side-by-side mode')
    parser.add_option(
        '-w', '--width', type='int', default=80, metavar='N',
        help='set text width for side-by-side mode, default is 80')
    parser.add_option(
        '-l', '--log', action='store_true',
        help='show log with changes from revision control')
    parser.add_option(
        '-c', '--color', default='auto', metavar='X',
        help="""colorize mode 'auto' (default), 'always', or 'never'""")
    opts, args = parser.parse_args()

    if opts.log:
        diff_hdl = revision_control_log(args)
        if not diff_hdl:
            sys.stderr.write(('*** Not in a supported workspace, supported '
                              'are: %s\n') % ', '.join(supported_vcs))
            return 1
    elif sys.stdin.isatty():
        diff_hdl = revision_control_diff(args)
        if not diff_hdl:
            sys.stderr.write(('*** Not in a supported workspace, supported '
                              'are: %s\n\n') % ', '.join(supported_vcs))
            parser.print_help()
            return 1
    else:
        diff_hdl = sys.stdin

    stream = PatchStream(diff_hdl)

    # Don't let empty diff pass thru
    if stream.is_empty():
        return 0

    if opts.color == 'always' or \
            (opts.color == 'auto' and sys.stdout.isatty()):
        try:
            markup_to_pager(stream, opts)
        except IOError:
            e = sys.exc_info()[1]
            # A broken pipe only means the reader went away, e.g. the pager
            # was quit early.  Any other I/O error is a real failure and must
            # not be silently turned into a successful exit.
            if e.errno != errno.EPIPE:
                raise
    else:
        # pipe out stream untouched to make sure it is still a patch
        for line in stream:
            sys.stdout.write(decode(line))

    if diff_hdl is not sys.stdin:
        diff_hdl.close()

    return 0
# When run as a script, exit with the status returned by main()
if __name__ == '__main__':
    sys.exit(main())
# vim:set et sts=4 sw=4 tw=79: