aboutsummaryrefslogtreecommitdiffstats
path: root/beancount_extras_kris7t/importers/otpbank/otpbank_pdf.py
diff options
context:
space:
mode:
Diffstat (limited to 'beancount_extras_kris7t/importers/otpbank/otpbank_pdf.py')
-rw-r--r--beancount_extras_kris7t/importers/otpbank/otpbank_pdf.py187
1 files changed, 187 insertions, 0 deletions
diff --git a/beancount_extras_kris7t/importers/otpbank/otpbank_pdf.py b/beancount_extras_kris7t/importers/otpbank/otpbank_pdf.py
new file mode 100644
index 0000000..c2de559
--- /dev/null
+++ b/beancount_extras_kris7t/importers/otpbank/otpbank_pdf.py
@@ -0,0 +1,187 @@
1'''
2Importer for OTP Bank PDF account statements.
3'''
4__copyright__ = 'Copyright (c) 2020 Kristóf Marussy <kristof@marussy.com>'
5__license__ = 'GNU GPLv2'
6
7from decimal import Decimal
8import datetime as dt
9import logging
10import re
11from typing import cast, Dict, Iterable, List, NamedTuple, Optional
12
13from beancount.core import data
14from beancount.core.amount import Amount
15from beancount.core.number import D
16from beancount.ingest.cache import _FileMemo as FileMemo
17from beancount.ingest.importer import ImporterProtocol
18
19from pdfminer.high_level import extract_pages
20from pdfminer.layout import LTPage, LTTextContainer
21
22
# Building blocks shared by the patterns below.  Each fragment is spelled out
# exactly once so that the individual patterns cannot drift apart.
_ACCOUNT_GROUP = r'(?P<account>\d[\d-]*\d)'
_CHECKING_STEM = r'Banksz[a ]mla'
# NOTE(review): the second alternative looks like a mis-encoded spelling of the
# accented file name; it is kept verbatim -- confirm against real file names.
_INVESTMENT_STEM = r'(Ertekpapirszamla|rt `kpap ­rsz mla)_'

# Any supported statement file name; captures the account number and the
# statement date (YYYY.MM.DD), optionally followed by a numeric suffix.
STATEMENT_NAME_REGEX = re.compile(
    r'.*(' + _CHECKING_STEM + r'|' + _INVESTMENT_STEM + r')kivonat_' + _ACCOUNT_GROUP + r'_' +
    r'(?P<date>\d{4}\.\d{2}\.\d{2})(_\d+)?\.pdf')

# Checking account statement file names only.
CHECKING_ACCOUNT_STATEMENT_NAME_REGEX = re.compile(
    r'.*' + _CHECKING_STEM + r'kivonat_' + _ACCOUNT_GROUP + r'_.+')

# Investment account statement file names only.
INVESTMENT_ACCOUNT_STATEMENT_NAME_REGEX = re.compile(
    r'.*' + _INVESTMENT_STEM + r'kivonat_' + _ACCOUNT_GROUP + r'_.+')

# Account number as printed inside the statement.
ACCOUNT_NUMBER_REGEX = re.compile(r'SZÁMLASZÁM: ' + _ACCOUNT_GROUP)

# Currency code as printed inside the statement; it must end the searched text.
CURRENCY_REGEX = re.compile(r'DEVIZANEM: (?P<currency>[A-Z]+)$')
32
33
class Total(NamedTuple):
    '''
    A balance figure read from a statement row, paired with the date of that row.
    '''

    # Date parsed from the row the figure was found on.
    date: dt.date
    # Bare number; the currency is supplied separately when the entry is created.
    units: Decimal
37
38
def _append_total(entries: data.Entries,
                  meta: data.Meta,
                  account: str,
                  currency: str,
                  total: Optional[Total],
                  delta: dt.timedelta = dt.timedelta(days=0)) -> None:
    '''
    Append a balance directive for ``total`` to ``entries``.

    Nothing happens when ``total`` is missing. Otherwise a ``data.Balance`` for
    ``account`` is appended in place, dated ``total.date + delta`` and holding
    ``total.units`` expressed in ``currency``.
    '''
    if total:
        balance_date = total.date + delta
        entries.append(
            data.Balance(
                meta,
                balance_date,
                account,
                Amount(total.units, currency),
                None,
                None))
51
52
def _find_label_y(page: LTPage, label: str) -> Optional[float]:
    '''
    Locate the text container on ``page`` whose stripped text equals ``label``.

    Returns the second component of the bounding box of the first such container
    (the ``y0`` coordinate in pdfminer's layout model), or ``None`` when no
    container matches.

    The coordinate is a float and ``0.0`` is a legitimate result, so callers must
    compare the return value against ``None`` instead of testing its truthiness.
    '''
    for element in page:
        if isinstance(element, LTTextContainer) and element.get_text().strip() == label:
            return element.bbox[1]
    return None
58
59
def _find_match(page: LTPage, pattern: re.Pattern) -> Optional[re.Match]:
    '''
    Search the text containers of ``page`` for ``pattern``.

    Each container's text is stripped of surrounding whitespace before it is
    searched. The match from the first container that matches is returned;
    ``None`` is returned when no container matches.
    '''
    for element in page:
        if not isinstance(element, LTTextContainer):
            continue
        found = pattern.search(element.get_text().strip())
        if found:
            return found
    return None
67
68
class Importer(ImporterProtocol):
    '''
    Importer for OTP Bank PDF account statements.

    Statements are recognised by their file name. Balance directives are only
    extracted from checking account statements; investment account statements
    are identified and filed, but ``extract`` returns no entries for them.
    '''

    # Layout of a balance row, in page coordinates as reported by pdfminer:
    # the date is left-aligned at _DATE_LEFT_X, the amount is right-aligned at
    # _UNITS_RIGHT_X, and both share the y coordinate of the row label.
    # Positions are compared with a tolerance of _POSITION_TOLERANCE.
    _DATE_LEFT_X = 34
    _UNITS_RIGHT_X = 572.68
    _POSITION_TOLERANCE = 0.5

    _log: logging.Logger
    _accounts: Dict[str, str]
    _extract_opening: bool

    def __init__(self, accounts: Dict[str, str], extract_opening: bool = False):
        '''
        Create an importer.

        ``accounts`` maps account numbers (as they appear in statement file names
        and inside the statements) to beancount account names.
        ``extract_opening`` additionally emits a balance directive for the
        opening balance found on the first page.
        '''
        self._log = logging.getLogger(type(self).__qualname__)
        self._accounts = accounts
        self._extract_opening = extract_opening

    def identify(self, file: FileMemo) -> bool:
        '''
        Tell whether ``file`` is named like a statement of a configured account.
        '''
        match = STATEMENT_NAME_REGEX.match(file.name)
        return match is not None and match.group('account') in self._accounts

    def file_name(self, file: FileMemo) -> str:
        '''
        Return the normalised file name to file the statement under.

        Raises ``RuntimeError`` if the file is not named like a statement.
        '''
        if match := CHECKING_ACCOUNT_STATEMENT_NAME_REGEX.match(file.name):
            account_number = match.group('account')
            return f'Bankszámlakivonat_{account_number}.pdf'
        if match := INVESTMENT_ACCOUNT_STATEMENT_NAME_REGEX.match(file.name):
            account_number = match.group('account')
            return f'Értékpapírszámla_kivonat_{account_number}.pdf'
        raise RuntimeError(f'Not an account statement: {file.name}')

    def file_account(self, file: FileMemo) -> str:
        '''
        Return the beancount account configured for the account number in the file name.

        Raises ``RuntimeError`` if the file is not named like a statement and
        ``KeyError`` if the account number is not configured.
        '''
        match = STATEMENT_NAME_REGEX.match(file.name)
        if match is None:
            raise RuntimeError(f'Not an account statement: {file.name}')
        return self._accounts[match.group('account')]

    def file_date(self, file: FileMemo) -> dt.date:
        '''
        Return the statement date encoded in the file name.

        Raises ``RuntimeError`` if the file is not named like a statement.
        '''
        match = STATEMENT_NAME_REGEX.match(file.name)
        if match is None:
            raise RuntimeError(f'Not an account statement: {file.name}')
        return dt.datetime.strptime(match.group('date'), '%Y.%m.%d').date()

    def extract(self, file: FileMemo) -> data.Entries:
        '''
        Extract balance directives from a checking account statement.

        The closing balance is emitted dated one day after the date printed next
        to it (presumably because beancount checks balance directives at the
        start of the given day). The opening balance is emitted, undelayed, only
        when ``extract_opening`` was requested.

        Returns an empty list for files that are not checking account statements
        and for PDFs without pages. Raises ``KeyError`` if the account number
        printed in the statement is not configured; this is deliberately not
        downgraded to a fallback so that balances never land on a wrong account.
        '''
        if not CHECKING_ACCOUNT_STATEMENT_NAME_REGEX.match(file.name):
            return []
        pages = list(cast(Iterable[LTPage], extract_pages(file.name)))
        if not pages:
            return []
        first_page = pages[0]
        entries: data.Entries = []
        meta = data.new_metadata(file.name, 1)
        if account_match := _find_match(first_page, ACCOUNT_NUMBER_REGEX):
            account_name = self._accounts[account_match.group('account')]
        else:
            # Fall back to the account number encoded in the file name.
            self._log.warning('No account number in %s', file.name)
            account_name = self.file_account(file)
        if currency_match := _find_match(first_page, CURRENCY_REGEX):
            currency = currency_match.group('currency')
        else:
            self._log.warning('No currency in %s', file.name)
            currency = 'HUF'
        if self._extract_opening:
            opening_balance = self._extract_total_from_page(first_page, 'NYITÓ EGYENLEG')
            _append_total(entries, meta, account_name, currency, opening_balance)
        closing_balance = self._extract_total(pages, 'ZÁRÓ EGYENLEG')
        _append_total(entries, meta, account_name, currency, closing_balance, dt.timedelta(days=1))
        return entries

    def _extract_total(self, pages: List[LTPage], label: str) -> Optional[Total]:
        '''
        Return the total labelled ``label`` from the first page that yields one.

        Logs an error and returns ``None`` when no page does.
        '''
        for page in pages:
            total = self._extract_total_from_page(page, label)
            if total is not None:
                return total
        self._log.error('%s was not found in the pdf file', label)
        return None

    def _extract_total_from_page(self, page: LTPage, label: str) -> Optional[Total]:
        '''
        Return the total on the row of ``label`` on ``page``, or ``None``.
        '''
        total_y = _find_label_y(page, label)
        # Compare against None: a coordinate of 0.0 is a valid position.
        if total_y is None:
            return None
        return self._extract_total_by_y(page, total_y)

    def _extract_total_by_y(self, page: LTPage, total_y: float) -> Optional[Total]:
        '''
        Read the date and the amount from the row located at ``total_y``.

        Only text containers whose ``y0`` lies within the tolerance of
        ``total_y`` are considered. The date is taken from the container
        starting at ``_DATE_LEFT_X`` (format ``yy.mm.dd``), the amount from the
        container ending at ``_UNITS_RIGHT_X`` (``.`` as thousands separator,
        ``,`` as decimal separator). If several containers qualify for the same
        field, the first one wins and a warning is logged for the others.

        Returns ``None``, after logging an error, if either field is missing.
        A zero amount is a valid result.
        '''
        tolerance = self._POSITION_TOLERANCE
        date: Optional[dt.date] = None
        units: Optional[Decimal] = None
        for element in page:
            if not isinstance(element, LTTextContainer):
                continue
            x, y, x2, _ = element.bbox
            if abs(y - total_y) > tolerance:
                continue
            if abs(x - self._DATE_LEFT_X) <= tolerance:
                date_str = element.get_text().strip()
                if date is not None:
                    self._log.warning(
                        'Found date %s, but date was already set to %s',
                        date_str,
                        date)
                    continue
                try:
                    date = dt.datetime.strptime(date_str, '%y.%m.%d').date()
                except ValueError as exc:
                    self._log.warning('Invalid date %s', date_str, exc_info=exc)
            elif abs(x2 - self._UNITS_RIGHT_X) <= tolerance:
                units_str = element.get_text().strip().replace('.', '').replace(',', '.')
                if not units_str:
                    # D('') yields zero; an empty box must not be mistaken for
                    # a zero balance.
                    continue
                if units is not None:
                    self._log.warning(
                        'Found units %s, but units were already set to %s',
                        units_str,
                        units)
                    # Keep the first value, as is done for the date.
                    continue
                try:
                    units = D(units_str)
                except ValueError as exc:
                    self._log.error('Invalid units %s', units_str, exc_info=exc)
        if date is None:
            self._log.error('Date was not found at y=%d', total_y)
            return None
        # Compare against None: Decimal('0') is falsy but is a valid balance.
        if units is None:
            self._log.error('Units were not found at y=%d', total_y)
            return None
        return Total(date, units)