Numerical coroutines

Python 3.5 has new syntax for coroutines. Like many of python3's new features (unicode strings? who uses non-English languages in astronomy?), it is not immediately obvious that coroutines are of any use to people like me who mostly write relatively simple imperative scripts to wrangle large amounts of data or run long computations. But cores aren't getting any faster, so we're going to have to start writing parallel code if we want to take advantage of new machines. Now, if you want to take a gigantic Fourier transform, or do algebra on enormous matrices, you are going to need someone to design you a parallel algorithm. But our code is often full of easy opportunities for parallelism that we don't take advantage of because it's awkward. So I keep my eye out for ways to exploit this easy parallelism. I have found one, in the combination of coroutines and futures, and I'd like to describe it with a worked example. Maybe it'll sell you on some of these cool new language features!

The problem I'm going to work through is numerical quadrature: you have a function, f, provided by the user, and you want to calculate the area under f between a and b. The assumption is that f is expensive to calculate, so that most of the time is spent evaluating f; that is what I want to parallelize. The algorithm I'm going to use is adaptive: it computes an estimate of the integral and an estimate of the error, and if the error is too large it subdivides the interval and runs itself on the two sub-intervals. This is a very common structure for quadrature algorithms; they mostly vary in how the basic estimates are computed. I'm going to use the very simple adaptive Simpson's rule (with an implementation based on the one on Wikipedia); you'd get better results with Gauss-Kronrod or Clenshaw-Curtis integration.
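
For reference, here is a minimal serial sketch of the rule (my own phrasing of the standard recursion, not a drop-in from the Wikipedia code); the parallel version later in the post has exactly the same shape:

def simpson(f, a, b):
    # Plain Simpson's rule on [a, b]: endpoints plus midpoint, weighted 1-4-1.
    c = (a + b)/2
    return abs(b - a)/6*(f(a) + 4*f(c) + f(b))

def adaptive_simpson(f, a, b, atol=1e-10):
    # Compare the whole-interval estimate with the sum of the two
    # half-interval estimates; their difference is about 15 times the
    # error of the refined estimate, so it drives both the acceptance
    # test and the correction term below.
    c = (a + b)/2
    whole = simpson(f, a, b)
    left, right = simpson(f, a, c), simpson(f, c, b)
    err = left + right - whole
    if abs(err) <= 15*atol:
        return left + right + err/15   # Richardson extrapolation
    return (adaptive_simpson(f, a, c, atol/2) +
            adaptive_simpson(f, c, b, atol/2))

For example, adaptive_simpson(math.sin, 0, math.pi) (after import math) comes out very close to 2, and the recursion tree it walks is essentially the one the parallel version below walks concurrently.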

There is some scope for parallelism within the basic estimate: if you evaluate the function at a collection of points and then combine the values with weights to get an estimate of the integral, you don't care what order the function evaluations happen in, so they might as well be done in parallel. This is concurrency: pieces of our program that are independent enough of each other that the order in which they are executed does not affect the result. In an ideal world, we would have a language that allowed us to express concurrency, and the runtime would decide where parallelism could speed things up. Unfortunately, the only construct I can think of in python for expressing concurrency is numpy arithmetic: if you write a+b to add two arrays, you are saying you don't care about the order in which the additions happen. And indeed, they may get parallelized using SIMD instructions on the processor. So it's helpful for languages to be able to express concurrency.
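
As a toy illustration of that kind of implicit concurrency (the nodes and weights here are just those of a five-point composite Simpson rule, chosen for the example), the weighted-sum form of a quadrature rule never specifies an order for the function evaluations:

import numpy as np

# Composite Simpson's rule on [0, pi] with five nodes; nothing in this
# weighted sum cares what order the function evaluations happen in.
xs = np.linspace(0, np.pi, 5)
ws = (np.pi/12)*np.array([1, 4, 2, 4, 1])
print(np.dot(ws, np.sin(xs)))   # about 2.0046; the exact value is 2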

Unfortunately, adaptive Simpson's rule only evaluates the function at five points in each interval, and three of them were evaluated previously, so you don't get much concurrency for free. But when you subdivide the interval, the two sub-interval calculations are independent of each other (leaving aside the issue of global error control), so there is some concurrency available there. It's not as simple as sending out a vector of calculations and waiting for them all to come back, though; the algorithm is recursive, and you'd have to rearrange it drastically to make that work. But python's coroutines provide a reasonable way to do it.

A coroutine is essentially a function (a routine) that can yield control and be resumed later. If this sounds like python's generators, well, yes, python's coroutines are implemented on top of generators. But there's also an element of cooperative multitasking: coroutines can yield control when they're waiting for something and resume once that something is ready. In our case, the something they're waiting for is a function evaluation, which we ship off to a pool of parallel processes to actually run. This way, if one recursive branch is waiting for some function evaluations to come back, the other branches can keep going.
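
Here's a toy example of that cooperative hand-off, in the same generator-based asyncio style as the integrator below (this snippet is mine and isn't part of the integrator):

import asyncio

@asyncio.coroutine
def worker(name, delay):
    # While this coroutine is suspended in asyncio.sleep, the event loop
    # is free to run the other one; neither blocks the other.
    yield from asyncio.sleep(delay)
    return name

loop = asyncio.get_event_loop()
print(loop.run_until_complete(
    asyncio.gather(worker('slow', 0.2), worker('fast', 0.1))))
# prints ['slow', 'fast'] after about 0.2 s, not 0.3 s: the waits overlap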

Using coroutines in python is still a little awkward; they are supported through the asyncio module (since python 3.4), and the new async/await syntax is available only in python 3.5, so the code below sticks with the generator-based @asyncio.coroutine and yield from style. Nevertheless they are a useful tool for numerical computing, and the code isn't hard to read:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import concurrent.futures
import asyncio
import time
import numpy as np

@asyncio.coroutine
def pmap(f, xs, loop=None):
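    # Schedule f(*x) as a concurrent task for each argument tuple in xs,
    # wait for them all, and hand back the results in the same order as xs.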
    fs = [asyncio.async(f(*x), loop=loop) for x in xs]
    yield from asyncio.wait(fs)
    return [fut.result() for fut in fs]



class Integrator:
    def __init__(self, f, cores=None):
        self.f = f
        self.loop = asyncio.get_event_loop()
        self.pool = concurrent.futures.ProcessPoolExecutor(cores)
        # This cache stores coroutines (partially or fully completed
        # calculations of f(x)). If you want a value, wait for the
        # coroutine to finish (it may have already) and then take
        # its result.
        self._f_cache = {}

    def integrate_interval(self, a, b, atol=1e-10):
        return self.loop.run_until_complete(
            self._integrate_interval(a, b, atol))

    @asyncio.coroutine
    def _f(self, x):
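        # The first request for f(x) submits it to the process pool; later
        # or simultaneous requests for the same x wait on the cached future
        # instead of recomputing.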
        if x not in self._f_cache:
            self._f_cache[x] = self.loop.run_in_executor(self.pool, self.f, x)
        return (yield from asyncio.wait_for(self._f_cache[x],
                                            timeout=None, loop=self.loop))

    @asyncio.coroutine
    def _simpson(self, a, b):
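        # Basic Simpson's rule estimate on [a, b]; the three function
        # values are requested concurrently (and may already be cached).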
        c = (a+b)/2
        h3 = np.abs(b-a)/6
        fa, fb, fc = yield from pmap(self._f, [(a,), (b,), (c,)])
        return h3*(fa+4*fc+fb)

    @asyncio.coroutine
    def _integrate_interval(self, a, b, atol):
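        # Compare the whole-interval Simpson estimate (sa) with the two
        # half-interval estimates (sl, sr); if they agree to within
        # 15*atol, accept the Richardson-extrapolated value, otherwise
        # recurse on the two halves concurrently.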
        c = (a+b)/2
        sl, sa, sr = yield from pmap(self._simpson, [(a,c), (a,b), (c,b)])
        if np.abs(sl+sr-sa)<=15*atol:
            return sl + sr + (sl+sr-sa)/15
        else:
            rl, rr = yield from pmap(self._integrate_interval, [(a,c,atol/2),
                                                                (c,b,atol/2)])
            return rl+rr


if __name__=='__main__':
    def f(x):
        time.sleep(1)
        print(x)
        return np.sin(x)

    now = time.time()
    I = Integrator(f, cores=64)
    r = I.integrate_interval(0, np.pi)
    print(r,len(I._f_cache),time.time()-now)

This example program is of course not a full-fledged robust integrator, but with 64 "cores" it achieves a speed-up of about 40 on this (simple) test problem, which is quite respectable, and the algorithm can be written in a natural way. You can do the same thing, more or less, using threads: drop all the coroutine annotations and the yield froms, and implement pmap with concurrent.futures.Future objects instead. That works on older pythons and doesn't require extra syntax. But you then have to worry about access to shared objects like the cache: could two jobs be spawned to compute the same point? So you need locking. With coroutines you know they will not be interrupted except at a yield from. There is also a headache with concurrent.futures: because the futures are executed in a finite number of threads, if all the worker threads are blocked waiting for futures that aren't running, you can deadlock. So you need an unlimited number of threads, which concurrent.futures does not appear to provide.
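
For comparison, here is a rough sketch of what the cached function evaluation looks like in that thread-based version (the class and method names are mine, and this only illustrates the locking point, not the deadlock problem):

import concurrent.futures
import threading

class ThreadedEvaluator:
    def __init__(self, f, workers=8):
        self.f = f
        self.pool = concurrent.futures.ThreadPoolExecutor(workers)
        self._cache = {}
        self._lock = threading.Lock()

    def eval_async(self, x):
        # Without the lock, two threads could both find x missing and
        # submit f(x) twice; with coroutines this check-then-insert is
        # safe because nothing can interrupt it before a yield from.
        with self._lock:
            if x not in self._cache:
                self._cache[x] = self.pool.submit(self.f, x)
            return self._cache[x]

eval_async hands back a concurrent.futures.Future; blocking on those futures from inside other pool jobs is exactly where the deadlock described above can come from.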
