From a1c2a999e449054d6641bbb633954e45fcd63f90 Mon Sep 17 00:00:00 2001 From: Kristóf Marussy Date: Mon, 25 Jan 2021 01:14:28 +0100 Subject: Add plugins and importers from private config The importers are missing tests, because not having any specifications for the import formats means we must use real, private data as test inputs --- .../importers/otpbank/otpbank_csv.py | 370 +++++++++++++++++++++ 1 file changed, 370 insertions(+) create mode 100644 beancount_extras_kris7t/importers/otpbank/otpbank_csv.py (limited to 'beancount_extras_kris7t/importers/otpbank/otpbank_csv.py') diff --git a/beancount_extras_kris7t/importers/otpbank/otpbank_csv.py b/beancount_extras_kris7t/importers/otpbank/otpbank_csv.py new file mode 100644 index 0000000..9e12ec3 --- /dev/null +++ b/beancount_extras_kris7t/importers/otpbank/otpbank_csv.py @@ -0,0 +1,370 @@ +''' +Importer for OTP Bank CSV transaction history. +''' +__copyright__ = 'Copyright (c) 2020 Kristóf Marussy ' +__license__ = 'GNU GPLv2' + +import csv +import datetime as dt +from decimal import Decimal +import logging +from typing import Callable, Dict, Iterable, List, NamedTuple, Optional +import re +from os import path + +import beancount.core.amount as am +from beancount.core.amount import Amount +from beancount.core import data +from beancount.core.number import ZERO +from beancount.ingest.cache import _FileMemo as FileMemo +from beancount.ingest.importer import ImporterProtocol + +from beancount_extras_kris7t.importers import utils +from beancount_extras_kris7t.importers.utils import COMMENT_META, InvalidEntry, PAYEE_META, \ + Posting +from beancount_extras_kris7t.plugins.transfer_accounts import TRANSFER_ACCOUNT_META, \ + TRANSFER_DATE_META + +OTP_BANK = 'OTP Bank' +BOOKING_DATE_META = 'booking-date' +ENTRY_TYPE_META = 'otpbank-entry-type' +OTPBANK_CSV_TAG = 'otpbank-csv' +PAYPASS_TAG = 'paypass' +SIMPLE_TAG = 'processor-simple' +CARD_REGEX = re.compile( + r'^(?P\d{4}\.\d{2}\.\d{2})\s+(?P\d+)') +PAYPASS_REGEX = re.compile(r'-ÉRINT(Ő|\?)|PPASS$', re.IGNORECASE) +SIMPLE_REGEX = re.compile(r'-SIMPLE$', re.IGNORECASE) +SMS_REGEX = re.compile( + r'^(?P\d{4}\.\d{2}\.\d{2})\s+\((?P\d+) DB SMS\)', + re.IGNORECASE) + + +def _parse_date(date_str: str) -> dt.date: + return utils.parse_date(date_str, '%Y%m%d') + + +def _parse_card_date(date_str: str) -> dt.date: + return utils.parse_date(date_str, '%Y.%m.%d') + + +def _parse_number(amount_str: str) -> Decimal: + cleaned_str = amount_str.replace('.', '').replace(',', '.') + return utils.parse_number(cleaned_str) + + +def _validate_cd(amount: Amount, cd: str) -> None: + if cd == 'T': + if amount.number >= ZERO: + raise InvalidEntry(f'Invalid debit amount: {amount}') + elif cd == 'J': + if amount.number <= ZERO: + raise InvalidEntry(f'Invalid credit amount: {amount}') + else: + raise InvalidEntry(f'Invalid credit/debit type: {cd}') + + +class Conversion(NamedTuple): + foreign_amount: Amount + foreign_rate: Amount + conversion_fee: Optional[Posting] + + +class Card(NamedTuple): + card_account: str + card_date: dt.date + + +class Row(utils.Row): + account_name: str + cd: str + native_amount: Amount + booking_date: dt.date + value_date: dt.date + _raw_payee: Optional[str] + _raw_comment: str + _conversion: Optional[Conversion] + _card: Optional[Card] + + def __init__(self, file_name: str, index: int, row: List[str], accounts: Dict[str, str]): + account_number, cd, amount_str, currency, booking_date_str, \ + value_date_str, _, _, payee, comment1, comment2, comment3, entry_type, _, _ = row + comment = f'{comment1}{comment2}{comment3}'.strip() + super().__init__(file_name, index, entry_type, payee, comment) + if account_name := accounts.get(account_number, None): + self.account_name = account_name + else: + raise InvalidEntry(f'Unknown account number {account_number}') + self.cd = cd + self.native_amount = Amount(_parse_number(amount_str), currency) + _validate_cd(self.native_amount, self.cd) + self.booking_date = _parse_date(booking_date_str) + self.value_date = _parse_date(value_date_str) + self.tags.add(OTPBANK_CSV_TAG) + self._raw_payee = payee + self._raw_comment = comment + self._conversion = None + self._card = None + + @property + def conversion(self) -> Optional[Conversion]: + return self._conversion + + @conversion.setter + def conversion(self, conversion: Optional[Conversion]) -> None: + if self._conversion: + raise InvalidEntry( + f'Conversion {self._conversion} was already set for row ' + + f' when trying to set it to {conversion}') + self._conversion = conversion + + @property + def card(self) -> Optional[Card]: + return self._card + + @card.setter + def card(self, card: Optional[Card]) -> None: + if self._card: + raise InvalidEntry( + f'Card {self._card} was already set for row ' + + f' when trying to set it to {card}') + self._card = card + + @property + def transacted_amount(self) -> Amount: + if self.conversion: + return self.conversion.foreign_amount + return self.native_amount + + @property + def _extended_meta(self) -> data.Meta: + meta = dict(self.meta) + if self.entry_type: + meta[ENTRY_TYPE_META] = self.entry_type + if self.booking_date != self.value_date: + meta[BOOKING_DATE_META] = self.booking_date + if self._raw_payee: + meta[PAYEE_META] = self._raw_payee + if self._raw_comment: + meta[COMMENT_META] = self._raw_comment + return meta + + @property + def _card_meta(self) -> Optional[data.Meta]: + if self._card: + card_meta: data.Meta = { + TRANSFER_ACCOUNT_META: self._card.card_account + } + if self._card.card_date != self.value_date: + card_meta[TRANSFER_DATE_META] = self._card.card_date + return card_meta + else: + return None + + def _to_transaction(self) -> data.Transaction: + if not self.postings: + raise InvalidEntry('No postings were extracted from this entry') + if self.payee == '': + payee = None + else: + payee = self.payee + if self.comment == '' or self.comment == self.payee: + if payee: + desc = '' + else: + desc = self.entry_type or '' + else: + desc = self.comment + if self._conversion: + foreign_rate = self._conversion.foreign_rate + conversion_fee = self._conversion.conversion_fee + else: + foreign_rate = None + conversion_fee = None + meta = self._extended_meta + card_meta = self._card_meta + postings = [ + data.Posting(self.account_name, self.native_amount, None, None, None, None) + ] + if len(self.postings) == 1 and not foreign_rate: + account, amount = self.postings[0] + postings.append( + data.Posting(account, utils.MISSING_AMOUNT, None, None, None, card_meta)) + else: + for account, amount in self.postings: + postings.append( + data.Posting(account, -amount, None, foreign_rate, None, card_meta)) + if conversion_fee and conversion_fee.amount.number != ZERO: + account, amount = conversion_fee + postings.append( + data.Posting(account, amount, None, None, None, None)) + return data.Transaction( + meta, self.value_date, self.flag, payee, desc, self.tags, self.links, postings) + + +Extractor = Callable[[Row], None] + + +def extract_foreign_currencies( + currencies: Iterable[str], conversion_fees_account: str) -> Extractor: + currencies_joined = '|'.join(currencies) + currency_regex = re.compile( + r'(?P\d+(,\d+)?)(?P' + currencies_joined + + r')\s+(?P\d+(,\d+)?)$', re.IGNORECASE) + + def do_extract(row: Row) -> None: + if currency_match := currency_regex.search(row.comment): + foreign_amount_d = _parse_number(currency_match.group('amount')) + if row.cd == 'T': + foreign_amount_d *= -1 + foreign_currency = currency_match.group('currency') + foreign_amount = Amount( + foreign_amount_d, foreign_currency.upper()) + foreign_rate_d = _parse_number(currency_match.group('rate')) + foreign_rate = Amount( + foreign_rate_d, row.native_amount.currency) + converted_amount = am.mul(foreign_rate, foreign_amount_d) + conversion_fee_amount = am.sub( + converted_amount, row.native_amount) + conversion_fee = Posting( + conversion_fees_account, conversion_fee_amount) + row.conversion = Conversion( + foreign_amount, foreign_rate, conversion_fee) + row.comment = row.comment[:currency_match.start()].strip() + return do_extract + + +def extract_cards(card_accounts: Dict[str, str]) -> Extractor: + def do_extract(row: Row) -> None: + if row.entry_type.lower() not in [ + 'vásárlás kártyával', + 'bankkártyával kapcs. díj', + ]: + return + if card_match := CARD_REGEX.search(row.comment): + card_number = card_match.group('card_number') + if card_number not in card_accounts: + raise InvalidEntry(f'No account for card {card_number}') + card_account = card_accounts[card_number] + card_date = _parse_card_date(card_match.group('date')) + row.card = Card(card_account, card_date) + row.comment = row.comment[card_match.end():].strip() + else: + raise InvalidEntry( + f'Cannot extract card information from: {row.comment}') + if paypass_match := PAYPASS_REGEX.search(row.comment): + row.comment = row.comment[:paypass_match.start()].strip() + row.tags.add(PAYPASS_TAG) + elif simple_match := PAYPASS_REGEX.search(row.comment): + row.comment = row.comment[:simple_match.start()].strip() + row.tags.add(SIMPLE_TAG) + return do_extract + + +def extract_sms(sms: str, sms_liability: str, sms_expense: str) -> Extractor: + def do_extract(row: Row) -> None: + if row.entry_type != 'OTPdirekt ÜZENETDÍJ': + return + if sms_match := SMS_REGEX.search(row.comment): + card_account = sms_liability + card_date = _parse_card_date(sms_match.group('date')) + row.card = Card(card_account, card_date) + sms_amount_d = _parse_number(sms_match.group('count')) + foreign_amount = -Amount(sms_amount_d, sms) + foreign_rate = am.div(-row.native_amount, sms_amount_d) + row.conversion = Conversion( + foreign_amount, foreign_rate, None) + row.comment = 'SMS díj' + row.payee = OTP_BANK + row.assign_to_account(sms_expense) + else: + raise InvalidEntry( + f'Cannot parse SMS transaction: {row.comment}') + return do_extract + + +class Importer(ImporterProtocol): + ''' + Importer for OTP Bank CSV transaction history. + ''' + + _log: logging.Logger + _accounts: Dict[str, str] + _extracts: List[Extractor] + + def __init__(self, + accounts: Dict[str, str], + extractors: List[Extractor]): + self._log = logging.getLogger(type(self).__qualname__) + self._accounts = {number.replace('-', ''): name for number, name in accounts.items()} + self._extractors = extractors + + def identify(self, file: FileMemo) -> bool: + _, extension = path.splitext(file.name) + if extension.lower() != '.csv': + return False + return self._find_account(file) is not None + + def _find_account(self, file: FileMemo) -> Optional[str]: + head = file.head().strip().split('\n')[0] + if head.count(';') != 14: + return None + for account_number, account_name in self._accounts.items(): + if head.startswith(f'"{account_number}"'): + return account_name + return None + + def file_name(self, file: FileMemo) -> str: + return 'otpbank.csv' + + def file_account(self, file: FileMemo) -> str: + account_name = self._find_account(file) + if not account_name: + raise RuntimeError(f'Invalid account number in {file.name}') + return account_name + + def file_date(self, file: FileMemo) -> Optional[dt.date]: + ''' + Files account statements according to the booking date of the last + transaction. + ''' + date = None + with open(file.name, 'r') as csv_file: + for row in csv.reader(csv_file, delimiter=';'): + date_str = row[4] + try: + date = _parse_date(date_str) + except InvalidEntry as exc: + self._log.error( + 'Invalid entry in %s when looking for filing date', + file.name, exc_info=exc) + return None + return date + + def extract(self, file: FileMemo) -> data.Entries: + file_name = file.name + entries: data.Entries = [] + with open(file_name, 'r') as csv_file: + last_date: Optional[dt.date] = None + last_date_str = "" + count_within_date = 1 + for index, row_str in enumerate(csv.reader(csv_file, delimiter=';')): + try: + row = Row(file_name, index, row_str, self._accounts) + if last_date != row.booking_date: + last_date = row.booking_date + last_date_str = last_date.strftime('%Y-%m-%d') + count_within_date = 1 + else: + count_within_date += 1 + row.links.add(f'otpbank_{last_date_str}_{count_within_date:03}') + self._run_row_extractors(row) + entries.append(row._to_transaction()) + except InvalidEntry as exc: + self._log.warning( + 'Skipping invalid entry %d of %s', + index, file_name, exc_info=exc) + return entries + + def _run_row_extractors(self, row: Row): + utils.run_row_extractors(row, self._extractors) -- cgit v1.2.3-54-g00ecf