'''
Importer for Transferwise API transaction history from the command line.
'''
__copyright__ = 'Copyright (c) 2020 Kristóf Marussy <kristof@marussy.com>'
__license__ = 'GNU GPLv2'
import datetime as dt
import logging
import os
from typing import Any, Dict, Optional, Tuple, Set
import beancount
from beancount.core import data
from beancount_extras_kris7t.importers.transferwise.transferwise_json import Accounts, \
DATE_FORMAT, Importer
LOG = logging.getLogger('importers.transferwise.client')
def _parse_date_arg(date_str: str) -> dt.date:
    '''
    Convert a ``YYYY-MM-DD`` command line argument into a date.

    Raises ``ValueError`` (from ``strptime``) when the text does not match
    the ``%Y-%m-%d`` format.
    '''
    parsed = dt.datetime.strptime(date_str, '%Y-%m-%d')
    return parsed.date()
def _import_config(config_path: str) -> Importer:
    '''
    Load the importer configuration by executing a Python file.

    The file at ``config_path`` is run with ``runpy.run_path`` and must define
    a module-level ``TRANSFERWISE_CONFIG`` variable holding an ``Importer``.

    Args:
        config_path: Path of the Python configuration file to execute.

    Returns:
        The ``Importer`` instance bound to ``TRANSFERWISE_CONFIG``.

    Raises:
        ValueError: If the file does not define ``TRANSFERWISE_CONFIG`` or the
            value is not an ``Importer``.
    '''
    import runpy
    config = runpy.run_path(config_path)  # type: ignore
    try:
        importer = config['TRANSFERWISE_CONFIG']
    except KeyError:
        # Report a missing variable the same way as a variable of the wrong
        # type instead of leaking a bare KeyError to the user.
        raise ValueError(
            f'Invalid configuration: {config_path} does not define TRANSFERWISE_CONFIG'
        ) from None
    if not isinstance(importer, Importer):
        raise ValueError(f'Invalid configuration: {config_path}')
    LOG.info('Loaded configuration from %s', config_path)
    return importer
def _get_reference(transaction: data.Transaction) -> Optional[str]:
    '''
    Extract the Transferwise reference stored in the links of a transaction.

    Returns the text following the ``transferwise_`` prefix of the first
    matching link in iteration order, or ``None`` if no link has that prefix.
    '''
    prefix = 'transferwise_'
    matches = (link[len(prefix):] for link in transaction.links if link.startswith(prefix))
    return next(matches, None)
def _get_last_transaction_date(ledger_path: str, skip_references: Set[str]) -> Optional[dt.date]:
    '''
    Find the date of the latest transaction in a ledger that has a reference.

    Only transactions for which ``_get_reference`` returns a non-empty value
    are considered. The references of all such transactions on the latest
    date are added to ``skip_references`` (an output parameter).

    Returns the latest date, or ``None`` if no transaction qualifies.
    '''
    from beancount.parser import parser
    LOG.info('Checking %s for already imported transactions', ledger_path)
    directives, _errors, _options = parser.parse_file(ledger_path)
    latest: Optional[dt.date] = None
    latest_references: Set[str] = set()
    for directive in directives:
        if not isinstance(directive, data.Transaction):
            continue
        reference = _get_reference(directive)
        if not reference:
            continue
        if latest is None or directive.date > latest:
            # A newer day supersedes everything collected so far.
            latest = directive.date
            latest_references = {reference}
        elif directive.date == latest:
            latest_references.add(reference)
    skip_references.update(latest_references)
    return latest
def _get_date_range(from_date: Optional[dt.date],
                    to_date: dt.date,
                    ledger_path: Optional[str]) -> Tuple[dt.date, dt.date, Set[str]]:
    '''
    Decide which date interval should be fetched.

    The start of the interval is, in order of preference: the explicit
    ``from_date``, the date found by ``_get_last_transaction_date`` in the
    ledger at ``ledger_path``, or 365 days before ``to_date``.

    Returns the start date, the end date, and the set of references filled
    in by the ledger lookup (empty if no lookup was made).
    '''
    skip_references: Set[str] = set()
    start = from_date
    if ledger_path and not start:
        start = _get_last_transaction_date(ledger_path, skip_references)
    if not start:
        start = to_date - dt.timedelta(days=365)
    LOG.info('Fetching transactions from %s to %s', start, to_date)
    return start, to_date, skip_references
def _get_secrets(importer: Importer,
                 api_key: Optional[str],
                 proxy_uri: Optional[str]) -> Tuple[str, Optional[str]]:
    '''
    Resolve the API key and the proxy URI, querying SecretService if needed.

    SecretService is only contacted when no API key was given, or when the
    proxy URI names a user but carries no password. In that case:

    * a missing API key is looked up by the profile id and borderless account
      id of ``importer``;
    * a missing proxy password is looked up by host, port (1080 if the URI
      has none) and user, and inserted into the returned proxy URI. Only the
      scheme, user, password, host and port are kept in the rebuilt URI.

    Args:
        importer: Importer whose ``profile_id`` and ``borderless_account_id``
            identify the API key secret.
        api_key: API key given by the user, if any.
        proxy_uri: Proxy URI given by the user, if any.

    Returns:
        The API key and the (possibly password-augmented) proxy URI.

    Raises:
        ValueError: If no API key was given and none is found in
            SecretService.
    '''
    import urllib.parse
    uri_parts: Optional[urllib.parse.SplitResult] = \
        urllib.parse.urlsplit(proxy_uri) if proxy_uri else None
    needs_proxy_password = bool(
        uri_parts and uri_parts.username and not uri_parts.password)
    if api_key and not needs_proxy_password:
        # Nothing is missing, so avoid touching D-Bus at all.
        return api_key, proxy_uri
    from contextlib import closing
    import secretstorage
    with closing(secretstorage.dbus_init()) as connection:
        collection = secretstorage.get_default_collection(connection)
        if not api_key:
            items = collection.search_items({
                'profile_id': str(importer.profile_id),
                'borderless_account_id': str(importer.borderless_account_id),
                'xdg:schema': 'com.marussy.beancount.importer.TransferwiseAPIKey',
            })
            item = next(items, None)
            if not item:
                raise ValueError('No API key found in SecretService')
            LOG.info('Found API key secret "%s" from SecretService', item.get_label())
            api_key = item.get_secret().decode('utf-8')
        if uri_parts and needs_proxy_password:
            host = uri_parts.hostname or uri_parts.netloc
            items = collection.search_items({
                'host': host,
                'port': str(uri_parts.port or 1080),
                'user': uri_parts.username,
                'xdg:schema': 'org.freedesktop.Secret.Generic',
            })
            item = next(items, None)
            if item:
                LOG.info('Found proxy password secret "%s" from SecretService', item.get_label())
                # safe='' so that '/' is percent-encoded too; an unescaped
                # '/' would terminate the authority part of the URI.
                password = urllib.parse.quote_from_bytes(item.get_secret(), safe='')
                # ``hostname`` strips the brackets of an IPv6 literal; they
                # must be restored before a port or path can follow.
                if uri_parts.hostname and ':' in host:
                    uri_host = f'[{host}]'
                else:
                    uri_host = host
                uri = f'{uri_parts.scheme}://{uri_parts.username}:{password}@{uri_host}'
                if uri_parts.port:
                    proxy_uri = f'{uri}:{uri_parts.port}'
                else:
                    proxy_uri = uri
            else:
                LOG.info('No proxy password secret was found in SecretService')
    assert api_key  # Make mypy happy
    return api_key, proxy_uri
def _fetch_statements(importer: Importer,
                      from_date: dt.date,
                      to_date: dt.date,
                      api_key: str,
                      proxy_uri: Optional[str]) -> Dict[str, Any]:
    '''
    Download the account statement of every configured currency.

    One HTTPS request per entry of ``importer.currencies`` is sent to the
    Transferwise statement endpoint. The interval starts at midnight of
    ``from_date`` and ends on ``to_date`` at the current UTC time of day.
    A currency whose request does not return HTTP 200, or whose response is
    not valid JSON, is logged as an error and left out of the result.

    Args:
        importer: Supplies ``profile_id``, ``borderless_account_id`` and
            ``currencies``.
        from_date: First day of the interval.
        to_date: Last day of the interval.
        api_key: Bearer token sent in the ``Authorization`` header.
        proxy_uri: Proxy to use for HTTPS requests, if any.

    Returns:
        The decoded JSON statements keyed by currency.

    Raises:
        requests.RequestException: If a request cannot be completed (for
            example on connection failure or timeout); not handled here.
    '''
    import json
    import requests
    # requests waits forever by default; give up after this many seconds.
    request_timeout = 60
    # ``utcnow()`` is deprecated. ``time()`` drops the tzinfo again, so this
    # is still a naive UTC time of day.
    now = dt.datetime.now(dt.timezone.utc).time()
    from_time_str = dt.datetime.combine(from_date, dt.time.min).strftime(DATE_FORMAT)
    to_time_str = dt.datetime.combine(to_date, now).strftime(DATE_FORMAT)
    # The currency code is appended to this prefix for each request.
    uri_prefix = (
        f'https://api.transferwise.com/v3/profiles/{importer.profile_id}/'
        f'borderless-accounts/{importer.borderless_account_id}/statement.json'
        f'?intervalStart={from_time_str}&intervalEnd={to_time_str}&type=COMPACT&currency='
    )
    beancount_version = beancount.__version__  # type: ignore # noqa: unused-type-ignore
    headers = {
        'User-Agent': f'Beancount {beancount_version} Transferwise importer {__copyright__}',
        'Authorization': f'Bearer {api_key}',
    }
    proxy_dict: Dict[str, str] = {}
    if proxy_uri:
        proxy_dict['https'] = proxy_uri
    statements: Dict[str, Any] = {}
    for currency in importer.currencies:
        result = requests.get(
            uri_prefix + currency,
            headers=headers,
            proxies=proxy_dict,
            timeout=request_timeout)
        if result.status_code != 200:
            LOG.error(
                'Fetching %s statement failed with HTTP status code %d: %s',
                currency,
                result.status_code,
                result.text)
            continue
        try:
            statement = json.loads(result.text)
        except json.JSONDecodeError as exc:
            LOG.error('Failed to decode %s statement', currency, exc_info=exc)
            continue
        statements[currency] = statement
        LOG.info('Fetched %s statement', currency)
    return statements
def _print_statements(from_date: dt.date,
                      to_date: dt.date,
                      entries: data.Entries) -> None:
    '''
    Write the extracted entries to standard output.

    A header line naming the date interval is printed (and flushed) first,
    followed by the entries as rendered by the beancount printer.
    '''
    from beancount.parser import printer
    header = f'*** Transferwise from {from_date} to {to_date}'
    print(header, flush=True)
    printer.print_entries(entries)
def _determine_path(dir: str, currency: str, to_date: dt.date) -> str:
    '''
    Pick a file name inside ``dir`` that does not exist yet.

    The plain name ``<date>.transferwise_<currency>.json`` is preferred;
    if it is taken, the suffixes ``_2`` to ``_9`` are tried in order.

    Raises ``ValueError`` if all of the candidate names already exist.
    '''
    date_str = to_date.strftime('%Y-%m-%d')
    simple_path = os.path.join(dir, f'{date_str}.transferwise_{currency}.json')
    numbered_paths = [
        os.path.join(dir, f'{date_str}.transferwise_{currency}_{i}.json')
        for i in range(2, 10)
    ]
    unused = (path for path in [simple_path, *numbered_paths] if not os.path.exists(path))
    chosen = next(unused, None)
    if chosen is None:
        raise ValueError(f'Cannot find unused name for {simple_path}')
    return chosen
def _archive_statements(documents_path: str,
                        to_date: dt.date,
                        accounts: Accounts,
                        statements: Dict[str, Any]):
    '''
    Save every fetched statement as an indented JSON file.

    Each statement goes into a directory below ``documents_path`` built from
    the components of the account name that
    ``accounts.get_borderless_account`` returns for its currency. The
    directory is created if missing, and the file name is chosen by
    ``_determine_path``.
    '''
    import json
    from beancount.core.account import sep
    for currency, statement in statements.items():
        account_name = accounts.get_borderless_account(currency)
        target_dir = os.path.join(documents_path, *account_name.split(sep))
        os.makedirs(target_dir, exist_ok=True)
        path = _determine_path(target_dir, currency, to_date)
        with open(path, 'w') as file:
            json.dump(statement, file, indent=2)
        LOG.info('Saved %s statement as %s', currency, path)
def main():
    '''
    Command line entry point.

    Parses the arguments, loads the importer configuration, determines the
    date interval, obtains the secrets, fetches the statements, optionally
    archives them, and prints the extracted entries if there are any.
    '''
    import argparse
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument('--verbose', '-v', action='store_true')
    arg_parser.add_argument('--from-date', '-f', required=False, type=_parse_date_arg)
    arg_parser.add_argument('--to-date', '-t', default=dt.date.today(), type=_parse_date_arg)
    arg_parser.add_argument('--api-key', '-k', required=False, type=str)
    arg_parser.add_argument('--proxy', '-p', required=False, type=str)
    arg_parser.add_argument('--archive', '-a', required=False, type=str)
    arg_parser.add_argument('config', nargs=1)
    arg_parser.add_argument('ledger', nargs='?')
    options = arg_parser.parse_args()

    logging.basicConfig(level=logging.INFO if options.verbose else logging.WARNING)

    # ``nargs=1`` yields a one-element list.
    importer = _import_config(options.config[0])
    from_date, to_date, skip = _get_date_range(
        options.from_date, options.to_date, options.ledger)
    if skip:
        LOG.info('Skipping %s', skip)
    api_key, proxy_uri = _get_secrets(importer, options.api_key, options.proxy)
    statements = _fetch_statements(importer, from_date, to_date, api_key, proxy_uri)
    if options.archive:
        _archive_statements(options.archive, to_date, importer.accounts, statements)
    entries = importer.extract_objects(statements.values(), skip)
    if not entries:
        return
    _print_statements(from_date, to_date, entries)