diff options
author | Kristóf Marussy <kristof@marussy.com> | 2021-01-25 01:14:28 +0100 |
---|---|---|
committer | Kristóf Marussy <kristof@marussy.com> | 2021-01-25 01:14:28 +0100 |
commit | a1c2a999e449054d6641bbb633954e45fcd63f90 (patch) | |
tree | 47628c10ded721d66e47b5f87f501293cd8af003 /beancount_extras_kris7t/importers/rules.py | |
parent | Initialize package (diff) | |
download | beancount-extras-kris7t-a1c2a999e449054d6641bbb633954e45fcd63f90.tar.gz beancount-extras-kris7t-a1c2a999e449054d6641bbb633954e45fcd63f90.tar.zst beancount-extras-kris7t-a1c2a999e449054d6641bbb633954e45fcd63f90.zip |
Add plugins and importers from private config
The importers are missing tests, because not having any specifications
for the import formats means we must use real, private data as test inputs
Diffstat (limited to 'beancount_extras_kris7t/importers/rules.py')
-rw-r--r-- | beancount_extras_kris7t/importers/rules.py | 134 |
1 files changed, 134 insertions, 0 deletions
diff --git a/beancount_extras_kris7t/importers/rules.py b/beancount_extras_kris7t/importers/rules.py new file mode 100644 index 0000000..3890f24 --- /dev/null +++ b/beancount_extras_kris7t/importers/rules.py | |||
@@ -0,0 +1,134 @@ | |||
1 | #!/usr/bin/env python3 | ||
2 | |||
3 | from typing import cast, Dict, List, NamedTuple, Optional, Tuple, Union | ||
4 | import re | ||
5 | |||
6 | from beancount.core.amount import Amount | ||
7 | |||
8 | from beancount_extras_kris7t.importers.utils import Extractor, Row | ||
9 | |||
10 | WILDCARD = re.compile('.*') | ||
11 | |||
12 | |||
13 | class When(NamedTuple): | ||
14 | payee: re.Pattern | ||
15 | text: re.Pattern | ||
16 | amount: Optional[Amount] | ||
17 | |||
18 | |||
19 | def _compile_regex(s: str) -> re.Pattern: | ||
20 | return re.compile(s, re.IGNORECASE) | ||
21 | |||
22 | |||
23 | def when(payee: Optional[Union[re.Pattern, str]] = None, | ||
24 | text: Optional[Union[re.Pattern, str]] = None, | ||
25 | amount: Optional[Amount] = None) -> When: | ||
26 | if not payee and not text: | ||
27 | raise TypeError('at least one of payee and desc must be provided') | ||
28 | if isinstance(payee, str): | ||
29 | payee_regex = _compile_regex(payee) | ||
30 | else: | ||
31 | payee_regex = payee or WILDCARD | ||
32 | if isinstance(text, str): | ||
33 | text_regex = _compile_regex(text) | ||
34 | else: | ||
35 | text_regex = text or WILDCARD | ||
36 | return When(payee_regex, text_regex, amount) | ||
37 | |||
38 | |||
39 | Condition = Union[str, re.Pattern, When] | ||
40 | |||
41 | |||
42 | def _compile_condition(cond: Condition) -> When: | ||
43 | if isinstance(cond, When): | ||
44 | return cond | ||
45 | else: | ||
46 | return when(text=cond) | ||
47 | |||
48 | |||
49 | class let(NamedTuple): | ||
50 | payee: Optional[str] = None | ||
51 | desc: Optional[str] = None | ||
52 | account: Optional[str] = None | ||
53 | flag: Optional[str] = None | ||
54 | tag: Optional[str] = None | ||
55 | |||
56 | |||
57 | Action = Union[str, | ||
58 | Tuple[str, str], | ||
59 | Tuple[str, str, str], | ||
60 | Tuple[str, str, str, str], | ||
61 | let] | ||
62 | |||
63 | |||
64 | def _compile_action(action: Action) -> let: | ||
65 | if isinstance(action, str): | ||
66 | return let(account=action) | ||
67 | if isinstance(action, let): | ||
68 | return action | ||
69 | elif isinstance(action, tuple): | ||
70 | if len(action) == 2: | ||
71 | payee, account = cast(Tuple[str, str], action) | ||
72 | return let(payee=payee, account=account) | ||
73 | elif len(action) == 3: | ||
74 | payee, desc, account = cast(Tuple[str, str, str], action) | ||
75 | return let(payee, desc, account) | ||
76 | else: | ||
77 | flag, payee, desc, account = cast(Tuple[str, str, str, str], action) | ||
78 | return let(payee, desc, account, flag) | ||
79 | else: | ||
80 | raise ValueError(f'Unknown action: {action}') | ||
81 | |||
82 | |||
83 | Rules = Dict[Condition, Action] | ||
84 | CompiledRules = List[Tuple[When, let]] | ||
85 | |||
86 | |||
87 | def _compile_rules(rules: Rules) -> CompiledRules: | ||
88 | return [(_compile_condition(cond), _compile_action(action)) | ||
89 | for cond, action in rules.items()] | ||
90 | |||
91 | |||
92 | def _rule_condition_matches(cond: When, row: Row) -> bool: | ||
93 | if row.payee: | ||
94 | payee_valid = cond.payee.search(row.payee) is not None | ||
95 | else: | ||
96 | payee_valid = cond.payee == WILDCARD | ||
97 | if cond.text == WILDCARD: | ||
98 | text_valid = True | ||
99 | else: | ||
100 | characteristics: List[str] = [] | ||
101 | if row.entry_type: | ||
102 | characteristics.append(row.entry_type) | ||
103 | if row.payee: | ||
104 | characteristics.append(row.payee) | ||
105 | if row.comment: | ||
106 | characteristics.append(row.comment) | ||
107 | row_str = ' '.join(characteristics) | ||
108 | text_valid = cond.text.search(row_str) is not None | ||
109 | amount_valid = not cond.amount or row.transacted_amount == cond.amount | ||
110 | return payee_valid and text_valid and amount_valid | ||
111 | |||
112 | |||
113 | def extract_rules(input_rules: Rules) -> Extractor: | ||
114 | compiled_rules = _compile_rules(input_rules) | ||
115 | |||
116 | def do_extract(row: Row) -> None: | ||
117 | for cond, (payee, desc, account, flag, tag) in compiled_rules: | ||
118 | if not _rule_condition_matches(cond, row): | ||
119 | continue | ||
120 | if payee is not None: | ||
121 | if row.payee == row.comment: | ||
122 | row.comment = '' | ||
123 | row.payee = payee | ||
124 | if desc is not None: | ||
125 | row.comment = desc | ||
126 | if account is not None: | ||
127 | row.assign_to_account(account) | ||
128 | if flag is not None: | ||
129 | row.flag = flag | ||
130 | if tag is not None: | ||
131 | row.tags.add(tag) | ||
132 | if row.postings: | ||
133 | return | ||
134 | return do_extract | ||