aboutsummaryrefslogtreecommitdiffstats
path: root/beancount_extras_kris7t/importers/otpbank/otpbank_pdf.py
blob: c2de5592693e422c1435acb52b73413c653ed65c (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
'''
Importer for OTP Bank PDF account statements.
'''
__copyright__ = 'Copyright (c) 2020  Kristóf Marussy <kristof@marussy.com>'
__license__ = 'GNU GPLv2'

from decimal import Decimal
import datetime as dt
import logging
import re
from typing import cast, Dict, Iterable, List, NamedTuple, Optional

from beancount.core import data
from beancount.core.amount import Amount
from beancount.core.number import D
from beancount.ingest.cache import _FileMemo as FileMemo
from beancount.ingest.importer import ImporterProtocol

from pdfminer.high_level import extract_pages
from pdfminer.layout import LTPage, LTTextContainer


# Matches the file name of any supported statement (checking or investment
# account) and captures the account number and the statement date
# (YYYY.MM.DD). An optional numeric suffix before the extension is tolerated.
# NOTE(review): the second investment-account alternative looks like a garbled
# rendering of the accented name; presumably it matches file names whose
# accents were mangled on download -- confirm before touching it.
STATEMENT_NAME_REGEX = re.compile(
    r'.*(Banksz[a ]mla|(Ertekpapirszamla|rt `kpap ­rsz mla)_)kivonat_(?P<account>\d[\d-]*\d)_' +
    r'(?P<date>\d{4}\.\d{2}\.\d{2})(_\d+)?\.pdf')
# Matches checking account statement file names only; captures the account number.
CHECKING_ACCOUNT_STATEMENT_NAME_REGEX = re.compile(
    r'.*Banksz[a ]mlakivonat_(?P<account>\d[\d-]*\d)_.+')
# Matches investment account statement file names only; captures the account number.
INVESTMENT_ACCOUNT_STATEMENT_NAME_REGEX = re.compile(
    r'.*(Ertekpapirszamla|rt `kpap ­rsz mla)_kivonat_(?P<account>\d[\d-]*\d)_.+')
# Finds the account number in the text of a statement page.
ACCOUNT_NUMBER_REGEX = re.compile(r'SZÁMLASZÁM: (?P<account>\d[\d-]*\d)')
# Finds the currency code in the text of a statement page; the code must end
# the (stripped) text element.
CURRENCY_REGEX = re.compile(r'DEVIZANEM: (?P<currency>[A-Z]+)$')


class Total(NamedTuple):
    '''
    A balance total read from a statement: the date it applies to and its value.
    '''
    # Date printed next to the total in the statement.
    date: dt.date
    # Value of the total, without its currency.
    units: Decimal


def _append_total(entries: data.Entries,
                  meta: data.Meta,
                  account: str,
                  currency: str,
                  total: Optional[Total],
                  delta: dt.timedelta = dt.timedelta(days=0)) -> None:
    '''
    Append a balance directive for ``total`` to ``entries``.

    Nothing is appended when ``total`` is missing.

    :param entries: List of entries that is extended in place.
    :param meta: Metadata attached to the new balance directive.
    :param account: Account name the balance directive refers to.
    :param currency: Currency of the balance amount.
    :param total: Date and value of the balance, or ``None``.
    :param delta: Offset added to the date of ``total`` to obtain the date of
        the balance directive.
    '''
    if total:
        total_date, total_units = total
        balance_amount = Amount(total_units, currency)
        entries.append(
            data.Balance(meta, total_date + delta, account, balance_amount, None, None))


def _find_label_y(page: LTPage, label: str) -> Optional[int]:
    '''
    Return the second bounding box coordinate of the first text element on
    ``page`` whose stripped text equals ``label``, or ``None`` if there is none.
    '''
    label_positions = (
        element.bbox[1]
        for element in page
        if isinstance(element, LTTextContainer) and element.get_text().strip() == label
    )
    return next(label_positions, None)


def _find_match(page: LTPage, pattern: re.Pattern) -> Optional[re.Match]:
    '''
    Search the stripped text of each text element on ``page`` for ``pattern``.

    :return: The match in the first element where the pattern is found, or
        ``None`` if no text element contains it.
    '''
    for element in page:
        if not isinstance(element, LTTextContainer):
            continue
        found = pattern.search(element.get_text().strip())
        if found:
            return found
    return None


class Importer(ImporterProtocol):
    '''
    Importer for OTP Bank PDF account statements.

    Statements are recognized by their file names. Balance directives are only
    extracted from checking account statements; other recognized statements
    are filed but yield no entries.
    '''

    # Layout of a total row, in the page coordinates reported by pdfminer:
    # the date text starts at this left edge ...
    _DATE_LEFT_X = 34
    # ... and the amount text ends at this right edge.
    _UNITS_RIGHT_X = 572.68
    # Maximum deviation allowed when comparing element coordinates.
    _POSITION_TOLERANCE = 0.5

    _log: logging.Logger
    _accounts: Dict[str, str]
    _extract_opening: bool

    def __init__(self, accounts: Dict[str, str], extract_opening: bool = False):
        '''
        :param accounts: Maps bank account numbers to the account names used
            for filing and for the extracted balance directives.
        :param extract_opening: Whether to also extract the opening balance
            from the first page of a statement.
        '''
        self._log = logging.getLogger(type(self).__qualname__)
        self._accounts = accounts
        self._extract_opening = extract_opening

    def identify(self, file: FileMemo) -> bool:
        '''
        Return whether ``file`` is named like a statement of a configured account.
        '''
        if match := STATEMENT_NAME_REGEX.match(file.name):
            return match.group('account') in self._accounts
        return False

    def file_name(self, file: FileMemo) -> str:
        '''
        Return the normalized file name for a statement.

        :raises RuntimeError: If ``file`` is neither a checking nor an
            investment account statement.
        '''
        if match := CHECKING_ACCOUNT_STATEMENT_NAME_REGEX.match(file.name):
            account_number = match.group('account')
            return f'Bankszámlakivonat_{account_number}.pdf'
        if match := INVESTMENT_ACCOUNT_STATEMENT_NAME_REGEX.match(file.name):
            account_number = match.group('account')
            return f'Értékpapírszámla_kivonat_{account_number}.pdf'
        raise RuntimeError(f'Not an account statement: {file.name}')

    def file_account(self, file: FileMemo) -> str:
        '''
        Return the configured account name for the account number in the file name.

        :raises RuntimeError: If the file name is not a statement name.
        :raises KeyError: If the account number is not configured.
        '''
        if match := STATEMENT_NAME_REGEX.match(file.name):
            return self._accounts[match.group('account')]
        raise RuntimeError(f'Not an account statement: {file.name}')

    def file_date(self, file: FileMemo) -> dt.date:
        '''
        Return the statement date encoded in the file name.

        :raises RuntimeError: If the file name is not a statement name.
        '''
        if match := STATEMENT_NAME_REGEX.match(file.name):
            return dt.datetime.strptime(match.group('date'), '%Y.%m.%d').date()
        raise RuntimeError(f'Not an account statement: {file.name}')

    def extract(self, file: FileMemo) -> data.Entries:
        '''
        Extract balance directives from a checking account statement.

        The account number and the currency are read from the first page. If
        the account number is missing, the one in the file name is used; if
        the currency is missing, HUF is assumed. A warning is logged in both
        cases.

        :return: The closing balance and, if enabled, the opening balance.
            Empty if ``file`` is not a checking account statement or has no pages.
        :raises KeyError: If the account number printed in the statement is
            not configured.
        '''
        if not CHECKING_ACCOUNT_STATEMENT_NAME_REGEX.match(file.name):
            return []
        pages = list(cast(Iterable[LTPage], extract_pages(file.name)))
        if not pages:
            return []
        entries: data.Entries = []
        meta = data.new_metadata(file.name, 1)
        if account_match := _find_match(pages[0], ACCOUNT_NUMBER_REGEX):
            account_name = self._accounts[account_match.group('account')]
        else:
            self._log.warning('No account number in %s', file.name)
            account_name = self.file_account(file)
        if currency_match := _find_match(pages[0], CURRENCY_REGEX):
            currency = currency_match.group('currency')
        else:
            self._log.warning('No currency number in %s', file.name)
            currency = 'HUF'
        if self._extract_opening:
            opening_balance = self._extract_total_from_page(pages[0], 'NYITÓ EGYENLEG')
            _append_total(entries, meta, account_name, currency, opening_balance)
        closing_balance = self._extract_total(pages, 'ZÁRÓ EGYENLEG')
        # Beancount balance assertions apply at the beginning of their date,
        # so the end-of-day closing balance is asserted on the following day.
        _append_total(entries, meta, account_name, currency, closing_balance, dt.timedelta(days=1))
        return entries

    def _extract_total(self, pages: List[LTPage], label: str) -> Optional[Total]:
        '''
        Return the total labelled ``label`` from the first page that has one.

        Logs an error and returns ``None`` if no page yields a total.
        '''
        for page in pages:
            if total := self._extract_total_from_page(page, label):
                return total
        self._log.error('%s was not found in the pdf file', label)
        return None

    def _extract_total_from_page(self, page: LTPage, label: str) -> Optional[Total]:
        '''
        Return the total in the row of ``label`` on ``page``, or ``None`` if
        the label is absent or the row cannot be parsed.
        '''
        total_y = _find_label_y(page, label)
        # Compare against None explicitly: a coordinate of 0 is a valid position.
        if total_y is None:
            return None
        return self._extract_total_by_y(page, total_y)

    def _extract_total_by_y(self, page: LTPage, total_y: float) -> Optional[Total]:
        '''
        Parse the date and the amount of the row located at ``total_y``.

        Text elements whose bounding box starts within the tolerance of
        ``total_y`` belong to the row. The element starting at the date column
        holds the date (``yy.mm.dd``); the element ending at the amount column
        holds the amount, written with ``.`` as the thousands separator and
        ``,`` as the decimal separator.

        :return: The parsed total, or ``None`` (after logging an error) if
            either the date or the amount is missing.
        '''
        date: Optional[dt.date] = None
        units: Optional[Decimal] = None
        tolerance = self._POSITION_TOLERANCE
        for element in page:
            if not isinstance(element, LTTextContainer):
                continue
            x, y, x2, _ = element.bbox
            if abs(y - total_y) > tolerance:
                continue
            if abs(x - self._DATE_LEFT_X) <= tolerance:
                date_str = element.get_text().strip()
                if date is not None:
                    # Keep the first date found in the row.
                    self._log.warning(
                        'Found date %s, but date was already set to %s',
                        date_str,
                        date)
                    continue
                try:
                    date = dt.datetime.strptime(date_str, '%y.%m.%d').date()
                except ValueError as exc:
                    self._log.warning('Invalid date %s', date_str, exc_info=exc)
            elif abs(x2 - self._UNITS_RIGHT_X) <= tolerance:
                units_str = element.get_text().strip().replace('.', '').replace(',', '.')
                if units is not None:
                    # NOTE(review): unlike a duplicate date, a duplicate amount
                    # overrides the earlier one -- confirm this is intended.
                    self._log.warning(
                        'Found units %s, but units were already set to %s',
                        units_str,
                        units)
                try:
                    units = D(units_str)
                except ValueError as exc:
                    self._log.error('Invalid units %s', units_str, exc_info=exc)
        if date is None:
            self._log.error('Date was not found at y=%d', total_y)
            return None
        # A balance of zero is a valid total, so only None means "missing".
        if units is None:
            self._log.error('Units were not found at y=%d', total_y)
            return None
        return Total(date, units)