threading – スレッドによる並列処理を管理する

目的:thread モジュールを使用して複数スレッドの実行を簡単に管理する
利用できるバージョン:1.5.2 以上

threading モジュールは、スレッドを扱う thread の低レベルな機能をさらに簡単で pythonic に提供します。スレッドを使用すると、プログラムは1つのプロセス空間で複数の処理を並行に実行できます。

Thread オブジェクト

Thread を使用する最も簡単な方法は、実行させる関数と共に Thread クラスをインスタンス化して、処理を開始させるために start() を呼び出します。

import threading

def worker():
    """thread worker function"""
    print 'Worker'
    return

threads = []
for i in range(5):
    t = threading.Thread(target=worker)
    threads.append(t)
    t.start()

それぞれのスレッドで出力される "Worker" が5行出力されます。

$ python threading_simple.py

Worker
Worker
Worker
Worker
Worker

スレッドを生成して、そのスレッドへどんな処理を実行させるのかを引数で渡せるのは便利です。このサンプルは数字を渡して、それぞれのスレッドが引数で渡された数を表示します。

import threading

def worker(num):
    """thread worker function"""
    print 'Worker: %s' % num
    return

threads = []
for i in range(5):
    t = threading.Thread(target=worker, args=(i,))
    threads.append(t)
    t.start()

引数の数字は、それぞれのスレッドが出力するメッセージに含まれています。

    $ python -u threading_simpleargs.py

Worker: 0
Worker: 1
Worker: 2
Worker: 3
Worker: 4

カレントスレッドを決める

引数を使用してスレッドを識別したり名前付けしたりすることは、面倒であり不要です。それぞれの Thread インスタンスは、デフォルトで名前をもっていて、スレッド生成時に変更できます。スレッドに名前を付けることは、別の処理を扱う複数のサービスをもつサーバプロセスで便利です。

import threading
import time

def worker():
    print threading.currentThread().getName(), 'Starting'
    time.sleep(2)
    print threading.currentThread().getName(), 'Exiting'

def my_service():
    print threading.currentThread().getName(), 'Starting'
    time.sleep(3)
    print threading.currentThread().getName(), 'Exiting'

t = threading.Thread(name='my_service', target=my_service)
w = threading.Thread(name='worker', target=worker)
w2 = threading.Thread(target=worker) # デフォルト名を使う

w.start()
w2.start()
t.start()

デバッグ出力にカレントスレッドの名前が含まれます。スレッド名のカラムに "Thread-1" がある行は、名前を付けていない w2 に対応するスレッドです。

$ python -u threading_names.py
worker Thread-1 Starting
Startingmy_service Starting

Thread-1 workerExiting
 Exiting
my_service Exiting

ほとんどのプログラムは、デバッグに print を使用しません。 logging モジュールは、 %(threadName)s というフォーマッタを使用して全てのログメッセージにスレッド名を組み込みます。ログメッセージにスレッド名を含めると、ソースを見返してトレースするのが簡単になります。

import logging
import threading
import time

logging.basicConfig(level=logging.DEBUG,
                    format='[%(levelname)s] (%(threadName)-10s) %(message)s',
                    )

def worker():
    logging.debug('Starting')
    time.sleep(2)
    logging.debug('Exiting')

def my_service():
    logging.debug('Starting')
    time.sleep(3)
    logging.debug('Exiting')

t = threading.Thread(name='my_service', target=my_service)
w = threading.Thread(name='worker', target=worker)
w2 = threading.Thread(target=worker) # デフォルト名を使う

w.start()
w2.start()
t.start()

logging もまたスレッドセーフなので、別のスレッドからのメッセージは区別して出力します。

$ python threading_names_log.py
[DEBUG] (worker    ) Starting
[DEBUG] (Thread-1  ) Starting
[DEBUG] (my_service) Starting
[DEBUG] (worker    ) Exiting
[DEBUG] (Thread-1  ) Exiting
[DEBUG] (my_service) Exiting

デーモンと非デーモンスレッド

これまでのサンプルプログラムは、全てのスレッドの処理が完了するまで明示的に終了するのを待っていました。終了しなくてもメインプログラムをブロッキングすることなく daemon としてスレッドを生成するときがあります。デーモンスレッドを使用することは、スレッドを中断する簡単な方法がない、もしくはデータを破損したり失ったりせずに処理の途中でスレッドを終了させるようなサービスで便利です(例えば、サービス監視ツールで “心電図” を生成するスレッド)。スレッドをデーモンにするには、その setDaemon() メソッドにブーリアン値を渡して呼び出します。デフォルトでは、スレッドはデーモンではないので、デーモンモードを有効にするために True を渡します。

import threading
import time
import logging

logging.basicConfig(level=logging.DEBUG,
                    format='(%(threadName)-10s) %(message)s',
                    )

def daemon():
    logging.debug('Starting')
    time.sleep(2)
    logging.debug('Exiting')

d = threading.Thread(name='daemon', target=daemon)
d.setDaemon(True)

def non_daemon():
    logging.debug('Starting')
    logging.debug('Exiting')

t = threading.Thread(name='non-daemon', target=non_daemon)

d.start()
t.start()

デーモンスレッドは2秒間の sleep から起きる前に全ての非デーモンスレッドは終了するので、結果の出力には、デーモンスレッドの "Exiting" のメッセージが含まれていないことに注意してください。

$ python threading_daemon.py
(daemon    ) Starting
(non-daemon) Starting
(non-daemon) Exiting

デーモンスレッドの処理が完了するまで待つには、 join() メソッドを使用してください。

import threading
import time
import logging

logging.basicConfig(level=logging.DEBUG,
                    format='(%(threadName)-10s) %(message)s',
                    )

def daemon():
    logging.debug('Starting')
    time.sleep(2)
    logging.debug('Exiting')

d = threading.Thread(name='daemon', target=daemon)
d.setDaemon(True)

def non_daemon():
    logging.debug('Starting')
    logging.debug('Exiting')

t = threading.Thread(name='non-daemon', target=non_daemon)

d.start()
t.start()

d.join()
t.join()

join() でデーモンスレッドの終了を待つことで "Exiting" メッセージを出力する機会ができました。

$ python threading_daemon_join.py
(daemon    ) Starting
(non-daemon) Starting
(non-daemon) Exiting
(daemon    ) Exiting

デフォルトでは join() は無期限にブロックします。また引数で timeout (スレッドが非アクティブになるために待つ秒を小数で表す)も渡せます。スレッドが timeout で設定した時間内に完了しなくても join() が返されます。

import threading
import time
import logging

logging.basicConfig(level=logging.DEBUG,
                    format='(%(threadName)-10s) %(message)s',
                    )

def daemon():
    logging.debug('Starting')
    time.sleep(2)
    logging.debug('Exiting')

d = threading.Thread(name='daemon', target=daemon)
d.setDaemon(True)

def non_daemon():
    logging.debug('Starting')
    logging.debug('Exiting')

t = threading.Thread(name='non-daemon', target=non_daemon)

d.start()
t.start()

d.join(1)
print 'd.isAlive()', d.isAlive()
t.join()

渡された timeout は、デーモンスレッドが sleep する時間よりも小さいので、 join() が返された後でもスレッドはまだ “生きています” 。

$ python threading_daemon_join_timeout.py
(daemon    ) Starting
(non-daemon) Starting
(non-daemon) Exiting
d.isAlive() True

全てのスレッドを列挙する

メインプロセスが終了する前に、全てのデーモンスレッドが完了していることを保証するためにそういったデーモンスレッドの明示的なハンドラを保持する必要はありません。 enumerate() は、アクティブな Thread インスタンスのリストを返します。このリストには、カレントスレッドも含まれています。カレントスレッドを join() することはできないので(デッドロックを発生させる)、カレントスレッドは読み飛ばさなければなりません。

import random
import threading
import time
import logging

logging.basicConfig(level=logging.DEBUG,
                    format='(%(threadName)-10s) %(message)s',
                    )

def worker():
    """thread worker function"""
    t = threading.currentThread()
    pause = random.randint(1,5)
    logging.debug('sleeping %s', pause)
    time.sleep(pause)
    logging.debug('ending')
    return

for i in range(3):
    t = threading.Thread(target=worker)
    t.setDaemon(True)
    t.start()

main_thread = threading.currentThread()
for t in threading.enumerate():
    if t is main_thread:
        continue
    logging.debug('joining %s', t.getName())
    t.join()

ワーカースレッドは乱数で与えた時間を sleep するので、このプログラムの出力結果は変わる可能性があります。例えば、次のようになります。

$ python threading_enumerate.py
(Thread-1  ) sleeping 3
(Thread-2  ) sleeping 2
(Thread-3  ) sleeping 4
(MainThread) joining Thread-1
(Thread-2  ) ending
(Thread-1  ) ending
(MainThread) joining Thread-2
(MainThread) joining Thread-3
(Thread-3  ) ending

スレッドをサブクラス化する

開始時に Thread は、基本的な初期化を行ってからコンストラクタへ渡された関数を呼び出す run() メソッドを呼び出します。 Thread のサブクラスを作成するには、必要な処理を行う run() をオーバーライドしてください。

import threading
import logging

logging.basicConfig(level=logging.DEBUG,
                    format='(%(threadName)-10s) %(message)s',
                    )

class MyThread(threading.Thread):

    def run(self):
        logging.debug('running')
        return

for i in range(5):
    t = MyThread()
    t.start()

run() の返り値は無視されます。

$ python threading_subclass.py
(Thread-1  ) running
(Thread-2  ) running
(Thread-3  ) running
(Thread-4  ) running
(Thread-5  ) running

Thread コンストラクタへ渡される argskwargs の値はプライベートな変数に保存されるので、そういった引数はサブクラスから簡単にアクセスできません。カスタムスレッドへ引数を渡すには、サブクラスからみえるようにインスタンス変数に値を保存するようにコンストラクタを再定義します。

import threading
import logging

logging.basicConfig(level=logging.DEBUG,
                    format='(%(threadName)-10s) %(message)s',
                    )

class MyThreadWithArgs(threading.Thread):

    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs=None, verbose=None):
        threading.Thread.__init__(self, group=group, target=target, name=name,
                                  verbose=verbose)
        self.args = args
        self.kwargs = kwargs
        return

    def run(self):
        logging.debug('running with %s and %s', self.args, self.kwargs)
        return

for i in range(5):
    t = MyThreadWithArgs(args=(i,), kwargs={'a':'A', 'b':'B'})
    t.start()

MyThreadWithArgsThread と同じ API を使用しますが、別のクラスです。そのコンストラクタメソッドはさらに引数を取る、もしくは任意のクラスのようにスレッドの目的に直結した別の引数を取るように簡単に変更できます。

$ python threading_subclass_args.py
(Thread-1  ) running with (0,) and {'a': 'A', 'b': 'B'}
(Thread-2  ) running with (1,) and {'a': 'A', 'b': 'B'}
(Thread-3  ) running with (2,) and {'a': 'A', 'b': 'B'}
(Thread-4  ) running with (3,) and {'a': 'A', 'b': 'B'}
(Thread-5  ) running with (4,) and {'a': 'A', 'b': 'B'}

タイマースレッド

TimerThread をサブクラス化する理由の1つでもあり、 threading で提供されています。 Timer は処理の開始を遅延させて、遅延時間内であればいつでも中止できます。

import threading
import time
import logging

logging.basicConfig(level=logging.DEBUG,
                    format='(%(threadName)-10s) %(message)s',
                    )

def delayed():
    logging.debug('worker running')
    return

t1 = threading.Timer(3, delayed)
t1.setName('t1')
t2 = threading.Timer(3, delayed)
t2.setName('t2')

logging.debug('starting timers')
t1.start()
t2.start()

logging.debug('waiting before canceling %s', t2.getName())
time.sleep(2)
logging.debug('canceling %s', t2.getName())
t2.cancel()
logging.debug('done')

2番目のタイマーは決して実行されず、メインプログラムの残りの部分が終了した後で最初のタイマーが実行されて表示されることに注意してください。これはデーモンスレッドではないので、メインスレッドが終了したときに、暗黙的に join されます。

$ python threading_timer.py
(MainThread) starting timers
(MainThread) waiting before canceling t2
(MainThread) canceling t2
(MainThread) done
(t1        ) worker running

スレッド間でシグナルを送る

複数スレッドを使用する本質的なところは、並列に実行するために処理を分離させることです。しかし、2つ以上のスレッドで処理を同期するのが重要なときがあります。スレッド間で通信する簡単な方法として Event オブジェクトを使用します。 Event は、 set()clear() のどちらかで内部でフラグを管理します。他のスレッドは、後続の処理ができるようになるまで効率的にブロッキングして、そのフラグが set() になるまで wait() できます。

import logging
import threading
import time

logging.basicConfig(level=logging.DEBUG,
                    format='(%(threadName)-10s) %(message)s',
                    )
                    
def wait_for_event(e):
    """Wait for the event to be set before doing anything"""
    logging.debug('wait_for_event starting')
    event_is_set = e.wait()
    logging.debug('event set: %s', event_is_set)

def wait_for_event_timeout(e, t):
    """Wait t seconds and then timeout"""
    while not e.isSet():
        logging.debug('wait_for_event_timeout starting')
        event_is_set = e.wait(t)
        logging.debug('event set: %s', event_is_set)
        if event_is_set:
            logging.debug('processing event')
        else:
            logging.debug('doing other work')


e = threading.Event()
t1 = threading.Thread(name='block', 
                      target=wait_for_event,
                      args=(e,))
t1.start()

t2 = threading.Thread(name='non-block', 
                      target=wait_for_event_timeout, 
                      args=(e, 2))
t2.start()

logging.debug('Waiting before calling Event.set()')
time.sleep(3)
e.set()
logging.debug('Event is set')

wait() メソッドは、タイムアウトする前にイベントを待つ時間を表す秒を引数で取ります。これはイベントがセットされているかどうかを示すブーリアン値を返すので、呼び出し側は wait() が返された理由が分かります。 isSet() メソッドは、ブロッキングせずに個別のイベントで使用できます。

このサンプルでは、 wait_for_event_timeout() はずっとブロッキングせずにイベントステータスを確認します。 wait_for_event() は、イベントステータスが変更されるまで返ってこない wait() の呼び出しをブロックします。

$ python threading_event.py
(block     ) wait_for_event starting
(non-block ) wait_for_event_timeout starting
(MainThread) Waiting before calling Event.set()
(non-block ) event set: False
(non-block ) doing other work
(non-block ) wait_for_event_timeout starting
(MainThread) Event is set
(block     ) event set: True
(non-block ) event set: True
(non-block ) processing event

リソースへのアクセスを制御する

スレッドの処理を同期することに加えて、データを破壊したり失わないように共有リソースへのアクセスを制御することも重要です。Python の組み込みデータ構造は、バイトコードの扱いがアトミックになる副作用からスレッドセーフです(GIL は途中で解放しません)。Python で実装された別のデータ構造、または整数や小数のようなシンプルな型はそういった保護機構がありません。そういったオブジェクトへの同時アクセスを防ぐには Lock オブジェクトを使用してください。

import logging
import random
import threading
import time

logging.basicConfig(level=logging.DEBUG,
                    format='(%(threadName)-10s) %(message)s',
                    )
                    
class Counter(object):
    def __init__(self, start=0):
        self.lock = threading.Lock()
        self.value = start
    def increment(self):
        logging.debug('Waiting for lock')
        self.lock.acquire()
        try:
            logging.debug('Acquired lock')
            self.value = self.value + 1
        finally:
            self.lock.release()

def worker(c):
    for i in range(2):
        pause = random.random()
        logging.debug('Sleeping %0.02f', pause)
        time.sleep(pause)
        c.increment()
    logging.debug('Done')

counter = Counter()
for i in range(2):
    t = threading.Thread(target=worker, args=(counter,))
    t.start()

logging.debug('Waiting for worker threads')
main_thread = threading.currentThread()
for t in threading.enumerate():
    if t is not main_thread:
        t.join()
logging.debug('Counter: %d', counter.value)

このサンプルでは、 worker() 関数は Counter インスタンスを増加させます。これは2つのスレッドが同時にその内部状態を変更しないように Lock を管理します。 Lock を使用しないと value 属性の変更が失われる可能性があります。

$ python threading_lock.py
(Thread-1  ) Sleeping 0.90
(Thread-2  ) Sleeping 0.16
(MainThread) Waiting for worker threads
(Thread-2  ) Waiting for lock
(Thread-2  ) Acquired lock
(Thread-2  ) Sleeping 0.25
(Thread-2  ) Waiting for lock
(Thread-2  ) Acquired lock
(Thread-2  ) Done
(Thread-1  ) Waiting for lock
(Thread-1  ) Acquired lock
(Thread-1  ) Sleeping 0.42
(Thread-1  ) Waiting for lock
(Thread-1  ) Acquired lock
(Thread-1  ) Done
(MainThread) Counter: 4

別のスレッドがカレントスレッドを待たせずにロックを獲得しているかどうかを確認するには、 acquire()blocking 引数に False を渡してください。次のサンプルでは、 worker() が3回ロックを獲得しようとして、何回そうしようとしたかを数えます。その一方で lock_holder() は負荷を模倣するためにロックを獲得したときと解放したときに少し一時停止して、ロックの獲得と解法を繰り返します。

import logging
import threading
import time

logging.basicConfig(level=logging.DEBUG,
                    format='(%(threadName)-10s) %(message)s',
                    )
                    
def lock_holder(lock):
    logging.debug('Starting')
    while True:
        lock.acquire()
        try:
            logging.debug('Holding')
            time.sleep(0.5)
        finally:
            logging.debug('Not holding')
            lock.release()
        time.sleep(0.5)
    return
                    
def worker(lock):
    logging.debug('Starting')
    num_tries = 0
    num_acquires = 0
    while num_acquires < 3:
        time.sleep(0.5)
        logging.debug('Trying to acquire')
        have_it = lock.acquire(0)
        try:
            num_tries += 1
            if have_it:
                logging.debug('Iteration %d: Acquired',  num_tries)
                num_acquires += 1
            else:
                logging.debug('Iteration %d: Not acquired', num_tries)
        finally:
            if have_it:
                lock.release()
    logging.debug('Done after %d iterations', num_tries)


lock = threading.Lock()

holder = threading.Thread(target=lock_holder, args=(lock,), name='LockHolder')
holder.setDaemon(True)
holder.start()

worker = threading.Thread(target=worker, args=(lock,), name='Worker')
worker.start()

worker() は、3回のロックを獲得するために3回以上のループを繰り返します。

$ python threading_lock_noblock.py
(LockHolder) Starting
(LockHolder) Holding
(Worker    ) Starting
(LockHolder) Not holding
(Worker    ) Trying to acquire
(Worker    ) Iteration 1: Acquired
(LockHolder) Holding
(Worker    ) Trying to acquire
(Worker    ) Iteration 2: Not acquired
(LockHolder) Not holding
(Worker    ) Trying to acquire
(Worker    ) Iteration 3: Acquired
(LockHolder) Holding
(Worker    ) Trying to acquire
(Worker    ) Iteration 4: Not acquired
(LockHolder) Not holding
(Worker    ) Trying to acquire
(Worker    ) Iteration 5: Acquired
(Worker    ) Done after 5 iterations

再入可能なロック

普通の Lock オブジェクトは、同じスレッドであっても複数回、獲得できません。ロックが同じコールチェーン内の複数の関数からアクセスされる場合、望ましくない副作用が発生します。

import threading

lock = threading.Lock()

print 'First try :', lock.acquire()
print 'Second try:', lock.acquire(0)

この場合、両方の関数は同じグローバルロックを使用するので、一方が他方を呼び出すと、2番目のロック獲得が失敗して acquire() へのデフォルト引数を使用してブロックされます。

$ python threading_lock_reacquire.py
First try : True
Second try: False

同じスレッドから別のコードがロックを獲得する必要がある状況では、代わりに RLock を使用してください。

import threading

lock = threading.RLock()

print 'First try :', lock.acquire()
print 'Second try:', lock.acquire(0)

前のサンプルコードからの変更は LockRLock に置き換えただけです。

$ python threading_rlock.py
First try : True
Second try: 1

コンテキストマネージャとしてのロック

ロックは、コンテキストマネージャ API を実装して with 文と互換性があります。 with 文を使用すると、明示的にロックを獲得して解放する必要がなくなります。

import threading
import logging

logging.basicConfig(level=logging.DEBUG,
                    format='(%(threadName)-10s) %(message)s',
                    )

def worker_with(lock):
    with lock:
        logging.debug('Lock acquired via with')
        
def worker_no_with(lock):
    lock.acquire()
    try:
        logging.debug('Lock acquired directly')
    finally:
        lock.release()

lock = threading.Lock()
w = threading.Thread(target=worker_with, args=(lock,))
nw = threading.Thread(target=worker_no_with, args=(lock,))

w.start()
nw.start()

worker_with()worker_no_with() の2つの関数は、同等の方法でロックを管理します。

$ python threading_lock_with.py
(Thread-1  ) Lock acquired via with
(Thread-2  ) Lock acquired directly

スレッドを同期する

Events を使用することに加えて、 Condition オブジェクトを使用してスレッドを同期する別の方法があります。 ConditionLock を使用するので共有リソースに関連しています。これはスレッドにリソースが更新されるのを待たせます。このサンプルでは、 consumer() スレッドは、処理を継続する前にセットされる Conditionwait() します。 producer() スレッドは、状態をセットして別のスレッドが処理を継続できるように変更する責任をもちます。

import logging
import threading
import time

logging.basicConfig(level=logging.DEBUG,
                    format='%(asctime)s (%(threadName)-2s) %(message)s',
                    )

def consumer(cond):
    """wait for the condition and use the resource"""
    logging.debug('Starting consumer thread')
    t = threading.currentThread()
    with cond:
        cond.wait()
        logging.debug('Resource is available to consumer')

def producer(cond):
    """set up the resource to be used by the consumer"""
    logging.debug('Starting producer thread')
    with cond:
        logging.debug('Making resource available')
        cond.notifyAll()

condition = threading.Condition()
c1 = threading.Thread(name='c1', target=consumer, args=(condition,))
c2 = threading.Thread(name='c2', target=consumer, args=(condition,))
p = threading.Thread(name='p', target=producer, args=(condition,))

c1.start()
time.sleep(2)
c2.start()
time.sleep(2)
p.start()

スレッドは Condition に関連付けられたロックを獲得する with 文を使用します。 acquire()release() メソッドを明示的に使用しても同様に動作します。

$ python threading_condition.py
2013-02-17 11:34:26,422 (c1) Starting consumer thread
2013-02-17 11:34:28,423 (c2) Starting consumer thread
2013-02-17 11:34:30,425 (p ) Starting producer thread
2013-02-17 11:34:30,425 (p ) Making resource available
2013-02-17 11:34:30,425 (c1) Resource is available to consumer
2013-02-17 11:34:30,425 (c2) Resource is available to consumer

リソースへの同時アクセスを制限する

アクセスできる全体の数を制限しながら、複数のワーカーから1つのリソースへ同時にアクセスできると便利なときがあります。例えば、コネクションプールは同時接続数の最大数を制限したり、ネットワークアプリケーションは同時ダウンロード数の最大数を制限します。 Semaphore はそういったコネクション数を制御する方法の1つです。

import logging
import random
import threading
import time

logging.basicConfig(level=logging.DEBUG,
                    format='%(asctime)s (%(threadName)-2s) %(message)s',
                    )

class ActivePool(object):
    def __init__(self):
        super(ActivePool, self).__init__()
        self.active = []
        self.lock = threading.Lock()
    def makeActive(self, name):
        with self.lock:
            self.active.append(name)
            logging.debug('Running: %s', self.active)
    def makeInactive(self, name):
        with self.lock:
            self.active.remove(name)
            logging.debug('Running: %s', self.active)

def worker(s, pool):
    logging.debug('Waiting to join the pool')
    with s:
        name = threading.currentThread().getName()
        pool.makeActive(name)
        time.sleep(0.1)
        pool.makeInactive(name)

pool = ActivePool()
s = threading.Semaphore(2)
for i in range(4):
    t = threading.Thread(target=worker, name=str(i), args=(s, pool))
    t.start()

このサンプルでは、 ActivePool クラスは、どのスレッドがあるタイミングで実行できるかを追跡する簡単で便利な方法を提供します。実際のリソースプールはコネクションか、新たなアクティブスレッドへの他の値を割り合てて、スレッドが終了したときにその値を再利用します。2つのスレッドのみ並列に実行されているのを表すために、アクティブなスレッドの名前を保持するだけのサンプルがあります。

$ python threading_semaphore.py
2013-02-17 11:34:30,494 (0 ) Waiting to join the pool
2013-02-17 11:34:30,494 (0 ) Running: ['0']
2013-02-17 11:34:30,494 (1 ) Waiting to join the pool
2013-02-17 11:34:30,494 (1 ) Running: ['0', '1']
2013-02-17 11:34:30,495 (2 ) Waiting to join the pool
2013-02-17 11:34:30,495 (3 ) Waiting to join the pool
2013-02-17 11:34:30,595 (0 ) Running: ['1']
2013-02-17 11:34:30,595 (1 ) Running: []
2013-02-17 11:34:30,596 (3 ) Running: ['3']
2013-02-17 11:34:30,596 (2 ) Running: ['3', '2']
2013-02-17 11:34:30,696 (3 ) Running: ['2']
2013-02-17 11:34:30,697 (2 ) Running: []

スレッド固有のデータ

リソースによってはロックにより複数のスレッドでも利用できますが、その他のリソースを “所有” していないスレッドからみえないように保護する必要があります。 local() 関数は、別のスレッドからみえない値を保持するオブジェクトを作成します。

import random
import threading
import logging

logging.basicConfig(level=logging.DEBUG,
                    format='(%(threadName)-10s) %(message)s',
                    )

def show_value(data):
    try:
        val = data.value
    except AttributeError:
        logging.debug('No value yet')
    else:
        logging.debug('value=%s', val)


def worker(data):
    show_value(data)
    data.value = random.randint(1, 100)
    show_value(data)

local_data = threading.local()
show_value(local_data)
local_data.value = 1000
show_value(local_data)

for i in range(2):
    t = threading.Thread(target=worker, args=(local_data,))
    t.start()

local_data.value は、そのスレッドでセットされるまで他のスレッドからみると存在しないことに注意してください。

$ python threading_local.py
(MainThread) No value yet
(MainThread) value=1000
(Thread-1  ) No value yet
(Thread-2  ) No value yet
(Thread-1  ) value=60
(Thread-2  ) value=10

全てのスレッドが同じ値で開始されるように設定を初期化するには、サブクラス化して __init__()value 属性をセットしてください。

import random
import threading
import logging

logging.basicConfig(level=logging.DEBUG,
                    format='(%(threadName)-10s) %(message)s',
                    )


def show_value(data):
    try:
        val = data.value
    except AttributeError:
        logging.debug('No value yet')
    else:
        logging.debug('value=%s', val)

def worker(data):
    show_value(data)
    data.value = random.randint(1, 100)
    show_value(data)

class MyLocal(threading.local):
    def __init__(self, value):
        logging.debug('Initializing %r', self)
        self.value = value

local_data = MyLocal(1000)
show_value(local_data)

for i in range(2):
    t = threading.Thread(target=worker, args=(local_data,))
    t.start()

__init__() は、それぞれのスレッドで一度だけ同じオブジェクト(id() の値に着目)で実行されます。

$ python threading_local_defaults.py
(MainThread) Initializing <__main__.MyLocal object at 0x1005191f0>
(MainThread) value=1000
(Thread-1  ) Initializing <__main__.MyLocal object at 0x1005191f0>
(Thread-1  ) value=1000
(Thread-1  ) value=70
(Thread-2  ) Initializing <__main__.MyLocal object at 0x1005191f0>
(Thread-2  ) value=1000
(Thread-2  ) value=63

See also

threading
本モジュールの標準ライブラリドキュメント
thread
低レベルスレッド API
Queue
スレッド間でメッセージパッシングに便利なスレッドセーフなキュー
multiprocessing
threading API のようにマルチプロセスで動作する API
Bookmark and Share