How to best perform Multiprocessing within requests with the python Tornado server?

Rocketman picture Rocketman · Mar 13, 2013 · Viewed 27.6k times · Source

I am using the I/O non-blocking python server Tornado. I have a class of GET requests which may take a significant amount of time to complete (think in the range of 5-10 seconds). The problem is that Tornado blocks on these requests so that subsequent fast requests are held up until the slow request completes.

I looked at: https://github.com/facebook/tornado/wiki/Threading-and-concurrency and came to the conclusion that I wanted some combination of #3 (other processes) and #4 (other threads). #4 on its own had issues and I was unable to get reliable control back to the ioloop when there was another thread doing the "heavy_lifting". (I assume that this was due to the GIL and the fact that the heavy_lifting task has high CPU load and keeps pulling control away from the main ioloop, but thats a guess).

So I have been prototyping how to solve this by doing "heavy lifting" tasks within these slow GET requests in a separate process and then place a callback back into the Tornado ioloop when the process is done to finish the request. This frees up the ioloop to handle other requests.

I have created a simple example demonstrating a possible solution, but am curious to get feedback from the community on it.

My question is two-fold: How can this current approach be simplified? What pitfalls potentially exist with it?

The Approach

  1. Utilize Tornado's builtin asynchronous decorator which allows a request to stay open and for the ioloop to continue.

  2. Spawn a separate process for "heavy lifting" tasks using python's multiprocessing module. I first attempted to use the threading module but was unable to get any reliable relinquishing of control back to the ioloop. It also appears that mutliprocessing would also take advantage of multicores.

  3. Start a 'watcher' thread in the main ioloop process using the threading module who's job it is to watch a multiprocessing.Queue for the results of the "heavy lifting" task when it completes. This was needed because I needed a way to know that the heavy_lifting task had completed while being able to still notify the ioloop that this request was now finished.

  4. Be sure that the 'watcher' thread relinquishes control to the main ioloop loop often with time.sleep(0) calls so that other requests continue to get readily processed.

  5. When there is a result in the queue then add a callback from the "watcher" thread using tornado.ioloop.IOLoop.instance().add_callback() which is documented to be the only safe way to call ioloop instances from other threads.

  6. Be sure to then call finish() in the callback to complete the request and hand over a reply.

Below is some sample code showing this approach. multi_tornado.py is the server implementing the above outline and call_multi.py is a sample script that calls the server in two different ways to test the server. Both tests call the server with 3 slow GET requests followed by 20 fast GET requests. The results are shown for both running with and without the threading turned on.

In the case of running it with "no threading" the 3 slow requests block (each taking a little over a second to complete). A few of the 20 fast requests squeeze through in between some of the slow requests within the ioloop (not totally sure how that occurs - but could be an artifact that I am running both the server and client test script on the same machine). The point here being that all of the fast requests are held up to varying degrees.

In the case of running it with threading enabled the 20 fast requests all complete first immediately and the three slow requests complete at about the same time afterwards as they have each been running in parallel. This is the desired behavior. The three slow requests take 2.5 seconds to complete in parallel - whereas in the non threaded case the three slow requests take about 3.5 seconds in total. So there is about 35% speed up overall (I assume due to multicore sharing). But more importantly - the fast requests were immediately handled in leu of the slow ones.

I do not have a lot experience with multithreaded programming - so while this seemingly works here I am curious to learn:

Is there a simpler way to accomplish this? What monster's may lurk within this approach?

(Note: A future tradeoff may be to just run more instances of Tornado with a reverse proxy like nginx doing load balancing. No matter what I will be running multiple instances with a load balancer - but I am concerned about just throwing hardware at this problem since it seems that the hardware is so directly coupled to the problem in terms of the blocking.)

Sample Code

multi_tornado.py (sample server):

import time
import threading
import multiprocessing
import math

from tornado.web import RequestHandler, Application, asynchronous
from tornado.ioloop import IOLoop


# run in some other process - put result in q
def heavy_lifting(q):
    t0 = time.time()
    for k in range(2000):
        math.factorial(k)

    t = time.time()
    q.put(t - t0)  # report time to compute in queue


class FastHandler(RequestHandler):
    def get(self):
        res = 'fast result ' + self.get_argument('id')
        print res
        self.write(res)
        self.flush()


class MultiThreadedHandler(RequestHandler):
    # Note:  This handler can be called with threaded = True or False
    def initialize(self, threaded=True):
        self._threaded = threaded
        self._q = multiprocessing.Queue()

    def start_process(self, worker, callback):
        # method to start process and watcher thread
        self._callback = callback

        if self._threaded:
            # launch process
            multiprocessing.Process(target=worker, args=(self._q,)).start()

            # start watching for process to finish
            threading.Thread(target=self._watcher).start()

        else:
            # threaded = False just call directly and block
            worker(self._q)
            self._watcher()

    def _watcher(self):
        # watches the queue for process result
        while self._q.empty():
            time.sleep(0)  # relinquish control if not ready

        # put callback back into the ioloop so we can finish request
        response = self._q.get(False)
        IOLoop.instance().add_callback(lambda: self._callback(response))


class SlowHandler(MultiThreadedHandler):
    @asynchronous
    def get(self):
        # start a thread to watch for
        self.start_process(heavy_lifting, self._on_response)

    def _on_response(self, delta):
        _id = self.get_argument('id')
        res = 'slow result {} <--- {:0.3f} s'.format(_id, delta)
        print res
        self.write(res)
        self.flush()
        self.finish()   # be sure to finish request


application = Application([
    (r"/fast", FastHandler),
    (r"/slow", SlowHandler, dict(threaded=False)),
    (r"/slow_threaded", SlowHandler, dict(threaded=True)),
])


if __name__ == "__main__":
    application.listen(8888)
    IOLoop.instance().start()

call_multi.py (client tester):

import sys
from tornado.ioloop import IOLoop
from tornado import httpclient


def run(slow):
    def show_response(res):
        print res.body

    # make 3 "slow" requests on server
    requests = []
    for k in xrange(3):
        uri = 'http://localhost:8888/{}?id={}'
        requests.append(uri.format(slow, str(k + 1)))

    # followed by 20 "fast" requests
    for k in xrange(20):
        uri = 'http://localhost:8888/fast?id={}'
        requests.append(uri.format(k + 1))

    # show results as they return
    http_client = httpclient.AsyncHTTPClient()

    print 'Scheduling Get Requests:'
    print '------------------------'
    for req in requests:
        print req
        http_client.fetch(req, show_response)

    # execute requests on server
    print '\nStart sending requests....'
    IOLoop.instance().start()

if __name__ == '__main__':
    scenario = sys.argv[1]

    if scenario == 'slow' or scenario == 'slow_threaded':
        run(scenario)

Test Results

By running python call_multi.py slow (the blocking behavior):

Scheduling Get Requests:
------------------------
http://localhost:8888/slow?id=1
http://localhost:8888/slow?id=2
http://localhost:8888/slow?id=3
http://localhost:8888/fast?id=1
http://localhost:8888/fast?id=2
http://localhost:8888/fast?id=3
http://localhost:8888/fast?id=4
http://localhost:8888/fast?id=5
http://localhost:8888/fast?id=6
http://localhost:8888/fast?id=7
http://localhost:8888/fast?id=8
http://localhost:8888/fast?id=9
http://localhost:8888/fast?id=10
http://localhost:8888/fast?id=11
http://localhost:8888/fast?id=12
http://localhost:8888/fast?id=13
http://localhost:8888/fast?id=14
http://localhost:8888/fast?id=15
http://localhost:8888/fast?id=16
http://localhost:8888/fast?id=17
http://localhost:8888/fast?id=18
http://localhost:8888/fast?id=19
http://localhost:8888/fast?id=20

Start sending requests....
slow result 1 <--- 1.338 s
fast result 1
fast result 2
fast result 3
fast result 4
fast result 5
fast result 6
fast result 7
slow result 2 <--- 1.169 s
slow result 3 <--- 1.130 s
fast result 8
fast result 9
fast result 10
fast result 11
fast result 13
fast result 12
fast result 14
fast result 15
fast result 16
fast result 18
fast result 17
fast result 19
fast result 20

By running python call_multi.py slow_threaded (the desired behavior):

Scheduling Get Requests:
------------------------
http://localhost:8888/slow_threaded?id=1
http://localhost:8888/slow_threaded?id=2
http://localhost:8888/slow_threaded?id=3
http://localhost:8888/fast?id=1
http://localhost:8888/fast?id=2
http://localhost:8888/fast?id=3
http://localhost:8888/fast?id=4
http://localhost:8888/fast?id=5
http://localhost:8888/fast?id=6
http://localhost:8888/fast?id=7
http://localhost:8888/fast?id=8
http://localhost:8888/fast?id=9
http://localhost:8888/fast?id=10
http://localhost:8888/fast?id=11
http://localhost:8888/fast?id=12
http://localhost:8888/fast?id=13
http://localhost:8888/fast?id=14
http://localhost:8888/fast?id=15
http://localhost:8888/fast?id=16
http://localhost:8888/fast?id=17
http://localhost:8888/fast?id=18
http://localhost:8888/fast?id=19
http://localhost:8888/fast?id=20

Start sending requests....
fast result 1
fast result 2
fast result 3
fast result 4
fast result 5
fast result 6
fast result 7
fast result 8
fast result 9
fast result 10
fast result 11
fast result 12
fast result 13
fast result 14
fast result 15
fast result 19
fast result 20
fast result 17
fast result 16
fast result 18
slow result 2 <--- 2.485 s
slow result 3 <--- 2.491 s
slow result 1 <--- 2.517 s

Answer

dano picture dano · Aug 8, 2014

If you're willing to use concurrent.futures.ProcessPoolExecutor instead of multiprocessing, this is actually very simple. Tornado's ioloop already supports concurrent.futures.Future, so they'll play nicely together out of the box. concurrent.futures is included in Python 3.2+, and has been backported to Python 2.x.

Here's an example:

import time
from concurrent.futures import ProcessPoolExecutor
from tornado.ioloop import IOLoop
from tornado import gen

def f(a, b, c, blah=None):
    print "got %s %s %s and %s" % (a, b, c, blah)
    time.sleep(5)
    return "hey there"

@gen.coroutine
def test_it():
    pool = ProcessPoolExecutor(max_workers=1)
    fut = pool.submit(f, 1, 2, 3, blah="ok")  # This returns a concurrent.futures.Future
    print("running it asynchronously")
    ret = yield fut
    print("it returned %s" % ret)
    pool.shutdown()

IOLoop.instance().run_sync(test_it)

Output:

running it asynchronously
got 1 2 3 and ok
it returned hey there

ProcessPoolExecutor has a more limited API than multiprocessing.Pool, but if you don't need the more advanced features of multiprocessing.Pool, it's worth using because the integration is so much simpler.