'''
Importer for OTP Bank CSV transaction history.
'''
__copyright__ = 'Copyright (c) 2020 Kristóf Marussy '
__license__ = 'GNU GPLv2'

import csv
import datetime as dt
from decimal import Decimal
import logging
from typing import Callable, Dict, Iterable, List, NamedTuple, Optional
import re
from os import path

import beancount.core.amount as am
from beancount.core.amount import Amount
from beancount.core import data
from beancount.core.number import ZERO
from beancount.ingest.cache import _FileMemo as FileMemo
from beancount.ingest.importer import ImporterProtocol

from beancount_extras_kris7t.importers import utils
from beancount_extras_kris7t.importers.utils import COMMENT_META, InvalidEntry, PAYEE_META, \
    Posting
from beancount_extras_kris7t.plugins.transfer_accounts import TRANSFER_ACCOUNT_META, \
    TRANSFER_DATE_META

OTP_BANK = 'OTP Bank'
BOOKING_DATE_META = 'booking-date'
ENTRY_TYPE_META = 'otpbank-entry-type'
OTPBANK_CSV_TAG = 'otpbank-csv'
PAYPASS_TAG = 'paypass'
SIMPLE_TAG = 'processor-simple'

# Number of ';'-separated columns in a statement row and the index of the
# booking date among them (see the unpacking in Row.__init__).
_COLUMN_COUNT = 15
_BOOKING_DATE_COLUMN = 4

# Leading "<yyyy.mm.dd> <card number>" prefix of a card transaction comment.
CARD_REGEX = re.compile(
    r'^(?P<date>\d{4}\.\d{2}\.\d{2})\s+(?P<card_number>\d+)')
# Suffixes that cause a card purchase to be tagged with PAYPASS_TAG.
PAYPASS_REGEX = re.compile(r'-ÉRINT(Ő|\?)|PPASS$', re.IGNORECASE)
# Suffix that causes a card purchase to be tagged with SIMPLE_TAG.
SIMPLE_REGEX = re.compile(r'-SIMPLE$', re.IGNORECASE)
# "<yyyy.mm.dd> (<count> DB SMS)" comment of an SMS notification fee.
SMS_REGEX = re.compile(
    r'^(?P<date>\d{4}\.\d{2}\.\d{2})\s+\((?P<count>\d+) DB SMS\)',
    re.IGNORECASE)


def _parse_date(date_str: str) -> dt.date:
    ''' Parses a `yyyymmdd` date as used in the date columns of the CSV. '''
    return utils.parse_date(date_str, '%Y%m%d')


def _parse_card_date(date_str: str) -> dt.date:
    ''' Parses a `yyyy.mm.dd` date as it appears inside comment fields. '''
    return utils.parse_date(date_str, '%Y.%m.%d')


def _parse_number(amount_str: str) -> Decimal:
    '''
    Parses a number after removing every `.` and turning `,` into `.`.

    This treats `.` as a grouping character and `,` as the decimal mark.
    '''
    cleaned_str = amount_str.replace('.', '').replace(',', '.')
    return utils.parse_number(cleaned_str)


def _validate_cd(amount: Amount, cd: str) -> None:
    '''
    Checks that the sign of `amount` agrees with the credit/debit flag `cd`.

    A `'T'` (debit) row must have a strictly negative amount and a `'J'`
    (credit) row a strictly positive one.

    Raises:
        InvalidEntry: if the sign does not match the flag or the flag is
            neither `'T'` nor `'J'`.
    '''
    if cd == 'T':
        if amount.number >= ZERO:
            raise InvalidEntry(f'Invalid debit amount: {amount}')
    elif cd == 'J':
        if amount.number <= ZERO:
            raise InvalidEntry(f'Invalid credit amount: {amount}')
    else:
        raise InvalidEntry(f'Invalid credit/debit type: {cd}')


class Conversion(NamedTuple):
    ''' Foreign currency details extracted from a row. '''
    # Amount of the transaction in the foreign commodity.
    foreign_amount: Amount
    # Price of one unit of the foreign commodity in the native currency.
    foreign_rate: Amount
    # Optional extra posting for the difference between the converted and
    # the booked amount.
    conversion_fee: Optional[Posting]


class Card(NamedTuple):
    ''' Card details extracted from a row. '''
    # Account attached to the counter-posting as TRANSFER_ACCOUNT_META.
    card_account: str
    # Date attached as TRANSFER_DATE_META when it differs from the value date.
    card_date: dt.date


class Row(utils.Row):
    '''
    A single row of the CSV statement together with the state accumulated
    by the extractors that have processed it.
    '''
    account_name: str
    cd: str
    native_amount: Amount
    booking_date: dt.date
    value_date: dt.date
    _raw_payee: Optional[str]
    _raw_comment: str
    _conversion: Optional[Conversion]
    _card: Optional[Card]

    def __init__(self, file_name: str, index: int, row: List[str], accounts: Dict[str, str]):
        '''
        Parses the raw CSV columns of a row.

        Args:
            file_name: name of the CSV file, passed on to `utils.Row`.
            index: index of the row within the file, passed on to `utils.Row`.
            row: the raw columns of the row.
            accounts: mapping from account numbers (as they appear in the
                first column) to beancount account names.

        Raises:
            InvalidEntry: if the row has an unexpected number of columns,
                refers to an unknown account number, or contains an
                unparsable or inconsistent value.
        '''
        # Report malformed (e.g. blank) rows as InvalidEntry so that
        # Importer.extract skips them instead of failing with a ValueError
        # from the tuple unpacking below.
        if len(row) != _COLUMN_COUNT:
            raise InvalidEntry(
                f'Expected {_COLUMN_COUNT} columns but found {len(row)}')
        account_number, cd, amount_str, currency, booking_date_str, \
            value_date_str, _, _, payee, comment1, comment2, comment3, entry_type, _, _ = row
        # The comment is split over three consecutive columns.
        comment = f'{comment1}{comment2}{comment3}'.strip()
        super().__init__(file_name, index, entry_type, payee, comment)
        if account_name := accounts.get(account_number, None):
            self.account_name = account_name
        else:
            raise InvalidEntry(f'Unknown account number {account_number}')
        self.cd = cd
        self.native_amount = Amount(_parse_number(amount_str), currency)
        _validate_cd(self.native_amount, self.cd)
        self.booking_date = _parse_date(booking_date_str)
        self.value_date = _parse_date(value_date_str)
        self.tags.add(OTPBANK_CSV_TAG)
        # Keep the unprocessed values for the metadata, because extractors
        # may overwrite `payee` and `comment`.
        self._raw_payee = payee
        self._raw_comment = comment
        self._conversion = None
        self._card = None

    @property
    def conversion(self) -> Optional[Conversion]:
        ''' The currency conversion extracted from this row, if any. '''
        return self._conversion

    @conversion.setter
    def conversion(self, conversion: Optional[Conversion]) -> None:
        # A conversion may only be set once per row.
        if self._conversion:
            raise InvalidEntry(
                f'Conversion {self._conversion} was already set for row ' +
                f' when trying to set it to {conversion}')
        self._conversion = conversion

    @property
    def card(self) -> Optional[Card]:
        ''' The card information extracted from this row, if any. '''
        return self._card

    @card.setter
    def card(self, card: Optional[Card]) -> None:
        # Card information may only be set once per row.
        if self._card:
            raise InvalidEntry(
                f'Card {self._card} was already set for row ' +
                f' when trying to set it to {card}')
        self._card = card

    @property
    def transacted_amount(self) -> Amount:
        ''' The foreign amount if a conversion was set, else the native amount. '''
        if self.conversion:
            return self.conversion.foreign_amount
        return self.native_amount

    @property
    def _extended_meta(self) -> data.Meta:
        '''
        A copy of the row metadata extended with the entry type, the booking
        date (only when it differs from the value date), and the raw payee
        and comment; empty values are omitted.
        '''
        meta = dict(self.meta)
        if self.entry_type:
            meta[ENTRY_TYPE_META] = self.entry_type
        if self.booking_date != self.value_date:
            meta[BOOKING_DATE_META] = self.booking_date
        if self._raw_payee:
            meta[PAYEE_META] = self._raw_payee
        if self._raw_comment:
            meta[COMMENT_META] = self._raw_comment
        return meta

    @property
    def _card_meta(self) -> Optional[data.Meta]:
        '''
        Posting metadata naming the card account (and the card date when it
        differs from the value date), or `None` if no card was set.
        '''
        if self._card:
            card_meta: data.Meta = {
                TRANSFER_ACCOUNT_META: self._card.card_account
            }
            if self._card.card_date != self.value_date:
                card_meta[TRANSFER_DATE_META] = self._card.card_date
            return card_meta
        else:
            return None

    def _to_transaction(self) -> data.Transaction:
        '''
        Builds the beancount transaction for this row.

        The first posting books the native amount on the bank account. Every
        posting assigned to the row becomes a counter-posting carrying the
        card metadata, and a non-zero conversion fee gets a posting of its own.

        Raises:
            InvalidEntry: if no postings were assigned to the row.
        '''
        if not self.postings:
            raise InvalidEntry('No postings were extracted from this entry')
        if self.payee == '':
            payee = None
        else:
            payee = self.payee
        if self.comment == '' or self.comment == self.payee:
            # No narration of its own: leave it empty if there is a payee,
            # otherwise fall back to the entry type.
            if payee:
                desc = ''
            else:
                desc = self.entry_type or ''
        else:
            desc = self.comment
        if self._conversion:
            foreign_rate = self._conversion.foreign_rate
            conversion_fee = self._conversion.conversion_fee
        else:
            foreign_rate = None
            conversion_fee = None
        meta = self._extended_meta
        card_meta = self._card_meta
        postings = [
            data.Posting(self.account_name, self.native_amount, None, None, None, None)
        ]
        if len(self.postings) == 1 and not foreign_rate:
            # A lone counter-posting without a price gets utils.MISSING_AMOUNT
            # instead of an explicit amount.
            account, _ = self.postings[0]
            postings.append(
                data.Posting(account, utils.MISSING_AMOUNT, None, None, None, card_meta))
        else:
            for account, amount in self.postings:
                postings.append(
                    data.Posting(account, -amount, None, foreign_rate, None, card_meta))
        if conversion_fee and conversion_fee.amount.number != ZERO:
            account, amount = conversion_fee
            postings.append(
                data.Posting(account, amount, None, None, None, None))
        return data.Transaction(
            meta, self.value_date, self.flag, payee, desc, self.tags, self.links, postings)


Extractor = Callable[[Row], None]


def extract_foreign_currencies(
        currencies: Iterable[str], conversion_fees_account: str) -> Extractor:
    '''
    Creates an extractor for comments ending in
    `<amount><currency> <rate>`.

    Args:
        currencies: the currency codes to recognize (matched ignoring case).
            They are inserted into a regular expression verbatim.
        conversion_fees_account: account of the posting that receives the
            difference between `rate * amount` and the booked native amount.

    Returns:
        An extractor that sets `row.conversion` and removes the matched
        suffix from `row.comment`; rows without such a suffix are untouched.
    '''
    currencies_joined = '|'.join(currencies)
    currency_regex = re.compile(
        r'(?P<amount>\d+(,\d+)?)(?P<currency>' + currencies_joined +
        r')\s+(?P<rate>\d+(,\d+)?)$',
        re.IGNORECASE)

    def do_extract(row: Row) -> None:
        if currency_match := currency_regex.search(row.comment):
            foreign_amount_d = _parse_number(currency_match.group('amount'))
            # The comment contains an unsigned amount; debits are negative.
            if row.cd == 'T':
                foreign_amount_d *= -1
            foreign_currency = currency_match.group('currency')
            foreign_amount = Amount(
                foreign_amount_d, foreign_currency.upper())
            foreign_rate_d = _parse_number(currency_match.group('rate'))
            foreign_rate = Amount(
                foreign_rate_d, row.native_amount.currency)
            converted_amount = am.mul(foreign_rate, foreign_amount_d)
            conversion_fee_amount = am.sub(
                converted_amount, row.native_amount)
            conversion_fee = Posting(
                conversion_fees_account, conversion_fee_amount)
            row.conversion = Conversion(
                foreign_amount, foreign_rate, conversion_fee)
            row.comment = row.comment[:currency_match.start()].strip()
    return do_extract


def extract_cards(card_accounts: Dict[str, str]) -> Extractor:
    '''
    Creates an extractor for card purchases and card related fees.

    Args:
        card_accounts: mapping from the card number found at the start of
            the comment to an account name.

    Returns:
        An extractor that only handles rows whose entry type is
        'vásárlás kártyával' or 'bankkártyával kapcs. díj' (ignoring case).
        It sets `row.card`, strips the card prefix from the comment, and
        strips a trailing processor marker while adding PAYPASS_TAG or
        SIMPLE_TAG. It raises InvalidEntry if the card prefix is missing
        or the card number is unknown.
    '''
    def do_extract(row: Row) -> None:
        if row.entry_type.lower() not in [
                'vásárlás kártyával',
                'bankkártyával kapcs. díj',
        ]:
            return
        if card_match := CARD_REGEX.search(row.comment):
            card_number = card_match.group('card_number')
            if card_number not in card_accounts:
                raise InvalidEntry(f'No account for card {card_number}')
            card_account = card_accounts[card_number]
            card_date = _parse_card_date(card_match.group('date'))
            row.card = Card(card_account, card_date)
            row.comment = row.comment[card_match.end():].strip()
        else:
            raise InvalidEntry(
                f'Cannot extract card information from: {row.comment}')
        if paypass_match := PAYPASS_REGEX.search(row.comment):
            row.comment = row.comment[:paypass_match.start()].strip()
            row.tags.add(PAYPASS_TAG)
        # This branch must test SIMPLE_REGEX: re-testing PAYPASS_REGEX here
        # could never match and SIMPLE_TAG would never be added.
        elif simple_match := SIMPLE_REGEX.search(row.comment):
            row.comment = row.comment[:simple_match.start()].strip()
            row.tags.add(SIMPLE_TAG)
    return do_extract


def extract_sms(sms: str, sms_liability: str, sms_expense: str) -> Extractor:
    '''
    Creates an extractor for SMS notification fees.

    Args:
        sms: commodity in which the number of messages is counted.
        sms_liability: account recorded as the card account of the row.
        sms_expense: account passed to `row.assign_to_account`.

    Returns:
        An extractor that only handles rows whose entry type is exactly
        'OTPdirekt ÜZENETDÍJ'. It records the fee as a conversion from the
        number of messages at a per-message rate, sets the payee and the
        comment, and raises InvalidEntry if the comment cannot be parsed.
    '''
    def do_extract(row: Row) -> None:
        if row.entry_type != 'OTPdirekt ÜZENETDÍJ':
            return
        if sms_match := SMS_REGEX.search(row.comment):
            card_account = sms_liability
            card_date = _parse_card_date(sms_match.group('date'))
            row.card = Card(card_account, card_date)
            sms_amount_d = _parse_number(sms_match.group('count'))
            foreign_amount = -Amount(sms_amount_d, sms)
            # Per-message price: the negated native amount over the count.
            foreign_rate = am.div(-row.native_amount, sms_amount_d)
            row.conversion = Conversion(
                foreign_amount, foreign_rate, None)
            row.comment = 'SMS díj'
            row.payee = OTP_BANK
            row.assign_to_account(sms_expense)
        else:
            raise InvalidEntry(
                f'Cannot parse SMS transaction: {row.comment}')
    return do_extract


class Importer(ImporterProtocol):
    '''
    Importer for OTP Bank CSV transaction history.
    '''

    _log: logging.Logger
    _accounts: Dict[str, str]
    _extractors: List[Extractor]

    def __init__(self,
                 accounts: Dict[str, str],
                 extractors: List[Extractor]):
        '''
        Args:
            accounts: mapping from bank account numbers to beancount account
                names; `-` characters in the numbers are ignored.
            extractors: extractors handed to `utils.run_row_extractors`
                for every row.
        '''
        self._log = logging.getLogger(type(self).__qualname__)
        self._accounts = {number.replace('-', ''): name for number, name in accounts.items()}
        self._extractors = extractors

    def identify(self, file: FileMemo) -> bool:
        '''
        Accepts `.csv` files whose first line has the expected number of
        columns and starts with a known account number.
        '''
        _, extension = path.splitext(file.name)
        if extension.lower() != '.csv':
            return False
        return self._find_account(file) is not None

    def _find_account(self, file: FileMemo) -> Optional[str]:
        '''
        Returns the account name for the quoted account number at the start
        of the first line, or `None` if the line does not look like a
        statement row of a known account.
        '''
        head = file.head().strip().split('\n')[0]
        if head.count(';') != _COLUMN_COUNT - 1:
            return None
        for account_number, account_name in self._accounts.items():
            if head.startswith(f'"{account_number}"'):
                return account_name
        return None

    def file_name(self, file: FileMemo) -> str:
        ''' Every statement is filed under the same fixed name. '''
        return 'otpbank.csv'

    def file_account(self, file: FileMemo) -> str:
        '''
        Returns the account of the statement.

        Raises:
            RuntimeError: if the file does not belong to a known account.
        '''
        account_name = self._find_account(file)
        if not account_name:
            raise RuntimeError(f'Invalid account number in {file.name}')
        return account_name

    def file_date(self, file: FileMemo) -> Optional[dt.date]:
        '''
        Files account statements according to the booking date of the last transaction.

        Returns `None` if the file has no rows or a booking date is invalid.
        '''
        date = None
        with open(file.name, 'r') as csv_file:
            for row in csv.reader(csv_file, delimiter=';'):
                # Blank or truncated rows have no booking date column;
                # ignore them instead of failing with an IndexError.
                if len(row) <= _BOOKING_DATE_COLUMN:
                    continue
                date_str = row[_BOOKING_DATE_COLUMN]
                try:
                    date = _parse_date(date_str)
                except InvalidEntry as exc:
                    self._log.error(
                        'Invalid entry in %s when looking for filing date',
                        file.name, exc_info=exc)
                    return None
        return date

    def extract(self, file: FileMemo) -> data.Entries:
        '''
        Converts every row of the file into a transaction.

        Each row is linked as `otpbank_<booking date>_<nnn>`, where `nnn`
        counts the consecutive rows sharing that booking date. Rows raising
        InvalidEntry are logged and skipped.
        '''
        file_name = file.name
        entries: data.Entries = []
        with open(file_name, 'r') as csv_file:
            last_date: Optional[dt.date] = None
            last_date_str = ""
            count_within_date = 1
            for index, row_str in enumerate(csv.reader(csv_file, delimiter=';')):
                try:
                    row = Row(file_name, index, row_str, self._accounts)
                    if last_date != row.booking_date:
                        last_date = row.booking_date
                        last_date_str = last_date.strftime('%Y-%m-%d')
                        count_within_date = 1
                    else:
                        count_within_date += 1
                    row.links.add(f'otpbank_{last_date_str}_{count_within_date:03}')
                    self._run_row_extractors(row)
                    entries.append(row._to_transaction())
                except InvalidEntry as exc:
                    self._log.warning(
                        'Skipping invalid entry %d of %s',
                        index, file_name, exc_info=exc)
        return entries

    def _run_row_extractors(self, row: Row) -> None:
        ''' Applies the configured extractors to `row`. '''
        utils.run_row_extractors(row, self._extractors)