Python 3's concurrent.futures is convenient: you can run work across multiple processes inside a single with block. On Python 2 you can work multiprocessing out yourself; if you'd rather not, you can use my code below (pulled out of my own utility library). It implements a fairly generic chain of process pools (several process pools connected in series). A usage example follows at the end. It's late and I don't have time to explain it in detail; use it if it makes sense to you, otherwise never mind...

File: parallel.py

# -*- coding: utf-8 -*-
#
#
"""
Helper functions and templates for multiprocessing.

http://docs.python.org/2/library/multiprocessing.html#examples
http://hg.python.org/cpython/file/3.3/Lib/concurrent/futures/pr
http://www.ibm.com/developerworks/aix/library/au-threadingpytho
"""

__all__ = [
    'ProcessPoolsChain',
]

import os, sys, time
from multiprocessing import Manager, Process, Pool, Pipe, Queue


def worker(func, inque, outque):
    """
    Worker process: applies func to each (index, item) taken from inque
    and puts (index, result) onto outque, until the '__STOP__' sentinel
    is received.
    """
    for index, item in iter(inque.get, '__STOP__'):
        # the two-argument form of iter(): the callable is called
        # repeatedly until it returns the sentinel value
        output = func(item)
        outque.put((index, output))


class ProcessPoolsChain(object):
    """
    Asynchronous worker-process pools chained by queues.
    """

    def __init__(self, *funcs_workers):
        """
        Constructor for ProcessPoolsChain.

        e.g. ProcessPoolsChain((func1, num1), (func2, num2)) means
        num1 workers for func1 and num2 workers for func2.
        """
        self.numpools = len(funcs_workers)
        self.numworkerslist = []
        # one queue in front of each pool, plus a final output queue
        self.queues = [Queue() for _ in xrange(self.numpools + 1)]
        for i, (func, numworkers) in enumerate(funcs_workers):
            self.numworkerslist.append(numworkers)
            for _ in xrange(numworkers):
                Process(target=worker,
                        args=(func, self.queues[i], self.queues[i + 1])).start()

    def feed(self, inputs):
        """
        Feeds a sequence of data into the (first) input queue and returns
        a list of outputs that matches the input order.
        """
        numinputs = len(inputs)
        outputs = [None for _ in xrange(numinputs)]
        for index, item in enumerate(inputs):
            self.queues[0].put((index, item))
        # collect results from the last queue; they may arrive out of
        # order, so place each one by its index
        finished = set()
        while len(finished) < numinputs:
            index, result = self.queues[-1].get()   # blocks
            outputs[index] = result
            finished.add(index)
        return outputs

    def stop(self):
        """
        Stops the whole pools chain by sending one sentinel per worker
        into each pool's input queue.
        """
        for i in xrange(self.numpools):
            numworkers = self.numworkerslist[i]
            for j in xrange(numworkers):
                self.queues[i].put('__STOP__')


Example

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
import sys
import time
import urllib2
from BeautifulSoup import BeautifulSoup

# The worker functions must live at module level, because the
# multiprocessing API has to import this module to find them.

def urlchunk(host):
    url = urllib2.urlopen(host)
    chunk = url.read()
    return chunk

def datamine(chunk):
    soup = BeautifulSoup(chunk)
    return soup.findAll(['title'])

# print '__name__ ==', __name__   # '__parents_main__' if imported by multiprocessing

if __name__ == '__main__':

    # IMPORTANT: construct the ProcessPoolsChain only under the
    # __name__ == '__main__' guard. The multiprocessing API imports this
    # module (with __name__ == '__parents_main__'), so putting this code
    # at module level would spawn an unbounded number of processes!
    import parallel
    ppc = parallel.ProcessPoolsChain((urlchunk, 4), (datamine, 2))

    t0 = time.time()

    urls = [
        "http://google.com",
        "http://yahoo.com",
        "http://google.com",
        "http://amazon.com",
        "http://ibm.com",
        "http://apple.com",
        "http://www.mit.edu",
        "http://www.cs.umn.edu",
    ]

    results = ppc.feed(urls)
    ppc.stop()

    for i, res in enumerate(results):
        print i, res

    print('Done. Time elapsed: %.2f.' % (time.time() - t0))

    # test a single pool
    ppc1 = parallel.ProcessPoolsChain((urlchunk, 4))
    results1 = ppc1.feed(urls)
    ppc1.stop()
    for i, chunk in enumerate(results1):
        print i, len(chunk)
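For comparison, here is a minimal sketch of the Python 3 with-block style mentioned at the top, using concurrent.futures.ProcessPoolExecutor from the standard library. The two-stage split and the function names (square, negate) are placeholders I made up for illustration; only ProcessPoolExecutor and its map method are real API.

# -*- coding: utf-8 -*-
# Minimal Python 3 sketch: two process pools in one with block.
from concurrent.futures import ProcessPoolExecutor

def square(x):
    # stage 1: placeholder CPU-bound work
    return x * x

def negate(x):
    # stage 2: placeholder work on stage-1 results
    return -x

if __name__ == '__main__':
    data = range(8)
    # both pools are shut down automatically when the with block exits
    with ProcessPoolExecutor(max_workers=4) as pool1, \
         ProcessPoolExecutor(max_workers=2) as pool2:
        stage1 = pool1.map(square, data)    # results come back in input order
        stage2 = pool2.map(negate, stage1)
        print(list(stage2))                 # [0, -1, -4, -9, ...]

Note that executor.map already returns results in input order, which is exactly the bookkeeping ProcessPoolsChain.feed() has to do by hand with indexed tuples and queues.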