プロセス間通信

スレッドのように、マルチプロセスの一般的な用途は並列で実行する複数のワーカーにジョブを分割することです。複数プロセスの効率的に使用するには普通はプロセス間通信を必要とします。そのため、その処理は分割されて結果が集約されます。

プロセスへメッセージを渡す

multiprocessing でプロセス間通信を行う最も簡単な方法はメッセージを渡したり、返したりする Queue を使用することです。pickle でシリアライズ可能なオブジェクトは Queue を経由して渡すことができます。

import multiprocessing

class MyFancyClass(object):
    
    def __init__(self, name):
        self.name = name
    
    def do_something(self):
        proc_name = multiprocessing.current_process().name
        print 'Doing something fancy in %s for %s!' % (proc_name, self.name)


def worker(q):
    obj = q.get()
    obj.do_something()


if __name__ == '__main__':
    queue = multiprocessing.Queue()

    p = multiprocessing.Process(target=worker, args=(queue,))
    p.start()
    
    queue.put(MyFancyClass('Fancy Dan'))
    
    # ワーカーが終了するまで待つ
    queue.close()
    queue.join_thread()
    p.join()
    

この短いサンプルは1つのワーカーに対して1つのメッセージを渡しているだけで、メイン処理はワーカーの終了を待ちます。

$ python multiprocessing_queue.py
Doing something fancy in Process-1 for Fancy Dan!

もっと複雑なサンプルとして JoinableQueue からデータを取り出して結果を親プロセスへ返す複数ワーカーを管理する方法を紹介します。 poison pill テクニックはワーカーを停止するために使用されます。実際のタスク設定後、メインプログラムはワーカー毎に “stop” 値をジョブキューへ追加します。ワーカーは特別な値に遭遇したときにその処理ループから脱出します。メインプロセスは、その結果を処理する前に全タスクの終了を待つためにタスクキューの join() メソッドを使用します。

import multiprocessing
import time

class Consumer(multiprocessing.Process):
    
    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue

    def run(self):
        proc_name = self.name
        while True:
            next_task = self.task_queue.get()
            if next_task is None:
                # Poison pill は終了を意味します
                print '%s: Exiting' % proc_name
                self.task_queue.task_done()
                break
            print '%s: %s' % (proc_name, next_task)
            answer = next_task()
            self.task_queue.task_done()
            self.result_queue.put(answer)
        return


class Task(object):
    def __init__(self, a, b):
        self.a = a
        self.b = b
    def __call__(self):
        time.sleep(0.1) # この処理に少し時間がかかることを意図しています
        return '%s * %s = %s' % (self.a, self.b, self.a * self.b)
    def __str__(self):
        return '%s * %s' % (self.a, self.b)


if __name__ == '__main__':
    # コミュニケーションキューを作成する
    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()
    
    # consumers 処理を開始する
    num_consumers = multiprocessing.cpu_count() * 2
    print 'Creating %d consumers' % num_consumers
    consumers = [ Consumer(tasks, results)
                  for i in xrange(num_consumers) ]
    for w in consumers:
        w.start()
    
    # ジョブをキューへ入れる
    num_jobs = 10
    for i in xrange(num_jobs):
        tasks.put(Task(i, i))
    
    # 各 consumer へ poison pill を追加する
    for i in xrange(num_consumers):
        tasks.put(None)

    # 全てのタスクの終了を待つ
    tasks.join()
    
    # 結果を表示し始める
    while num_jobs:
        result = results.get()
        print 'Result:', result
        num_jobs -= 1

ジョブは順番にキューへ追加されますが、追加されたジョブの実行は並列化されるのでジョブが完了する順番については保証されません。

$ python -u multiprocessing_producer_consumer.py
Creating 16 consumers
Consumer-1: 0 * 0
Consumer-2: 1 * 1
Consumer-3: 2 * 2
Consumer-4: 3 * 3
Consumer-5: 4 * 4
Consumer-6: 5 * 5
Consumer-7: 6 * 6
Consumer-8: 7 * 7
Consumer-9: 8 * 8
Consumer-10: 9 * 9
Consumer-11: Exiting
Consumer-12: Exiting
Consumer-13: Exiting
Consumer-14: Exiting
Consumer-15: Exiting
Consumer-16: Exiting
Consumer-5: Exiting
Consumer-4: Exiting
Consumer-2: Exiting
Consumer-1: Exiting
Consumer-6: Exiting
Consumer-3: Exiting
Consumer-7: Exiting
Consumer-10: Exiting
Consumer-8: Exiting
Consumer-9: Exiting
Result: 4 * 4 = 16
Result: 3 * 3 = 9
Result: 1 * 1 = 1
Result: 0 * 0 = 0
Result: 5 * 5 = 25
Result: 2 * 2 = 4
Result: 6 * 6 = 36
Result: 9 * 9 = 81
Result: 7 * 7 = 49
Result: 8 * 8 = 64

プロセス間でシグナルを送る

イベント クラスはプロセス間で状態に関する情報を通信する最も簡単な方法です。イベントは状態のセット/アンセットを切り替えることができます。Event オブジェクトを使用して、オプションのタイムアウト値でアンセットからセットへの変更イベントを待つことができます。

import multiprocessing
import time

def wait_for_event(e):
    """Wait for the event to be set before doing anything"""
    print 'wait_for_event: starting'
    e.wait()
    print 'wait_for_event: e.is_set()->', e.is_set()

def wait_for_event_timeout(e, t):
    """Wait t seconds and then timeout"""
    print 'wait_for_event_timeout: starting'
    e.wait(t)
    print 'wait_for_event_timeout: e.is_set()->', e.is_set()


if __name__ == '__main__':
    e = multiprocessing.Event()
    w1 = multiprocessing.Process(name='block', 
                                 target=wait_for_event,
                                 args=(e,))
    w1.start()

    w2 = multiprocessing.Process(name='non-block', 
                                 target=wait_for_event_timeout, 
                                 args=(e, 2))
    w2.start()

    print 'main: waiting before calling Event.set()'
    time.sleep(3)
    e.set()
    print 'main: event is set'

wait() がタイムアウトするとき、イベントはエラーなく返します。呼び出し側は is_set() を使用してイベントの状態をチェックする責任があります。

$ python -u multiprocessing_event.py
main: waiting before calling Event.set()
wait_for_event: starting
wait_for_event_timeout: starting
wait_for_event_timeout: e.is_set()-> False
main: event is set
wait_for_event: e.is_set()-> True

リソースへのアクセスを制御する

複数のプロセス間で1つのリソースを共有する必要があるような状況で Lock は競合アクセスを避けるために使用されます。

import multiprocessing
import sys

def worker_with(lock, stream):
    with lock:
        stream.write('Lock acquired via with\n')
        
def worker_no_with(lock, stream):
    lock.acquire()
    try:
        stream.write('Lock acquired directly\n')
    finally:
        lock.release()

lock = multiprocessing.Lock()
w = multiprocessing.Process(target=worker_with, args=(lock, sys.stdout))
nw = multiprocessing.Process(target=worker_no_with, args=(lock, sys.stdout))

w.start()
nw.start()

w.join()
nw.join()

このサンプルでは、2つのプロセスがロックを使用して出力ストリームへのアクセスを同期しないとコンソールへのメッセージはごちゃ混ぜになる可能性があります。

$ python multiprocessing_lock.py
Lock acquired via with
Lock acquired directly

同期オペレーション

Condition オブジェクトは、たとえ独立したプロセスであっても、ワークフローの一部の処理は並列に実行されるように同期させて、その他の処理は連続的に実行させます。

import multiprocessing
import time

def stage_1(cond):
    """perform first stage of work, then notify stage_2 to continue"""
    name = multiprocessing.current_process().name
    print 'Starting', name
    with cond:
        print '%s done and ready for stage 2' % name
        cond.notify_all()

def stage_2(cond):
    """wait for the condition telling us stage_1 is done"""
    name = multiprocessing.current_process().name
    print 'Starting', name
    with cond:
        cond.wait()
        print '%s running' % name

if __name__ == '__main__':
    condition = multiprocessing.Condition()
    s1 = multiprocessing.Process(name='s1', target=stage_1, args=(condition,))
    s2_clients = [
        multiprocessing.Process(name='stage_2[%d]' % i, target=stage_2, args=(condition,))
        for i in range(1, 3)
        ]

    for c in s2_clients:
        c.start()
        time.sleep(1)
    s1.start()

    s1.join()
    for c in s2_clients:
        c.join()

このサンプルでは、2つのプロセスはジョブの2番目の stage を並列で実行しますが、1番目の stage が実行された後でのみです。

$ python multiprocessing_condition.py
Starting s1
s1 done and ready for stage 2
Starting stage_2[1]
stage_2[1] running
Starting stage_2[2]
stage_2[2] running

リソースへの並列アクセスを制御する

アクセスできる全体の数を制限しながら、複数のワーカーから1つのリソースへ同時にアクセスできると便利なときがあります。例えば、コネクションプールは同時接続数の最大数を制限したり、ネットワークアプリケーションは同時ダウンロード数の最大数を制限します。 Semaphore はそういったコネクション数を制御する方法の1つです。

import random
import multiprocessing
import time

class ActivePool(object):
    def __init__(self):
        super(ActivePool, self).__init__()
        self.mgr = multiprocessing.Manager()
        self.active = self.mgr.list()
        self.lock = multiprocessing.Lock()
    def makeActive(self, name):
        with self.lock:
            self.active.append(name)
    def makeInactive(self, name):
        with self.lock:
            self.active.remove(name)
    def __str__(self):
        with self.lock:
            return str(self.active)

def worker(s, pool):
    name = multiprocessing.current_process().name
    with s:
        pool.makeActive(name)
        print 'Now running: %s' % str(pool)
        time.sleep(random.random())
        pool.makeInactive(name)

if __name__ == '__main__':
    pool = ActivePool()
    s = multiprocessing.Semaphore(3)
    jobs = [
        multiprocessing.Process(target=worker, name=str(i), args=(s, pool))
        for i in range(10)
        ]

    for j in jobs:
        j.start()

    for j in jobs:
        j.join()
        print 'Now running: %s' % str(pool)

このサンプルでは ActivePool クラスはどのプロセスがあるタイミングで実行中かを追跡する簡単で便利な方法を提供します。実際のリソースプールは、おそらくコネクションか、その他の値を新たにアクティブなプロセスへ割り当てます。そして、アクティブなプロセスに割り当てられた処理が完了したときに、そのコネクションか何らかの値を再利用します。ここでサンプルのプールは、3つのプロセスのみが並列実行しているということを示すためにアクティブなプロセス名を保持するために使用されます。

$ python multiprocessing_semaphore.py
Now running: ['0', '1', '2']
Now running: ['0', '1', '2']
Now running: ['0', '1', '3']
Now running: ['0', '1', '2']
Now running: ['1', '4', '5']
Now running: ['4', '6', '7']
Now running: ['4', '5', '6']
Now running: ['1', '3', '4']
Now running: ['4', '6', '8']
Now running: ['4', '8', '9']
Now running: ['1', '3', '4']
Now running: ['4', '5', '6']
Now running: ['4', '5', '6']
Now running: ['4', '5', '6']
Now running: ['8', '9']
Now running: ['8', '9']
Now running: ['8', '9']
Now running: ['8', '9']
Now running: ['9']
Now running: []

共有状態を管理する

先ほどのサンプルでは、アクティブなプロセスのリストは Manager によって作成されるリストオブジェクトの特別な型から ActivePool インスタンスで保持されます。 Manager はその全てのユーザ間で共有する情報を調整する責任があります。

import multiprocessing

def worker(d, key, value):
    d[key] = value

if __name__ == '__main__':
    mgr = multiprocessing.Manager()
    d = mgr.dict()
    jobs = [ multiprocessing.Process(target=worker, args=(d, i, i*2))
             for i in range(10) 
             ]
    for j in jobs:
        j.start()
    for j in jobs:
        j.join()
    print 'Results:', d

マネージャ経由でリストを作成することで全てのプロセスで共有されて更新されます。ディクショナリもサポートされます。

$ python multiprocessing_manager_dict.py
Results: {0: 0, 1: 2, 2: 4, 3: 6, 4: 8, 5: 10, 6: 12, 7: 14, 8: 16, 9: 18}

共有ネームスペース

ディクショナリやリストに加えて Manager は共有 Namespace を作成することができます。

import multiprocessing

def producer(ns, event):
    ns.value = 'This is the value'
    event.set()

def consumer(ns, event):
    try:
        value = ns.value
    except Exception, err:
        print 'Before event, consumer got:', str(err)
    event.wait()
    print 'After event, consumer got:', ns.value

if __name__ == '__main__':
    mgr = multiprocessing.Manager()
    namespace = mgr.Namespace()
    event = multiprocessing.Event()
    p = multiprocessing.Process(target=producer, args=(namespace, event))
    c = multiprocessing.Process(target=consumer, args=(namespace, event))
    
    c.start()
    p.start()
    
    c.join()
    p.join()

class:Namespace へ追加された名前付きの値は Namespace インスタンスを受け取る全てのクライアントから見えます。

$ python multiprocessing_namespaces.py
Before event, consumer got: 'Namespace' object has no attribute 'value'
After event, consumer got: This is the value

Namespace に追加された可変コンテンツを 更新 しても自動的に伝播 しない ということを知っておくことが重要です。

import multiprocessing

def producer(ns, event):
    ns.my_list.append('This is the value') # DOES NOT UPDATE GLOBAL VALUE!
    event.set()

def consumer(ns, event):
    print 'Before event, consumer got:', ns.my_list
    event.wait()
    print 'After event, consumer got:', ns.my_list

if __name__ == '__main__':
    mgr = multiprocessing.Manager()
    namespace = mgr.Namespace()
    namespace.my_list = []
    
    event = multiprocessing.Event()
    p = multiprocessing.Process(target=producer, args=(namespace, event))
    c = multiprocessing.Process(target=consumer, args=(namespace, event))
    
    c.start()
    p.start()
    
    c.join()
    p.join()

リストを更新するために、再度ネームスペースオブジェクトへそのリストをアタッチしてください。

$ python multiprocessing_namespaces_mutable.py
Before event, consumer got: []
After event, consumer got: []

プロセスプール

Pool クラスは、対象処理が分解されて独立したワーカーに分散される単純な状況でワーカー数を指定して管理するために使用されます。そのジョブからの返り値は集められてリストとして返されます。pool 引数はタスクプロセスを開始するときに実行する関数やプロセスの数です(1つの子プロセスにつき1回実行される)。

import multiprocessing

def do_calculation(data):
    return data * 2

def start_process():
    print 'Starting', multiprocessing.current_process().name

if __name__ == '__main__':
    inputs = list(range(10))
    print 'Input   :', inputs
    
    builtin_outputs = map(do_calculation, inputs)
    print 'Built-in:', builtin_outputs
    
    pool_size = multiprocessing.cpu_count() * 2
    pool = multiprocessing.Pool(processes=pool_size,
                                initializer=start_process,
                                )
    pool_outputs = pool.map(do_calculation, inputs)
    pool.close() # no more tasks
    pool.join()  # wrap up current tasks

    print 'Pool    :', pool_outputs

map() メソッドの結果は、並列に個々のタスクが実行されること以外は機能的に組み込み関数の map() と機能的には同じです。プールは並列にその入力を処理するので close()join() は、適切なクリーンアップを保証するためにメインプロセスにタスクプロセスを同期するために使用されます。

$ python multiprocessing_pool.py
Input   : [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Built-in: [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
Starting PoolWorker-11
Starting PoolWorker-12
Starting PoolWorker-13
Starting PoolWorker-14
Starting PoolWorker-15
Starting PoolWorker-16
Starting PoolWorker-1
Starting PoolWorker-2
Starting PoolWorker-4
Starting PoolWorker-3
Starting PoolWorker-5
Starting PoolWorker-6
Starting PoolWorker-8
Starting PoolWorker-7
Starting PoolWorker-9
Starting PoolWorker-10
Pool    : [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

デフォルトでは Pool は指定された数のワーカープロセスを作成します。そして、ジョブがなくなるまでそのワーカープロセスへジョブを渡します。 maxtasksperchild パラメータをセットすることは、数個のタスクが終了した後でワーカープロセスを再起動するようにプールへ伝えます。これは長時間実行中のワーカーがそれ以上にシステムリソースを消費しないようにするために行われます。

import multiprocessing

def do_calculation(data):
    return data * 2

def start_process():
    print 'Starting', multiprocessing.current_process().name

if __name__ == '__main__':
    inputs = list(range(10))
    print 'Input   :', inputs
    
    builtin_outputs = map(do_calculation, inputs)
    print 'Built-in:', builtin_outputs
    
    pool_size = multiprocessing.cpu_count() * 2
    pool = multiprocessing.Pool(processes=pool_size,
                                initializer=start_process,
                                maxtasksperchild=2,
                                )
    pool_outputs = pool.map(do_calculation, inputs)
    pool.close() # no more tasks
    pool.join()  # wrap up current tasks
    
    print 'Pool    :', pool_outputs

プールは割り当てたタスクを完了したときに、もし処理が無かったとしてもワーカーを再起動します。この出力では、8個のワーカーが作成されますが、10個のタスクしかないので、それぞれのワーカーのうち2つが同時に完了します。

$ python multiprocessing_pool_maxtasksperchild.py
Input   : [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Built-in: [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
Starting PoolWorker-11
Starting PoolWorker-12
Starting PoolWorker-13
Starting PoolWorker-14
Starting PoolWorker-15
Starting PoolWorker-16
Starting PoolWorker-1
Starting PoolWorker-2
Starting PoolWorker-3
Starting PoolWorker-4
Starting PoolWorker-5
Starting PoolWorker-6
Starting PoolWorker-7
Starting PoolWorker-8
Starting PoolWorker-9
Starting PoolWorker-10
Pool    : [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
Bookmark and Share