'''
Importer for OTP Bank PDF account statements.
'''
__copyright__ = 'Copyright (c) 2020 Kristóf Marussy <kristof@marussy.com>'
__license__ = 'GNU GPLv2'
from decimal import Decimal
import datetime as dt
import logging
import re
from typing import cast, Dict, Iterable, List, NamedTuple, Optional
from beancount.core import data
from beancount.core.amount import Amount
from beancount.core.number import D
from beancount.ingest.cache import _FileMemo as FileMemo
from beancount.ingest.importer import ImporterProtocol
from pdfminer.high_level import extract_pages
from pdfminer.layout import LTPage, LTTextContainer
# Any supported statement file name (checking or investment account); captures
# the account number and the statement date embedded in the name.
STATEMENT_NAME_REGEX = re.compile(
    r'.*(Banksz[a ]mla|(Ertekpapirszamla|rt `kpap rsz mla)_)kivonat_(?P<account>\d[\d-]*\d)_'
    r'(?P<date>\d{4}\.\d{2}\.\d{2})(_\d+)?\.pdf'
)

# File name of a checking account statement; captures the account number only.
CHECKING_ACCOUNT_STATEMENT_NAME_REGEX = re.compile(
    r'.*Banksz[a ]mlakivonat_(?P<account>\d[\d-]*\d)_.+'
)

# File name of an investment account statement; captures the account number only.
INVESTMENT_ACCOUNT_STATEMENT_NAME_REGEX = re.compile(
    r'.*(Ertekpapirszamla|rt `kpap rsz mla)_kivonat_(?P<account>\d[\d-]*\d)_.+'
)

# Account number line as searched for in the text of a statement page.
ACCOUNT_NUMBER_REGEX = re.compile(r'SZÁMLASZÁM: (?P<account>\d[\d-]*\d)')

# Currency line as searched for in the text of a statement page; the currency
# code must be the last thing in the searched text.
CURRENCY_REGEX = re.compile(r'DEVIZANEM: (?P<currency>[A-Z]+)$')
class Total(NamedTuple):
    '''
    A total read from a statement: the date it applies to and its number.
    '''

    # Date parsed from the same row as the total.
    date: dt.date
    # Numeric value of the total; the currency is not stored here.
    units: Decimal
def _append_total(entries: data.Entries,
                  meta: data.Meta,
                  account: str,
                  currency: str,
                  total: Optional[Total],
                  delta: dt.timedelta = dt.timedelta(days=0)) -> None:
    '''
    Append a balance assertion for `total` to `entries`.

    Nothing is appended when `total` is falsy. Otherwise the assertion is
    dated `delta` after the date of the total and carries the units of the
    total in `currency`.
    '''
    if total:
        when, number = total
        entries.append(
            data.Balance(meta, when + delta, account, Amount(number, currency), None, None))
def _find_label_y(page: LTPage, label: str) -> Optional[int]:
    '''
    Return the second bounding box coordinate of the first text container on
    `page` whose stripped text equals `label`, or `None` if there is none.
    '''
    return next(
        (item.bbox[1]
         for item in page
         if isinstance(item, LTTextContainer) and item.get_text().strip() == label),
        None)
def _find_match(page: LTPage, pattern: re.Pattern) -> Optional[re.Match]:
    '''
    Search the stripped text of each text container on `page` for `pattern`
    and return the first match, or `None` if no container matches.
    '''
    for item in page:
        if not isinstance(item, LTTextContainer):
            continue
        found = pattern.search(item.get_text().strip())
        if found:
            return found
    return None
class Importer(ImporterProtocol):
    '''
    Importer for OTP Bank PDF account statements.

    Statements are recognised by their file name. Balance assertions are
    only extracted from checking account statements; investment account
    statements are identified and filed but yield no entries.
    '''

    # Layout constants used to pick the date and amount out of a total row.
    # NOTE(review): presumably these are PDF user-space coordinates of the
    # date column's left edge and the amount column's right edge in the
    # statement layout -- confirm against a sample statement.
    _DATE_X = 34
    _UNITS_RIGHT_X = 572.68
    _TOLERANCE = 0.5

    _log: logging.Logger
    # Maps account numbers (as found in file names and statement text) to
    # Beancount account names.
    _accounts: Dict[str, str]
    # Whether to also emit a balance assertion for the opening balance.
    _extract_opening: bool

    def __init__(self, accounts: Dict[str, str], extract_opening: bool = False):
        '''
        Create an importer.

        `accounts` maps account numbers to Beancount account names.
        `extract_opening` enables extraction of the opening balance in
        addition to the closing balance.
        '''
        self._log = logging.getLogger(type(self).__qualname__)
        self._accounts = accounts
        self._extract_opening = extract_opening

    def identify(self, file: FileMemo) -> bool:
        '''
        Return whether `file` is named like a statement of a known account.
        '''
        match = STATEMENT_NAME_REGEX.match(file.name)
        if match is None:
            return False
        return match.group('account') in self._accounts

    def file_name(self, file: FileMemo) -> str:
        '''
        Return the normalised name under which the statement should be filed.

        Raises `RuntimeError` if the file is not named like a statement.
        '''
        if match := CHECKING_ACCOUNT_STATEMENT_NAME_REGEX.match(file.name):
            account_number = match.group('account')
            return f'Bankszámlakivonat_{account_number}.pdf'
        if match := INVESTMENT_ACCOUNT_STATEMENT_NAME_REGEX.match(file.name):
            account_number = match.group('account')
            return f'Értékpapírszámla_kivonat_{account_number}.pdf'
        raise RuntimeError(f'Not an account statement: {file.name}')

    def file_account(self, file: FileMemo) -> str:
        '''
        Return the Beancount account for the account number in the file name.

        Raises `RuntimeError` if the file is not named like a statement and
        `KeyError` if the account number is not configured.
        '''
        match = STATEMENT_NAME_REGEX.match(file.name)
        if match is None:
            raise RuntimeError(f'Not an account statement: {file.name}')
        return self._accounts[match.group('account')]

    def file_date(self, file: FileMemo) -> dt.date:
        '''
        Return the statement date encoded in the file name.

        Raises `RuntimeError` if the file is not named like a statement.
        '''
        match = STATEMENT_NAME_REGEX.match(file.name)
        if match is None:
            raise RuntimeError(f'Not an account statement: {file.name}')
        return dt.datetime.strptime(match.group('date'), '%Y.%m.%d').date()

    def extract(self, file: FileMemo) -> data.Entries:
        '''
        Extract balance assertions from a checking account statement.

        Returns an empty list for files that are not checking account
        statements or that contain no pages. The closing balance assertion
        is dated one day after the date printed next to it; the opening
        balance assertion (if enabled) uses the printed date unchanged.
        '''
        if not CHECKING_ACCOUNT_STATEMENT_NAME_REGEX.match(file.name):
            return []
        pages = list(cast(Iterable[LTPage], extract_pages(file.name)))
        if not pages:
            return []
        entries: data.Entries = []
        meta = data.new_metadata(file.name, 1)
        # Account number and currency are looked up on the first page only.
        if account_match := _find_match(pages[0], ACCOUNT_NUMBER_REGEX):
            account_name = self._accounts[account_match.group('account')]
        else:
            self._log.warning('No account number in %s', file.name)
            account_name = self.file_account(file)
        if currency_match := _find_match(pages[0], CURRENCY_REGEX):
            currency = currency_match.group('currency')
        else:
            self._log.warning('No currency number in %s', file.name)
            currency = 'HUF'
        if self._extract_opening:
            opening_balance = self._extract_total_from_page(pages[0], 'NYITÓ EGYENLEG')
            _append_total(entries, meta, account_name, currency, opening_balance)
        closing_balance = self._extract_total(pages, 'ZÁRÓ EGYENLEG')
        _append_total(entries, meta, account_name, currency, closing_balance, dt.timedelta(days=1))
        return entries

    def _extract_total(self, pages: List[LTPage], label: str) -> Optional[Total]:
        '''
        Return the total labelled `label` from the first page that has one.

        Logs an error and returns `None` if no page yields a total.
        '''
        for page in pages:
            if total := self._extract_total_from_page(page, label):
                return total
        self._log.error('%s was not found in the pdf file', label)
        return None

    def _extract_total_from_page(self, page: LTPage, label: str) -> Optional[Total]:
        '''
        Return the total on the row of `label` on `page`, or `None` if the
        label is absent or the row could not be parsed.
        '''
        total_y = _find_label_y(page, label)
        # Compare against None explicitly: a coordinate of 0 is a valid
        # position and must not be mistaken for "label not found".
        if total_y is None:
            return None
        return self._extract_total_by_y(page, total_y)

    def _extract_total_by_y(self, page: LTPage, total_y: int) -> Optional[Total]:
        '''
        Parse the date and amount from the text containers on `page` whose
        bottom coordinate is within tolerance of `total_y`.

        The date is taken from the container whose left edge is at the date
        column and the amount from the container whose right edge is at the
        amount column. If several candidates are found, the first one wins
        and a warning is logged. Returns `None` (after logging an error) if
        either part is missing.
        '''
        date: Optional[dt.date] = None
        units: Optional[Decimal] = None
        for element in page:
            if not isinstance(element, LTTextContainer):
                continue
            x, y, x2, _ = element.bbox
            if abs(y - total_y) > self._TOLERANCE:
                continue
            if abs(x - self._DATE_X) <= self._TOLERANCE:
                date_str = element.get_text().strip()
                if date is not None:
                    self._log.warning(
                        'Found date %s, but date was already set to %s',
                        date_str,
                        date)
                    continue
                try:
                    date = dt.datetime.strptime(date_str, '%y.%m.%d').date()
                except ValueError as exc:
                    self._log.warning('Invalid date %s', date_str, exc_info=exc)
            elif abs(x2 - self._UNITS_RIGHT_X) <= self._TOLERANCE:
                # Statement amounts use '.' as thousands separator and ','
                # as decimal separator; convert to the form D() expects.
                units_str = element.get_text().strip().replace('.', '').replace(',', '.')
                if not units_str:
                    # A blank text box carries no amount; skipping it avoids
                    # having it parsed as a zero balance.
                    continue
                if units is not None:
                    self._log.warning(
                        'Found units %s, but units were already set to %s',
                        units_str,
                        units)
                    # Keep the first value, consistently with dates.
                    continue
                try:
                    units = D(units_str)
                except ValueError as exc:
                    self._log.error('Invalid units %s', units_str, exc_info=exc)
        if date is None:
            self._log.error('Date was not found at y=%d', total_y)
            return None
        # Explicit None check: a zero balance is a legitimate total and must
        # not be reported as missing.
        if units is None:
            self._log.error('Units were not found at y=%d', total_y)
            return None
        return Total(date, units)