12.31.2015

Python: Do Not Use Pool.map Method in multiprocessing Module

Python: multiprocessing モジュールの Pool.map を使ったときの罠

 

Pool.map を使った並行処理をバッチ処理などで実行した際、キーボードによる中断(KeyboardInterrupt)をすると
プロセスがハングアップすることがある。

コード
#!/usr/bin/env python

from multiprocessing import Pool, current_process
import time
from datetime import datetime

def f(x):
    print('[%s] start %d: %s' % (datetime.now(), x, current_process().name))
    time.sleep(5)
    print('[%s] end   %d: %s' % (datetime.now(), x, current_process().name))
    return x

pool = Pool(2)
ret = pool.map(f, range(4))
print('[%s] result: %s' % (datetime.now(), ret))
通常の実行例
[2015-12-31 22:10:02.319094] start 0: PoolWorker-1
[2015-12-31 22:10:02.319267] start 1: PoolWorker-2
[2015-12-31 22:10:07.320538] end   1: PoolWorker-2
[2015-12-31 22:10:07.320538] end   0: PoolWorker-1
[2015-12-31 22:10:07.321345] start 2: PoolWorker-2
[2015-12-31 22:10:07.321434] start 3: PoolWorker-1
[2015-12-31 22:10:12.321765] end   2: PoolWorker-2
[2015-12-31 22:10:12.321765] end   3: PoolWorker-1
[2015-12-31 22:10:12.322721] result: [0, 1, 2, 3]
Ctrl-C を押した場合
[2015-12-31 22:05:42.108238] start 0: PoolWorker-1
[2015-12-31 22:05:42.109395] start 1: PoolWorker-2
^CProcess PoolWorker-1:
Process PoolWorker-2:
Traceback (most recent call last):
Traceback (most recent call last):
(snip)
    return map(*args)
    time.sleep(5)
  File "./x.py", line 9, in f
KeyboardInterrupt
    time.sleep(5)
KeyboardInterrupt
[2015-12-31 22:05:42.631444] start 2: PoolWorker-3
[2015-12-31 22:05:42.632162] start 3: PoolWorker-4
[2015-12-31 22:05:47.632713] end   3: PoolWorker-4
[2015-12-31 22:05:47.632664] end   2: PoolWorker-3

プログラムが停止せず、kill コマンドでプロセスを終了する必要に迫られる。
何度 Ctrl-C を押しても駄目である。

回避策1

try 節で KeyboardInterrupt をトラップすれば、プログラムのハングアップは防げる。
しかし、中断した後もプログラムが続行してしまう。

#!/usr/bin/env python

from multiprocessing import Pool, current_process
import time
from datetime import datetime

def f(x):
    try:
        print('[%s] start %d: %s' % (datetime.now(), x, current_process().name))
        time.sleep(5)
        print('[%s] end   %d: %s' % (datetime.now(), x, current_process().name))
    except KeyboardInterrupt:
        pass
    return x

pool = Pool(2)
ret = pool.map(f, range(4))
print('[%s] result: %s' % (datetime.now(), ret))

実行例

[2015-12-31 22:21:41.647775] start 0: PoolWorker-1
[2015-12-31 22:21:41.647928] start 1: PoolWorker-2
^C[2015-12-31 22:21:42.716623] start 2: PoolWorker-2
[2015-12-31 22:21:42.716755] start 3: PoolWorker-1
[2015-12-31 22:21:47.717815] end   2: PoolWorker-2
[2015-12-31 22:21:47.717832] end   3: PoolWorker-1
Traceback (most recent call last):
(snip)
    waiter.acquire()
KeyboardInterrupt
回避策2

map の代わりに map_async を使えば、問題を回避できる。この場合は即座にプログラムが終了する。
ただし、結果を利用する際には get メソッドとともにタイムアウトの時間(秒数)を指定する必要がある。

#!/usr/bin/env python

from multiprocessing import Pool, current_process
import time
from datetime import datetime

def f(x):
    print('[%s] start %d: %s' % (datetime.now(), x, current_process().name))
    time.sleep(5)
    print('[%s] end   %d: %s' % (datetime.now(), x, current_process().name))
    return x

pool = Pool(2)
p = pool.map_async(f, range(4))
ret = p.get(86400)
print('[%s] result: %s' % (datetime.now(), ret))

実行例

[2015-12-31 22:24:32.482984] start 0: PoolWorker-1
[2015-12-31 22:24:32.483816] start 1: PoolWorker-2
^CTraceback (most recent call last):
(snip)
    time.sleep(5)
KeyboardInterrupt

 

References

0 件のコメント:

コメントを投稿