''' Plugin that splits off postings into a new transaction to simulate settlement dates. ''' __copyright__ = 'Copyright (c) 2020 Kristóf Marussy ' __license__ = 'GNU GPLv2' from collections import defaultdict import datetime as dt from typing import Any, Dict, List, NamedTuple, Optional, Tuple, Union from beancount.core import amount, convert from beancount.core.amount import Amount from beancount.core.data import Cost, CostSpec, Directive, Entries, Meta, Posting, Open, \ Transaction from beancount.core.inventory import Inventory from beancount.core.number import ZERO __plugins__ = ('split_entries_via_transfer_accounts',) TRANSFER_ACCOUNT_META = 'transfer-account' TRANSFER_DATE_META = 'transfer-date' TRANSFER_CONVERTED_META = 'transfer-converted' TRANSFER_CONVERTED_DEFAULT = True class TransferAccountError(NamedTuple): source: Optional[Meta] message: str entry: Directive class _OutgoingTransfer(NamedTuple): accout: str currency: str cost: Optional[Union[Cost, CostSpec]] price: Optional[Amount] class _IncomingTransfer(NamedTuple): account: str date: dt.date class _IncomingPostings(NamedTuple): postings: List[Posting] inventroy: Inventory class _Splitter: _entry: Transaction _processed_entries: Entries _errors: List[TransferAccountError] _default_converted: Dict[str, bool] _processed_postings: List[Posting] _amounts_to_transfer: Dict[_OutgoingTransfer, Amount] _new_transactions: Dict[_IncomingTransfer, _IncomingPostings] def __init__( self, entry: Transaction, processed_entries: Entries, errors: List[TransferAccountError], default_converted: Dict[str, bool]): self._entry = entry self._processed_entries = processed_entries self._errors = errors self._default_converted = default_converted self._processed_postings = [] self._amounts_to_transfer = {} self._new_transactions = defaultdict(lambda: _IncomingPostings([], Inventory())) def split(self) -> None: for posting in self._entry.postings: self._split_posting(posting) if not self._amounts_to_transfer: self._processed_entries.append(self._entry) return for (account, _, cost, price), units in self._amounts_to_transfer.items(): if units.number != ZERO: self._processed_postings.append(Posting(account, units, cost, price, None, None)) self._processed_entries.append(self._entry._replace(postings=self._processed_postings)) for (account, date), (postings, inv) in self._new_transactions.items(): for (units, cost) in inv: postings.append(Posting(account, -units, cost, None, None, None)) self._processed_entries.append(self._entry._replace(date=date, postings=postings)) def _split_posting(self, posting: Posting) -> None: if not posting.meta: self._processed_postings.append(posting) return transfer_account = posting.meta.pop(TRANSFER_ACCOUNT_META, None) transfer_date = posting.meta.pop(TRANSFER_DATE_META, None) transfer_converted = posting.meta.pop(TRANSFER_CONVERTED_META, None) if transfer_account is None: if transfer_date is not None: self._report_error( f'{TRANSFER_DATE_META} was set but {TRANSFER_ACCOUNT_META} was not') if transfer_converted is not None: self._report_error( f'{TRANSFER_CONVERTED_META} was set but {TRANSFER_ACCOUNT_META} was not') self._processed_postings.append(posting) return if not isinstance(transfer_account, str): self._report_error( f'{TRANSFER_ACCOUNT_META} must be a string, got {transfer_account} instead') self._processed_postings.append(posting) return if transfer_date is None: transfer_date = self._entry.date elif not isinstance(transfer_date, dt.date): self._report_error( f'{TRANSFER_DATE_META} must be a date, got {transfer_date} instead') transfer_date = self._entry.date transfer_converted_default = self._default_converted.get( transfer_account, TRANSFER_CONVERTED_DEFAULT) if transfer_converted is None: transfer_converted = transfer_converted_default elif not isinstance(transfer_converted, bool): self._report_error( f'{TRANSFER_CONVERTED_META} must be a Boolean, got {transfer_converted} instead') transfer_converted = transfer_converted_default elif posting.price is None and posting.cost is None: self._report_error( f'{TRANSFER_CONVERTED_META} was set, but there is no conversion') assert posting.units self._split_posting_with_options( posting, transfer_account, transfer_date, transfer_converted) def _split_posting_with_options( self, posting: Posting, transfer_account: str, transfer_date: dt.date, transfer_converted: bool) -> None: incoming = _IncomingTransfer(transfer_account, transfer_date) incoming_postings, inv = self._new_transactions[incoming] converted_amount = convert.get_weight(posting) if transfer_converted: outgoing = _OutgoingTransfer( transfer_account, posting.units.currency, posting.cost, posting.price) self._accumulate_outgoing(outgoing, posting.units) incoming_postings.append(posting._replace(price=None)) inv.add_amount(posting.units, posting.cost) else: outgoing = _OutgoingTransfer(transfer_account, converted_amount.currency, None, None) self._accumulate_outgoing(outgoing, converted_amount) incoming_postings.append(posting) inv.add_amount(converted_amount) def _accumulate_outgoing(self, outgoing: _OutgoingTransfer, units: Amount) -> None: current_amount = self._amounts_to_transfer.get(outgoing, None) if current_amount: self._amounts_to_transfer[outgoing] = amount.add(current_amount, units) else: self._amounts_to_transfer[outgoing] = units def _report_error(self, message: str) -> None: self._errors.append(TransferAccountError(self._entry.meta, message, self._entry)) def split_entries_via_transfer_accounts( entries: Entries, options_map: Dict[str, Any], config_str: Optional[str] = None) -> Tuple[Entries, List[TransferAccountError]]: default_converted: Dict[str, bool] = {} errors: List[TransferAccountError] = [] for entry in entries: if isinstance(entry, Open): if not entry.meta: continue transfer_converted = entry.meta.get(TRANSFER_CONVERTED_META, None) if transfer_converted is None: continue if not isinstance(transfer_converted, bool): errors.append(TransferAccountError( entry.meta, f'{TRANSFER_CONVERTED_META} must be a Boolean,' + f' got {transfer_converted} instead', entry)) default_converted[entry.account] = transfer_converted processed_entries: Entries = [] for entry in entries: if isinstance(entry, Transaction): splitter = _Splitter(entry, processed_entries, errors, default_converted) splitter.split() else: processed_entries.append(entry) return processed_entries, errors