diff options
Diffstat (limited to 'beancount_extras_kris7t/plugins/transfer_accounts.py')
-rw-r--r-- | beancount_extras_kris7t/plugins/transfer_accounts.py | 188 |
1 files changed, 188 insertions, 0 deletions
diff --git a/beancount_extras_kris7t/plugins/transfer_accounts.py b/beancount_extras_kris7t/plugins/transfer_accounts.py new file mode 100644 index 0000000..653a54f --- /dev/null +++ b/beancount_extras_kris7t/plugins/transfer_accounts.py | |||
@@ -0,0 +1,188 @@ | |||
1 | ''' | ||
2 | Plugin that splits off postings into a new transaction to simulate settlement dates. | ||
3 | ''' | ||
4 | __copyright__ = 'Copyright (c) 2020 Kristóf Marussy <kristof@marussy.com>' | ||
5 | __license__ = 'GNU GPLv2' | ||
6 | |||
7 | from collections import defaultdict | ||
8 | import datetime as dt | ||
9 | from typing import Any, Dict, List, NamedTuple, Optional, Tuple, Union | ||
10 | |||
11 | from beancount.core import amount, convert | ||
12 | from beancount.core.amount import Amount | ||
13 | from beancount.core.data import Cost, CostSpec, Directive, Entries, Meta, Posting, Open, \ | ||
14 | Transaction | ||
15 | from beancount.core.inventory import Inventory | ||
16 | from beancount.core.number import ZERO | ||
17 | |||
18 | __plugins__ = ('split_entries_via_transfer_accounts',) | ||
19 | |||
20 | TRANSFER_ACCOUNT_META = 'transfer-account' | ||
21 | TRANSFER_DATE_META = 'transfer-date' | ||
22 | TRANSFER_CONVERTED_META = 'transfer-converted' | ||
23 | TRANSFER_CONVERTED_DEFAULT = True | ||
24 | |||
25 | |||
26 | class TransferAccountError(NamedTuple): | ||
27 | source: Optional[Meta] | ||
28 | message: str | ||
29 | entry: Directive | ||
30 | |||
31 | |||
32 | class _OutgoingTransfer(NamedTuple): | ||
33 | accout: str | ||
34 | currency: str | ||
35 | cost: Optional[Union[Cost, CostSpec]] | ||
36 | price: Optional[Amount] | ||
37 | |||
38 | |||
39 | class _IncomingTransfer(NamedTuple): | ||
40 | account: str | ||
41 | date: dt.date | ||
42 | |||
43 | |||
44 | class _IncomingPostings(NamedTuple): | ||
45 | postings: List[Posting] | ||
46 | inventroy: Inventory | ||
47 | |||
48 | |||
49 | class _Splitter: | ||
50 | _entry: Transaction | ||
51 | _processed_entries: Entries | ||
52 | _errors: List[TransferAccountError] | ||
53 | _default_converted: Dict[str, bool] | ||
54 | _processed_postings: List[Posting] | ||
55 | _amounts_to_transfer: Dict[_OutgoingTransfer, Amount] | ||
56 | _new_transactions: Dict[_IncomingTransfer, _IncomingPostings] | ||
57 | |||
58 | def __init__( | ||
59 | self, | ||
60 | entry: Transaction, | ||
61 | processed_entries: Entries, | ||
62 | errors: List[TransferAccountError], | ||
63 | default_converted: Dict[str, bool]): | ||
64 | self._entry = entry | ||
65 | self._processed_entries = processed_entries | ||
66 | self._errors = errors | ||
67 | self._default_converted = default_converted | ||
68 | self._processed_postings = [] | ||
69 | self._amounts_to_transfer = {} | ||
70 | self._new_transactions = defaultdict(lambda: _IncomingPostings([], Inventory())) | ||
71 | |||
72 | def split(self) -> None: | ||
73 | for posting in self._entry.postings: | ||
74 | self._split_posting(posting) | ||
75 | if not self._amounts_to_transfer: | ||
76 | self._processed_entries.append(self._entry) | ||
77 | return | ||
78 | for (account, _, cost, price), units in self._amounts_to_transfer.items(): | ||
79 | if units.number != ZERO: | ||
80 | self._processed_postings.append(Posting(account, units, cost, price, None, None)) | ||
81 | self._processed_entries.append(self._entry._replace(postings=self._processed_postings)) | ||
82 | for (account, date), (postings, inv) in self._new_transactions.items(): | ||
83 | for (units, cost) in inv: | ||
84 | postings.append(Posting(account, -units, cost, None, None, None)) | ||
85 | self._processed_entries.append(self._entry._replace(date=date, postings=postings)) | ||
86 | |||
87 | def _split_posting(self, posting: Posting) -> None: | ||
88 | if not posting.meta: | ||
89 | self._processed_postings.append(posting) | ||
90 | return | ||
91 | transfer_account = posting.meta.pop(TRANSFER_ACCOUNT_META, None) | ||
92 | transfer_date = posting.meta.pop(TRANSFER_DATE_META, None) | ||
93 | transfer_converted = posting.meta.pop(TRANSFER_CONVERTED_META, None) | ||
94 | if transfer_account is None: | ||
95 | if transfer_date is not None: | ||
96 | self._report_error( | ||
97 | f'{TRANSFER_DATE_META} was set but {TRANSFER_ACCOUNT_META} was not') | ||
98 | if transfer_converted is not None: | ||
99 | self._report_error( | ||
100 | f'{TRANSFER_CONVERTED_META} was set but {TRANSFER_ACCOUNT_META} was not') | ||
101 | self._processed_postings.append(posting) | ||
102 | return | ||
103 | if not isinstance(transfer_account, str): | ||
104 | self._report_error( | ||
105 | f'{TRANSFER_ACCOUNT_META} must be a string, got {transfer_account} instead') | ||
106 | self._processed_postings.append(posting) | ||
107 | return | ||
108 | if transfer_date is None: | ||
109 | transfer_date = self._entry.date | ||
110 | elif not isinstance(transfer_date, dt.date): | ||
111 | self._report_error( | ||
112 | f'{TRANSFER_DATE_META} must be a date, got {transfer_date} instead') | ||
113 | transfer_date = self._entry.date | ||
114 | transfer_converted_default = self._default_converted.get( | ||
115 | transfer_account, TRANSFER_CONVERTED_DEFAULT) | ||
116 | if transfer_converted is None: | ||
117 | transfer_converted = transfer_converted_default | ||
118 | elif not isinstance(transfer_converted, bool): | ||
119 | self._report_error( | ||
120 | f'{TRANSFER_CONVERTED_META} must be a Boolean, got {transfer_converted} instead') | ||
121 | transfer_converted = transfer_converted_default | ||
122 | elif posting.price is None and posting.cost is None: | ||
123 | self._report_error( | ||
124 | f'{TRANSFER_CONVERTED_META} was set, but there is no conversion') | ||
125 | assert posting.units | ||
126 | self._split_posting_with_options( | ||
127 | posting, transfer_account, transfer_date, transfer_converted) | ||
128 | |||
129 | def _split_posting_with_options( | ||
130 | self, | ||
131 | posting: Posting, | ||
132 | transfer_account: str, | ||
133 | transfer_date: dt.date, | ||
134 | transfer_converted: bool) -> None: | ||
135 | incoming = _IncomingTransfer(transfer_account, transfer_date) | ||
136 | incoming_postings, inv = self._new_transactions[incoming] | ||
137 | converted_amount = convert.get_weight(posting) | ||
138 | if transfer_converted: | ||
139 | outgoing = _OutgoingTransfer( | ||
140 | transfer_account, posting.units.currency, posting.cost, posting.price) | ||
141 | self._accumulate_outgoing(outgoing, posting.units) | ||
142 | incoming_postings.append(posting._replace(price=None)) | ||
143 | inv.add_amount(posting.units, posting.cost) | ||
144 | else: | ||
145 | outgoing = _OutgoingTransfer(transfer_account, converted_amount.currency, None, None) | ||
146 | self._accumulate_outgoing(outgoing, converted_amount) | ||
147 | incoming_postings.append(posting) | ||
148 | inv.add_amount(converted_amount) | ||
149 | |||
150 | def _accumulate_outgoing(self, outgoing: _OutgoingTransfer, units: Amount) -> None: | ||
151 | current_amount = self._amounts_to_transfer.get(outgoing, None) | ||
152 | if current_amount: | ||
153 | self._amounts_to_transfer[outgoing] = amount.add(current_amount, units) | ||
154 | else: | ||
155 | self._amounts_to_transfer[outgoing] = units | ||
156 | |||
157 | def _report_error(self, message: str) -> None: | ||
158 | self._errors.append(TransferAccountError(self._entry.meta, message, self._entry)) | ||
159 | |||
160 | |||
161 | def split_entries_via_transfer_accounts( | ||
162 | entries: Entries, | ||
163 | options_map: Dict[str, Any], | ||
164 | config_str: Optional[str] = None) -> Tuple[Entries, List[TransferAccountError]]: | ||
165 | default_converted: Dict[str, bool] = {} | ||
166 | errors: List[TransferAccountError] = [] | ||
167 | for entry in entries: | ||
168 | if isinstance(entry, Open): | ||
169 | if not entry.meta: | ||
170 | continue | ||
171 | transfer_converted = entry.meta.get(TRANSFER_CONVERTED_META, None) | ||
172 | if transfer_converted is None: | ||
173 | continue | ||
174 | if not isinstance(transfer_converted, bool): | ||
175 | errors.append(TransferAccountError( | ||
176 | entry.meta, | ||
177 | f'{TRANSFER_CONVERTED_META} must be a Boolean,' + | ||
178 | f' got {transfer_converted} instead', | ||
179 | entry)) | ||
180 | default_converted[entry.account] = transfer_converted | ||
181 | processed_entries: Entries = [] | ||
182 | for entry in entries: | ||
183 | if isinstance(entry, Transaction): | ||
184 | splitter = _Splitter(entry, processed_entries, errors, default_converted) | ||
185 | splitter.split() | ||
186 | else: | ||
187 | processed_entries.append(entry) | ||
188 | return processed_entries, errors | ||