pydata

Keep Looking, Don't Settle

python multiprocessing and threads 02: multiprocessing.Pool

Python's multiprocessing module provides two ways to run multiple processes: multiprocessing.Process and multiprocessing.Pool. Pool in turn offers two families of methods, apply (apply_async) and map (map_async). Here I only compare apply (apply_async) with map (map_async). Their syntax is very similar, much like the usual apply versus map usage.

They are summarized in the table below:

method        Multi-args   Concurrence   Blocking   Ordered-results
map           no           yes           yes        yes
apply         yes          no            yes        no
map_async     no           yes           no         yes
apply_async   yes          yes           no         no

In Python 3 (3.3+), Pool gained a new method, starmap, which can accept multiple arguments per call.
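A minimal sketch of starmap (Python 3 only); the add function and the argument pairs are just illustrative:

from multiprocessing import Pool

def add(x, y):
    # toy worker that takes two arguments
    return x + y

if __name__ == '__main__':
    with Pool(4) as p:
        # each tuple is unpacked into add(x, y); results keep the input order
        print(p.starmap(add, [(1, 2), (3, 4), (5, 6)]))   # [3, 7, 11]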

The main differences:

  1. Once the Pool has created its worker processes, map uses all of them, so many processes run at the same time; apply waits for one task to finish before starting the next. As a result apply is noticeably slower than map.

  2. map takes a list (a whole batch of data) as its argument; apply takes a single item (one piece of data) per call, so you need a for loop to walk through the whole list.

  3. apply_async does use multiple processes. When process switching is infrequent, it runs about as fast as map (in the first example below, a Monte Carlo simulation of CRE EL, each process runs one simulation, each simulation takes roughly 10 minutes, and many simulations run at the same time; because every process runs for about 10 minutes, process switching is infrequent).

run method    run time (seconds)
apply         1209.96
apply_async   62.54
map           62.30
map_async     186.73
serial run    299.94
  • But if the work each task does is very short, processes switch between tasks constantly, and apply_async becomes much slower than map (in the second example below, each task factorizes one integer n, which takes very little time; once it finishes, the worker moves on to another integer, so task switching never stops):
run method    run time (seconds)
map           5.56
apply_async   99.41
apply         154.40
  • map_async behaves essentially the same as map.

  • Results from apply_async may come back out of order, i.e. not matching the order of the input data; map_async preserves the input order (see the table above).

  • apply_async and map_async return AsyncResult objects, so you need get() to retrieve the actual results (a short sketch follows this list).
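A minimal sketch of the get() pattern, assuming a trivial square worker (the function and data are only illustrative):

from multiprocessing import Pool

def square(x):
    # toy worker
    return x * x

if __name__ == '__main__':
    p = Pool(4)
    async_jobs = [p.apply_async(square, args = (i, )) for i in range(5)]
    map_result = p.map_async(square, range(5))
    p.close()
    p.join()
    print [job.get() for job in async_jobs]   # one get() per AsyncResult
    print map_result.get()                    # map_async returns a single AsyncResult holding the whole list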

First example: each call to the worker function runs for a long time.
import multiprocessing as mul
import os, time, random
import pandas as pd
import numpy as np
import timeit
from multiprocessing import Pool

## Part 1: cre_calc_engine function
rs_dsc_thd = 1.05
rs_ltv_thd = 0.8
ref_dsc_thd = 1.25
ref_ltv_thd = 0.75

def cre_calc_engine(*args, **kvargs):
    pass
    # hide the function here

def calc(seed_id):
    # run one Monte Carlo path: draw monthly rate/NOI/cap-rate shocks,
    # then run cre_calc_engine on every row of new_final
    global mon_rate_chg, mon_noi_chg, mon_caprate_chg
    mon_chg = pd.DataFrame(columns = [u'mon_rate_chg', u'mon_noi_chg', u'mon_caprate_chg'])
    np.random.seed(seed = seed_id)  # warning: remove this fixed seed when running the real simulation
    mon_chg.mon_rate_chg = np.random.normal(0, 1, 120) / 5000
    mon_chg.mon_noi_chg = np.random.normal(0, 1, 120) / 100
    mon_chg.mon_caprate_chg = np.random.normal(0, 1, 120) / 500
    mon_rate_chg = mon_chg.ix[:, 0]
    mon_noi_chg = mon_chg.ix[:, 1]
    mon_caprate_chg = mon_chg.ix[:, 2]

    fin_los = []
    for i in range(new_final.shape[0]):
        vars = new_final.ix[i, :].tolist()
        final_loss = cre_calc_engine(*vars)
        fin_los.append(final_loss)

    # new_final['final_loss'] = fin_los
    return fin_los

### 1: apply runs ONE worker in the pool at a time ##################################
new_final = pd.read_pickle(r'/home/hsong01/work/ifrs9/cre/new_final')

start = timeit.default_timer()
if __name__=='__main__':
    print "parent process %s" %os.getpid()
    jobs = []
    p = Pool(20)
    for loop_i in range(20):
        res = p.apply(calc, args = (loop_i, ))
        jobs.append(res)
    p.close()
    p.join()
    for job in jobs:
        print np.sum(job)
end = timeit.default_timer()
print "run time for function apply is %s" %(end - start)
# run time for function apply is 1209.96147704


### 2: apply_async uses ALL workers in the pool but results may come back out of order  ########
new_final = pd.read_pickle(r'/home/hsong01/work/ifrs9/cre/new_final')

start = timeit.default_timer()
if __name__=='__main__':
    print "parent process %s" %os.getpid()
    jobs = []
    p = Pool(20)
    for loop_i in range(20):
        res = p.apply_async(calc, args = (loop_i, ))
        jobs.append(res)
    p.close()
    p.join()    
    for job in jobs:
        print np.sum(job.get())
end = timeit.default_timer()
print "run time for function apply_async is %s" %(end - start)
# run time for function apply_async is 62.5425729752


### 3: map uses ALL workers in the pool and returns results in order  ####################
new_final = pd.read_pickle(r'/home/hsong01/work/ifrs9/cre/new_final')

start = timeit.default_timer()
if __name__=='__main__':
    print "parent process %s" %os.getpid()
    p = Pool(20)
    jobs = p.map(calc, range(20))
    p.close()
    p.join()
    for job in jobs:
        print np.sum(job)
end = timeit.default_timer()
print "run time for map is %s" %(end - start)
# run time for map is 62.3054320812


### 4: map_async uses ALL workers in the pool; get() returns the whole result list  ######
new_final = pd.read_pickle(r'/home/hsong01/work/ifrs9/cre/new_final')

start = timeit.default_timer()
if __name__=='__main__':
    print "parent process %s" %os.getpid()
    p = Pool(20)
    jobs = p.map_async(calc, range(50))  # note: 50 simulations here (not 20), so the timing is not directly comparable
    p.close()
    p.join()
    for job in jobs.get():
        print np.sum(job)
end = timeit.default_timer()
print "run time for map_async is %s" %(end - start)
# run time for map_async is 186.737658978


### 5: serial run (no multiprocessing), for comparison ###################################
new_final = pd.read_pickle(r'/home/hsong01/work/ifrs9/cre/new_final')

start = timeit.default_timer()
sss = [calc(iii) for iii in range(5)]  # only 5 simulations, run one after another in the parent process
end = timeit.default_timer()

print "run time for serial run is %s" %(end - start)
# run time for serial run is 299.941666126
Second example: each call to the worker function is very short.
from multiprocessing import Process, Queue, Pool
import math
import timeit

def factorize_naive(n):
    """ A naive factorization method. Take integer 'n', return list of
        factors.
    """
    if n < 2:
        return []
    factors = []
    p = 2

    while True:
        if n == 1:
            return factors
        r = n % p
        if r == 0:
            factors.append(p)
            n = n / p
        elif p * p >= n:
            factors.append(n)
            return factors
        elif p > 2:
            # Advance in steps of 2 over odd numbers
            p += 2
        else:
            # If p == 2, get to 3
            p += 1
    assert False, "unreachable"

# decorator to record the function name, params, and running time
def time_wrap(f):
    def wrap(*args, **kvargs):
        start = timeit.default_timer()
        result = f(*args, **kvargs)
        end = timeit.default_timer()
        print "the run time for function %s with params %s is %s" %(f.__name__, args[1],  end-start)
        return result
    return wrap


@time_wrap
def map_calc(f, npool):
    s = []
    p = Pool(npool)
    jobs = p.map(f, range(1000000))
    p.close()
    p.join()
    for job in jobs:
        s.append(job)
    print s[:10]
    return s

map_calc(factorize_naive, 6)
# the run time for function map_calc is 5.56264591217


@time_wrap
def apply_async_calc(f, npool):
    jobs = []
    s = []
    p = Pool(npool)
    for i in range(1000000):
        each_run = p.apply_async(f, args = (i, ))
        jobs.append(each_run)
    p.close()
    p.join()
    for job in jobs:
        s.append(job.get())
    return s

apply_async_calc(factorize_naive, 6)
# the run time for function apply_async_calc with params 6 is 99.409334898


@time_wrap
def apply_calc(f, npool):
    jobs = []
    s = []
    p = Pool(npool)
    for i in range(1000000):
        each_run = p.apply(f, args = (i, ))
        jobs.append(each_run)
    p.close()
    p.join()
    for job in jobs:
        s.append(job)
    return s

apply_calc(factorize_naive, 6)
# the run time for function apply_calc with params 6 is 154.398054838
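
One reason map wins so decisively on these short tasks is that it hands work to the workers in chunks, while apply and apply_async pay inter-process overhead for every single call. A hedged sketch of controlling this directly through map's chunksize parameter (the chunk size of 10000 is an illustrative guess, not a tuned value):

from multiprocessing import Pool
import timeit

if __name__ == '__main__':
    p = Pool(6)
    start = timeit.default_timer()
    # chunksize batches many integers into each inter-process message,
    # so the per-task overhead is amortized over the whole chunk
    jobs = p.map(factorize_naive, range(1000000), chunksize = 10000)
    p.close()
    p.join()
    print "run time for map with explicit chunksize is %s" %(timeit.default_timer() - start)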