'''
Plugin that splits off postings into a new transaction to simulate settlement dates.
'''
__copyright__ = 'Copyright (c) 2020 Kristóf Marussy <kristof@marussy.com>'
__license__ = 'GNU GPLv2'
from collections import defaultdict
import datetime as dt
from typing import Any, Dict, List, NamedTuple, Optional, Tuple, Union
from beancount.core import amount, convert
from beancount.core.amount import Amount
from beancount.core.data import Cost, CostSpec, Directive, Entries, Meta, Posting, Open, \
Transaction
from beancount.core.inventory import Inventory
from beancount.core.number import ZERO
# Plugin entry point(s) defined by this module.
__plugins__ = ('split_entries_via_transfer_accounts',)
# Posting metadata key naming the account through which the posting is transferred.
TRANSFER_ACCOUNT_META = 'transfer-account'
# Posting metadata key giving the date of the split-off transaction;
# the date of the original transaction is used when it is absent.
TRANSFER_DATE_META = 'transfer-date'
# Boolean metadata key, read from postings and from Open directives.
# When true, the transfer account receives the posting's own units (with its cost and price);
# when false, it receives the posting's weight and the posting keeps its price.
TRANSFER_CONVERTED_META = 'transfer-converted'
# Fallback used when neither the posting nor the Open directive of the transfer account
# sets TRANSFER_CONVERTED_META.
TRANSFER_CONVERTED_DEFAULT = True
class TransferAccountError(NamedTuple):
    '''
    Error record collected by this plugin and returned next to the processed entries.
    '''
    # Metadata of the directive the error refers to, if any.
    source: Optional[Meta]
    # Human-readable description of the problem.
    message: str
    # The directive that triggered the error.
    entry: Directive
class _OutgoingTransfer(NamedTuple):
    '''
    Key under which amounts leaving the original transaction are accumulated.

    Amounts that share the same key are summed and end up as a single posting
    to the transfer account in the original transaction.
    '''
    # Transfer account that receives the generated posting.
    account: str
    # Currency of the accumulated units; only distinguishes keys, it is not copied to the posting.
    currency: str
    # Cost attached to the generated posting, if any.
    cost: Optional[Union[Cost, CostSpec]]
    # Price attached to the generated posting, if any.
    price: Optional[Amount]
class _IncomingTransfer(NamedTuple):
    '''
    Key identifying one split-off transaction.

    Postings that share the same transfer account and date are collected
    into the same new transaction.
    '''
    # Transfer account that is debited in the new transaction.
    account: str
    # Date of the new transaction.
    date: dt.date
class _IncomingPostings(NamedTuple):
    '''
    Contents of one split-off transaction while it is being assembled.
    '''
    # Postings moved from the original transaction into the new one.
    postings: List[Posting]
    # Amounts added by the moved postings; each position is later offset
    # by a negated posting to the transfer account.
    inventory: Inventory
class _Splitter:
    '''
    Splits the postings of a single transaction that carry transfer metadata
    into separate transactions, connected to the original via transfer accounts.

    The resulting entries are appended to the ``processed_entries`` list and
    problems are appended to the ``errors`` list passed to the constructor.
    '''

    _entry: Transaction
    _processed_entries: Entries
    _errors: List[TransferAccountError]
    _default_converted: Dict[str, bool]
    _processed_postings: List[Posting]
    _amounts_to_transfer: Dict[_OutgoingTransfer, Amount]
    _new_transactions: Dict[_IncomingTransfer, _IncomingPostings]

    def __init__(
            self,
            entry: Transaction,
            processed_entries: Entries,
            errors: List[TransferAccountError],
            default_converted: Dict[str, bool]):
        '''
        :param entry: the transaction to split
        :param processed_entries: output list that receives the resulting entries
        :param errors: output list that receives any reported errors
        :param default_converted: per-account default of the transfer-converted option
        '''
        self._entry = entry
        self._processed_entries = processed_entries
        self._errors = errors
        self._default_converted = default_converted
        # Postings that remain in the original transaction.
        self._processed_postings = []
        # Amounts to post to transfer accounts in the original transaction.
        self._amounts_to_transfer = {}
        # Split-off transactions, created on first access by (account, date).
        self._new_transactions = defaultdict(lambda: _IncomingPostings([], Inventory()))

    def split(self) -> None:
        '''
        Process every posting of the entry and append the results to the output list.

        If nothing was transferred, the entry itself is appended. Otherwise the
        entry is appended with its postings replaced, followed by one new
        transaction per (transfer account, date) pair.
        '''
        for posting in self._entry.postings:
            self._split_posting(posting)
        if not self._amounts_to_transfer:
            self._processed_entries.append(self._entry)
            return
        # Replace the transferred postings by the accumulated transfer postings,
        # leaving out transfers that cancelled out to zero.
        for (account, _, cost, price), units in self._amounts_to_transfer.items():
            if units.number != ZERO:
                self._processed_postings.append(Posting(account, units, cost, price, None, None))
        self._processed_entries.append(self._entry._replace(postings=self._processed_postings))
        for (account, date), (postings, inv) in self._new_transactions.items():
            # Offset every position moved into the new transaction on the transfer account.
            for (units, cost) in inv:
                postings.append(Posting(account, -units, cost, None, None, None))
            self._processed_entries.append(self._entry._replace(date=date, postings=postings))

    def _split_posting(self, posting: Posting) -> None:
        '''
        Validate the transfer metadata of a posting and split it if requested.

        The transfer metadata keys are removed from ``posting.meta`` in place.
        Postings without a valid transfer account are kept in the original
        transaction; invalid option values are reported and replaced by defaults.
        '''
        if not posting.meta:
            self._processed_postings.append(posting)
            return
        # Note: pop() strips the keys from the posting's own metadata dictionary.
        transfer_account = posting.meta.pop(TRANSFER_ACCOUNT_META, None)
        transfer_date = posting.meta.pop(TRANSFER_DATE_META, None)
        transfer_converted = posting.meta.pop(TRANSFER_CONVERTED_META, None)
        if transfer_account is None:
            # The other options are meaningless without a transfer account.
            if transfer_date is not None:
                self._report_error(
                    f'{TRANSFER_DATE_META} was set but {TRANSFER_ACCOUNT_META} was not')
            if transfer_converted is not None:
                self._report_error(
                    f'{TRANSFER_CONVERTED_META} was set but {TRANSFER_ACCOUNT_META} was not')
            self._processed_postings.append(posting)
            return
        if not isinstance(transfer_account, str):
            self._report_error(
                f'{TRANSFER_ACCOUNT_META} must be a string, got {transfer_account} instead')
            self._processed_postings.append(posting)
            return
        if transfer_date is None:
            transfer_date = self._entry.date
        elif not isinstance(transfer_date, dt.date):
            self._report_error(
                f'{TRANSFER_DATE_META} must be a date, got {transfer_date} instead')
            transfer_date = self._entry.date
        transfer_converted_default = self._default_converted.get(
            transfer_account, TRANSFER_CONVERTED_DEFAULT)
        if transfer_converted is None:
            transfer_converted = transfer_converted_default
        elif not isinstance(transfer_converted, bool):
            self._report_error(
                f'{TRANSFER_CONVERTED_META} must be a Boolean, got {transfer_converted} instead')
            transfer_converted = transfer_converted_default
        elif posting.price is None and posting.cost is None:
            self._report_error(
                f'{TRANSFER_CONVERTED_META} was set, but there is no conversion')
        # Compare against None explicitly: a zero amount is still a valid amount to
        # transfer, and an `assert` would be stripped when running under `python -O`.
        if posting.units is None:
            self._report_error(
                f'{TRANSFER_ACCOUNT_META} was set, but the posting has no units')
            self._processed_postings.append(posting)
            return
        self._split_posting_with_options(
            posting, transfer_account, transfer_date, transfer_converted)

    def _split_posting_with_options(
            self,
            posting: Posting,
            transfer_account: str,
            transfer_date: dt.date,
            transfer_converted: bool) -> None:
        '''
        Move a posting into the new transaction for (transfer_account, transfer_date).

        If ``transfer_converted`` is true, the posting's units, cost and price go to
        the transfer account in the original transaction and the moved posting loses
        its price. Otherwise the transfer account receives the weight returned by
        ``convert.get_weight`` and the posting is moved unchanged.
        '''
        incoming = _IncomingTransfer(transfer_account, transfer_date)
        incoming_postings, inv = self._new_transactions[incoming]
        converted_amount = convert.get_weight(posting)
        if transfer_converted:
            outgoing = _OutgoingTransfer(
                transfer_account, posting.units.currency, posting.cost, posting.price)
            self._accumulate_outgoing(outgoing, posting.units)
            incoming_postings.append(posting._replace(price=None))
            inv.add_amount(posting.units, posting.cost)
        else:
            outgoing = _OutgoingTransfer(transfer_account, converted_amount.currency, None, None)
            self._accumulate_outgoing(outgoing, converted_amount)
            incoming_postings.append(posting)
            inv.add_amount(converted_amount)

    def _accumulate_outgoing(self, outgoing: _OutgoingTransfer, units: Amount) -> None:
        '''
        Add ``units`` to the amount accumulated for the ``outgoing`` key.
        '''
        current_amount = self._amounts_to_transfer.get(outgoing, None)
        # NOTE(review): this is a truthiness test, not an `is None` test; beancount's
        # Amount appears to be falsy when its number is zero, in which case a stored
        # zero is simply overwritten by `units` - confirm.
        if current_amount:
            self._amounts_to_transfer[outgoing] = amount.add(current_amount, units)
        else:
            self._amounts_to_transfer[outgoing] = units

    def _report_error(self, message: str) -> None:
        '''
        Record an error that refers to the entry being split.
        '''
        self._errors.append(TransferAccountError(self._entry.meta, message, self._entry))
def split_entries_via_transfer_accounts(
        entries: Entries,
        options_map: Dict[str, Any],
        config_str: Optional[str] = None) -> Tuple[Entries, List[TransferAccountError]]:
    '''
    Plugin entry point: split postings with transfer metadata into new transactions.

    A first pass collects the per-account default of the transfer-converted option
    from the metadata of Open directives. A second pass runs every transaction
    through the splitter and passes all other directives through unchanged.

    :param entries: the directives to process
    :param options_map: unused; part of the plugin signature
    :param config_str: unused; part of the plugin signature
    :return: the processed entries and the list of errors found
    '''
    default_converted: Dict[str, bool] = {}
    errors: List[TransferAccountError] = []
    for entry in entries:
        if not isinstance(entry, Open) or not entry.meta:
            continue
        transfer_converted = entry.meta.get(TRANSFER_CONVERTED_META, None)
        if transfer_converted is None:
            continue
        if not isinstance(transfer_converted, bool):
            errors.append(TransferAccountError(
                entry.meta,
                f'{TRANSFER_CONVERTED_META} must be a Boolean,'
                f' got {transfer_converted} instead',
                entry))
            # Do not record the invalid value as the default of the account.
            continue
        default_converted[entry.account] = transfer_converted
    processed_entries: Entries = []
    for entry in entries:
        if isinstance(entry, Transaction):
            splitter = _Splitter(entry, processed_entries, errors, default_converted)
            splitter.split()
        else:
            processed_entries.append(entry)
    return processed_entries, errors