diff options
author | Kristóf Marussy <kristof@marussy.com> | 2021-01-25 01:14:28 +0100 |
---|---|---|
committer | Kristóf Marussy <kristof@marussy.com> | 2021-01-25 01:14:28 +0100 |
commit | a1c2a999e449054d6641bbb633954e45fcd63f90 (patch) | |
tree | 47628c10ded721d66e47b5f87f501293cd8af003 /beancount_extras_kris7t/plugins/transfer_accounts.py | |
parent | Initialize package (diff) | |
download | beancount-extras-kris7t-a1c2a999e449054d6641bbb633954e45fcd63f90.tar.gz beancount-extras-kris7t-a1c2a999e449054d6641bbb633954e45fcd63f90.tar.zst beancount-extras-kris7t-a1c2a999e449054d6641bbb633954e45fcd63f90.zip |
Add plugins and importers from private config
The importers are missing tests, because not having any specifications
for the import formats means we must use real, private data as test inputs
Diffstat (limited to 'beancount_extras_kris7t/plugins/transfer_accounts.py')
-rw-r--r-- | beancount_extras_kris7t/plugins/transfer_accounts.py | 188 |
1 files changed, 188 insertions, 0 deletions
diff --git a/beancount_extras_kris7t/plugins/transfer_accounts.py b/beancount_extras_kris7t/plugins/transfer_accounts.py new file mode 100644 index 0000000..653a54f --- /dev/null +++ b/beancount_extras_kris7t/plugins/transfer_accounts.py | |||
@@ -0,0 +1,188 @@ | |||
1 | ''' | ||
2 | Plugin that splits off postings into a new transaction to simulate settlement dates. | ||
3 | ''' | ||
4 | __copyright__ = 'Copyright (c) 2020 Kristóf Marussy <kristof@marussy.com>' | ||
5 | __license__ = 'GNU GPLv2' | ||
6 | |||
7 | from collections import defaultdict | ||
8 | import datetime as dt | ||
9 | from typing import Any, Dict, List, NamedTuple, Optional, Tuple, Union | ||
10 | |||
11 | from beancount.core import amount, convert | ||
12 | from beancount.core.amount import Amount | ||
13 | from beancount.core.data import Cost, CostSpec, Directive, Entries, Meta, Posting, Open, \ | ||
14 | Transaction | ||
15 | from beancount.core.inventory import Inventory | ||
16 | from beancount.core.number import ZERO | ||
17 | |||
18 | __plugins__ = ('split_entries_via_transfer_accounts',) | ||
19 | |||
20 | TRANSFER_ACCOUNT_META = 'transfer-account' | ||
21 | TRANSFER_DATE_META = 'transfer-date' | ||
22 | TRANSFER_CONVERTED_META = 'transfer-converted' | ||
23 | TRANSFER_CONVERTED_DEFAULT = True | ||
24 | |||
25 | |||
26 | class TransferAccountError(NamedTuple): | ||
27 | source: Optional[Meta] | ||
28 | message: str | ||
29 | entry: Directive | ||
30 | |||
31 | |||
32 | class _OutgoingTransfer(NamedTuple): | ||
33 | accout: str | ||
34 | currency: str | ||
35 | cost: Optional[Union[Cost, CostSpec]] | ||
36 | price: Optional[Amount] | ||
37 | |||
38 | |||
39 | class _IncomingTransfer(NamedTuple): | ||
40 | account: str | ||
41 | date: dt.date | ||
42 | |||
43 | |||
44 | class _IncomingPostings(NamedTuple): | ||
45 | postings: List[Posting] | ||
46 | inventroy: Inventory | ||
47 | |||
48 | |||
49 | class _Splitter: | ||
50 | _entry: Transaction | ||
51 | _processed_entries: Entries | ||
52 | _errors: List[TransferAccountError] | ||
53 | _default_converted: Dict[str, bool] | ||
54 | _processed_postings: List[Posting] | ||
55 | _amounts_to_transfer: Dict[_OutgoingTransfer, Amount] | ||
56 | _new_transactions: Dict[_IncomingTransfer, _IncomingPostings] | ||
57 | |||
58 | def __init__( | ||
59 | self, | ||
60 | entry: Transaction, | ||
61 | processed_entries: Entries, | ||
62 | errors: List[TransferAccountError], | ||
63 | default_converted: Dict[str, bool]): | ||
64 | self._entry = entry | ||
65 | self._processed_entries = processed_entries | ||
66 | self._errors = errors | ||
67 | self._default_converted = default_converted | ||
68 | self._processed_postings = [] | ||
69 | self._amounts_to_transfer = {} | ||
70 | self._new_transactions = defaultdict(lambda: _IncomingPostings([], Inventory())) | ||
71 | |||
72 | def split(self) -> None: | ||
73 | for posting in self._entry.postings: | ||
74 | self._split_posting(posting) | ||
75 | if not self._amounts_to_transfer: | ||
76 | self._processed_entries.append(self._entry) | ||
77 | return | ||
78 | for (account, _, cost, price), units in self._amounts_to_transfer.items(): | ||
79 | if units.number != ZERO: | ||
80 | self._processed_postings.append(Posting(account, units, cost, price, None, None)) | ||
81 | self._processed_entries.append(self._entry._replace(postings=self._processed_postings)) | ||
82 | for (account, date), (postings, inv) in self._new_transactions.items(): | ||
83 | for (units, cost) in inv: | ||
84 | postings.append(Posting(account, -units, cost, None, None, None)) | ||
85 | self._processed_entries.append(self._entry._replace(date=date, postings=postings)) | ||
86 | |||
87 | def _split_posting(self, posting: Posting) -> None: | ||
88 | if not posting.meta: | ||
89 | self._processed_postings.append(posting) | ||
90 | return | ||
91 | transfer_account = posting.meta.pop(TRANSFER_ACCOUNT_META, None) | ||
92 | transfer_date = posting.meta.pop(TRANSFER_DATE_META, None) | ||
93 | transfer_converted = posting.meta.pop(TRANSFER_CONVERTED_META, None) | ||
94 | if transfer_account is None: | ||
95 | if transfer_date is not None: | ||
96 | self._report_error( | ||
97 | f'{TRANSFER_DATE_META} was set but {TRANSFER_ACCOUNT_META} was not') | ||
98 | if transfer_converted is not None: | ||
99 | self._report_error( | ||
100 | f'{TRANSFER_CONVERTED_META} was set but {TRANSFER_ACCOUNT_META} was not') | ||
101 | self._processed_postings.append(posting) | ||
102 | return | ||
103 | if not isinstance(transfer_account, str): | ||
104 | self._report_error( | ||
105 | f'{TRANSFER_ACCOUNT_META} must be a string, got {transfer_account} instead') | ||
106 | self._processed_postings.append(posting) | ||
107 | return | ||
108 | if transfer_date is None: | ||
109 | transfer_date = self._entry.date | ||
110 | elif not isinstance(transfer_date, dt.date): | ||
111 | self._report_error( | ||
112 | f'{TRANSFER_DATE_META} must be a date, got {transfer_date} instead') | ||
113 | transfer_date = self._entry.date | ||
114 | transfer_converted_default = self._default_converted.get( | ||
115 | transfer_account, TRANSFER_CONVERTED_DEFAULT) | ||
116 | if transfer_converted is None: | ||
117 | transfer_converted = transfer_converted_default | ||
118 | elif not isinstance(transfer_converted, bool): | ||
119 | self._report_error( | ||
120 | f'{TRANSFER_CONVERTED_META} must be a Boolean, got {transfer_converted} instead') | ||
121 | transfer_converted = transfer_converted_default | ||
122 | elif posting.price is None and posting.cost is None: | ||
123 | self._report_error( | ||
124 | f'{TRANSFER_CONVERTED_META} was set, but there is no conversion') | ||
125 | assert posting.units | ||
126 | self._split_posting_with_options( | ||
127 | posting, transfer_account, transfer_date, transfer_converted) | ||
128 | |||
129 | def _split_posting_with_options( | ||
130 | self, | ||
131 | posting: Posting, | ||
132 | transfer_account: str, | ||
133 | transfer_date: dt.date, | ||
134 | transfer_converted: bool) -> None: | ||
135 | incoming = _IncomingTransfer(transfer_account, transfer_date) | ||
136 | incoming_postings, inv = self._new_transactions[incoming] | ||
137 | converted_amount = convert.get_weight(posting) | ||
138 | if transfer_converted: | ||
139 | outgoing = _OutgoingTransfer( | ||
140 | transfer_account, posting.units.currency, posting.cost, posting.price) | ||
141 | self._accumulate_outgoing(outgoing, posting.units) | ||
142 | incoming_postings.append(posting._replace(price=None)) | ||
143 | inv.add_amount(posting.units, posting.cost) | ||
144 | else: | ||
145 | outgoing = _OutgoingTransfer(transfer_account, converted_amount.currency, None, None) | ||
146 | self._accumulate_outgoing(outgoing, converted_amount) | ||
147 | incoming_postings.append(posting) | ||
148 | inv.add_amount(converted_amount) | ||
149 | |||
150 | def _accumulate_outgoing(self, outgoing: _OutgoingTransfer, units: Amount) -> None: | ||
151 | current_amount = self._amounts_to_transfer.get(outgoing, None) | ||
152 | if current_amount: | ||
153 | self._amounts_to_transfer[outgoing] = amount.add(current_amount, units) | ||
154 | else: | ||
155 | self._amounts_to_transfer[outgoing] = units | ||
156 | |||
157 | def _report_error(self, message: str) -> None: | ||
158 | self._errors.append(TransferAccountError(self._entry.meta, message, self._entry)) | ||
159 | |||
160 | |||
161 | def split_entries_via_transfer_accounts( | ||
162 | entries: Entries, | ||
163 | options_map: Dict[str, Any], | ||
164 | config_str: Optional[str] = None) -> Tuple[Entries, List[TransferAccountError]]: | ||
165 | default_converted: Dict[str, bool] = {} | ||
166 | errors: List[TransferAccountError] = [] | ||
167 | for entry in entries: | ||
168 | if isinstance(entry, Open): | ||
169 | if not entry.meta: | ||
170 | continue | ||
171 | transfer_converted = entry.meta.get(TRANSFER_CONVERTED_META, None) | ||
172 | if transfer_converted is None: | ||
173 | continue | ||
174 | if not isinstance(transfer_converted, bool): | ||
175 | errors.append(TransferAccountError( | ||
176 | entry.meta, | ||
177 | f'{TRANSFER_CONVERTED_META} must be a Boolean,' + | ||
178 | f' got {transfer_converted} instead', | ||
179 | entry)) | ||
180 | default_converted[entry.account] = transfer_converted | ||
181 | processed_entries: Entries = [] | ||
182 | for entry in entries: | ||
183 | if isinstance(entry, Transaction): | ||
184 | splitter = _Splitter(entry, processed_entries, errors, default_converted) | ||
185 | splitter.split() | ||
186 | else: | ||
187 | processed_entries.append(entry) | ||
188 | return processed_entries, errors | ||