aboutsummaryrefslogtreecommitdiffstats
path: root/beancount_extras_kris7t/importers/transferwise/client.py
diff options
context:
space:
mode:
Diffstat (limited to 'beancount_extras_kris7t/importers/transferwise/client.py')
-rw-r--r--beancount_extras_kris7t/importers/transferwise/client.py236
1 files changed, 236 insertions, 0 deletions
diff --git a/beancount_extras_kris7t/importers/transferwise/client.py b/beancount_extras_kris7t/importers/transferwise/client.py
new file mode 100644
index 0000000..a4b6629
--- /dev/null
+++ b/beancount_extras_kris7t/importers/transferwise/client.py
@@ -0,0 +1,236 @@
1'''
2Importer for Transferwise API transaction history from the command line.
3'''
4__copyright__ = 'Copyright (c) 2020 Kristóf Marussy <kristof@marussy.com>'
5__license__ = 'GNU GPLv2'
6
7import datetime as dt
8import logging
9import os
10from typing import Any, Dict, Optional, Tuple, Set
11
12import beancount
13from beancount.core import data
14
15from beancount_extras_kris7t.importers.transferwise.transferwise_json import Accounts, \
16 DATE_FORMAT, Importer
17
18LOG = logging.getLogger('importers.transferwise.client')
19
20
21def _parse_date_arg(date_str: str) -> dt.date:
22 return dt.datetime.strptime(date_str, '%Y-%m-%d').date()
23
24
25def _import_config(config_path: str) -> Importer:
26 import runpy
27
28 config = runpy.run_path(config_path)
29 importer = config['TRANSFERWISE_CONFIG'] # type: ignore
30 if isinstance(importer, Importer):
31 LOG.info('Loaded configuration from %s', config_path)
32 return importer
33 else:
34 raise ValueError(f'Invalid configuration: {config_path}')
35
36
37def _get_reference(transaction: data.Transaction) -> Optional[str]:
38 for link in transaction.links:
39 if link.startswith('transferwise_'):
40 return link[13:]
41 return None
42
43
44def _get_last_transaction_date(ledger_path: str, skip_references: Set[str]) -> Optional[dt.date]:
45 from beancount.parser import parser
46
47 LOG.info('Checking %s for already imported transactions', ledger_path)
48 entries, _, _ = parser.parse_file(ledger_path)
49 date: Optional[dt.date] = None
50 skip: Set[str] = set()
51 for entry in entries:
52 if isinstance(entry, data.Transaction):
53 reference = _get_reference(entry)
54 if not reference:
55 continue
56 if date is None or date < entry.date:
57 date = entry.date
58 skip.clear()
59 if date == entry.date:
60 skip.add(reference)
61 skip_references.update(skip)
62 return date
63
64
65def _get_date_range(from_date: Optional[dt.date],
66 to_date: dt.date,
67 ledger_path: Optional[str]) -> Tuple[dt.date, dt.date, Set[str]]:
68 skip_references = set()
69 if not from_date and ledger_path:
70 from_date = _get_last_transaction_date(ledger_path, skip_references)
71 if not from_date:
72 from_date = to_date - dt.timedelta(days=365)
73 LOG.info('Fetching transactions from %s to %s', from_date, to_date)
74 return from_date, to_date, skip_references
75
76
77def _get_secrets(importer: Importer,
78 api_key: Optional[str],
79 proxy_uri: Optional[str]) -> Tuple[str, Optional[str]]:
80 import urllib.parse
81
82 if proxy_uri:
83 uri_parts = urllib.parse.urlsplit(proxy_uri)
84 else:
85 uri_parts = None
86 if api_key and (not uri_parts or not uri_parts.username or uri_parts.password):
87 return api_key, proxy_uri
88
89 from contextlib import closing
90
91 import secretstorage
92
93 with closing(secretstorage.dbus_init()) as connection:
94 collection = secretstorage.get_default_collection(connection)
95 if not api_key:
96 items = collection.search_items({
97 'profile_id': str(importer.profile_id),
98 'borderless_account_id': str(importer.borderless_account_id),
99 'xdg:schema': 'com.marussy.beancount.importer.TransferwiseAPIKey',
100 })
101 item = next(items, None)
102 if not item:
103 raise ValueError('No API key found in SecretService')
104 LOG.info('Found API key secret "%s" from SecretService', item.get_label())
105 api_key = item.get_secret().decode('utf-8')
106 if uri_parts and uri_parts.username and not uri_parts.password:
107 host = uri_parts.hostname or uri_parts.netloc
108 items = collection.search_items({
109 'host': host,
110 'port': str(uri_parts.port or 1080),
111 'user': uri_parts.username,
112 'xdg:schema': 'org.freedesktop.Secret.Generic',
113 })
114 item = next(items, None)
115 if item:
116 LOG.info('Found proxy password secret "%s" from SecretService', item.get_label())
117 password = urllib.parse.quote_from_bytes(item.get_secret())
118 uri = f'{uri_parts.scheme}://{uri_parts.username}:{password}@{host}'
119 if uri_parts.port:
120 proxy_uri = f'{uri}:{uri_parts.port}'
121 else:
122 proxy_uri = uri
123 else:
124 LOG.info('No proxy password secret was found in SecretService')
125 assert api_key # Make pyright happy
126 return api_key, proxy_uri
127
128
129def _fetch_statements(importer: Importer,
130 from_date: dt.date,
131 to_date: dt.date,
132 api_key: str,
133 proxy_uri: Optional[str]) -> Dict[str, Any]:
134 import json
135
136 import requests
137
138 now = dt.datetime.utcnow().time()
139 from_time_str = dt.datetime.combine(from_date, dt.datetime.min.time()).strftime(DATE_FORMAT)
140 to_time_str = dt.datetime.combine(to_date, now).strftime(DATE_FORMAT)
141 uri_prefix = f'https://api.transferwise.com/v3/profiles/{importer.profile_id}/' + \
142 f'borderless-accounts/{importer.borderless_account_id}/statement.json' + \
143 f'?intervalStart={from_time_str}&intervalEnd={to_time_str}&type=COMPACT&currency='
144 headers = {
145 'User-Agent': f'Beancount {beancount.__version__} Transferwise importer {__copyright__}',
146 'Authorization': f'Bearer {api_key}',
147 }
148 proxy_dict: Dict[str, str] = {}
149 if proxy_uri:
150 proxy_dict['https'] = proxy_uri
151 statements: Dict[str, Any] = {}
152 for currency in importer.currencies:
153 result = requests.get(uri_prefix + currency, headers=headers, proxies=proxy_dict)
154 if result.status_code != 200:
155 LOG.error(
156 'Fetcing %s statement failed with HTTP status code %d: %s',
157 currency,
158 result.status_code,
159 result.text)
160 else:
161 try:
162 statement = json.loads(result.text)
163 except json.JSONDecodeError as exc:
164 LOG.error('Failed to decode %s statement', currency, exc_info=exc)
165 else:
166 statements[currency] = statement
167 LOG.info('Fetched %s statement', currency)
168 return statements
169
170
171def _print_statements(from_date: dt.date,
172 to_date: dt.date,
173 entries: data.Entries) -> None:
174 from beancount.parser import printer
175 print(f'*** Transferwise from {from_date} to {to_date}', flush=True)
176 printer.print_entries(entries)
177
178
179def _determine_path(dir: str, currency: str, to_date: dt.date) -> str:
180 date_str = to_date.strftime('%Y-%m-%d')
181 simple_path = os.path.join(dir, f'{date_str}.transferwise_{currency}.json')
182 if not os.path.exists(simple_path):
183 return simple_path
184 for i in range(2, 10):
185 path = os.path.join(dir, f'{date_str}.transferwise_{currency}_{i}.json')
186 if not os.path.exists(path):
187 return path
188 raise ValueError(f'Cannot find unused name for {simple_path}')
189
190
191def _archive_statements(documents_path: str,
192 to_date: dt.date,
193 accounts: Accounts,
194 statements: Dict[str, Any]):
195 import json
196
197 from beancount.core.account import sep
198
199 for currency, statement in statements.items():
200 dir = os.path.join(documents_path, *accounts.get_borderless_account(currency).split(sep))
201 os.makedirs(dir, exist_ok=True)
202 path = _determine_path(dir, currency, to_date)
203 with open(path, 'w') as file:
204 json.dump(statement, file, indent=2)
205 LOG.info('Saved %s statement as %s', currency, path)
206
207
208def main():
209 import argparse
210
211 parser = argparse.ArgumentParser()
212 parser.add_argument('--verbose', '-v', action='store_true')
213 parser.add_argument('--from-date', '-f', required=False, type=_parse_date_arg)
214 parser.add_argument('--to-date', '-t', default=dt.date.today(), type=_parse_date_arg)
215 parser.add_argument('--api-key', '-k', required=False, type=str)
216 parser.add_argument('--proxy', '-p', required=False, type=str)
217 parser.add_argument('--archive', '-a', required=False, type=str)
218 parser.add_argument('config', nargs=1)
219 parser.add_argument('ledger', nargs='?')
220 args = parser.parse_args()
221 if args.verbose:
222 log_level = logging.INFO
223 else:
224 log_level = logging.WARN
225 logging.basicConfig(level=log_level)
226 importer = _import_config(args.config[0])
227 from_date, to_date, skip = _get_date_range(args.from_date, args.to_date, args.ledger)
228 if skip:
229 LOG.info('Skipping %s', skip)
230 api_key, proxy_uri = _get_secrets(importer, args.api_key, args.proxy)
231 statements = _fetch_statements(importer, from_date, to_date, api_key, proxy_uri)
232 if args.archive:
233 _archive_statements(args.archive, to_date, importer.accounts, statements)
234 entries = importer.extract_objects(statements.values(), skip)
235 if entries:
236 _print_statements(from_date, to_date, entries)