aboutsummaryrefslogtreecommitdiffstats
path: root/beancount_extras_kris7t/plugins/selective_implicit_prices.py
blob: 07dc893e8794c6292254eee36529330acbebcbdf (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
"""This plugin synthesizes Price directives for all Postings with a price or
directive or if it is an augmenting posting, has a cost directive.

Price directives will be synthesized only for commodities with the
implicit-prices: TRUE metadata set.
"""
__copyright__ = "Copyright (C) 2015-2017  Martin Blais, " + \
    "2020  Kristóf Marussy <kristof@marussy.com>"
__license__ = "GNU GPLv2"

import collections
from typing import List, Tuple, Set

from beancount.core.data import Commodity, Entries, Transaction
from beancount.core import data
from beancount.core import amount
from beancount.core import inventory
from beancount.core.position import Cost

__plugins__ = ('add_implicit_prices',)


ImplicitPriceError = collections.namedtuple('ImplicitPriceError', 'source message entry')


METADATA_FIELD = "__implicit_prices__"
IMPLICIT_PRICES_META = "implicit-prices"


def fetch_commodities(entries: Entries) -> Tuple[Set[str], List[ImplicitPriceError]]:
    commodities: Set[str] = set()
    errors: List[ImplicitPriceError] = []
    for entry in entries:
        if isinstance(entry, Commodity):
            implicit_prices = entry.meta.get(IMPLICIT_PRICES_META, False)
            if not isinstance(implicit_prices, bool):
                errors.append(ImplicitPriceError(
                    entry.meta,
                    f'{IMPLICIT_PRICES_META} must be Boolean, got {implicit_prices} instead',
                    entry))
            if implicit_prices:
                commodities.add(entry.currency)
    return commodities, errors


def add_implicit_prices(entries: Entries,
                        unused_options_map) -> Tuple[Entries, List[ImplicitPriceError]]:
    """Insert implicitly defined prices from Transactions.

    Explicit price entries are simply maintained in the output list. Prices from
    postings with costs or with prices from Transaction entries are synthesized
    as new Price entries in the list of entries output.

    Args:
      entries: A list of directives. We're interested only in the Transaction instances.
      unused_options_map: A parser options dict.
    Returns:
      A list of entries, possibly with more Price entries than before, and a
      list of errors.
    """
    new_entries: Entries = []
    errors: List[ImplicitPriceError] = []

    commodities, fetch_errors = fetch_commodities(entries)
    errors.extend(fetch_errors)

    # A dict of (date, currency, cost-currency) to price entry.
    new_price_entry_map = {}

    balances = collections.defaultdict(inventory.Inventory)
    for entry in entries:
        # Always replicate the existing entries.
        new_entries.append(entry)

        if isinstance(entry, Transaction):
            # Inspect all the postings in the transaction.
            for posting in entry.postings:
                units = posting.units
                if units.currency not in commodities:
                    continue

                cost = posting.cost

                # Check if the position is matching against an existing
                # position.
                _, booking = balances[posting.account].add_position(posting)

                # Add prices when they're explicitly specified on a posting. An
                # explicitly specified price may occur in a conversion, e.g.
                #      Assets:Account    100 USD @ 1.10 CAD
                # or, if a cost is also specified, as the current price of the
                # underlying instrument, e.g.
                #      Assets:Account    100 HOOL {564.20} @ {581.97} USD
                if posting.price is not None:
                    meta = data.new_metadata(entry.meta["filename"], entry.meta["lineno"])
                    meta[METADATA_FIELD] = "from_price"
                    price_entry = data.Price(meta, entry.date,
                                             units.currency,
                                             posting.price)

                # Add costs, when we're not matching against an existing
                # position. This happens when we're just specifying the cost,
                # e.g.
                #      Assets:Account    100 HOOL {564.20}
                elif (cost is not None and
                      booking != inventory.MatchResult.REDUCED):
                    # TODO(blais): What happens here if the account has no
                    # booking strategy? Do we end up inserting a price for the
                    # reducing leg?  Check.
                    meta = data.new_metadata(entry.meta["filename"], entry.meta["lineno"])
                    meta[METADATA_FIELD] = "from_cost"
                    if isinstance(cost, Cost):
                        price_entry = data.Price(meta, entry.date,
                                                 units.currency,
                                                 amount.Amount(cost.number, cost.currency))
                    else:
                        errors.append(
                                 ImplicitPriceError(
                                     entry.meta,
                                     f'Expected {entry} to have a Cost, got {cost} instead',
                                     entry))
                        price_entry = None
                else:
                    price_entry = None

                if price_entry is not None:
                    key = (price_entry.date,
                           price_entry.currency,
                           price_entry.amount.number,  # Ideally should be removed.
                           price_entry.amount.currency)
                    try:
                        new_price_entry_map[key]

                        # Do not fail for now. We still have many valid use
                        # cases of duplicate prices on the same date, for
                        # example, stock splits, or trades on two dates with
                        # two separate reported prices. We need to figure out a
                        # more elegant solution for this in the long term.
                        # Keeping both for now. We should ideally not use the
                        # number in the de-dup key above.
                        #
                        # dup_entry = new_price_entry_map[key]
                        # if price_entry.amount.number == dup_entry.amount.number:
                        #     # Skip duplicates.
                        #     continue
                        # else:
                        #     errors.append(
                        #         ImplicitPriceError(
                        #             entry.meta,
                        #             "Duplicate prices for {} on {}".format(entry,
                        #                                                    dup_entry),
                        #             entry))
                    except KeyError:
                        new_price_entry_map[key] = price_entry
                        new_entries.append(price_entry)

    return new_entries, errors