Kesinの知見置き場

知見を共有していきたいじゃないですか

Python multiprocessing

前回Pythonでの並列化についてちょっと書いているうちに興味が湧いてきたのでもう少し掘り下げてみます。
Python2.6から標準モジュールになったmultiprocessingのリファレンスから、並列に実行させるクラスについて色々試してみました。プロセス間の連携や共有メモリは使うアテがないので後回し。
Pool.map()の使い勝手がいい感じ。

プログラム

#coding: utf-8
from multiprocessing import Process, Pool
import multiprocessing
import time
import os

def print_time():
    """Return the current local time formatted as ``MM:SS`` (minutes:seconds)."""
    # localtime() with no argument uses the current time.
    return time.strftime("%M:%S", time.localtime())

def sum_print(a, b, c):
    """Sleep one second, then print the current pid, ``a + b + c`` and the time.

    This is the printing counterpart of ``sum``: the message is written to
    stdout and nothing is returned.
    """
    time.sleep(1)
    message = "pid: %d, sum: %d, time: %s " % (os.getpid(), a + b + c, print_time())
    # A single parenthesised argument prints exactly like the bare print statement.
    print(message)
    
def sum(a, b, c):
    """Sleep one second, then return a line with the pid, ``a + b + c`` and the time.

    Returns the formatted string instead of printing it, so the caller
    decides where the text ends up.
    """
    time.sleep(1)
    total = a + b + c
    fields = (os.getpid(), total, print_time())
    return "pid: %d, sum: %d, time: %s " % fields

def print_square(x):
    """Sleep one second, then print the current pid, ``x * x`` and the time.

    The message goes to stdout; nothing is returned.
    """
    time.sleep(1)
    message = "pid: %d, square: %d, time: %s" % (os.getpid(), x * x, print_time())
    # A single parenthesised argument prints exactly like the bare print statement.
    print(message)

def square(x):
    """Sleep one second, then return a line with the pid, ``x * x`` and the time."""
    time.sleep(1)
    squared = x * x
    fields = (os.getpid(), squared, print_time())
    return "pid: %d, square: %d, time: %s" % fields

if __name__ == '__main__':
    # Demo driver: exercises Process and the main Pool entry points one after
    # another so their blocking behaviour and worker assignment can be compared
    # from the printed pids and timestamps.
    print("CPU Core: " + str(multiprocessing.cpu_count()))

    print("Process:")
    sum_p = Process(target=sum_print, args=(2, 3, 4))
    square_p = Process(target=print_square, args=(5,))
    # Start both before joining either so the two children run concurrently.
    sum_p.start()
    square_p.start()
    sum_p.join()
    square_p.join()

    pool = Pool()
    # try/finally so the worker processes are always shut down, even if one of
    # the demos raises.
    try:
        print("Pool: apply()")
        for i in range(3, 6):
            # apply() blocks until the call has finished and hands back its
            # return value directly (not an AsyncResult), so these run serially.
            pool.apply(sum_print, (i - 1, i, i + 1))

        print("Pool: apply_async()")
        # apply_async() returns an AsyncResult immediately, so all three calls
        # are queued before any result is collected.
        pending = [pool.apply_async(sum, (i - 1, i, i + 1)) for i in range(3, 6)]
        for result in pending:
            print(result.get())

        print("Pool: map()")
        numbers = range(1, 5)
        pool.map(print_square, numbers)
        print("map()が終わるまでブロックされる")

        print("Pool: map_async()")
        async_result = pool.map_async(square, numbers)
        print("map_async()はブロックされない")
        print("result_list: Ready? " + str(async_result.ready()))
        # get(None) waits without a timeout until every result is available.
        print(async_result.get(None))

        print("Pool: imap()")
        # imap() hands the work to the pool right away and returns an iterator
        # over the results; next() only waits for the next result in order.
        # That is why all four squares get printed although only two results
        # are fetched here.
        it = pool.imap(print_square, numbers)
        next(it)
        time.sleep(1)
        next(it)
        time.sleep(1)

        print("Pool: map() chunksize=3")
        # With a chunksize, consecutive items are grouped and each group is
        # handed to a single worker process as one task.
        for line in pool.map(square, range(1, 10), 3):
            print(line)
    finally:
        # Stop accepting work and wait for the workers to exit.
        pool.close()
        pool.join()

実行結果

CPU Core: 2
Process:
pid: 13301, sum: 9, time: 21:22 
pid: 13302, square: 25, time: 21:22
Pool: apply()
pid: 13307, sum: 9, time: 21:23 
pid: 13308, sum: 12, time: 21:24 
pid: 13307, sum: 15, time: 21:25 
Pool: apply_async()
pid: 13308, sum: 9, time: 21:26 
pid: 13307, sum: 12, time: 21:26 
pid: 13308, sum: 15, time: 21:27 
Pool: map()
pid: 13308, square: 4, time: 21:28
pid: 13307, square: 1, time: 21:28
pid: 13307, square: 16, time: 21:29
pid: 13308, square: 9, time: 21:29
map()が終わるまでブロックされる
Pool: map_async()
map_async()はブロックされない
result_list: Ready? False
['pid: 13307, square: 1, time: 21:30', 'pid: 13308, square: 4, time: 21:30', 'pid: 13308, square: 9, time: 21:31', 'pid: 13307, square: 16, time: 21:31']
Pool: imap()
pid: 13307, square: 1, time: 21:32
pid: 13308, square: 4, time: 21:32
pid: 13308, square: 16, time: 21:33
pid: 13307, square: 9, time: 21:33
Pool: map() chunksize=3
pid: 13308, square: 1, time: 21:35
pid: 13308, square: 4, time: 21:36
pid: 13308, square: 9, time: 21:37
pid: 13307, square: 16, time: 21:35
pid: 13307, square: 25, time: 21:36
pid: 13307, square: 36, time: 21:37
pid: 13307, square: 49, time: 21:38
pid: 13307, square: 64, time: 21:39
pid: 13307, square: 81, time: 21:40