diff options
author | Kristóf Marussy <kristof@marussy.com> | 2021-01-25 01:14:28 +0100 |
---|---|---|
committer | Kristóf Marussy <kristof@marussy.com> | 2021-01-25 01:14:28 +0100 |
commit | a1c2a999e449054d6641bbb633954e45fcd63f90 (patch) | |
tree | 47628c10ded721d66e47b5f87f501293cd8af003 /beancount_extras_kris7t/importers/otpbank | |
parent | Initialize package (diff) | |
download | beancount-extras-kris7t-a1c2a999e449054d6641bbb633954e45fcd63f90.tar.gz beancount-extras-kris7t-a1c2a999e449054d6641bbb633954e45fcd63f90.tar.zst beancount-extras-kris7t-a1c2a999e449054d6641bbb633954e45fcd63f90.zip |
Add plugins and importers from private config
The importers are missing tests: since there are no specifications
for the import formats, real, private data would have to be used as test inputs.
Diffstat (limited to 'beancount_extras_kris7t/importers/otpbank')
3 files changed, 557 insertions, 0 deletions
diff --git a/beancount_extras_kris7t/importers/otpbank/__init__.py b/beancount_extras_kris7t/importers/otpbank/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/beancount_extras_kris7t/importers/otpbank/__init__.py | |||
diff --git a/beancount_extras_kris7t/importers/otpbank/otpbank_csv.py b/beancount_extras_kris7t/importers/otpbank/otpbank_csv.py new file mode 100644 index 0000000..9e12ec3 --- /dev/null +++ b/beancount_extras_kris7t/importers/otpbank/otpbank_csv.py | |||
@@ -0,0 +1,370 @@ | |||
1 | ''' | ||
2 | Importer for OTP Bank CSV transaction history. | ||
3 | ''' | ||
4 | __copyright__ = 'Copyright (c) 2020 Kristóf Marussy <kristof@marussy.com>' | ||
5 | __license__ = 'GNU GPLv2' | ||
6 | |||
7 | import csv | ||
8 | import datetime as dt | ||
9 | from decimal import Decimal | ||
10 | import logging | ||
11 | from typing import Callable, Dict, Iterable, List, NamedTuple, Optional | ||
12 | import re | ||
13 | from os import path | ||
14 | |||
15 | import beancount.core.amount as am | ||
16 | from beancount.core.amount import Amount | ||
17 | from beancount.core import data | ||
18 | from beancount.core.number import ZERO | ||
19 | from beancount.ingest.cache import _FileMemo as FileMemo | ||
20 | from beancount.ingest.importer import ImporterProtocol | ||
21 | |||
22 | from beancount_extras_kris7t.importers import utils | ||
23 | from beancount_extras_kris7t.importers.utils import COMMENT_META, InvalidEntry, PAYEE_META, \ | ||
24 | Posting | ||
25 | from beancount_extras_kris7t.plugins.transfer_accounts import TRANSFER_ACCOUNT_META, \ | ||
26 | TRANSFER_DATE_META | ||
27 | |||
# Payee used for fees charged by the bank itself (e.g. SMS message fees).
OTP_BANK = 'OTP Bank'
# Metadata key for the booking date when it differs from the value date.
BOOKING_DATE_META = 'booking-date'
# Metadata key preserving the raw entry type column of the CSV.
ENTRY_TYPE_META = 'otpbank-entry-type'
# Tag added to every transaction imported from an OTP Bank CSV.
OTPBANK_CSV_TAG = 'otpbank-csv'
# Tag for contactless (PayPass) card payments.
PAYPASS_TAG = 'paypass'
# Tag for payments routed through the Simple payment processor.
SIMPLE_TAG = 'processor-simple'
# Matches the leading '<date> <card number>' prefix of card transaction comments.
CARD_REGEX = re.compile(
    r'^(?P<date>\d{4}\.\d{2}\.\d{2})\s+(?P<card_number>\d+)')
# Matches contactless payment markers: '-ÉRINTŐ' (possibly with the accented
# character mangled to '?') or a trailing 'PPASS'.
PAYPASS_REGEX = re.compile(r'-ÉRINT(Ő|\?)|PPASS$', re.IGNORECASE)
# Matches the trailing '-SIMPLE' marker of Simple processor payments.
SIMPLE_REGEX = re.compile(r'-SIMPLE$', re.IGNORECASE)
# Matches SMS fee comments of the form '<date> (<count> DB SMS)'.
SMS_REGEX = re.compile(
    r'^(?P<date>\d{4}\.\d{2}\.\d{2})\s+\((?P<count>\d+) DB SMS\)',
    re.IGNORECASE)
41 | |||
42 | |||
def _parse_date(date_str: str) -> dt.date:
    '''Parse a compact ``YYYYMMDD`` date column value.'''
    date_format = '%Y%m%d'
    return utils.parse_date(date_str, date_format)
45 | |||
46 | |||
def _parse_card_date(date_str: str) -> dt.date:
    '''Parse a dotted ``YYYY.MM.DD`` date as embedded in comment fields.'''
    date_format = '%Y.%m.%d'
    return utils.parse_date(date_str, date_format)
49 | |||
50 | |||
def _parse_number(amount_str: str) -> Decimal:
    '''
    Parse a Hungarian-formatted number: '.' is the thousands separator
    (dropped) and ',' is the decimal comma (turned into a decimal point).
    '''
    normalized = amount_str.translate(str.maketrans(',', '.', '.'))
    return utils.parse_number(normalized)
54 | |||
55 | |||
def _validate_cd(amount: Amount, cd: str) -> None:
    '''
    Check that the sign of ``amount`` matches the credit/debit flag ``cd``:
    'T' (debit) entries must be negative and 'J' (credit) entries positive.

    Raises InvalidEntry on a sign mismatch or an unknown flag.
    '''
    if cd == 'T':
        if amount.number >= ZERO:
            raise InvalidEntry(f'Invalid debit amount: {amount}')
        return
    if cd == 'J':
        if amount.number <= ZERO:
            raise InvalidEntry(f'Invalid credit amount: {amount}')
        return
    raise InvalidEntry(f'Invalid credit/debit type: {cd}')
65 | |||
66 | |||
class Conversion(NamedTuple):
    '''Foreign-currency conversion details attached to a row.'''
    # Transaction amount in the foreign currency.
    foreign_amount: Amount
    # Exchange rate, denominated in the account's native currency.
    foreign_rate: Amount
    # Posting for the conversion fee, if any.
    conversion_fee: Optional[Posting]
71 | |||
72 | |||
class Card(NamedTuple):
    '''Card (or card-like) sub-account attribution for a row.'''
    # Account tracking the card's transactions.
    card_account: str
    # Date parsed from the transaction comment.
    card_date: dt.date
76 | |||
77 | |||
class Row(utils.Row):
    '''
    A single parsed row of the OTP Bank CSV transaction history.

    Extractors may enrich the row (conversion, card attribution, postings)
    before it is turned into a beancount transaction via _to_transaction().
    '''

    # Beancount account the row belongs to (resolved from the account number).
    account_name: str
    # Raw credit/debit flag from the CSV ('T' = debit, 'J' = credit).
    cd: str
    # Signed amount in the account's native currency.
    native_amount: Amount
    # Date the bank booked the entry.
    booking_date: dt.date
    # Value date of the entry; transactions are dated with this.
    value_date: dt.date
    # Payee column exactly as it appeared in the CSV.
    _raw_payee: Optional[str]
    # Concatenated comment columns exactly as they appeared in the CSV.
    _raw_comment: str
    # Conversion info set by an extractor, at most once.
    _conversion: Optional[Conversion]
    # Card attribution set by an extractor, at most once.
    _card: Optional[Card]

    def __init__(self, file_name: str, index: int, row: List[str], accounts: Dict[str, str]):
        '''
        Parse one CSV record (15 columns expected).

        :param file_name: name of the CSV file, for error reporting.
        :param index: zero-based row index within the file.
        :param row: the raw CSV fields.
        :param accounts: map from (dashless) account number to account name.
        :raises InvalidEntry: on an unknown account number or an amount whose
            sign contradicts the credit/debit flag.
        '''
        account_number, cd, amount_str, currency, booking_date_str, \
            value_date_str, _, _, payee, comment1, comment2, comment3, entry_type, _, _ = row
        # The comment is split over three columns; join before stripping.
        comment = f'{comment1}{comment2}{comment3}'.strip()
        super().__init__(file_name, index, entry_type, payee, comment)
        if account_name := accounts.get(account_number, None):
            self.account_name = account_name
        else:
            raise InvalidEntry(f'Unknown account number {account_number}')
        self.cd = cd
        self.native_amount = Amount(_parse_number(amount_str), currency)
        _validate_cd(self.native_amount, self.cd)
        self.booking_date = _parse_date(booking_date_str)
        self.value_date = _parse_date(value_date_str)
        self.tags.add(OTPBANK_CSV_TAG)
        self._raw_payee = payee
        self._raw_comment = comment
        self._conversion = None
        self._card = None

    @property
    def conversion(self) -> Optional[Conversion]:
        '''Conversion info attached by an extractor, if any.'''
        return self._conversion

    @conversion.setter
    def conversion(self, conversion: Optional[Conversion]) -> None:
        # Write-once: a second assignment indicates conflicting extractors.
        if self._conversion:
            raise InvalidEntry(
                f'Conversion {self._conversion} was already set for row ' +
                f' when trying to set it to {conversion}')
        self._conversion = conversion

    @property
    def card(self) -> Optional[Card]:
        '''Card attribution attached by an extractor, if any.'''
        return self._card

    @card.setter
    def card(self, card: Optional[Card]) -> None:
        # Write-once: a second assignment indicates conflicting extractors.
        if self._card:
            raise InvalidEntry(
                f'Card {self._card} was already set for row ' +
                f' when trying to set it to {card}')
        self._card = card

    @property
    def transacted_amount(self) -> Amount:
        '''The amount in the transaction currency (foreign if converted).'''
        if self.conversion:
            return self.conversion.foreign_amount
        return self.native_amount

    @property
    def _extended_meta(self) -> data.Meta:
        '''Row metadata plus the optional entry-type/booking-date/raw fields.'''
        meta = dict(self.meta)
        if self.entry_type:
            meta[ENTRY_TYPE_META] = self.entry_type
        # Only record the booking date when it adds information.
        if self.booking_date != self.value_date:
            meta[BOOKING_DATE_META] = self.booking_date
        if self._raw_payee:
            meta[PAYEE_META] = self._raw_payee
        if self._raw_comment:
            meta[COMMENT_META] = self._raw_comment
        return meta

    @property
    def _card_meta(self) -> Optional[data.Meta]:
        '''Per-posting metadata routing the posting to the card account.'''
        if self._card:
            card_meta: data.Meta = {
                TRANSFER_ACCOUNT_META: self._card.card_account
            }
            # Only record the card date when it differs from the value date.
            if self._card.card_date != self.value_date:
                card_meta[TRANSFER_DATE_META] = self._card.card_date
            return card_meta
        else:
            return None

    def _to_transaction(self) -> data.Transaction:
        '''
        Build the beancount transaction for this row.

        :raises InvalidEntry: if no extractor assigned any posting.
        '''
        if not self.postings:
            raise InvalidEntry('No postings were extracted from this entry')
        if self.payee == '':
            payee = None
        else:
            payee = self.payee
        # Avoid repeating the payee as the narration; fall back to the raw
        # entry type when there is nothing better.
        if self.comment == '' or self.comment == self.payee:
            if payee:
                desc = ''
            else:
                desc = self.entry_type or ''
        else:
            desc = self.comment
        if self._conversion:
            foreign_rate = self._conversion.foreign_rate
            conversion_fee = self._conversion.conversion_fee
        else:
            foreign_rate = None
            conversion_fee = None
        meta = self._extended_meta
        card_meta = self._card_meta
        postings = [
            data.Posting(self.account_name, self.native_amount, None, None, None, None)
        ]
        if len(self.postings) == 1 and not foreign_rate:
            # Single counter-posting without conversion: leave the amount to
            # be interpolated so rounding differences cannot break balancing.
            account, amount = self.postings[0]
            postings.append(
                data.Posting(account, utils.MISSING_AMOUNT, None, None, None, card_meta))
        else:
            for account, amount in self.postings:
                postings.append(
                    data.Posting(account, -amount, None, foreign_rate, None, card_meta))
        # Skip zero-valued conversion fees entirely.
        if conversion_fee and conversion_fee.amount.number != ZERO:
            account, amount = conversion_fee
            postings.append(
                data.Posting(account, amount, None, None, None, None))
        return data.Transaction(
            meta, self.value_date, self.flag, payee, desc, self.tags, self.links, postings)
203 | |||
204 | |||
# An extractor inspects and mutates a Row in place (comment, tags, postings,
# conversion, card attribution).
Extractor = Callable[[Row], None]
206 | |||
207 | |||
def extract_foreign_currencies(
        currencies: Iterable[str], conversion_fees_account: str) -> Extractor:
    '''
    Create an extractor that recognizes foreign-currency conversions at the
    end of the comment ('<amount><currency> <rate>') and attaches the foreign
    amount, the rate, and the conversion fee posting to the row.

    :param currencies: currency codes to look for (matched case-insensitively).
    :param conversion_fees_account: account charged with the conversion fee.
    '''
    alternatives = '|'.join(currencies)
    pattern = re.compile(
        r'(?P<amount>\d+(,\d+)?)(?P<currency>%s)\s+(?P<rate>\d+(,\d+)?)$' % alternatives,
        re.IGNORECASE)

    def do_extract(row: Row) -> None:
        match = pattern.search(row.comment)
        if not match:
            return
        amount_number = _parse_number(match.group('amount'))
        # Debits are negative in the native amount; mirror that sign.
        if row.cd == 'T':
            amount_number = -amount_number
        foreign_amount = Amount(amount_number, match.group('currency').upper())
        rate_number = _parse_number(match.group('rate'))
        foreign_rate = Amount(rate_number, row.native_amount.currency)
        # The fee is whatever the converted amount exceeds the booked amount by.
        fee_amount = am.sub(am.mul(foreign_rate, amount_number), row.native_amount)
        fee = Posting(conversion_fees_account, fee_amount)
        row.conversion = Conversion(foreign_amount, foreign_rate, fee)
        # Drop the matched suffix from the comment.
        row.comment = row.comment[:match.start()].strip()

    return do_extract
235 | |||
236 | |||
def extract_cards(card_accounts: Dict[str, str]) -> Extractor:
    '''
    Create an extractor that attributes card transactions and card fees to
    per-card transfer accounts based on the card number in the comment, and
    tags contactless (PayPass) and Simple-processor payments.

    :param card_accounts: map from card number to the card's transfer account.
    :raises InvalidEntry: (from the extractor) when a card entry has no
        recognizable card prefix or the card number is unknown.
    '''
    def do_extract(row: Row) -> None:
        if row.entry_type.lower() not in [
            'vásárlás kártyával',
            'bankkártyával kapcs. díj',
        ]:
            return
        if card_match := CARD_REGEX.search(row.comment):
            card_number = card_match.group('card_number')
            if card_number not in card_accounts:
                raise InvalidEntry(f'No account for card {card_number}')
            card_account = card_accounts[card_number]
            card_date = _parse_card_date(card_match.group('date'))
            row.card = Card(card_account, card_date)
            # Strip the '<date> <card number>' prefix from the comment.
            row.comment = row.comment[card_match.end():].strip()
        else:
            raise InvalidEntry(
                f'Cannot extract card information from: {row.comment}')
        if paypass_match := PAYPASS_REGEX.search(row.comment):
            row.comment = row.comment[:paypass_match.start()].strip()
            row.tags.add(PAYPASS_TAG)
        # Fixed: this branch previously searched PAYPASS_REGEX again, so the
        # SIMPLE marker was never detected and SIMPLE_TAG was never applied.
        elif simple_match := SIMPLE_REGEX.search(row.comment):
            row.comment = row.comment[:simple_match.start()].strip()
            row.tags.add(SIMPLE_TAG)
    return do_extract
262 | |||
263 | |||
def extract_sms(sms: str, sms_liability: str, sms_expense: str) -> Extractor:
    '''
    Create an extractor for OTPdirekt SMS message fee entries.

    The fee is modeled as a "conversion" from a synthetic SMS commodity
    (one unit per message) at the per-message rate, attributed to the SMS
    liability account and expensed to ``sms_expense``.

    :param sms: commodity name counting the SMS messages.
    :param sms_liability: liability account the fee is transferred through.
    :param sms_expense: expense account charged with the fee.
    '''
    def do_extract(row: Row) -> None:
        if row.entry_type != 'OTPdirekt ÜZENETDÍJ':
            return
        match = SMS_REGEX.search(row.comment)
        if not match:
            raise InvalidEntry(
                f'Cannot parse SMS transaction: {row.comment}')
        row.card = Card(sms_liability, _parse_card_date(match.group('date')))
        message_count = _parse_number(match.group('count'))
        per_message_rate = am.div(-row.native_amount, message_count)
        row.conversion = Conversion(
            Amount(-message_count, sms), per_message_rate, None)
        row.comment = 'SMS díj'
        row.payee = OTP_BANK
        row.assign_to_account(sms_expense)

    return do_extract
284 | |||
285 | |||
class Importer(ImporterProtocol):
    '''
    Importer for OTP Bank CSV transaction history.
    '''

    _log: logging.Logger
    _accounts: Dict[str, str]
    # Fixed: this was declared as `_extracts`, which never matched the
    # `_extractors` attribute actually assigned in __init__ and used below.
    _extractors: List[Extractor]

    def __init__(self,
                 accounts: Dict[str, str],
                 extractors: List[Extractor]):
        '''
        :param accounts: map from account number to beancount account name;
            dashes in the account numbers are ignored.
        :param extractors: extractors run on every parsed row, in order.
        '''
        self._log = logging.getLogger(type(self).__qualname__)
        # The CSV export contains the account numbers without dashes.
        self._accounts = {number.replace('-', ''): name for number, name in accounts.items()}
        self._extractors = extractors

    def identify(self, file: FileMemo) -> bool:
        '''A file is ours if it is a .csv whose first row names a known account.'''
        _, extension = path.splitext(file.name)
        if extension.lower() != '.csv':
            return False
        return self._find_account(file) is not None

    def _find_account(self, file: FileMemo) -> Optional[str]:
        '''Return the account name for the file, or None if unrecognized.'''
        head = file.head().strip().split('\n')[0]
        # A valid transaction row has exactly 15 semicolon-separated fields.
        if head.count(';') != 14:
            return None
        for account_number, account_name in self._accounts.items():
            if head.startswith(f'"{account_number}"'):
                return account_name
        return None

    def file_name(self, file: FileMemo) -> str:
        return 'otpbank.csv'

    def file_account(self, file: FileMemo) -> str:
        '''
        :raises RuntimeError: if the file does not belong to a known account.
        '''
        account_name = self._find_account(file)
        if not account_name:
            raise RuntimeError(f'Invalid account number in {file.name}')
        return account_name

    def file_date(self, file: FileMemo) -> Optional[dt.date]:
        '''
        Files account statements according to the booking date of the last
        transaction.
        '''
        date = None
        with open(file.name, 'r') as csv_file:
            for row in csv.reader(csv_file, delimiter=';'):
                # Column 4 is the booking date.
                date_str = row[4]
                try:
                    date = _parse_date(date_str)
                except InvalidEntry as exc:
                    self._log.error(
                        'Invalid entry in %s when looking for filing date',
                        file.name, exc_info=exc)
                    return None
        return date

    def extract(self, file: FileMemo) -> data.Entries:
        '''
        Parse every row into a transaction; invalid rows are skipped with a
        warning. Each transaction is linked as
        ``otpbank_<booking date>_<counter within the date>`` so re-imports can
        be matched against existing entries.
        '''
        file_name = file.name
        entries: data.Entries = []
        with open(file_name, 'r') as csv_file:
            last_date: Optional[dt.date] = None
            last_date_str = ""
            count_within_date = 1
            for index, row_str in enumerate(csv.reader(csv_file, delimiter=';')):
                try:
                    row = Row(file_name, index, row_str, self._accounts)
                    if last_date != row.booking_date:
                        last_date = row.booking_date
                        last_date_str = last_date.strftime('%Y-%m-%d')
                        count_within_date = 1
                    else:
                        count_within_date += 1
                    row.links.add(f'otpbank_{last_date_str}_{count_within_date:03}')
                    self._run_row_extractors(row)
                    entries.append(row._to_transaction())
                except InvalidEntry as exc:
                    self._log.warning(
                        'Skipping invalid entry %d of %s',
                        index, file_name, exc_info=exc)
        return entries

    def _run_row_extractors(self, row: Row):
        utils.run_row_extractors(row, self._extractors)
diff --git a/beancount_extras_kris7t/importers/otpbank/otpbank_pdf.py b/beancount_extras_kris7t/importers/otpbank/otpbank_pdf.py new file mode 100644 index 0000000..c2de559 --- /dev/null +++ b/beancount_extras_kris7t/importers/otpbank/otpbank_pdf.py | |||
@@ -0,0 +1,187 @@ | |||
1 | ''' | ||
2 | Importer for OTP Bank PDF account statements. | ||
3 | ''' | ||
4 | __copyright__ = 'Copyright (c) 2020 Kristóf Marussy <kristof@marussy.com>' | ||
5 | __license__ = 'GNU GPLv2' | ||
6 | |||
7 | from decimal import Decimal | ||
8 | import datetime as dt | ||
9 | import logging | ||
10 | import re | ||
11 | from typing import cast, Dict, Iterable, List, NamedTuple, Optional | ||
12 | |||
13 | from beancount.core import data | ||
14 | from beancount.core.amount import Amount | ||
15 | from beancount.core.number import D | ||
16 | from beancount.ingest.cache import _FileMemo as FileMemo | ||
17 | from beancount.ingest.importer import ImporterProtocol | ||
18 | |||
19 | from pdfminer.high_level import extract_pages | ||
20 | from pdfminer.layout import LTPage, LTTextContainer | ||
21 | |||
22 | |||
# Matches statement file names with embedded account number and date.
# Several spellings are accepted — presumably because download tools mangle
# the accented characters of the original Hungarian file names
# ('Bankszámla', 'Értékpapírszámla'); confirm against real downloads.
STATEMENT_NAME_REGEX = re.compile(
    r'.*(Banksz[a ]mla|(Ertekpapirszamla|rt `kpap rsz mla)_)kivonat_(?P<account>\d[\d-]*\d)_' +
    r'(?P<date>\d{4}\.\d{2}\.\d{2})(_\d+)?\.pdf')
# Checking account statement ('Bankszámlakivonat') file names.
CHECKING_ACCOUNT_STATEMENT_NAME_REGEX = re.compile(
    r'.*Banksz[a ]mlakivonat_(?P<account>\d[\d-]*\d)_.+')
# Investment account statement ('Értékpapírszámla kivonat') file names.
INVESTMENT_ACCOUNT_STATEMENT_NAME_REGEX = re.compile(
    r'.*(Ertekpapirszamla|rt `kpap rsz mla)_kivonat_(?P<account>\d[\d-]*\d)_.+')
# Account number as printed inside the statement ('SZÁMLASZÁM: <number>').
ACCOUNT_NUMBER_REGEX = re.compile(r'SZÁMLASZÁM: (?P<account>\d[\d-]*\d)')
# Statement currency as printed inside the statement ('DEVIZANEM: <code>').
CURRENCY_REGEX = re.compile(r'DEVIZANEM: (?P<currency>[A-Z]+)$')
32 | |||
33 | |||
class Total(NamedTuple):
    '''A (date, amount) pair read from a statement total row.'''
    # Date printed in the total row.
    date: dt.date
    # Balance amount in the statement's currency.
    units: Decimal
37 | |||
38 | |||
def _append_total(entries: data.Entries,
                  meta: data.Meta,
                  account: str,
                  currency: str,
                  total: Optional[Total],
                  delta: dt.timedelta = dt.timedelta(days=0)) -> None:
    '''
    Append a Balance assertion for ``total`` to ``entries``, shifted by
    ``delta``; a missing total is silently skipped.
    '''
    if total is None:
        return
    balance_amount = Amount(total.units, currency)
    entries.append(
        data.Balance(meta, total.date + delta, account, balance_amount, None, None))
51 | |||
52 | |||
def _find_label_y(page: LTPage, label: str) -> Optional[float]:
    '''Return the y coordinate of the text element whose text equals ``label``.'''
    for element in page:
        if isinstance(element, LTTextContainer) and element.get_text().strip() == label:
            # bbox is (x0, y0, x1, y1); pdfminer coordinates are floats.
            return element.bbox[1]
    return None
58 | |||
59 | |||
def _find_match(page: LTPage, pattern: re.Pattern) -> Optional[re.Match]:
    '''Return the first match of ``pattern`` in any text container on ``page``.'''
    for element in page:
        if not isinstance(element, LTTextContainer):
            continue
        match = pattern.search(element.get_text().strip())
        if match:
            return match
    return None
67 | |||
68 | |||
class Importer(ImporterProtocol):
    '''
    Importer for OTP Bank PDF account statements.

    Extracts balance assertions from checking account statements; investment
    account statements are recognized for filing but yield no entries.
    '''

    _log: logging.Logger
    _accounts: Dict[str, str]
    _extract_opening: bool

    def __init__(self, accounts: Dict[str, str], extract_opening: bool = False):
        '''
        :param accounts: map from account number (as it appears in statement
            file names) to beancount account name.
        :param extract_opening: also emit a balance assertion for the opening
            balance, not just the closing one.
        '''
        self._log = logging.getLogger(type(self).__qualname__)
        self._accounts = accounts
        self._extract_opening = extract_opening

    def identify(self, file: FileMemo) -> bool:
        '''A file is ours if its name matches a known account's statement.'''
        if match := STATEMENT_NAME_REGEX.match(file.name):
            return match.group('account') in self._accounts
        else:
            return False

    def file_name(self, file: FileMemo) -> str:
        '''
        Return a canonical (accented) file name for filing.

        :raises RuntimeError: if the file is not an account statement.
        '''
        if match := CHECKING_ACCOUNT_STATEMENT_NAME_REGEX.match(file.name):
            account_number = match.group('account')
            return f'Bankszámlakivonat_{account_number}.pdf'
        elif match := INVESTMENT_ACCOUNT_STATEMENT_NAME_REGEX.match(file.name):
            account_number = match.group('account')
            return f'Értékpapírszámla_kivonat_{account_number}.pdf'
        else:
            raise RuntimeError(f'Not an account statement: {file.name}')

    def file_account(self, file: FileMemo) -> str:
        '''
        :raises RuntimeError: if the file is not an account statement.
        '''
        if match := STATEMENT_NAME_REGEX.match(file.name):
            account_number = match.group('account')
            return self._accounts[account_number]
        else:
            raise RuntimeError(f'Not an account statement: {file.name}')

    def file_date(self, file: FileMemo) -> dt.date:
        '''
        Return the statement date embedded in the file name.

        :raises RuntimeError: if the file is not an account statement.
        '''
        if match := STATEMENT_NAME_REGEX.match(file.name):
            date_str = match.group('date')
            return dt.datetime.strptime(date_str, '%Y.%m.%d').date()
        else:
            raise RuntimeError(f'Not an account statement: {file.name}')

    def extract(self, file: FileMemo) -> data.Entries:
        '''Extract balance assertions from a checking account statement.'''
        if not CHECKING_ACCOUNT_STATEMENT_NAME_REGEX.match(file.name):
            return []
        pages = list(cast(Iterable[LTPage], extract_pages(file.name)))
        if not pages:
            return []
        entries: data.Entries = []
        meta = data.new_metadata(file.name, 1)
        if account_match := _find_match(pages[0], ACCOUNT_NUMBER_REGEX):
            account_name = self._accounts[account_match.group('account')]
        else:
            # Fall back to the account encoded in the file name.
            self._log.warning('No account number in %s', file.name)
            account_name = self.file_account(file)
        if currency_match := _find_match(pages[0], CURRENCY_REGEX):
            currency = currency_match.group('currency')
        else:
            self._log.warning('No currency number in %s', file.name)
            currency = 'HUF'
        if self._extract_opening:
            opening_balance = self._extract_total_from_page(pages[0], 'NYITÓ EGYENLEG')
            _append_total(entries, meta, account_name, currency, opening_balance)
        closing_balance = self._extract_total(pages, 'ZÁRÓ EGYENLEG')
        # The closing balance holds at the end of the statement day, i.e.
        # at the start of the next day — hence the one-day shift.
        _append_total(entries, meta, account_name, currency, closing_balance, dt.timedelta(days=1))
        return entries

    def _extract_total(self, pages: List[LTPage], label: str) -> Optional[Total]:
        '''Search all pages for the total row labelled ``label``.'''
        for page in pages:
            if total := self._extract_total_from_page(page, label):
                return total
        self._log.error('%s was not found in the pdf file', label)
        return None

    def _extract_total_from_page(self, page: LTPage, label: str) -> Optional[Total]:
        '''Extract the total from the row of ``page`` carrying ``label``.'''
        # Fixed: compare against None explicitly — a y coordinate of 0 is
        # falsy but valid, and the old truthiness test would skip it.
        if (total_y := _find_label_y(page, label)) is not None:
            return self._extract_total_by_y(page, total_y)
        return None

    def _extract_total_by_y(self, page: LTPage, total_y: float) -> Optional[Total]:
        '''
        Read the date and amount cells of the text row at height ``total_y``.

        Cells are located by hard-coded x coordinates of OTP statements:
        left edge 34 for the date, right edge 572.68 for the amount.
        '''
        date: Optional[dt.date] = None
        units: Optional[Decimal] = None
        for element in page:
            if isinstance(element, LTTextContainer):
                x, y, x2, _ = element.bbox
                if abs(y - total_y) > 0.5:
                    continue
                elif abs(x - 34) <= 0.5:
                    date_str = element.get_text().strip()
                    if date is not None:
                        self._log.warning(
                            'Found date %s, but date was already set to %s',
                            date_str,
                            date)
                        continue
                    try:
                        date = dt.datetime.strptime(date_str, '%y.%m.%d').date()
                    except ValueError as exc:
                        self._log.warning(f'Invalid date {date_str}', exc_info=exc)
                elif abs(x2 - 572.68) <= 0.5:
                    units_str = element.get_text().strip().replace('.', '').replace(',', '.')
                    if units is not None:
                        self._log.warning(
                            'Found units %s, but units were already set to %s',
                            units_str,
                            units)
                        # Fixed: keep the first value, mirroring the date
                        # handling; previously a warned duplicate still
                        # overwrote the units.
                        continue
                    try:
                        units = D(units_str)
                    except ValueError as exc:
                        self._log.error('Invalid units %s', units_str, exc_info=exc)
        if date is None:
            self._log.error('Date was not found at y=%s', total_y)
            return None
        # Fixed: compare against None — a zero balance is a legitimate value
        # that the old truthiness test rejected as "not found".
        if units is None:
            self._log.error('Units were not found at y=%s', total_y)
            return None
        return Total(date, units)