aboutsummaryrefslogblamecommitdiffstats
path: root/beancount_extras_kris7t/importers/transferwise/client.py
blob: 5198122379a3d4eb01646c449707166900503791 (plain) (tree)


























                                                                                        

                                                        





































                                                                                                 
                                     












                                                                            
                                                 










































                                                                                                 
                                     

















                                                                                                
                                                                                        
               
                                                                                             


























































































                                                                                                 
'''
Importer for Transferwise API transaction history from the command line.
'''
__copyright__ = 'Copyright (c) 2020  Kristóf Marussy <kristof@marussy.com>'
__license__ = 'GNU GPLv2'

import datetime as dt
import logging
import os
from typing import Any, Dict, Optional, Tuple, Set

import beancount
from beancount.core import data

from beancount_extras_kris7t.importers.transferwise.transferwise_json import Accounts, \
        DATE_FORMAT, Importer

LOG = logging.getLogger('importers.transferwise.client')


def _parse_date_arg(date_str: str) -> dt.date:
    return dt.datetime.strptime(date_str, '%Y-%m-%d').date()


def _import_config(config_path: str) -> Importer:
    import runpy

    config = runpy.run_path(config_path)  # type: ignore
    importer = config['TRANSFERWISE_CONFIG']
    if isinstance(importer, Importer):
        LOG.info('Loaded configuration from %s', config_path)
        return importer
    else:
        raise ValueError(f'Invalid configuration: {config_path}')


def _get_reference(transaction: data.Transaction) -> Optional[str]:
    for link in transaction.links:
        if link.startswith('transferwise_'):
            return link[13:]
    return None


def _get_last_transaction_date(ledger_path: str, skip_references: Set[str]) -> Optional[dt.date]:
    from beancount.parser import parser

    LOG.info('Checking %s for already imported transactions', ledger_path)
    entries, _, _ = parser.parse_file(ledger_path)
    date: Optional[dt.date] = None
    skip: Set[str] = set()
    for entry in entries:
        if isinstance(entry, data.Transaction):
            reference = _get_reference(entry)
            if not reference:
                continue
            if date is None or date < entry.date:
                date = entry.date
                skip.clear()
            if date == entry.date:
                skip.add(reference)
    skip_references.update(skip)
    return date


def _get_date_range(from_date: Optional[dt.date],
                    to_date: dt.date,
                    ledger_path: Optional[str]) -> Tuple[dt.date, dt.date, Set[str]]:
    skip_references: Set[str] = set()
    if not from_date and ledger_path:
        from_date = _get_last_transaction_date(ledger_path, skip_references)
    if not from_date:
        from_date = to_date - dt.timedelta(days=365)
    LOG.info('Fetching transactions from %s to %s', from_date, to_date)
    return from_date, to_date, skip_references


def _get_secrets(importer: Importer,
                 api_key: Optional[str],
                 proxy_uri: Optional[str]) -> Tuple[str, Optional[str]]:
    import urllib.parse

    uri_parts: Optional[urllib.parse.SplitResult]
    if proxy_uri:
        uri_parts = urllib.parse.urlsplit(proxy_uri)
    else:
        uri_parts = None
    if api_key and (not uri_parts or not uri_parts.username or uri_parts.password):
        return api_key, proxy_uri

    from contextlib import closing

    import secretstorage

    with closing(secretstorage.dbus_init()) as connection:
        collection = secretstorage.get_default_collection(connection)
        if not api_key:
            items = collection.search_items({
                'profile_id': str(importer.profile_id),
                'borderless_account_id': str(importer.borderless_account_id),
                'xdg:schema': 'com.marussy.beancount.importer.TransferwiseAPIKey',
            })
            item = next(items, None)
            if not item:
                raise ValueError('No API key found in SecretService')
            LOG.info('Found API key secret "%s" from SecretService', item.get_label())
            api_key = item.get_secret().decode('utf-8')
        if uri_parts and uri_parts.username and not uri_parts.password:
            host = uri_parts.hostname or uri_parts.netloc
            items = collection.search_items({
                'host': host,
                'port': str(uri_parts.port or 1080),
                'user': uri_parts.username,
                'xdg:schema': 'org.freedesktop.Secret.Generic',
            })
            item = next(items, None)
            if item:
                LOG.info('Found proxy password secret "%s" from SecretService', item.get_label())
                password = urllib.parse.quote_from_bytes(item.get_secret())
                uri = f'{uri_parts.scheme}://{uri_parts.username}:{password}@{host}'
                if uri_parts.port:
                    proxy_uri = f'{uri}:{uri_parts.port}'
                else:
                    proxy_uri = uri
            else:
                LOG.info('No proxy password secret was found in SecretService')
    assert api_key  # Make mypy happy
    return api_key, proxy_uri


def _fetch_statements(importer: Importer,
                      from_date: dt.date,
                      to_date: dt.date,
                      api_key: str,
                      proxy_uri: Optional[str]) -> Dict[str, Any]:
    import json

    import requests

    now = dt.datetime.utcnow().time()
    from_time_str = dt.datetime.combine(from_date, dt.datetime.min.time()).strftime(DATE_FORMAT)
    to_time_str = dt.datetime.combine(to_date, now).strftime(DATE_FORMAT)
    uri_prefix = f'https://api.transferwise.com/v3/profiles/{importer.profile_id}/' + \
        f'borderless-accounts/{importer.borderless_account_id}/statement.json' + \
        f'?intervalStart={from_time_str}&intervalEnd={to_time_str}&type=COMPACT&currency='
    beancount_version = beancount.__version__  # type: ignore # noqa: unused-type-ignore
    headers = {
        'User-Agent': f'Beancount {beancount_version} Transferwise importer {__copyright__}',
        'Authorization': f'Bearer {api_key}',
    }
    proxy_dict: Dict[str, str] = {}
    if proxy_uri:
        proxy_dict['https'] = proxy_uri
    statements: Dict[str, Any] = {}
    for currency in importer.currencies:
        result = requests.get(uri_prefix + currency, headers=headers, proxies=proxy_dict)
        if result.status_code != 200:
            LOG.error(
                'Fetcing %s statement failed with HTTP status code %d: %s',
                currency,
                result.status_code,
                result.text)
        else:
            try:
                statement = json.loads(result.text)
            except json.JSONDecodeError as exc:
                LOG.error('Failed to decode %s statement', currency, exc_info=exc)
            else:
                statements[currency] = statement
                LOG.info('Fetched %s statement', currency)
    return statements


def _print_statements(from_date: dt.date,
                      to_date: dt.date,
                      entries: data.Entries) -> None:
    from beancount.parser import printer
    print(f'*** Transferwise from {from_date} to {to_date}', flush=True)
    printer.print_entries(entries)


def _determine_path(dir: str, currency: str, to_date: dt.date) -> str:
    date_str = to_date.strftime('%Y-%m-%d')
    simple_path = os.path.join(dir, f'{date_str}.transferwise_{currency}.json')
    if not os.path.exists(simple_path):
        return simple_path
    for i in range(2, 10):
        path = os.path.join(dir, f'{date_str}.transferwise_{currency}_{i}.json')
        if not os.path.exists(path):
            return path
    raise ValueError(f'Cannot find unused name for {simple_path}')


def _archive_statements(documents_path: str,
                        to_date: dt.date,
                        accounts: Accounts,
                        statements: Dict[str, Any]):
    import json

    from beancount.core.account import sep

    for currency, statement in statements.items():
        dir = os.path.join(documents_path, *accounts.get_borderless_account(currency).split(sep))
        os.makedirs(dir, exist_ok=True)
        path = _determine_path(dir, currency, to_date)
        with open(path, 'w') as file:
            json.dump(statement, file, indent=2)
        LOG.info('Saved %s statement as %s', currency, path)


def main():
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument('--verbose', '-v', action='store_true')
    parser.add_argument('--from-date', '-f', required=False, type=_parse_date_arg)
    parser.add_argument('--to-date', '-t', default=dt.date.today(), type=_parse_date_arg)
    parser.add_argument('--api-key', '-k', required=False, type=str)
    parser.add_argument('--proxy', '-p', required=False, type=str)
    parser.add_argument('--archive', '-a', required=False, type=str)
    parser.add_argument('config', nargs=1)
    parser.add_argument('ledger', nargs='?')
    args = parser.parse_args()
    if args.verbose:
        log_level = logging.INFO
    else:
        log_level = logging.WARN
    logging.basicConfig(level=log_level)
    importer = _import_config(args.config[0])
    from_date, to_date, skip = _get_date_range(args.from_date, args.to_date, args.ledger)
    if skip:
        LOG.info('Skipping %s', skip)
    api_key, proxy_uri = _get_secrets(importer, args.api_key, args.proxy)
    statements = _fetch_statements(importer, from_date, to_date, api_key, proxy_uri)
    if args.archive:
        _archive_statements(args.archive, to_date, importer.accounts, statements)
    entries = importer.extract_objects(statements.values(), skip)
    if entries:
        _print_statements(from_date, to_date, entries)