sqlite3 – 組み込み関係データベース

目的:SQL をサポートした組み込み関係データベースを実装する
利用できるバージョン:2.5 以上

sqlite3 モジュールは SQLite 関係データベースに対する DB-API 2.0 準拠のインタフェースを提供します。SQLite はアプリケーション内に組み込むように設計された、インプロセスデータベースであり、MySQL, PostgreSQL や Oracle のような独立したサーバプログラムではありません。SQLite は高速且つ柔軟で徹底的にテストされており、プロトタイピングのみならず、アプリケーションによっては本番環境にも使用できます。

データベースを作成する

SQLite データベースはファイルシステム上の1つのファイルに格納されます。ライブラリはそのファイルへのアクセスを管理し、複数のプログラムがそのファイルへ書き込むときにデータが破損しないようにロックします。そのファイルへの初回アクセス時にデータベースが作成されますが、アプリケーションはデータベース内のテーブル定義もしくは スキーマ を管理する責任があります。

このサンプルは connect() でデータベースファイルをオープンする前にデータベースファイルの存在を調べるので、新しいデータベースのスキーマを作成するかどうかが分かります。

import os
import sqlite3

db_filename = 'todo.db'

db_is_new = not os.path.exists(db_filename)

conn = sqlite3.connect(db_filename)

if db_is_new:
    print 'Need to create schema'
else:
    print 'Database exists, assume schema does, too.'

conn.close()

このスクリプトを2回実行すると、そのデータベースファイルが存在しなければ空のファイルを作成することが分かります。

$ ls *.db
ls: *.db: No such file or directory

$ python sqlite3_createdb.py
Need to create schema

$ ls *.db
todo.db

$ python sqlite3_createdb.py
Database exists, assume schema does, too.

新しいデータベースファイルを作成した後、その次のステップはデータベース内のテーブルを定義するスキーマを作成します。このセクションのこの後の全サンプルは、タスク管理テーブルを持つ同じデータベーススキーマを使用します。テーブルは次の通りです。

project

説明
name text プロジェクト名
description text プロジェクトの説明
deadline date プロジェクト全体の期限

task

説明
id number ユニークなタスク識別子
priority integer 優先度の数値、低いほど優先度が高い
details text タスクの詳細
status text タスクの状況 (‘new’, ‘pending’, ‘done’ 又は ‘canceled’ のどれか)
deadline date タスクの期限
completed_on date タスクの完了を表す
project text タスクのプロジェクト名

テーブルを作成する データ定義言語 (DDL) 文は次の通りです。

-- Schema for to-do application examples.

-- Projects are high-level activities made up of tasks
create table project (
    name        text primary key,
    description text,
    deadline    date
);

-- Tasks are steps that can be taken to complete a project
create table task (
    id           integer primary key autoincrement not null,
    priority     integer default 1,
    details      text,
    status       text,
    deadline     date,
    completed_on date,
    project      text not null references project(name)
);

Connection クラスの executescript() メソッドはそのスキーマを作成する DDL 命令を実行するために使用されます。

import os
import sqlite3

db_filename = 'todo.db'
schema_filename = 'todo_schema.sql'

db_is_new = not os.path.exists(db_filename)

with sqlite3.connect(db_filename) as conn:
    if db_is_new:
        print 'Creating schema'
        with open(schema_filename, 'rt') as f:
            schema = f.read()
        conn.executescript(schema)

        print 'Inserting initial data'
        
        conn.execute("""
        insert into project (name, description, deadline)
        values ('pymotw', 'Python Module of the Week', '2010-11-01')
        """)
        
        conn.execute("""
        insert into task (details, status, deadline, project)
        values ('write about select', 'done', '2010-10-03', 'pymotw')
        """)
        
        conn.execute("""
        insert into task (details, status, deadline, project)
        values ('write about random', 'waiting', '2010-10-10', 'pymotw')
        """)
        
        conn.execute("""
        insert into task (details, status, deadline, project)
        values ('write about sqlite3', 'active', '2010-10-17', 'pymotw')
        """)
    else:
        print 'Database exists, assume schema does, too.'

そのテーブルを作成した後、数個の insert 文がサンプルプロジェクトと関連タスクを作成します。 sqlite3 コマンドラインプログラムをデータベースのコンテンツを調べるために使用します。

$ python sqlite3_create_schema.py
Creating schema
Inserting initial data

$ sqlite3 todo.db 'select * from task'
1|1|write about select|done|2010-10-03||pymotw
2|1|write about random|waiting|2010-10-10||pymotw
3|1|write about sqlite3|active|2010-10-17||pymotw

データを取り出す

Python プログラムから task テーブルに保存された値を取り出すには cursor() メソッドでデータベースコネクションから Cursor を作成します。カーソルはデータの一貫したビューを生成して、SQLite のようなトランザクションデータベースシステムで対話的にやり取りする基本的な方法です。

import sqlite3

db_filename = 'todo.db'

with sqlite3.connect(db_filename) as conn:
    cursor = conn.cursor()

    cursor.execute("""
    select id, priority, details, status, deadline from task where project = 'pymotw'
    """)

    for row in cursor.fetchall():
        task_id, priority, details, status, deadline = row
        print '%2d {%d} %-20s [%-8s] (%s)' % (task_id, priority, details, status, deadline)

クエリの実行は2つのステップで行われます。最初にカーソルの execute() メソッドを実行して、どんなデータを取得するかをデータベースエンジンに伝えます。それから、その結果を取り出すために fetchall() を使用します。返り値はクエリの select 句に記述された列の値を含むタプルのシーケンスです。

$ python sqlite3_select_tasks.py
 1 {1} write about select   [done    ] (2010-10-03)
 2 {1} write about random   [waiting ] (2010-10-10)
 3 {1} write about sqlite3  [active  ] (2010-10-17)

その結果は fetchone() で一度に1つだけ取り出したり、 fetchmany() で固定サイズ分まとめて取り出すこともできます。

import sqlite3

db_filename = 'todo.db'

with sqlite3.connect(db_filename) as conn:
    cursor = conn.cursor()

    cursor.execute("""
    select name, description, deadline from project where name = 'pymotw'
    """)
    name, description, deadline = cursor.fetchone()

    print 'Project details for %s (%s) due %s' % (description, name, deadline)

    cursor.execute("""
    select id, priority, details, status, deadline from task
    where project = 'pymotw' order by deadline
    """)

    print '\nNext 5 tasks:'

    for row in cursor.fetchmany(5):
        task_id, priority, details, status, deadline = row
        print '%2d {%d} %-25s [%-8s] (%s)' % (task_id, priority, details, status, deadline)

fetchmany() へ渡された値は返す要素の最大数です。もし指定した最大数よりも要素が少ないなら、その最大数よりも小さいシーケンスが返されます。

$ python sqlite3_select_variations.py
Project details for Python Module of the Week (pymotw) due 2010-11-01

Next 5 tasks:
 1 {1} write about select        [done    ] (2010-10-03)
 2 {1} write about random        [waiting ] (2010-10-10)
 3 {1} write about sqlite3       [active  ] (2010-10-17)

クエリメタデータ

DB-API 2.0 の仕様では execute() が呼び出された後、 Cursor はその description 属性をセットして、fetch メソッドが返すデータに関する情報を保持します。API の仕様では、その description の値は列名、型、表示サイズ、内部サイズ、精度、スケールや NULL 値を許容するかどうかのフラグを含むタプルのシーケンスになります。

import sqlite3

db_filename = 'todo.db'

with sqlite3.connect(db_filename) as conn:
    cursor = conn.cursor()

    cursor.execute("""
    select * from task where project = 'pymotw'
    """)

    print 'Task table has these columns:'
    for colinfo in cursor.description:
        print colinfo

sqlite3 はデータベースへ追加するデータの型やサイズの制約を強制しないので、列名のみがセットされます。

$ python sqlite3_cursor_description.py
Task table has these columns:
('id', None, None, None, None, None, None)
('priority', None, None, None, None, None, None)
('details', None, None, None, None, None, None)
('status', None, None, None, None, None, None)
('deadline', None, None, None, None, None, None)
('completed_on', None, None, None, None, None, None)
('project', None, None, None, None, None, None)

行オブジェクト

デフォルトでは、データベースから “行” として fetch メソッドが返す値はタプルです。呼び出し側はクエリ内の列の順番を覚えておいて、タプルから対応する個々の値を展開する責任があります。あるクエリの値の数が増加する、もしくはそのデータと連携するコードがライブラリに展開されるとき、1つのオブジェクトに列名でその列の値にアクセスすると簡単です。その理由は、そのタプルの要素の順番と数はクエリが修正されると変更され、クエリの結果に依存するコードが壊れにくくなるからです。

Connection オブジェクトは row_factory プロパティを持ち、クエリの結果セットの各行を表すのに作成されたオブジェクトの型を制御します。さらに sqlite3 は1行のファクトリとして使用することを目的とした Row クラスもあります。 Row インスタンスは列のインデックスや名前でアクセスできます。

import sqlite3

db_filename = 'todo.db'

with sqlite3.connect(db_filename) as conn:
    # Row を使用するために行ファクトリを変更する
    conn.row_factory = sqlite3.Row
    
    cursor = conn.cursor()

    cursor.execute("""
    select name, description, deadline from project where name = 'pymotw'
    """)
    name, description, deadline = cursor.fetchone()

    print 'Project details for %s (%s) due %s' % (description, name, deadline)

    cursor.execute("""
    select id, priority, status, deadline, details from task
    where project = 'pymotw' order by deadline
    """)

    print '\nNext 5 tasks:'

    for row in cursor.fetchmany(5):
        print '%2d {%d} %-25s [%-8s] (%s)' % (
            row['id'], row['priority'], row['details'], row['status'], row['deadline'],
            )

この sqlite3_select_variations.py のサンプルは、タプルの代わりに Row インスタンスを使用して書き直しています。project テーブルの行はクエリの列名の位置でその値へアクセスして表示しますが、task テーブルの行を表示する print 文はその代わりにキーワードを使用します。そのため、クエリの列の順番が変更されても問題にはなりません。

$ python sqlite3_row_factory.py
Project details for Python Module of the Week (pymotw) due 2010-11-01

Next 5 tasks:
 1 {1} write about select        [done    ] (2010-10-03)
 2 {1} write about random        [waiting ] (2010-10-10)
 3 {1} write about sqlite3       [active  ] (2010-10-17)

クエリで変数を使用する

プログラム内に組み込まれた文字列リテラルで定義されたクエリを使用することは柔軟性に欠けます。例えば、別のプロジェクトをデータベースに追加するとき、トップ5のタスクを表示するクエリを実行するにはどちらか一方のプロジェクトを更新すべきです。より柔軟性を確保するための1つの方法は、Python の変数を組み合わせて設計したクエリで SQL 文を構築することです。しかし、この方法を使用して文字列でクエリを構築することは危険なので止めた方が良いです。クエリで扱う変数の特殊文字のエスケープ処理を正しく行わないと、SQL の構文解析エラーか、もっと悪い SQL インジェクション攻撃 として知られるセキュリティ上の脆弱性を引き起こします。

クエリで動的な値を使用する適切な方法は、SQL 命令に従い execute() へ渡す ホスト変数 を経由することです。SQL のプレースホルダ値はその SQL 文の実行時にホスト変数の値で置き換えられます。SQL 内に任意の値を追加する代わりに、SQL が構文解析される前でホスト変数を使用することはインジェクション攻撃を避けます。その理由は信頼できない値が SQL 文の構文解析で影響を与える可能性がないからです。SQLite はプレースホルダを用いる2つのクエリ形態、位置と名前をサポートします。

位置パラメータ

クエスチョンマーク (?) は位置引数を表します。それはタプルの要素として execute() に渡されます。

import sqlite3
import sys

db_filename = 'todo.db'
project_name = sys.argv[1]

with sqlite3.connect(db_filename) as conn:
    cursor = conn.cursor()

    query = "select id, priority, details, status, deadline from task where project = ?"

    cursor.execute(query, (project_name,))

    for row in cursor.fetchall():
        task_id, priority, details, status, deadline = row
        print '%2d {%d} %-20s [%-8s] (%s)' % (task_id, priority, details, status, deadline)

コマンドライン引数は位置引数として安全にクエリへ渡されて、データベースを汚染するデータにはなりません。

$ python sqlite3_argument_positional.py pymotw
 1 {1} write about select   [done    ] (2010-10-03)
 2 {1} write about random   [waiting ] (2010-10-10)
 3 {1} write about sqlite3  [active  ] (2010-10-17)

名前付きパラメータ

多くのパラメータを持つ、または複数のパラメータがクエリ内で何回か繰り返される複雑なクエリには名前付きパラメータを使用してください。名前付きパラメータは :param_name のようにコロンを接頭辞に取ります。

import sqlite3
import sys

db_filename = 'todo.db'
project_name = sys.argv[1]

with sqlite3.connect(db_filename) as conn:
    cursor = conn.cursor()

    query = """select id, priority, details, status, deadline from task
            where project = :project_name
            order by deadline, priority
            """

    cursor.execute(query, {'project_name':project_name})

    for row in cursor.fetchall():
        task_id, priority, details, status, deadline = row
        print '%2d {%d} %-25s [%-8s] (%s)' % (task_id, priority, details, status, deadline)

位置パラメータも名前付きパラメータも、クエリパーサにより特別な処理が行われるのでクォートやエスケープ処理が必要ありません。

$ python sqlite3_argument_named.py pymotw
 1 {1} write about select        [done    ] (2010-10-03)
 2 {1} write about random        [waiting ] (2010-10-10)
 3 {1} write about sqlite3       [active  ] (2010-10-17)

クエリパラメータは select, insertupdate 文で使用できます。クエリパラメータはリテラル値が正しく処理されるクエリ内のどこでも使用できます。

import sqlite3
import sys

db_filename = 'todo.db'
id = int(sys.argv[1])
status = sys.argv[2]

with sqlite3.connect(db_filename) as conn:
    cursor = conn.cursor()
    query = "update task set status = :status where id = :id"
    cursor.execute(query, {'status':status, 'id':id})

この update 文は2つの名前付きパラメータを使用します。 id 値は変更する行を見つけるために使用され status 値がテーブルに書き込まれます。

$ python sqlite3_argument_update.py 2 done
$ python sqlite3_argument_named.py pymotw
 1 {1} write about select        [done    ] (2010-10-03)
 2 {1} write about random        [done    ] (2010-10-10)
 3 {1} write about sqlite3       [active  ] (2010-10-17)

バルクロード

大量データに対して同じ SQL 命令を適用するには executemany() を使用します。Python で入力のループを行わず低レイヤのライブラリでループを最適化するので、これはデータを読み込むのに便利です。このサンプルプログラムは csv モジュールを用いて、カンマで区切られた値を持つファイルからタスクのリストを読み込みます。そして、データベースへその読み込んだ値をロードします。

import csv
import sqlite3
import sys

db_filename = 'todo.db'
data_filename = sys.argv[1]

SQL = """insert into task (details, priority, status, deadline, project)
         values (:details, :priority, 'active', :deadline, :project)
      """

with open(data_filename, 'rt') as csv_file:
    csv_reader = csv.DictReader(csv_file)
    
    with sqlite3.connect(db_filename) as conn:
        cursor = conn.cursor()
        cursor.executemany(SQL, csv_reader)

サンプルのデータファイル tasks.csv は次のデータを含みます。

deadline,project,priority,details
2010-10-02,pymotw,2,"finish reviewing markup"
2010-10-03,pymotw,2,"revise chapter intros"
2010-10-03,pymotw,1,"subtitle"

プログラムを実行すると次のようになります。

$ python sqlite3_load_csv.py tasks.csv
$ python sqlite3_argument_named.py pymotw
 4 {2} finish reviewing markup   [active  ] (2010-10-02)
 6 {1} subtitle                  [active  ] (2010-10-03)
 1 {1} write about select        [done    ] (2010-10-03)
 5 {2} revise chapter intros     [active  ] (2010-10-03)
 2 {1} write about random        [done    ] (2010-10-10)
 3 {1} write about sqlite3       [active  ] (2010-10-17)

列の型

SQLite は整数、浮動小数点やテキストの列をネイティブにサポートします。これらの型のデータは Python における表現からデータベースへ格納できる値へ sqlite3 により 自動的に変換されます。また必要に応じて元に戻せます。整数値はその値のサイズによりデータベースから intlong 変数へ読み込まれます。 テキストは Connectiontext_factory 属性に変更していない限り unicode として取り出されて保存されます。

SQLite は内部的には少しのデータ型のみをサポートしますが、 sqlite3 には、どんなデータでも列に格納するために Python アプリケーションがカスタム型を定義できます。デフォルトでサポートされていない型の変換は、 detect_types フラグを使用したデータベースコネクションで有効です。 PARSE_DECLTYPES を使用すると、そのテーブルが定義されたときにカスタム型で定義された列になります。

import sqlite3
import sys

db_filename = 'todo.db'

sql = "select id, details, deadline from task"

def show_deadline(conn):
    conn.row_factory = sqlite3.Row
    cursor = conn.cursor()
    cursor.execute(sql)
    row = cursor.fetchone()
    for col in ['id', 'details', 'deadline']:
        print '  column:', col
        print '    value :', row[col]
        print '    type  :', type(row[col])
    return

print 'Without type detection:'

with sqlite3.connect(db_filename) as conn:
    show_deadline(conn)

print '\nWith type detection:'

with sqlite3.connect(db_filename, detect_types=sqlite3.PARSE_DECLTYPES) as conn:
    show_deadline(conn)

sqlite3datetime モジュールから datedatetime を使用して、Python で日付やタイムスタンプの列を扱うための変換機能を提供します。両方の日付に関する変換機能は型検出が有効なときに自動的に有効になります。

$ python sqlite3_date_types.py
Without type detection:
  column: id
    value : 1
    type  : <type 'int'>
  column: details
    value : write about select
    type  : <type 'unicode'>
  column: deadline
    value : 2010-10-03
    type  : <type 'unicode'>

With type detection:
  column: id
    value : 1
    type  : <type 'int'>
  column: details
    value : write about select
    type  : <type 'unicode'>
  column: deadline
    value : 2010-10-03
    type  : <type 'datetime.date'>

カスタム型

新しい型を定義するために2つの関数を登録する必要があります。 アダプタ は入力として Python オブジェクトを受け取り、データベースに格納されるバイト列を返します。 コンバータ はデータベースから文字列を受け取り、Python オブジェクトを返します。アダプタ関数を定義するには register_adapter() を、コンバータ関数には register_converter() を使用してください。

import sqlite3
try:
    import cPickle as pickle
except:
    import pickle

db_filename = 'todo.db'

def adapter_func(obj):
    """Convert from in-memory to storage representation.
    """
    print 'adapter_func(%s)\n' % obj
    return pickle.dumps(obj)

def converter_func(data):
    """Convert from storage to in-memory representation.
    """
    print 'converter_func(%r)\n' % data
    return pickle.loads(data)


class MyObj(object):
    def __init__(self, arg):
        self.arg = arg
    def __str__(self):
        return 'MyObj(%r)' % self.arg

# 型を操作する関数を登録する
sqlite3.register_adapter(MyObj, adapter_func)
sqlite3.register_converter("MyObj", converter_func)

# 保存するオブジェクトを作成する
# タプルのリストを使用するのでこのシーケンスを直接 executemany() へ渡せる
to_save = [ (MyObj('this is a value to save'),),
            (MyObj(42),),
            ]

with sqlite3.connect(db_filename, detect_types=sqlite3.PARSE_DECLTYPES) as conn:
    # "MyObj" 型の列でテーブルを作成する
    conn.execute("""
    create table if not exists obj (
        id    integer primary key autoincrement not null,
        data  MyObj
    )
    """)
    cursor = conn.cursor()

    # オブジェクトをデータベースへ追加する
    cursor.executemany("insert into obj (data) values (?)", to_save)

    # たった今データベースに保存されたオブジェクトをクエリする
    cursor.execute("select id, data from obj")
    for obj_id, obj in cursor.fetchall():
        print 'Retrieved', obj_id, obj, type(obj)
        print

このサンプルは、あるオブジェクトをデータベースに格納する文字列として保存するために pickle を使用します。このテクニックは任意のオブジェクトを格納するのに便利ですが、 オブジェクト属性をベースにしてクエリすることができません。独自の型に属性値を格納する SQLAlchemy のような、実際の オブジェクト関係マッパー は大量データを扱うのにもっと便利でしょう。

$ python sqlite3_custom_type.py
adapter_func(MyObj('this is a value to save'))
adapter_func(MyObj(42))
converter_func("ccopy_reg\n_reconstructor\np1\n(c__main__\nMyObj\np2\n
c__builtin__\nobject\np3\nNtRp4\n(dp5\nS'arg'\np6\nS'this is a value t
o save'\np7\nsb.")
converter_func("ccopy_reg\n_reconstructor\np1\n(c__main__\nMyObj\np2\n
c__builtin__\nobject\np3\nNtRp4\n(dp5\nS'arg'\np6\nI42\nsb.")
Retrieved 1 MyObj('this is a value to save') <class '__main__.MyObj'>
Retrieved 2 MyObj(42) <class '__main__.MyObj'>

列名から型を派生する

クエリのそのデータに関する型情報のために2つの情報源があります。オリジナルのテーブル定義は、この前のセクションで紹介したように本当の列の型を識別するために使用されます。さらに型識別子は as "name [type]" の形でクエリ内の select 句に含めることもできます。

import sqlite3
try:
    import cPickle as pickle
except:
    import pickle

db_filename = 'todo.db'

def adapter_func(obj):
    """Convert from in-memory to storage representation.
    """
    print 'adapter_func(%s)\n' % obj
    return pickle.dumps(obj)

def converter_func(data):
    """Convert from storage to in-memory representation.
    """
    print 'converter_func(%r)\n' % data
    return pickle.loads(data)


class MyObj(object):
    def __init__(self, arg):
        self.arg = arg
    def __str__(self):
        return 'MyObj(%r)' % self.arg

# 型を操作する関数を登録する
sqlite3.register_adapter(MyObj, adapter_func)
sqlite3.register_converter("MyObj", converter_func)

# 保存するオブジェクトを作成する
# タプルのリストを使用するのでこのシーケンスを直接 executemany() へ渡せる
to_save = [ (MyObj('this is a value to save'),),
            (MyObj(42),),
            ]

with sqlite3.connect(db_filename, detect_types=sqlite3.PARSE_COLNAMES) as conn:
    # "MyObj" 型の列でテーブルを作成する
    conn.execute("""
    create table if not exists obj2 (
        id    integer primary key autoincrement not null,
        data  text
    )
    """)
    cursor = conn.cursor()

    # オブジェクトをデータベースへ追加する
    cursor.executemany("insert into obj2 (data) values (?)", to_save)

    # たった今データベースに保存されたオブジェクトをクエリする
    cursor.execute('select id, data as "pickle [MyObj]" from obj2')
    for obj_id, obj in cursor.fetchall():
        print 'Retrieved', obj_id, obj, type(obj)
        print

オリジナルのテーブル定義の代わりにクエリの一部に型があるときは detect_types フラグに PARSE_COLNAMES を使用してください。

$ python sqlite3_custom_type_column.py
adapter_func(MyObj('this is a value to save'))
adapter_func(MyObj(42))
converter_func("ccopy_reg\n_reconstructor\np1\n(c__main__\nMyObj\np2\n
c__builtin__\nobject\np3\nNtRp4\n(dp5\nS'arg'\np6\nS'this is a value t
o save'\np7\nsb.")
converter_func("ccopy_reg\n_reconstructor\np1\n(c__main__\nMyObj\np2\n
c__builtin__\nobject\np3\nNtRp4\n(dp5\nS'arg'\np6\nI42\nsb.")
Retrieved 1 MyObj('this is a value to save') <class '__main__.MyObj'>
Retrieved 2 MyObj(42) <class '__main__.MyObj'>

トランザクション

関係データベースの重要な機能の1つは トランザクション であり、一貫した内部状態を保持するために使用します。トランザクションを有効にすることで1つのコネクションを通した複数の変更に対して、その結果が コミット されて実際にデータベースへ書き込まれるまで他のユーザへ影響を与えずに行うことができます。

変更を保存する

insertupdate 文のどちらかでデータベースを変更するとき、明示的に commit() を呼び出して保存する必要があります。この要求はアプリケーションに対して同時に関連する複数の変更を行う機会を与えます。そして、インクリメンタルではなく 原始的 に複数の変更を保存して、データベースへ複数のクライアントが接続することによる部分的な更新が行われないようにします。

commit() を呼び出す効果はデータベースへ複数のコネクションを持つプログラムを見ると分かり易いです。最初のコネクションで新たな行を追加した後、別のコネクションを使用してその行を読み込もうと2回行います。

import sqlite3

db_filename = 'todo.db'

def show_projects(conn):
    cursor = conn.cursor()
    cursor.execute('select name, description from project')
    for name, desc in cursor.fetchall():
        print '  ', name
    return

with sqlite3.connect(db_filename) as conn1:

    print 'Before changes:'
    show_projects(conn1)

    # あるカーソルで追加する
    cursor1 = conn1.cursor()
    cursor1.execute("""
    insert into project (name, description, deadline)
    values ('virtualenvwrapper', 'Virtualenv Extensions', '2011-01-01')
    """)

    print '\nAfter changes in conn1:'
    show_projects(conn1)

    # 他のコネクションからコミットせずに選択する
    print '\nBefore commit:'
    with sqlite3.connect(db_filename) as conn2:
        show_projects(conn2)

    # コミットしてから他のコネクションで選択する
    conn1.commit()
    print '\nAfter commit:'
    with sqlite3.connect(db_filename) as conn3:
        show_projects(conn3)
    

conn1 がコミットされる前に show_projects() が呼び出されると、その結果はどちらのコネクションが使用されるかに依ります。その変更は conn1 を通して行われたので、その変更されたデータが確認されます。しかし conn2 はそうではありません。コミットした後で、新たなコネクション conn3 ではその追加された行が確認されます。

$ python sqlite3_transaction_commit.py
Before changes:
   pymotw

After changes in conn1:
   pymotw
   virtualenvwrapper

Before commit:
   pymotw

After commit:
   pymotw
   virtualenvwrapper

変更を放棄する

また、コミットされていない変更は rollback() で完全に放棄することもできます。 commit()rollback() メソッドは、普通は同じ try:except ブロックの違う部分から呼び出されます。つまり、エラーはロールバックのトリガーとなります。

import sqlite3

db_filename = 'todo.db'

def show_projects(conn):
    cursor = conn.cursor()
    cursor.execute('select name, description from project')
    for name, desc in cursor.fetchall():
        print '  ', name
    return

with sqlite3.connect(db_filename) as conn:

    print 'Before changes:'
    show_projects(conn)

    try:

        # 削除
        cursor = conn.cursor()
        cursor.execute("delete from project where name = 'virtualenvwrapper'")

        # 設定を表示する
        print '\nAfter delete:'
        show_projects(conn)

        # 処理がエラーを引き起こしたように見せかける
        raise RuntimeError('simulated error')

    except Exception, err:
        # 変更を放棄する
        print 'ERROR:', err
        conn.rollback()
        
    else:
        # 変更を保存する
        conn.commit()

    # 結果を表示する
    print '\nAfter rollback:'
    show_projects(conn)

rollback() を呼び出した後はデータベースに対する変更はありません。

$ python sqlite3_transaction_rollback.py
Before changes:
   pymotw
   virtualenvwrapper

After delete:
   pymotw
ERROR: simulated error

After rollback:
   pymotw
   virtualenvwrapper

分離レベル

sqlite3分離レベル(isolation levels) と呼ばれる3つのロックモードをサポートします。それはコネクション間で非互換な変更を行わないように使用されるロックを制御します。分離レベルはコネクションがオープンされるときに isolation_level 引数として文字列を渡すことでセットします。そのため、コネクションが違えば違う値を使用できます。

このプログラムは同じデータベースに対して独立したコネクションを使用するスレッドのイベントの順番で違う分離レベルの効果を説明します。4つのスレッドが作成されます。2つのスレッドは既存の行を更新することでデータベースに変更を書き込みます。他の2つのスレッドは task テーブルから全ての行を読み込もうとします。

import logging
import sqlite3
import sys
import threading
import time

logging.basicConfig(level=logging.DEBUG,
                    format='%(asctime)s (%(threadName)-10s) %(message)s',
                    )

db_filename = 'todo.db'
isolation_level = sys.argv[1]

def writer():
    my_name = threading.currentThread().name
    logging.debug('connecting')
    with sqlite3.connect(db_filename, isolation_level=isolation_level) as conn:
        cursor = conn.cursor()
        logging.debug('connected')
        cursor.execute('update task set priority = priority + 1')
        logging.debug('changes made')
        logging.debug('waiting to synchronize')
        ready.wait() # 同期
        logging.debug('PAUSING')
        time.sleep(1)
        conn.commit()
        logging.debug('CHANGES COMMITTED')
    return

def reader():
    my_name = threading.currentThread().name
    with sqlite3.connect(db_filename, isolation_level=isolation_level) as conn:
        cursor = conn.cursor()
        logging.debug('waiting to synchronize')
        ready.wait() # 同期
        logging.debug('wait over')
        cursor.execute('select * from task')
        logging.debug('SELECT EXECUTED')
        results = cursor.fetchall()
        logging.debug('results fetched')
    return

if __name__ == '__main__':
    ready = threading.Event()

    threads = [
        threading.Thread(name='Reader 1', target=reader),
        threading.Thread(name='Reader 2', target=reader),
        threading.Thread(name='Writer 1', target=writer),
        threading.Thread(name='Writer 2', target=writer),
        ]
    
    [ t.start() for t in threads ]
    
    time.sleep(1)
    logging.debug('setting ready')
    ready.set()

    [ t.join() for t in threads ]

そのスレッドは threading モジュールの Event を使用して同期します。 writer() 関数は接続して、データベースに変更を書き込みますが、そのイベントが発生する前にはコミットしません。 reader() 関数は接続してから、同期イベントの発生後までデータベースにクエリするのを待ちます。

遅延(Deferred)

デフォルトの分離レベルは DEFERRED です。遅延モードを使用するとデータベースをロックしますが、1度に1つの変更のみが開始されます。これまでの全てのサンプルは遅延モードを使用します。

$ python sqlite3_isolation_levels.py DEFERRED
2013-02-17 11:33:37,164 (Reader 1  ) waiting to synchronize
2013-02-17 11:33:37,164 (Reader 2  ) waiting to synchronize
2013-02-17 11:33:37,164 (Writer 1  ) connecting
2013-02-17 11:33:37,165 (Writer 2  ) connecting
2013-02-17 11:33:37,165 (Writer 1  ) connected
2013-02-17 11:33:37,165 (Writer 2  ) connected
2013-02-17 11:33:37,166 (Writer 1  ) changes made
2013-02-17 11:33:37,166 (Writer 1  ) waiting to synchronize
2013-02-17 11:33:38,166 (MainThread) setting ready
2013-02-17 11:33:38,166 (Reader 2  ) wait over
2013-02-17 11:33:38,166 (Reader 1  ) wait over
2013-02-17 11:33:38,166 (Writer 1  ) PAUSING
2013-02-17 11:33:38,167 (Reader 2  ) SELECT EXECUTED
2013-02-17 11:33:38,167 (Reader 2  ) results fetched
2013-02-17 11:33:38,167 (Reader 1  ) SELECT EXECUTED
2013-02-17 11:33:38,167 (Reader 1  ) results fetched
2013-02-17 11:33:39,172 (Writer 1  ) CHANGES COMMITTED
2013-02-17 11:33:39,221 (Writer 2  ) changes made
2013-02-17 11:33:39,221 (Writer 2  ) waiting to synchronize
2013-02-17 11:33:39,221 (Writer 2  ) PAUSING
2013-02-17 11:33:40,224 (Writer 2  ) CHANGES COMMITTED

即時(Immediate)

即時モードは変更が開始されるとすぐにデータベースをロックして、そのトランザクションがコミットされるまで他のカーソルから変更できないようにします。それは書き込みよりも読み込みの方が多いが、複雑な書き込みを行うデータベースには適しています。読み込みはトランザクションが進行中でもブロックされないからです。

$ python sqlite3_isolation_levels.py IMMEDIATE
2013-02-17 11:33:40,260 (Reader 1  ) waiting to synchronize
2013-02-17 11:33:40,260 (Writer 1  ) connecting
2013-02-17 11:33:40,260 (Writer 2  ) connecting
2013-02-17 11:33:40,260 (Reader 2  ) waiting to synchronize
2013-02-17 11:33:40,260 (Writer 1  ) connected
2013-02-17 11:33:40,260 (Writer 2  ) connected
2013-02-17 11:33:40,261 (Writer 1  ) changes made
2013-02-17 11:33:40,261 (Writer 1  ) waiting to synchronize
2013-02-17 11:33:41,261 (MainThread) setting ready
2013-02-17 11:33:41,261 (Writer 1  ) PAUSING
2013-02-17 11:33:41,261 (Reader 2  ) wait over
2013-02-17 11:33:41,262 (Reader 1  ) wait over
2013-02-17 11:33:41,262 (Reader 2  ) SELECT EXECUTED
2013-02-17 11:33:41,262 (Reader 1  ) SELECT EXECUTED
2013-02-17 11:33:41,262 (Reader 1  ) results fetched
2013-02-17 11:33:41,262 (Reader 2  ) results fetched
2013-02-17 11:33:42,273 (Writer 1  ) CHANGES COMMITTED
2013-02-17 11:33:42,317 (Writer 2  ) changes made
2013-02-17 11:33:42,318 (Writer 2  ) waiting to synchronize
2013-02-17 11:33:42,318 (Writer 2  ) PAUSING
2013-02-17 11:33:43,320 (Writer 2  ) CHANGES COMMITTED

排他(Exclusive)

排他モードは全ての読み込みと書き込みに対してデータベースをロックします。それぞれの排他コネクションが他の全てのユーザをブロックするので、データベースのパフォーマンスが重要な状況に限定して使用すべきです。

$ python sqlite3_isolation_levels.py EXCLUSIVE
2013-02-17 11:33:43,359 (Reader 2  ) waiting to synchronize
2013-02-17 11:33:43,359 (Reader 1  ) waiting to synchronize
2013-02-17 11:33:43,359 (Writer 1  ) connecting
2013-02-17 11:33:43,359 (Writer 2  ) connecting
2013-02-17 11:33:43,359 (Writer 1  ) connected
2013-02-17 11:33:43,359 (Writer 2  ) connected
2013-02-17 11:33:43,361 (Writer 1  ) changes made
2013-02-17 11:33:43,361 (Writer 1  ) waiting to synchronize
2013-02-17 11:33:44,360 (MainThread) setting ready
2013-02-17 11:33:44,361 (Reader 1  ) wait over
2013-02-17 11:33:44,361 (Reader 2  ) wait over
2013-02-17 11:33:44,361 (Writer 1  ) PAUSING
2013-02-17 11:33:45,364 (Writer 1  ) CHANGES COMMITTED
2013-02-17 11:33:45,405 (Reader 1  ) SELECT EXECUTED
2013-02-17 11:33:45,405 (Reader 2  ) SELECT EXECUTED
2013-02-17 11:33:45,405 (Reader 1  ) results fetched
2013-02-17 11:33:45,405 (Reader 2  ) results fetched
2013-02-17 11:33:45,412 (Writer 2  ) changes made
2013-02-17 11:33:45,412 (Writer 2  ) waiting to synchronize
2013-02-17 11:33:45,412 (Writer 2  ) PAUSING
2013-02-17 11:33:46,430 (Writer 2  ) CHANGES COMMITTED

最初の writer が変更を開始するので、その reader と2番目の writer は最初の writer がコミットされるまでブロックします。 sleep() 呼び出しは writer スレッドで人工的な遅延を発生させて、他のコネクションがブロックされているという現象を目立たせます。

自動コミット

コネクションの isolation_level パラメータは自動コミットモードを有効にする None をセットすることもできます。自動コミットが有効になると、それぞれの execute() 呼び出しは終了時に即コミットされます。自動コミットモードは、1つのテーブルに小さなデータを追加するような短いトランザクションに適しています。スレッド間で衝突する機会が少ないので、データベースはできるだけ短い時間ロックされます。

import logging
import sqlite3
import sys
import threading
import time

logging.basicConfig(level=logging.DEBUG,
                    format='%(asctime)s (%(threadName)-10s) %(message)s',
                    )

db_filename = 'todo.db'
isolation_level = None # 自動コミットモード

def writer():
    my_name = threading.currentThread().name
    logging.debug('connecting')
    with sqlite3.connect(db_filename, isolation_level=isolation_level) as conn:
        cursor = conn.cursor()
        logging.debug('connected')
        cursor.execute('update task set priority = priority + 1')
        logging.debug('changes made')
        logging.debug('waiting to synchronize')
        ready.wait() # 同期
        logging.debug('PAUSING')
        time.sleep(1)
    return

def reader():
    my_name = threading.currentThread().name
    with sqlite3.connect(db_filename, isolation_level=isolation_level) as conn:
        cursor = conn.cursor()
        logging.debug('waiting to synchronize')
        ready.wait() # 同期
        logging.debug('wait over')
        cursor.execute('select * from task')
        logging.debug('SELECT EXECUTED')
        results = cursor.fetchall()
        logging.debug('results fetched')
    return

if __name__ == '__main__':
    ready = threading.Event()

    threads = [
        threading.Thread(name='Reader 1', target=reader),
        threading.Thread(name='Reader 2', target=reader),
        threading.Thread(name='Writer 1', target=writer),
        threading.Thread(name='Writer 2', target=writer),
        ]
    
    [ t.start() for t in threads ]
    
    time.sleep(1)
    logging.debug('setting ready')
    ready.set()

    [ t.join() for t in threads ]

commit() の明示的な呼び出しを除去している点を除けば sqlite3_autocommit.pysqlite3_isolation_levels.py と同じです。しかし、どちらの reader もクエリを開始する前に両方の writer スレッドの処理が終了するので出力は違っています。

$ python sqlite3_autocommit.py
2013-02-17 11:33:46,487 (Reader 2  ) waiting to synchronize
2013-02-17 11:33:46,487 (Reader 1  ) waiting to synchronize
2013-02-17 11:33:46,488 (Writer 1  ) connecting
2013-02-17 11:33:46,488 (Writer 1  ) connected
2013-02-17 11:33:46,488 (Writer 2  ) connecting
2013-02-17 11:33:46,488 (Writer 2  ) connected
2013-02-17 11:33:46,489 (Writer 1  ) changes made
2013-02-17 11:33:46,490 (Writer 1  ) waiting to synchronize
2013-02-17 11:33:46,491 (Writer 2  ) changes made
2013-02-17 11:33:46,491 (Writer 2  ) waiting to synchronize
2013-02-17 11:33:47,489 (MainThread) setting ready
2013-02-17 11:33:47,489 (Writer 1  ) PAUSING
2013-02-17 11:33:47,489 (Writer 2  ) PAUSING
2013-02-17 11:33:47,490 (Reader 1  ) wait over
2013-02-17 11:33:47,490 (Reader 2  ) wait over
2013-02-17 11:33:47,490 (Reader 2  ) SELECT EXECUTED
2013-02-17 11:33:47,490 (Reader 1  ) SELECT EXECUTED
2013-02-17 11:33:47,491 (Reader 2  ) results fetched
2013-02-17 11:33:47,491 (Reader 1  ) results fetched

ユーザ定義の振る舞い

sqlite3 は Python で実装された関数やクラスでデータベースの機能拡張と複数の拡張の仕組みをサポートします。

SQL で Python 関数を使用する

SQL 構文はその列のリストか select 文の where 句のどちらかで、クエリ中の関数呼び出しをサポートします。この機能は SQL にクエリから処理したデータを返す前にそのデータを処理することを可能にします。そして、違うフォーマット間の変換に使用できて、純粋な SQL では体裁の悪い演算を行い、アプリケーションのコードを再利用します。

import sqlite3

db_filename = 'todo.db'

def encrypt(s):
    print 'Encrypting %r' % s
    return s.encode('rot-13')

def decrypt(s):
    print 'Decrypting %r' % s
    return s.encode('rot-13')


with sqlite3.connect(db_filename) as conn:

    conn.create_function('encrypt', 1, encrypt)
    conn.create_function('decrypt', 1, decrypt)
    cursor = conn.cursor()

    # Raw values
    print 'Original values:'
    query = "select id, details from task"
    cursor.execute(query)
    for row in cursor.fetchall():
        print row

    print '\nEncrypting...'
    query = "update task set details = encrypt(details)"
    cursor.execute(query)
    
    print '\nRaw encrypted values:'
    query = "select id, details from task"
    cursor.execute(query)
    for row in cursor.fetchall():
        print row
    
    print '\nDecrypting in query...'
    query = "select id, decrypt(details) from task"
    cursor.execute(query)
    for row in cursor.fetchall():
        print row

Connectioncreate_function() メソッドで SQL から見えるように関数を公開します。そのパラメータは(SQL 内で使用される)関数の名前、関数が受け取る引数の数や公開する Python の関数です。

$ python sqlite3_create_function.py
Original values:
(1, u'write about select')
(2, u'write about random')
(3, u'write about sqlite3')
(4, u'finish reviewing markup')
(5, u'revise chapter intros')
(6, u'subtitle')

Encrypting...
Encrypting u'write about select'
Encrypting u'write about random'
Encrypting u'write about sqlite3'
Encrypting u'finish reviewing markup'
Encrypting u'revise chapter intros'
Encrypting u'subtitle'

Raw encrypted values:
(1, u'jevgr nobhg fryrpg')
(2, u'jevgr nobhg enaqbz')
(3, u'jevgr nobhg fdyvgr3')
(4, u'svavfu erivrjvat znexhc')
(5, u'erivfr puncgre vagebf')
(6, u'fhogvgyr')

Decrypting in query...
Decrypting u'jevgr nobhg fryrpg'
Decrypting u'jevgr nobhg enaqbz'
Decrypting u'jevgr nobhg fdyvgr3'
Decrypting u'svavfu erivrjvat znexhc'
Decrypting u'erivfr puncgre vagebf'
Decrypting u'fhogvgyr'
(1, u'write about select')
(2, u'write about random')
(3, u'write about sqlite3')
(4, u'finish reviewing markup')
(5, u'revise chapter intros')
(6, u'subtitle')

カスタム集約

集約関数は多くの個々のデータを集めて、何らかの方法でそういったデータを集約します。組み込みの集約関数の例は avg() (平均), min(), max()count() です。

sqlite3 が使用する集約の API は2つのメソッドを持つクラスで定義されます。 step() メソッドはそのクエリが処理されるときに、それぞれのデータの値につき呼び出されます。 finalize() メソッドはそのクエリの最後で1回呼び出されて集約された値を返します。このサンプルは算術的な mode の集約関数を実装します。その集約関数は入力に最も多く現れた値を返します。

import sqlite3
import collections

db_filename = 'todo.db'

class Mode(object):
    def __init__(self):
        self.counter = collections.Counter()
    def step(self, value):
        print 'step(%r)' % value
        self.counter[value] += 1
    def finalize(self):
        result, count = self.counter.most_common(1)[0]
        print 'finalize() -> %r (%d times)' % (result, count)
        return result

with sqlite3.connect(db_filename) as conn:

    conn.create_aggregate('mode', 1, Mode)
    
    cursor = conn.cursor()
    cursor.execute("select mode(deadline) from task where project = 'pymotw'")
    row = cursor.fetchone()
    print 'mode(deadline) is:', row[0]

Connectioncreate_aggregate() メソッドで集約クラスを登録します。そのパラメータは(SQL 内で使用される)関数の名前、 step() が受け取る引数の数と使用するクラスです。

$ python sqlite3_create_aggregate.py
step(u'2010-10-03')
step(u'2010-10-10')
step(u'2010-10-17')
step(u'2010-10-02')
step(u'2010-10-03')
step(u'2010-10-03')
finalize() -> u'2010-10-03' (3 times)
mode(deadline) is: 2010-10-03

カスタムソート

照合(collation) は SQL クエリの order by セクションで使用される比較関数です。カスタム照合は内部的に SQLite がソートできないデータ型を比較するために使用されます。例えば、カスタム照合はこの前のセクションで紹介した sqlite3_custom_type.py で保存した pickle 化されたオブジェクトをソートするために必要です。

import sqlite3
try:
    import cPickle as pickle
except:
    import pickle

db_filename = 'todo.db'

def adapter_func(obj):
    return pickle.dumps(obj)

def converter_func(data):
    return pickle.loads(data)

class MyObj(object):
    def __init__(self, arg):
        self.arg = arg
    def __str__(self):
        return 'MyObj(%r)' % self.arg
    def __cmp__(self, other):
        return cmp(self.arg, other.arg)

# 型を操作する関数を登録する
sqlite3.register_adapter(MyObj, adapter_func)
sqlite3.register_converter("MyObj", converter_func)

def collation_func(a, b):
    a_obj = converter_func(a)
    b_obj = converter_func(b)
    print 'collation_func(%s, %s)' % (a_obj, b_obj)
    return cmp(a_obj, b_obj)

with sqlite3.connect(db_filename, detect_types=sqlite3.PARSE_DECLTYPES) as conn:
    # 照合を定義する
    conn.create_collation('unpickle', collation_func)

    # テーブルをクリアして新しい値を追加する
    conn.execute('delete from obj')
    conn.executemany('insert into obj (data) values (?)',
                     [(MyObj(x),) for x in xrange(5, 0, -1)],
                     )

    # たった今データベースに保存されたオブジェクトをクエリする
    print '\nQuerying:'
    cursor = conn.cursor()
    cursor.execute("select id, data from obj order by data collate unpickle")
    for obj_id, obj in cursor.fetchall():
        print obj_id, obj
        print

照合関数の引数はバイト列なので、その引数は比較処理が行われる前に unpickle されて MyObj インスタンスへ変換されます。

$ python sqlite3_create_collation.py

Querying:
collation_func(MyObj(5), MyObj(4))
collation_func(MyObj(4), MyObj(3))
collation_func(MyObj(4), MyObj(2))
collation_func(MyObj(3), MyObj(2))
collation_func(MyObj(3), MyObj(1))
collation_func(MyObj(2), MyObj(1))
7 MyObj(1)

6 MyObj(2)

5 MyObj(3)

4 MyObj(4)

3 MyObj(5)

データへのアクセスを制限する

SQLite は他の大規模な関係データベースで見られるユーザアクセス制御を行いませんが、列に対するアクセスを制限する仕組みを持っています。それぞれのコネクションは、要求される基準で実行時に列へのアクセスを許可する、または拒否するために 認証関数 をインストールすることができます。認証関数は SQL 文の構文解析を行うときに実行されて、5つの引数が渡されます。1つ目は実行される操作の種別(読み込み、書き込み、削除、その他)を示すアクションコードです。残りの引数はアクションコード次第で変わってきます。 SQLITE_READ 操作では、その引数はテーブルの名前、列の名前、そのアクセスが発生する SQL の場所(メインクエリ、トリガー等)と None です。

import sqlite3

db_filename = 'todo.db'

def authorizer_func(action_code, table, column, sql_location, ignore):
    print '\nauthorizer_func(%s, %s, %s, %s, %s)' % \
        (action_code, table, column, sql_location, ignore)

    response = sqlite3.SQLITE_OK # be permissive by default

    if action_code == sqlite3.SQLITE_SELECT:
        print 'requesting permission to run a select statement'
        response = sqlite3.SQLITE_OK
    
    elif action_code == sqlite3.SQLITE_READ:
        print 'requesting permission to access the column %s.%s from %s' % \
            (table, column, sql_location)
        if column == 'details':
            print '  ignoring details column'
            response = sqlite3.SQLITE_IGNORE
        elif column == 'priority':
            print '  preventing access to priority column'
            response = sqlite3.SQLITE_DENY

    return response

with sqlite3.connect(db_filename) as conn:
    conn.row_factory = sqlite3.Row
    conn.set_authorizer(authorizer_func)

    print 'Using SQLITE_IGNORE to mask a column value:'
    cursor = conn.cursor()
    cursor.execute("select id, details from task where project = 'pymotw'")
    for row in cursor.fetchall():
        print row['id'], row['details']

    print '\nUsing SQLITE_DENY to deny access to a column:'
    cursor.execute("select id, priority from task where project = 'pymotw'")
    for row in cursor.fetchall():
        print row['id'], row['details']

このサンプルは SQLITE_IGNORE を使用して、クエリ結果の task.details 列の文字列を Null 値で置き換えます。さらに SQLITE_DENY を返すことで task.priority 列への全てのアクセスを SQLite に例外を発生させることで拒否します。

$ python sqlite3_set_authorizer.py
Using SQLITE_IGNORE to mask a column value:

authorizer_func(21, None, None, None, None)
requesting permission to run a select statement

authorizer_func(20, task, id, main, None)
requesting permission to access the column task.id from main

authorizer_func(20, task, details, main, None)
requesting permission to access the column task.details from main
  ignoring details column

authorizer_func(20, task, project, main, None)
requesting permission to access the column task.project from main
1 None
2 None
3 None
4 None
5 None
6 None

Using SQLITE_DENY to deny access to a column:

authorizer_func(21, None, None, None, None)
requesting permission to run a select statement

authorizer_func(20, task, id, main, None)
requesting permission to access the column task.id from main

authorizer_func(20, task, priority, main, None)
requesting permission to access the column task.priority from main
  preventing access to priority column
Traceback (most recent call last):
  File "sqlite3_set_authorizer.py", line 47, in <module>
    cursor.execute("select id, priority from task where project = 'pymotw'")
sqlite3.DatabaseError: access to task.priority is prohibited

アクションコードは sqlite3SQLITE_ の接頭辞を持つ定数が利用できると考えられます。SQL 文のそれぞれの種別はフラグで指定されて、同様に個々の列に対するアクセス制御が行われます。

インメモリデータベース

SQLite はディスク上のファイルに頼らずに RAM 上での完全なデータベース管理をサポートします。インメモリデータベースはデータベースをテスト間で保存する必要がない自動テスト、もしくはスキーマやその他のデータベース機能の検証を行うときに便利です。インメモリデータベースをオープンするには Connection を作成するとき、ファイル名の代わりに ':memory:' を使用してください。

import sqlite3

schema_filename = 'todo_schema.sql'

with sqlite3.connect(':memory:') as conn:
    conn.row_factory = sqlite3.Row
    
    print 'Creating schema'
    with open(schema_filename, 'rt') as f:
        schema = f.read()
    conn.executescript(schema)

    print 'Inserting initial data'
    conn.execute("""
        insert into project (name, description, deadline)
        values ('pymotw', 'Python Module of the Week', '2010-11-01')
        """)
    data = [
        ('write about select', 'done', '2010-10-03', 'pymotw'),
        ('write about random', 'waiting', '2010-10-10', 'pymotw'),
        ('write about sqlite3', 'active', '2010-10-17', 'pymotw'),
        ]
    conn.executemany("""
        insert into task (details, status, deadline, project)
        values (?, ?, ?, ?)
        """, data)

    print 'Looking for tasks...'
    cursor = conn.cursor()
    cursor.execute("""
    select id, priority, status, deadline, details from task
    where project = 'pymotw' order by deadline
    """)
    for row in cursor.fetchall():
        print '%2d {%d} %-25s [%-8s] (%s)' % (
            row['id'], row['priority'], row['details'], row['status'], row['deadline'],
            )

with sqlite3.connect(':memory:') as conn2:
    print '\nLooking for tasks in second connection...'
    cursor = conn2.cursor()
    cursor.execute("""
    select id, priority, status, deadline, details from task
    where project = 'pymotw' order by deadline
    """)
    for row in cursor.fetchall():
        print '%2d {%d} %-25s [%-8s] (%s)' % (
            row['id'], row['priority'], row['details'], row['status'], row['deadline'],
            )

このサンプルの2番目のクエリはテーブルが存在しないのでエラーで失敗します。コネクション毎に独立したデータベースを作成するので、あるカーソルによる変更は他のコネクションに影響しません。

$ python sqlite3_memory.py
Creating schema
Inserting initial data
Looking for tasks...
 1 {1} write about select        [done    ] (2010-10-03)
 2 {1} write about random        [waiting ] (2010-10-10)
 3 {1} write about sqlite3       [active  ] (2010-10-17)

Looking for tasks in second connection...
Traceback (most recent call last):
  File "sqlite3_memory.py", line 54, in <module>
    """)
sqlite3.OperationalError: no such table: task

データベースのコンテンツをエクスポートする

インメモリデータベースのコンテンツは Connectioniterdump() メソッドを使用して保存されます。 iterdump() が返すイテレータはデータベースの状態を再作成する SQL 命令をまとめて構築する一連の文字列を生成します。

import sqlite3

schema_filename = 'todo_schema.sql'

with sqlite3.connect(':memory:') as conn:
    conn.row_factory = sqlite3.Row
    
    print 'Creating schema'
    with open(schema_filename, 'rt') as f:
        schema = f.read()
    conn.executescript(schema)

    print 'Inserting initial data'
    conn.execute("""
        insert into project (name, description, deadline)
        values ('pymotw', 'Python Module of the Week', '2010-11-01')
        """)
    data = [
        ('write about select', 'done', '2010-10-03', 'pymotw'),
        ('write about random', 'waiting', '2010-10-10', 'pymotw'),
        ('write about sqlite3', 'active', '2010-10-17', 'pymotw'),
        ]
    conn.executemany("""
        insert into task (details, status, deadline, project)
        values (?, ?, ?, ?)
        """, data)

    print 'Dumping:'
    for text in conn.iterdump():
        print text
    

また iterdump() はファイルに保存されたデータベースで使用することもできますが、他の方法で保存されないデータベースを保存するために使用すると最も便利です。

$ python sqlite3_iterdump.py
Creating schema
Inserting initial data
Dumping:
BEGIN TRANSACTION;
CREATE TABLE project (
    name        text primary key,
    description text,
    deadline    date
);
INSERT INTO "project" VALUES('pymotw','Python Module of the Week','2010-11-01');
CREATE TABLE task (
    id           integer primary key autoincrement not null,
    priority     integer default 1,
    details      text,
    status       text,
    deadline     date,
    completed_on date,
    project      text not null references project(name)
);
INSERT INTO "task" VALUES(1,1,'write about select','done','2010-10-03',NULL,'pymotw');
INSERT INTO "task" VALUES(2,1,'write about random','waiting','2010-10-10',NULL,'pymotw');
INSERT INTO "task" VALUES(3,1,'write about sqlite3','active','2010-10-17',NULL,'pymotw');
DELETE FROM sqlite_sequence;
INSERT INTO "sqlite_sequence" VALUES('task',3);
COMMIT;

スレッドとコネクションの共有

歴史的な理由で SQLite の旧バージョンで行わなければならない Connection オブジェクトはスレッド間で共有されません。それぞれのスレッドはデータベースへの独自のコネクションを作成しなければなりません。

import sqlite3
import sys
import threading
import time

db_filename = 'todo.db'
isolation_level = None # autocommit mode

def reader(conn):
    my_name = threading.currentThread().name
    print 'Starting thread'
    try:
        cursor = conn.cursor()
        cursor.execute('select * from task')
        results = cursor.fetchall()
        print 'results fetched'
    except Exception, err:
        print 'ERROR:', err
    return

if __name__ == '__main__':

    with sqlite3.connect(db_filename, isolation_level=isolation_level) as conn:
        t = threading.Thread(name='Reader 1', target=reader, args=(conn,))
        t.start()
        t.join()

スレッド間でコネクションを共有しようとすると例外が発生します。

$ python sqlite3_threading.py
Starting thread
ERROR: SQLite objects created in a thread can only be used in that same thread.The object was created in thread id 140735239297376 and this is thread id 4315942912

See also

sqlite3
本モジュールの標準ライブラリドキュメント
PEP 249 – DB API 2.0 Specificiation
関係データベースへのアクセスを提供するモジュールの標準インタフェース
SQLite
SQLite ライブラリの公式サイト
shelve
任意の Python オブジェクトを保存するキーバリューストア
SQLAlchemy
SQLite やその他の多くの関係データベースをサポートする人気のあるオブジェクト関係マッパ
Bookmark and Share