diff options
author | Kristóf Marussy <kristof@marussy.com> | 2021-01-25 01:14:28 +0100 |
---|---|---|
committer | Kristóf Marussy <kristof@marussy.com> | 2021-01-25 01:14:28 +0100 |
commit | a1c2a999e449054d6641bbb633954e45fcd63f90 (patch) | |
tree | 47628c10ded721d66e47b5f87f501293cd8af003 /beancount_extras_kris7t/plugins/templates.py | |
parent | Initialize package (diff) | |
download | beancount-extras-kris7t-a1c2a999e449054d6641bbb633954e45fcd63f90.tar.gz beancount-extras-kris7t-a1c2a999e449054d6641bbb633954e45fcd63f90.tar.zst beancount-extras-kris7t-a1c2a999e449054d6641bbb633954e45fcd63f90.zip |
Add plugins and importers from private config
The importers are missing tests, because not having any specifications
for the import formats means we must use real, private data as test inputs
Diffstat (limited to 'beancount_extras_kris7t/plugins/templates.py')
-rw-r--r-- | beancount_extras_kris7t/plugins/templates.py | 144 |
1 files changed, 144 insertions, 0 deletions
diff --git a/beancount_extras_kris7t/plugins/templates.py b/beancount_extras_kris7t/plugins/templates.py new file mode 100644 index 0000000..7d2d2c1 --- /dev/null +++ b/beancount_extras_kris7t/plugins/templates.py | |||
@@ -0,0 +1,144 @@ | |||
1 | ''' | ||
2 | Plugin that closes an account by transferring its whole balance to another account. | ||
3 | ''' | ||
4 | __copyright__ = 'Copyright (c) 2020 Kristóf Marussy <kristof@marussy.com>' | ||
5 | __license__ = 'GNU GPLv2' | ||
6 | |||
7 | import datetime as dt | ||
8 | from decimal import Decimal | ||
9 | from typing import Any, Dict, List, NamedTuple, Optional, Tuple | ||
10 | |||
11 | from beancount.core import amount | ||
12 | from beancount.core.data import Custom, Directive, Entries, Meta, Transaction, Union | ||
13 | from beancount.core.number import ZERO | ||
14 | |||
15 | __plugins__ = ('apply_templates',) | ||
16 | |||
17 | TEMPLATE_META = 'template' | ||
18 | TEMPLATE_USE_CUSTOM = 'template-use' | ||
19 | TEMPLATE_DELETE_CUSTOM = 'template-delete' | ||
20 | TEMPLATE_TAG_PREFIX = 'template' | ||
21 | |||
22 | |||
23 | Templates = Dict[str, Transaction] | ||
24 | |||
25 | |||
26 | class TemplateError(NamedTuple): | ||
27 | source: Optional[Meta] | ||
28 | message: str | ||
29 | entry: Directive | ||
30 | |||
31 | |||
32 | def _create_transaction(date: dt.date, | ||
33 | meta: Meta, | ||
34 | template: Transaction, | ||
35 | scale_factor: Decimal) -> Transaction: | ||
36 | return template._replace( | ||
37 | date=date, | ||
38 | meta={**template.meta, **meta}, | ||
39 | postings=[posting._replace(units=amount.mul(posting.units, scale_factor)) | ||
40 | for posting in template.postings]) | ||
41 | |||
42 | |||
43 | def _use_template(entry: Custom, | ||
44 | templates: Templates) -> Union[Transaction, TemplateError]: | ||
45 | if len(entry.values) == 0: | ||
46 | return TemplateError(entry.meta, 'Template name missing', entry) | ||
47 | if len(entry.values) > 2: | ||
48 | return TemplateError( | ||
49 | entry.meta, | ||
50 | f'Too many {TEMPLATE_USE_CUSTOM} arguments', | ||
51 | entry) | ||
52 | template_name = entry.values[0].value | ||
53 | if not isinstance(template_name, str): | ||
54 | return TemplateError( | ||
55 | entry.meta, | ||
56 | f'Template name must be a string, got {template_name} instead', | ||
57 | entry) | ||
58 | template = templates.get(template_name, None) | ||
59 | if template is None: | ||
60 | return TemplateError( | ||
61 | entry.meta, | ||
62 | f'Unknown template: {template_name}', | ||
63 | entry) | ||
64 | if len(entry.values) == 2: | ||
65 | scale_factor = entry.values[1].value | ||
66 | if not isinstance(scale_factor, Decimal): | ||
67 | return TemplateError( | ||
68 | entry.meta, | ||
69 | f'Invalid scale factor {scale_factor}', | ||
70 | entry) | ||
71 | if scale_factor == ZERO: | ||
72 | return TemplateError( | ||
73 | entry.meta, | ||
74 | f'Scale factor must not be {ZERO}', | ||
75 | entry) | ||
76 | else: | ||
77 | scale_factor = Decimal(1.0) | ||
78 | return _create_transaction(entry.date, entry.meta, template, scale_factor) | ||
79 | |||
80 | |||
81 | def _add_template(entry: Transaction, templates: Templates) -> Optional[TemplateError]: | ||
82 | template_name = entry.meta[TEMPLATE_META] | ||
83 | if not isinstance(template_name, str): | ||
84 | return TemplateError( | ||
85 | entry.meta, | ||
86 | f'{TEMPLATE_META} must be a string, got {template_name} instead', | ||
87 | entry) | ||
88 | new_meta = dict(entry.meta) | ||
89 | del new_meta[TEMPLATE_META] | ||
90 | templates[template_name] = entry._replace( | ||
91 | meta=new_meta, | ||
92 | links={*entry.links, f'{TEMPLATE_TAG_PREFIX}_{template_name}'}) | ||
93 | return None | ||
94 | |||
95 | |||
96 | def _delete_template(entry: Custom, templates: Templates) -> Optional[TemplateError]: | ||
97 | if len(entry.values) != 1: | ||
98 | return TemplateError( | ||
99 | entry.meta, | ||
100 | f'{TEMPLATE_DELETE_CUSTOM} takes a single argument', | ||
101 | entry) | ||
102 | template_name = entry.values[0].value | ||
103 | if not isinstance(template_name, str): | ||
104 | return TemplateError( | ||
105 | entry.meta, | ||
106 | f'Template name must be a string, got {template_name} instead', | ||
107 | entry) | ||
108 | if template_name not in templates: | ||
109 | return TemplateError( | ||
110 | entry.meta, | ||
111 | f'Unknown template: {template_name}', | ||
112 | entry) | ||
113 | del templates[template_name] | ||
114 | return None | ||
115 | |||
116 | |||
117 | def apply_templates(entries: Entries, | ||
118 | options_map: Dict[str, Any], | ||
119 | config_str: Optional[str] = None) -> \ | ||
120 | Tuple[Entries, List[TemplateError]]: | ||
121 | new_entries: Entries = [] | ||
122 | errors: List[TemplateError] = [] | ||
123 | templates: Templates = {} | ||
124 | for entry in entries: | ||
125 | if isinstance(entry, Transaction) and TEMPLATE_META in entry.meta: | ||
126 | result = _add_template(entry, templates) | ||
127 | if result: | ||
128 | errors.append(result) | ||
129 | elif isinstance(entry, Custom): | ||
130 | if entry.type == TEMPLATE_USE_CUSTOM: | ||
131 | result = _use_template(entry, templates) | ||
132 | if isinstance(result, TemplateError): | ||
133 | errors.append(result) | ||
134 | else: | ||
135 | new_entries.append(result) | ||
136 | elif entry.type == TEMPLATE_DELETE_CUSTOM: | ||
137 | result = _delete_template(entry, templates) | ||
138 | if result: | ||
139 | errors.append(result) | ||
140 | else: | ||
141 | new_entries.append(entry) | ||
142 | else: | ||
143 | new_entries.append(entry) | ||
144 | return new_entries, errors | ||