Queue – スレッドセーフな FIFO の実装¶
目的: | スレッドセーフな FIFO の実装を提供する |
---|---|
利用できるバージョン: | 1.4 以上 |
Queue モジュールは、マルチスレッドプログラミングに適した FIFO の実装を提供します。それはメッセージかその他のデータを渡して安全にスレッドを実行するために使用されます。呼び出し側をロックすることで、同じキューインスタンスで多くのスレッドと連携するのが簡単になります。Queue のサイズ(要素数)は、メモリ使用量または処理を調整するために制限される可能性があります。
Note
この記事はあなたが既に一般的なキューの特性を理解していることを前提としています。もしそうではないなら、この後の記事を読む前に他の参考文献を読んでみてください。
基本的な FIFO キュー¶
Queue クラスは基本的な先入先出のコンテナを実装します。要素は put() でシーケンスの “一端” に追加され、 get() でもう一方の端から削除されます。
import Queue
q = Queue.Queue()
for i in range(5):
q.put(i)
while not q.empty():
print q.get()
このサンプルは、追加された順番でキューから要素が削除されるのを説明するためにシングルスレッドで実行します。
$ python Queue_fifo.py
0
1
2
3
4
LIFO キュー¶
標準的な Queue の FIFO 実装と比較して、 LifoQueue は後入先出の順番になります(通常はスタックのデータ構造と関連します)。
import Queue
q = Queue.LifoQueue()
for i in range(5):
q.put(i)
while not q.empty():
print q.get()
put() でキューへ追加された最新の要素が get() で削除されます。
$ python Queue_lifo.py
4
3
2
1
0
優先キュー¶
その要素が作成されてキューへ追加された順番よりも、そのキュー内の要素の特性に基づいて処理しなければならないときがあります。例えば、人事部からのお仕事の表示は、開発者が優先度を高くしてリストに表示するかもしれません。 PriorityQueue は、要素の取り出しを決定するキューのコンテンツのソート順序を使用します。
import Queue
class Job(object):
def __init__(self, priority, description):
self.priority = priority
self.description = description
print 'New job:', description
return
def __cmp__(self, other):
return cmp(self.priority, other.priority)
q = Queue.PriorityQueue()
q.put( Job(3, 'Mid-level job') )
q.put( Job(10, 'Low-level job') )
q.put( Job(1, 'Important job') )
while not q.empty():
next_job = q.get()
print 'Processing job:', next_job.description
このシングルスレッドのサンプルでは、そのお仕事は厳密に優先順序をもってキューから取り出されます。マルチスレッドでそのお仕事を処理する場合、 get() が呼び出されたときにキューの要素の優先順序に基づいて処理されます。
$ python Queue_priority.py
New job: Mid-level job
New job: Low-level job
New job: Important job
Processing job: Important job
Processing job: Mid-level job
Processing job: Low-level job
スレッドでキューを使用する¶
マルチスレッドで Queue クラスを使用するサンプルとして、とてもシンプルな podcast クライアントを作成します。このクライアントは、1つかそれ以上の RSS フィードを読み込み、ダウンロードするためにそのエンクロージャをキューに追加します。そして、スレッドを使用して複数のコンテンツを並行にダウンロードします。これは実用においては単純過ぎて不適切ですが、そのプロトタイプ実装は Queue モジュールと連携するサンプルコードとして十分です。
# システムモジュール
from Queue import Queue
from threading import Thread
import time
# ローカルモジュール
import feedparser
# グローバル変数をセット
num_fetch_threads = 2
enclosure_queue = Queue()
# 実際のアプリではハードコーディングしない...
feed_urls = [ 'http://www.castsampler.com/cast/feed/rss/guest',
]
def downloadEnclosures(i, q):
"""This is the worker thread function.
It processes items in the queue one after
another. These daemon threads go into an
infinite loop, and only exit when
the main thread ends.
"""
while True:
print '%s: Looking for the next enclosure' % i
url = q.get()
print '%s: Downloading:' % i, url
# URL から実際にダウンロードせずに sleep で欺く
time.sleep(i + 2)
q.task_done()
# エンクロージャを取得してスレッドを設定する
for i in range(num_fetch_threads):
worker = Thread(target=downloadEnclosures, args=(i, enclosure_queue,))
worker.setDaemon(True)
worker.start()
# フィードをダウンロードしてキューへエンクロージャの URL を追加する
for url in feed_urls:
response = feedparser.parse(url, agent='fetch_podcasts.py')
for entry in response['entries']:
for enclosure in entry.get('enclosures', []):
print 'Queuing:', enclosure['url']
enclosure_queue.put(enclosure['url'])
# 全てのダウンロード処理が完了したことを表す、キューが空になるまで待つ
print '*** Main thread waiting'
enclosure_queue.join()
print '*** Done'
まず操作用のパラメータを設計します。通常はユーザ入力(優先度、データベース、対象)です。サンプルでは、取得する URL リストとスレッドの数をハードコーディングしています。
次に ワーカースレッドで実行して、ダウンロードを行う downloadEnclosures() を定義する必要があります。今回は説明目的なのでこの処理はダウンロードを模倣します。実際にはエンクロージャをダウンロードするために urllib か urllib2 を使用すると良いです。このサンプルでは、スレッド ID からその秒を sleep することでダウンロードによる遅延を模倣します。
スレッドの target 引数に関数を指定すると、ワーカースレッドが開始します。 downloadEnclosures() は、キューが何か返すまで url = q.get() のコードをブロックすることに注意してください。そのため、キューに何かある状態で安全にそのスレッドを開始します。
次のステップは、フィードコンテンツを取り出して(Mark Pilgrim の feedparser モジュールを使用します)、エンクロージャの URL をキューへ追加することです。最初の URL がキューへ追加されると同時に、ワーカースレッドの1つがその要素を取り出してダウンロードを開始します。そのループ処理はフィードがなくなるまで要素を追加し続けます。そして、ワーカースレッドは、ダウンロードするために URL をキューから取り出して終了します。
最後に残された唯一の処理は join() でキューが空になるまで待つことです。
サンプルスクリプトを実行すると、次のような実行結果を確認できます。
0: Looking for the next enclosure
1: Looking for the next enclosure
Queuing: http://http.earthcache.net/htc-01.media.globix.net/COMP009996MOD1/Danny_Meyer.mp3
Queuing: http://feeds.feedburner.com/~r/drmoldawer/~5/104445110/moldawerinthemorning_show34_032607.mp3
Queuing: http://www.podtrac.com/pts/redirect.mp3/twit.cachefly.net/MBW-036.mp3
Queuing: http://media1.podtech.net/media/2007/04/PID_010848/Podtech_calacaniscast22_ipod.mp4
Queuing: http://media1.podtech.net/media/2007/03/PID_010592/Podtech_SXSW_KentBrewster_ipod.mp4
Queuing: http://media1.podtech.net/media/2007/02/PID_010171/Podtech_IDM_ChrisOBrien2.mp3
Queuing: http://feeds.feedburner.com/~r/drmoldawer/~5/96188661/moldawerinthemorning_show30_022607.mp3
*** Main thread waiting
0: Downloading: http://http.earthcache.net/htc-01.media.globix.net/COMP009996MOD1/Danny_Meyer.mp3
1: Downloading: http://feeds.feedburner.com/~r/drmoldawer/~5/104445110/moldawerinthemorning_show34_032607.mp3
0: Looking for the next enclosure
0: Downloading: http://www.podtrac.com/pts/redirect.mp3/twit.cachefly.net/MBW-036.mp3
1: Looking for the next enclosure
1: Downloading: http://media1.podtech.net/media/2007/04/PID_010848/Podtech_calacaniscast22_ipod.mp4
0: Looking for the next enclosure
0: Downloading: http://media1.podtech.net/media/2007/03/PID_010592/Podtech_SXSW_KentBrewster_ipod.mp4
0: Looking for the next enclosure
0: Downloading: http://media1.podtech.net/media/2007/02/PID_010171/Podtech_IDM_ChrisOBrien2.mp3
1: Looking for the next enclosure
1: Downloading: http://feeds.feedburner.com/~r/drmoldawer/~5/96188661/moldawerinthemorning_show30_022607.mp3
0: Looking for the next enclosure
1: Looking for the next enclosure
*** Done
実際の実行結果は http://www.CastSampler.com のゲストアカウントのサブスクリプションを誰かが変更すると変わります。
See also
- Queue
- 本モジュールの標準ライブラリドキュメント
- Deque from collections
- deque (両端キュー) クラスを提供する collections モジュール
- Wikipedia: Queue data structures
- http://en.wikipedia.org/wiki/Queue_(data_structure)
- Wikipedia: FIFO
- http://en.wikipedia.org/wiki/FIFO
- feedparser
- Mark Pilgrim の feedparser モジュール (http://www.feedparser.org/).
- インメモリデータ構造
- 標準ライブラリのその他の複雑なデータ構造