aboutsummaryrefslogtreecommitdiffstats
path: root/beancount_extras_kris7t/plugins/templates.py
blob: 7d2d2c134017351bbb1680d4942f93a395b6ec63 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
'''
Plugin that instantiates transactions from named templates, optionally scaling their amounts.
'''
__copyright__ = 'Copyright (c) 2020  Kristóf Marussy <kristof@marussy.com>'
__license__ = 'GNU GPLv2'

import datetime as dt
from decimal import Decimal
from typing import Any, Dict, List, NamedTuple, Optional, Tuple

from beancount.core import amount
from beancount.core.data import Custom, Directive, Entries, Meta, Transaction, Union
from beancount.core.number import ZERO

# Names of the plugin entry points exported by this module (beancount's
# `__plugins__` convention).
__plugins__ = ('apply_templates',)

# Metadata key that marks a transaction as a template; its value is the template name.
TEMPLATE_META = 'template'
# Type of the `custom` directive that instantiates a registered template.
TEMPLATE_USE_CUSTOM = 'template-use'
# Type of the `custom` directive that removes a registered template.
TEMPLATE_DELETE_CUSTOM = 'template-delete'
# Prefix of the identifier `<prefix>_<template name>` that is added to the *links*
# (not the tags, despite the constant's name) of every registered template.
TEMPLATE_TAG_PREFIX = 'template'


# Registry of the currently defined templates, keyed by template name.
Templates = Dict[str, Transaction]


class TemplateError(NamedTuple):
    '''
    Error record reported when a template definition or directive cannot be processed.
    '''
    # Metadata of the offending entry; every call site in this module passes `entry.meta`.
    source: Optional[Meta]
    # Human-readable description of the problem.
    message: str
    # The directive that triggered the error.
    entry: Directive


def _create_transaction(date: dt.date,
                        meta: Meta,
                        template: Transaction,
                        scale_factor: Decimal) -> Transaction:
    '''
    Build a transaction from `template`.

    The result is a copy of `template` dated `date`. Its metadata is the
    template's metadata overlaid with `meta` (keys present in `meta` win), and
    the units of every posting are multiplied by `scale_factor`. All other
    fields are taken from the template unchanged.
    '''
    # Start from the template's metadata and let the caller's values override it.
    merged_meta = dict(template.meta)
    merged_meta.update(meta)

    scaled_postings = []
    for posting in template.postings:
        scaled_units = amount.mul(posting.units, scale_factor)
        scaled_postings.append(posting._replace(units=scaled_units))

    return template._replace(date=date, meta=merged_meta, postings=scaled_postings)


def _use_template(entry: Custom,
                  templates: Templates) -> Union[Transaction, TemplateError]:
    '''
    Handle a `template-use` custom directive.

    The directive takes the template name (a string) and, optionally, a
    non-zero Decimal scale factor; the factor defaults to 1 when omitted.

    Returns the transaction instantiated from the named template, dated and
    annotated with the metadata of `entry`, or a TemplateError if the
    arguments are malformed or the template is not registered.
    '''
    def fail(message: str) -> TemplateError:
        # Every error of this directive points back at the directive itself.
        return TemplateError(entry.meta, message, entry)

    arguments = entry.values
    argument_count = len(arguments)
    if argument_count == 0:
        return fail('Template name missing')
    if argument_count > 2:
        return fail(f'Too many {TEMPLATE_USE_CUSTOM} arguments')

    template_name = arguments[0].value
    if not isinstance(template_name, str):
        return fail(f'Template name must be a string, got {template_name} instead')

    template = templates.get(template_name, None)
    if template is None:
        return fail(f'Unknown template: {template_name}')

    # At this point there are exactly one or two arguments.
    if argument_count != 2:
        scale_factor = Decimal(1.0)
    else:
        scale_factor = arguments[1].value
        if not isinstance(scale_factor, Decimal):
            return fail(f'Invalid scale factor {scale_factor}')
        if scale_factor == ZERO:
            return fail(f'Scale factor must not be {ZERO}')

    return _create_transaction(entry.date, entry.meta, template, scale_factor)


def _add_template(entry: Transaction, templates: Templates) -> Optional[TemplateError]:
    '''
    Register the transaction `entry` as a template.

    The template name is read from the `template` metadata key of `entry`,
    which must hold a string. The stored copy has that metadata key removed
    and gains the link `template_<name>`. An existing template with the same
    name is replaced.

    Returns None on success, or a TemplateError if the name is not a string.
    '''
    name = entry.meta[TEMPLATE_META]
    if not isinstance(name, str):
        return TemplateError(
            entry.meta,
            f'{TEMPLATE_META} must be a string, got {name} instead',
            entry)

    # The marker key must not leak into transactions created from the template.
    stored_meta = {key: value for key, value in entry.meta.items() if key != TEMPLATE_META}
    stored_links = set(entry.links) | {f'{TEMPLATE_TAG_PREFIX}_{name}'}
    templates[name] = entry._replace(meta=stored_meta, links=stored_links)
    return None


def _delete_template(entry: Custom, templates: Templates) -> Optional[TemplateError]:
    '''
    Handle a `template-delete` custom directive.

    The directive takes exactly one argument, the name (a string) of a
    registered template, which is removed from `templates`.

    Returns None on success, or a TemplateError if the argument list is
    malformed or no template with that name is registered.
    '''
    def fail(message: str) -> TemplateError:
        return TemplateError(entry.meta, message, entry)

    arguments = entry.values
    if len(arguments) != 1:
        return fail(f'{TEMPLATE_DELETE_CUSTOM} takes a single argument')

    name = arguments[0].value
    if not isinstance(name, str):
        return fail(f'Template name must be a string, got {name} instead')

    try:
        del templates[name]
    except KeyError:
        return fail(f'Unknown template: {name}')
    return None


def apply_templates(entries: Entries,
                    options_map: Dict[str, Any],
                    config_str: Optional[str] = None) -> \
                    Tuple[Entries, List[TemplateError]]:
    '''
    Plugin entry point: expand template directives in `entries`.

    Entries are processed in order:

    * a transaction carrying the `template` metadata key is registered as a
      template and is not emitted itself;
    * a `template-use` custom directive is replaced by a transaction
      instantiated from the named template;
    * a `template-delete` custom directive removes the named template and is
      not emitted;
    * every other entry is passed through unchanged.

    `options_map` and `config_str` are accepted but not used.

    Returns the resulting entries and the list of errors encountered.
    '''
    output: Entries = []
    problems: List[TemplateError] = []
    registry: Templates = {}

    for entry in entries:
        # Template definitions are consumed; only their instances are emitted.
        if isinstance(entry, Transaction) and TEMPLATE_META in entry.meta:
            failure = _add_template(entry, registry)
            if failure:
                problems.append(failure)
            continue

        if not isinstance(entry, Custom):
            output.append(entry)
            continue

        directive_type = entry.type
        if directive_type == TEMPLATE_USE_CUSTOM:
            outcome = _use_template(entry, registry)
            if isinstance(outcome, TemplateError):
                problems.append(outcome)
            else:
                output.append(outcome)
        elif directive_type == TEMPLATE_DELETE_CUSTOM:
            failure = _delete_template(entry, registry)
            if failure:
                problems.append(failure)
        else:
            # Custom directives unrelated to templates pass through untouched.
            output.append(entry)

    return output, problems