# coding=utf-8
"""Convert Bank statement CSV to Gnucash-acceptable format"""
from collections import namedtuple
# Parser state carried from line to line: ``ths`` tells whether the current
# line is parsed as a transaction row, ``nxt`` whether the following line
# switches the parser to the open state.
ParserState = namedtuple('ParserState', 'ths nxt')
def valid_formats():
    """Return the format names registered in the parser table."""
    registry = _parsers
    return registry.keys()
def convert_file(fpath, fmt, encoding='utf-8'):
    """Parse the statement file *fpath* and return the converted CSV.

    *fmt* selects the parser from the registry; *encoding* is handed to the
    parser's ``parse_file``. The result is whatever the parser's
    ``format_csv`` returns.

    Raises ValueError when *fmt* is not a registered format name.
    """
    # Reject unknown formats up front instead of catching the lookup error.
    if fmt not in _parsers:
        raise ValueError('unsupported format: %s' % fmt)
    parser = _parsers[fmt]()
    parser.parse_file(fpath, encoding)
    return parser.format_csv()
def _unquote(value):
    if len(value) < 2:
        return value
    if value[0] == value[-1] == "'":
        return value[1:-1]
    if value[0] == value[-1] == '"':
        return value[1:-1]
    return value
class BaseTransaction(object):
    """Common base for transactions read from a bank's CSV export."""
class GCTransaction(object):
    """A single transaction in the shape written out for Gnucash.

    Stores the date, deposit, withdrawal and description exactly as passed.
    """

    def __init__(self, date, deposit, withdrawal, description):
        self.date, self.deposit = date, deposit
        self.withdrawal, self.description = withdrawal, description

    def __str__(self):
        """Render the transaction as one semicolon-separated CSV line.

        Every value is wrapped in double quotes; embedded quotes or
        semicolons are NOT escaped.
        """
        quoted = []
        for value in (self.date, self.deposit,
                      self.withdrawal, self.description):
            quoted.append('"%s"' % value)
        return ";".join(quoted)
class BaseTransactionParser(object):
    """Line-oriented statement parser shared by the bank-specific parsers.

    Subclasses are expected to provide two methods used by ``parse_file``:

    * ``advance(line, lineno=None)`` -- return the ``ParserState`` that
      applies to the given line;
    * ``parse_line(line)`` -- return an object with a ``to_gc()`` method.
    """

    # Placeholder value; nothing in this class reads it.
    name = "__NONE__"

    def __init__(self):
        # Converted transactions, in file order.
        self.transactions = []
        # The parser starts closed and is not about to open.
        self.state = ParserState(ths=False, nxt=False)

    def is_open(self):
        """Tell whether the current line is treated as a transaction row."""
        return self.state.ths

    def is_closed(self):
        """Tell whether the current line is NOT treated as a transaction."""
        return not self.state.ths

    def parse_file(self, fpath, encoding='utf-8'):
        """Read *fpath* and collect the converted transactions.

        Each line is stripped, passed to ``advance`` together with its
        1-based line number, and -- when the resulting state is open --
        parsed and appended to ``self.transactions`` via ``to_gc()``.
        """
        # Local import keeps this block self-contained.
        import io

        # Decode once at the I/O boundary instead of per line: this works
        # for encodings whose bytes may contain 0x0A inside a character and
        # runs on both Python 2 and 3. newline='\n' keeps "\n" as the only
        # line terminator; a trailing "\r" is removed by strip() below.
        with io.open(fpath, encoding=encoding, newline='\n') as f:
            # Iterate lazily rather than loading every line with readlines().
            for lineno, raw in enumerate(f, 1):
                line = raw.strip()
                self.state = self.advance(line, lineno=lineno)
                if self.is_open():
                    self.transactions.append(self.parse_line(line).to_gc())

    def format_csv(self):
        """Return all transactions as UTF-8 encoded, newline-joined bytes."""
        # ``unicode`` on Python 2, ``str`` on Python 3.
        text_type = type(u'')
        return b"\n".join([
            text_type(t).encode('utf-8')
            for t in self.transactions
        ])
class MbankTransaction(BaseTransaction):
    """Item in an Mbank CSV.

    Columns of a data row, in file order:

    (a) transaction execution date
    (b) transaction booking date
    (c) transaction description (type)
    (d) message for the recipient
    (e) payer / payee
    (f) payer / payee account number
    (g) KS (constant symbol)
    (h) VS (variable symbol)
    (i) SS (specific symbol)
    (j) transaction amount
    (k) book balance after the transaction
    (l) (empty column at the end)
    """

    # Transaction types that add nothing to the description.
    _BORING_TYPES = (
        u'PŘÍCHOZÍ PLATBA Z MBANK',
        u'ODCHOZÍ PLATBA DO MBANK',
        u'PŘÍCHOZÍ PLATBA Z JINÉ BANKY',
        u'ODCHOZÍ PLATBA DO JINÉ BANKY',
        u'PLATBA KARTOU',
    )
    # Messages that add nothing to the description.
    _BORING_MESSAGES = (
        u'PŘEVOD PROSTŘEDKŮ',
    )
    # Everything from this marker onwards is cut off the message.
    _EXECUTION_DATE_MARKER = u' DATUM PROVEDENÍ TRANSAKCE: '

    def __init__(self, line):
        """Split one semicolon-separated *line* into the transaction fields.

        Sets ``date_r`` (a), ``date_b`` (b), ``new_balance`` (k) and the
        non-negative ``amountd`` / ``amountw`` pair derived from the sign
        of (j); the remaining columns (c-i) are kept in ``_scrap``.
        """
        fields = [self._cleanup_field(f) for f in line.split(";")]
        self.date_r = self._convert_date(fields.pop(0))         # (a)
        self.date_b = self._convert_date(fields.pop(0))         # (b)
        fields.pop()                                            # (l)
        self.new_balance = self._parse_currency(fields.pop())   # (k)
        amount = self._parse_currency(fields.pop())             # (j)
        if amount >= 0:
            self.amountd = amount
            self.amountw = 0
        else:
            self.amountd = 0
            self.amountw = -amount
        self._scrap = fields                # (c-i)

    def _cleanup_field(self, field):
        """Unquote *field*, collapse whitespace runs, map a lone '-' to ''."""
        x = ' '.join(_unquote(field).strip().split())
        return '' if x == '-' else x

    def _convert_date(self, text):
        """Reorder a 'DD-MM-YYYY' string into 'YYYY-MM-DD'."""
        day, month, year = text.split('-')
        return '-'.join([year, month, day])

    def _description(self):
        """Build the description from type, message and party.

        Empty values and the known uninformative types / messages are left
        out; the remaining parts are joined with ' / '.
        """
        # Unpacking all seven columns also checks that the row has the
        # expected number of fields; only the first three are used.
        type_, message, party, number, ks, vs, ss = self._scrap
        # Keep only the text before the first marker. partition() copes
        # with a message that repeats the marker, where unpacking the
        # result of split() would raise ValueError.
        message = message.partition(self._EXECUTION_DATE_MARKER)[0]
        out = []
        if type_ and type_ not in self._BORING_TYPES:
            out.append(type_)
        if message and message not in self._BORING_MESSAGES:
            out.append(message)
        if party:
            out.append(party)
        return " / ".join(out)

    def _parse_currency(self, text):
        """Read Mbank currency format (space-grouped, decimal comma)."""
        num = text.replace(' ', '')
        num = num.replace(',', '.')
        return float(num)

    def to_gc(self):
        """Convert to GCTransaction"""
        return GCTransaction(
            date=self.date_r,
            deposit=self.amountd,
            withdrawal=self.amountw,
            description=self._description()
        )
class MbankTransactionParser(BaseTransactionParser):
    """Statement parser producing MbankTransaction items."""

    def parse_line(self, line):
        """Wrap one statement line in an MbankTransaction."""
        return MbankTransaction(line)

    def advance(self, line, lineno=None):
        """Return the parser state that applies to *line*.

        The line after the column-header line opens the parser; the first
        empty line seen while open closes it again.
        """
        if self.state.nxt:
            # The previous line was the header: this one is a transaction.
            return ParserState(ths=True, nxt=False)
        if self.is_open():
            if not line:
                return ParserState(ths=False, nxt=False)
        elif line.startswith(u'#Datum uskute'):
            return ParserState(ths=False, nxt=True)
        return self.state
class FioTransaction(BaseTransaction):
    """Item in a Fio CSV.

    Columns of a data row, in file order:

    operation ID, date, amount, currency, counter-account,
    counter-account name, bank code, bank name, KS, VS, SS, note,
    message for the recipient, type, performed by, details, note, BIC,
    instruction ID.
    """

    def __init__(self, line):
        """Split one semicolon-separated *line* into the transaction fields.

        Sets ``date``, ``currency`` and the non-negative ``amountd`` /
        ``amountw`` pair; everything after the currency column is kept in
        ``_scrap``.
        """
        cells = [self._cleanup_field(raw) for raw in line.split(";")]
        # cells[0] is the operation ID, which is not used.
        self.date = self._convert_date(cells[1])
        amount = self._parse_currency(cells[2])
        self.amountd, self.amountw = (
            (amount, 0) if amount >= 0 else (0, -amount)
        )
        self.currency = cells[3]
        assert self.currency == 'CZK'
        # The remaining columns go through the cleanup a second time.
        self._scrap = list(map(self._cleanup_field, cells[4:]))

    def _cleanup_field(self, field):
        """Unquote *field*, collapse whitespace runs, map a lone '-' to ''."""
        collapsed = ' '.join(_unquote(field).strip().split())
        if collapsed == '-':
            return ''
        return collapsed

    def _description(self):
        """Return the message for the recipient, or '' when it is empty."""
        # Unpacking requires exactly fifteen remaining columns; only the
        # message is used.
        (
            _acc,
            _acc_name,
            _bank_code,
            _bank_name,
            _ks,
            _vs,
            _ss,
            _note_a,
            message,
            _kind,
            _who,
            _details,
            _note_b,
            _bic,
            _instruction_id
            ) = self._scrap
        return " / ".join([message] if message else [])

    def _convert_date(self, text):
        """Reorder a 'DD.MM.YYYY' string into 'YYYY-MM-DD'."""
        parts = text.split('.')
        day, month, year = parts
        return '-'.join((year, month, day))

    def _parse_currency(self, text):
        """Read the amount as a float."""
        return float(text)

    def to_gc(self):
        """Convert to GCTransaction"""
        summary = self._description()
        return GCTransaction(
            date=self.date,
            deposit=self.amountd,
            withdrawal=self.amountw,
            description=summary,
        )
class FioTransactionParser(BaseTransactionParser):
    """Statement parser producing FioTransaction items."""

    def parse_line(self, line):
        """Wrap one statement line in a FioTransaction."""
        return FioTransaction(line)

    def advance(self, line, lineno=None):
        """Choose parser state according to current line and line no.

        The line following the column-header line opens the parser, and it
        stays open for the rest of the file.
        """
        if self.state.nxt:
            return ParserState(ths=True, nxt=False)
        # The header must be looked for while the parser is still CLOSED.
        # Testing is_open() here could never succeed, because the parser
        # starts closed and this header is the only thing that opens it.
        if self.is_closed() and line.startswith(u'"ID operace";"Datum";'):
            return ParserState(ths=False, nxt=True)
        return self.state
class RaifTransaction(BaseTransaction):
    """Item in a Raif CSV.

    Columns of a data row, in file order:

    (a) Transaction Date
    (b) Booking Date
    (c) Account number
    (d) Account Name
    (e) Transaction Category
    (f) Account Number (spelled "Accocunt Number" in the original notes)
    (g) Name of Account
    (h) Transaction type
    (i) Message
    (j) Note
    (k) VS
    (l) KS
    (m) SS
    (n) Booked amount
    (o) Account Currency
    (p) Original Amount and Currency
    (q) Original Amount and Currency
    (r) Fee
    (s) Transaction ID
    """

    def __init__(self, line):
        """Split one semicolon-separated *line* into the transaction fields.

        Sets ``date`` from column (a), ``currency`` from column (o) and the
        non-negative ``amountd`` / ``amountw`` pair from the sign of (n).
        """
        self._fields = [self._cleanup_field(raw) for raw in line.split(";")]
        self.date = self._convert_date(self._fields[0])
        amount = self._parse_currency(self._fields[13])
        self.amountd, self.amountw = (
            (amount, 0) if amount >= 0 else (0, -amount)
        )
        self.currency = self._fields[14]
        assert self.currency == 'CZK'

    def _cleanup_field(self, field):
        """Unquote *field*, collapse whitespace runs, map a lone '-' to ''."""
        collapsed = ' '.join(_unquote(field).strip().split())
        if collapsed == '-':
            return ''
        return collapsed

    def _description(self):
        """Join the message and the note (when it differs) with ' / '."""
        # Unpacking requires exactly nineteen columns; only the message and
        # the note are used.
        (
            _tx_date,
            _booking_date,
            _src_number,
            _src_name,
            _category,
            _dst_number,
            _dst_name,
            _tx_type,
            message,
            note,
            _vs,
            _ks,
            _ss,
            _booked,
            _currency,
            _original_a,
            _original_b,
            _fee,
            _tx_id,
            ) = self._fields
        parts = [message] if message else []
        if note and note != message:
            parts.append(note)
        return " / ".join(parts)

    def _convert_date(self, text):
        """Reorder a 'DD.MM.YYYY' string into 'YYYY-MM-DD'."""
        parts = text.split('.')
        day, month, year = parts
        return '-'.join((year, month, day))

    def _parse_currency(self, text):
        """Read an amount written with a decimal comma and space grouping."""
        with_point = text.replace(',', '.')
        return float(with_point.replace(' ', ''))

    def to_gc(self):
        """Convert to GCTransaction"""
        summary = self._description()
        return GCTransaction(
            date=self.date,
            deposit=self.amountd,
            withdrawal=self.amountw,
            description=summary,
        )
class RaifTransactionParser(BaseTransactionParser):
    """Statement parser producing RaifTransaction items."""

    def parse_line(self, line):
        """Wrap one statement line in a RaifTransaction."""
        return RaifTransaction(line)

    def advance(self, line, lineno=None):
        """Return the parser state that applies to *line*.

        The line following the column-header line opens the parser, and it
        stays open for the rest of the file.
        """
        if self.state.nxt:
            return ParserState(ths=True, nxt=False)
        header_seen = (
            self.is_closed() and line.startswith(u'Transaction Date;Booking')
        )
        if header_seen:
            return ParserState(ths=False, nxt=True)
        return self.state
# Registry of supported statement formats: format name -> parser class.
# convert_file() looks the requested format up here and valid_formats()
# returns its keys.
_parsers = {
    'CzMbank': MbankTransactionParser,
    'CzFio': FioTransactionParser,
    'CzRaif': RaifTransactionParser,
}