Data Versioning with DVC on a Realistic Prototyping Dataset
Data versioning is the white rhino of data science. If only versioning huge datasets were as painless as versioning code. At my last company, Sentient Science, we used versioned buckets in S3, but their main purpose was to make sure that mistakes could not happen silently -- at least there would be a record, and we could go back and inspect. Reproducibility of results was hardly achieved; it amounted to sharing a hosted notebook configured by the principal investigator. Enter DVC, which aims to enable data versioning through a git-like workflow. In this post, we evaluate DVC in the context of a reasonably sized deep learning use case.
- Discussion
- Objective
- Design of delta compression for large datasets
- Code
- Model Validation
- Conclusions
- Citations
Discussion
"In fact, I'm a huge proponent of designing your code around the data, rather than the other way around, and I think it's one of the reasons git has been fairly successful" - Torvalds, Linus (2006-06-27)
In his 2007 Google talk, Torvalds discussed his horrible experience with existing version control systems (VCSs) and the way he evaluated them (e.g. CVS, SVN, BitKeeper). He explained that every existing VCS failed the following criteria in some way:
- Not distributed, not worth using: distribution means "you do not have 1 central location that keeps track of your data"
  - commit changes without disturbing others
  - trust your data without trusting everyone who can push
  - work without needing an internet connection
  - release engineering
- Performs badly, not worth using
- Guarantees that what you put in comes back out the same, protecting against:
  - memory corruption
  - disk corruption

Source: https://youtu.be/4XpnKHJAok8
In "Why not just use git?" (2014), W. Trevor King questioned dat, calling for further evidence that git is insufficient for the reasons listed in the dat docs. Namely, King writes: why are dat commits lighter weight, or does dat not have commits at all? The reasons in question were:
- git status can take minutes or hours
- Git only stores full history
What is data, and isn't code also data? For us, data is any dataset that is too large to manage with git. I personally have 10 deep learning projects with complicated discussions of how to source the data and reproduce it. There are 2 reasons why my projects are tough to reproduce: (1) the data is not easy to get, and (2) the steps of the process are not easy to reproduce.
Objective
- Reproducible data science (Shareability vs Reproducibility)
- commit my data files
- commit my code
- push code and data to remote(s)
- share repo with a friend
- they pull and reproduce
- Trustless collaboration
- branch dataset
- edit dataset
- resolve merge conflicts
- commit and push to remote
- preview changes (diff)
- submit dataset for review
- compare old to new
- are there restrictions / access controls / etc..?
- merge dataset
Should the model's data (e.g. weights) be combined with datasets (e.g. large training datasets, low-latency streaming observations)? What is the role of a model and model pipeline in data version control? I think the criteria used to accept a merge of a new model are so different from those for accepting changes to a dataset that the two should be considered separately.
2 topics:
- dataset version control
- model version control
Current tools are not data versioning; they are pipeline versioning. Both DVC and Pachyderm couple models with data and call it a pipeline.
Design of delta compression for large datasets
The issue is that diffing at a row level requires non-trivial time, and in that time, new pushes can occur. You can represent the files as chunks to speed this up and distribute the compute across nodes. A dvc push would then:
- make exact copy on remote(s)
- if a delta compression job is in process, compare it to the last push
- if a delta compression job is not in process, start one
- complete the diff calc job -> save the diffs
- delete dataset copy
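The chunking idea above can be sketched in code. This is a hypothetical illustration of chunk-level delta compression, not DVC's or Pachyderm's actual implementation; the chunk size and use of md5 are assumptions for the sketch:

```python
import hashlib

CHUNK_SIZE = 4 * 1024 * 1024  # assumed 4 MiB chunks

def chunk_hashes(data: bytes, chunk_size: int = CHUNK_SIZE):
    """Split a byte string into fixed-size chunks and hash each one."""
    return [
        hashlib.md5(data[i:i + chunk_size]).hexdigest()
        for i in range(0, len(data), chunk_size)
    ]

def delta_chunks(old: bytes, new: bytes, chunk_size: int = CHUNK_SIZE):
    """Return only the chunks of `new` whose hashes are not already stored."""
    stored = set(chunk_hashes(old, chunk_size))
    return {
        h: new[i * chunk_size:(i + 1) * chunk_size]
        for i, h in enumerate(chunk_hashes(new, chunk_size))
        if h not in stored
    }
```

With this layout, an edit that touches one chunk of a large file only requires transferring that chunk, and the per-chunk hashing is trivially parallelizable across nodes.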
Dan@Pach (10 Feb 2021)
Yeah this is one of the big problems with pushing and pulling real data sets to Github. It’s big and it won’t scale because as the data gets bigger and bigger transferring that data becomes quickly untenable. Some background history on Pachyderm: We explored both the external metadata approach (recording data that changed but not enforcing immutability) and using Git/Github instead of building our own Git like system. We quickly discovered the limitations of scale. Without immutability you have nothing. You can record that I added or deleted 1000 JPEGs to a directory to an external server but as soon as that data changes to a state that you can’t replicate the metadata is worthless. Imagine I do 50 training runs on 1000 JPEGs and then I alter them by crunching them down from 1024x768 to 500x500. My metadata server still tells me I ran 50 training runs on the 1000 Jpegs at 1024x768 but now I don’t have those files so that data is useless. I can’t do those training runs again unless I can restore from backup. The copy-on-write filesystem of Pachyderm is essential to doing data version control right. You need to record metadata and take a snapshot of changes at the same time to have real reproducibility.
Using Git is another failure state. It wasn’t designed for lots of data. It was designed for code, aka hand-built logic. Data is not code. Code is usually much smaller and allows people to push and pull and check things out and have local copies. For data, the reason Pachyderm choose to have a centralized data system is to deal with the data transfer problem. Pachyderm can cache files locally but usually doesn’t unless you specifically ask it to because you don’t want to be pushing and pulling massive data sets around, as you are discovering now. You want to work on datasets directly in the cloud but appear to be working locally. This is a big difference in terms of design approach. DVC’s approach is good for toy data sets but it won’t work in the real world with real datasets. You’re just working with 1.3 GB but now imagine you’re working with 100 GB of data and 5 other data scientists. Now you’re all keeping a local 100GB copy of data and pushing and pulling it around and trying to keep it in sync. Awful experience that won’t work no matter how hard any team tries.
I think if you are focusing on all the things we discussed that would be brilliant. What I would absolutely love to see is to show a use case where immutability is not optional. Run a training on a dataset with DVC, than overwrite the files and now try to recreate the model. I’d also like you to focus on the problem of scale and what it would be like to push and pull a big dataset locally, especially with limited laptop space and a distributed team.
%matplotlib inline
from pathlib import Path
import random
import traceback
from bs4 import BeautifulSoup
import numpy as np
from fastai.basics import *
from fastai.vision import models
from fastai.vision.all import *
from fastai.metrics import *
from fastai.data.all import *
from fastai.callback import *
import wandb
from fastai.callback.wandb import *
import ray
from prcvd.tabular import get_tags_in_order, get_attribute
def get_y_fn(fp):
    l_str = str(img_to_l[fp])
    out = l_str \
        .replace('labels', 'labels_int') \
        .replace('png', 'tif')
    return out
Data Versioning
Using the tool called DVC, we will version the relevant artifacts. These files are too large to version inside git because pushing to the GitHub remote would be expensive. The input dataset is too large to store on the root volume of the instance we're running the experiments on, so the experiment instance has a separate volume configured for this purpose. DVC encapsulates the steps required to source data, so the result of our data versioning implementation will be to enable a new data scientist to reproduce the last committed training environment by simply running:
git clone https://github.com/prcvd/blog.git
cd blog
# git checkout whatever-branch
dvc pull
dvc repro
That new data scientist can then use the same dvc commands (dvc add, dvc commit and dvc push) to ensure their changes to the dataset are recorded in the git repo for the project.
Key outcomes to explore
- share the project with simple steps, where the project now includes:
  - dataset
  - code
  - evaluation
  - pipeline files
- utilize branches to protect the master version of datasets (along with code)
The inputs we want to version are:
- the labels (training.xml)
- the pre-processed data
- label definitions
The outputs we want to version are:
- the trained model
!dvc add --external /ws/data/skin-tone/headsegmentation_dataset_ccncsa/training.xml
!git add training.xml.dvc
!git commit -m 'first commit'
!dvc remote add -d aws s3://mlops-datavc/face-profile
!dvc push
!dvc add --external /ws/data/skin-tone/headsegmentation_dataset_ccncsa/labels
!git add labels.dvc
!git commit -m 'first commit.'
path = Path("/ws/data/skin-tone/headsegmentation_dataset_ccncsa")
xml_file = path/'training.xml'
test_name = "test"
tags_to_track = ['srcimg', 'labelimg']
tags = get_tags_in_order(xml_file=xml_file, tags_to_track=tags_to_track)
srcimg_name = [
    get_attribute(elem=srcimg, tag='srcimg', attrib='name')
    for srcimg in tags['srcimg']
]
labelimg_name = [
    get_attribute(elem=labelimg, tag='labelimg', attrib='name')
    for labelimg in tags['labelimg']
]
pairs = []
for i, srcimg in enumerate(srcimg_name):
    pairs.append({
        'srcimg': srcimg,
        'labelimg': labelimg_name[i]
    })
fnames = [path/pair['srcimg'] for pair in pairs]
lnames = [path/pair['labelimg'] for pair in pairs]
img_to_l = {fname: lnames[i] for i, fname in enumerate(fnames)}
header = ('R','G','B','L')
mutiny_labels = [
    (0,0,0,'Background/undefined'),
    (255,0,0,'Lips'),
    (0,255,0,'Eyes'),
    (0,0,255,'Nose'),
    (255,255,0,'Hair'),
    (0,255,255,'Ears'),
    (255,0,255,'Eyebrows'),
    (255,255,255,'Teeth'),
    (128,128,128,'General face'),
    (255,192,192,'Facial hair'),
    (0,128,128,'Specs/sunglasses'),
    (255, 128, 128, '')
]
mutiny_labels = pd.DataFrame(mutiny_labels, columns=header)
mutiny_labels['I'] = mutiny_labels.index
label_map = {
    (rec['R'], rec['G'], rec['B']): rec['I']
    for rec in mutiny_labels.to_dict('records')
}
int_to_label = {
    rec['I']: rec['L']
    for rec in mutiny_labels.to_dict('records')
}
codes = mutiny_labels.L.values
# codes = np.append(codes, ['Error'])
name2id = {v:k for k,v in enumerate(codes)}
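To see how label_map gets used downstream, here is a minimal sketch (my own helper, not part of the post's pipeline) that converts an H x W x 3 RGB-encoded mask into the single-integer mask the dataloader expects:

```python
import numpy as np

def rgb_mask_to_int(mask: np.ndarray, label_map: dict) -> np.ndarray:
    """Map each (R, G, B) pixel of an H x W x 3 mask to its integer class index."""
    out = np.zeros(mask.shape[:2], dtype=np.uint8)
    for (r, g, b), idx in label_map.items():
        # boolean H x W selector for pixels matching this colour
        out[np.all(mask == (r, g, b), axis=-1)] = idx
    return out
```

Pixels whose colour is not in label_map fall through to class 0, which here is Background/undefined.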
!dvc add --external /ws/data/skin-tone/headsegmentation_dataset_ccncsa/labels
Trying to recreate my actual steps, I decided instead of having 2 separate directories for labels that I would just use the single directory and update the files to match the required format for the dataloader. That was accomplished by doing:
dvc add labels
cd /ws/data/skin-tone/headsegmentation_dataset_ccncsa/
cp -r labels labels-archive
rm -rf labels
cp -r labels_int labels
I noticed that dvc seemed pretty sluggish when I tried running a dvc diff. It ran in two steps:
1) computing new hashes
2) some unknown step that took minutes where my shell was just blank

Forget it, I just pushed. dvc push also seemed sluggish, maybe taking 2 or 3x what I would expect from past experience. Here's the timer data from the dvc push after adding the 1.3GB of labels files.
real 28m58.997s
user 5m29.589s
sys 0m26.556s
Now, just to check my memory because I haven't pushed to s3 in a bit, I timed the base s3 cp with aws-cli:
time aws s3 cp labels s3://mlops-datavc/face-profile-labels-test --recursive
real 28m39.026s
user 3m27.958s
sys 0m21.944s
It also occurs to me that I am pushing from my 100Mbps connection in Indiana to Virginia (us-east-1). So, this makes sense.
Interestingly, the cache created by dvc, found in s3://mlops-datavc/face-profile, is highly similar in makeup to the "raw upload" found in s3://mlops-datavc/face-profile-labels-test, but not exactly the same:

- s3://mlops-datavc/face-profile/ -- total objects: 15,074; total size: 1.2 GB
- s3://mlops-datavc/face-profile-labels-test -- total objects: 15,260; total size: 1.2 GB
Technically, the dvc cache should contain one additional file, training.xml, which was also added to the tracked files. Yet the dvc cache is approximately the same size as the raw copy in s3 while containing 186 fewer objects. We suspect this is because there are duplicate images in the label dataset, which dvc deduplicates by content hash.
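The duplicate hypothesis is easy to check directly: hash every file in the labels directory and compare the total file count to the number of distinct digests. A sketch (the helper name is mine):

```python
import hashlib
from pathlib import Path

def count_duplicates(directory: Path):
    """Return (total files, distinct md5 digests) for all files under `directory`."""
    digests = [
        hashlib.md5(p.read_bytes()).hexdigest()
        for p in directory.rglob('*')
        if p.is_file()
    ]
    return len(digests), len(set(digests))
```

If distinct is smaller than total, a content-addressed cache will hold fewer objects than the raw upload, which would account for the 186-object gap.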
Q: What if I wanted to set up continuous benchmarks on the dvc push transfer rate for this pipeline?

What is going on with .dvc/cache?
- how are the folders defined?
  - folders: 00, 01, ..., 0f, 10, ..., 1f, ..., 90, ..., 9f, f0, ..., ff -- two-character hex prefixes?
  - files named by md5 hash (see ETag vs key)
- How do files get allocated to folders?
- does the cache locally match the remote file structure?
  - yes, except dvc removes duplicate files and renames files by md5 hash
- does dvc keep the correspondence between the filename and the md5 hash? where is that kept?
  - SQLite (referred to as the "state db"); see issue 3366 -- waiting for response
- What if the local file structure changes and I do a dvc pull?
  - According to issue 2676, dvc's default is to trust the remote hash (replace? waiting for dvc response), with an option to compute hashes at every turn (paranoid mode, the old default). Changing this default decreased pull latency by approximately a factor of 2.
- Compression
  - Am I getting any compression natively? what format is it?
    - There is no compression implemented automatically; waiting for response on whether it's configurable
  - Am I getting any delta compression on the data? To test:
    - create a file; add it to the tracker
    - compare the cache size of a single-value alteration vs the cache size of a full file change
    - if delta compression is applied, at which level is it? (block, row, value)
- One cache per project; is it possible to split off caches? Different caches for different pipelines?
  - Encapsulating each pipeline so that dvc pull/push does not pull unnecessary data
  - Enabling different settings on different caches; for example, one might want different conditions for determining whether a file is a duplicate (see issue 1676)
- What happens when I pull on top of local changes?
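Putting the folder observations together: the cache appears to be content-addressed, with the first two hex characters of the md5 digest used as the directory name. A sketch of that layout (an inference from the 00..ff folder names above, not confirmed against DVC internals):

```python
import hashlib
from pathlib import Path

def cache_path(content: bytes, cache_dir: Path = Path('.dvc/cache')) -> Path:
    """Compute where a blob would land in an md5-addressed cache layout."""
    digest = hashlib.md5(content).hexdigest()
    # first two hex chars pick the folder (00..ff); the rest is the filename
    return cache_dir / digest[:2] / digest[2:]
```

This layout also explains the deduplication we observed: two identical files hash to the same digest and therefore the same cache path, so only one object is stored.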
Waiting:
- correspondence between md5 hash and filename: https://dvc.org/doc/user-guide/project-structure/internal-files#structure-of-the-cache-directory
- remote compression: https://github.com/iterative/dvc/issues/1239
Transitioning from another approach to versioning datasets:
- versioned buckets
- pachyderm
- versioned databases
Recommendations:
1) interoperate (at least on ingress)
- make it super easy to ingest data from everywhere, and that includes history (not just the current state)
- versioned s3 buckets
- versioned sql dbs
- versioned dvc repos
- versioned everything!!
2) interoperate on egress? Would go a long way with the community.
Now I need to add the full training dataset to the version control. This will take a while...
The base images share a root with the label directories, so there are 2 choices:
1) move the labels out into their own structure. This requires updating the training.xml file to reflect the new location of the label files (or baking it into the code)
2) add files to the tracker individually instead of a full directory at a time. To do this, we can loop over the srcimg instances in the training.xml file.
Since we have a lot of experience adding whole directories, I want to see how adding individual files works at this scale.
We already have the list of srcimg in an object. One limitation of dvc is that it doesn't have a very good Python API, so I am going to write a little subprocess function to add a single file to the dvc tracker.
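A minimal version of that helper might look like this (a sketch; the function names are mine, error handling is omitted, and it simply shells out to the dvc CLI per file):

```python
import subprocess

def build_dvc_add_cmd(path: str, external: bool = False):
    """Build the argument list for `dvc add [--external] <path>`."""
    return ['dvc', 'add'] + (['--external'] if external else []) + [path]

def dvc_add(path: str, external: bool = False) -> bool:
    """Add a single file to the dvc tracker; return True on success."""
    result = subprocess.run(
        build_dvc_add_cmd(path, external=external),
        capture_output=True, text=True
    )
    return result.returncode == 0
```

Looping this over thousands of srcimg paths will incur one dvc process launch per file, which is part of what we want to measure at this scale.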
class VCRemote:
    def __init__(self, cloud, shortname, uri, masterbranch='master'):
        self.cloud = cloud
        self.shortname = shortname
        self.uri = uri
        self.masterbranch = masterbranch
        # TODO: check if it exists and can be reached
class VCPipeline:
    def __init__(self, name, logger, masterbranch='master'):
        """
        name: must be unique
        """
        self.name = name
        self.remotes = {}
        self.logger = logger
        self.branchname = None
        self.masterbranch = masterbranch

    def _make_repo_url(self):
        pass

    def _initialize(self):
        """e.g. git configure; git init"""
        # set up credentials - identity
        # set up repo
        pass

    def _add(self, key):
        """e.g. git add <key>"""
        pass

    def _commit(self, msg):
        """e.g. git commit -m '<msg>'"""
        pass

    def _push(self):
        """e.g. git push"""
        pass

    def pull(self):
        """e.g. git pull"""
        pass

    def _status(self):
        """e.g. git status"""
        pass

    def _merge(self, mergebranch):
        """e.g. git merge <mergebranch>"""
        pass

    def _checkout(self, branchname):
        """e.g. git checkout <branchname>"""
        pass

    def _create_branch(self, branchname, push=False):
        """e.g. git checkout -b <branchname>"""
        # TODO: check if branch exists, if so, just check it out
        pass

    def _rm_local(self, deletebranch):
        """e.g. git branch -d <deletebranch>"""
        pass

    def _rm_remote(self, deletebranch):
        """e.g. git push origin --delete <deletebranch>"""
        pass

    def rm_branch(self, deletebranch, local=True, remote=False):
        """e.g. git branch -d <deletebranch>"""
        if deletebranch == self.branchname:
            self.switch_branch(
                branchname=self.masterbranch, create_on_fail=False
            )
        if local:
            self._rm_local(deletebranch=deletebranch)
        if remote:
            self._rm_remote(deletebranch=deletebranch)

    def switch_branch(self, branchname, create_on_fail=True):
        """User-facing function to change branches."""
        try:
            self._checkout(branchname=branchname)
        except Exception as e:
            if create_on_fail:
                self._create_branch(branchname=branchname)
            else:
                raise e
        self.branchname = branchname

    def update_and_return(self, updatebranch):
        currentbranch = self.branchname
        self.switch_branch(branchname=updatebranch)
        self.pull()
        self.switch_branch(branchname=currentbranch)

    def merge(self, mergebranch, update_all=True):
        """e.g. git merge <mergebranch>"""
        if update_all:
            self.update_and_return(updatebranch=mergebranch)
        self._merge(mergebranch=mergebranch)

    def add_remote(self, cloud, shortname, uri):
        if shortname in self.remotes:
            raise Exception(
                'Remote shortname, {} already exists'
                .format(shortname)
            )
        self.remotes[shortname] = VCRemote(
            cloud=cloud, shortname=shortname, uri=uri
        )
import subprocess

class GithubPipeline(VCPipeline):
    def __init__(self, name, logger=None):
        """
        name: must be unique
        """
        super().__init__(name=name, logger=logger)

    def _initialize(self, username, password):
        """e.g. git configure; git init"""
        # set up credentials - identity, auth
        # get or set up repo
        # check it works
        pass

    def _add(self, key):
        """e.g. git add <key>"""
        pass

    def _commit(self, msg):
        """e.g. git commit -m '<msg>'"""
        pass

    def _push(self):
        """e.g. git push"""
        pass

    def _status(self):
        """e.g. git status"""
        pass

    def _merge(self, mergebranch):
        """e.g. git merge <mergebranch>"""
        pass

    def _checkout(self, branchname):
        """e.g. git checkout <branchname>"""
        pass

    def _create_branch(self, branchname, push=False):
        """e.g. git checkout -b <branchname>"""
        # TODO: check if branch exists, if so, just check it out
        pass

    def _rm_local(self, deletebranch):
        """e.g. git branch -d <deletebranch>"""
        pass

    def _rm_remote(self, deletebranch):
        """e.g. git push origin --delete <deletebranch>"""
        pass
# PEP 374 discusses the distributed version control systems considered for Python development
# https://www.python.org/dev/peps/pep-0374/#id1
import os
from pathlib import Path
from git import Repo
import git
def get_or_create_git_repo(path: Path, create_new=False):
    try:
        repo = Repo(path)
    except git.InvalidGitRepositoryError:
        if create_new:
            repo = Repo.init(path, bare=True)
        else:
            return None
    return repo
repopath = Path(os.getcwd()).parent
repo = get_or_create_git_repo(path=repopath)
repo_reader = repo.config_reader()
repo_reader.
defaults.use_cuda = True
number_of_the_seed = 2020
random.seed(number_of_the_seed)
set_seed(number_of_the_seed)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
monitor_training="valid_loss"
comp_training=np.less
monitor_evaluating="dice"
comp_evaluating=np.greater
patience=2
size = 448
bs = 12
valid_pct=0.35
dls = SegmentationDataLoaders.from_label_func(
    path,
    bs=bs,
    valid_pct=valid_pct,
    fnames=fnames,
    label_func=get_y_fn,
    codes=codes,
    item_tfms=[Resize((size, size))],
    batch_tfms=[Normalize.from_stats(*imagenet_stats)]
)
wandb.init()
learn = unet_learner(dls, resnet34, cbs=WandbCallback())
learn.fine_tune(1)
learn.freeze() # Freezing the backbone
Epoch, Batch Size and Learning Rate
An epoch in model training is one full pass over the training set, processed in batches; each batch is a randomly chosen shuffle of images from the training set, and one forward pass is made per image. In this case, the forward pass results are accumulated before a backward pass is completed. FastAI attempts to load all the images in a batch into GPU memory at once, so there is a batch size sufficiently large that the GPU will run out of memory; for us, that was 12 images. For segmentation models on R, G, B encoded images, the labels (i.e. the segmentation mask) can increase the GPU memory required per batch by 1/3, because there is one additional channel per pixel: 4 instead of 3. That assumes an 8-bit integer is used for each of the R, G and B channels as well as the label. We don't strictly need an 8-bit integer for the label, since there are only about 10 target values (Skin, Nose, etc.), so we could do with less, but the smallest integer type provided by torch is 8-bit.13 This GPU memory reduction is likely the reason the PILMask needs to be a single integer per pixel. If we instead loaded the mask as Mut1ny provides it, as its own R, G, B encoded image, it would roughly double the memory required, with an additional three 8-bit integers representing the R, G and B components of the mask.
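The channel arithmetic above can be made concrete. Assuming 8-bit channels and ignoring activations and framework overhead, a back-of-the-envelope calculation of raw per-batch bytes (the helper is mine, for illustration):

```python
def batch_bytes(batch_size, height, width, channels):
    """Raw bytes for a batch of 8-bit images with the given channel count."""
    return batch_size * height * width * channels

rgb = batch_bytes(12, 448, 448, 3)            # input images alone
with_int_mask = batch_bytes(12, 448, 448, 4)  # + 1-channel integer mask
with_rgb_mask = batch_bytes(12, 448, 448, 6)  # + R, G, B encoded mask

assert with_int_mask / rgb == 4 / 3   # the "1/3 more" figure
assert with_rgb_mask / rgb == 2.0     # roughly double
```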
The batch size is set by the person training the model, and it defines the number of images evaluated per forward/backward step. In this process, we chose the largest batch size that did not run out of memory. The batch size chosen, 12 images, seldom ran out of memory after the images were resized to 256 px by 256 px. (There was some additional load on the GPU because it was running other processes as well, which would spawn unpredictably.)
Note: A forward pass is a model prediction from input to output; the backward pass is the gradient descent step, where model weights and biases are updated as a function of the gradients of the node activation functions and the learning rate. The most intuitive explanation of this process I have heard is Jeremy Howard's explanation of Stochastic Gradient Descent12
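As a minimal illustration of the forward/backward idea (pure numpy on a toy one-parameter linear model, not a real network), forward computes the loss, backward computes the gradient, and the update scales the gradient by the learning rate:

```python
import numpy as np

def sgd_step(w, x, y, lr):
    """One forward/backward pass for the 1-parameter model y_hat = w * x."""
    y_hat = w * x                         # forward pass: prediction
    loss = np.mean((y_hat - y) ** 2)      # mean squared error
    grad = np.mean(2 * (y_hat - y) * x)   # backward pass: dL/dw
    return w - lr * grad, loss            # update scaled by the learning rate

w = 0.0
x = np.array([1.0, 2.0, 3.0])
y = 2.0 * x                               # true weight is 2
for _ in range(100):
    w, loss = sgd_step(w, x, y, lr=0.05)
```

After a few dozen steps w converges toward 2; too large a learning rate would instead make the loss grow, which is the behavior the Loss vs Learning Rate chart below probes.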
Increasing Loss(Learning Rate)
As I trained the model, I would periodically evaluate the Loss vs Learning Rate chart (that is, lr_find). I noticed with interest that the loss was always increasing with the learning rate, even from the first epochs run. One open question is why there are no decreasing segments of the Loss vs Learning Rate chart like we see in the FastAI tutorial6. There, Jeremy Howard instructs users to look for regions where the loss is decreasing as a function of the learning rate, but on this chart it's hard to find one.
Choosing a Learning Rate
Given each epoch took around 25 minutes to complete, I was only able to do limited experimentation with the learning rate.
learn.lr_find() # find learning rate
learn.recorder.plot_loss() # plot the loss curve
lrs = slice(10e-6, 10e-5)
learn.fit_one_cycle(12, lrs)