'''
Importer for OTP Bank PDF account statements.
'''
__copyright__ = 'Copyright (c) 2020 Kristóf Marussy '
__license__ = 'GNU GPLv2'

from decimal import Decimal
import datetime as dt
import logging
import re
from typing import cast, Dict, Iterable, List, NamedTuple, Optional

from beancount.core import data
from beancount.core.amount import Amount
from beancount.core.number import D
from beancount.ingest.cache import _FileMemo as FileMemo
from beancount.ingest.importer import ImporterProtocol
from pdfminer.high_level import extract_pages
from pdfminer.layout import LTPage, LTTextContainer


# File name patterns. The named groups (`account`, `date`) are read by the
# importer methods below via `match.group(...)`.
# NOTE(review): the `[a ]` class and the second "Ertekpapirszamla" alternative
# contain unusual characters that look like mis-encoded accented letters; they
# are kept verbatim -- confirm against real statement file names.
STATEMENT_NAME_REGEX = re.compile(
    r'.*(Banksz[a ]mla|(Ertekpapirszamla|rt `kpap ­rsz mla)_)kivonat_(?P<account>\d[\d-]*\d)_' +
    r'(?P<date>\d{4}\.\d{2}\.\d{2})(_\d+)?\.pdf')
CHECKING_ACCOUNT_STATEMENT_NAME_REGEX = re.compile(
    r'.*Banksz[a ]mlakivonat_(?P<account>\d[\d-]*\d)_.+')
INVESTMENT_ACCOUNT_STATEMENT_NAME_REGEX = re.compile(
    r'.*(Ertekpapirszamla|rt `kpap ­rsz mla)_kivonat_(?P<account>\d[\d-]*\d)_.+')

# Patterns searched for in the text elements of the first statement page.
ACCOUNT_NUMBER_REGEX = re.compile(r'SZÁMLASZÁM: (?P<account>\d[\d-]*\d)')
CURRENCY_REGEX = re.compile(r'DEVIZANEM: (?P<currency>[A-Z]+)$')

# Layout coordinates used to pick the date and amount cells out of a row.
# Left edge (x0) of text elements treated as the date of a total row.
_DATE_LEFT_X = 34
# Right edge (x1) of text elements treated as the amount of a total row.
_UNITS_RIGHT_X = 572.68
# Maximum coordinate difference still considered "the same position".
_POSITION_TOLERANCE = 0.5


class Total(NamedTuple):
    '''
    A balance total read from a statement: the date it applies to and the amount.
    '''
    date: dt.date
    units: Decimal


def _append_total(entries: data.Entries,
                  meta: data.Meta,
                  account: str,
                  currency: str,
                  total: Optional[Total],
                  delta: dt.timedelta = dt.timedelta(days=0)) -> None:
    '''
    Append a balance directive for `total` to `entries`.

    Does nothing when `total` is `None`. The directive is dated
    `total.date + delta`.
    '''
    if total is None:
        return
    date, units = total
    amount = Amount(units, currency)
    balance = data.Balance(meta, date + delta, account, amount, None, None)
    entries.append(balance)


def _find_label_y(page: LTPage, label: str) -> Optional[float]:
    '''
    Return the lower y coordinate (`bbox[1]`) of the first text element on
    `page` whose stripped text equals `label`, or `None` if there is none.
    '''
    for element in page:
        if isinstance(element, LTTextContainer) and element.get_text().strip() == label:
            return element.bbox[1]
    return None


def _find_match(page: LTPage, pattern: re.Pattern) -> Optional[re.Match]:
    '''
    Return the first `pattern.search` match among the stripped texts of the
    text elements on `page`, or `None` if no element matches.
    '''
    for element in page:
        if isinstance(element, LTTextContainer):
            text = element.get_text().strip()
            if match := pattern.search(text):
                return match
    return None


class Importer(ImporterProtocol):
    '''
    Importer for OTP Bank PDF account statements.
    '''

    _log: logging.Logger
    # Maps account numbers (as they appear in file names and statements) to
    # beancount account names.
    _accounts: Dict[str, str]
    # Whether to also emit a balance for the opening total of the statement.
    _extract_opening: bool

    def __init__(self,
                 accounts: Dict[str, str],
                 extract_opening: bool = False):
        '''
        Create an importer for the account numbers listed in `accounts`.
        '''
        self._log = logging.getLogger(type(self).__qualname__)
        self._accounts = accounts
        self._extract_opening = extract_opening

    def identify(self, file: FileMemo) -> bool:
        '''
        Accept files named like a statement of one of the configured accounts.
        '''
        match = STATEMENT_NAME_REGEX.match(file.name)
        if match is None:
            return False
        return match.group('account') in self._accounts

    def file_name(self, file: FileMemo) -> str:
        '''
        Return the canonical file name for a checking or investment statement.

        Raises `RuntimeError` if the file name matches neither pattern.
        '''
        if match := CHECKING_ACCOUNT_STATEMENT_NAME_REGEX.match(file.name):
            account_number = match.group('account')
            return f'Bankszámlakivonat_{account_number}.pdf'
        if match := INVESTMENT_ACCOUNT_STATEMENT_NAME_REGEX.match(file.name):
            account_number = match.group('account')
            return f'Értékpapírszámla_kivonat_{account_number}.pdf'
        raise RuntimeError(f'Not an account statement: {file.name}')

    def file_account(self, file: FileMemo) -> str:
        '''
        Return the beancount account configured for the account number in the
        file name.

        Raises `RuntimeError` if the file name is not a statement name and
        `KeyError` if the account number is not configured.
        '''
        match = STATEMENT_NAME_REGEX.match(file.name)
        if match is None:
            raise RuntimeError(f'Not an account statement: {file.name}')
        return self._accounts[match.group('account')]

    def file_date(self, file: FileMemo) -> dt.date:
        '''
        Return the date embedded in the file name (`YYYY.MM.DD`).

        Raises `RuntimeError` if the file name is not a statement name.
        '''
        match = STATEMENT_NAME_REGEX.match(file.name)
        if match is None:
            raise RuntimeError(f'Not an account statement: {file.name}')
        return dt.datetime.strptime(match.group('date'), '%Y.%m.%d').date()

    def extract(self, file: FileMemo) -> data.Entries:
        '''
        Extract balance directives from a checking account statement.

        Returns an empty list for files that are not checking account
        statements or contain no pages. The closing balance is always looked
        up; the opening balance only if `extract_opening` was set.
        '''
        if not CHECKING_ACCOUNT_STATEMENT_NAME_REGEX.match(file.name):
            return []
        pages = list(cast(Iterable[LTPage], extract_pages(file.name)))
        if not pages:
            return []
        entries: data.Entries = []
        meta = data.new_metadata(file.name, 1)
        first_page = pages[0]
        # Prefer the account number printed in the statement; fall back to the
        # one in the file name.
        if account_match := _find_match(first_page, ACCOUNT_NUMBER_REGEX):
            account_name = self._accounts[account_match.group('account')]
        else:
            self._log.warning('No account number in %s', file.name)
            account_name = self.file_account(file)
        if currency_match := _find_match(first_page, CURRENCY_REGEX):
            currency = currency_match.group('currency')
        else:
            self._log.warning('No currency number in %s', file.name)
            currency = 'HUF'
        if self._extract_opening:
            # The opening balance is only looked for on the first page.
            opening_balance = self._extract_total_from_page(first_page, 'NYITÓ EGYENLEG')
            _append_total(entries, meta, account_name, currency, opening_balance)
        closing_balance = self._extract_total(pages, 'ZÁRÓ EGYENLEG')
        # The closing balance is asserted on the day after the closing date.
        # NOTE(review): presumably because beancount checks balances at the
        # beginning of the day -- confirm.
        _append_total(entries, meta, account_name, currency, closing_balance,
                      dt.timedelta(days=1))
        return entries

    def _extract_total(self, pages: List[LTPage], label: str) -> Optional[Total]:
        '''
        Return the total next to `label` from the first page that has one.

        Logs an error and returns `None` if no page yields a total.
        '''
        for page in pages:
            total = self._extract_total_from_page(page, label)
            if total is not None:
                return total
        self._log.error('%s was not found in the pdf file', label)
        return None

    def _extract_total_from_page(self, page: LTPage, label: str) -> Optional[Total]:
        '''
        Return the total on the same row as `label` on `page`, if any.
        '''
        total_y = _find_label_y(page, label)
        # Compare against None explicitly: a coordinate of 0 is a valid position.
        if total_y is None:
            return None
        return self._extract_total_by_y(page, total_y)

    def _extract_total_by_y(self, page: LTPage, total_y: float) -> Optional[Total]:
        '''
        Read the date and the amount from the text elements whose lower y
        coordinate is within tolerance of `total_y`.

        The date is taken from the element starting at the date column and the
        amount from the element ending at the amount column. Returns `None`
        (after logging an error) if either of them is missing.
        '''
        date: Optional[dt.date] = None
        units: Optional[Decimal] = None
        for element in page:
            if not isinstance(element, LTTextContainer):
                continue
            x, y, x2, _ = element.bbox
            if abs(y - total_y) > _POSITION_TOLERANCE:
                continue
            if abs(x - _DATE_LEFT_X) <= _POSITION_TOLERANCE:
                date_str = element.get_text().strip()
                if date is not None:
                    # Keep the first date found on the row.
                    self._log.warning(
                        'Found date %s, but date was already set to %s',
                        date_str,
                        date)
                    continue
                try:
                    date = dt.datetime.strptime(date_str, '%y.%m.%d').date()
                except ValueError as exc:
                    self._log.warning('Invalid date %s', date_str, exc_info=exc)
            elif abs(x2 - _UNITS_RIGHT_X) <= _POSITION_TOLERANCE:
                # Amounts use '.' as thousands separator and ',' as decimal mark.
                units_str = element.get_text().strip().replace('.', '').replace(',', '.')
                if units is not None:
                    # NOTE(review): unlike dates, a later amount on the same
                    # row replaces the earlier one -- confirm this is intended.
                    self._log.warning(
                        'Found units %s, but units were already set to %s',
                        units_str,
                        units)
                try:
                    units = D(units_str)
                except ValueError as exc:
                    self._log.error('Invalid units %s', units_str, exc_info=exc)
        if date is None:
            self._log.error('Date was not found at y=%d', total_y)
            return None
        # Compare against None explicitly: a zero balance is a valid total.
        if units is None:
            self._log.error('Units were not found at y=%d', total_y)
            return None
        return Total(date, units)