28 March 2006

Asynchronous APIs

I have a server program where there are a dozen or so different objects called service objects. At run time, each of these gets initialized and may have its own thread(s) and other resources. To shut down the server cleanly, I have to shut down these services:

    for svc in activeServiceObjects:
        svc.shutdown()
    exit(0)

(Actually my program is C++, but it would work the same way in Python.)

It would be faster if I could shut them down all at the same time:

    for obj in activeServiceObjects:
        obj.startShutdown()

    waitForAllToFinish(activeServiceObjects)
    exit(0)

Unfortunately these objects have no startShutdown() method. There's just shutdown(). But I noticed that internally, quite a few of the shutdown() methods are actually doing something like this:

    def shutdown(self):
        self.workerThread.setStatus(SHUTDOWN_REQUESTED)
        self.workerThread.join()

The actual work to shut down the object happens in a separate thread. But the method blocks, because that's how shutdown() is defined to behave.
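To make the idea concrete, here is a minimal Python 3 sketch of what splitting shutdown() into its two halves might look like. The names Service, start_shutdown, wait and shutdown_all are made up for illustration, and the sleep stands in for whatever slow cleanup a real service does.

```python
import threading
import time


class Service:
    """Hypothetical service object that owns one worker thread."""

    def __init__(self, name, cleanup_seconds):
        self.name = name
        self._cleanup_seconds = cleanup_seconds
        self._stop = threading.Event()
        self._worker = threading.Thread(target=self._run)
        self._worker.start()

    def _run(self):
        self._stop.wait()                  # idle until shutdown is requested
        time.sleep(self._cleanup_seconds)  # stand-in for slow cleanup work

    def start_shutdown(self):
        """Nonblocking half: request shutdown and return immediately."""
        self._stop.set()

    def wait(self):
        """Blocking half: wait for the worker thread to finish."""
        self._worker.join()

    def shutdown(self):
        """The original blocking API, built from the two halves."""
        self.start_shutdown()
        self.wait()


def shutdown_all(services):
    """Request every shutdown first, then wait for all of them."""
    for svc in services:
        svc.start_shutdown()
    for svc in services:
        svc.wait()


services = [Service("svc%d" % i, 0.2) for i in range(4)]
started = time.monotonic()
shutdown_all(services)
elapsed = time.monotonic() - started
print("all stopped in %.2fs" % elapsed)
```

Because every cleanup runs at the same time, the total should be close to the slowest single cleanup rather than the sum of all of them.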

Let's stop here. I think code often evolves this way: an API gets designed; people realize the API can take a long time to run; someone decides to implement an asynchronous (that is, nonblocking) version of the API.

There is no standard way to signify “this API has both blocking and nonblocking versions”, so the nonblocking version shows up as a separate method, often several of them. And if you want to call the asynchronous version, you need to look it up in the docs and figure out how it works. Wouldn't it be nice if you could just specify when you call a method, “nonblocking call, please”, and the compiler would figure out what to do with that? And when you implement a function, wouldn't it be nice if you could write either the synchronous or the asynchronous version, and have the language autogenerate the other one if needed? This would reduce API clutter and mental overhead. It would make asynchronous functionality easier to use and more readily available.

It turns out both these things are actually pretty easy to do. The first relevant idea is the future. Here is an idea whose time has come. A future is simply an object that represents the result of an asynchronously executing call. Futures provide the standard API for asynchronous versions of APIs. The asynchronous version should always work just like the synchronous version, except that it returns a future.
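For a feel of what "the same call, except that it returns a future" means, here is a small sketch using the concurrent.futures module that joined Python's standard library in version 3.2, well after this post. It is not the implementation described here, and slow_add is a made-up function.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_add(a, b):
    """Stand-in for an API that takes a while to run."""
    time.sleep(0.1)
    return a + b


with ThreadPoolExecutor(max_workers=2) as pool:
    # Synchronous call: blocks and hands back the value directly.
    sync_result = slow_add(1, 2)

    # Asynchronous call: returns at once with a future for the value.
    future = pool.submit(slow_add, 1, 2)

    # Asking the future for its result blocks until the call finishes.
    async_result = future.result()

print(sync_result, async_result)
```

The arguments and the eventual value are the same either way. Only the moment at which the caller waits is different.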

(By the way, Java 1.5 has a standard interface for futures: java.util.concurrent.Future.)

Futures are easy to implement in a lot of languages. But the rest of it (allowing the user to call a function either synchronously or asynchronously on a whim; and allowing the programmer to implement only one version and have the other autogenerated) is an exercise in metaprogramming. Python is particularly well-suited to that sort of thing.

Below: a complete implementation. The syntax for a normal blocking function call is just the usual fn(arg1, arg2, ...). The syntax for an asynchronous call is async(fn, arg1, arg2, ...). I think this syntax is fairly nice. I would happily write async(my_socket.read, 100) if it worked.

The async() function handles the case where the function is synchronous but needs to be called asynchronously. What about the opposite case: a function is implemented asynchronously but needs to be called synchronously? This one is just as easy. The function just needs to be marked with the @asyncToSync decorator, like this:

    @asyncToSync
    def shutdown(self):
        self.workerThread.setStatus(SHUTDOWN_REQUESTED)
        return self.workerThread.getFuture()

I'm still shaking my head over how cool Python is. Here's the code.

""" futures.py - Futures and asynchronous function calls for Python. """

import sys
import threading

class Future:
    """ A future is a proxy for the result of a function call that
    may not have finished running yet.

    When you call async(), it returns a future representing the
    result of the function you're calling asynchronously.  You can
    call the get() method to block until the call finishes.  When
    the call does finish, get() returns its return value.  If the
    call fails with an exception, get() re-throws that exception.
    """

    def __init__(self):
        self._done_event = threading.Event()
        self._exc_info = None
        self._value = None

    def is_set(self):
        """ Returns true if this future has completed.
        If this is true, get() will not block.
        """
        return self._done_event.isSet()

    def get(self):
        """ Get the result of this future.

        If neither set_value() nor set_error() has been called, this
        blocks until one of them is called.

        This returns a value if set_value() was called;
        it throws an exception if set_error() was called.
        """
        self._done_event.wait()
        if self._exc_info is not None:
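            t, v, tb = self._exc_info
            # Three-argument raise (Python 2): re-raise in the caller's
            # thread with the original traceback attached.
            raise t, v, tb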
        else:
            return self._value

    def set_value(self, v):
        # Check the event, not the value: None is a legitimate result.
        if self._done_event.isSet():
            if self._exc_info is not None:
                raise ValueError("This future has already failed and can't be set.")
            raise ValueError("This future already has a value.")
        self._value = v
        self._done_event.set()

    def set_error(self, exc_info):
        if self._done_event.isSet():
            if self._exc_info is not None:
                raise ValueError("This future has already failed.")
            raise ValueError("This future already has a value.")
        self._exc_info = exc_info
        self._done_event.set()


def async(fn, *args, **kwargs):
    """ Call the function fn in a separate thread.
    Return a future that will receive the result of the call.
    """

    if hasattr(fn, '__async__'):
        return fn.__async__(*args, **kwargs)

    result = Future()

    def async_processing():
        try:
            rv = fn(*args, **kwargs)
        except:
            result.set_error(sys.exc_info())
        else:
            result.set_value(rv)

    threading.Thread(target=async_processing).start()
    return result


class AsyncAwareFunction:
    """ A function that has an efficient asynchronous implementation.

    This is a wrapper for a function that works asynchronously.

    The wrapper has a __call__ method that lets the user call the function
    synchronously--that is, the Future.get() call on the result is
    automatically added.  But async() can be used to get the underlying
    asynchronous behavior.
    """

    def __init__(self, fn):
        """ fn should be a function that returns a future. """
        self.fn = fn
        self.__name__ = fn.__name__
        self.__doc__ = fn.__doc__

        fn.__name__ += '_async'

    def __call__(self, *args, **kwargs):
        """ Call the function and wait for it to complete. """
        return self.fn(*args, **kwargs).get()

    def __async__(self, *args, **kwargs):
        """ Call the function and don't wait for it to complete.
        This should return a future.
        """
        return self.fn(*args, **kwargs)

def asyncToSync(f):
    """ Decorator.  Use this on functions implemented asynchronously.
    This wraps them to run synchronously unless called via async().
    """
    return AsyncAwareFunction(f)
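
For readers on current Python, here is a rough sketch of the same two pieces, under a few assumptions that are not in the listing above. Since async became a reserved word in Python 3.7, the helper is called call_async here. The standard library's concurrent.futures.Future stands in for the Future class, so the blocking call is result() rather than get(). The wrapper also gains a __get__ method so that the decorator binds like a method inside a class. Service and its attributes are hypothetical.

```python
import threading
import types
from concurrent.futures import Future


def call_async(fn, *args, **kwargs):
    """Call fn without blocking; return a Future for its result."""
    # A function with a native asynchronous form advertises it via __async__.
    if hasattr(fn, "__async__"):
        return fn.__async__(*args, **kwargs)

    result = Future()

    def run():
        try:
            value = fn(*args, **kwargs)
        except BaseException as exc:
            result.set_exception(exc)
        else:
            result.set_result(value)

    threading.Thread(target=run).start()
    return result


class AsyncAwareFunction:
    """Wraps a future-returning function; plain calls block on the future."""

    def __init__(self, fn):
        self.fn = fn
        self.__name__ = fn.__name__
        self.__doc__ = fn.__doc__

    def __call__(self, *args, **kwargs):
        return self.fn(*args, **kwargs).result()

    def __async__(self, *args, **kwargs):
        return self.fn(*args, **kwargs)

    def __get__(self, obj, objtype=None):
        # Added in this sketch: bind to the instance like a normal method.
        if obj is None:
            return self
        return AsyncAwareFunction(types.MethodType(self.fn, obj))


def async_to_sync(fn):
    """Decorator for functions that are implemented asynchronously."""
    return AsyncAwareFunction(fn)


class Service:
    """Hypothetical service whose shutdown work runs on a worker thread."""

    def __init__(self):
        self._stop = threading.Event()
        self._done = Future()
        threading.Thread(target=self._run).start()

    def _run(self):
        self._stop.wait()
        self._done.set_result("stopped")

    @async_to_sync
    def shutdown(self):
        self._stop.set()
        return self._done


# Shut down several services at once, then one the blocking way.
services = [Service() for _ in range(3)]
pending = [call_async(svc.shutdown) for svc in services]
results = [f.result() for f in pending]
blocking_result = Service().shutdown()
plain_result = call_async(pow, 2, 10).result()
```

With this in place the server shutdown from the top of the post becomes a list of call_async(svc.shutdown) calls followed by a loop over result(), while existing callers of svc.shutdown() keep their blocking behavior.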
