diff options
Diffstat (limited to 'beancount_extras_kris7t/importers/otpbank/otpbank_csv.py')
-rw-r--r-- | beancount_extras_kris7t/importers/otpbank/otpbank_csv.py | 370 |
1 files changed, 370 insertions, 0 deletions
diff --git a/beancount_extras_kris7t/importers/otpbank/otpbank_csv.py b/beancount_extras_kris7t/importers/otpbank/otpbank_csv.py new file mode 100644 index 0000000..9e12ec3 --- /dev/null +++ b/beancount_extras_kris7t/importers/otpbank/otpbank_csv.py | |||
@@ -0,0 +1,370 @@ | |||
1 | ''' | ||
2 | Importer for OTP Bank CSV transaction history. | ||
3 | ''' | ||
4 | __copyright__ = 'Copyright (c) 2020 Kristóf Marussy <kristof@marussy.com>' | ||
5 | __license__ = 'GNU GPLv2' | ||
6 | |||
7 | import csv | ||
8 | import datetime as dt | ||
9 | from decimal import Decimal | ||
10 | import logging | ||
11 | from typing import Callable, Dict, Iterable, List, NamedTuple, Optional | ||
12 | import re | ||
13 | from os import path | ||
14 | |||
15 | import beancount.core.amount as am | ||
16 | from beancount.core.amount import Amount | ||
17 | from beancount.core import data | ||
18 | from beancount.core.number import ZERO | ||
19 | from beancount.ingest.cache import _FileMemo as FileMemo | ||
20 | from beancount.ingest.importer import ImporterProtocol | ||
21 | |||
22 | from beancount_extras_kris7t.importers import utils | ||
23 | from beancount_extras_kris7t.importers.utils import COMMENT_META, InvalidEntry, PAYEE_META, \ | ||
24 | Posting | ||
25 | from beancount_extras_kris7t.plugins.transfer_accounts import TRANSFER_ACCOUNT_META, \ | ||
26 | TRANSFER_DATE_META | ||
27 | |||
28 | OTP_BANK = 'OTP Bank' | ||
29 | BOOKING_DATE_META = 'booking-date' | ||
30 | ENTRY_TYPE_META = 'otpbank-entry-type' | ||
31 | OTPBANK_CSV_TAG = 'otpbank-csv' | ||
32 | PAYPASS_TAG = 'paypass' | ||
33 | SIMPLE_TAG = 'processor-simple' | ||
34 | CARD_REGEX = re.compile( | ||
35 | r'^(?P<date>\d{4}\.\d{2}\.\d{2})\s+(?P<card_number>\d+)') | ||
36 | PAYPASS_REGEX = re.compile(r'-ÉRINT(Ő|\?)|PPASS$', re.IGNORECASE) | ||
37 | SIMPLE_REGEX = re.compile(r'-SIMPLE$', re.IGNORECASE) | ||
38 | SMS_REGEX = re.compile( | ||
39 | r'^(?P<date>\d{4}\.\d{2}\.\d{2})\s+\((?P<count>\d+) DB SMS\)', | ||
40 | re.IGNORECASE) | ||
41 | |||
42 | |||
43 | def _parse_date(date_str: str) -> dt.date: | ||
44 | return utils.parse_date(date_str, '%Y%m%d') | ||
45 | |||
46 | |||
47 | def _parse_card_date(date_str: str) -> dt.date: | ||
48 | return utils.parse_date(date_str, '%Y.%m.%d') | ||
49 | |||
50 | |||
51 | def _parse_number(amount_str: str) -> Decimal: | ||
52 | cleaned_str = amount_str.replace('.', '').replace(',', '.') | ||
53 | return utils.parse_number(cleaned_str) | ||
54 | |||
55 | |||
56 | def _validate_cd(amount: Amount, cd: str) -> None: | ||
57 | if cd == 'T': | ||
58 | if amount.number >= ZERO: | ||
59 | raise InvalidEntry(f'Invalid debit amount: {amount}') | ||
60 | elif cd == 'J': | ||
61 | if amount.number <= ZERO: | ||
62 | raise InvalidEntry(f'Invalid credit amount: {amount}') | ||
63 | else: | ||
64 | raise InvalidEntry(f'Invalid credit/debit type: {cd}') | ||
65 | |||
66 | |||
67 | class Conversion(NamedTuple): | ||
68 | foreign_amount: Amount | ||
69 | foreign_rate: Amount | ||
70 | conversion_fee: Optional[Posting] | ||
71 | |||
72 | |||
73 | class Card(NamedTuple): | ||
74 | card_account: str | ||
75 | card_date: dt.date | ||
76 | |||
77 | |||
78 | class Row(utils.Row): | ||
79 | account_name: str | ||
80 | cd: str | ||
81 | native_amount: Amount | ||
82 | booking_date: dt.date | ||
83 | value_date: dt.date | ||
84 | _raw_payee: Optional[str] | ||
85 | _raw_comment: str | ||
86 | _conversion: Optional[Conversion] | ||
87 | _card: Optional[Card] | ||
88 | |||
89 | def __init__(self, file_name: str, index: int, row: List[str], accounts: Dict[str, str]): | ||
90 | account_number, cd, amount_str, currency, booking_date_str, \ | ||
91 | value_date_str, _, _, payee, comment1, comment2, comment3, entry_type, _, _ = row | ||
92 | comment = f'{comment1}{comment2}{comment3}'.strip() | ||
93 | super().__init__(file_name, index, entry_type, payee, comment) | ||
94 | if account_name := accounts.get(account_number, None): | ||
95 | self.account_name = account_name | ||
96 | else: | ||
97 | raise InvalidEntry(f'Unknown account number {account_number}') | ||
98 | self.cd = cd | ||
99 | self.native_amount = Amount(_parse_number(amount_str), currency) | ||
100 | _validate_cd(self.native_amount, self.cd) | ||
101 | self.booking_date = _parse_date(booking_date_str) | ||
102 | self.value_date = _parse_date(value_date_str) | ||
103 | self.tags.add(OTPBANK_CSV_TAG) | ||
104 | self._raw_payee = payee | ||
105 | self._raw_comment = comment | ||
106 | self._conversion = None | ||
107 | self._card = None | ||
108 | |||
109 | @property | ||
110 | def conversion(self) -> Optional[Conversion]: | ||
111 | return self._conversion | ||
112 | |||
113 | @conversion.setter | ||
114 | def conversion(self, conversion: Optional[Conversion]) -> None: | ||
115 | if self._conversion: | ||
116 | raise InvalidEntry( | ||
117 | f'Conversion {self._conversion} was already set for row ' + | ||
118 | f' when trying to set it to {conversion}') | ||
119 | self._conversion = conversion | ||
120 | |||
121 | @property | ||
122 | def card(self) -> Optional[Card]: | ||
123 | return self._card | ||
124 | |||
125 | @card.setter | ||
126 | def card(self, card: Optional[Card]) -> None: | ||
127 | if self._card: | ||
128 | raise InvalidEntry( | ||
129 | f'Card {self._card} was already set for row ' + | ||
130 | f' when trying to set it to {card}') | ||
131 | self._card = card | ||
132 | |||
133 | @property | ||
134 | def transacted_amount(self) -> Amount: | ||
135 | if self.conversion: | ||
136 | return self.conversion.foreign_amount | ||
137 | return self.native_amount | ||
138 | |||
139 | @property | ||
140 | def _extended_meta(self) -> data.Meta: | ||
141 | meta = dict(self.meta) | ||
142 | if self.entry_type: | ||
143 | meta[ENTRY_TYPE_META] = self.entry_type | ||
144 | if self.booking_date != self.value_date: | ||
145 | meta[BOOKING_DATE_META] = self.booking_date | ||
146 | if self._raw_payee: | ||
147 | meta[PAYEE_META] = self._raw_payee | ||
148 | if self._raw_comment: | ||
149 | meta[COMMENT_META] = self._raw_comment | ||
150 | return meta | ||
151 | |||
152 | @property | ||
153 | def _card_meta(self) -> Optional[data.Meta]: | ||
154 | if self._card: | ||
155 | card_meta: data.Meta = { | ||
156 | TRANSFER_ACCOUNT_META: self._card.card_account | ||
157 | } | ||
158 | if self._card.card_date != self.value_date: | ||
159 | card_meta[TRANSFER_DATE_META] = self._card.card_date | ||
160 | return card_meta | ||
161 | else: | ||
162 | return None | ||
163 | |||
164 | def _to_transaction(self) -> data.Transaction: | ||
165 | if not self.postings: | ||
166 | raise InvalidEntry('No postings were extracted from this entry') | ||
167 | if self.payee == '': | ||
168 | payee = None | ||
169 | else: | ||
170 | payee = self.payee | ||
171 | if self.comment == '' or self.comment == self.payee: | ||
172 | if payee: | ||
173 | desc = '' | ||
174 | else: | ||
175 | desc = self.entry_type or '' | ||
176 | else: | ||
177 | desc = self.comment | ||
178 | if self._conversion: | ||
179 | foreign_rate = self._conversion.foreign_rate | ||
180 | conversion_fee = self._conversion.conversion_fee | ||
181 | else: | ||
182 | foreign_rate = None | ||
183 | conversion_fee = None | ||
184 | meta = self._extended_meta | ||
185 | card_meta = self._card_meta | ||
186 | postings = [ | ||
187 | data.Posting(self.account_name, self.native_amount, None, None, None, None) | ||
188 | ] | ||
189 | if len(self.postings) == 1 and not foreign_rate: | ||
190 | account, amount = self.postings[0] | ||
191 | postings.append( | ||
192 | data.Posting(account, utils.MISSING_AMOUNT, None, None, None, card_meta)) | ||
193 | else: | ||
194 | for account, amount in self.postings: | ||
195 | postings.append( | ||
196 | data.Posting(account, -amount, None, foreign_rate, None, card_meta)) | ||
197 | if conversion_fee and conversion_fee.amount.number != ZERO: | ||
198 | account, amount = conversion_fee | ||
199 | postings.append( | ||
200 | data.Posting(account, amount, None, None, None, None)) | ||
201 | return data.Transaction( | ||
202 | meta, self.value_date, self.flag, payee, desc, self.tags, self.links, postings) | ||
203 | |||
204 | |||
205 | Extractor = Callable[[Row], None] | ||
206 | |||
207 | |||
208 | def extract_foreign_currencies( | ||
209 | currencies: Iterable[str], conversion_fees_account: str) -> Extractor: | ||
210 | currencies_joined = '|'.join(currencies) | ||
211 | currency_regex = re.compile( | ||
212 | r'(?P<amount>\d+(,\d+)?)(?P<currency>' + currencies_joined | ||
213 | + r')\s+(?P<rate>\d+(,\d+)?)$', re.IGNORECASE) | ||
214 | |||
215 | def do_extract(row: Row) -> None: | ||
216 | if currency_match := currency_regex.search(row.comment): | ||
217 | foreign_amount_d = _parse_number(currency_match.group('amount')) | ||
218 | if row.cd == 'T': | ||
219 | foreign_amount_d *= -1 | ||
220 | foreign_currency = currency_match.group('currency') | ||
221 | foreign_amount = Amount( | ||
222 | foreign_amount_d, foreign_currency.upper()) | ||
223 | foreign_rate_d = _parse_number(currency_match.group('rate')) | ||
224 | foreign_rate = Amount( | ||
225 | foreign_rate_d, row.native_amount.currency) | ||
226 | converted_amount = am.mul(foreign_rate, foreign_amount_d) | ||
227 | conversion_fee_amount = am.sub( | ||
228 | converted_amount, row.native_amount) | ||
229 | conversion_fee = Posting( | ||
230 | conversion_fees_account, conversion_fee_amount) | ||
231 | row.conversion = Conversion( | ||
232 | foreign_amount, foreign_rate, conversion_fee) | ||
233 | row.comment = row.comment[:currency_match.start()].strip() | ||
234 | return do_extract | ||
235 | |||
236 | |||
237 | def extract_cards(card_accounts: Dict[str, str]) -> Extractor: | ||
238 | def do_extract(row: Row) -> None: | ||
239 | if row.entry_type.lower() not in [ | ||
240 | 'vásárlás kártyával', | ||
241 | 'bankkártyával kapcs. díj', | ||
242 | ]: | ||
243 | return | ||
244 | if card_match := CARD_REGEX.search(row.comment): | ||
245 | card_number = card_match.group('card_number') | ||
246 | if card_number not in card_accounts: | ||
247 | raise InvalidEntry(f'No account for card {card_number}') | ||
248 | card_account = card_accounts[card_number] | ||
249 | card_date = _parse_card_date(card_match.group('date')) | ||
250 | row.card = Card(card_account, card_date) | ||
251 | row.comment = row.comment[card_match.end():].strip() | ||
252 | else: | ||
253 | raise InvalidEntry( | ||
254 | f'Cannot extract card information from: {row.comment}') | ||
255 | if paypass_match := PAYPASS_REGEX.search(row.comment): | ||
256 | row.comment = row.comment[:paypass_match.start()].strip() | ||
257 | row.tags.add(PAYPASS_TAG) | ||
258 | elif simple_match := PAYPASS_REGEX.search(row.comment): | ||
259 | row.comment = row.comment[:simple_match.start()].strip() | ||
260 | row.tags.add(SIMPLE_TAG) | ||
261 | return do_extract | ||
262 | |||
263 | |||
264 | def extract_sms(sms: str, sms_liability: str, sms_expense: str) -> Extractor: | ||
265 | def do_extract(row: Row) -> None: | ||
266 | if row.entry_type != 'OTPdirekt ÜZENETDÍJ': | ||
267 | return | ||
268 | if sms_match := SMS_REGEX.search(row.comment): | ||
269 | card_account = sms_liability | ||
270 | card_date = _parse_card_date(sms_match.group('date')) | ||
271 | row.card = Card(card_account, card_date) | ||
272 | sms_amount_d = _parse_number(sms_match.group('count')) | ||
273 | foreign_amount = -Amount(sms_amount_d, sms) | ||
274 | foreign_rate = am.div(-row.native_amount, sms_amount_d) | ||
275 | row.conversion = Conversion( | ||
276 | foreign_amount, foreign_rate, None) | ||
277 | row.comment = 'SMS díj' | ||
278 | row.payee = OTP_BANK | ||
279 | row.assign_to_account(sms_expense) | ||
280 | else: | ||
281 | raise InvalidEntry( | ||
282 | f'Cannot parse SMS transaction: {row.comment}') | ||
283 | return do_extract | ||
284 | |||
285 | |||
286 | class Importer(ImporterProtocol): | ||
287 | ''' | ||
288 | Importer for OTP Bank CSV transaction history. | ||
289 | ''' | ||
290 | |||
291 | _log: logging.Logger | ||
292 | _accounts: Dict[str, str] | ||
293 | _extracts: List[Extractor] | ||
294 | |||
295 | def __init__(self, | ||
296 | accounts: Dict[str, str], | ||
297 | extractors: List[Extractor]): | ||
298 | self._log = logging.getLogger(type(self).__qualname__) | ||
299 | self._accounts = {number.replace('-', ''): name for number, name in accounts.items()} | ||
300 | self._extractors = extractors | ||
301 | |||
302 | def identify(self, file: FileMemo) -> bool: | ||
303 | _, extension = path.splitext(file.name) | ||
304 | if extension.lower() != '.csv': | ||
305 | return False | ||
306 | return self._find_account(file) is not None | ||
307 | |||
308 | def _find_account(self, file: FileMemo) -> Optional[str]: | ||
309 | head = file.head().strip().split('\n')[0] | ||
310 | if head.count(';') != 14: | ||
311 | return None | ||
312 | for account_number, account_name in self._accounts.items(): | ||
313 | if head.startswith(f'"{account_number}"'): | ||
314 | return account_name | ||
315 | return None | ||
316 | |||
317 | def file_name(self, file: FileMemo) -> str: | ||
318 | return 'otpbank.csv' | ||
319 | |||
320 | def file_account(self, file: FileMemo) -> str: | ||
321 | account_name = self._find_account(file) | ||
322 | if not account_name: | ||
323 | raise RuntimeError(f'Invalid account number in {file.name}') | ||
324 | return account_name | ||
325 | |||
326 | def file_date(self, file: FileMemo) -> Optional[dt.date]: | ||
327 | ''' | ||
328 | Files account statements according to the booking date of the last | ||
329 | transaction. | ||
330 | ''' | ||
331 | date = None | ||
332 | with open(file.name, 'r') as csv_file: | ||
333 | for row in csv.reader(csv_file, delimiter=';'): | ||
334 | date_str = row[4] | ||
335 | try: | ||
336 | date = _parse_date(date_str) | ||
337 | except InvalidEntry as exc: | ||
338 | self._log.error( | ||
339 | 'Invalid entry in %s when looking for filing date', | ||
340 | file.name, exc_info=exc) | ||
341 | return None | ||
342 | return date | ||
343 | |||
344 | def extract(self, file: FileMemo) -> data.Entries: | ||
345 | file_name = file.name | ||
346 | entries: data.Entries = [] | ||
347 | with open(file_name, 'r') as csv_file: | ||
348 | last_date: Optional[dt.date] = None | ||
349 | last_date_str = "" | ||
350 | count_within_date = 1 | ||
351 | for index, row_str in enumerate(csv.reader(csv_file, delimiter=';')): | ||
352 | try: | ||
353 | row = Row(file_name, index, row_str, self._accounts) | ||
354 | if last_date != row.booking_date: | ||
355 | last_date = row.booking_date | ||
356 | last_date_str = last_date.strftime('%Y-%m-%d') | ||
357 | count_within_date = 1 | ||
358 | else: | ||
359 | count_within_date += 1 | ||
360 | row.links.add(f'otpbank_{last_date_str}_{count_within_date:03}') | ||
361 | self._run_row_extractors(row) | ||
362 | entries.append(row._to_transaction()) | ||
363 | except InvalidEntry as exc: | ||
364 | self._log.warning( | ||
365 | 'Skipping invalid entry %d of %s', | ||
366 | index, file_name, exc_info=exc) | ||
367 | return entries | ||
368 | |||
369 | def _run_row_extractors(self, row: Row): | ||
370 | utils.run_row_extractors(row, self._extractors) | ||