aboutsummaryrefslogtreecommitdiffstats
path: root/beancount_extras_kris7t/importers/rules.py
blob: 3890f24d048cef534143f40a51640206f64224b9 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
#!/usr/bin/env python3

from typing import cast, Dict, List, NamedTuple, Optional, Tuple, Union
import re

from beancount.core.amount import Amount

from beancount_extras_kris7t.importers.utils import Extractor, Row

# Match-anything pattern. `when` substitutes it for an omitted payee/text
# pattern, and `_rule_condition_matches` compares against it to recognise a
# field that carries no constraint.
WILDCARD = re.compile('.*')


class When(NamedTuple):
    """Compiled condition of a rule, checked against an imported row."""
    # Pattern searched in the payee of the row.
    payee: re.Pattern
    # Pattern searched in the space-joined entry type, payee and comment of
    # the row.
    text: re.Pattern
    # Amount the row's transacted amount must equal; a falsy value (such as
    # None) places no constraint on the amount.
    amount: Optional[Amount]


def _compile_regex(s: str) -> re.Pattern:
    """Compile ``s`` into a case-insensitive regular expression."""
    pattern = re.compile(s, flags=re.IGNORECASE)
    return pattern


def when(payee: Optional[Union[re.Pattern, str]] = None,
         text: Optional[Union[re.Pattern, str]] = None,
         amount: Optional[Amount] = None) -> When:
    """Build a ``When`` condition from optional payee/text patterns.

    String arguments are compiled as case-insensitive regular expressions;
    pre-compiled patterns are used unchanged. A pattern that is omitted
    falls back to ``WILDCARD``.

    Args:
        payee: Pattern for the payee of the row.
        text: Pattern for the textual description of the row.
        amount: Optional amount stored in the condition as is.

    Returns:
        The compiled condition.

    Raises:
        TypeError: If neither ``payee`` nor ``text`` is provided. Falsy
            values such as the empty string count as not provided.
    """
    if not payee and not text:
        # The message names the actual parameters of this function.
        raise TypeError('at least one of payee and text must be provided')
    if isinstance(payee, str):
        payee_regex = _compile_regex(payee)
    else:
        payee_regex = payee or WILDCARD
    if isinstance(text, str):
        text_regex = _compile_regex(text)
    else:
        text_regex = text or WILDCARD
    return When(payee_regex, text_regex, amount)


# A rule condition as written by the user: either a ready-made `When`, or a
# bare string / compiled pattern, which `_compile_condition` turns into a
# text-only `When`.
Condition = Union[str, re.Pattern, When]


def _compile_condition(cond: Condition) -> When:
    """Return ``cond`` as a ``When``.

    A ``When`` is passed through untouched; anything else is treated as the
    text pattern of a new condition built by ``when``.
    """
    return cond if isinstance(cond, When) else when(text=cond)


class let(NamedTuple):
    """Action applied to a row whose rule condition matched.

    Fields left as ``None`` are skipped when the action is applied.
    """
    # Replacement payee for the row.
    payee: Optional[str] = None
    # Replacement comment for the row.
    desc: Optional[str] = None
    # Account handed to ``row.assign_to_account``.
    account: Optional[str] = None
    # Value stored in ``row.flag``.
    flag: Optional[str] = None
    # Tag added to ``row.tags``.
    tag: Optional[str] = None


# A rule action as written by the user. `_compile_action` reads the shorthand
# forms as: a bare string is the account; a 2-tuple is (payee, account); a
# 3-tuple is (payee, desc, account); a 4-tuple is (flag, payee, desc, account).
Action = Union[str,
               Tuple[str, str],
               Tuple[str, str, str],
               Tuple[str, str, str, str],
               let]


def _compile_action(action: Action) -> let:
    """Normalise a shorthand action into a ``let`` instance.

    Accepted forms:

    * ``str`` -- the account to assign;
    * ``let`` -- returned unchanged;
    * ``(payee, account)``;
    * ``(payee, desc, account)``;
    * ``(flag, payee, desc, account)``.

    Raises:
        ValueError: If ``action`` has none of the forms above, including a
            plain tuple of any other length.
    """
    if isinstance(action, str):
        return let(account=action)
    # ``let`` is itself a tuple subclass, so it has to be recognised before
    # the generic tuple shorthands below.
    if isinstance(action, let):
        return action
    if isinstance(action, tuple):
        if len(action) == 2:
            payee, account = cast(Tuple[str, str], action)
            return let(payee=payee, account=account)
        if len(action) == 3:
            payee, desc, account = cast(Tuple[str, str, str], action)
            return let(payee, desc, account)
        if len(action) == 4:
            flag, payee, desc, account = cast(Tuple[str, str, str, str], action)
            return let(payee, desc, account, flag)
        # Any other length falls through to the explicit error below instead
        # of failing inside a tuple unpacking with an unrelated message.
    raise ValueError(f'Unknown action: {action}')


# User-facing rule table: each condition maps to the action to apply.
Rules = Dict[Condition, Action]
# Normalised rules as produced by `_compile_rules`, in the iteration order of
# the input mapping.
CompiledRules = List[Tuple[When, let]]


def _compile_rules(rules: Rules) -> CompiledRules:
    """Normalise every (condition, action) pair of ``rules``.

    The result keeps the iteration order of the input mapping.
    """
    compiled: CompiledRules = []
    for condition, action in rules.items():
        compiled_condition = _compile_condition(condition)
        compiled_action = _compile_action(action)
        compiled.append((compiled_condition, compiled_action))
    return compiled


def _rule_condition_matches(cond: When, row: Row) -> bool:
    """Tell whether ``row`` satisfies the payee, text and amount of ``cond``.

    * Payee: the payee pattern must be found in the row's payee; a row
      without a payee only passes when the pattern is ``WILDCARD``.
    * Text: unless the text pattern is ``WILDCARD``, it must be found in the
      non-empty entry type, payee and comment of the row joined by spaces.
    * Amount: a truthy ``cond.amount`` must equal the transacted amount.
    """
    if row.payee:
        payee_ok = cond.payee.search(row.payee) is not None
    else:
        # Nothing to search in: only an unconstrained payee can match.
        payee_ok = cond.payee == WILDCARD

    text_ok = cond.text == WILDCARD
    if not text_ok:
        parts = (row.entry_type, row.payee, row.comment)
        haystack = ' '.join(part for part in parts if part)
        text_ok = cond.text.search(haystack) is not None

    # NOTE(review): the check is on truthiness, not `is None`, so an amount
    # object that evaluates as falsy would also disable this constraint --
    # confirm against the Amount type.
    if cond.amount:
        amount_ok = row.transacted_amount == cond.amount
    else:
        amount_ok = True

    return payee_ok and text_ok and amount_ok


def extract_rules(input_rules: Rules) -> Extractor:
    """Create an extractor that applies ``input_rules`` to a row.

    The rules are compiled once, up front. The returned callable walks them
    in order and applies every rule whose condition matches the row; it
    stops after the first applied rule that leaves ``row.postings``
    non-empty.
    """
    compiled_rules = _compile_rules(input_rules)

    def do_extract(row: Row) -> None:
        for cond, action in compiled_rules:
            if not _rule_condition_matches(cond, row):
                continue
            if action.payee is not None:
                # A comment that merely repeats the old payee is cleared
                # before the payee is replaced.
                if row.payee == row.comment:
                    row.comment = ''
                row.payee = action.payee
            if action.desc is not None:
                row.comment = action.desc
            if action.account is not None:
                row.assign_to_account(action.account)
            if action.flag is not None:
                row.flag = action.flag
            if action.tag is not None:
                row.tags.add(action.tag)
            if row.postings:
                break

    return do_extract