Periodic Monitoring of Effective Exchange Rates Using GitHub Actions
GitHub Actions is a platform commonly used to implement CI/CD pipelines for code stored on GitHub. Its flexible workflow configuration enables other use cases as well; this notebook shows how GitHub Actions can be used for general cron-job execution. We write a scraper that pulls effective exchange rate data from the website of the Bank for International Settlements, and use Twilio to send subscribers messages about the weekly changes in effective exchange rates.
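The notebook itself does not show the workflow file, so here is a minimal sketch of what a scheduled run could look like. The file path, job name, and entry-point script (`pipeline.py`) are assumptions for illustration, not part of this repo; the secret names would need to match whatever the code actually reads.

```yaml
# .github/workflows/eer-cron.yml (hypothetical path)
name: weekly-eer-check
on:
  schedule:
    - cron: '0 6 * * 1'    # every Monday at 06:00 UTC
  workflow_dispatch:        # allow manual runs for debugging
jobs:
  run-pipeline:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: '3.11'
      - run: pip install pandas numpy twilio
      - run: python pipeline.py   # hypothetical entry point
        env:
          TWILIO_ACCOUNT_SID: ${{ secrets.TWILIO_ACCOUNT_SID }}
          TWILIO_AUTH_TOKEN: ${{ secrets.TWILIO_AUTH_TOKEN }}
```

Note that GitHub disables scheduled workflows on repositories with no recent activity, so a long-running cron job may need an occasional commit or manual trigger.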
import sys; import os
import time
import hashlib
import json
from pathlib import Path
from lib import (
download_extract_zip,
save_json,
load_json,
find_valid_row
)
import pandas as pd
from twilio.rest import Client
import numpy as np
Effective Exchange Rates Pipeline
Objective
My goal with this notebook is to set up periodic pipelines to source data on effective exchange rates (EER) from the Bank for International Settlements (BIS). With this data, I will build an alerts pipeline that notifies me of changes in EERs, in particular anomalous movements that occur on a month-over-month, quarter-over-quarter, or year-over-year time frame. This anomaly detection will serve as a key indicator for portfolio rebalancing efforts.
Why Effective Exchange Rates?
Effective exchange rates, sometimes called "trade-weighted exchange rates," reflect the relative strength of a currency. Instead of looking at a single currency pair, the effective exchange rate is a weighted average in which the weights are derived from the trade balances between countries. This is desirable because a number of factors can distort the information contained in conventional exchange rates. The purpose of the EER is to isolate the signal in exchange rate quotes that is directly tied to trade in goods.
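To make the weighting concrete, here is a toy calculation, my own illustration rather than the BIS methodology (which involves time-varying weights and chaining): an EER-style index computed as a trade-weighted geometric mean of bilateral rate indices. The currencies, levels, and weights below are made up.

```python
import numpy as np

def effective_rate(bilateral_indices, trade_weights):
    """Weighted geometric mean of bilateral exchange-rate indices.

    bilateral_indices: dict of partner -> index level (base period = 100)
    trade_weights: dict of partner -> trade weight (weights sum to 1)
    """
    partners = sorted(bilateral_indices)
    rates = np.array([bilateral_indices[p] for p in partners])
    weights = np.array([trade_weights[p] for p in partners])
    return float(np.prod(rates ** weights))

# Hypothetical numbers: a currency up 10% vs. EUR, flat vs. JPY and CNY
idx = {'EUR': 110.0, 'JPY': 100.0, 'CNY': 100.0}
w = {'EUR': 0.5, 'JPY': 0.2, 'CNY': 0.3}
effective_rate(idx, w)  # ~104.88: the trade-weighted appreciation
```

Because half of the trade weight sits on EUR, a 10% appreciation against EUR alone moves the effective index up by roughly 4.9% rather than 10%.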
For example, the US dollar (USD) is known to be a pass-through (vehicle) currency for between 60-80% of all currency transactions worldwide, meaning it is often bought only as a conduit to acquire another currency. If someone holds Japanese yen (JPY) and wants Argentine pesos (ARS), the typical route is to sell JPY for USD, then use the USD to buy ARS. This limits the trader's temporal exposure to fluctuations in ARS and lets them trade in more liquid markets. Our objective, however, is to understand a currency's behavior with regard to real goods and services, so it is desirable to control for this financial behavior in the currency.
Pipelines
Daily Nominal
1) Download the daily file from BIS (version it? csvdiff vs. dvc vs. Pachyderm)
2) Update the dataset events table (based on the daily file and version history)
3) Update the statistics table (based on the daily file)
   a) MoM changes
   b) QoQ changes
4) Publish alerts (based on statistics)
Concerns
a) Live models will be making decisions based on the underlying data. What if BIS changes the historical data set for some reason?
- Solution: track differences day-to-day and issue alerts if historical data is altered
- Issues: how can one get visibility into changes without saving the entire historical dataset?
  1) File hashes (won't know which rows/values changed, but will know that something changed)
  2) Row hashes (won't know which values changed, but will know which rows changed)
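A sketch of the row-hash option, using a hypothetical two-row frame in place of the BIS history: hashing each row lets us see which dates were revised, without storing the old values themselves.

```python
import hashlib
import pandas as pd

def row_hashes(df):
    """MD5 per row: detects which dates changed without storing the data."""
    return df.astype(str).apply(
        lambda row: hashlib.md5(','.join(row).encode()).hexdigest(), axis=1
    )

# Hypothetical mini-dataset standing in for the BIS history
old = pd.DataFrame({'US': [100.0, 101.0]}, index=['2021-01-01', '2021-01-02'])
new = old.copy()
new.loc['2021-01-01', 'US'] = 99.5   # BIS silently revises history

changed = row_hashes(old) != row_hashes(new)
changed.index[changed].tolist()  # ['2021-01-01'], the revised row
```

Storing one hash per date is far smaller than the dataset itself, at the cost of not knowing which column within the row moved.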
Monthly Real
1) Download the monthly file from BIS
2) Update statistics
Hashing procedure
Whole File
This means that we use a file hash (MD5) to track the contents of the historical dataset.
1) Download the file
2) Look up the previous hash record (data_end_date, contents_hash)
3) Load the new file
4) Compute the data_end_date of the new file. Trimming rows based on the source's update schedule instead would require relying on that schedule; if it ever breaks (skips a day/week/month, or otherwise), the next comparison would raise a false positive. Storing the data_end_date alongside the content hash avoids this.
5) Compute the hash of the file after removing rows > data_end_date
6) Check new hash == previous hash
   a) if different, flag that the historical data was altered
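The steps above can be sketched as follows; the frame and dates are stand-ins for the BIS file. Trimming to the previously recorded end date means a freshly appended row never triggers a false "history changed" alarm, while an edit to an old value does.

```python
import hashlib
import io
import pandas as pd

def contents_hash(df, data_end_date):
    """Hash the history up to a known end date, so freshly appended
    rows never trigger a false 'history changed' alarm."""
    trimmed = df.loc[df.index <= data_end_date]
    buf = io.StringIO()
    trimmed.to_csv(buf)
    return hashlib.md5(buf.getvalue().encode()).hexdigest()

# Hypothetical: yesterday we recorded (end date, hash); today's file has a new row
hist = pd.DataFrame({'US': [100.0, 101.0]},
                    index=pd.to_datetime(['2021-01-01', '2021-01-02']))
prev_end, prev_hash = hist.index[-1], contents_hash(hist, hist.index[-1])

today = pd.concat([hist, pd.DataFrame({'US': [102.0]},
                                      index=pd.to_datetime(['2021-01-03']))])
contents_hash(today, prev_end) == prev_hash  # True: history unchanged
```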
Pachyderm solution to versioning REER data:
- Append to file: get the new file, take its last row, append it to the old file
- General understanding (feature comparison matrix)
  - How do DVC and Pachyderm compare?
  - Images / CV models
  - Speech / NLP
Messaging
Twilio
- Twilio pricing: $0.0075 per message

Installation

```
# https://www.twilio.com/docs/sms/quickstart/python#install-the-twilio-cli
curl -o- https://raw.githubusercontent.com/nvm-sh/nvm/v0.34.0/install.sh | bash
```
class Texter:
    def __init__(self):
        self.is_setup = False
        self.client = None

    def setup(self):
        # credentials are kept outside the repo
        data = load_json(fp='/home/fortville/code/.secrets/twilio.json')
        self._from = data['from']
        self.client = Client(data['account_sid'], data['auth_token'])
        self.is_setup = True

    def send_msg(self, to, body):
        # lazily set up the client on first send
        if not self.is_setup:
            self.setup()
        message = self.client.messages.create(
            body="-\n" + body,
            from_=self._from,
            to=to
        )
        return message
class ReerTexter(Texter):
def __init__(self):
super().__init__()
self.tos = ["+13178337850"]
def send_topN(self, names, values, label):
out = label
for i, name in enumerate(names):
out += '\n{}: {:.2%}'.format(name, values[i])
for to in self.tos:
self.send_msg(to=to, body=out)
class TrackedFile:
"""
TODO: bootstrapping this process
"""
def __init__(self, datadir, source_url):
self.source_url = source_url
self.datadir = datadir
self.cachefp = self.datadir/'daily.csv.gz'
self.header = [0,1,2,3,4]
self.sep = ','
self.quotechar = '"'
self.compression = 'gzip'
self.last_hash = PandasLastHash(datadir=self.datadir)
self.last_hash.read()
self.curr_hash = PandasLastHash(datadir=self.datadir)
self.sms = ReerTexter()
self.curr_df = self.get_latest()
self.curr_stats = {}
def get_latest(self):
data = tuple(
download_extract_zip(
url=self.source_url,
header=self.header,
sep=self.sep,
quotechar=self.quotechar
)
)
src_fn = data[0][0]
next_df = data[0][1]
labels = next_df.columns[0]
next_df.index = pd.to_datetime(next_df[labels])
di_perf = pd.date_range(
start=next_df.index.min(), end=next_df.index.max(), freq="D"
)
next_df = next_df.reindex(di_perf)
next_df.index.name = labels[4]
del next_df[labels]
        new_labels = list(labels)
        new_labels[4] = 'Code summary'
        # Index.rename does not accept a `level` argument; assign level names directly
        next_df.columns.names = new_labels
return next_df
    def update_trackers(self):
        # trim rows newer than the last recorded data_end_date, so freshly
        # appended rows don't register as changes to history
        filt_df = self.curr_df[self.curr_df.index <= self.last_hash.date]
        self.curr_hash.compute(df=filt_df, compression=self.compression)
        # TODO:
        # - compare last hash to computed hash; alert if they differ
        # - save the new file as a gzipped csv
        # - update the last_hash file
def update_stats(self):
# compute stats
# save json
pass
def send_alerts(self):
pass
def read_from_file(self, fname):
df = pd.read_csv(
fname, header=[0,1,2,3,4], sep=self.sep, quotechar=self.quotechar
)
df = df[1:]
df.index = pd.to_datetime(df[df.columns[0]])
df.index.name = 'Time Period'
del df[df.columns[0]]
df.columns.names = ['Frequency', 'Type', 'Basket', 'Reference area', 'Code summary']
return df
class PandasLastHash:
    """
    Keeps track of file metadata:
      - file hash
      - data end date
    TODO: enable row hashing
    """
def __init__(self, datadir):
self.datadir = datadir
self.fp = self.datadir/'last_hash.json'
self.hash = None
self.date = None
def __repr__(self):
return str(self.__class__) + ": " + str(self.__dict__)
    def compute(self, df, compression):
        fn = 'tmpdf.csv.gz'
        df.to_csv(self.datadir/fn, compression=compression)
        self.hash = self._compute_md5(fname=self.datadir/fn)
        os.remove(self.datadir/fn)
        # store on `date` so that write()/read() round-trip the same attribute
        self.date = self.set_date(df=df)
def set_date(self, df):
return df.iloc[-1].name.isoformat()
def _compute_md5(self, fname):
hash_md5 = hashlib.md5()
with open(fname, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
hash_md5.update(chunk)
return hash_md5.hexdigest()
def write(self):
save_json(
fp=self.fp,
obj={
'hash':self.hash,
'date':self.date
}
)
def read(self):
data = load_json(fp=self.fp)
self.hash = data['hash']
self.date = data['date']
class Stats:
def __init__(self, datadir, ds_hash):
self.values = {
'ds_meta': ds_hash,
'topN': None
}
self.datadir = datadir
self.fp = self.datadir/'_'.join([ds_hash.date, ds_hash.hash])
def save(self):
save_json(fp=self.fp, obj=self.values)
    def load(self):
self.values = load_json(fp=self.fp)
def parse_topN(self, kind):
topN_list = self.values['topN'][kind]
names = np.array([name.split(':')[1] for name in topN_list.index.get_level_values(3)])
values = topN_list.values
return names, values
def get_label(self, kind):
indices = self.values['topN'][kind].index[0]
tmp = []
for index in indices:
tmp.append(index.split(':')[1])
tmp[2] = tmp[2].split(' ')[0]
a1 = '-'.join(tmp[:3])
return "On {} the t-{}d on [{}]" \
.format(
self.values['topN']['close'].isoformat().split('T')[0],
self.values['topN']['period_days'],
a1
)
def topN(self, df, breadth, period, N):
if breadth == 'broad':
cols = [col for col in df.columns if 'B:Broad (60 economies)' in col]
elif breadth == 'narrow':
cols = [col for col in df.columns if 'B:Broad (60 economies)' not in col]
filt_df = df[cols]
past_obs = find_valid_row(df=filt_df, tminus_days=period)
diffs = filt_df.iloc[-1] / past_obs - 1
self.values['topN'] = {
'apps': diffs.sort_values(ascending=False)[:N],
'deps': diffs.sort_values()[:N],
'period_days': period,
'start': past_obs.name,
'close': filt_df.iloc[-1].name
}
url = 'https://www.bis.org/statistics/full_webstats_eer_d_dataflow_csv_row.zip'
datadir = Path('/ws/forks/reer')/'artifacts'
fname = datadir/'daily-old.csv.gz'
tf = TrackedFile(datadir=datadir, source_url=url)
df = tf.read_from_file(fname=fname)
tf.last_hash.compute(df=df, compression='gzip')
tf.curr_hash.compute(df=tf.curr_df, compression='gzip')
s = Stats(datadir=datadir, ds_hash=tf.last_hash)
sms = ReerTexter()
for period in [90]:
s.topN(df=df, breadth='broad', period=period, N=3)
label = s.get_label(kind='apps')
anames, avalues = s.parse_topN(kind='apps')
dnames, dvalues = s.parse_topN(kind='deps')
sms.send_topN(names=anames, values=avalues, label=label+' appreciations')
sms.send_topN(names=dnames, values=dvalues, label=label+' depreciations')