diff options
author | Kristóf Marussy <kristof@marussy.com> | 2021-01-25 01:14:28 +0100 |
---|---|---|
committer | Kristóf Marussy <kristof@marussy.com> | 2021-01-25 01:14:28 +0100 |
commit | a1c2a999e449054d6641bbb633954e45fcd63f90 (patch) | |
tree | 47628c10ded721d66e47b5f87f501293cd8af003 /beancount_extras_kris7t/importers/otpbank/otpbank_pdf.py | |
parent | Initialize package (diff) | |
download | beancount-extras-kris7t-a1c2a999e449054d6641bbb633954e45fcd63f90.tar.gz beancount-extras-kris7t-a1c2a999e449054d6641bbb633954e45fcd63f90.tar.zst beancount-extras-kris7t-a1c2a999e449054d6641bbb633954e45fcd63f90.zip |
Add plugins and importers from private config
The importers are missing tests, because not having any specifications
for the import formats means we must use real, private data as test inputs
Diffstat (limited to 'beancount_extras_kris7t/importers/otpbank/otpbank_pdf.py')
-rw-r--r-- | beancount_extras_kris7t/importers/otpbank/otpbank_pdf.py | 187 |
1 files changed, 187 insertions, 0 deletions
diff --git a/beancount_extras_kris7t/importers/otpbank/otpbank_pdf.py b/beancount_extras_kris7t/importers/otpbank/otpbank_pdf.py new file mode 100644 index 0000000..c2de559 --- /dev/null +++ b/beancount_extras_kris7t/importers/otpbank/otpbank_pdf.py | |||
@@ -0,0 +1,187 @@ | |||
1 | ''' | ||
2 | Importer for OTP Bank PDF account statements. | ||
3 | ''' | ||
4 | __copyright__ = 'Copyright (c) 2020 Kristóf Marussy <kristof@marussy.com>' | ||
5 | __license__ = 'GNU GPLv2' | ||
6 | |||
7 | from decimal import Decimal | ||
8 | import datetime as dt | ||
9 | import logging | ||
10 | import re | ||
11 | from typing import cast, Dict, Iterable, List, NamedTuple, Optional | ||
12 | |||
13 | from beancount.core import data | ||
14 | from beancount.core.amount import Amount | ||
15 | from beancount.core.number import D | ||
16 | from beancount.ingest.cache import _FileMemo as FileMemo | ||
17 | from beancount.ingest.importer import ImporterProtocol | ||
18 | |||
19 | from pdfminer.high_level import extract_pages | ||
20 | from pdfminer.layout import LTPage, LTTextContainer | ||
21 | |||
22 | |||
# Matches the file name of any supported statement (checking or investment
# account) and captures the account number and the statement date.
# NOTE(review): the `[a ]` class and the second investment-account alternative
# look like they accept file names whose accented letters were mangled —
# confirm against real downloaded file names before touching these patterns.
STATEMENT_NAME_REGEX = re.compile(
    r'.*(Banksz[a ]mla|(Ertekpapirszamla|rt `kpap rsz mla)_)kivonat_(?P<account>\d[\d-]*\d)_' +
    r'(?P<date>\d{4}\.\d{2}\.\d{2})(_\d+)?\.pdf')
# Matches checking account statement file names only; captures the account number.
CHECKING_ACCOUNT_STATEMENT_NAME_REGEX = re.compile(
    r'.*Banksz[a ]mlakivonat_(?P<account>\d[\d-]*\d)_.+')
# Matches investment account statement file names only; captures the account number.
INVESTMENT_ACCOUNT_STATEMENT_NAME_REGEX = re.compile(
    r'.*(Ertekpapirszamla|rt `kpap rsz mla)_kivonat_(?P<account>\d[\d-]*\d)_.+')
# Searched for in the text of the first page to find the account number.
ACCOUNT_NUMBER_REGEX = re.compile(r'SZÁMLASZÁM: (?P<account>\d[\d-]*\d)')
# Searched for in the text of the first page to find the currency code; the `$`
# anchor requires the code to be the last thing in the (stripped) text element.
CURRENCY_REGEX = re.compile(r'DEVIZANEM: (?P<currency>[A-Z]+)$')
32 | |||
33 | |||
class Total(NamedTuple):
    '''
    A dated balance figure read from one row of a statement.
    '''
    # Date found on the row.
    date: dt.date
    # Number found on the row; the currency is tracked separately by the caller.
    units: Decimal
37 | |||
38 | |||
def _append_total(entries: data.Entries,
                  meta: data.Meta,
                  account: str,
                  currency: str,
                  total: Optional[Total],
                  delta: dt.timedelta = dt.timedelta(days=0)) -> None:
    '''
    Append a balance assertion built from ``total`` to ``entries``.

    Nothing is appended when ``total`` is missing. Otherwise the assertion is
    made for ``account``, dated ``total.date + delta``, with an amount of
    ``total.units`` in ``currency``.

    :param entries: list that receives the new balance directive (mutated in place)
    :param meta: metadata attached to the directive
    :param account: beancount account name to assert on
    :param currency: currency of the asserted amount
    :param total: the dated figure to assert, or ``None`` to do nothing
    :param delta: offset added to the date of ``total``
    '''
    if total:
        entries.append(
            data.Balance(
                meta,
                total.date + delta,
                account,
                Amount(total.units, currency),
                None,
                None))
51 | |||
52 | |||
def _find_label_y(page: LTPage, label: str) -> Optional[float]:
    '''
    Find the vertical position of the text element that reads exactly ``label``.

    The text of each text container on the page is stripped of surrounding
    whitespace before it is compared to ``label``.

    :param page: the page layout to search
    :param label: the exact text to look for
    :return: the second bounding box coordinate (``bbox[1]``) of the first
        matching element, or ``None`` if no element matches. The coordinate is
        a float, not an integer, which is why callers compare it with a
        tolerance.
    '''
    return next(
        (element.bbox[1]
         for element in page
         if isinstance(element, LTTextContainer) and element.get_text().strip() == label),
        None)
58 | |||
59 | |||
def _find_match(page: LTPage, pattern: re.Pattern) -> Optional[re.Match]:
    '''
    Search the text elements of ``page`` for ``pattern``.

    Each text container is searched separately, after stripping surrounding
    whitespace from its text.

    :param page: the page layout to search
    :param pattern: compiled regular expression to look for
    :return: the match from the first element where the pattern is found, or
        ``None`` if it is found in none of them
    '''
    text_elements = (element for element in page if isinstance(element, LTTextContainer))
    for text_element in text_elements:
        found = pattern.search(text_element.get_text().strip())
        if found is not None:
            return found
    return None
67 | |||
68 | |||
class Importer(ImporterProtocol):
    '''
    Importer for OTP Bank PDF account statements.

    Statements are recognized by their file name. Balance assertions are only
    extracted from checking account statements; investment account statements
    are identified and filed, but yield no entries.
    '''

    # Position of the columns in a balance row, in page layout units.
    # A cell is a date if its first bbox coordinate is at _DATE_X, and an amount
    # if its third bbox coordinate is at _UNITS_X2 (presumably the left edge of
    # the date column and the right edge of the right-aligned amount column —
    # confirm against pdfminer's bbox layout).
    _DATE_X = 34
    _UNITS_X2 = 572.68
    # Largest coordinate difference that still counts as "the same position".
    _TOLERANCE = 0.5

    _log: logging.Logger
    _accounts: Dict[str, str]
    _extract_opening: bool

    def __init__(self, accounts: Dict[str, str], extract_opening: bool = False):
        '''
        :param accounts: maps account numbers, as they appear in statement file
            names and on the statements, to beancount account names
        :param extract_opening: if true, also emit a balance assertion for the
            opening balance found on the first page
        '''
        self._log = logging.getLogger(type(self).__qualname__)
        self._accounts = accounts
        self._extract_opening = extract_opening

    def identify(self, file: FileMemo) -> bool:
        '''
        Tell whether ``file`` is a statement of one of the configured accounts.

        Only the file name is inspected.
        '''
        match = STATEMENT_NAME_REGEX.match(file.name)
        return match is not None and match.group('account') in self._accounts

    def file_name(self, file: FileMemo) -> str:
        '''
        Return the normalized file name to use when filing the statement.

        :raises RuntimeError: if the file name is neither a checking nor an
            investment account statement name
        '''
        if match := CHECKING_ACCOUNT_STATEMENT_NAME_REGEX.match(file.name):
            account_number = match.group('account')
            return f'Bankszámlakivonat_{account_number}.pdf'
        if match := INVESTMENT_ACCOUNT_STATEMENT_NAME_REGEX.match(file.name):
            account_number = match.group('account')
            return f'Értékpapírszámla_kivonat_{account_number}.pdf'
        raise RuntimeError(f'Not an account statement: {file.name}')

    def file_account(self, file: FileMemo) -> str:
        '''
        Return the beancount account the statement belongs to.

        :raises RuntimeError: if the file name is not a statement name
        :raises KeyError: if the account number in the file name is not configured
        '''
        match = STATEMENT_NAME_REGEX.match(file.name)
        if match is None:
            raise RuntimeError(f'Not an account statement: {file.name}')
        return self._accounts[match.group('account')]

    def file_date(self, file: FileMemo) -> dt.date:
        '''
        Return the statement date encoded in the file name.

        :raises RuntimeError: if the file name is not a statement name
        '''
        match = STATEMENT_NAME_REGEX.match(file.name)
        if match is None:
            raise RuntimeError(f'Not an account statement: {file.name}')
        return dt.datetime.strptime(match.group('date'), '%Y.%m.%d').date()

    def extract(self, file: FileMemo) -> data.Entries:
        '''
        Extract balance assertions from a checking account statement.

        The closing balance is always looked for on every page; the opening
        balance is only looked for on the first page, and only if opening
        balance extraction was enabled. The account number and the currency are
        read from the first page, falling back to the file name and to ``HUF``
        respectively (with a warning) if they are not found.

        :return: the balance directives found; empty for files that are not
            checking account statements or have no pages
        :raises KeyError: if the account number printed on the statement is not
            configured
        '''
        if not CHECKING_ACCOUNT_STATEMENT_NAME_REGEX.match(file.name):
            return []
        pages = list(cast(Iterable[LTPage], extract_pages(file.name)))
        if not pages:
            return []
        entries: data.Entries = []
        meta = data.new_metadata(file.name, 1)
        first_page = pages[0]
        if account_match := _find_match(first_page, ACCOUNT_NUMBER_REGEX):
            account_name = self._accounts[account_match.group('account')]
        else:
            self._log.warning('No account number in %s', file.name)
            account_name = self.file_account(file)
        if currency_match := _find_match(first_page, CURRENCY_REGEX):
            currency = currency_match.group('currency')
        else:
            self._log.warning('No currency number in %s', file.name)
            currency = 'HUF'
        if self._extract_opening:
            opening_balance = self._extract_total_from_page(first_page, 'NYITÓ EGYENLEG')
            _append_total(entries, meta, account_name, currency, opening_balance)
        closing_balance = self._extract_total(pages, 'ZÁRÓ EGYENLEG')
        # The closing balance is asserted on the day after the closing date
        # (presumably because a beancount balance assertion applies at the
        # beginning of its day — confirm against the beancount documentation).
        _append_total(entries, meta, account_name, currency, closing_balance, dt.timedelta(days=1))
        return entries

    def _extract_total(self, pages: List[LTPage], label: str) -> Optional[Total]:
        '''
        Return the total labelled ``label`` from the first page that has one.

        Logs an error and returns ``None`` if no page yields a total.
        '''
        for page in pages:
            total = self._extract_total_from_page(page, label)
            if total is not None:
                return total
        self._log.error('%s was not found in the pdf file', label)
        return None

    def _extract_total_from_page(self, page: LTPage, label: str) -> Optional[Total]:
        '''
        Return the total on the row of ``page`` that carries ``label``.

        Returns ``None`` if the label is not on the page or the row is incomplete.
        '''
        total_y = _find_label_y(page, label)
        # Compare with None explicitly: a coordinate of 0 is a valid position.
        if total_y is None:
            return None
        return self._extract_total_by_y(page, total_y)

    def _extract_total_by_y(self, page: LTPage, total_y: float) -> Optional[Total]:
        '''
        Read the date and the amount from the row at vertical position ``total_y``.

        Text elements whose second bbox coordinate is within the tolerance of
        ``total_y`` belong to the row. Amounts are expected with ``.`` as
        thousands separator and ``,`` as decimal separator.

        :return: the date and amount of the row, or ``None`` (after logging an
            error) if either of them is missing or could not be parsed
        '''
        date: Optional[dt.date] = None
        units: Optional[Decimal] = None
        for element in page:
            if not isinstance(element, LTTextContainer):
                continue
            x, y, x2, _ = element.bbox
            if abs(y - total_y) > self._TOLERANCE:
                continue
            if abs(x - self._DATE_X) <= self._TOLERANCE:
                date_str = element.get_text().strip()
                if date is not None:
                    # Keep the first date found on the row.
                    self._log.warning(
                        'Found date %s, but date was already set to %s',
                        date_str,
                        date)
                    continue
                try:
                    date = dt.datetime.strptime(date_str, '%y.%m.%d').date()
                except ValueError as exc:
                    self._log.warning('Invalid date %s', date_str, exc_info=exc)
            elif abs(x2 - self._UNITS_X2) <= self._TOLERANCE:
                units_str = element.get_text().strip().replace('.', '').replace(',', '.')
                if units is not None:
                    # NOTE(review): unlike a duplicate date, a duplicate amount
                    # is not skipped, so the last parsable one wins. Confirm
                    # whether this asymmetry is intended.
                    self._log.warning(
                        'Found units %s, but units were already set to %s',
                        units_str,
                        units)
                try:
                    units = D(units_str)
                except ValueError as exc:
                    self._log.error('Invalid units %s', units_str, exc_info=exc)
        if date is None:
            self._log.error('Date was not found at y=%.2f', total_y)
            return None
        # A balance of exactly zero is a valid amount, so test for None rather
        # than truthiness (Decimal(0) is falsy).
        if units is None:
            self._log.error('Units were not found at y=%.2f', total_y)
            return None
        return Total(date, units)