asynchat – 非同期プロトコルハンドラ

目的:非同期ネットワーク通信プロトコルハンドラ
利用できるバージョン:1.5.2 以上

asynchat モジュールは asyncore 上で構築されるサーバクライアント間でメッセージのやり取りに基づいたプロトコルを簡単に実装します。 async_chat クラスは、データを受け取り、メッセージターミネイタを探す asyncore.dispatcher のサブクラスです。そのサブクラスは、データを受け取ったときに何をするかと、ターミネイタを見つけたときにどう応答するかの2つのみ指定する必要があります。送信するデータは async_chat が管理する FIFO オブジェクトを経由して転送キューに追加されます。

メッセージターミネイタ

受け取ったメッセージは ターミネイタ に基づいて分割されて、 set_terminator() を通してそれぞれのインスタンスで管理されます。設定方法は3つあります。

  1. set_terminator() に文字列の引数が渡された場合、入力データにその文字列が現れたときにメッセージが完了したと見なされます。
  1. 数値の引数が渡された場合、そのバイト数を読み込んだときにメッセージが完了したと見なされます。
  1. None が渡された場合、 async_chat はメッセージの完了を管理しません。

次の EchoServer サンプルは、入力データのコンテキストに依存するシンプルな文字列ターミネイタとメッセージ長ターミネイタの2つを使用します。標準ライブラリドキュメントにある HTTP リクエストハンドラのサンプルは、HTTP ヘッダと HTTP POST リクエストの本文の間で異なるコンテキストに基づいてターミネイタを変更する方法を紹介します。

サーバとハンドラ

asynchatasyncore の違いを簡単に説明するために、 asyncore の記事から同じ EchoServer サンプルを紹介します。サーバオブジェクトはコネクションを受け付ける、ハンドラオブジェクトはそれぞれのクライアントとの通信を扱う、クライアントオブジェクトは通信を開始するといった同じ役割を担います。

asynchat で動作する EchoServer は、今回はあまり関係ない logging の呼び出す回数を少なくして、 asyncore のサンプルと同様に同じオブジェクトを作成します。

import asyncore
import logging
import socket

from asynchat_echo_handler import EchoHandler

class EchoServer(asyncore.dispatcher):
    """Receives connections and establishes handlers for each client.
    """
    
    def __init__(self, address):
        asyncore.dispatcher.__init__(self)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.bind(address)
        self.address = self.socket.getsockname()
        self.listen(1)
        return

    def handle_accept(self):
        # クライアントがソケットへ接続したときに呼び出される
        client_info = self.accept()
        EchoHandler(sock=client_info[0])
        # 一度に一クライアントのみを扱うのでハンドラを設定したらクローズする
        # 普通はクローズせずにサーバは停止命令を受け取るか、永遠に実行される
        self.handle_close()
        return
    
    def handle_close(self):
        self.close()

このサンプルは asyncore.dispatcher ではなく asynchat.async_chat をベースクラスとする EchoHandler です。それはわずかに高レベルの抽象化を行って操作するので、読み書きは自動的に扱われます。そのバッファは次の4つを知っている必要があります。

  • (handle_incoming_data() をオーバーライドすることで) 入力データで何をするか
  • (set_terminator() 経由の) 入力メッセージの終わりをどうやって認識するか
  • (found_terminator() で) メッセージが完了したときに何をするか
  • (push() を使用して) 何を送信するか

サンプルアプリケーションは2つの操作モードがあります。1つは ECHO length\n フォームのコマンドを待つか、もう1つは送り返されるデータを待つかです。このモードは、ターミネイタが見つかった後で必要に応じてターミネイタを変更するときに、実行されるメソッドに対するインスタンス変数 process_data の設定によって交互に変わります。

import asynchat
import logging


class EchoHandler(asynchat.async_chat):
    """Handles echoing messages from a single client.
    """

    # 並列にメッセージを送受信するのを説明するために
    # 人為的にバッファサイズを減少させる
    ac_in_buffer_size = 64
    ac_out_buffer_size = 64
    
    def __init__(self, sock):
        self.received_data = []
        self.logger = logging.getLogger('EchoHandler')
        asynchat.async_chat.__init__(self, sock)
        # ECHO コマンドを探すのを開始する
        self.process_data = self._process_command
        self.set_terminator('\n')
        return

    def collect_incoming_data(self, data):
        """Read an incoming message from the client and put it into our outgoing queue."""
        self.logger.debug('collect_incoming_data() -> (%d bytes)\n"""%s"""', len(data), data)
        self.received_data.append(data)

    def found_terminator(self):
        """The end of a command or message has been seen."""
        self.logger.debug('found_terminator()')
        self.process_data()
    
    def _process_command(self):        
        """We have the full ECHO command"""
        command = ''.join(self.received_data)
        self.logger.debug('_process_command() "%s"', command)
        command_verb, command_arg = command.strip().split(' ')
        expected_data_len = int(command_arg)
        self.set_terminator(expected_data_len)
        self.process_data = self._process_message
        self.received_data = []
    
    def _process_message(self):
        """We have read the entire message to be sent back to the client"""
        to_echo = ''.join(self.received_data)
        self.logger.debug('_process_message() echoing\n"""%s"""', to_echo)
        self.push(to_echo)
        # 一度に一つだけ処理するので応答を送信した後で切断する
        self.close_when_done()

完了コマンドが見つかると、ハンドラはメッセージ処理モードを切り替えて、テキストセット全体を受信するのを待ちます。全てのデータを受信すると、送信チャンネルへ追加されて、そのデータを送信したら閉じるようにハンドラを設定します。

クライアント

クライアントは、ほとんどハンドラと同様に動作します。 asyncore の実装と同様に、その送信メッセージはクライアントのコンストラクタへの引数です。ソケットのコネクションが確立されると、 handle_connect() が呼び出されるので、クライアントはコマンドとメッセージデータを送れます。

そのコマンドは直接追加されますが、メッセージテキストには特別な “producer” クラスを使用します。producer は、ネットワーク上で送信するためにデータのチャンクをポーリングします。producer が空文字を返すとき、それはデータがなくなり、停止を書き込んだと想定します。

クライアントは、応答のメッセージデータをただ待ち続けるので、整数ターミネイタを設定して、メッセージ全体を受信するまでリストにデータを追加します。

import asynchat
import logging
import socket


class EchoClient(asynchat.async_chat):
    """Sends messages to the server and receives responses.
    """

    # 並列にメッセージを送受信するのを説明するために
    # 人為的にバッファサイズを減少させる
    ac_in_buffer_size = 64
    ac_out_buffer_size = 64
    
    def __init__(self, host, port, message):
        self.message = message
        self.received_data = []
        self.logger = logging.getLogger('EchoClient')
        asynchat.async_chat.__init__(self)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.logger.debug('connecting to %s', (host, port))
        self.connect((host, port))
        return
        
    def handle_connect(self):
        self.logger.debug('handle_connect()')
        # コマンドを送る
        self.push('ECHO %d\n' % len(self.message))
        # データを送る
        self.push_with_producer(EchoProducer(self.message, buffer_size=self.ac_out_buffer_size))
        # データがそのまま送り返されるはずなのでデータ長をターミネイタにセットする
        self.set_terminator(len(self.message))
    
    def collect_incoming_data(self, data):
        """Read an incoming message from the client and put it into our outgoing queue."""
        self.logger.debug('collect_incoming_data() -> (%d)\n"""%s"""', len(data), data)
        self.received_data.append(data)

    def found_terminator(self):
        self.logger.debug('found_terminator()')
        received_message = ''.join(self.received_data)
        if received_message == self.message:
            self.logger.debug('RECEIVED COPY OF MESSAGE')
        else:
            self.logger.debug('ERROR IN TRANSMISSION')
            self.logger.debug('EXPECTED "%s"', self.message)
            self.logger.debug('RECEIVED "%s"', received_message)
        return

class EchoProducer(asynchat.simple_producer):

    logger = logging.getLogger('EchoProducer')

    def more(self):
        response = asynchat.simple_producer.more(self)
        self.logger.debug('more() -> (%s bytes)\n"""%s"""', len(response), response)
        return response

全てひとつにまとめる

このサンプルのメインプログラムは、同じ asyncore メインループ内にクライアントとサーバを設定します。

import asyncore
import logging
import socket

from asynchat_echo_server import EchoServer
from asynchat_echo_client import EchoClient

logging.basicConfig(level=logging.DEBUG,
                    format='%(name)s: %(message)s',
                    )

address = ('localhost', 0) # カーネルにポート番号を割り当てさせる
server = EchoServer(address)
ip, port = server.address # 与えられたポート番号を調べる

message_data = open('lorem.txt', 'r').read()
client = EchoClient(ip, port, message=message_data)

asyncore.loop()

普通は、独立したプロセスでクライアントやサーバを実行しますが、こうすることで簡単にログ出力を組み合わせて表示できます。

$ python asynchat_echo_main.py
EchoClient: connecting to ('127.0.0.1', 59601)
EchoClient: handle_connect()
EchoProducer: more() -> (64 bytes)
"""Lorem ipsum dolor sit amet, consectetuer adipiscing elit. Donec
"""
EchoHandler: collect_incoming_data() -> (8 bytes)
"""ECHO 166"""
EchoHandler: found_terminator()
EchoHandler: _process_command() "ECHO 166"
EchoHandler: collect_incoming_data() -> (55 bytes)
"""Lorem ipsum dolor sit amet, consectetuer adipiscing eli"""
EchoProducer: more() -> (64 bytes)
"""egestas, enim et consectetuer ullamcorper, lectus ligula rutrum """
EchoHandler: collect_incoming_data() -> (64 bytes)
"""t. Donec
egestas, enim et consectetuer ullamcorper, lectus ligul"""
EchoProducer: more() -> (38 bytes)
"""leo, a
elementum elit tortor eu quam.
"""
EchoHandler: collect_incoming_data() -> (47 bytes)
"""a rutrum leo, a
elementum elit tortor eu quam.
"""
EchoHandler: found_terminator()
EchoHandler: _process_message() echoing
"""Lorem ipsum dolor sit amet, consectetuer adipiscing elit. Donec
egestas, enim et consectetuer ullamcorper, lectus ligula rutrum leo, a
elementum elit tortor eu quam.
"""
EchoProducer: more() -> (0 bytes)
""""""
EchoClient: collect_incoming_data() -> (64)
"""Lorem ipsum dolor sit amet, consectetuer adipiscing elit. Donec
"""
EchoClient: collect_incoming_data() -> (64)
"""egestas, enim et consectetuer ullamcorper, lectus ligula rutrum """
EchoClient: collect_incoming_data() -> (38)
"""leo, a
elementum elit tortor eu quam.
"""
EchoClient: found_terminator()
EchoClient: RECEIVED COPY OF MESSAGE

See also

asynchat
本モジュールの標準ライブラリドキュメント
asyncore
低レベルの非同期 I/O イベントループを実装する asyncore モジュール
Bookmark and Share