diff options
Diffstat (limited to 'beancount_extras_kris7t/importers/utils.py')
-rw-r--r-- | beancount_extras_kris7t/importers/utils.py | 125 |
1 files changed, 125 insertions, 0 deletions
diff --git a/beancount_extras_kris7t/importers/utils.py b/beancount_extras_kris7t/importers/utils.py new file mode 100644 index 0000000..f0a8134 --- /dev/null +++ b/beancount_extras_kris7t/importers/utils.py | |||
@@ -0,0 +1,125 @@ | |||
1 | ''' | ||
2 | Utilities for custom importers. | ||
3 | ''' | ||
4 | __copyright__ = 'Copyright (c) 2020 Kristóf Marussy <kristof@marussy.com>' | ||
5 | __license__ = 'GNU GPLv2' | ||
6 | |||
7 | from abc import ABC, abstractmethod | ||
8 | import datetime as dt | ||
9 | from decimal import Decimal | ||
10 | from typing import cast, Callable, Iterable, List, NamedTuple, Optional, Set, TypeVar, Union | ||
11 | |||
12 | from beancount.core import amount as am, data | ||
13 | from beancount.core.amount import Amount | ||
14 | from beancount.core.flags import FLAG_OKAY, FLAG_WARNING | ||
15 | from beancount.core.number import D, ZERO | ||
16 | |||
17 | MISSING_AMOUNT = cast(Amount, None) | ||
18 | COMMENT_META = 'import-raw-comment' | ||
19 | PAYEE_META = 'import-raw-payee' | ||
20 | |||
21 | |||
22 | class InvalidEntry(Exception): | ||
23 | pass | ||
24 | |||
25 | |||
26 | class Posting(NamedTuple): | ||
27 | account: str | ||
28 | amount: Amount | ||
29 | |||
30 | |||
31 | class Row(ABC): | ||
32 | entry_type: Optional[str] | ||
33 | payee: Optional[str] | ||
34 | comment: str | ||
35 | meta: data.Meta | ||
36 | flag: str | ||
37 | tags: Set[str] | ||
38 | links: Set[str] | ||
39 | _postings: Optional[List[Posting]] | ||
40 | |||
41 | def __init__(self, | ||
42 | file_name: str, | ||
43 | line_number: int, | ||
44 | entry_type: Optional[str], | ||
45 | payee: Optional[str], | ||
46 | comment: str): | ||
47 | self.entry_type = entry_type | ||
48 | self.payee = payee | ||
49 | self.comment = comment | ||
50 | self.meta = data.new_metadata(file_name, line_number) | ||
51 | self.flag = FLAG_OKAY | ||
52 | self.tags = set() | ||
53 | self.links = set() | ||
54 | self._postings = None | ||
55 | |||
56 | @property | ||
57 | @abstractmethod | ||
58 | def transacted_amount(self) -> Amount: | ||
59 | pass | ||
60 | |||
61 | @property | ||
62 | def transacted_currency(self) -> str: | ||
63 | return self.transacted_amount.currency | ||
64 | |||
65 | @property | ||
66 | def postings(self) -> Optional[List[Posting]]: | ||
67 | return self._postings | ||
68 | |||
69 | def assign_to_accounts(self, *postings: Posting) -> None: | ||
70 | if self.done: | ||
71 | raise InvalidEntry('Transaction is alrady done processing') | ||
72 | self._postings = list(postings) | ||
73 | if not self._postings: | ||
74 | raise InvalidEntry('Not assigned to any accounts') | ||
75 | head, *rest = self._postings | ||
76 | sum = head.amount | ||
77 | for posting in rest: | ||
78 | sum = am.add(sum, posting.amount) | ||
79 | if sum != self.transacted_amount: | ||
80 | self.flag = FLAG_WARNING | ||
81 | |||
82 | def assign_to_account(self, account: str) -> None: | ||
83 | self.assign_to_accounts(Posting(account, self.transacted_amount)) | ||
84 | |||
85 | @property | ||
86 | def done(self) -> bool: | ||
87 | return self._postings is not None | ||
88 | |||
89 | |||
90 | Extractor = Callable[[Row], None] | ||
91 | TRow = TypeVar('TRow', bound=Row) | ||
92 | |||
93 | |||
94 | def run_row_extractors(row: TRow, extractors: Iterable[Callable[[TRow], None]]) -> None: | ||
95 | for extractor in extractors: | ||
96 | extractor(row) | ||
97 | if row.done: | ||
98 | return | ||
99 | |||
100 | |||
101 | def extract_unknown(expenses_account: str, income_account: str) -> Extractor: | ||
102 | def do_extract(row: Row) -> None: | ||
103 | if row.transacted_amount.number < ZERO: | ||
104 | row.assign_to_account(expenses_account) | ||
105 | else: | ||
106 | row.assign_to_account(income_account) | ||
107 | row.flag = FLAG_WARNING | ||
108 | return do_extract | ||
109 | |||
110 | |||
111 | def parse_date(date_str: str, format_string: str) -> dt.date: | ||
112 | try: | ||
113 | return dt.datetime.strptime(date_str, format_string).date() | ||
114 | except ValueError as exc: | ||
115 | raise InvalidEntry(f'Cannot parse date: {date_str}') from exc | ||
116 | |||
117 | |||
118 | def parse_number(in_amount: Union[str, int, float, Decimal]) -> Decimal: | ||
119 | try: | ||
120 | value = D(in_amount) | ||
121 | except ValueError as exc: | ||
122 | raise InvalidEntry(f'Cannot parse number: {in_amount}') from exc | ||
123 | if value is None: | ||
124 | raise InvalidEntry(f'Parse number returned None: {in_amount}') | ||
125 | return value | ||