Hogwild!? Implementing Async SGD in Python

Hogwild! is an asynchronous stochastic gradient descent algorithm. The Hogwild! approach uses “lock-free” gradient updates. For a machine learning model, this means that the weights of the model are updated by multiple processes at the same time, with the possibility of overwriting each other. In this post, we will use the multiprocessing library to implement Hogwild! in Python for training a linear regression model.

In order to not bog this post down with excessive details, I’m going to assume that the reader is familiar with the concepts of (stochastic) gradient descent and what asynchronous programming looks like.

Hogwild! Explained

Stochastic gradient descent is an iterative algorithm. If one wanted to speed it up, they would have to move the iterative calculation to a faster processor (or, say, a GPU). Hogwild! is an approach to parallelizing SGD so that it can train quickly on even larger data sets. The basic algorithm of Hogwild! is deceptively simple. The only prerequisite is that all processors on the machine can read and update the weights of your model (i.e. of your neural network or linear regression) at the same time. In short, the weights are stored in shared memory with atomic component-wise addition. The SGD update code run by each individual processor is described below, adapted from the original paper:

loop
    sample an example (X_i, y_i) from your training set
    evaluate the gradient descent step using (X_i, y_i)
    perform the update step component-wise
end loop

So, we run this code on multiple processors, all updating the same model weights at the same time. You can see where the name Hogwild! comes from: the algorithm is like watching a bunch of farm animals bucking around at random, stepping on each other’s toes.

There are a few conditions for this algorithm to work, but for our purposes, one way to phrase a (usually) sufficient condition is that the gradient updates are sparse, meaning that there are only a few non-zero elements of the gradient estimate used in SGD. This is likely to occur when your data itself is sparse. A sparse update means that it’s unlikely for the different processors to step on each other’s toes, i.e. to try to update the same weight component at the same time. Collisions do still occur, but in practice this has been observed to act as a form of regularization.
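To make the sparsity condition a bit more concrete, here is a minimal sketch (mine, not from the paper) for the squared-error loss used later in this post. It shows that an SGD step only touches the weights where the example is non-zero, and counts how many weights two independent updates would both touch. The dimension, the density, and the helper names sparse_example and touched_components are made up for illustration.

```python
import numpy as np

rng = np.random.default_rng(0)
n = 1000        # number of weights (illustrative)
density = 0.01  # fraction of non-zero features per example (illustrative)


def sparse_example():
    """Draw one feature vector with roughly density * n non-zero entries."""
    mask = rng.random(n) < density
    return rng.random(n) * mask


def touched_components(w, x, y):
    """Indices of the weights that a squared-error SGD step on (x, y) changes."""
    grad = 2.0 * (np.dot(w, x) - y) * x
    return set(np.flatnonzero(grad).tolist())


w = rng.normal(size=n)
x1, x2 = sparse_example(), sparse_example()
touched1 = touched_components(w, x1, 1.0)
touched2 = touched_components(w, x2, 1.0)

print("weights touched by each update:", len(touched1), len(touched2))
print("weights touched by both:", len(touched1 & touched2))
```

With these made-up numbers each update touches about 10 of the 1000 weights, so two updates running at the same moment rarely share a component.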

As an exercise, we will implement linear regression using Hogwild! for training. I have prepared a git repo with the full implementation adhering to sklearn’s API, and I will use pieces of that code here.

Linear Regression Refresher

To make sure we’re all on the same page, here’s the terminology I will use in the subsequent sections. Our linear regression model will take $x \in \mathbb{R}^n$ as input and output a real number via

\begin{equation} f(x) = w \cdot x, \end{equation} where $w \in \mathbb{R}^n$ is the vector of weights for our model. We will use squared error as our loss function, which for a single example is written as \begin{equation} \ell(x,y) = ( f(x)-y )^2 = ( w\cdot x - y)^2. \end{equation}

To train our model via SGD, we need to calculate the gradient update step so that we may update the weights via: \begin{equation} w_{t+1} = w_t - \lambda G_w (x,y), \end{equation} where $\lambda$ is the learning rate and $G_w$ is an estimate of $\nabla_w \ell$ satisfying: \begin{equation} \mathbb{E}[G_w (x,y)] = \nabla_w \ell(x,y). \end{equation}

In particular,

\begin{equation} G_w (x,y):= 2\, (w\cdot x - y)\, x \in \mathbb{R}^n \end{equation}
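Signs are easy to get wrong here, so below is a small sanity check (a sketch of mine, not part of the repo) that compares the analytic gradient of the squared error, $2\,(w\cdot x - y)\,x$, against a central finite-difference approximation. The function names and the random test point are just for illustration.

```python
import numpy as np


def squared_error(w, x, y):
    """Loss for a single example: (w . x - y)^2."""
    return (np.dot(w, x) - y) ** 2


def analytic_gradient(w, x, y):
    """Gradient of the squared error with respect to w."""
    return 2.0 * (np.dot(w, x) - y) * x


def numerical_gradient(w, x, y, eps=1e-6):
    """Central finite differences, one weight component at a time."""
    grad = np.zeros_like(w)
    for j in range(w.size):
        step = np.zeros_like(w)
        step[j] = eps
        grad[j] = (squared_error(w + step, x, y)
                   - squared_error(w - step, x, y)) / (2.0 * eps)
    return grad


rng = np.random.default_rng(0)
w, x, y = rng.normal(size=5), rng.normal(size=5), 0.7
max_gap = np.max(np.abs(analytic_gradient(w, x, y) - numerical_gradient(w, x, y)))
print("largest difference between the two gradients:", max_gap)
```

Since the update rule subtracts $\lambda G_w$, the positive sign is what moves the weights downhill.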

Generating our Training Data

First, we quickly generate our training data. We will assume our data was generated by a function of the same form as our model, namely a dot product between a weight vector $w$ and our input vector $x$. This true weight vector I will call the “real $w$”.

import numpy as np
import scipy.sparse

n = 10     # number of features
m = 20000  # number of training examples

X = scipy.sparse.random(m, n, density=.2).toarray()  # Guarantees sparse grad updates
real_w = np.random.uniform(0, 1, size=(n, 1))  # Define our true weight vector

X = X / X.max()  # Normalizing for training

y = np.dot(X, real_w)

High Level Approach

The multiprocessing library allows you to spin up additional processes to run code in parallel. We will use the multiprocessing.Pool’s map function to calculate and apply the gradient update step asynchronously. The key component of the algorithm is that the weight vector is accessible to all processes at the same time and can be accessed without any locking.

Lock-free Shared Memory in Python

First, we need to define the weight vector in shared memory that can be accessed without locks. We will have to use the “sharedctypes.Array” class from multiprocessing for this functionality. We will further use numpy’s frombuffer function to expose it as a numpy array.

from multiprocessing.sharedctypes import Array
from ctypes import c_double
import numpy as np

coef_shared = Array(c_double, 
        (np.random.normal(size=(n,1)) * 1./np.sqrt(n)).flat,
        lock=False) # Hogwild!
w = np.frombuffer(coef_shared)
w = w.reshape((n,1)) 
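If you want to convince yourself that the numpy array really is a view onto the shared buffer and not a copy, a quick check along these lines should do it. This is a standalone sketch with a tiny zero-initialised array; the names shared and view are mine.

```python
from ctypes import c_double
from multiprocessing.sharedctypes import Array

import numpy as np

n = 4
shared = Array(c_double, n, lock=False)  # zero-initialised, no lock wrapper
view = np.frombuffer(shared, dtype=np.float64).reshape((n, 1))

shared[2] = 1.5    # write through the ctypes array...
view[0, 0] = -2.0  # ...or through the numpy view

print(list(shared))  # both writes are visible from the ctypes side
print(view.ravel())  # and from the numpy side
```

Both objects point at the same memory, which is why the gradient step can write through coef_shared and read through w.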

For the final parallelization code, we will use multiprocessing.Pool to map our gradient update out to several workers. We will need to expose this vector to the workers for this to work.

Gradient Update

Our ultimate goal is to perform the gradient update in parallel. To do this, we need to define what this update is. In the git repo, I took a much more sophisticated approach to expose our weight vector to multiple processes. To avoid over-complicating this explanation with Python technicalities, I’m going to just use a global variable to take advantage of how multiprocessing.Pool.map works. When the worker processes are created by forking (the default on Linux when this was written), each worker inherits the parent's entire namespace, and as I only need to expose a single $w$ to all workers, this will be sufficient.

# The calculation has been adjusted to allow for mini-batches
learning_rate = .001
def mse_gradient_step(X_y_tuple):
    global w # Only for instructive purposes!
    X, y = X_y_tuple # Required for how multiprocessing.Pool.map works
    
    # Calculate the gradient
    err = y.reshape((len(y),1))-np.dot(X,w)
    grad = -2.*np.dot(np.transpose(X),err)/ X.shape[0]

    # Update only the weights whose gradient component exceeds the threshold, one at a time
    for index in np.where(abs(grad) > .01)[0]:
        coef_shared[index] -= learning_rate*grad[index,0]

Preparing the examples for multiprocessing.Pool

We are going to have to cut the training examples into tuples, one per example, to pass to our workers. We will also be reshaping them to adhere to the way the gradient step is written above. Code is included to allow you to use mini-batches via the batch_size variable.

batch_size = 1
n_batches = X.shape[0] // batch_size  # any leftover examples are dropped
examples = [None] * n_batches
for k in range(n_batches):
    Xx = X[k*batch_size : (k+1)*batch_size, :].reshape((batch_size, X.shape[1]))
    yy = y[k*batch_size : (k+1)*batch_size].reshape((batch_size, 1))
    examples[k] = (Xx, yy)
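If the number of examples is not a multiple of batch_size, the loop above leaves the remainder out. As a possible alternative, here is a small sketch of a helper that keeps the final, shorter batch. make_batches is a hypothetical name of mine and is not part of the repo.

```python
import numpy as np


def make_batches(X, y, batch_size):
    """Split (X, y) into consecutive (X_batch, y_batch) tuples.

    The last batch may hold fewer than batch_size rows.
    """
    y = np.asarray(y).reshape((-1, 1))
    return [
        (X[start:start + batch_size], y[start:start + batch_size])
        for start in range(0, X.shape[0], batch_size)
    ]


X_toy = np.arange(14, dtype=float).reshape((7, 2))
y_toy = np.arange(7, dtype=float)
batches = make_batches(X_toy, y_toy, batch_size=3)
print([xb.shape for xb, _ in batches])
```

Each tuple has the same (rows, features) and (rows, 1) layout that the gradient step expects.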

The Asynchronous Bit

Parallel programming can be very difficult, low-level code, but the multiprocessing library has abstracted the details away for us. This makes this final piece of code underwhelming as a big reveal. Anyway, after the definitions above, the final code for Hogwild! is:

from multiprocessing import Pool

# Training with Hogwild!
p = Pool(5)  
p.map(mse_gradient_step, examples)

print('Loss function on the training set:', np.mean(abs(y-np.dot(X,w))))
print('Difference from the real weight vector:', abs(real_w-w).sum())

Loss function on the training set: 0.0014203406038
Difference from the real weight vector: 0.0234173242317
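For anyone who would rather have everything in one self-contained script, here is a hedged sketch of the same idea. It is not the code from the repo: it swaps Pool.map for explicit Process workers so that each worker runs the loop from the pseudocode on its own slice of the data, and it assumes a platform where the fork start method is available (so not Windows). The names hogwild_worker and hogwild_fit, the toy data, and the learning rate of 0.2 are all my own choices for this example.

```python
import multiprocessing as mp
from ctypes import c_double
from multiprocessing.sharedctypes import Array

import numpy as np


def hogwild_worker(shared_w, X, y, learning_rate, seed):
    """Run SGD over this worker's examples, updating shared_w without locks."""
    w = np.frombuffer(shared_w, dtype=np.float64)  # a view, not a copy
    rng = np.random.default_rng(seed)
    for i in rng.permutation(X.shape[0]):
        nz = np.flatnonzero(X[i])  # components this example touches
        x_nz = X[i, nz]
        residual = np.sum(x_nz * w[nz]) - y[i]
        # Unsynchronized read-modify-write: other workers may interleave here.
        w[nz] -= learning_rate * 2.0 * residual * x_nz


def hogwild_fit(X, y, n_workers=4, learning_rate=0.2, seed=0):
    """Hypothetical helper: one pass over (X, y), split across n_workers."""
    ctx = mp.get_context("fork")  # assumption: the platform supports fork
    shared_w = Array(c_double, X.shape[1], lock=False)  # starts at zero
    workers = [
        ctx.Process(
            target=hogwild_worker,
            args=(shared_w, X[k::n_workers], y[k::n_workers],
                  learning_rate, seed + k),
        )
        for k in range(n_workers)
    ]
    for proc in workers:
        proc.start()
    for proc in workers:
        proc.join()
    return np.frombuffer(shared_w, dtype=np.float64).copy()


if __name__ == "__main__":
    rng = np.random.default_rng(0)
    X_demo = rng.random((2000, 10)) * (rng.random((2000, 10)) < 0.2)
    w_true = rng.random(10)
    w_fit = hogwild_fit(X_demo, X_demo @ w_true)
    print("largest weight error:", np.max(np.abs(w_fit - w_true)))
```

Because the toy targets are noise-free, the workers should all pull the shared weights toward the same solution even when some of their updates overwrite each other.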

Importance of Asynchronous Methods

As you probably know, deep learning is pretty popular right now. The neural networks employed to solve deep learning problems are trained via stochastic gradient descent. Deep learning implies large data sets (or, additionally, some type of interactive environment in the case of reinforcement learning). As the scale increases, training neural networks becomes slower and more cumbersome. So, parallelizing the training of neural networks is a big deal.

There are other approaches to parallelizing SGD. Google has popularized Downpour SGD, among others. Downpour SGD basically keeps the weights on a central hub, to which several model instances, each working on a different piece of the data, send updates and from which they retrieve refreshed weights. This idea has been extended to reinforcement learning and seen to provide surprising improvements in performance and a decrease in training time. In particular, the A3C model uses asynchronous agents playing multiple game instances that publish gradient updates to a central hub, along with a few other tweaks. This has been seen to alleviate some of the time correlation issues found in reinforcement learning that cause divergence, which were previously tackled via experience replay.

The point is that asynchronous methods are here to stay, and who knows what new advances may come from them in the next few years. At present, there are not many mathematical proofs pertaining to the accuracy increases seen for asynchronous methods. Indeed, many of the approaches are not theoretically justified, but their adoption has been driven by impressive practical results. For mathematicians, it will be interesting to see what new mathematics arises from trying to explain these methods formally.

Written on November 18, 2017