import sys
import os
import time
import hashlib
import json
from pathlib import Path
from lib import (
    download_extract_zip,
    save_json,
    load_json,
    find_valid_row
)
import pandas as pd
from twilio.rest import Client
import numpy as np

Effective Exchange Rates Pipeline

Objective

My goal with this notebook is to set up periodic pipelines to source data on effective exchange rates (EER) from the Bank for International Settlements (BIS). With this data, I will look to create an alerts pipeline that updates me on changes in EERs, in particular anomalous movements that occur on a month-over-month, quarter-over-quarter, or year-over-year time frame. This anomaly detection will be used as a key indicator for portfolio balancing efforts.

Why Effective Exchange Rates?

Effective exchange rates, sometimes called "trade-weighted exchange rates," reflect the relative strength of a currency. Instead of looking at a single currency pair, the effective exchange rate is a weighted average of a currency's bilateral exchange rates, where the weights are derived from trade flows between the countries. This is desirable because a number of factors can distort the information contained in conventional exchange rates. The purpose of the EER is to isolate the portion of the signal in exchange rate quotes that is directly tied to trade in goods.
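
As a rough illustration of the weighting idea (the bilateral indices and trade weights below are made up for the example, not BIS figures), an effective exchange rate can be sketched as a trade-weighted geometric average of bilateral exchange rate indices:

```
import numpy as np

# hypothetical bilateral exchange-rate indices for one home currency (base period = 100)
bilateral_indices = {'EUR': 104.2, 'JPY': 97.5, 'CNY': 101.1}
# hypothetical trade weights for the home country's partners (sum to 1)
trade_weights = {'EUR': 0.40, 'JPY': 0.25, 'CNY': 0.35}

# effective exchange rate as the trade-weighted geometric average of the bilateral indices
eer = np.prod([bilateral_indices[c] ** trade_weights[c] for c in bilateral_indices])
print(round(eer, 2))  # above 100 means the currency is stronger than in the base period
```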

As an example of such a distorting factor, the US dollar (USD) is known to act as a pass-through currency in roughly 60-80% of all currency transactions worldwide, meaning it is bought only as a conduit to another currency. If someone holds Japanese yen (JPY) and wants Argentine pesos (ARS), the typical route is to use JPY to buy USD and then use USD to buy ARS. These kinds of transactions limit the trader's temporal exposure to fluctuations in ARS and allow the trader to deal in more heavily subscribed markets. However, our objective is to understand a currency's behavior with regard to real goods and services, so it is desirable to control for this purely financial behavior.

Pipelines

Daily Nominal

1) Download the daily file from BIS (version it? csvdiff vs. DVC vs. Pachyderm)
2) Update the dataset events table (based on the daily file and version history)
3) Update the statistics table (based on the daily file)
   a) MoM changes
   b) QoQ changes
4) Publish alerts (based on the statistics); a sketch of this run follows below
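
A minimal sketch of how a daily run might be wired together, assuming the TrackedFile class defined later in this notebook (update_stats and send_alerts are still stubs there, and the data directory here is illustrative):

```
from pathlib import Path

def run_daily_pipeline():
    # __init__ downloads the latest daily file from BIS
    tf = TrackedFile(
        datadir=Path('/ws/forks/reer/artifacts'),
        source_url='https://www.bis.org/statistics/full_webstats_eer_d_dataflow_csv_row.zip',
    )
    tf.update_trackers()   # track changes vs. the previous download (versioning / events)
    tf.update_stats()      # update statistics (MoM / QoQ changes)
    tf.send_alerts()       # publish alerts based on the statistics
```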

Concerns

a) Live models will be making decisions based on the underlying data. What if BIS changes the historical data set for some reason?

  • Solution: track differences day-to-day and issue alerts if historical data is altered
  • Issues: how can one get visibility into changes without saving the entire historical dataset?
    1) File hashes (won't know which rows/values changed, but will know something changed)
    2) Row hashes (won't know which values changed, but will know some row changed); see the sketch below
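
As a hedged sketch of the row-hash option, pandas can hash each row directly with pd.util.hash_pandas_object; the helpers below are illustrative only and are not used by the pipeline code later in this notebook:

```
import pandas as pd

def row_hashes(df: pd.DataFrame) -> pd.Series:
    # one unsigned 64-bit hash per row, keyed by the frame's index (dates here)
    return pd.util.hash_pandas_object(df, index=False)

def changed_rows(old: pd.DataFrame, new: pd.DataFrame) -> pd.Index:
    old_h, new_h = row_hashes(old), row_hashes(new)
    shared = old_h.index.intersection(new_h.index)
    # dates present in both downloads whose row contents no longer match
    return shared[old_h.loc[shared].values != new_h.loc[shared].values]
```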

Monthly Real

1) Download the monthly file from BIS
2) Update statistics

Hashing procedure

Whole File

This means that we use file hashes (MD5) to track the contents of the historical dataset:

1) Download the file
2) Look for the previous hash record (data_end_date, contents_hash)
3) Load up the new file
4) Compute the data_end_date of the new file. Trimming the newly appended rows without knowing the previous data_end_date would mean relying on the source stream's update frequency, and if that frequency breaks (skips a day/week/month, or otherwise), it creates a false positive flag at the comparison step. To eliminate that false positive, we store the data_end_date along with the hash of the contents.
5) Compute the hash of the file after removing rows > previous data_end_date
6) Check new hash == previous hash
   a) if different, issue an alert that the historical data was altered
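
A minimal sketch of this whole-file check, assuming the previous record is a small JSON file and the data is a date-indexed DataFrame; the names here (md5_of_csv, history_unchanged, record_path) are illustrative and are not the pipeline classes implemented below:

```
import hashlib
import json
import pandas as pd

def md5_of_csv(df: pd.DataFrame) -> str:
    # hash the canonical csv text of the frame rather than a file on disk
    return hashlib.md5(df.to_csv().encode()).hexdigest()

def history_unchanged(new_df: pd.DataFrame, record_path: str) -> bool:
    """True if the previously seen history still hashes to the same value."""
    with open(record_path) as f:
        prev = json.load(f)  # e.g. {'data_end_date': '2021-06-30', 'hash': '...'}
    # drop rows appended since the last run, then compare hashes of the overlap
    trimmed = new_df[new_df.index <= prev['data_end_date']]
    return md5_of_csv(trimmed) == prev['hash']
```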

Pachyderm solution to versioning REER data:

  • Append to file
  • get new file, get the last row, add old file
  • general understanding (feature comparison, matrix)
  • how do things look between dvc and pachyderm
  • images / cv models
  • speech / nlp

Messaging

Twilio

```

class Texter:
    def __init__(self):
        self.is_setup = False
        self.client = None

    
    def setup(self):
        # load Twilio credentials from the local secrets file
        data = load_json(fp='/home/fortville/code/.secrets/twilio.json')
        self._from = data['from']
        self.client = Client(data['account_sid'], data['auth_token'])
        self.is_setup = True
    
    
    def send_msg(self, to, body):
        # lazily set up the Twilio client on first use
        if not self.is_setup:
            self.setup()

        message = self.client.messages.create(
            body="-\n"+body,
            from_=self._from,
            to=to
        )
        return message

        
class ReerTexter(Texter):
    
    def __init__(self):
        super().__init__()
        self.tos = ["+13178337850"]


    def send_topN(self, names, values, label):
        out = label
        for i, name in enumerate(names):
            out += '\n{}: {:.2%}'.format(name, values[i])
        
        for to in self.tos:
            self.send_msg(to=to, body=out)
    

class TrackedFile:
    """
    TODO: bootstrapping this process

    """
    
    def __init__(self, datadir, source_url):
        self.source_url = source_url
        self.datadir = datadir
        self.cachefp = self.datadir/'daily.csv.gz'
        self.header = [0,1,2,3,4]
        self.sep = ','
        self.quotechar = '"'
        self.compression = 'gzip'
        self.last_hash = PandasLastHash(datadir=self.datadir)
        self.last_hash.read()
        self.curr_hash = PandasLastHash(datadir=self.datadir)
        self.sms = ReerTexter()
        self.curr_df = self.get_latest()
        self.curr_stats = {}
        
        
    def get_latest(self):
        # download the zip from BIS and read the csv it contains into a DataFrame
        data = tuple(
            download_extract_zip(
                url=self.source_url,
                header=self.header,
                sep=self.sep,
                quotechar=self.quotechar
            )
        )
        src_fn = data[0][0]
        next_df = data[0][1]
        # the first column holds the dates; its header tuple holds the level names
        labels = next_df.columns[0]
        next_df.index = pd.to_datetime(next_df[labels])
        # reindex onto a complete daily calendar so gaps show up as NaN
        di_perf = pd.date_range(
            start=next_df.index.min(), end=next_df.index.max(), freq="D"
        )
        next_df = next_df.reindex(di_perf)
        next_df.index.name = labels[4]
        del next_df[labels]
        # name the column levels after the header tuple, replacing the last level
        new_labels = list(labels)
        new_labels[4] = 'Code summary'
        next_df.columns.names = new_labels

        return next_df
        
        
    def update_trackers(self):
        # compare against the previous download only if we have a prior record
        if self.last_hash.hash is not None:
            # filter current data to rows at or before the last recorded data end date
            filt_df = self.curr_df[self.curr_df.index <= self.last_hash.date]
            # compute hash of the filtered frame (writes a temporary gzip csv internally)
            self.curr_hash.compute(df=filt_df, compression=self.compression)
            # compare last hash to computed hash; alert if the shared history changed upstream
            if self.curr_hash.hash != self.last_hash.hash:
                for to in self.sms.tos:
                    self.sms.send_msg(to=to, body='REER alert: historical data was altered')
        # save new file and update the last_hash record for the next run
        self.curr_df.to_csv(self.cachefp, compression=self.compression)
        self.last_hash.compute(df=self.curr_df, compression=self.compression)
        self.last_hash.write()


    def update_stats(self):
        # compute stats
        # save json 
        pass
    
    
    def send_alerts(self):
        pass


    def read_from_file(self, fname):
        df = pd.read_csv(
            fname, header=[0,1,2,3,4], sep=self.sep, quotechar=self.quotechar
        )
        df = df[1:]
        df.index = pd.to_datetime(df[df.columns[0]])
        df.index.name = 'Time Period'
        del df[df.columns[0]]
        
        df.columns.names = ['Frequency', 'Type', 'Basket', 'Reference area', 'Code summary']
        return df


class PandasLastHash:
    """
    Keeps track of file metadata:
        file hash
        data end date
    
    TODO: enable row hashing"""
    def __init__(self, datadir):
        self.datadir = datadir
        self.fp = self.datadir/'last_hash.json'
        self.hash = None
        self.date = None
        
        
    def __repr__(self):
        return str(self.__class__) + ": " + str(self.__dict__)
        
    
    def compute(self, df, compression):
        # write the frame to a temporary compressed csv, hash the bytes, then clean up
        fn = 'tmpdf.csv.gz'
        df.to_csv(self.datadir/fn, compression=compression)
        self.hash = self._compute_md5(fname=self.datadir/fn)
        os.remove(self.datadir/fn)
        # record the last date covered by the data alongside the hash
        self.date = self.set_date(df=df)
    
    
    def set_date(self, df):
        return df.iloc[-1].name.isoformat()
    
    
    def _compute_md5(self, fname):
        hash_md5 = hashlib.md5()
        with open(fname, "rb") as f:
            for chunk in iter(lambda: f.read(4096), b""):
                hash_md5.update(chunk)

        return hash_md5.hexdigest()
    
    
    def write(self):
        save_json(
            fp=self.fp, 
            obj={
                'hash':self.hash, 
                'date':self.date
            }
        )
            
            
    def read(self):        
        data = load_json(fp=self.fp)
        self.hash = data['hash']
        self.date = data['date']


class Stats:
    
    def __init__(self, datadir, ds_hash):
        self.values = {
            'ds_meta': ds_hash,
            'topN': None
        }
        self.datadir = datadir
        self.fp = self.datadir/'_'.join([ds_hash.date, ds_hash.hash])
        
    
    def save(self):
        save_json(fp=self.fp, obj=self.values)
    
    
    def load(self, date):
        self.values = load_json(fp=self.fp)
        
    
    def parse_topN(self, kind):
        topN_list = self.values['topN'][kind]
        names = np.array([name.split(':')[1] for name in topN_list.index.get_level_values(3)])
        values = topN_list.values
        return names, values
    
    
    def get_label(self, kind):
        indices = self.values['topN'][kind].index[0]        
        tmp = []
        for index in indices:
            tmp.append(index.split(':')[1])
        tmp[2] = tmp[2].split(' ')[0]
        a1 = '-'.join(tmp[:3])
        return "On {} the t-{}d on [{}]" \
            .format(
                self.values['topN']['close'].isoformat().split('T')[0], 
                self.values['topN']['period_days'], 
                a1
        )
    
    
    def topN(self, df, breadth, period, N):
        # broad basket = series labeled 'B:Broad (60 economies)'; narrow = everything else
        if breadth == 'broad':
            cols = [col for col in df.columns if 'B:Broad (60 economies)' in col]
        elif breadth == 'narrow':
            cols = [col for col in df.columns if 'B:Broad (60 economies)' not in col]

        filt_df = df[cols]

        # percentage change of the latest observation vs. the closest valid row ~period days back
        past_obs = find_valid_row(df=filt_df, tminus_days=period)
        diffs = filt_df.iloc[-1] / past_obs - 1
        self.values['topN'] = {
            'apps': diffs.sort_values(ascending=False)[:N],   # largest appreciations
            'deps': diffs.sort_values()[:N],                  # largest depreciations
            'period_days': period,
            'start': past_obs.name,
            'close': filt_df.iloc[-1].name
        }
    

url = 'https://www.bis.org/statistics/full_webstats_eer_d_dataflow_csv_row.zip'
datadir = Path('/ws/forks/reer')/'artifacts'
fname = datadir/'daily-old.csv.gz'
tf = TrackedFile(datadir=datadir, source_url=url)
df = tf.read_from_file(fname=fname)
tf.last_hash.compute(df=df, compression='gzip')
tf.curr_hash.compute(df=tf.curr_df, compression='gzip')
s = Stats(datadir=datadir, ds_hash=tf.last_hash)
sms = ReerTexter()
for period in [90]:
    s.topN(df=df, breadth='broad', period=period, N=3)
    label = s.get_label(kind='apps')
    anames, avalues = s.parse_topN(kind='apps')
    dnames, dvalues = s.parse_topN(kind='deps')
    sms.send_topN(names=anames, values=avalues, label=label+' appreciations')
    sms.send_topN(names=dnames, values=dvalues, label=label+' depreciations')
```