multiprocessing の基本

サブプロセスを使用する最も簡単な方法は対象関数と共に Process オブジェクトをインスタンス化することで、その処理を開始させるために start() を呼び出してください。

import multiprocessing

def worker():
    """worker function"""
    print 'Worker'
    return

if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker)
        jobs.append(p)
        p.start()

その出力結果は実行した順番通りかどうかよく分かりませんが “Worker” という単語を5回表示します。

$ python multiprocessing_simple.py
Worker
Worker
Worker
Worker
Worker

実行する処理を引数で渡してプロセスを生成できるので本当にかなり便利です。 threading とは違い multiprocessingProcess へ引数を渡すには、その引数は pickle でシリアライズ可能でなければなりません。このサンプルは出力結果がもう少し面白くなるように各ワーカーへ数値を渡します。

import multiprocessing

def worker(num):
    """thread worker function"""
    print 'Worker:', num
    return

if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        jobs.append(p)
        p.start()

引数として渡された整数は各ワーカーでメッセージに含めて表示されます。

$ python multiprocessing_simpleargs.py
Worker: 0
Worker: 1
Worker: 2
Worker: 3
Worker: 4

インポート可能な対象関数

threadingmultiprocessing のサンプルの違いの1つは multiprocessing のサンプルで使用されている __main__ の追加の保護機構です。新しいプロセスが開始されるために、子プロセスは対象関数を実装したスクリプトファイルをインポートできる必要があります。そのアプリケーションの __main__ をチェックして main 処理をラッピングすることで、そのモジュールをインポートされてそれぞれの子プロセスで再起的に実行されないことを保証します。別の方法としては、別のスクリプトファイルから対象関数をインポートすることです。

例えば、このメインプログラムは次のようになります。

import multiprocessing
import multiprocessing_import_worker

if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=multiprocessing_import_worker.worker)
        jobs.append(p)
        p.start()

分割したモジュールで定義されているワーカー関数を使用します。

def worker():
    """worker function"""
    print 'Worker'
    return

そして前のセクションで紹介した最初のサンプルと同じ出力を表示します。

$ python multiprocessing_import_main.py
Worker
Worker
Worker
Worker
Worker

カレントプロセスを決定する

プロセスに識別子や名前を付けたりすることは全く不要で面倒なだけです。それぞれの Process インスタンスはプロセスの生成時に変更可能な名前とデフォルト値を持っています。名前付けプロセスはそういったプロセスを追跡するのに便利で、特に同時に実行中のプロセスが複数の型を持つアプリケーションで有効です。

import multiprocessing
import time

def worker():
    name = multiprocessing.current_process().name
    print name, 'Starting'
    time.sleep(2)
    print name, 'Exiting'

def my_service():
    name = multiprocessing.current_process().name
    print name, 'Starting'
    time.sleep(3)
    print name, 'Exiting'

if __name__ == '__main__':
    service = multiprocessing.Process(name='my_service', target=my_service)
    worker_1 = multiprocessing.Process(name='worker 1', target=worker)
    worker_2 = multiprocessing.Process(target=worker) # use default name

    worker_1.start()
    worker_2.start()
    service.start()

デバッグ出力は行単位にカレントプロセスの名前が表示されます。名前に Process-3 がある行は名前が付けられていないプロセス worker_2 に対応します。

$ python multiprocessing_names.py
worker 1 Starting
worker 1 Exiting
Process-3 Starting
Process-3 Exiting
my_service Starting
my_service Exiting

デーモンプロセス

デフォルトでは、メインプログラムは全ての子プロセスが終了するまで終了しません。終了からメインプログラムをブロッキングせずにバックグラウンドプロセスを開始するのは便利なときがあります。このようなサービスは、ワーカーと相互にやり取りする簡単な方法がない可能性がある、もしくはデータを破損したり失ったりせずに処理の途中で終了させるようなサービスです(例えば、サービス監視ツールの “心電図” のような処理を生成するタスク)。

デーモンとしてプログラムをマークするには daemon 属性にブーリアン値を設定してください。デフォルトでは、プロセスはデーモンにならないのでデーモンモードに切り替えるために True を渡します。

import multiprocessing
import time
import sys

def daemon():
    p = multiprocessing.current_process()
    print 'Starting:', p.name, p.pid
    sys.stdout.flush()
    time.sleep(2)
    print 'Exiting :', p.name, p.pid
    sys.stdout.flush()

def non_daemon():
    p = multiprocessing.current_process()
    print 'Starting:', p.name, p.pid
    sys.stdout.flush()
    print 'Exiting :', p.name, p.pid
    sys.stdout.flush()

if __name__ == '__main__':
    d = multiprocessing.Process(name='daemon', target=daemon)
    d.daemon = True

    n = multiprocessing.Process(name='non-daemon', target=non_daemon)
    n.daemon = False

    d.start()
    time.sleep(1)
    n.start()

全ての非デーモンプロセス(メインプログラムも含む)はデーモンプロセスが2秒間のスリープから復帰する前に終了するので、その出力結果はデーモンプロセスから表示される “Exiting” メッセージを含んでいません。

$ python multiprocessing_daemon.py
Starting: daemon 71087
Starting: non-daemon 71088
Exiting : non-daemon 71088

デーモンプロセスは、実行プロセスが孤児として残ってしまわないようにメインプログラムが終了する前に自動的に終了します。プログラムを実行するときに表示したプロセス ID の値を覚えておき ps のようなコマンドでそのプロセスを調べることで検証することができます。

プロセスを待つ

あるプロセスが処理を実行した後で完全に終了するまで待つには join() メソッドを使用してください。

import multiprocessing
import time
import sys

def daemon():
    print 'Starting:', multiprocessing.current_process().name
    time.sleep(2)
    print 'Exiting :', multiprocessing.current_process().name

def non_daemon():
    print 'Starting:', multiprocessing.current_process().name
    print 'Exiting :', multiprocessing.current_process().name

if __name__ == '__main__':
    d = multiprocessing.Process(name='daemon', target=daemon)
    d.daemon = True

    n = multiprocessing.Process(name='non-daemon', target=non_daemon)
    n.daemon = False

    d.start()
    time.sleep(1)
    n.start()

    d.join()
    n.join()

メインプロセスは join() で終了するデーモンを待つので、今回は “Exiting” メッセージが表示されます。

$ python multiprocessing_daemon_join.py
Starting: non-daemon
Exiting : non-daemon
Starting: daemon
Exiting : daemon

デフォルトでは join() は無限にブロッキングします。さらにタイムアウト(小数はプロセスが非アクティブになるのを待つ秒数)を引数で渡すこともできます。もしプロセスがタイムアウトの時間内で完了しなかったら、いずれにしても join() から返されます。

import multiprocessing
import time
import sys

def daemon():
    print 'Starting:', multiprocessing.current_process().name
    time.sleep(2)
    print 'Exiting :', multiprocessing.current_process().name

def non_daemon():
    print 'Starting:', multiprocessing.current_process().name
    print 'Exiting :', multiprocessing.current_process().name

if __name__ == '__main__':
    d = multiprocessing.Process(name='daemon', target=daemon)
    d.daemon = True

    n = multiprocessing.Process(name='non-daemon', target=non_daemon)
    n.daemon = False

    d.start()
    n.start()

    d.join(1)
    print 'd.is_alive()', d.is_alive()
    n.join()

渡されたそのタイムアウトの時間はデーモンがスリープしている時間よりも短いので、そのプロセスは join() が返された後もまだ “実行中” です。

$ python multiprocessing_daemon_join_timeout.py
Starting: non-daemon
Exiting : non-daemon
d.is_alive() True

プロセスを終了する

プロセスへ終了シグナルを送る poison pill メソッドを使用するのは良い方法ではありますが(multiprocessing-queues を参照)、もしプロセスがハングアップ又はデッドロックした場合、強制的にプロセスを kill できるとさらに便利です。プロセスオブジェクトで terminate() を呼び出すと子プロセスを kill します。

import multiprocessing
import time

def slow_worker():
    print 'Starting worker'
    time.sleep(0.1)
    print 'Finished worker'

if __name__ == '__main__':
    p = multiprocessing.Process(target=slow_worker)
    print 'BEFORE:', p, p.is_alive()
    
    p.start()
    print 'DURING:', p, p.is_alive()
    
    p.terminate()
    print 'TERMINATED:', p, p.is_alive()

    p.join()
    print 'JOINED:', p, p.is_alive()

Note

プロセスの終了を反映するために、オブジェクトのステータスを更新するバックグラウンド機構のために終了後に join() することが重要です。

$ python multiprocessing_terminate.py
BEFORE: <Process(Process-1, initial)> False
DURING: <Process(Process-1, started)> True
TERMINATED: <Process(Process-1, started)> True
JOINED: <Process(Process-1, stopped[SIGTERM])> False

プロセスの終了ステータス

プロセスの終了が exitcode 属性を経由してアクセスされるときにステータスコードが生成されます。

exitcode の値とは、

  • == 0 – エラーが発生しなかった
  • > 0 – エラーが発生して、その値で終了した
  • < 0 – プロセスが -1 * exitcode のシグナルで kill された
import multiprocessing
import sys
import time

def exit_error():
    sys.exit(1)

def exit_ok():
    return

def return_value():
    return 1

def raises():
    raise RuntimeError('There was an error!')

def terminated():
    time.sleep(3)

if __name__ == '__main__':
    jobs = []
    for f in [exit_error, exit_ok, return_value, raises, terminated]:
        print 'Starting process for', f.func_name
        j = multiprocessing.Process(target=f, name=f.func_name)
        jobs.append(j)
        j.start()
        
    jobs[-1].terminate()

    for j in jobs:
        j.join()
        print '%s.exitcode = %s' % (j.name, j.exitcode)

例外が発生するプロセスは自動的に exitcode が 1 になります。

$ python multiprocessing_exitcode.py
Starting process for exit_error
Starting process for exit_ok
Starting process for return_value
Starting process for raises
Starting process for terminated
Process raises:
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python
2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python
2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "multiprocessing_exitcode.py", line 24, in raises
    raise RuntimeError('There was an error!')
RuntimeError: There was an error!
exit_error.exitcode = 1
exit_ok.exitcode = 0
return_value.exitcode = 0
raises.exitcode = 1
terminated.exitcode = -15

ロギング

並列処理の不具合をデバッグするとき multiprocessing が提供した内部オブジェクトへアクセスできるとデバッグがやり易いです。 log_to_stderr() という、ロギングを有効にする便利なモジュールレベルの関数があります。その関数は logging を使用してロガーオブジェクトを設定して、ログメッセージが標準エラー経由で送られるようにハンドラを追加します。

import multiprocessing
import logging
import sys

def worker():
    print 'Doing some work'
    sys.stdout.flush()

if __name__ == '__main__':
    multiprocessing.log_to_stderr(logging.DEBUG)
    p = multiprocessing.Process(target=worker)
    p.start()
    p.join()
    

デフォルトでは、ログレベルはメッセージを生成しない NOTSET に設定されています。あなたが望む詳細レベルを渡してロガーを初期化してください。

$ python multiprocessing_log_to_stderr.py
[INFO/Process-1] child process calling self.run()
Doing some work
[INFO/Process-1] process shutting down
[DEBUG/Process-1] running all "atexit" finalizers with priority >= 0
[DEBUG/Process-1] running the remaining "atexit" finalizers
[INFO/Process-1] process exiting with exitcode 0
[INFO/MainProcess] process shutting down
[DEBUG/MainProcess] running all "atexit" finalizers with priority >= 0
[DEBUG/MainProcess] running the remaining "atexit" finalizers

直接的にロガーを操作する(その設定レベルを変更する、又はハンドラを追加する)には get_logger() を使用してください。

import multiprocessing
import logging
import sys

def worker():
    print 'Doing some work'
    sys.stdout.flush()

if __name__ == '__main__':
    multiprocessing.log_to_stderr()
    logger = multiprocessing.get_logger()
    logger.setLevel(logging.INFO)
    p = multiprocessing.Process(target=worker)
    p.start()
    p.join()

multiprocessing を使用して、ロガーは logging の設定ファイル API を通して設定することもできます。

$ python multiprocessing_get_logger.py
[INFO/Process-1] child process calling self.run()
Doing some work
[INFO/Process-1] process shutting down
[INFO/Process-1] process exiting with exitcode 0
[INFO/MainProcess] process shutting down

プロセスをサブクラス化する

独立したプロセスのジョブを開始する最も簡単な方法は Process クラスを使用して対象関数を引数で渡すことですが、カスタムサブクラスを使用することもできます。

import multiprocessing

class Worker(multiprocessing.Process):

    def run(self):
        print 'In %s' % self.name
        return

if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = Worker()
        jobs.append(p)
        p.start()
    for j in jobs:
        j.join()

派生クラスは処理を行うために run() をオーバライドすべきです。

$ python multiprocessing_subclass.py
In Worker-1
In Worker-2
In Worker-3
In Worker-4
In Worker-5
Bookmark and Share