''' Importer for Transferwise API transaction history from the command line. ''' __copyright__ = 'Copyright (c) 2020 Kristóf Marussy ' __license__ = 'GNU GPLv2' import datetime as dt import logging import os from typing import Any, Dict, Optional, Tuple, Set import beancount from beancount.core import data from beancount_extras_kris7t.importers.transferwise.transferwise_json import Accounts, \ DATE_FORMAT, Importer LOG = logging.getLogger('importers.transferwise.client') def _parse_date_arg(date_str: str) -> dt.date: return dt.datetime.strptime(date_str, '%Y-%m-%d').date() def _import_config(config_path: str) -> Importer: import runpy config = runpy.run_path(config_path) # type: ignore importer = config['TRANSFERWISE_CONFIG'] if isinstance(importer, Importer): LOG.info('Loaded configuration from %s', config_path) return importer else: raise ValueError(f'Invalid configuration: {config_path}') def _get_reference(transaction: data.Transaction) -> Optional[str]: for link in transaction.links: if link.startswith('transferwise_'): return link[13:] return None def _get_last_transaction_date(ledger_path: str, skip_references: Set[str]) -> Optional[dt.date]: from beancount.parser import parser LOG.info('Checking %s for already imported transactions', ledger_path) entries, _, _ = parser.parse_file(ledger_path) date: Optional[dt.date] = None skip: Set[str] = set() for entry in entries: if isinstance(entry, data.Transaction): reference = _get_reference(entry) if not reference: continue if date is None or date < entry.date: date = entry.date skip.clear() if date == entry.date: skip.add(reference) skip_references.update(skip) return date def _get_date_range(from_date: Optional[dt.date], to_date: dt.date, ledger_path: Optional[str]) -> Tuple[dt.date, dt.date, Set[str]]: skip_references: Set[str] = set() if not from_date and ledger_path: from_date = _get_last_transaction_date(ledger_path, skip_references) if not from_date: from_date = to_date - dt.timedelta(days=365) LOG.info('Fetching transactions from %s to %s', from_date, to_date) return from_date, to_date, skip_references def _get_secrets(importer: Importer, api_key: Optional[str], proxy_uri: Optional[str]) -> Tuple[str, Optional[str]]: import urllib.parse uri_parts: Optional[urllib.parse.SplitResult] if proxy_uri: uri_parts = urllib.parse.urlsplit(proxy_uri) else: uri_parts = None if api_key and (not uri_parts or not uri_parts.username or uri_parts.password): return api_key, proxy_uri from contextlib import closing import secretstorage with closing(secretstorage.dbus_init()) as connection: collection = secretstorage.get_default_collection(connection) if not api_key: items = collection.search_items({ 'profile_id': str(importer.profile_id), 'borderless_account_id': str(importer.borderless_account_id), 'xdg:schema': 'com.marussy.beancount.importer.TransferwiseAPIKey', }) item = next(items, None) if not item: raise ValueError('No API key found in SecretService') LOG.info('Found API key secret "%s" from SecretService', item.get_label()) api_key = item.get_secret().decode('utf-8') if uri_parts and uri_parts.username and not uri_parts.password: host = uri_parts.hostname or uri_parts.netloc items = collection.search_items({ 'host': host, 'port': str(uri_parts.port or 1080), 'user': uri_parts.username, 'xdg:schema': 'org.freedesktop.Secret.Generic', }) item = next(items, None) if item: LOG.info('Found proxy password secret "%s" from SecretService', item.get_label()) password = urllib.parse.quote_from_bytes(item.get_secret()) uri = f'{uri_parts.scheme}://{uri_parts.username}:{password}@{host}' if uri_parts.port: proxy_uri = f'{uri}:{uri_parts.port}' else: proxy_uri = uri else: LOG.info('No proxy password secret was found in SecretService') assert api_key # Make mypy happy return api_key, proxy_uri def _fetch_statements(importer: Importer, from_date: dt.date, to_date: dt.date, api_key: str, proxy_uri: Optional[str]) -> Dict[str, Any]: import json import requests now = dt.datetime.utcnow().time() from_time_str = dt.datetime.combine(from_date, dt.datetime.min.time()).strftime(DATE_FORMAT) to_time_str = dt.datetime.combine(to_date, now).strftime(DATE_FORMAT) uri_prefix = f'https://api.transferwise.com/v3/profiles/{importer.profile_id}/' + \ f'borderless-accounts/{importer.borderless_account_id}/statement.json' + \ f'?intervalStart={from_time_str}&intervalEnd={to_time_str}&type=COMPACT¤cy=' beancount_version = beancount.__version__ # type: ignore # noqa: unused-type-ignore headers = { 'User-Agent': f'Beancount {beancount_version} Transferwise importer {__copyright__}', 'Authorization': f'Bearer {api_key}', } proxy_dict: Dict[str, str] = {} if proxy_uri: proxy_dict['https'] = proxy_uri statements: Dict[str, Any] = {} for currency in importer.currencies: result = requests.get(uri_prefix + currency, headers=headers, proxies=proxy_dict) if result.status_code != 200: LOG.error( 'Fetcing %s statement failed with HTTP status code %d: %s', currency, result.status_code, result.text) else: try: statement = json.loads(result.text) except json.JSONDecodeError as exc: LOG.error('Failed to decode %s statement', currency, exc_info=exc) else: statements[currency] = statement LOG.info('Fetched %s statement', currency) return statements def _print_statements(from_date: dt.date, to_date: dt.date, entries: data.Entries) -> None: from beancount.parser import printer print(f'*** Transferwise from {from_date} to {to_date}', flush=True) printer.print_entries(entries) def _determine_path(dir: str, currency: str, to_date: dt.date) -> str: date_str = to_date.strftime('%Y-%m-%d') simple_path = os.path.join(dir, f'{date_str}.transferwise_{currency}.json') if not os.path.exists(simple_path): return simple_path for i in range(2, 10): path = os.path.join(dir, f'{date_str}.transferwise_{currency}_{i}.json') if not os.path.exists(path): return path raise ValueError(f'Cannot find unused name for {simple_path}') def _archive_statements(documents_path: str, to_date: dt.date, accounts: Accounts, statements: Dict[str, Any]): import json from beancount.core.account import sep for currency, statement in statements.items(): dir = os.path.join(documents_path, *accounts.get_borderless_account(currency).split(sep)) os.makedirs(dir, exist_ok=True) path = _determine_path(dir, currency, to_date) with open(path, 'w') as file: json.dump(statement, file, indent=2) LOG.info('Saved %s statement as %s', currency, path) def main(): import argparse parser = argparse.ArgumentParser() parser.add_argument('--verbose', '-v', action='store_true') parser.add_argument('--from-date', '-f', required=False, type=_parse_date_arg) parser.add_argument('--to-date', '-t', default=dt.date.today(), type=_parse_date_arg) parser.add_argument('--api-key', '-k', required=False, type=str) parser.add_argument('--proxy', '-p', required=False, type=str) parser.add_argument('--archive', '-a', required=False, type=str) parser.add_argument('config', nargs=1) parser.add_argument('ledger', nargs='?') args = parser.parse_args() if args.verbose: log_level = logging.INFO else: log_level = logging.WARN logging.basicConfig(level=log_level) importer = _import_config(args.config[0]) from_date, to_date, skip = _get_date_range(args.from_date, args.to_date, args.ledger) if skip: LOG.info('Skipping %s', skip) api_key, proxy_uri = _get_secrets(importer, args.api_key, args.proxy) statements = _fetch_statements(importer, from_date, to_date, api_key, proxy_uri) if args.archive: _archive_statements(args.archive, to_date, importer.accounts, statements) entries = importer.extract_objects(statements.values(), skip) if entries: _print_statements(from_date, to_date, entries)