pydata

Keep Looking, Don't Settle

SQLAlchemy

1. SQLAlchemy简介

SQLAlchemy是一个与数据库进行交互的库。它可以让你用Python的类和语句创建数据模型来查询数据.它可以连接许多数据库,包括 Postgres,MySQL,SQLite和Oracle等等.

为什么用SQLAlchemy

简单地说,就是用统一的python的格式来写SQLcode.

我们在使用不同的SQL的时候,语法会不尽相同。比如sqlite,mysql,mongodb等等,进行插入操作,新建变量操作等等,都不相同。这样导致每用一个SQL,都要去看一遍语法.使用SQLAlchemy相当于提供了一个统一的接口,这样我们只要写python的code,而不需要去关注具体的SQL的细节部分。

用SQLAlchemy的主要原因是,把你从底层的数据库和SQL奇葩语法中解放出来。SQLAlchemy将常用语句和类型和SQL语句对应起来,让你可以更容易地理解数据库类型,而不需要担心太多细节。这样在处理像Oracle到PostgreSQL数据库这类的迁移工作,或从一个应用数据库到数据仓库时,事情就简单了。它还能确保数据在增加到数据库之前是经过安全的,适当转义处理的。这样可以避免SQL注入之类的事情发生。

SQLAlchemy通过两个主要的模型来实现灵活的操作:SQL表达式语言(通常也叫Core)和ORM(Object-relational mapping,对象关系映射)。这两个模型可以根据你的需要独立使用,也可以合在一起使用。

SQLAlchemy Core和SQL表达式语言

SQL表达式语言是用Pythonic方式的来表达SQL语句和表达式,只是对传统的SQL语言的轻微抽象。它侧重于实用数据库的模式(schema,其实是具体到一个Tabel和View等),但是它实现了不同数据库之间标准化的接口。SQL表达式语言也是SQLAlchemy ORM的基础。

ORM

SQLAlchemy ORM与你在其他语言里遇到的ORM类似。它侧重于应用的Domain Model(一种将数据与其行为集成在一起的模式),借助工作单元的模式来维护对象状态。它还在SQL表达式语言之上增加了一层抽象,让用户可以更容易的操作数据库。你可以把ORM和SQL表达式语言结合起来构建强大的应用。ORM构建了一个声明式的系统,与许多其他ORM模型(如Ruby on Rails)使用的 active-record systems类似。

虽然ORM非常有用,但是你要注意,类的很多用法与数据库的工作方式是不一样的。我们将在后面的章节介绍这些差异。

Core和ORM的选择

究竟是选择Core还是ORM作为应用的数据链接层呢?除了个人喜好,理由可以归结为一些影响因素。这两种模式的语法不太一样,但Core和ORM最大的差异是Core对数据模式和业务对象(business objects)的不同处理方式。

SQLAlchemy Core是以模式为中心,和普通SQL一样有表,键和索引等。SQLAlchemy Core最擅长的时数据仓库,报表分析,以及其他使用数据查询和其他操作可以牢牢掌控的地方。它拥有强大的数据库连接池( connection pool)和数据结果集(ResultSet)优化,非常适合处理大量数据,甚至多数据库也适用。

但是,如果你更侧重于领域驱动设计(domain driven design),那么ORM就可以将原数据和业务对象的底层的模式和结构大部分细节都封装起来。这样封装让数据库连接更简单,更像Python代码。大多数应用都更适合按照这种方法建模。ORM可以用一种非常高效的方法把领域驱动设计方法导入传统应用,或者改造原来带有原始SQL语句的应用。还有一个好处就是,通过对底层数据库的合理抽象,ORM让开发者把精力更多地集中在业务流程的实现上。

不过,ORM是建立在SQLAlchemy Core基础之上的,你可以把处理MySQL的同样方式用于Oracle的数据仓库和Amazon Redshift数据库。当你需要业务对象和仓库数据时,ORM可以无缝的衔接每个环节。

  • 如果你的应用框架已经使用了ORM,但是想要更强大的报表功能,使用Core
  • 如果你不想像普通SQL一样以模式为中心,用ORM
  • 如果你的数据不需要业务对象,用Core
  • 如果你把数据看成业务对象,用ORM
  • 如果要建立快速原型,用ORM
  • 如果你既要业务对象,又要其他数据无关的功能(报表,数据分析等等),两个都用。

知道了如何选择Core和ORM,我们介绍SQLAlchemy的安装与数据库连接方法。

SQLAlchemy安装与数据库连接

SQLAlchemy支持Python 2.6+,Python 3.3+和Pypy 2.1+,强烈推荐conda安装,pip也可以(Python 2.7.5+和Python 3.4+自带pip)。

pip install sqlalchemy

连接数据库

连接数据库需要SQLAlchemy引擎,SQLAlchemy引擎为数据库执行SQL语句创建了一个常用的接口。引擎通过封装一个数据库连接池和方言来实现不同数据库类型统一的接口。这样做使得Python代码不需要关心不同数据库DBAPI之间的差异。SQLAlchemy提供了一个带连接字符串(connection string)和一些参数的函数来创建引擎。连接字符串形式如下:

  • 数据库类型(SQLite,Postgres,MySQL等)
  • 默认数据库类型的方言(Psycopg2,PyMySQL等)
  • 验证信息(用户名和密码)
  • 数据库的位置(文件名或数据库服务器地址)
  • 数据库服务器端口(可选)
  • 数据库名称(可选)

SQLite数据库连接字符串就是一个文件或储存位置。例1-1中,第二行表示SQLAlchemy连接了当前文件夹中的一个SQLite数据库文件cookies.db,第三行是连接内存数据库,第四、五行分别是Unix和Windows系统中的全路径文件。Windows系统路径名称分隔符(\)在Python中是'\\'r'\'

from sqlalchemy import create_engine
engine = create_engine('sqlite:///cookies.db')
engine2 = create_engine('sqlite:///:memory:')
engine3 = create_engine('sqlite:////home/cookiemonster/cookies.db')
engine3 = create_engine('sqlite:///c:\\Users\\cookiemonster\\cookies.db')
`create_engine`函数创建了一个引擎实例,但是,它并没有真正打开链接,直到一个动作要求引擎执行时才会执行,比如查询或新建数据。

下面再让我们创建一个PostgreSQL数据库mydb。然后我们用函数构建一个引擎实例,如例1-2所示,你会发现我用了postgresql+psycopg2作为连接字符串的引擎和方言部分。

from sqlalchemy import create_engine
engine = create_engine('postgresql+psycopg2://username:password@localhost:5432/mydb')

同理,我们再看看MySQL引擎的创建,如例1-3所示,我们把参数pool_recycle设置成3600,表示每一小时自动连接一次。

from sqlalchemy import create_engine
engine = create_engine('mysql+pymysql://cookiemonster:chocolatechip'
                       '@mysql01.monster.internal/cookies', pool_recycle=3600)
MySQL默认超过8小时空闲则断开连接。为了绕开这个问题,引擎设置pool_recycle=3600

create_engine函数的可选参数是:

  • echo:表示引擎行为的日志是否显示,像执行的SQL语句和其他参数等等。默认是False
  • ecoding:默认使用SQLAlchemy的字符串编码utf-8,大多数DBAPI都用此编码。
  • isolation_level:SQLAlchemy分离等级。PostgreSQL+psycopg2的分类等级有READ COMMITTEDREAD UNCOMMITTEDREPEATABLE READSERIALIZABLEAUTOCOMMIT五种,默认是READ COMMITTED。PyMySQL也是这五种,InnoDB存储引擎数据库默认是REPEATABLE READ

isolation_level关键词会为具体的DBAPI设置隔离等级,可就像数据库对应连接字符串的键值对一样,比如PostgreSQL是用psycopg2。

pool_recycle:这个参数是指数据库连接多少秒循环检查一次,对MySQL非常重要。默认值为-1,表示没有时间限制,一直连接。

一旦引擎建立,我们就可以连接数据库了。通过connect()函数就可以。

connection = engine.connect()

现在我们已经连接了数据库,可以用SQLAlchemy Core和ORM了。下面这部分,我们将探索SQLAlchemy Core的内容,学习如何定义和查询数据库。

SQLAlchemy Core

连接数据库之后,我们就可以使用SQLAlchemy Core来进行数据库操作了。SQLAlchemy Core是用Pythonic方式的SQL表达式语言来表示SQL命令和数据结构的。SQLAlchemy Core具有的这种特性让它不仅可以用于Django或SQLAlchemy ORM,也可以单独使用。

首先我们需要定义数据表的数据类型,数据的关联性以及其他约束条件。

2.Schema and Types

为了搞定数据库,SQLAlchemy提供了以下三种方式来表示数据库中的表结构:

  • Core:自定义表Table对象
  • ORM:用类class表示数据表
  • SQLsoup和Automap:直接从数据库映射

本章重点介绍第一种方式,即通过SQLAlchemy Core来实现,后面的章节会介绍其他两种方法。Table对象包括了一系列具有特定类型的列和属性,这些通过一个常用的元数据容器来控制。首先我们介绍SQLAlchemy建数据表的数据类型。

Types

SQLAlchemy有四种数据类型:

  • 通用类型(Generic)
  • SQL标准类型(SQL standard)
  • 数据库特有类型(Vendor Specific)
  • 用户自定义类型(User Defined)

SQLAlchemy为不同的数据库定义了一些通用的数据类型。这些类型都在sqlalchemy.types模块中,为了引用方便也支持放在sqlalchemy里面。

布尔型通用类型使用BOOLEAN SQL类型,对应的Python类型就是TrueFalse;但是,对那些不支持BOOLEAN SQL类型的数据库通常要使用SMALLINT来代替。由于SQLAlchemy把这些细节都隐藏了,因此你可以放心大胆的操作数据,不用担心后面数据库用的是什么细节,只要在Python代码里处理TrueFalse就行。即使数据仓库和交换数据库不一样,通用类型也可以完成数据处理。通用类型对应Python和SQL的含义如下表所示:

SQLAlchemy Python SQL
BigInteger int BIGINT
Boolean bool BOOLEAN or SMALLINT
Date datetime.date Date (SQLite: String)
DateTime datetime.datetime DATETIME (SQLite: String)
Enum str ENUM or VARCHAR
Float float or Decimal FLOAT or REAL
Integer int Integer
Interval datetime.timedelta INTERVAL or DATE from epoch
LargeBinary byte BLOB or BYTEA
Numeric decimal.Decimal NUMERIC or DECIMAL
Unicode unicode UNICODE or VARCHAR
Text str CLOB or TEXT
Time datetime.time DATETIME

掌握通用类型非常重要,会经常使用

如果通用类型不能满足需求,也会用到SQL标准类型和数据库专有类型。CHARNVARCHAR类型就是最好的例证,源自SQL类型。如果数据库的模式是在使用SQLAlchemy之前建立的,我们就要注意原模式与SQLAlchemy的差异。SQL标准类型的特性在不同的数据库里面可能有很大变化,也是在sqlalchemy.types模块中,为了和通用类型分开,都用大写字母表示。

数据库专有类型只存在于特定的数据库中。你可以通过SQLALchemy网站或某种数据库的方言(dialect)文档查询此类型,放在sqlalchemy.dialects模块和数据库方言的子模块中,这些类型同样是大写字母。例如,我们想使用PostgreSQL强大的JSON类型,我们可以这样:

from sqlalchemy.dialects.postgresql import JSON

这样我们就可以用PostgreSQL特有的JSON函数为我们的应用定义JSON类型,比如array_to_json。

你也可以根据自己的需要自定义类型。例如,当增加数据记录时,需要对即将存储到VARCHAR列的文本增加前缀,下一次从数据记录中获取这些记录的时候要重新去掉前缀。这样定义类型就可以给那些系统中已经存在的旧数据进行标识,表示这类数据在新应用中并没什么用或者不太重要。

了解了四类数据类型之后,我们来看看如果用元数据组织数据库的结构。

Metadata

元数据用于把数据库结构集成在一起方便SQLAlchemy快速对接。可以把元数据看成是一种表目录,再加一些引擎和链接的信息。这些信息可以通过MetaData.tables查看。读操作是线程安全的,但是,表的建立不完全是线程安全的。元数据使用之前需要导入并初始化。下面让我们建立一个Metadata对象,为本章后面的例子建立容器来盛放数据库的信息目录。

from sqlalchemy import MetaData
metadata = MetaData()

有了盛放数据库结构的地方,我们就可以建立数据表了。

Tables

在SQLAlchemy Core里数据表对象初始化过程是通过Table通过表名称,元数据和数据列的名称,类型和属性共同构建,最终放入MetaData对象的。列对象表示数据表的每个字段,都是用包含名称、数据类型和一些SQL结构与约束特征的Column对象表示。我们将从这里开始建立一些数据表在SQLAlchemy Core部分使用。例2-1如下所示,建立一个网店的饼干库存表。

from sqlalchemy import Table, Column, Integer, Numeric, String, ForeignKey

cookies = Table('cookies', metadata,
                Column('cookie_id', Integer(), primary_key=True),
                Column('cookie_name', String(50), index=True),
                Column('cookie_recipe_url', String(255)),
                Column('quantity', Integer()),
                Column('unit_cost', Numeric(12, 2))

)

list(cookies.columns)

建立新表之前,我们需要理解表的基础单元——列。

Columns

列定义了数据表中的字段,它们通过我们对其关键词参数的设置来表达具体的含义。不同类型的主要参数不同。例如,字符串类型的基本参数是长度,而带小数的数值类型基本参数是精度和长度。其他类型大都没有基本参数。

有时你也会看到字符串类型没有设置长度。并非所有的数据库都支持这种特性,包括MySQL也不支持。 列也有一些别的参数来帮助它们建立更丰富的特性。我们可以为列设置是否允许空值或必须具有唯一性(unique)。还可以定义初始值(default),通常这么做是为了日志记录或财务审计的需要,如下所示。

from datetime import datetime
from sqlalchemy import DateTime
users = Table('users', metadata,
                Column('user_id', Integer(), primary_key=True),
                Column('username', String(15), nullable=False, unique=True),
                Column('email_address', String(255), nullable=False),
                Column('phone', String(20), nullable=False),
                Column('password', String(25), nullable=False),
                Column('created_on', DateTime(), default=datetime.now),
                Column('updated_on', DateTime(), default=datetime.now, onupdate=datetime.now)
)

list(users.columns)

你会发现这里使用datatime.now,而不是datatime.now()。如果我们直接用函数,那么时间可能就是数据表第一次建立的时间。不带()我们就可以在每条记录生成时产生一个新的时间。

我们通过列关键词参数定义了数据表的结构和约束;但是,列对象有时可能需要和其他表进行关联。当你处理数据库时,你必须告诉SQLAlchemy关于这个数据库内部的模式,结构和约束。假如有一个数据库和SQLAlchemy使用的索引名称模式不同,那么你必须定义索引才能正常使用。下面两个小节将告诉你怎么做。

Keys and Constraints和Index两节的程序其实都可以在Table构造函数里直接实现,也可以通过特定方法在表建立之后实现,再增加到表中。它们都会作为单独一行语句保存到metadata里面。

Keys and Constraints

键和约束是用来保证数据在存到数据库之前能够满足一些约束条件的方法。表示键和约束的对象在SQLAlchemy模块里面都有,常用的三个如下所示:

from sqlalchemy import PrimaryKeyConstraint, UniqueConstraint, CheckConstraint

主键是最常用的,表示数据表中一条记录的唯一标识,可以用来保证不同表里相关数据之间正确的关系。前面的例1和例2中,我们用primary_key参数将一列设置为主键。你还可以用若干列构成的元组来定义一个复合主键。这个键在表中被看成是一个内容为元组的列,会按照它们原始顺序进行排列。主键还可以在表构建之后再定义,如下所示。你可以用逗号分隔,增加多列形成一个复合主键。如果我们想在上面例2中显示定义主键,可以这样做:

PrimaryKeyConstraint('user_id', name='user_pk')

另一个常用的约束就是唯一性约束,确保一个字段里任意两个值不重复,如果在登录系统里面出现两个用户名是一样的,那就麻烦了。我们也可以用下面的方式定义例2中用户名列的唯一性约束:

UniqueConstraint('username', name='uix_username')

例2里没有用到检查约束类型。这种约束类型是用来保证一列数据与用户定义的条件一致。下面的例子,我们保证unit_cost列永远非负,因为成本不可能小于0。

CheckConstraint('unit_coust >= 0.00', name='unit_coust_positive')

除了键和约束,我们还想更高效的查询一些字段。这就要介绍索引(Indexes)。

Indexes

索引是用来加速查询的,在例1里面,我们把cookie_name加上了索引,因为我们知道我们经常需要查询它们。索引创建之后你就会获得一个ix_cookies_cookie_name索引。我们也可以显式定义一个索引。多列的索引可以通过分号分隔名称来建立。你还可以增加一个参数unique=True来保证索引具有唯一性。如果显式声明索引,它们就会在放在对应列后面。下面的做法与例1相同:

from sqlalchemy import Index

Index('ix_cookies_cookie_name', 'cookie_name')

我们还可以创建了函数索引,不同的数据库可能用法上会有点儿变化。这样可以让你为一些经常查询不常用信息的需求创建检索。例如,如果你想从饼干的SKU号和名称的组合中找SKU0001 Chocolate Chip信息。我们就可以建立一个复合索引来查询:

Index('ix_test', mytable.c.cookie_sku, mytable.c.cookie_name)

下面到了关系型数据库最重要的部分了,就是表的关联关系与定义。

Relationships and ForeignKeyConstraints

现在我们有了一个约束和索引都正确的表,让我们看看表之间的关系如何建立。我们需要一种方法来跟踪订单,包括记录已销售的饼干和数量的信息。表关系图如下所示:

例3实现了line_items表order_id列的关系,就是用ForeignKeyConstraint定义两个表的关系。在本例中,我们有很多line_items表示单个订单。但是,如果你深入到这些订单中,你会发现订单与cookies表的cookie_id外键有关联关系。这是因为line_items表其实与orders和cookies表的一些数据都有关联。关联表用来表示另外两个表之间的多对多关系。通常一个外键用来表示一对多关系,但是如果一个表有多个外键,那么这个表很可能是一个关联表。

from sqlalchemy import ForeignKey, Boolean
orders = Table('orders', metadata,
            Column('order_id', Integer(), primary_key=True),
            Column('user_id', ForeignKey('users.user_id')),
            Column('shipped', Boolean(), default=False)
)
line_items = Table('line_items', metadata,
                Column('line_items_id', Integer(), primary_key=True),
                Column('order_id', ForeignKey('orders.order_id')),
                Column('cookie_id', ForeignKey('cookies.cookie_id')),
                Column('quantity', Integer()),
                Column('extended_cost', Numeric(12, 2))
)

用字符串替换实际的列名可以在多个模块中把表的定义分离,这样就不用担心表加载的顺序了。这是因为,它只会在首次连接表名称和列名称的时候解析字符串执行表查询。如果我们直接用表引用,比如cookies.c.cookie_id,我们的外键在每次模块初始化的时候都要执行一次,如果表加载的顺序特别靠后就会出错。

你可以显式的调研ForeignKeyConstraint定义,在SQLAlchemy里面可以对已经建好的表建立外键,就像其他的键,约束和索引的创建一样。需要先倒入ForeignKeyConstraint模块然后定义。下面的代码是创建一个ForeignKeyConstraint表示line_items表和orders之间order_id外键。

from sqlalchemy import ForeignKeyConstraint

ForeignKeyConstraint(['order_id'], ['orders.order_id'])

前面做的每件事情都是用SQLAlchemy可以理解的方式定义的。如果你的数据库已经存在,而且schema已经建立,你就可以进行查询了。如果你还没建立,下面就介绍如何把数据库建成文件。

Persisting the Tables

表和模式定义与原数据相关。把模式保存到数据库很简单,用metadata的create_all()方法就可以了:

metadata.create_all(engine)

默认情况下,create_all()方法不会重新创建已经存在的数据库,所有运行多次也很安全。相比直接在应用代码里修改数据库,用Alembic那样的迁移工具处理数据库的更新或其他模式是更好的方法。我们将在后面的章节里介绍。现在数据库里面已经有表了,这一章完整的代码如下所示:

from datetime import datetime
from sqlalchemy import (MetaData, Table, Column, Integer, Numeric, String,
                        Boolean, DateTime, ForeignKey, create_engine)
metadata = MetaData()
cookies = Table('cookies', metadata,
                Column('cookie_id', Integer(), primary_key=True),
                Column('cookie_name', String(50), index=True),
                Column('cookie_recipe_url', String(255)),
                Column('cookie_sku', String(55)),
                Column('quantity', Integer()),
                Column('unit_cost', Numeric(12, 2))
                )
users = Table('users', metadata,
              Column('user_id', Integer(), primary_key=True),
              Column('customer_number', Integer(), autoincrement=True),
              Column('username', String(15), nullable=False, unique=True),
              Column('email_address', String(255), nullable=False),
              Column('phone', String(20), nullable=False),
              Column('password', String(25), nullable=False),
              Column('created_on', DateTime(), default=datetime.now),
              Column('updated_on', DateTime(),
                     default=datetime.now, onupdate=datetime.now)
              )
orders = Table('orders', metadata,
               Column('order_id', Integer(), primary_key=True),
               Column('user_id', ForeignKey('users.user_id')),
               Column('shipped', Boolean(), default=False)
               )
line_items = Table('line_items', metadata,
                   Column('line_items_id', Integer(), primary_key=True),
                   Column('order_id', ForeignKey('orders.order_id')),
                   Column('cookie_id', ForeignKey('cookies.cookie_id')),
                   Column('quantity', Integer()),
                   Column('extended_cost', Numeric(12, 2))
                   )
engine = create_engine('sqlite:///:memory:')
metadata.create_all(engine)
connection = engine.connect()

本章我们介绍了如何在SQLAlchemy里把元数据当作目录(catalog)来存放表模式和其他数据。我们还定义了带有多列和多约束的表。然后,我们介绍约束的类型和这些约束对列进行的显式定义的方法。其次,我们介绍了默认值的设置和为了查账进行更新值的方法。最后,我们介绍了把模式保存到数据库进行重用的方法。下面我们介绍如何在模式中用SQL表达式语言进行数据的操作。

3.SQLAlchemy Core数据操作

现在数据库里面有了表,让我们来操作它们。首先我们将演示如何增删改查,然后介绍如果排序,组合以及如何使用关系。我们用SQLAlchemy Core提供的SEL(SQL表达式语言)演示。还是用上一章建立的数据库,首先我们看看如何新建数据。

Inserting Data

首先,我们在cookies表新建一行我最喜欢的饼干(巧克力味的)。用cookies表的insert()方法,然后在values()语句里面设置各个列的值就可以了。如下所示:

ins = cookies.insert().values(
    cookie_name="chocolate chip",
    cookie_recipe_url="http://some.aweso.me/cookie/recipe.html",
    cookie_sku="CC01",
    quantity="12",
    unit_cost="0.50"
)
print(str(ins))
INSERT INTO cookies (cookie_name, cookie_recipe_url, cookie_sku, quantity, unit_cost) VALUES (:cookie_name, :cookie_recipe_url, :cookie_sku, :quantity, :unit_cost)

上面显示了对应的SQL语句。每一列的值都用:column_name代替了,SQLAlchemy的str()函数就是这样显示的。数值都进行过清洗和转义处理,保证数据安全,避免SQL注入攻击。因为不同种类的数据库处理参数值的方言可能有点差别,所以通过编译版本的语句可以看到输入的内容。ins对象的compile()方法会返回一个SQLAlchemy对象,通过params属性就可以看到数值了。

ins.compile().params
{'cookie_name': 'chocolate chip',
 'cookie_recipe_url': 'http://some.aweso.me/cookie/recipe.html',
 'cookie_sku': 'CC01',
 'quantity': '12',
 'unit_cost': '0.50'}

ins.compile()通过方言编译数据,但是并没执行,因此我们需要用params属性查看。

介绍了新建语句的用法之后,我们用connection的execute()方法把数据加入数据表。

result = connection.execute(ins)

我们还可以用inserted_primary_key属性查看刚刚新建数据的ID号。

result.inserted_primary_key
[1]

我们简单介绍一下excute()执行的过程。当我们建立前面那条SQL表达式语言的插入语句时,实际上是创建了一个可以快速向下遍历的树状结构。当我们调用excute()方法时,它把刚刚传入的语句和其他任何参数一起编译成对应数据库方言编译器能够识别的语句。编译器通过遍历那个树状结构建成一个普通的SQL语句。这个语句返回到excute()方法,excute()方法通过绑定的连接把语句传递到数据库。数据库服务器就执行SQL语句然后把结果返回给excute()方法。

insert除了可以作为表对象的实例方法,也可以当作顶层函数使用,这样可以把表对象作为参数,更具灵活性。例如,假如公司的两个部门分别拥有相互独立的库存数据,就可以按照例3-3的形式再插入一行数据。

from sqlalchemy import insert
ins = insert(cookies).values(
    cookie_name="chocolate chip",
    cookie_recipe_url="http://some.aweso.me/cookie/recipe.html",
    cookie_sku="CC01",
    quantity="12",
    unit_cost="0.50"
)

虽然insert的表对象方法和更具一般性的函数两种形式结果一样,我还是更喜欢后者,因为它更接近SQL语句的用法。

连接对象的execute()方法不仅仅只是处理语句,还可以把values当作execute()方法的参数。当语句被编译时,它会把每个关键词参数的键增加到字段列表中,然后再把每个字段对应的值增加到SQL语句的VALUE参数里。

ins = cookies.insert()
result = connection.execute(
    ins,
    cookie_name='dark chocolate chip',
    cookie_recipe_url='http://some.aweso.me/cookie/recipe_dark.html',
    cookie_sku='CC02',
    quantity='1',
    unit_cost='0.75'
)
result.inserted_primary_key
[2]

不过这种形式并不常用,但是,它为语句在传到数据库服务器之前的编译和组织方式提供了一个很好的解释。我们可以用放了字段和数值词典的列表一次性插入多个记录。让我们把两种饼干的库存数据插入cookies表。

ins = cookies.insert()
inventory_list = [
    {
        'cookie_name': 'peanut butter',
        'cookie_recipe_url': 'http://some.aweso.me/cookie/peanut.html',
        'cookie_sku': 'PB01',
        'quantity': '24',
        'unit_cost': '0.25'
    },
    {
        'cookie_name': 'oatmeal raisin',
        'cookie_recipe_url': 'http://some.okay.me/cookie/raisin.html',
        'cookie_sku': 'EWW01',
        'quantity': '100',
        'unit_cost': '1.00'
    }
]
result = connection.execute(ins, inventory_list)

列表中的每个词典都要有同样的键(keys)。首先方言编译器会编译第一个词典的语句内容,如果后面词典的键与第一条不同就会失败,因为第一条的字段已经建好了。 现在有了数据,我们就可以查询了。

Querying Data
from sqlalchemy.sql import select
s = select([cookies])
rp = connection.execute(s)
results = rp.fetchall()
results
[(1, 'chocolate chip', 'http://some.aweso.me/cookie/recipe.html', 'CC01', 12, Decimal('0.50')),
 (2, 'dark chocolate chip', 'http://some.aweso.me/cookie/recipe_dark.html', 'CC02', 1, Decimal('0.75')),
 (3, 'peanut butter', 'http://some.aweso.me/cookie/peanut.html', 'PB01', 24, Decimal('0.25')),
 (4, 'oatmeal raisin', 'http://some.okay.me/cookie/raisin.html', 'EWW01', 100, Decimal('1.00'))]
print(str(s))
SELECT cookies.cookie_id, cookies.cookie_name, cookies.cookie_recipe_url, cookies.cookie_sku, cookies.quantity, cookies.unit_cost 
FROM cookies

和insert用法类似,select也是既可以作为表对象的实例方法,也可以作为更具一般性的顶层函数使用。我更喜欢顶层函数的使用方式,因为和SQL用法一样。

from sqlalchemy.sql import select
s = cookies.select()
rp = connection.execute(s)
results = rp.fetchall()
results
[(1, 'chocolate chip', 'http://some.aweso.me/cookie/recipe.html', 'CC01', 12, Decimal('0.50')),
 (2, 'dark chocolate chip', 'http://some.aweso.me/cookie/recipe_dark.html', 'CC02', 1, Decimal('0.75')),
 (3, 'peanut butter', 'http://some.aweso.me/cookie/peanut.html', 'PB01', 24, Decimal('0.25')),
 (4, 'oatmeal raisin', 'http://some.okay.me/cookie/raisin.html', 'EWW01', 100, Decimal('1.00'))]
ResultProxy

ResultProxy是对数据库API光标对象的封装,其主要目的是让数据操作更简单。例如,它可以通过索引,字段名称和列对象让查询操作更简单,演示如例3-8所示。任何一种方法查询数据都很简单。

first_row = results[0]

first_row[1]
'chocolate chip'
first_row.cookie_name
'chocolate chip'
first_row[cookies.c.cookie_name]
'chocolate chip'

上面的输出结果都是results变量第一个记录的数据chocolate chip。接入灵活只是ResultProxy能力的一部分。我们还可以用ResultProxy实现循环。例如,我们可以打印所有饼干的名称。

rp = connection.execute(s)
for record in rp:
    print(record.cookie_name)
chocolate chip
dark chocolate chip
peanut butter
oatmeal raisin

ResultProxy除了可以循环,或调用fetchall()方法之外,很多其他数据接入方式也可以用。其实,上例中所有的result变量所有的插入数据操作都是用ResultProxy实现的。rowcount()和inserted_primary_key()方法也是ResultProxy获取信息的一种方式。你也可以用下面的方法获取信息:

  • first()——如果存在则返回第一条记录并关闭连接
  • fetchone()——返回一条记录,光标继续开着,等待新的查询
  • scalar()——如果查询结果只有一行一列,就返回一个值

如果要看结果中所有列名,可以用keys()方法。后面的章节里,我们还会经常使用first,scalar,fetchone,fetchall方法和ResultProxy循环。

产品代码 写产品代码的时候,我有几条原则:

  • first获取一条记录,不用scalar和fetchone,因为这样更清晰
  • 用ResultProxy循环,不用fetchall和fetchone方法。这样内存开销更新,因为通常我们都是一次处理一条记录
  • 尽量不要用fetchone,因为它会让连接一直开着
  • scalar也要少用,因为当查询返回多余一行一列数据的时候容易出错,这一点在测试的时候经常被忽略

在上例中,每一次我们查询数据集的时候每条记录的所有列都会返回。而通常我们只需要一部分列就可以。如果数据非常大,这样查询就会非常耗费内存,查询速度就会很慢。SQLAlchemy不会为查询或ResultProxy增加负担;但是,通常查询完成后,你需要看看查询是否消耗了太多内存。下面我们就来介绍如何控制查询的范围。

控制查询的列

可以用select()方法将要查询的列以列表形式放入。例如,下面代码是只需要查看饼干的名称和质量时的操作。

s = select([cookies.c.cookie_name, cookies.c.quantity])
rp = connection.execute(s)
print(rp.keys())
result = rp.first()
['cookie_name', 'quantity']
result
('chocolate chip', 12)

这样我们就建立了简单的select语句,我们将看看其他改变选择结果的操作。首先我们看看如何改变顺序。

Ordering

如果在上面例10中你要查看所有的数据结果,你会发现名称排序很混乱。但是,如果我们想让名称按照指定顺序排列,可用select的order_by()语句。

s = select([cookies.c.cookie_name, cookies.c.quantity])
s = s.order_by(cookies.c.quantity)
rp = connection.execute(s)
for cookie in rp:
    print('{} - {}'.format(cookie.quantity, cookie.cookie_name))
1 - dark chocolate chip
12 - chocolate chip
24 - peanut butter
100 - oatmeal raisin

如果要逆序排列,就在order_by()里面增加desc()。用desc()包裹排序参数即可。

desc()也可以当成列对象的方法来使用,

from sqlalchemy import desc
s = select([cookies.c.cookie_name, cookies.c.quantity])
s = s.order_by(desc(cookies.c.quantity))
rp = connection.execute(s)
for cookie in rp:
    print('{} - {}'.format(cookie.quantity, cookie.cookie_name))
100 - oatmeal raisin
24 - peanut butter
12 - chocolate chip
1 - dark chocolate chip

还可以限制查询记录结果的数量。

Limiting

前面的例子里,first()和fetchone()方法是用来获得一行记录的。ResultProxy可以提供一行数据,实际中我们经常需要多行数据。如果我们想限制查询数量,我们可以用limit()函数。例如,如果我现在想做销量最好的两种饼干,那么对排序后的查询增加一个限制条件就可以了。

s = select([cookies.c.cookie_name, cookies.c.quantity])
s = s.order_by(cookies.c.quantity)
s = s.limit(2)
rp = connection.execute(s)
print([result.cookie_name for result in rp])
['dark chocolate chip', 'chocolate chip']

现在我们知道饼干的种类了,我开始关心库存还有多少。很多数据库都有一堆SQL函数对数据进行统计,必然SUM等等,让我们看看这些函数如何使用。

Builtin SQL Functions and Labels

SQLAlchemy的SQL函数里最常用的是SUM()和COUNT()。使用这个函数之前我们需要导入sqlalchemy.sql.func模块,这些函数只要包裹列对象就可以运行。所以要统计饼干的总量,我们可以这样:

from sqlalchemy import func
s = select([func.sum(cookies.c.quantity)])
rp = connection.execute(s)
print(rp.scalar())
137

因为Python内置函数也有sum,所以使用SQLAlchemy的sum时,建议导入func用func.sum。

现在让我们用count函数统计cookie表里有多少条库存记录。

s = select([func.count(cookies.c.quantity)])
rp = connection.execute(s)
record = rp.first()
print(record.keys()) # 显示Resultproxy里的列
print(record.count_1) # 列名是自动生成的,命名方式是`<函数名>_<位置>`
['count_1']
4

这个列名有点隐蔽。如果我们想自定义名称,也可以用在count()函数后用label()函数来定义列名。例如,我们想用更好记的名称来定义记录数量。

s = select([func.count(cookies.c.cookie_name).label('inventory_count')])
rp = connection.execute(s)
record = rp.first()
print(record.keys())
print(record.inventory_count)
['inventory_count']
4

介绍了如何限制行和列的数量之后,我们再看看如何进行数据过滤。

Filtering

过滤查询和SQL一样用where()函数来实现。通常where()函数有一个列名称,一个操作符和一个值或列。也可以用连接多个where()语句,像SQL里面逻辑操作符AND的作用。下面我们来找名称为chocolate chip的饼干。

s = select([cookies]).where(cookies.c.cookie_name == 'chocolate chip')
rp = connection.execute(s)
record = rp.first()
record.items() # 显示所有列名和数值
[('cookie_id', 1),
 ('cookie_name', 'chocolate chip'),
 ('cookie_recipe_url', 'http://some.aweso.me/cookie/recipe.html'),
 ('cookie_sku', 'CC01'),
 ('quantity', 12),
 ('unit_cost', Decimal('0.50'))]

我们还可以用where()找包含chocolate的饼干名称。

s = select([cookies]).where(cookies.c.cookie_name.like('%chocolate%'))
rp = connection.execute(s)
for record in rp.fetchall():
    print(record.cookie_name)
chocolate chip
dark chocolate chip

where()语句里我们用cookies.c.cookie_name作为过滤的ClauseElement类型。我们现在停下来,看看ClauseElement类型的其他功能。

Operators

ClauseElements就是我们在从句中使用的一个元素,通常都是数据表的列;但是,和列不同,ClauseElements有许多功能。在例3-18里面,我们用来like()方法,其实还有很多方法如表3-1所示。每个方法都和标准SQL里面的函数类似。后面会大量使用它们。

方法 目的
between(cleft, cright) Find where the column is between cleft and cright
concat(column_two) Concatenate column with column_two
distinct() Find only unique values for column
in_([list]) Find where the column is in the list
is_(None) Find where the column is None (commonly used for Null checks with None)
contains(string) Find where the column has string in it (Case-sensitive)
endswith(string) Find where the column ends with string (Case-sensitive)
like(string) Find where the column is like string (Case-sensitive)
startswith(string) Find where the column begins with string (Case-sensitive)
ilike(string) Find where the column is like string (NOT Case-sensitive)

还有两个相反操作的函数notlike()和notin_(),以及一个不带下划线的函数isnot()。

如果我们不用这些方法,我们还可以用运算符。大部分运算符都和你的习惯一样,不过我们要介绍一些奇怪的地方。

操作符

到目前为止我们过滤数据都是通过判断列是否等于一个数值,或者用ClauseElement方法的函数,比如like();其实我们还可以用很多运算符号来过滤数据。SQLAlchemy提供了大量Python的标准运算符。这包括所有的标准比较运算符(==,!=,<,>,>=),与Python语句里使用方法一致。==操作符在和None比较时还表示IS NULL。算法运算符(+,-,*,/,%)也可用于数据库字符串处理,如例3-19所示。

s = select([cookies.c.cookie_name, 'SKU-' + cookies.c.cookie_sku])
for row in connection.execute(s):
    print(row)
('chocolate chip', 'SKU-CC01')
('dark chocolate chip', 'SKU-CC02')
('peanut butter', 'SKU-PB01')
('oatmeal raisin', 'SKU-EWW01')

另一个常用操作就是计算多列的数值。通常做财务和统计报表时经常用到,例3-20计算库存金额:

from sqlalchemy import cast
s = select([cookies.c.cookie_name,
            cast((cookies.c.quantity * cookies.c.unit_cost),
                 Numeric(12,2)).label('inv_cost')])
for row in connection.execute(s):
    print('{} - {}'.format(row.cookie_name, row.inv_cost))
chocolate chip - 6.00
dark chocolate chip - 0.75
peanut butter - 6.00
oatmeal raisin - 100.00
Boolean Operators

SQLAlchemy也支持逻辑运算符(与AND,或OR,非NOT),用位操作符表示(&,|,~)。使用时一定要注意,因为Python自带的逻辑运算符规则。比如,&比<优先级高,当你写A < B & C < D的时候,运算结果是A < (B & C) < D,而你实际想写的是(A < B) & (C < D)。为了清晰表达意思,请用连接词(conjunctions),不要用这些重载运算符。

通常在处理多个从句时,从句之间存在与或非关系时,应该用连接词。

Conjunctions

虽然可以把多个where()从句连起来,但是用连接词会让语句更清晰好看。SQLAlchemy的连接词是and_(),or_()和not_()。如果我们想要查询库存和单价满足特定条件的饼干时,可以用and_()实现。

from sqlalchemy import and_, or_, not_
s = select([cookies]).where(
    and_(
        cookies.c.quantity > 23,
        cookies.c.unit_cost < 0.40
    )
)
for row in connection.execute(s):
    print(row.cookie_name)
peanut butter

or_()函数与and_()相反,包含任何一种情形就可以。如果我们想找到库存在10到50之间,或名称包含chip的饼干时,可以用or_。

from sqlalchemy import and_, or_, not_
s = select([cookies]).where(
    or_(
        cookies.c.quantity.between(10, 50),
        cookies.c.cookie_name.contains('chip')
    )
)
for row in connection.execute(s):
    print(row.cookie_name)
chocolate chip
dark chocolate chip
peanut butter

not()连接词类似,只是表示不包含条件的结果。到这里我们已经可以轻松的查询数据了,下面我们看看如何更新数据。

Updating Data

还有一个数据更新方法,和前面用的insert方法类似,除了它们需要指定一个条件表面要更新的行,语法与insert完全一致。更新方法可以用update()函数或待更新表的update()方法。如果不增加条件,就会更新表中所有行。当我做完新的饼干之后,就要更新库存数据。在例3-23中,我们更新对应饼干的库存,然后计算新的总库存量。

from sqlalchemy import update
u = update(cookies).where(cookies.c.cookie_name == 'chocolate chip')
u = u.values(quantity=(cookies.c.quantity + 120))
result = connection.execute(u)
print(result.rowcount)

s = select([cookies]).where(cookies.c.cookie_name == "chocolate chip")
result = connection.execute(s).first()
for key in result.keys():
    print('{:>20}: {}'.format(key, result[key]))
1
cookie_id: 1
cookie_name: chocolate chip
cookie_recipe_url: http://some.aweso.me/cookie/recipe.html
cookie_sku: CC01
quantity: 132
unit_cost: 0.50

除了更新数据,还要删除数据。

Deleting Data

删除数据可以用delete()函数或表的delete()方法。和insert(),update()不同的是,delete()无数值参数,只有一个可选的where()用于设置删除区域(如果没有就删除全表所有数据)。

from sqlalchemy import delete

u = delete(cookies).where(cookies.c.cookie_name == 'dark chocolate chip')
result = connection.execute(u)
print(result.rowcount)

s = select([cookies]).where(cookies.c.cookie_name == "dark chocolate chip")
result = connection.execute(s).fetchall()
print(len(result))
1
0

现在让我们回想一下所学的知识,对users,orders,line_items表进行更新。你可以直接复制代码,但是建议你试试其他方法新建数据。

customer_list = [
    {
        'username': 'cookiemon',
        'email_address': 'mon@cookie.com',
        'phone': '111-111-1111',
        'password': 'password'
    },
    {
        'username': 'cakeeater',
        'email_address': 'cakeeater@cake.com',
        'phone': '222-222-2222',
        'password': 'password'
    },
    {
        'username': 'pieguy',
        'email_address': 'guy@pie.com',
        'phone': '333-333-3333',
        'password': 'password'
    }
]
ins = users.insert()
result = connection.execute(ins, customer_list)

有了用户数据,让我们再更新他们的orders,line_items表。

from sqlalchemy import insert
ins = insert(orders).values(user_id=1, order_id=1)
result = connection.execute(ins)
ins = insert(line_items)
order_items = [
    {
        'order_id': 1,
        'cookie_id': 1,
        'quantity': 2,
        'extended_cost': 1.00
    },
    {
        'order_id': 1,
        'cookie_id': 3,
        'quantity': 12,
        'extended_cost': 3.00
    }
]
result = connection.execute(ins, order_items)

ins = insert(orders).values(user_id=2, order_id=2)
result = connection.execute(ins)
ins = insert(line_items)
order_items = [
    {
        'order_id': 2,
        'cookie_id': 1,
        'quantity': 24,
        'extended_cost': 12.00
    },
    {
        'order_id': 2,
        'cookie_id': 4,
        'quantity': 6,
        'extended_cost': 6.00
    }
]
result = connection.execute(ins, order_items)

在SQLAlchemy Core的第二章中我们介绍过外键和关系;但是我们还没有用它们做过查询,下面我们就来看看这些关系。

Joins

用join()和outerjoin()方法进行数据关联。例如,发货之前需要了解用户cookiemon订购了那种饼干。这就需要用3个join来汇总三张表的数据。另外,当用多个join汇总数据时,你可能需要重新组织from后面join关联内容的顺序,SQLAlchemy提供了select_from来实现这个功能。通过select_from我们可以将整个from从句替换成SQLAlchemy支持的形式。

columns = [orders.c.order_id, users.c.username, users.c.phone,
            cookies.c.cookie_name, line_items.c.quantity,
            line_items.c.extended_cost]
cookiemon_orders = select(columns)
cookiemon_orders = cookiemon_orders.select_from(orders.join(users).join(
                    line_items).join(cookies)).where(users.c.username ==
                    'cookiemon')
result = connection.execute(cookiemon_orders).fetchall()
for row in result:
    print(row)
(1, 'cookiemon', '111-111-1111', 'chocolate chip', 2, Decimal('1.00'))
(1, 'cookiemon', '111-111-1111', 'peanut butter', 12, Decimal('3.00'))

上面的SQL语句是这样:

print(str(cookiemon_orders))
SELECT orders.order_id, users.username, users.phone, cookies.cookie_name, line_items.quantity, line_items.extended_cost 
FROM orders JOIN users ON users.user_id = orders.user_id JOIN line_items ON orders.order_id = line_items.order_id JOIN cookies ON cookies.cookie_id = line_items.cookie_id 
WHERE users.username = :username_1

统计所有用户各自的订单数量也常用,不只是当前的订单。可以通过outerjoin()方法实现,需要注意join的顺序,因为用outerjoin()方法生成的表会返回所有外键匹配结果。

columns = [users.c.username, func.count(orders.c.order_id)]
all_orders = select(columns)
all_orders = all_orders.select_from(users.outerjoin(orders))
all_orders = all_orders.group_by(users.c.username) #后面介绍分组
result = connection.execute(all_orders).fetchall()
for row in result:
    print(row)
('cakeeater', 1)
('cookiemon', 1)
('pieguy', 0)

现在,我们已经可以通过join实现关联查询了。但是,如果我们有一个像员工与老板关系表,用SQLAlchemy清楚的读取和理解内容需要用alias。

Aliases

用join的时候,通常要对一个表进行多次引用。在SQL里面,这是通过查询中的aliases实现的。例如,假设我们有下面的schema结构:

employee_table = Table(
    'employee', metadata,
    Column('id', Integer, primary_key=True),
    Column('manager_id', None, ForeignKey('employee.id')),
    Column('name', String(255)))

现在假如我们想选择Fred手下的所有员工。在SQL里面,我们会这么做:

SELECT employee.name
FROM employee, employee AS manager
WHERE employee.manager_id = manager.id
AND manager.name = 'Fred'

SQLAlchemy也允许用alias()方法实现:

manager = employee_table.alias('mgr')

stmt = select([employee_table.c.name],
             and_(employee_table.c.id==manager.c.id,
                 manager.c.name=='Fred'))

print(stmt)
SELECT employee.name 
FROM employee, employee AS mgr 
WHERE employee.id = mgr.id AND mgr.name = :name_1

SQLAlchemy也可以自动选择别名:

manager = employee_table.alias()

stmt = select([employee_table.c.name],
             and_(employee_table.c.id==manager.c.id,
                 manager.c.name=='Fred'))

print(stmt)
SELECT employee.name 
FROM employee, employee AS employee_1 
WHERE employee.id = employee_1.id AND employee_1.name = :name_1

数据分组group_by也是很有用的,下面我们来看看。

Grouping

SQLAlchemy可以对一个或多个列进行数据分组,然后统计各组的计数项(count),总和(sum)和其他统计参数,与SQL类似。下面我们看看每个客户的订单数:

columns = [users.c.username, func.count(orders.c.order_id)]
all_orders = select(columns)
all_orders = all_orders.select_from(users.outerjoin(orders))
all_orders = all_orders.group_by(users.c.username)
result = connection.execute(all_orders).fetchall()
for row in result:
    print(row)
('cakeeater', 1)
('cookiemon', 1)
('pieguy', 0)

前面的join的例子中我们已经用过,这里再看看group_by的用法。

Chaining

前面我们已经用过链式表达式,只是那时没有明说。进行数据查询时链式表达式能够清楚地显示查询的逻辑。因此,如果我们想用一个函数获取订单数据,可以如例3-28所示。

def get_orders_by_customer(cust_name):
    columns = [orders.c.order_id, users.c.username, users.c.phone,
                cookies.c.cookie_name, line_items.c.quantity, 
                line_items.c.extended_cost]
    cust_orders = select(columns)
    cust_orders = cust_orders.select_from(users.join(orders).join(line_items).join(cookies))
    cust_orders = cust_orders.where(users.c.username == cust_name)
    result = connection.execute(cust_orders).fetchall()
    return result

get_orders_by_customer('cakeeater')
[(2, 'cakeeater', '222-222-2222', 'chocolate chip', 24, Decimal('12.00')),
 (2, 'cakeeater', '222-222-2222', 'oatmeal raisin', 6, Decimal('6.00'))]

如果我们只想显示已发货或未发货的订单,怎么办?我们可以再写函数来支持其他选项,或者我们可以用链式查询来过滤。推荐后一种方法,尤其是处理复杂查询和制作报表时威力非常强大。

def get_orders_by_customer(cust_name, shipped=None, details=False):
    columns = [orders.c.order_id, users.c.username, users.c.phone]
    joins = users.join(orders)
    if details:
        columns.extend([cookies.c.cookie_name, line_items.c.quantity,
                        line_items.c.extended_cost])
        joins = joins.join(line_items).join(cookies)
    cust_orders = select(columns)
    cust_orders = cust_orders.select_from(joins)
    cust_orders = cust_orders.where(users.c.username == cust_name)
    if shipped is not None:
        cust_orders = cust_orders.where(orders.c.shipped == shipped)
    result = connection.execute(cust_orders).fetchall()
    return result

get_orders_by_customer('cakeeater')
[(2, 'cakeeater', '222-222-2222')]
get_orders_by_customer('cakeeater', details=True)
[(2, 'cakeeater', '222-222-2222', 'chocolate chip', 24, Decimal('12.00')),
 (2, 'cakeeater', '222-222-2222', 'oatmeal raisin', 6, Decimal('6.00'))]
get_orders_by_customer('cakeeater', shipped=True)
[]
get_orders_by_customer('cakeeater', shipped=False)
[(2, 'cakeeater', '222-222-2222')]
get_orders_by_customer('cakeeater', shipped=False, details=True)
[(2, 'cakeeater', '222-222-2222', 'chocolate chip', 24, Decimal('12.00')),
 (2, 'cakeeater', '222-222-2222', 'oatmeal raisin', 6, Decimal('6.00'))]

到此为止,我们已经用SQL表达式语言的例子演示了SQLAlchemy Core。其实,你也可以直接用标准的SQL语言来进行。

Raw Queries

在SQLAlchemy Core里面可以用原始的SQL语句进行操作,结果也是返回一个代理,后面的操作和SQLAlchemy Core的SQL表达式语法一样。除非必须,一般不推荐使用原始SQL,因为这么做存在安全隐患。下面我们简单演示一下。

result = connection.execute('select * from orders').fetchall()
print(result)
[(1, 1, 0), (2, 2, 0)]

有时候一些SQL小片段可以让语句表述更清晰。下面是用text()函数实现where的条件。

from sqlalchemy import text
stmt = select([users]).where(text("username='cookiemon'"))
print(connection.execute(stmt).fetchall())
[(1, None, 'cookiemon', 'mon@cookie.com', '111-111-1111', 'password', datetime.datetime(2015, 9, 1, 21, 37, 37, 531465), datetime.datetime(2015, 9, 1, 21, 37, 37, 531465))]

通过本章介绍的增删改查等操作,现在你应该已经理解SQLAlchemy Core的SQL表达式用法了,用自己的数据库试试吧。下面我们来介绍SQLAlchemy的异常处理,以及事务处理transactions分组的用法。

Exceptions and Transactions

前面做数据处理时,大部分工作我们都只用一行语句就搞定了,我们尽量避免做任何可能引起异常的事情,而本章我们会刻意搞点bug来演示异常处理方法。另外,我们还会介绍如何把对需要处理的任务分成单独的事务进行处理,保证每个事务都可以被适当地执行或正确地清理。

Exceptions

SQLAlchemy会产生许多不同类型的异常;不过这里只重点介绍一些常见的异常:AttributeErrors和IntegrityErrors。通过对常用异常处理方法的学习,你可以掌握其他异常的处理方法。

下面,请重新开一个Python shell或Notebook,然后用SQLAlchemy Core建立数据表。具体过程如例4-1所示。

from datetime import datetime
from sqlalchemy import (MetaData, Table, Column, Integer, Numeric, String,
                        DateTime, ForeignKey, Boolean, create_engine,
                        CheckConstraint)
metadata = MetaData()
cookies = Table('cookies', metadata,
                Column('cookie_id', Integer(), primary_key=True),
                Column('cookie_name', String(50), index=True),
                Column('cookie_recipe_url', String(255)),
                Column('cookie_sku', String(55)),
                Column('quantity', Integer()),
                Column('unit_cost', Numeric(12, 2)),
                CheckConstraint('quantity > 0', name='quantity_positive')
                )
users = Table('users', metadata,
              Column('user_id', Integer(), primary_key=True),
              Column('username', String(15), nullable=False, unique=True),
              Column('email_address', String(255), nullable=False),
              Column('phone', String(20), nullable=False),
              Column('password', String(25), nullable=False),
              Column('created_on', DateTime(), default=datetime.now),
              Column('updated_on', DateTime(),
                     default=datetime.now, onupdate=datetime.now)
              )
orders = Table('orders', metadata,
               Column('order_id', Integer()),
               Column('user_id', ForeignKey('users.user_id')),
               Column('shipped', Boolean(), default=False)
               )
line_items = Table('line_items', metadata,
                   Column('line_items_id', Integer(), primary_key=True),
                   Column('order_id', ForeignKey('orders.order_id')),
                   Column('cookie_id', ForeignKey('cookies.cookie_id')),
                   Column('quantity', Integer()),
                   Column('extended_cost', Numeric(12, 2))
                   )
engine = create_engine('sqlite:///:memory:')
metadata.create_all(engine)
connection = engine.connect()

metadata.tables.keys()
dict_keys(['users', 'cookies', 'line_items', 'orders'])
AttributeError

AttributeError是指获取一个不存在的属性时产生的异常。经常是在连接ResultProxy里没有的一列时发生。在尝试获取一个对象不存在的属性时AttributeError也会发生。在普通的Python代码里也会发生。这里单独拿出来说是因为SQLAlchemy里非常容易出现这类异常,而其产生的根源却很容易忽略。为了演示这类异常,我们在users表里插入一列数据,然后我们查询没有selsect的一列来诱发异常。

from sqlalchemy import select, insert
ins = insert(users).values(
    username="cookiemon",
    email_address="mon@cookie.com",
    phone="111-111-1111",
    password="password"
)
result = connection.execute(ins)

s = select([users.c.username])
results = connection.execute(s)
for result in results:
    print(result.username)
    print(result.password)
    cookiemon
    ---------------------------------------------------------------------------
    KeyError                                  Traceback (most recent call last)
    d:\programfiles\Miniconda3\lib\site-packages\sqlalchemy\engine\result.py in __getitem__(self, key)
         69             try:
    ---> 70                 processor, obj, index = self._keymap[key]
         71             except KeyError:

    KeyError: 'password'

    During handling of the above exception, another exception occurred:

    NoSuchColumnError                         Traceback (most recent call last)
    d:\programfiles\Miniconda3\lib\site-packages\sqlalchemy\engine\result.py in __getattr__(self, name)
         95             try:
    ---> 96                 return self[name]
         97             except KeyError as e:

    d:\programfiles\Miniconda3\lib\site-packages\sqlalchemy\engine\result.py in __getitem__(self, key)
         71             except KeyError:
    ---> 72                 processor, obj, index = self._parent._key_fallback(key)
         73             except TypeError:

    d:\programfiles\Miniconda3\lib\site-packages\sqlalchemy\engine\result.py in _key_fallback(self, key, raiseerr)
        405                     "Could not locate column in row for column '%s'" %
    --> 406                     expression._string_or_unprintable(key))
        407             else:

    NoSuchColumnError: "Could not locate column in row for column 'password'"

    During handling of the above exception, another exception occurred:

    AttributeError                            Traceback (most recent call last)
    <ipython-input-9-c4520631a10a> in <module>()
          3 for result in results:
          4     print(result.username)
    ----> 5     print(result.password)

    d:\programfiles\Miniconda3\lib\site-packages\sqlalchemy\engine\result.py in __getattr__(self, name)
         96                 return self[name]
         97             except KeyError as e:
    ---> 98                 raise AttributeError(e.args[0])
         99 
        100 

    AttributeError: Could not locate column in row for column 'password'

在例4-2中我们看到Python抛出了AttributeError并停止了程序,我们可以看到AttributeError是Python里常见的形式。首先显示错异常的类型,然后一个箭头会指明异常发生的位置,紧接着是异常发生源代码。最后一行会显示异常的具体内容,它会显示异常类型,以及为什么发生异常。这里出现异常的原因是ResultProxy里面没有password列,我们只查询了username列。这是在使用SQLAlchemy对象时出现的一个常见Python异常,还有一些SQLAlchemy自己特有的异常。下面就是其中一个:IntegrityError。

IntegrityError

IntegrityError是另一个SQLAlchemy常见的异常,当我们做了不符合数据表或字段约束条件的事情时就会发生。当你请求的数据是唯一的,比如users表中的username,想创建两个同名用户时就会产生IntegrityError,演示程序如下所示。

s = select([users.c.username])

connection.execute(s).fetchall()
[('cookiemon',)]
ins = insert(users).values(
    username="cookiemon",
    email_address="damon@cookie.com",
    phone="111-111-1111",
    password="password"
)

result = connection.execute(ins)

和前面的AttributeError类似。首先显示错异常的类型,然后一个箭头会指明异常发生的位置,紧接着是异常发生源代码。最后一行会显示异常的具体内容,它会显示异常类型,以及为什么发生异常。这里指明了发生的原因:

UNIQUE constraint failed: users.username 这就告诉我们在users表的username里面插入同名用户是不允许的。之后的内容是SQLAlchemy表达式转变成的SQL语句,我们在第三章里介绍过,还有我们计划插入却产生异常的数据。程序在这里停止。

当然会有很多种异常类型,这里介绍的两种是最常见的。SQLAlchemy里所有异常发生都会按照这两种方式产生。具体异常的内容请查看SQLAlchemy文档。

为了保证程序在发生异常时继续运行,我们需要实现异常处理方法。

Handling Errors

要防止异常中断程序,我们需要正确地处理异常。这与Python的异常处理方法一样,用try/except代码块实现。例如,我们可以用try/except代码块捕捉异常显示信息,然后让后面的程序继续运行。

from sqlalchemy.exc import IntegrityError

ins = insert(users).values(
    username="cookiemon",
    email_address="damon@cookie.com",
    phone="111-111-1111",
    password="password"
)
try:
    result = connection.execute(ins)
except IntegrityError as error:
    print(error)
(sqlite3.IntegrityError) UNIQUE constraint failed: users.username [SQL: 'INSERT INTO users (username, email_address, phone, password, created_on, updated_on) VALUES (?, ?, ?, ?, ?, ?)'] [parameters: ('cookiemon', 'damon@cookie.com', '111-111-1111', 'password', '2015-09-01 13:45:18.991258', '2015-09-01 13:45:18.992259')]

虽然和前面例子代码一样,但是因为有try/except处理IntegrityError异常,所有结果就是简单的异常信息。这个例子只是演示了异常打印功能,其实我们可以在异常处理中写任何Python代码。返回异常信息告诉用户刚刚的操作失败了是很有用的做法。异常处理完成后,程序还会继续运行。虽然这里只处理了IntegrityError,但对其他SQLAlchemy异常也适用。

try/except里面的代码越少越好。因为代码太多可能会出现你意料之外的异常,不符合你捕捉的目标。

用传统的Python方法可以捕捉一行语句的异常,如果我们有多个数据库语句,彼此之间互相关联,这个方法可能就不适用了。这时,我们需要将那些语句封装成一个数据库事务,SQLAlchemy提供了一个简单的包装来建立对象之间的链接:transactions。

Transactions

我们不需要学习一堆数据库理论,可以把事务看成是一种确保多个数据库语句打包成一组运行成功或运行失败的处理方式。当启动一个事务的时候,我们记录数据库的状态,然后执行多条SQL语句。如果所有的SQL语句都能顺利执行,数据库就会不断的更新状态,忽略前面的状态,如下图所示。 但是,如果有一条语句运行失败了,整个数据库就要回退(rollback)到原来的状态,如下图所示。 举个我们可能会在之前建立的数据库里操作的例子。当客户买了我们的饼干之后,我们就需要把饼干邮寄给客户,同时更新库存量。但是,如果我们没有足够的饼干库存履行客户的订单需求,怎么办呢?我们就查检查库存,并且不邮寄订单。这就可以用事务解决。

我们再新建一个Python Shell或Notebook,还用第三章的表,只是为quantity字段增加一个CheckConstraint限制条件,即库存量不能小于0。然后我们创建用户cookiemon,并设置chocolate chip和dark chocolate chip cookie的库存量,其中chocolate chip库存量为12,dark chocolate chip cookie库存量为1,具体程序如下所示。

from datetime import datetime
from sqlalchemy import (MetaData, Table, Column, Integer, Numeric, String,
                        DateTime, ForeignKey, Boolean, create_engine,
                        CheckConstraint)

metadata = MetaData()
cookies = Table('cookies', metadata,
                Column('cookie_id', Integer(), primary_key=True),
                Column('cookie_name', String(50), index=True),
                Column('cookie_recipe_url', String(255)),
                Column('cookie_sku', String(55)),
                Column('quantity', Integer()),
                Column('unit_cost', Numeric(12, 2)),
                CheckConstraint('quantity >= 0', name='quantity_positive')
                )
users = Table('users', metadata,
              Column('user_id', Integer(), primary_key=True),
              Column('username', String(15), nullable=False, unique=True),
              Column('email_address', String(255), nullable=False),
              Column('phone', String(20), nullable=False),
              Column('password', String(25), nullable=False),
              Column('created_on', DateTime(), default=datetime.now),
              Column('updated_on', DateTime(),
                     default=datetime.now, onupdate=datetime.now)
              )
orders = Table('orders', metadata,
               Column('order_id', Integer()),
               Column('user_id', ForeignKey('users.user_id')),
               Column('shipped', Boolean(), default=False)
               )
line_items = Table('line_items', metadata,
                   Column('line_items_id', Integer(), primary_key=True),
                   Column('order_id', ForeignKey('orders.order_id')),
                   Column('cookie_id', ForeignKey('cookies.cookie_id')),
                   Column('quantity', Integer()),
                   Column('extended_cost', Numeric(12, 2))
                   )
engine = create_engine('sqlite:///:memory:')
metadata.create_all(engine)
connection = engine.connect()

from sqlalchemy import select, insert, update
ins = insert(users).values(
    username="cookiemon",
    email_address="mon@cookie.com",
    phone="111-111-1111",
    password="password"
)
result = connection.execute(ins)
ins = cookies.insert()
inventory_list = [
    {
        'cookie_name': 'chocolate chip',
        'cookie_recipe_url': 'http://some.aweso.me/cookie/recipe.html',
        'cookie_sku': 'CC01',
        'quantity': '12',
        'unit_cost': '0.50'
    },
    {
        'cookie_name': 'dark chocolate chip',
        'cookie_recipe_url': 'http://some.aweso.me/cookie/recipe_dark.html',
        'cookie_sku': 'CC02',
        'quantity': '1',
        'unit_cost': '0.75'
    }
]
result = connection.execute(ins, inventory_list)

s = select([cookies.c.cookie_name, cookies.c.quantity])
connection.execute(s).fetchall()
[('chocolate chip', 12), ('dark chocolate chip', 1)]

现在,我们再定义两个cookiemon的订单,第一条订单是9块chocolate chip,第二条订单是4块chocolate chip和1块dark chocolate chip cookie。我们用插入语句来实现,具体如下所示。

ins = insert(orders).values(user_id=1, order_id='1')
result = connection.execute(ins)
ins = insert(line_items)
order_items = [
    {
        'order_id': 1,
        'cookie_id': 1,
        'quantity': 9,
        'extended_cost': 4.50
    }
]
result = connection.execute(ins, order_items)
ins = insert(orders).values(user_id=1, order_id='2')
result = connection.execute(ins)
ins = insert(line_items)
order_items = [
    {
        'order_id': 2,
        'cookie_id': 2,
        'quantity': 1,
        'extended_cost': 0.75
    },
    {
        'order_id': 2,
        'cookie_id': 1,
        'quantity': 4,
        'extended_cost': 2.00
    }
]
result = connection.execute(ins, order_items)

这样我们就有了演示事务的订单数据了,现在我们需要定义一个函数ship_it。这个函数接受一个order_id,然后从库存中去掉对应的购买量,并把订单标记成已发货,shipped=True,如下所示。

from sqlalchemy.exc import IntegrityError
def ship_it(order_id):
    try:
        s = select([line_items.c.cookie_id, line_items.c.quantity])
        s = s.where(line_items.c.order_id == order_id)
        cookies_to_ship = connection.execute(s)
        for cookie in cookies_to_ship:
            u = update(cookies).where(cookies.c.cookie_id == cookie.cookie_id)
            u = u.values(quantity=cookies.c.quantity - cookie.quantity)
            result = connection.execute(u)
        u = update(orders).where(orders.c.order_id == order_id)
        u = u.values(shipped=True)
        result = connection.execute(u)
        print("Shipped order ID: {}".format(order_id))
    except IntegrityError as error:
        print(error)

当一条订单发货之后,函数ship_it就会执行动作。让我们对第一条订单执行函数ship_it,然后看看cookies表是否更新了库存。

ship_it(1)
Shipped order ID: 1
s = select([cookies.c.cookie_name, cookies.c.quantity])
connection.execute(s).fetchall()
[('chocolate chip', 3), ('dark chocolate chip', 1)]

运行正常。但是现在的库存量不能满足第二条订单;但是,在工作节奏很快的仓库中,这些订单应该可以同时处理。现在我们再用函数ship_it处理一下第二条订单。

ship_it(2)
(sqlite3.IntegrityError) CHECK constraint failed: quantity_positive [SQL: 'UPDATE cookies SET quantity=(cookies.quantity - ?) WHERE cookies.cookie_id = ?'] [parameters: (4, 1)]
s = select([cookies.c.cookie_name, cookies.c.quantity])
connection.execute(s).fetchall()
[('chocolate chip', 3), ('dark chocolate chip', 0)]

程序中库存量先扣减1块dark chocolate chip cookie,可以正常执行,再扣减4块chocolate chip,因为没有足够的chocolate chip库存导致了IntegrityError异常。但是dark chocolate chip的库存扣减了,这不是我们想看到的。我们这里想要发生整个订单,不允许单独让一部分先发货。用前面介绍的异常处理方法可以解决这个问题,但是,事务提供了更好的处理方式。

事务通过connection对象的begin()方法启动。这样我们就获得一个事务,可以处理后面所有的语句。如果这些语句都成功执行,我们就可以用commit()方法对数据库进行状态确认。如果不完全成功,我们就用rollback()方法让数据库回退到原始状态。下面让我们用事务把函数ship_it改一下。

def ship_it(order_id):
    s = select([line_items.c.cookie_id, line_items.c.quantity])
    s = s.where(line_items.c.order_id == order_id)
    transaction = connection.begin()
    cookies_to_ship = connection.execute(s).fetchall()
    try:
        for cookie in cookies_to_ship:
            u = update(cookies).where(cookies.c.cookie_id == cookie.cookie_id)
            u = u.values(quantity=cookies.c.quantity-cookie.quantity)
            result = connection.execute(u)
        u = update(orders).where(orders.c.order_id == order_id)
        u = u.values(shipped=True)
        result = connection.execute(u)
        print("Shipped order ID: {}".format(order_id))
        transaction.commit()
    except IntegrityError as error:
        transaction.rollback()
        print(error)

现在我们把dark chocolate chip的库存重新补成1。

u = update(cookies).where(cookies.c.cookie_name == "dark chocolate chip")
u = u.values(quantity = 1)
result = connection.execute(u)

s = select([cookies.c.cookie_name, cookies.c.quantity])
connection.execute(s).fetchall()
[('chocolate chip', 3), ('dark chocolate chip', 1)]

再用新的函数ship_it对第二条订单进行处理。程序不会因为异常而停止,只会打印异常信息。

ship_it(2)
(sqlite3.IntegrityError) CHECK constraint failed: quantity_positive [SQL: 'UPDATE cookies SET quantity=(cookies.quantity - ?) WHERE cookies.cookie_id = ?'] [parameters: (4, 1)]

再用库存查看语句看看库存的状态,回退到第二条订单处理之前的状态了。

s = select([cookies.c.cookie_name, cookies.c.quantity])
connection.execute(s).fetchall()
[('chocolate chip', 3), ('dark chocolate chip', 1)]

这就是事务的作用,短短几行代码就可以让数据库回退到异常发生之前的状态,非常给力吧。

  1. Essential SQLAlchemy: Mapping Python to Databases

  2. sqlalchemy introduce