diff options
Diffstat (limited to 'beancount_extras_kris7t/importers/otpbank/otpbank_pdf.py')
-rw-r--r-- | beancount_extras_kris7t/importers/otpbank/otpbank_pdf.py | 187 |
1 files changed, 187 insertions, 0 deletions
diff --git a/beancount_extras_kris7t/importers/otpbank/otpbank_pdf.py b/beancount_extras_kris7t/importers/otpbank/otpbank_pdf.py new file mode 100644 index 0000000..c2de559 --- /dev/null +++ b/beancount_extras_kris7t/importers/otpbank/otpbank_pdf.py | |||
@@ -0,0 +1,187 @@ | |||
1 | ''' | ||
2 | Importer for OTP Bank PDF account statements. | ||
3 | ''' | ||
4 | __copyright__ = 'Copyright (c) 2020 Kristóf Marussy <kristof@marussy.com>' | ||
5 | __license__ = 'GNU GPLv2' | ||
6 | |||
7 | from decimal import Decimal | ||
8 | import datetime as dt | ||
9 | import logging | ||
10 | import re | ||
11 | from typing import cast, Dict, Iterable, List, NamedTuple, Optional | ||
12 | |||
13 | from beancount.core import data | ||
14 | from beancount.core.amount import Amount | ||
15 | from beancount.core.number import D | ||
16 | from beancount.ingest.cache import _FileMemo as FileMemo | ||
17 | from beancount.ingest.importer import ImporterProtocol | ||
18 | |||
19 | from pdfminer.high_level import extract_pages | ||
20 | from pdfminer.layout import LTPage, LTTextContainer | ||
21 | |||
22 | |||
# File name of any supported statement (checking or investment account).
# Captures the account number and the statement date (YYYY.MM.DD); an optional
# numeric suffix such as "_1" is tolerated before the ".pdf" extension.
STATEMENT_NAME_REGEX = re.compile(
    r'.*(Banksz[a ]mla|(Ertekpapirszamla|rt `kpap rsz mla)_)kivonat_(?P<account>\d[\d-]*\d)_'
    r'(?P<date>\d{4}\.\d{2}\.\d{2})(_\d+)?\.pdf')

# File name of a checking account statement; captures the account number.
CHECKING_ACCOUNT_STATEMENT_NAME_REGEX = re.compile(
    r'.*Banksz[a ]mlakivonat_(?P<account>\d[\d-]*\d)_.+')

# File name of an investment account statement; captures the account number.
INVESTMENT_ACCOUNT_STATEMENT_NAME_REGEX = re.compile(
    r'.*(Ertekpapirszamla|rt `kpap rsz mla)_kivonat_(?P<account>\d[\d-]*\d)_.+')

# Account number field as printed inside the statement text.
ACCOUNT_NUMBER_REGEX = re.compile(r'SZÁMLASZÁM: (?P<account>\d[\d-]*\d)')

# Currency field as printed inside the statement text; the upper-case currency
# code must be the last thing in the searched text.
CURRENCY_REGEX = re.compile(r'DEVIZANEM: (?P<currency>[A-Z]+)$')
32 | |||
33 | |||
class Total(NamedTuple):
    '''
    A dated total read from a statement: a date paired with a number of units.
    '''

    # Date belonging to the total.
    date: dt.date
    # Number of units; the currency is kept separately by the caller.
    units: Decimal
37 | |||
38 | |||
def _append_total(entries: data.Entries,
                  meta: data.Meta,
                  account: str,
                  currency: str,
                  total: Optional[Total],
                  delta: dt.timedelta = dt.timedelta(days=0)) -> None:
    '''
    Append a balance entry for ``total`` to ``entries``.

    Nothing is appended when ``total`` is missing. Otherwise the entry is
    dated ``total.date + delta`` and asserts ``total.units`` of ``currency``
    on ``account``.
    '''
    if total:
        when, number = total
        asserted = Amount(number, currency)
        entries.append(
            data.Balance(meta, when + delta, account, asserted, None, None))
51 | |||
52 | |||
def _find_label_y(page: LTPage, label: str) -> Optional[int]:
    '''
    Return the second bbox component of the first text element on ``page``
    whose stripped text equals ``label``, or ``None`` if there is none.
    '''
    # NOTE(review): pdfminer bbox components are presumably floats, not ints;
    # confirm before relying on the annotated return type.
    candidates = (
        element.bbox[1]
        for element in page
        if isinstance(element, LTTextContainer)
        and element.get_text().strip() == label
    )
    return next(candidates, None)
58 | |||
59 | |||
def _find_match(page: LTPage, pattern: re.Pattern) -> Optional[re.Match]:
    '''
    Search the stripped text of each text element on ``page`` for ``pattern``.

    Returns the first match found, in page iteration order, or ``None`` when
    no text element matches.
    '''
    text_elements = (
        element for element in page if isinstance(element, LTTextContainer))
    for element in text_elements:
        found = pattern.search(element.get_text().strip())
        if found:
            return found
    return None
67 | |||
68 | |||
class Importer(ImporterProtocol):
    '''
    Importer for OTP Bank PDF account statements.

    Statements are recognized by their file name. Balance entries are only
    extracted from checking account statements; investment account statements
    are identified and filed, but ``extract`` returns no entries for them.
    '''

    # Maximum distance for two layout coordinates to be considered equal.
    _TOLERANCE = 0.5
    # Expected first bbox component (x0 in pdfminer) of the date cell of a total row.
    _DATE_X = 34
    # Expected third bbox component (x1 in pdfminer) of the amount cell of a total row.
    _UNITS_X2 = 572.68

    _log: logging.Logger
    _accounts: Dict[str, str]
    _extract_opening: bool

    def __init__(self, accounts: Dict[str, str], extract_opening: bool = False):
        '''
        Create an importer.

        ``accounts`` maps bank account numbers (as they appear in statement
        file names and in the statement text) to the account names used for
        the emitted entries. If ``extract_opening`` is true, the opening
        balance on the first page is extracted in addition to the closing
        balance.
        '''
        self._log = logging.getLogger(type(self).__qualname__)
        self._accounts = accounts
        self._extract_opening = extract_opening

    def identify(self, file: FileMemo) -> bool:
        '''
        Return whether ``file`` is named like a statement of a known account.
        '''
        if match := STATEMENT_NAME_REGEX.match(file.name):
            return match.group('account') in self._accounts
        else:
            return False

    def file_name(self, file: FileMemo) -> str:
        '''
        Return the normalized file name for the statement.

        Raises ``RuntimeError`` if the file name matches neither the checking
        nor the investment account statement pattern.
        '''
        if match := CHECKING_ACCOUNT_STATEMENT_NAME_REGEX.match(file.name):
            account_number = match.group('account')
            return f'Bankszámlakivonat_{account_number}.pdf'
        elif match := INVESTMENT_ACCOUNT_STATEMENT_NAME_REGEX.match(file.name):
            account_number = match.group('account')
            return f'Értékpapírszámla_kivonat_{account_number}.pdf'
        else:
            raise RuntimeError(f'Not an account statement: {file.name}')

    def file_account(self, file: FileMemo) -> str:
        '''
        Return the configured account for the account number in the file name.

        Raises ``RuntimeError`` if the file name is not a statement name and
        ``KeyError`` if the account number is not configured.
        '''
        if match := STATEMENT_NAME_REGEX.match(file.name):
            account_number = match.group('account')
            return self._accounts[account_number]
        else:
            raise RuntimeError(f'Not an account statement: {file.name}')

    def file_date(self, file: FileMemo) -> dt.date:
        '''
        Return the statement date parsed from the file name.

        Raises ``RuntimeError`` if the file name is not a statement name.
        '''
        if match := STATEMENT_NAME_REGEX.match(file.name):
            date_str = match.group('date')
            return dt.datetime.strptime(date_str, '%Y.%m.%d').date()
        else:
            raise RuntimeError(f'Not an account statement: {file.name}')

    def extract(self, file: FileMemo) -> data.Entries:
        '''
        Extract balance entries from a checking account statement.

        Returns an empty list for files that are not named like a checking
        account statement or that contain no pages. The account number and
        the currency are read from the first page; if they cannot be found,
        the account is derived from the file name and the currency defaults
        to HUF, with a warning logged in either case.
        '''
        if not CHECKING_ACCOUNT_STATEMENT_NAME_REGEX.match(file.name):
            return []
        pages = list(cast(Iterable[LTPage], extract_pages(file.name)))
        if not pages:
            return []
        entries: data.Entries = []
        meta = data.new_metadata(file.name, 1)
        if account_match := _find_match(pages[0], ACCOUNT_NUMBER_REGEX):
            account_name = self._accounts[account_match.group('account')]
        else:
            self._log.warning('No account number in %s', file.name)
            account_name = self.file_account(file)
        if currency_match := _find_match(pages[0], CURRENCY_REGEX):
            currency = currency_match.group('currency')
        else:
            self._log.warning('No currency number in %s', file.name)
            currency = 'HUF'
        if self._extract_opening:
            # The opening balance is only looked for on the first page.
            opening_balance = self._extract_total_from_page(pages[0], 'NYITÓ EGYENLEG')
            _append_total(entries, meta, account_name, currency, opening_balance)
        closing_balance = self._extract_total(pages, 'ZÁRÓ EGYENLEG')
        # The closing balance is asserted one day after its date, presumably
        # because beancount checks balance entries at the start of the day.
        _append_total(entries, meta, account_name, currency, closing_balance, dt.timedelta(days=1))
        return entries

    def _extract_total(self, pages: List[LTPage], label: str) -> Optional[Total]:
        '''
        Return the total labelled ``label`` from the first page that has one.

        Logs an error and returns ``None`` if no page yields a total.
        '''
        for page in pages:
            if total := self._extract_total_from_page(page, label):
                return total
        self._log.error('%s was not found in the pdf file', label)
        return None

    def _extract_total_from_page(self, page: LTPage, label: str) -> Optional[Total]:
        '''
        Return the total in the row of ``label`` on ``page``, if present.
        '''
        total_y = _find_label_y(page, label)
        # Compare against None explicitly: a coordinate of 0 is falsy but valid.
        if total_y is None:
            return None
        return self._extract_total_by_y(page, total_y)

    def _extract_total_by_y(self, page: LTPage, total_y: int) -> Optional[Total]:
        '''
        Read the date and the amount from the row at vertical position ``total_y``.

        Only text elements whose second bbox component is within the tolerance
        of ``total_y`` are considered. The date is taken from the element
        starting at the date column, the amount from the element ending at the
        amount column. Returns ``None``, after logging an error, if either the
        date or the amount could not be read.
        '''
        date: Optional[dt.date] = None
        units: Optional[Decimal] = None
        for element in page:
            if not isinstance(element, LTTextContainer):
                continue
            x, y, x2, _ = element.bbox
            if abs(y - total_y) > self._TOLERANCE:
                continue
            if abs(x - self._DATE_X) <= self._TOLERANCE:
                date_str = element.get_text().strip()
                if date is not None:
                    # Keep the first date found in the row.
                    self._log.warning(
                        'Found date %s, but date was already set to %s',
                        date_str,
                        date)
                    continue
                try:
                    # Dates in the statement use a two-digit year.
                    date = dt.datetime.strptime(date_str, '%y.%m.%d').date()
                except ValueError as exc:
                    self._log.warning('Invalid date %s', date_str, exc_info=exc)
            elif abs(x2 - self._UNITS_X2) <= self._TOLERANCE:
                # Convert "1.234,56" style numbers to "1234.56".
                units_str = element.get_text().strip().replace('.', '').replace(',', '.')
                if not units_str:
                    # A blank cell must not be read as a zero amount.
                    self._log.error('Invalid units %s', units_str)
                    continue
                if units is not None:
                    # NOTE(review): unlike the date branch, a later amount
                    # replaces an earlier one here; confirm this is intended.
                    self._log.warning(
                        'Found units %s, but units were already set to %s',
                        units_str,
                        units)
                try:
                    units = D(units_str)
                except ValueError as exc:
                    self._log.error('Invalid units %s', units_str, exc_info=exc)
        if date is None:
            self._log.error('Date was not found at y=%d', total_y)
            return None
        # A zero balance is a valid amount but a falsy Decimal, so test for
        # None instead of truthiness.
        if units is None:
            self._log.error('Units were not found at y=%d', total_y)
            return None
        return Total(date, units)