aboutsummaryrefslogblamecommitdiffstats
path: root/beancount_extras_kris7t/plugins/templates.py
blob: 7d2d2c134017351bbb1680d4942f93a395b6ec63 (plain) (tree)















































































































































                                                                                       
'''
Plugin that defines reusable transaction templates and instantiates them,
optionally scaled by a factor, through custom directives.
'''
__copyright__ = 'Copyright (c) 2020  Kristóf Marussy <kristof@marussy.com>'
__license__ = 'GNU GPLv2'

import datetime as dt
from decimal import Decimal
from typing import Any, Dict, List, NamedTuple, Optional, Tuple

from beancount.core import amount
from beancount.core.data import Custom, Directive, Entries, Meta, Transaction, Union
from beancount.core.number import ZERO

# Names of the plugin entry points defined in this module
# (presumably read by beancount's plugin loader -- confirm against beancount docs).
__plugins__ = ('apply_templates',)

# Metadata key that marks a transaction as a template; its value is the template name.
TEMPLATE_META = 'template'
# Type of the custom directive that instantiates a template.
TEMPLATE_USE_CUSTOM = 'template-use'
# Type of the custom directive that removes a previously defined template.
TEMPLATE_DELETE_CUSTOM = 'template-delete'
# Prefix of the link (`<prefix>_<template name>`) added to every stored template.
TEMPLATE_TAG_PREFIX = 'template'


# Mapping from template name to the transaction stored as that template.
Templates = Dict[str, Transaction]


class TemplateError(NamedTuple):
    '''
    Error reported when a template definition, use, or deletion is invalid.
    '''
    # Metadata of the offending entry (this module always passes `entry.meta`).
    source: Optional[Meta]
    # Human-readable description of the problem.
    message: str
    # The directive that triggered the error.
    entry: Directive


def _create_transaction(date: dt.date,
                        meta: Meta,
                        template: Transaction,
                        scale_factor: Decimal) -> Transaction:
    '''
    Build a transaction from `template`, dated `date`, with scaled postings.

    The units of every posting are multiplied by `scale_factor`. The resulting
    metadata is the template's metadata overlaid with `meta`, so keys present
    in `meta` win over those of the template.
    '''
    # Merge the metadata first; entries of `meta` override the template's.
    merged_meta = {**template.meta, **meta}
    scaled_postings = []
    for original in template.postings:
        scaled_units = amount.mul(original.units, scale_factor)
        scaled_postings.append(original._replace(units=scaled_units))
    return template._replace(date=date, meta=merged_meta, postings=scaled_postings)


def _use_template(entry: Custom,
                  templates: Templates) -> Union[Transaction, TemplateError]:
    '''
    Instantiate the template referenced by a template-use custom directive.

    The first value of `entry` is the template name (a string); the optional
    second value is a non-zero `Decimal` scale factor, defaulting to 1.

    Returns the instantiated transaction, dated and annotated like `entry`,
    or a `TemplateError` when the arguments are malformed or the template
    is not present in `templates`.
    '''
    meta = entry.meta
    arg_count = len(entry.values)
    if arg_count == 0:
        return TemplateError(meta, 'Template name missing', entry)
    if arg_count > 2:
        return TemplateError(meta, f'Too many {TEMPLATE_USE_CUSTOM} arguments', entry)

    name = entry.values[0].value
    if not isinstance(name, str):
        return TemplateError(
            meta, f'Template name must be a string, got {name} instead', entry)
    chosen = templates.get(name)
    if chosen is None:
        return TemplateError(meta, f'Unknown template: {name}', entry)

    if arg_count == 1:
        # No explicit factor given: use the template amounts unchanged.
        factor = Decimal(1)
    else:
        factor = entry.values[1].value
        if not isinstance(factor, Decimal):
            return TemplateError(meta, f'Invalid scale factor {factor}', entry)
        if factor == ZERO:
            return TemplateError(meta, f'Scale factor must not be {ZERO}', entry)
    return _create_transaction(entry.date, meta, chosen, factor)


def _add_template(entry: Transaction, templates: Templates) -> Optional[TemplateError]:
    '''
    Store `entry` in `templates` under the name given by its template metadata.

    The stored copy has the template metadata key removed and gains a link of
    the form `<prefix>_<name>`. An existing template with the same name is
    replaced.

    Returns `None` on success, or a `TemplateError` when the template name is
    not a string.
    '''
    name = entry.meta[TEMPLATE_META]
    if not isinstance(name, str):
        return TemplateError(
            entry.meta,
            f'{TEMPLATE_META} must be a string, got {name} instead',
            entry)
    # Drop the marker so that instantiated transactions are not templates themselves.
    stripped_meta = {key: value for key, value in entry.meta.items()
                     if key != TEMPLATE_META}
    marker_link = f'{TEMPLATE_TAG_PREFIX}_{name}'
    templates[name] = entry._replace(
        meta=stripped_meta,
        links={*entry.links, marker_link})
    return None


def _delete_template(entry: Custom, templates: Templates) -> Optional[TemplateError]:
    '''
    Remove the template named by a template-delete custom directive.

    `entry` must carry exactly one value, the name (a string) of a template
    currently present in `templates`.

    Returns `None` on success, or a `TemplateError` describing why the
    directive could not be applied.
    '''
    arguments = entry.values
    if len(arguments) != 1:
        return TemplateError(
            entry.meta, f'{TEMPLATE_DELETE_CUSTOM} takes a single argument', entry)
    name = arguments[0].value
    if not isinstance(name, str):
        return TemplateError(
            entry.meta, f'Template name must be a string, got {name} instead', entry)
    if name in templates:
        del templates[name]
        return None
    return TemplateError(entry.meta, f'Unknown template: {name}', entry)


def apply_templates(entries: Entries,
                    options_map: Dict[str, Any],
                    config_str: Optional[str] = None) -> \
                    Tuple[Entries, List[TemplateError]]:
    '''
    Expand transaction templates in `entries`.

    Entries are processed in order:

    * a transaction carrying the template metadata key is stored as a
      template and left out of the output;
    * a template-use custom directive is replaced by the instantiated
      transaction;
    * a template-delete custom directive removes a stored template and is
      left out of the output;
    * every other entry is passed through unchanged.

    Directives that cannot be applied are dropped and reported as errors.
    `options_map` and `config_str` are accepted but not used.

    Returns the resulting entries and the list of errors encountered.
    '''
    known_templates: Templates = {}
    output: Entries = []
    problems: List[TemplateError] = []
    for entry in entries:
        if isinstance(entry, Transaction) and TEMPLATE_META in entry.meta:
            failure = _add_template(entry, known_templates)
            if failure is not None:
                problems.append(failure)
            continue

        # Only custom directives have a type that can match the template commands.
        custom_type = entry.type if isinstance(entry, Custom) else None
        if custom_type == TEMPLATE_USE_CUSTOM:
            outcome = _use_template(entry, known_templates)
            target = problems if isinstance(outcome, TemplateError) else output
            target.append(outcome)
        elif custom_type == TEMPLATE_DELETE_CUSTOM:
            failure = _delete_template(entry, known_templates)
            if failure is not None:
                problems.append(failure)
        else:
            output.append(entry)
    return output, problems