diff options
author | Kristóf Marussy <kristof@marussy.com> | 2021-01-25 01:14:28 +0100 |
---|---|---|
committer | Kristóf Marussy <kristof@marussy.com> | 2021-01-25 01:14:28 +0100 |
commit | a1c2a999e449054d6641bbb633954e45fcd63f90 (patch) | |
tree | 47628c10ded721d66e47b5f87f501293cd8af003 /beancount_extras_kris7t/importers/hetzner_pdf.py | |
parent | Initialize package (diff) | |
download | beancount-extras-kris7t-a1c2a999e449054d6641bbb633954e45fcd63f90.tar.gz beancount-extras-kris7t-a1c2a999e449054d6641bbb633954e45fcd63f90.tar.zst beancount-extras-kris7t-a1c2a999e449054d6641bbb633954e45fcd63f90.zip |
Add plugins and importers from private config
The importers are missing tests: without any specification for the import
formats, we would have to use real, private data as test inputs.
Diffstat (limited to 'beancount_extras_kris7t/importers/hetzner_pdf.py')
-rw-r--r-- | beancount_extras_kris7t/importers/hetzner_pdf.py | 136 |
1 files changed, 136 insertions, 0 deletions
diff --git a/beancount_extras_kris7t/importers/hetzner_pdf.py b/beancount_extras_kris7t/importers/hetzner_pdf.py new file mode 100644 index 0000000..7eb89b6 --- /dev/null +++ b/beancount_extras_kris7t/importers/hetzner_pdf.py | |||
@@ -0,0 +1,136 @@ | |||
1 | ''' | ||
2 | Importer for Hetzner PDF invoices. | ||
3 | ''' | ||
4 | __copyright__ = 'Copyright (c) 2020 Kristóf Marussy <kristof@marussy.com>' | ||
5 | __license__ = 'GNU GPLv2' | ||
6 | |||
7 | import datetime as dt | ||
8 | import logging | ||
9 | import re | ||
10 | from typing import cast, Iterable, Optional | ||
11 | |||
12 | from beancount.core import amount as am, data | ||
13 | from beancount.core.amount import Amount | ||
14 | from beancount.core.flags import FLAG_OKAY, FLAG_WARNING | ||
15 | from beancount.core.number import D | ||
16 | from beancount.ingest.cache import _FileMemo as FileMemo | ||
17 | from beancount.ingest.importer import ImporterProtocol | ||
18 | |||
19 | from pdfminer.high_level import extract_pages | ||
20 | from pdfminer.layout import LTPage, LTTextContainer | ||
21 | |||
22 | from beancount_extras_kris7t.importers.utils import MISSING_AMOUNT | ||
23 | |||
# Matches a path whose last component is ``Hetzner_<YYYY-MM-DD>_<R + digits>.pdf``
# and captures the date and the invoice number from it.
INVOICE_REGEX = re.compile(
    r'.*Hetzner_(?P<date>\d{4}-\d{2}-\d{2})_(?P<number>R\d+)\.pdf$', re.IGNORECASE)
# Captures the total amount due printed on the invoice.
AMOUNT_REGEX = re.compile(r'Amount due: € (?P<amount>\d+(\.\d+)?)', re.IGNORECASE)
# Notice printed when the whole amount was charged to the client credit balance.
BALANCE_REGEX = re.compile(
    'The amount has been charged to the credit balance on your client credit account.',
    re.IGNORECASE)
# Notice printed when the whole amount will be debited from the credit card.
CARD_REGEX = re.compile(
    'The invoice amount will soon be debited from your credit card.',
    re.IGNORECASE)
# Notice printed when the amount is split between the credit balance and the card.
# Both groups accept any number of integer digits: the ``card`` group used to accept
# a single digit only, so remaining amounts of 10.00 or more were never recognized.
MIXED_REGEX = re.compile(
    r'The amount of € (?P<balance>\d+(\.\d+)?) has been charged to the credit balance ' +
    r'on your client credit account. The remaining amount of € (?P<card>\d+(\.\d+)?) ' +
    'will be debited by credit card in the next few days.',
    re.IGNORECASE)
38 | |||
39 | |||
def _extract_match(pages: Iterable[LTPage], pattern: re.Pattern) -> Optional[re.Match]:
    '''
    Find the first text container on the pages whose text matches the pattern.

    The text of each container is stripped and its newlines are replaced by
    spaces before matching. Matching is anchored at the start of the text
    (``pattern.match``). Returns the match object, or ``None`` if no text
    container matches.
    '''
    text_boxes = (
        element
        for page in pages
        for element in page
        if isinstance(element, LTTextContainer))
    for box in text_boxes:
        flattened = box.get_text().strip().replace('\n', ' ')
        found = pattern.match(flattened)
        if found is not None:
            return found
    return None
48 | |||
49 | |||
class Importer(ImporterProtocol):
    '''
    Importer for Hetzner PDF invoices.

    Files are recognized by name only (see ``INVOICE_REGEX``). The invoice date
    and number are taken from the file name; the amount due and the payment
    method are read from the text of the PDF.
    '''

    _log: logging.Logger
    # Account posted to when the invoice is debited from the credit card;
    # also the account the invoice is filed under.
    _liability: str
    # Account posted to when the invoice is charged to the client credit balance.
    _credit_balance: str
    # Account that receives the invoiced amount.
    _expense: str

    def __init__(self, liability: str, credit_balance: str, expense: str):
        '''
        Store the account names used for the generated postings.
        '''
        self._log = logging.getLogger(type(self).__qualname__)
        self._liability = liability
        self._credit_balance = credit_balance
        self._expense = expense

    @staticmethod
    def _require_invoice(file: FileMemo) -> re.Match:
        '''
        Match the file name against ``INVOICE_REGEX``.

        Raises ``RuntimeError`` if the name is not an invoice file name.
        '''
        match = INVOICE_REGEX.match(file.name)
        if match is None:
            raise RuntimeError(f'Not an invoice: {file.name}')
        return match

    @staticmethod
    def _eur(text: str) -> Amount:
        '''
        Parse a decimal number captured from the invoice into an EUR amount.
        '''
        number = D(text)
        # Kept from the original code; presumably narrows the type for checkers.
        assert number is not None
        return Amount(number, 'EUR')

    def identify(self, file: FileMemo) -> bool:
        '''
        Return whether the file name looks like a Hetzner invoice.
        '''
        return INVOICE_REGEX.match(file.name) is not None

    def file_name(self, file: FileMemo) -> str:
        '''
        Return ``Hetzner_<invoice number>.pdf``, i.e. the name without the date.

        Raises ``RuntimeError`` if the file is not an invoice.
        '''
        number = self._require_invoice(file).group('number')
        return f'Hetzner_{number}.pdf'

    def file_account(self, file: FileMemo) -> str:
        '''
        Return the liability account for an invoice file.

        Raises ``RuntimeError`` if the file is not an invoice.
        '''
        self._require_invoice(file)
        return self._liability

    def file_date(self, file: FileMemo) -> dt.date:
        '''
        Return the date embedded in the invoice file name.

        Raises ``RuntimeError`` if the file is not an invoice.
        '''
        date_str = self._require_invoice(file).group('date')
        return dt.datetime.strptime(date_str, '%Y-%m-%d').date()

    def extract(self, file: FileMemo) -> data.Entries:
        '''
        Build a single transaction for the invoice.

        Returns an empty list (after logging a warning) if the file is not an
        invoice or no amount due can be found in the PDF. If the payment method
        is unknown, or the split payments do not add up to the amount due, the
        transaction is flagged with ``FLAG_WARNING``.
        '''
        name_match = INVOICE_REGEX.match(file.name)
        if name_match is None:
            self._log.warning('Not an invoice: %s', file.name)
            return []
        invoice_number = name_match.group('number')
        date = self.file_date(file)
        # Materialize the pages, because they are scanned once per pattern below.
        pages = list(cast(Iterable[LTPage], extract_pages(file.name)))
        amount_match = _extract_match(pages, AMOUNT_REGEX)
        if amount_match is None:
            self._log.warning('No amount found in %s', file.name)
            return []
        amount = self._eur(amount_match.group('amount'))
        postings = []
        flag = FLAG_OKAY
        if _extract_match(pages, BALANCE_REGEX) is not None:
            postings.append(
                data.Posting(self._credit_balance, -amount, None, None, None, None))
        elif _extract_match(pages, CARD_REGEX) is not None:
            postings.append(
                data.Posting(self._liability, -amount, None, None, None, None))
        elif mixed_match := _extract_match(pages, MIXED_REGEX):
            balance_amount = self._eur(mixed_match.group('balance'))
            postings.append(
                data.Posting(self._credit_balance, -balance_amount, None, None, None, None))
            card_amount = self._eur(mixed_match.group('card'))
            postings.append(
                data.Posting(self._liability, -card_amount, None, None, None, None))
            if am.add(balance_amount, card_amount) != amount:
                self._log.warning('Payments do not cover total amount in %s', file.name)
                flag = FLAG_WARNING
        else:
            self._log.warning('Unknown payment method in %s', file.name)
            flag = FLAG_WARNING
        # When everything is consistent, the expense posting gets MISSING_AMOUNT
        # instead of the explicit total; flagged transactions keep the total.
        expense_amount = MISSING_AMOUNT if flag == FLAG_OKAY else amount
        postings.append(
            data.Posting(self._expense, expense_amount, None, None, None, None))
        return [
            data.Transaction(data.new_metadata(file.name, 0), date, flag, 'Hetzner', 'Invoice',
                             set(), {f'hetzner_{invoice_number}'}, postings)
        ]