Python 接入 MySQL 数据库
一、Python 与 MySQL 数据库交互概览 (Overview of Python-MySQL Interaction)
1. 为何选择 Python 与 MySQL 结合 (Why Combine Python with MySQL?)
Python 以其简洁的语法、丰富的库生态系统以及在Web开发、数据分析、人工智能、自动化脚本等领域的广泛应用而著称。MySQL 则是世界上最流行的开源关系型数据库之一,以其稳定性、高性能和易用性而受到青睐。将两者结合,可以构建出功能强大、数据驱动的应用程序。
Web 开发: 诸如 Django、Flask、FastAPI 等流行的 Python Web 框架都能够与 MySQL 无缝集成,用于存储用户信息、应用数据、会话管理等。
数据分析与科学: Python 的 Pandas、NumPy、SciPy 等库与 MySQL 结合,可以方便地从数据库中提取数据进行分析、处理和可视化。
自动化与脚本: Python 脚本可以轻松连接 MySQL,执行数据迁移、备份、报表生成、ETL(Extract, Transform, Load)等任务。
企业应用: 许多企业级应用后台使用 Python 处理业务逻辑,并依赖 MySQL 作为其核心数据存储。
易用性与社区支持: Python 提供了多种成熟的 MySQL 连接器(驱动程序),简化了开发过程。同时,Python 和 MySQL 都有庞大而活跃的开发者社区,可以方便地找到解决方案和支持。
2. Python 访问 MySQL 的基本架构 (Basic Architecture of Python Accessing MySQL)
Python 应用程序通过一个称为数据库连接器 (Database Connector) 或 驱动程序 (Driver) 的特定库来与 MySQL 服务器进行通信。这个连接器扮演着翻译官的角色,将 Python 代码中的数据库操作指令转换为 MySQL 服务器能够理解的协议和SQL语句,并将服务器返回的结果转换回 Python 的数据结构。
其基本交互流程如下:
加载连接器: Python 程序导入所需的 MySQL 连接器库。
建立连接 (Connection): Python 程序使用连接器提供的函数,传入必要的参数(如主机名、端口、用户名、密码、数据库名等)来与 MySQL 服务器建立一个网络连接。
创建游标 (Cursor): 一旦连接建立,程序会创建一个游标对象。游标可以看作是数据库操作的一个句柄,它允许你执行SQL语句并处理结果集。
执行 SQL 语句: 通过游标对象,程序可以执行各种 SQL 语句(SELECT
, INSERT
, UPDATE
, DELETE
, DDL语句等)。
处理结果:
对于 SELECT
查询,游标可以用来获取查询结果(一行、多行或所有行)。
对于 DML 语句(INSERT
, UPDATE
, DELETE
),可以获取受影响的行数。
事务处理: 对于需要原子性的操作序列,程序可以通过连接对象来管理事务(提交 commit
或回滚 rollback
)。
关闭游标和连接: 操作完成后,为了释放资源,程序需要关闭游标和数据库连接。
(图片仅为示意,表示Python应用通过Connector与MySQL Server通信)
二、选择合适的 Python MySQL 连接器 (Choosing the Right Python MySQL Connector)
Python 社区提供了多个用于连接 MySQL 的库。选择哪个连接器取决于项目的具体需求,如性能要求、部署环境、Python 版本兼容性、特定特性需求以及个人偏好。
2.1 主流 Python MySQL 连接器
以下是几个最常用和推荐的连接器:
mysql-connector-python
(官方连接器)
开发者: Oracle Corporation (MySQL 的母公司)
类型: 纯 Python 实现 (Pure Python)。这意味着它不依赖外部C库(除了可选的C扩展以提高性能),通常更容易安装在各种平台上。
特性:
完全实现了 Python DB-API 2.0 规范 (PEP 249)。
支持 MySQL 的所有最新特性,包括X Plugin (用于文档存储和异步操作)、新的认证插件 (如 caching_sha2_password
)。
提供了连接池功能。
支持压缩、SSL连接。
可以不安装 MySQL客户端库。
可选的 C 扩展 (mysql-connector-python-cext
) 可以提高性能,但会引入编译依赖。
许可: 通常是 GPLv2 或商业许可(取决于你如何获取和使用)。
安装: pip install mysql-connector-python
PyMySQL
开发者: PyMySQL 社区
类型: 纯 Python 实现。
特性:
几乎完全实现了 Python DB-API 2.0 规范。
轻量级,易于安装和部署,尤其在没有C编译器或受限环境中。
被许多第三方库(如 SQLAlchemy, Django – 作为备选驱动)良好支持。
支持 Python 3.x (Python 2.x 支持已在较旧版本中)。
支持 caching_sha2_password
认证 (需要 cryptography
库)。
许可: MIT License (非常宽松)。
安装: pip install PyMySQL
mysqlclient
(MySQLdb 的现代分支)
开发者: mysqlclient
维护者 (基于 MySQLdb
)
类型: C 扩展模块。它包装了 MySQL C 客户端库。
特性:
实现了 Python DB-API 2.0 规范。
性能通常被认为是最高的,因为它直接使用C库。
是 Django 框架默认推荐的 MySQL 连接器。
支持 Python 3.x (Python 2.x 支持通过旧版 MySQLdb
)。
支持最新的 MySQL 特性(取决于链接的C客户端库版本)。
缺点:
安装可能更复杂: 需要系统上安装了 MySQL 开发库 (C headers and libraries) 和 C 编译器。在某些操作系统(尤其是Windows)上直接通过 pip
安装可能遇到编译问题,可能需要预编译的二进制包 (wheels)。
许可: GPLv2。
安装:
Linux: sudo apt-get install python3-dev default-libmysqlclient-dev build-essential
(Debian/Ubuntu) 或 sudo yum install python3-devel mysql-devel gcc
(CentOS/RHEL) 然后 pip install mysqlclient
macOS: brew install mysql-client
(或 mysql
) 然后 pip install mysqlclient
Windows: 通常建议从 Christoph Gohlke’s Unofficial Windows Binaries for Python Extension Packages 下载预编译的 .whl
文件安装,或者使用 pip install mysqlclient
(如果环境中配置好了编译工具和库)。
2.2 连接器选择考量因素
特性/考量因素 | mysql-connector-python (官方) |
PyMySQL |
mysqlclient |
中文解释 |
---|---|---|---|---|
实现类型 | 纯Python (可选C扩展) | 纯Python | C扩展 (包装C库) | 纯Python易安装部署,C扩展通常性能更好但安装可能复杂。 |
性能 | 中等到高 (使用C扩展时) | 中等 | 通常最高 | C扩展因接近底层而快;纯Python实现会有Python解释器开销。 |
安装便捷性 | 高 (纯Python部分) | 非常高 | 中等到低 (依赖C库和编译器) | 纯Python库通常pip install 即可;C扩展可能需要系统依赖。 |
Python 版本 | Python 3.x (旧版支持2.x) | Python 3.x (旧版支持2.x) | Python 3.x (旧版MySQLdb 支持2.x) |
主流库都已转向Python 3。 |
DB-API 2.0 兼容性 | 完全兼容 | 几乎完全兼容 | 完全兼容 | DB-API 2.0是Python数据库编程的标准接口。 |
MySQL 最新特性支持 | 优秀 (官方维护) | 良好 (社区驱动,可能稍有延迟) | 良好 (依赖C客户端库版本) | 官方连接器通常最快支持MySQL新特性。 |
依赖 | 无 (纯Python核心) / C编译器(C扩展) | 可选cryptography (用于新认证) |
MySQL C 开发库, C编译器 | 依赖越少,部署越简单。 |
连接池 | 内置 | 无内置 (需第三方库) | 无内置 (需第三方库) | 对于Web应用或高并发服务,连接池很重要。 |
许可 | GPLv2 / 商业 | MIT | GPLv2 | MIT许可最宽松,GPL对商业应用有特定要求。 |
社区与文档 | 良好 (官方) | 良好 (活跃社区) | 良好 (广泛使用) | 都有不错的社区支持和文档资源。 |
Django/SQLAlchemy集成 | 支持 | 广泛支持 | Django首选, SQLAlchemy支持 | 主流ORM和框架通常支持多种连接器。 |
企业级项目选择建议:
追求极致性能且能处理C扩展安装: mysqlclient
往往是首选,尤其在Django项目中。
要求纯Python实现、易于部署、许可宽松: PyMySQL
是一个非常好的选择,并且由于其轻量和纯粹性,在Serverless、容器化等环境中也受欢迎。
需要MySQL官方支持、最新特性、内置连接池且不排斥GPL: mysql-connector-python
是一个可靠的选择。如果性能是瓶颈,可以尝试其C扩展。
初学者或快速原型: PyMySQL
或 mysql-connector-python
(纯Python模式) 通常更容易上手。
在后续的示例中,我们将主要使用 mysql-connector-python
和 PyMySQL
进行演示,因为它们代表了纯Python实现的便利性,并且功能全面。如果需要,也会提及 mysqlclient
的特定之处。
2.3 安装连接器
使用 pip
(Python的包安装器) 来安装这些库。建议在虚拟环境 (virtual environment) 中安装项目依赖。
安装 mysql-connector-python
:
pip install mysql-connector-python
或者,如果需要其C扩展以获得更好性能 (可能需要编译环境):
# pip install mysql-connector-python-cext # 通常这个包名不直接用
# 正确的方式是,mysql-connector-python 会尝试自动编译C扩展如果可用
# 如果你想要一个只包含C扩展的版本,你可能需要寻找特定的wheel或从源码编译
# 一般情况下,直接安装 mysql-connector-python 即可,它会按需处理
注意: Oracle 官方有时会提供名为 mysql-connector-python-rf
(release candidate for features) 或其他后缀的包,通常应使用稳定版 mysql-connector-python
。
安装 PyMySQL
:
pip install PyMySQL
如果需要支持 caching_sha2_password
认证方式(MySQL 8.0+ 默认),PyMySQL 可能需要 cryptography
库:
pip install PyMySQL[rsa]
# 或者
pip install PyMySQL cryptography
安装 mysqlclient
:
确保你已经安装了必要的系统依赖 (如前文所述的C开发库和编译器)。
pip install mysqlclient
如果遇到编译错误,请根据错误信息查找特定于你操作系统的解决方案,或者寻找预编译的wheel包。
三、Python DB-API 2.0 核心规范 (Core Specifications of Python DB-API 2.0)
Python DB-API 2.0 (PEP 249) 定义了一个标准的数据库接口,大多数Python数据库连接器都遵循这个规范。理解它有助于你编写更具移植性的数据库访问代码(尽管不同连接器的某些扩展特性可能不同)。
3.1 规范的目标
一致性: 为不同的数据库提供一个统一的编程接口。
易用性: 提供简单直观的方法来连接数据库、执行SQL和获取结果。
功能性: 包含处理事务、错误、多种数据类型等基本数据库操作所需的功能。
3.2 关键组件和方法
全局变量:
apilevel
: 字符串,表示API级别(通常是 “2.0”)。
threadsafety
: 整数,表示线程安全级别 (0到3)。
paramstyle
: 字符串,表示SQL参数占位符的风格(如 qmark
, numeric
, named
, format
, pyformat
)。
connect()
函数 (模块级别):
用于建立到数据库的连接。它返回一个连接对象 (Connection Object)。
参数通常包括:dsn
(数据源名称字符串,可选),user
, password
, host
, database
, port
等。不同连接器支持的参数名和具体细节可能略有差异,但核心参数类似。
连接对象 (Connection Object):
代表与数据库的一个会话。
close()
: 关闭数据库连接。
commit()
: 提交当前事务中所有挂起的操作。
rollback()
: 回滚当前事务中所有挂起的操作。
cursor([cursor_type])
: 返回一个新的游标对象 (Cursor Object)。可以指定游标类型(如 buffered, dictionary cursor,具体支持依赖连接器)。
Error
(异常类): 连接器定义的异常基类。
Warning
, InterfaceError
, DatabaseError
, DataError
, OperationalError
, IntegrityError
, InternalError
, ProgrammingError
, NotSupportedError
(标准异常类,通常是 Connection.Error
的子类)。
游标对象 (Cursor Object):
用于执行SQL语句并管理结果集。
description
: 只读属性。在执行 SELECT
查询后,它是一个包含7个元素的序列的序列,每个内部序列描述结果集中的一列(name
, type_code
, display_size
, internal_size
, precision
, scale
, null_ok
)。
rowcount
: 只读属性。对于 DML 语句(INSERT
, UPDATE
, DELETE
),返回受影响的行数。对于 SELECT
语句,其行为未被严格规定,可能返回 -1 或实际行数(取决于连接器和游标类型,例如buffered cursor可能知道行数)。
callproc(procname [, parameters])
: (可选) 调用存储过程。
close()
: 关闭游标。
execute(operation [, parameters])
: 执行一个数据库操作(SQL语句)。parameters
是一个可选的序列或映射,用于替换 operation
字符串中的参数占位符,以防止SQL注入。
executemany(operation, seq_of_parameters)
: 对参数序列中的每一项重复执行同一个数据库操作。通常用于批量插入或更新。
fetchone()
: 获取查询结果集中的下一行,返回一个序列(通常是元组),或者在没有更多行时返回 None
。
fetchmany([size=cursor.arraysize])
: 获取查询结果集中的下一批行,返回一个序列的序列(列表的列表或元组的列表)。size
参数指定要获取的行数,默认为 cursor.arraysize
。如果剩余行数少于 size
,则返回所有剩余行。如果没有更多行,返回空序列。
fetchall()
: 获取查询结果集中的所有(剩余)行,返回一个序列的序列。
nextset()
: (可选) 移动到下一个结果集(如果执行的操作返回多个结果集,例如某些存储过程)。
arraysize
: 可读写属性。fetchmany()
默认获取的行数。默认为1。
setinputsizes(sizes)
: (可选) 声明输入参数的大小。
setoutputsize(size [, column])
: (可选) 声明大列(如LOB)的缓冲区大小。
3.3 参数样式 (paramstyle
)
DB-API 2.0 规范定义了多种参数化查询的占位符风格,连接器会声明它支持哪种。
qmark
: 问号风格。例如:SELECT * FROM users WHERE id = ? AND name = ?
参数:(1, 'Alice')
numeric
: 数字,位置风格。例如:SELECT * FROM users WHERE id = :1 AND name = :2
参数:(1, 'Alice')
named
: 命名风格。例如:SELECT * FROM users WHERE id = :user_id AND name = :user_name
参数:{'user_id': 1, 'user_name': 'Alice'}
format
: ANSI C printf
格式代码。例如:SELECT * FROM users WHERE id = %s AND name = %s
参数:(1, 'Alice')
pyformat
: Python 扩展 printf
格式代码。例如:SELECT * FROM users WHERE id = %(user_id)s AND name = %(user_name)s
参数:{'user_id': 1, 'user_name': 'Alice'}
mysql-connector-python
和 PyMySQL
通常支持 format
(%s) 和 pyformat
(%(name)s) 风格。mysqlclient
也类似。 使用 %s
是最常见和推荐的方式,因为它简单且不易出错。
理解 DB-API 2.0 是掌握 Python 数据库编程的基础。即使你将来使用 ORM(如 SQLAlchemy),了解其底层是如何通过 DB-API 与数据库交互也是非常有益的。
四、基本连接与查询操作 (Basic Connection and Query Operations)
现在,我们将通过具体的代码示例来演示如何使用 Python 连接到 MySQL 数据库并执行基本的CRUD(Create, Read, Update, Delete)操作。我们将首先以 mysql-connector-python
为例,然后展示 PyMySQL
的等效实现。
4.1 使用 mysql-connector-python
4.1.1 建立数据库连接
首先,你需要安装 mysql-connector-python
。
pip install mysql-connector-python
然后,在 Python 脚本中导入并使用它:
import mysql.connector # 导入 mysql.connector 模块
from mysql.connector import errorcode # 导入 errorcode 用于更具体的错误处理
def connect_to_mysql_server_only(db_host, db_user, db_password):
"""尝试只连接到MySQL服务器,不指定数据库。"""
try:
cnx = mysql.connector.connect( # 调用 connect 函数建立连接
host=db_host, # MySQL 服务器主机名或IP地址
user=db_user, # MySQL 用户名
password=db_password # MySQL 用户密码
# port=3306, # 可选,MySQL 服务器端口,默认为3306
# connection_timeout=10 # 可选,连接超时时间(秒)
)
print(f"成功连接到 MySQL 服务器 {
db_host} (版本: {
cnx.get_server_info()})") # 打印服务器信息
return cnx # 返回连接对象
except mysql.connector.Error as err: # 捕获 mysql.connector 的错误
if err.errno == errorcode.ER_ACCESS_DENIED_ERROR: # 如果是访问被拒绝错误
print(f"连接失败: 用户名或密码不正确 (错误码: {
err.errno}, {
err.msg})")
elif err.errno == errorcode.ER_BAD_DB_ERROR: # 如果是数据库不存在错误 (这里不应发生,因为未指定数据库)
print(f"连接失败: 数据库不存在 (错误码: {
err.errno}, {
err.msg})")
else: # 其他 MySQL 连接错误
print(f"连接 MySQL 服务器失败: {
err} (错误码: {
err.errno})")
return None # 连接失败返回 None
def connect_to_mysql_database(db_host, db_user, db_password, db_name):
"""尝试连接到MySQL服务器上的特定数据库。"""
try:
cnx = mysql.connector.connect( # 调用 connect 函数建立连接
host=db_host,
user=db_user,
password=db_password,
database=db_name # 指定要连接的数据库名称
# auth_plugin='mysql_native_password' # 如果遇到认证插件问题 (如caching_sha2_password),可尝试指定
)
print(f"成功连接到数据库 '{
db_name}' 在服务器 {
db_host}")
return cnx
except mysql.connector.Error as err:
if err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
print(f"连接失败: 用户名或密码不正确 (错误码: {
err.errno}, {
err.msg})")
elif err.errno == errorcode.ER_BAD_DB_ERROR:
print(f"连接失败: 数据库 '{
db_name}' 不存在 (错误码: {
err.errno}, {
err.msg})")
else:
print(f"连接数据库 '{
db_name}' 失败: {
err} (错误码: {
err.errno})")
return None
# --- 主程序逻辑 ---
if __name__ == "__main__":
# 请替换为你的MySQL服务器的实际连接信息
DB_HOST = "localhost" # 或者你的MySQL服务器IP
DB_USER = "your_mysql_user" # 替换为你的MySQL用户名
DB_PASSWORD = "your_mysql_password" # 替换为你的MySQL密码
DB_NAME = "your_database_name" # 替换为你要连接的数据库名
# 示例1: 只连接服务器
server_connection = connect_to_mysql_server_only(DB_HOST, DB_USER, DB_PASSWORD)
if server_connection:
print(f"服务器连接对象: {
server_connection}")
server_connection.close() # 操作完成后关闭连接
print("服务器连接已关闭。")
print("-" * 30)
# 示例2: 连接到特定数据库
db_connection = connect_to_mysql_database(DB_HOST, DB_USER, DB_PASSWORD, DB_NAME)
if db_connection:
print(f"数据库连接对象: {
db_connection}")
# 在这里可以进行后续的数据库操作 (创建游标、执行SQL等)
db_connection.close() # 操作完成后关闭连接
print(f"数据库 '{
DB_NAME}' 连接已关闭。")
# 准备一个示例表用于后续操作
# 为了运行后续示例,请确保你的MySQL中存在名为 your_database_name 的数据库,
# 并且该用户 your_mysql_user 有权限在该数据库中创建表和执行DML。
# 你可以使用 phpMyAdmin, MySQL Workbench, 或 MySQL命令行客户端来创建数据库。
# CREATE DATABASE IF NOT EXISTS your_database_name;
代码解释:
import mysql.connector
和 from mysql.connector import errorcode
: 导入必要的模块。errorcode
包含MySQL服务器返回的错误代码常量,便于进行更精确的错误判断。
mysql.connector.connect(...)
: 这是建立连接的核心函数。
host
, user
, password
, database
: 是最基本的连接参数。
其他可选参数如 port
, connection_timeout
, auth_plugin
等可以根据需要配置。例如,如果MySQL 8.0+ 使用 caching_sha2_password
认证,而你的连接器版本较旧或配置不当,可能需要显式指定 auth_plugin='mysql_native_password'
(但这通常是针对旧客户端兼容旧服务器的情况,或反之)。
错误处理: 使用 try...except mysql.connector.Error as err:
来捕获连接过程中可能发生的各种MySQL错误。
err.errno
: 错误码,可以与 errorcode
中的常量比较。
err.msg
: 错误的文本描述。
cnx.get_server_info()
: 获取MySQL服务器的版本信息。
cnx.close()
: 非常重要。在完成所有数据库操作后,必须关闭连接以释放服务器和客户端的资源。未关闭的连接可能会导致资源泄露,最终耗尽服务器的最大连接数。
企业级连接配置考量:
配置文件/环境变量: 不应将数据库凭证(用户名、密码)硬编码在源代码中。应使用配置文件 (如 INI, YAML, JSON, .env 文件) 或环境变量来存储这些敏感信息,并在程序启动时读取。
# 示例:从环境变量读取 (更安全)
import os
# DB_USER = os.getenv("DB_USER_PROD")
# DB_PASSWORD = os.getenv("DB_PASS_PROD")
SSL/TLS 加密连接: 对于生产环境,特别是当Python应用和MySQL服务器不在同一台机器或安全网络中时,应使用SSL/TLS加密连接以保护数据传输安全。mysql-connector-python
支持SSL连接,需要在 connect()
函数中配置 ssl_ca
, ssl_cert
, ssl_key
等参数。
# config = {
# 'user': 'your_user',
# 'password': 'your_password',
# 'host': 'your_host',
# 'database': 'your_db',
# 'ssl_ca': '/path/to/ca.pem', # CA证书路径
# 'ssl_cert': '/path/to/client-cert.pem', # 客户端证书路径 (如果需要双向SSL)
# 'ssl_key': '/path/to/client-key.pem', # 客户端私钥路径 (如果需要双向SSL)
# 'ssl_verify_cert': True # (MySQL 8.0.27+/Connector 8.0.27+) 强制验证服务器证书,更安全
# }
# cnx = mysql.connector.connect(**config)
连接超时: 设置合理的 connection_timeout
可以防止程序在网络问题时无限期等待。
字符集: 确保Python连接器使用的字符集(通常默认为 utf8mb4
或可配置)与数据库/表的字符集匹配,以避免乱码问题。可以在 connect()
时指定 charset='utf8mb4'
。
时区: 处理日期时间数据时,要注意Python应用、MySQL服务器、连接器之间的时区设置,避免混淆。
4.1.2 创建游标对象
连接建立后,需要创建一个游标对象来执行SQL。
# (接上文 db_connection 成功建立后)
# if db_connection and db_connection.is_connected(): # 确保连接是活动的
# try:
# cursor = db_connection.cursor() # 创建一个默认类型的游标
# print("默认游标已创建。")
#
# # 不同类型的游标 (mysql-connector-python 支持)
# buffered_cursor = db_connection.cursor(buffered=True) # 缓冲游标,获取所有结果到客户端内存
# print("缓冲游标已创建。当结果集不大时,或需要在获取所有数据后释放服务器资源时有用。")
#
# dictionary_cursor = db_connection.cursor(dictionary=True) # 字典游标,每行结果作为字典返回 (列名: 值)
# print("字典游标已创建。方便按列名访问数据。")
#
# named_tuple_cursor = db_connection.cursor(named_tuple=True) # 命名元组游标,每行结果作为命名元组返回
# print("命名元组游标已创建。提供类似字典的属性访问,但更轻量。")
#
# except mysql.connector.Error as err:
# print(f"创建游标失败: {err}")
# finally:
# if 'cursor' in locals() and cursor: cursor.close()
# if 'buffered_cursor' in locals() and buffered_cursor: buffered_cursor.close()
# if 'dictionary_cursor' in locals() and dictionary_cursor: dictionary_cursor.close()
# if 'named_tuple_cursor' in locals() and named_tuple_cursor: named_tuple_cursor.close()
# if db_connection and db_connection.is_connected():
# db_connection.close()
# print("数据库连接已关闭。")
代码解释:
db_connection.cursor()
: 返回一个标准的游标对象。默认情况下,对于 SELECT
查询,它可能是非缓冲的,意味着数据逐行从服务器获取。
buffered=True
: 创建一个缓冲游标。它会在执行查询后立即将整个结果集从服务器下载到客户端内存中。
优点: 获取数据后,可以立即释放服务器端的资源(与该查询相关的锁或临时表)。可以多次遍历结果集(如果需要)。
缺点: 如果结果集非常大,会消耗大量客户端内存。
dictionary=True
: 创建一个字典游标。fetchone()
和 fetchall()
等方法返回的每一行都是一个字典,键是列名,值是对应的数据。
# row = dictionary_cursor.fetchone()
# if row:
# print(row['column_name'])
named_tuple=True
: 创建一个命名元组游标。每一行都是一个命名元tuple,可以通过列名作为属性访问,也可以通过索引访问。
# from collections import namedtuple # 命名元组游标内部会使用类似机制
# row = named_tuple_cursor.fetchone()
# if row:
# print(row.column_name)
# print(row[0])
游标也需要关闭: cursor.close()
。与连接类似,游标也占用资源,使用完毕后应关闭。使用 with
语句管理游标是更好的做法(后续会讲)。
企业级游标选择:
默认游标: 适用于大多数情况,特别是当结果集可能很大,或者只需要逐行处理时。
缓冲游标 (buffered=True
): 当结果集确定不大,并且需要在获取数据后快速与服务器断开(或执行其他操作),或者需要多次迭代结果时使用。例如,获取少量配置数据。
字典游标 (dictionary=True
): 提高代码可读性,因为可以直接通过列名访问数据,无需记住列的索引顺序。但相比元组,字典会有轻微的额外开销。
命名元组游标 (named_tuple=True
): 兼具字典游标的可读性和元组的轻量级特性。
4.1.3 执行 SELECT
查询并获取结果
def create_sample_employees_table(connection):
"""在数据库中创建一个示例 employees 表 (如果不存在)"""
try:
cursor = connection.cursor() # 创建游标
table_description = """
CREATE TABLE IF NOT EXISTS employees (
emp_no INT AUTO_INCREMENT PRIMARY KEY,
first_name VARCHAR(50) NOT NULL,
last_name VARCHAR(50) NOT NULL,
hire_date DATE,
salary DECIMAL(10, 2)
) ENGINE=InnoDB CHARACTER SET=utf8mb4
"""
cursor.execute(table_description) # 执行建表语句
print("表 'employees' 已检查/创建。")
# 检查表是否为空,如果为空则插入一些示例数据
cursor.execute("SELECT COUNT(*) FROM employees") # 查询员工数量
if cursor.fetchone()[0] == 0: # 如果员工数量为0
print("表 'employees' 为空,插入示例数据...")
insert_query = """
INSERT INTO employees (first_name, last_name, hire_date, salary) VALUES
('John', 'Doe', '2020-01-15', 60000.00),
('Jane', 'Smith', '2019-03-01', 75000.00),
('Alice', 'Johnson', '2021-07-22', 80000.00),
('Bob', 'Williams', '2018-06-10', 50000.00),
('Charlie', 'Brown', '2020-01-15', 62000.00)
"""
cursor.execute(insert_query) # 执行插入语句
connection.commit() # 提交事务 (DML操作后需要)
print(f"{
cursor.rowcount} 条示例数据已插入。")
cursor.close() # 关闭游标
except mysql.connector.Error as err:
print(f"创建或填充 'employees' 表失败: {
err}")
def query_all_employees(connection):
"""查询所有员工信息"""
print("
--- 查询所有员工 ---")
try:
# 使用字典游标方便按列名访问
cursor = connection.cursor(dictionary=True) # 创建字典游标
query = "SELECT emp_no, first_name, last_name, hire_date, salary FROM employees"
cursor.execute(query) # 执行 SELECT 查询
print(f"查询找到 {
cursor.rowcount} 行 (注意: rowcount 对SELECT的行为可能不一致,建议迭代判断)")
# 打印列描述信息 (可选)
print("列描述:")
for col_desc in cursor.description: # 遍历列描述
print(col_desc) # 打印每一列的描述 (name, type_code, etc.)
print("
员工列表:")
# 方式1: fetchall() 获取所有行
# all_rows = cursor.fetchall()
# if not all_rows:
# print("没有找到员工数据。")
# else:
# for row in all_rows:
# print(f" ID: {row['emp_no']}, Name: {row['first_name']} {row['last_name']}, "
# f"Hired: {row['hire_date']}, Salary: {row['salary']}")
# 方式2: 逐行获取 fetchone()
# print("
员工列表 (逐行获取):")
# row = cursor.fetchone()
# if not row:
# print(" 没有找到员工数据。")
# while row:
# print(f" ID: {row['emp_no']}, Name: {row['first_name']} {row['last_name']}, "
# f"Hired: {row['hire_date']}, Salary: {row['salary']}")
# row = cursor.fetchone()
# 方式3: 直接迭代游标 (最Pythonic的方式)
print("
员工列表 (迭代游标):")
count = 0
for row in cursor: # 直接迭代游标对象
count += 1
print(f" ID: {
row['emp_no']}, Name: {
row['first_name']} {
row['last_name']}, "
f"Hired: {
row['hire_date']}, Salary: {
row['salary']}")
if count == 0:
print(" 没有找到员工数据。")
except mysql.connector.Error as err:
print(f"查询员工失败: {
err}")
finally:
if 'cursor' in locals() and cursor: # 检查cursor是否已定义且非None
cursor.close() # 确保游标被关闭
def query_employees_with_condition(connection, min_salary):
"""查询薪水高于指定值的员工 (参数化查询)"""
print(f"
--- 查询薪水高于 {
min_salary} 的员工 ---")
try:
cursor = connection.cursor(dictionary=True)
# 参数化查询: 使用 %s 作为占位符
query = """
SELECT emp_no, first_name, last_name, salary
FROM employees
WHERE salary > %s
ORDER BY salary DESC
"""
cursor.execute(query, (min_salary,)) # 第二个参数是元组,包含要替换占位符的值
print("符合条件的员工:")
fetched_rows = cursor.fetchall() # 获取所有结果
if not fetched_rows:
print(f" 没有找到薪水高于 {
min_salary} 的员工。")
else:
for row in fetched_rows:
print(f" ID: {
row['emp_no']}, Name: {
row['first_name']} {
row['last_name']}, Salary: {
row['salary']}")
except mysql.connector.Error as err:
print(f"条件查询员工失败: {
err}")
finally:
if 'cursor' in locals() and cursor:
cursor.close()
# --- 主程序逻辑 (续) ---
if __name__ == "__main__":
# (前面的连接代码)
DB_HOST = "localhost"
DB_USER = "your_mysql_user"
DB_PASSWORD = "your_mysql_password"
DB_NAME = "your_database_name"
db_connection = connect_to_mysql_database(DB_HOST, DB_USER, DB_PASSWORD, DB_NAME)
if db_connection and db_connection.is_connected():
create_sample_employees_table(db_connection) # 创建/准备示例表
query_all_employees(db_connection) # 执行查询所有员工
query_employees_with_condition(db_connection, 60000.00) # 执行条件查询
db_connection.close() # 完成所有操作后关闭连接
print(f"
数据库 '{
DB_NAME}' 连接已彻底关闭。")
代码解释:
create_sample_employees_table()
: 一个辅助函数,用于创建示例表并填充数据,确保后续查询可以执行。
CREATE TABLE IF NOT EXISTS ...
: 避免表已存在时报错。
cursor.execute("SELECT COUNT(*)...")
和 cursor.fetchone()[0]
: 用于检查表是否为空。fetchone()
返回一行,如果表为空或查询无结果,它返回 None
;否则返回一个元组(对于默认游标)或字典(对于字典游标)。这里假设默认游标,所以用 [0]
取第一个元素(即COUNT(*)的值)。
connection.commit()
: 对于 INSERT
, UPDATE
, DELETE
等修改数据的操作,执行后必须调用 connection.commit()
才能将更改永久保存到数据库。 否则,在连接关闭或事务回滚时,这些更改会丢失。MySQL 默认可能是自动提交模式(autocommit=True),但这取决于服务器和连接器配置。显式提交是良好实践。
query_all_employees()
:
cursor = connection.cursor(dictionary=True)
: 使用字典游标。
cursor.execute(query)
: 执行 SELECT
语句。
cursor.description
: 打印列信息。每个元组包含 (name
, type_code
, display_size
, internal_size
, precision
, scale
, null_ok
)。
获取结果的方式:
cursor.fetchall()
: 一次性获取所有行,返回一个列表,列表中的每个元素是代表一行的字典(因为是字典游标)。适合结果集不大时。
cursor.fetchone()
: 每次获取一行,直到返回 None
。适合逐行处理,或结果集非常大时,避免一次性加载到内存。
直接迭代游标 (for row in cursor:
): 这是最 Pythonic 且通常最高效的方式来处理多行结果。它在内部可能类似于 while True: row = cursor.fetchone(); if not row: break; ...
。
query_employees_with_condition()
: 演示了参数化查询 (Parameterized Query)。
query = "SELECT ... WHERE salary > %s ..."
: SQL 语句中使用 %s
作为占位符。
cursor.execute(query, (min_salary,))
: execute
方法的第二个参数是一个元组(或列表),其元素会按顺序替换SQL语句中的 %s
占位符。这是防止 SQL 注入攻击的正确方法。 绝不要使用Python的字符串格式化(如 f-string 或 %
操作符)将变量直接拼接到SQL语句中。
即使只有一个参数,也要传入一个单元素的元组,例如 (value,)
。
finally
块: 确保无论 try
块中是否发生异常,游标 (cursor.close()
) 和连接 (connection.close()
) 都会被关闭。这是非常重要的资源管理实践。
4.1.4 执行 INSERT
, UPDATE
, DELETE
操作 (DML)
def insert_new_employee(connection, first_name, last_name, hire_date, salary):
"""插入一个新员工记录 (参数化)"""
print(f"
--- 插入新员工: {
first_name} {
last_name} ---")
new_emp_no = None
try:
cursor = connection.cursor()
insert_query = """
INSERT INTO employees (first_name, last_name, hire_date, salary)
VALUES (%s, %s, %s, %s)
"""
employee_data = (first_name, last_name, hire_date, salary) # 准备要插入的数据元组
cursor.execute(insert_query, employee_data) # 执行插入语句,传入数据
connection.commit() # !! 提交事务以保存更改 !!
new_emp_no = cursor.lastrowid # 获取最后插入行的 AUTO_INCREMENT ID (如果主键是自增的)
print(f"员工 '{
first_name} {
last_name}' 成功插入。受影响行数: {
cursor.rowcount}, 新员工ID: {
new_emp_no}")
except mysql.connector.Error as err:
print(f"插入员工失败: {
err}")
if connection.is_connected(): # 如果连接仍然有效,可以选择回滚
print("尝试回滚事务...")
connection.rollback() # 回滚未提交的更改
finally:
if 'cursor' in locals() and cursor:
cursor.close()
return new_emp_no # 返回新员工的ID
def update_employee_salary(connection, emp_no, new_salary):
"""更新指定员工的薪水 (参数化)"""
print(f"
--- 更新员工 ID {
emp_no} 的薪水为 {
new_salary} ---")
try:
cursor = connection.cursor()
update_query = "UPDATE employees SET salary = %s WHERE emp_no = %s"
cursor.execute(update_query, (new_salary, emp_no)) # 执行更新语句,传入参数
connection.commit() # 提交事务
if cursor.rowcount > 0:
print(f"员工 ID {
emp_no} 的薪水已成功更新。受影响行数: {
cursor.rowcount}")
else:
print(f"未找到员工 ID {
emp_no},或薪水未改变。受影响行数: {
cursor.rowcount}")
except mysql.connector.Error as err:
print(f"更新员工薪水失败: {
err}")
if connection.is_connected(): connection.rollback()
finally:
if 'cursor' in locals() and cursor:
cursor.close()
def delete_employee(connection, emp_no):
"""删除指定员工 (参数化)"""
print(f"
--- 删除员工 ID {
emp_no} ---")
try:
cursor = connection.cursor()
delete_query = "DELETE FROM employees WHERE emp_no = %s"
cursor.execute(delete_query, (emp_no,)) # 执行删除语句,传入参数
connection.commit() # 提交事务
if cursor.rowcount > 0:
print(f"员工 ID {
emp_no} 已成功删除。受影响行数: {
cursor.rowcount}")
else:
print(f"未找到员工 ID {
emp_no} 进行删除。受影响行数: {
cursor.rowcount}")
except mysql.connector.Error as err:
print(f"删除员工失败: {
err}")
if connection.is_connected(): connection.rollback()
finally:
if 'cursor' in locals() and cursor:
cursor.close()
# --- 主程序逻辑 (续) ---
if __name__ == "__main__":
# (前面的连接和建表示例代码)
DB_HOST = "localhost"
DB_USER = "your_mysql_user"
DB_PASSWORD = "your_mysql_password"
DB_NAME = "your_database_name"
db_connection = connect_to_mysql_database(DB_HOST, DB_USER, DB_PASSWORD, DB_NAME)
if db_connection and db_connection.is_connected():
create_sample_employees_table(db_connection) # 确保表存在
# 执行DML操作
new_id = insert_new_employee(db_connection, "Peter", "Jones", "2023-01-10", 95000.00)
if new_id:
update_employee_salary(db_connection, new_id, 98000.00)
# query_all_employees(db_connection) # 可以取消注释查看更新后的结果
delete_employee(db_connection, new_id)
# query_all_employees(db_connection) # 可以取消注释查看删除后的结果
else:
print("由于插入失败,后续的更新和删除操作未执行。")
# 演示插入一个不存在的ID的更新和删除
update_employee_salary(db_connection, 9999, 100000.00) # 更新不存在的员工
delete_employee(db_connection, 9999) # 删除不存在的员工
db_connection.close()
print(f"
数据库 '{
DB_NAME}' DML操作演示完成,连接已关闭。")
代码解释:
参数化查询: 所有DML语句 (INSERT
, UPDATE
, DELETE
) 都使用了参数化查询 (%s
占位符和 cursor.execute(query, params)
),这是防止SQL注入的最佳实践。
connection.commit()
: 在执行任何会修改数据的SQL语句后,必须调用 connection.commit()
来使更改生效。否则,当连接关闭或程序结束时,这些更改可能会丢失(取决于MySQL服务器的autocommit设置和连接器的默认行为,但显式提交总是更安全)。
connection.rollback()
: 在 except
块中,如果发生错误,可以调用 connection.rollback()
来撤销当前事务中尚未提交的所有更改。这有助于保持数据的一致性。
cursor.lastrowid
: 对于 INSERT
操作,如果插入的表有一个 AUTO_INCREMENT
主键,cursor.lastrowid
属性会返回最后插入行的ID。这对于获取新生成的记录ID非常有用。注意:如果一次 INSERT
插入了多行,或者主键不是自增的,其行为可能不同或不可靠。
cursor.rowcount
:
对于 INSERT
, UPDATE
, DELETE
,rowcount
返回受这些操作影响的行数。
对于 SELECT
,其行为在DB-API 2.0中没有严格定义,可能返回-1,或者在某些情况下(如使用缓冲游标)返回获取到的行数。不应完全依赖它来判断 SELECT
是否有结果,更好的方式是检查 fetchone()
/fetchall()
的返回值。
4.1.5 使用 with
语句管理连接和游标 (Context Managers)
Python 的 with
语句提供了一种更优雅、更安全的方式来管理资源(如文件句柄、网络连接、数据库连接和游标),它可以确保资源在使用完毕后即使发生异常也能被正确释放。
mysql-connector-python
的连接对象和游标对象都支持上下文管理协议。
def query_with_context_manager(db_config):
"""使用 with 语句管理连接和游标"""
print("
--- 使用 with 语句查询员工 (薪水 > 70000) ---")
query = """
SELECT emp_no, first_name, last_name, salary
FROM employees
WHERE salary > %s
ORDER BY first_name
"""
min_salary = 70000.00
results = [] # 用于存储查询结果
try:
# 'with' 语句确保连接在使用后自动关闭,即使发生错误
with mysql.connector.connect(**db_config) as connection: # db_config 是一个包含连接参数的字典
print("数据库连接已通过 'with' 语句打开。")
# 'with' 语句确保游标在使用后自动关闭
with connection.cursor(dictionary=True) as cursor:
print("游标已通过 'with' 语句创建。")
cursor.execute(query, (min_salary,))
for row in cursor:
results.append(row)
print(f" Fetched: ID={
row['emp_no']}, Name={
row['first_name']}, Salary={
row['salary']}")
print("游标已通过 'with' 语句自动关闭。")
print("数据库连接已通过 'with' 语句自动关闭。")
if not results:
print(f" 没有找到薪水高于 {
min_salary} 的员工。")
except mysql.connector.Error as err:
print(f"使用 'with' 语句查询失败: {
err}")
# 注意: connection.commit() 仍然需要显式调用,'with' 语句不自动提交事务。
# 如果这里有DML操作,且未在with块内commit,则不会保存。
# --- 主程序逻辑 (续) ---
if __name__ == "__main__":
DB_CONFIG = {
# 将连接参数组织成字典
'host': "localhost",
'user': "your_mysql_user",
'password': "your_mysql_password",
'database': "your_database_name",
'charset': 'utf8mb4' # 明确指定字符集是一个好习惯
}
# 假设 employees 表已存在且有数据
# create_sample_employees_table(mysql.connector.connect(**DB_CONFIG)) # 如果需要确保表存在
query_with_context_manager(DB_CONFIG)
代码解释:
with mysql.connector.connect(**db_config) as connection:
:
当进入 with
块时,mysql.connector.connect()
被调用并返回一个连接对象,赋值给 connection
。
当退出 with
块时(无论是正常结束还是发生异常),连接对象的 __exit__
方法会被自动调用,该方法会负责关闭连接 (connection.close()
)。
with connection.cursor(dictionary=True) as cursor:
:
类似地,游标对象也实现了上下文管理协议。当退出内部的 with
块时,游标会自动关闭 (cursor.close()
)。
事务管理: with
语句本身不负责自动提交或回滚事务。对于DML操作,你仍然需要在 with connection ...
块内部显式调用 connection.commit()
或 connection.rollback()
。如果 with
块因为异常而退出,且你没有 commit
,那么未提交的更改(如果事务是活动的)通常会被数据库服务器隐式回滚(当连接断开时),或者你需要更细致地在 except
块中处理 rollback
。
企业级实践: 强烈推荐使用 with
语句来管理数据库连接和游标,因为它可以减少因忘记关闭资源而导致的资源泄露风险,使代码更整洁、更健壮。
四、基本连接与查询操作
4.2 使用 PyMySQL
PyMySQL
是一个非常受欢迎的纯Python MySQL驱动程序,以其易用性、轻量级和良好的社区支持而闻名。它的API在很大程度上遵循Python DB-API 2.0规范,因此很多操作与 mysql-connector-python
类似,但在某些细节和特定功能上会有所不同。
4.2.1 安装 PyMySQL
首先,确保你已经安装了 PyMySQL
。如果需要支持 MySQL 8.0+ 的 caching_sha2_password
认证,建议同时安装 cryptography
。
pip install PyMySQL
# 或者为了更好的认证支持 (推荐):
pip install PyMySQL[rsa]
# 或者分开安装:
# pip install PyMySQL
# pip install cryptography
4.2.2 建立数据库连接
PyMySQL
的连接方式与 mysql-connector-python
类似,但函数名和一些参数可能略有不同。
import pymysql # 导入 pymysql 模块
import pymysql.cursors # 导入 pymysql.cursors 用于指定游标类型,例如字典游标
import os # 用于从环境变量读取敏感信息 (推荐做法)
def pymysql_connect_to_db(db_host, db_user, db_password, db_name, db_port=3306, db_charset='utf8mb4'):
"""使用 PyMySQL 连接到指定的数据库。"""
try:
connection = pymysql.connect( # 调用 pymysql.connect() 函数建立连接
host=db_host, # MySQL 服务器主机名或IP
user=db_user, # MySQL 用户名
password=db_password, # MySQL 密码
database=db_name, # 要连接的数据库名
port=db_port, # MySQL 端口,默认为3306
charset=db_charset, # 指定连接字符集,推荐 utf8mb4 以支持各种字符包括emoji
connect_timeout=10, # 连接超时时间(秒)
# cursorclass=pymysql.cursors.DictCursor # 可以在连接时全局指定默认游标类型
# ssl={'ca': '/path/to/ca.pem'} # SSL连接配置示例,需要提供正确的证书路径
)
print(f"PyMySQL: 成功连接到数据库 '{
db_name}' 在服务器 {
db_host}:{
db_port}")
# PyMySQL 连接对象没有直接的 get_server_info() 方法,可以通过查询获取
with connection.cursor() as cursor: # 使用 with 管理游标
cursor.execute("SELECT VERSION()") # 执行查询服务器版本的SQL
server_version = cursor.fetchone() # 获取查询结果
if server_version:
print(f"PyMySQL: MySQL 服务器版本: {
server_version[0]}") # server_version 是一个元组
return connection # 返回连接对象
except pymysql.Error as err: # 捕获 pymysql 的错误
# PyMySQL 的错误代码和消息格式可能与 mysql.connector 不同
# err.args[0] 通常是错误码, err.args[1] 是错误消息
error_code = err.args[0] if len(err.args) > 0 else "N/A"
error_message = err.args[1] if len(err.args) > 1 else str(err)
print(f"PyMySQL: 连接数据库 '{
db_name}' 失败。错误码: {
error_code}, 消息: {
error_message}")
return None # 连接失败返回 None
# --- 主程序逻辑 (PyMySQL) ---
if __name__ == "__main__":
# 从环境变量读取配置,或者直接在此处设置 (不推荐用于生产)
DB_HOST_PY = os.getenv("PYMYSQL_DB_HOST", "localhost")
DB_USER_PY = os.getenv("PYMYSQL_DB_USER", "your_mysql_user")
DB_PASSWORD_PY = os.getenv("PYMYSQL_DB_PASSWORD", "your_mysql_password")
DB_NAME_PY = os.getenv("PYMYSQL_DB_NAME", "your_database_name")
DB_PORT_PY = int(os.getenv("PYMYSQL_DB_PORT", 3306)) # 端口通常是整数
print("--- PyMySQL 连接测试 ---")
pymysql_conn = pymysql_connect_to_db(DB_HOST_PY, DB_USER_PY, DB_PASSWORD_PY, DB_NAME_PY, DB_PORT_PY)
if pymysql_conn:
print(f"PyMySQL: 连接对象: {
pymysql_conn}")
# 在这里进行后续操作
pymysql_conn.close() # 操作完成后关闭连接
print(f"PyMySQL: 数据库 '{
DB_NAME_PY}' 连接已关闭。")
代码解释:
import pymysql
和 import pymysql.cursors
: 导入必要的 PyMySQL
模块。pymysql.cursors
包含了不同类型的游标类,如 DictCursor
。
pymysql.connect(...)
: 用于建立连接。
核心参数 host
, user
, password
, database
, port
与 mysql-connector-python
类似。
charset='utf8mb4'
: 显式指定连接字符集,这是一个非常好的实践,可以避免很多潜在的字符编码问题。
connect_timeout
: 连接超时设置。
cursorclass=pymysql.cursors.DictCursor
: 这是一个非常有用的选项。如果在连接时设置了 cursorclass
,那么通过此连接创建的所有游标默认都会是指定的类型(这里是字典游标),无需在每次 connection.cursor()
时再指定。
ssl
: 用于配置SSL连接,传入一个字典包含CA证书、客户端证书/密钥等路径。
获取服务器版本: PyMySQL
连接对象没有像 mysql-connector-python
那样的 get_server_info()
方法。通常需要执行 SELECT VERSION()
SQL查询来获取。
错误处理: PyMySQL
的错误对象 err
通常通过 err.args
属性来访问错误码和消息。err.args[0]
是错误码,err.args[1]
是错误消息。这与 mysql-connector-python
的 err.errno
和 err.msg
属性不同。
环境变量: 示例中演示了从环境变量读取数据库凭证,这是比硬编码更安全的方式。
connection.close()
: 同样,操作完成后必须关闭连接。
4.2.3 创建游标对象 (PyMySQL)
PyMySQL
的游标创建与DB-API规范一致。
# (接上文 pymysql_conn 成功建立后)
# if pymysql_conn:
# try:
# # 方式1: 创建默认游标 (返回元组)
# default_cursor_py = pymysql_conn.cursor()
# print("PyMySQL: 默认游标已创建 (返回元组)。")
# default_cursor_py.close()
#
# # 方式2: 创建字典游标
# dict_cursor_py = pymysql_conn.cursor(pymysql.cursors.DictCursor) # 传入游标类
# print("PyMySQL: 字典游标已创建 (返回字典)。")
# dict_cursor_py.close()
#
# # 方式3: 如果连接时已指定 cursorclass=pymysql.cursors.DictCursor
# # dict_cursor_via_class = pymysql_conn.cursor() # 这将直接是一个字典游标
# # print("PyMySQL: 通过连接时指定的cursorclass创建的字典游标。")
# # dict_cursor_via_class.close()
#
# # PyMySQL 还支持其他游标类型,如 SSCursor, SSDictCursor (服务器端游标,用于处理非常大的结果集)
# # ss_cursor_py = pymysql_conn.cursor(pymysql.cursors.SSCursor)
# # print("PyMySQL: 服务器端流式游标 (SSCursor) 已创建。")
# # ss_cursor_py.close()
#
# except pymysql.Error as err:
# print(f"PyMySQL: 创建游标失败: {err.args[1] if len(err.args) > 1 else err}")
# finally:
# if pymysql_conn:
# pymysql_conn.close()
代码解释:
pymysql_conn.cursor()
: 创建一个默认游标,执行查询后返回的每一行是元组。
pymysql_conn.cursor(pymysql.cursors.DictCursor)
: 创建一个字典游标。DictCursor
是 pymysql.cursors
模块中的一个类。
服务器端游标 (SSCursor
, SSDictCursor
):
SSCursor
: “Server-Side Cursor” 的缩写。这是一个非缓冲的游标,它不会一次性将所有结果都下载到客户端。相反,数据在迭代时逐行(或按 arraysize
批量)从服务器流式传输到客户端。
SSDictCursor
: 功能同 SSCursor
,但返回的是字典。
适用场景: 处理可能非常巨大的结果集,无法一次性装入客户端内存。
注意事项: 当使用服务器端游标时,在迭代完所有结果或显式关闭游标之前,连接通常会保持繁忙状态,可能无法用于执行其他查询(除非MySQL服务器和协议版本支持MARS – Multiple Active Result Sets,但这不是PyMySQL服务器端游标的典型用法)。
4.2.4 执行 SELECT
查询并获取结果 (PyMySQL)
查询和获取结果的API与 mysql-connector-python
非常相似,因为它们都遵循DB-API 2.0。
# 使用之前定义的 create_sample_employees_table 函数来准备数据
# (确保已替换 pymysql_connect_to_db 中的连接参数,并且数据库和用户有效)
def pymysql_query_all_employees(connection):
"""使用 PyMySQL 查询所有员工信息"""
print("
--- PyMySQL: 查询所有员工 ---")
try:
# 使用 with 语句管理游标,自动关闭
with connection.cursor(pymysql.cursors.DictCursor) as cursor: # 创建字典游标
query = "SELECT emp_no, first_name, last_name, hire_date, salary FROM employees"
cursor.execute(query) # 执行查询
print(f"PyMySQL: 查询执行完毕。受影响行数 (cursor.rowcount): {
cursor.rowcount}") # rowcount对SELECT行为可能不一致
print("PyMySQL: 列描述:")
if cursor.description: # 确保description不为None
for col_desc in cursor.description:
print(col_desc) # (name, type_code, display_size, internal_size, precision, scale, null_ok)
print("
PyMySQL: 员工列表 (迭代游标):")
count = 0
for row in cursor: # 直接迭代字典游标
count += 1
print(f" ID: {
row['emp_no']}, Name: {
row['first_name']} {
row['last_name']}, "
f"Hired: {
row['hire_date']}, Salary: {
row['salary']}")
if count == 0:
print(" PyMySQL: 没有找到员工数据。")
except pymysql.Error as err:
print(f"PyMySQL: 查询员工失败: {
err.args[1] if len(err.args) > 1 else err}")
def pymysql_query_employees_with_condition(connection, min_salary):
"""使用 PyMySQL 查询薪水高于指定值的员工 (参数化)"""
print(f"
--- PyMySQL: 查询薪水高于 {
min_salary} 的员工 ---")
try:
with connection.cursor(pymysql.cursors.DictCursor) as cursor:
query = """
SELECT emp_no, first_name, last_name, salary
FROM employees
WHERE salary > %s
ORDER BY salary DESC
"""
# PyMySQL 使用 %s 作为占位符,参数以元组形式传递
cursor.execute(query, (min_salary,)) # 执行参数化查询
print("PyMySQL: 符合条件的员工:")
fetched_rows = cursor.fetchall() # 获取所有结果
if not fetched_rows:
print(f" PyMySQL: 没有找到薪水高于 {
min_salary} 的员工。")
else:
for row in fetched_rows:
print(f" ID: {
row['emp_no']}, Name: {
row['first_name']} {
row['last_name']}, Salary: {
row['salary']}")
except pymysql.Error as err:
print(f"PyMySQL: 条件查询员工失败: {
err.args[1] if len(err.args) > 1 else err}")
# --- 主程序逻辑 (PyMySQL SELECT) ---
if __name__ == "__main__":
# (前面的 PyMySQL 连接参数设置)
DB_HOST_PY = os.getenv("PYMYSQL_DB_HOST", "localhost")
DB_USER_PY = os.getenv("PYMYSQL_DB_USER", "your_mysql_user")
DB_PASSWORD_PY = os.getenv("PYMYSQL_DB_PASSWORD", "your_mysql_password")
DB_NAME_PY = os.getenv("PYMYSQL_DB_NAME", "your_database_name")
DB_PORT_PY = int(os.getenv("PYMYSQL_DB_PORT", 3306))
pymysql_conn = pymysql_connect_to_db(DB_HOST_PY, DB_USER_PY, DB_PASSWORD_PY, DB_NAME_PY, DB_PORT_PY)
if pymysql_conn:
# 确保示例表存在并有数据 (可以使用之前为 mysql-connector-python 编写的建表函数,
# 但需要传入 pymysql_conn 对象,并且该函数内部的 commit/rollback 也需要是 pymysql_conn 的方法)
# 为保持独立性,这里假设表已通过其他方式创建或通过mysql-connector-python示例创建。
# 或者我们也可以写一个 PyMySQL 版本的建表函数:
def pymysql_create_sample_employees_table(connection):
try:
with connection.cursor() as cursor:
table_description = """
CREATE TABLE IF NOT EXISTS employees (
emp_no INT AUTO_INCREMENT PRIMARY KEY, first_name VARCHAR(50) NOT NULL,
last_name VARCHAR(50) NOT NULL, hire_date DATE, salary DECIMAL(10, 2)
) ENGINE=InnoDB CHARACTER SET=utf8mb4
"""
cursor.execute(table_description)
cursor.execute("SELECT COUNT(*) FROM employees")
if cursor.fetchone()[0] == 0:
insert_query = """
INSERT INTO employees (first_name, last_name, hire_date, salary) VALUES
('John', 'Doe', '2020-01-15', 60000.00), ('Jane', 'Smith', '2019-03-01', 75000.00),
('Alice', 'Johnson', '2021-07-22', 80000.00), ('Bob', 'Williams', '2018-06-10', 50000.00),
('Charlie', 'Brown', '2020-01-15', 62000.00)
"""
cursor.execute(insert_query)
connection.commit() # PyMySQL 也需要 commit DML
print(f"PyMySQL: 示例数据已插入 (受影响行数: {
cursor.rowcount})。")
else:
print("PyMySQL: 表 'employees' 已存在数据,未重新插入。")
except pymysql.Error as err:
print(f"PyMySQL: 创建或填充 'employees' 表失败: {
err.args[1] if len(err.args) > 1 else err}")
pymysql_create_sample_employees_table(pymysql_conn)
pymysql_query_all_employees(pymysql_conn)
pymysql_query_employees_with_condition(pymysql_conn, 65000.00)
pymysql_conn.close()
print(f"
PyMySQL: 数据库 '{
DB_NAME_PY}' SELECT演示完成,连接已关闭。")
代码解释:
with connection.cursor(...) as cursor:
: 同样,使用 with
语句管理游标是推荐的做法,确保游标在使用完毕后自动关闭。
参数化查询: PyMySQL
也使用 %s
作为参数占位符,并通过 cursor.execute(query, params_tuple)
的方式传递参数,有效防止SQL注入。
cursor.description
: 与 mysql-connector-python
一样,提供结果集的列信息。
cursor.fetchall()
, cursor.fetchone()
, 直接迭代游标:行为与DB-API 2.0规范一致,与 mysql-connector-python
的使用方式相同。
cursor.rowcount
: 对于 SELECT
语句,PyMySQL
的 rowcount
通常返回匹配 WHERE
子句的行数(如果是非缓冲游标,可能在所有行被读取前是-1或不准确;如果是缓冲游标,会是准确的)。尽管如此,依赖迭代或 fetchall()
的结果来判断是否有数据是更可靠的做法。
4.2.5 执行 INSERT
, UPDATE
, DELETE
操作 (DML with PyMySQL)
DML 操作在 PyMySQL
中的处理方式与 mysql-connector-python
非常相似,关键在于参数化查询和事务提交。
def pymysql_insert_new_employee(connection, first_name, last_name, hire_date, salary):
"""使用 PyMySQL 插入一个新员工记录 (参数化)"""
print(f"
--- PyMySQL: 插入新员工: {
first_name} {
last_name} ---")
new_emp_no_py = None
try:
with connection.cursor() as cursor: # 使用 with 管理游标
insert_query = """
INSERT INTO employees (first_name, last_name, hire_date, salary)
VALUES (%s, %s, %s, %s)
"""
employee_data = (first_name, last_name, hire_date, salary)
cursor.execute(insert_query, employee_data)
# PyMySQL 连接对象默认 autocommit 可能是 False (取决于服务器和连接配置)
# 因此,显式 commit 是必须的
connection.commit() # !! 提交事务以保存更改 !!
new_emp_no_py = cursor.lastrowid # 获取最后插入行的 AUTO_INCREMENT ID
print(f"PyMySQL: 员工 '{
first_name} {
last_name}' 成功插入。受影响行数: {
cursor.rowcount}, 新员工ID: {
new_emp_no_py}")
except pymysql.Error as err:
error_msg = err.args[1] if len(err.args) > 1 else str(err)
print(f"PyMySQL: 插入员工失败: {
error_msg}")
try:
if connection: # 检查连接对象是否存在且可能仍然可用
print("PyMySQL: 尝试回滚事务...")
connection.rollback() # 回滚未提交的更改
except pymysql.Error as rb_err: # 回滚本身也可能失败
print(f"PyMySQL: 回滚事务失败: {
rb_err.args[1] if len(rb_err.args) > 1 else rb_err}")
return new_emp_no_py
def pymysql_update_employee_salary(connection, emp_no, new_salary):
"""使用 PyMySQL 更新指定员工的薪水 (参数化)"""
print(f"
--- PyMySQL: 更新员工 ID {
emp_no} 的薪水为 {
new_salary} ---")
try:
with connection.cursor() as cursor:
update_query = "UPDATE employees SET salary = %s WHERE emp_no = %s"
cursor.execute(update_query, (new_salary, emp_no))
connection.commit()
if cursor.rowcount > 0:
print(f"PyMySQL: 员工 ID {
emp_no} 的薪水已成功更新。受影响行数: {
cursor.rowcount}")
else:
print(f"PyMySQL: 未找到员工 ID {
emp_no},或薪水未改变。受影响行数: {
cursor.rowcount}")
except pymysql.Error as err:
error_msg = err.args[1] if len(err.args) > 1 else str(err)
print(f"PyMySQL: 更新员工薪水失败: {
error_msg}")
try:
if connection: connection.rollback()
except pymysql.Error as rb_err:
print(f"PyMySQL: 回滚事务失败: {
rb_err.args[1] if len(rb_err.args) > 1 else rb_err}")
def pymysql_delete_employee(connection, emp_no):
"""使用 PyMySQL 删除指定员工 (参数化)"""
print(f"
--- PyMySQL: 删除员工 ID {
emp_no} ---")
try:
with connection.cursor() as cursor:
delete_query = "DELETE FROM employees WHERE emp_no = %s"
cursor.execute(delete_query, (emp_no,))
connection.commit()
if cursor.rowcount > 0:
print(f"PyMySQL: 员工 ID {
emp_no} 已成功删除。受影响行数: {
cursor.rowcount}")
else:
print(f"PyMySQL: 未找到员工 ID {
emp_no} 进行删除。受影响行数: {
cursor.rowcount}")
except pymysql.Error as err:
error_msg = err.args[1] if len(err.args) > 1 else str(err)
print(f"PyMySQL: 删除员工失败: {
error_msg}")
try:
if connection: connection.rollback()
except pymysql.Error as rb_err:
print(f"PyMySQL: 回滚事务失败: {
rb_err.args[1] if len(rb_err.args) > 1 else rb_err}")
# --- 主程序逻辑 (PyMySQL DML) ---
if __name__ == "__main__":
# (前面的 PyMySQL 连接参数设置)
DB_HOST_PY = os.getenv("PYMYSQL_DB_HOST", "localhost")
DB_USER_PY = os.getenv("PYMYSQL_DB_USER", "your_mysql_user")
DB_PASSWORD_PY = os.getenv("PYMYSQL_DB_PASSWORD", "your_mysql_password")
DB_NAME_PY = os.getenv("PYMYSQL_DB_NAME", "your_database_name")
DB_PORT_PY = int(os.getenv("PYMYSQL_DB_PORT", 3306))
pymysql_conn = pymysql_connect_to_db(DB_HOST_PY, DB_USER_PY, DB_PASSWORD_PY, DB_NAME_PY, DB_PORT_PY)
if pymysql_conn:
# 确保表存在 (可以使用上面定义的 pymysql_create_sample_employees_table)
# pymysql_create_sample_employees_table(pymysql_conn)
# 执行DML操作
py_new_id = pymysql_insert_new_employee(pymysql_conn, "Sarah", "Miller", "2023-02-20", 72000.00)
if py_new_id:
pymysql_update_employee_salary(pymysql_conn, py_new_id, 73500.00)
# pymysql_query_all_employees(pymysql_conn) # 可用于验证
pymysql_delete_employee(pymysql_conn, py_new_id)
# pymysql_query_all_employees(pymysql_conn) # 可用于验证
else:
print("PyMySQL: 由于插入失败,后续的更新和删除操作未执行。")
pymysql_conn.close()
print(f"
PyMySQL: 数据库 '{
DB_NAME_PY}' DML操作演示完成,连接已关闭。")
代码解释:
事务处理: 与 mysql-connector-python
一样,PyMySQL
对于DML操作也需要显式的 connection.commit()
。如果在发生错误时需要撤销更改,则调用 connection.rollback()
。
cursor.lastrowid
: PyMySQL
游标对象也提供 lastrowid
属性来获取最后插入的自增ID。
cursor.rowcount
: 对于DML操作,返回受影响的行数。
with connection.cursor() as cursor:
: 再次强调,这是管理游标的最佳实践。
4.2.6 PyMySQL
与 mysql-connector-python
的主要区别总结 (基础操作层面)
模块导入: import pymysql
vs import mysql.connector
。
游标类型指定:
PyMySQL
: conn.cursor(pymysql.cursors.DictCursor)
或在连接时 cursorclass=pymysql.cursors.DictCursor
。
mysql-connector-python
: conn.cursor(dictionary=True, buffered=True, named_tuple=True)
等参数。
错误对象属性:
PyMySQL
: err.args[0]
(code), err.args[1]
(message)。
mysql-connector-python
: err.errno
, err.msg
。
获取服务器信息:
PyMySQL
: 通常需要执行 SELECT VERSION()
。
mysql-connector-python
: conn.get_server_info()
。
SSL 配置: 参数名和结构略有不同。PyMySQL
通常使用 ssl={'ca': ...}
字典。
服务器端游标: PyMySQL
提供了明确的 SSCursor
和 SSDictCursor
。mysql-connector-python
的默认游标在某些模式下可能表现出类似流式的行为,但不如 PyMySQL
的服务器端游标那样显式。
尽管存在这些差异,但由于两者都遵循DB-API 2.0,核心的 execute()
, fetchone()
, fetchall()
, commit()
, rollback()
, close()
等方法的行为和用法基本一致。
4.3 mysqlclient
简介与基础操作对比
mysqlclient
是 MySQLdb
的一个分支,是一个C扩展模块,通常被认为性能较高。其API也遵循DB-API 2.0。
安装 (回顾): 需要C编译器和MySQL开发库。pip install mysqlclient
。
连接与使用 (与PyMySQL非常相似):
# import MySQLdb # 这是 mysqlclient 导入时使用的历史名称
# # 或者更现代的用法中,如果你安装了 mysqlclient,有时也可以直接通过
# # import _mysql # 底层C模块,但不推荐直接使用
# # 最常见的是通过 ORM 或框架间接使用,它们会处理导入
# # 如果直接使用,通常是:
# # from MySQLdb import connect, cursors # 假设 MySQLdb 仍然是可用的导入路径
# # 实际上,安装了 mysqlclient 后,它通常会作为 MySQLdb 的替代品。
# # 因此,很多依赖 MySQLdb 的旧代码可以直接运行。
# # 如果你的环境中有 mysqlclient 并且它是 Python 3 的兼容版本:
# try:
# import MySQLdb as DatabaseModule # 尝试导入 MySQLdb (mysqlclient 会扮演这个角色)
# from MySQLdb.cursors import DictCursor # 导入字典游标
# print("mysqlclient (as MySQLdb) 模块已加载。")
# except ImportError:
# print("无法导入 MySQLdb (mysqlclient)。请确保已正确安装。")
# DatabaseModule = None # 设置为None,以便后续代码可以检查
# def mysqlclient_connect_to_db(db_host, db_user, db_password, db_name, db_port=3306):
# if not DatabaseModule:
# return None
# try:
# connection = DatabaseModule.connect( # 使用 MySQLdb.connect()
# host=db_host,
# user=db_user,
# passwd=db_password, # 注意: 参数名是 passwd 而不是 password
# db=db_name, # 注意: 参数名是 db 而不是 database
# port=db_port,
# charset='utf8mb4'
# # cursorclass=DictCursor # 也可以在连接时指定
# )
# print(f"mysqlclient: 成功连接到数据库 '{db_name}' 在服务器 {db_host}:{db_port}")
# # 获取版本信息
# # server_info = connection.get_server_info() # mysqlclient 连接对象有 get_server_info()
# # print(f"mysqlclient: MySQL 服务器版本: {server_info}")
# return connection
# except DatabaseModule.Error as err:
# # 错误处理方式与 PyMySQL 类似,通常通过 err.args
# error_code = err.args[0] if len(err.args) > 0 else "N/A"
# error_message = err.args[1] if len(err.args) > 1 else str(err)
# print(f"mysqlclient: 连接数据库 '{db_name}' 失败。错误码: {error_code}, 消息: {error_message}")
# return None
# if __name__ == "__main__":
# if DatabaseModule:
# DB_HOST_MC = os.getenv("MYSQLCLIENT_DB_HOST", "localhost")
# DB_USER_MC = os.getenv("MYSQLCLIENT_DB_USER", "your_mysql_user")
# DB_PASSWORD_MC = os.getenv("MYSQLCLIENT_DB_PASSWORD", "your_mysql_password") # passwd
# DB_NAME_MC = os.getenv("MYSQLCLIENT_DB_NAME", "your_database_name") # db
# DB_PORT_MC = int(os.getenv("MYSQLCLIENT_DB_PORT", 3306))
#
# print("
--- mysqlclient 连接测试 ---")
# mc_conn = mysqlclient_connect_to_db(DB_HOST_MC, DB_USER_MC, DB_PASSWORD_MC, DB_NAME_MC, DB_PORT_MC)
#
# if mc_conn:
# print(f"mysqlclient: 连接对象: {mc_conn}")
# try:
# with mc_conn.cursor(DictCursor) as cursor: # 使用字典游标
# cursor.execute("SELECT * FROM employees WHERE salary > %s LIMIT %s", (55000, 2)) # 参数化查询
# print("mysqlclient: 查询薪水 > 55000 (前2条):")
# for row in cursor:
# print(row)
# mc_conn.commit() # 如果有DML则需要
# except DatabaseModule.Error as err:
# print(f"mysqlclient: 查询失败: {err.args[1] if len(err.args) > 1 else err}")
# finally:
# mc_conn.close()
# print(f"mysqlclient: 数据库 '{DB_NAME_MC}' 连接已关闭。")
# else:
# print("mysqlclient 模块未加载,跳过相关测试。")
mysqlclient
(MySQLdb) 的关键区别点:
导入: 通常 import MySQLdb
,mysqlclient
会安装为这个名字的兼容替代品。
连接参数:
密码参数是 passwd
(不是 password
)。
数据库名参数是 db
(不是 database
)。
游标: from MySQLdb.cursors import DictCursor
(或其他游标类型)。
错误对象: MySQLdb.Error
,错误信息也通常在 err.args
中。
get_server_info()
: 连接对象通常有此方法。
由于 mysqlclient
是C扩展,其安装和某些特定行为可能更依赖于底层的 MySQL C API 版本。但在遵循DB-API 2.0的核心操作上,其Python层面的使用方式与其他连接器大同小异。
我们已经覆盖了三种主流Python MySQL连接器的基本连接和操作。这为后续深入探讨更高级的主题,如事务管理、错误处理、数据类型映射、性能优化、连接池等,奠定了坚实的基础。
五、参数化查询与SQL注入防范的深层剖析 (In-depth Analysis of Parameterized Queries and SQL Injection Prevention)
在前面的基础操作中,我们已经强调了使用参数化查询(如 cursor.execute(query, params)
)来防止SQL注入。现在,我们将更深入地探讨其内部机制、不同参数样式的细微差别、以及在复杂场景下如何正确和安全地构建查询。
5.1 SQL 注入的原理与危害 (Principle and Dangers of SQL Injection)
在详细讨论防范措施之前,必须深刻理解SQL注入是如何发生的以及它能造成多大的破坏。
原理:
SQL注入发生在当应用程序将用户提供(或外部来源)的不可信数据直接拼接或不当嵌入到SQL查询语句中,而没有进行充分的验证和转义时。攻击者可以通过构造特殊的输入,改变原始SQL语句的语义,从而执行恶意的数据库操作。
一个经典的错误示例 (绝对不要这样做!):
# !!! 这是一个非常不安全的示例,仅用于演示SQL注入原理 !!!
def unsafe_get_user_by_username(connection, username_input):
# 假设 username_input 来自用户在网页表单中的输入
try:
with connection.cursor(dictionary=True) as cursor:
# 直接将用户输入拼接到SQL语句中,极度危险!
sql_query = f"SELECT user_id, username, email FROM users WHERE username = '{
username_input}'" # SQL语句拼接
print(f" [不安全] 执行的SQL: {
sql_query}") # 打印执行的SQL (用于观察)
cursor.execute(sql_query) # 执行拼接后的SQL
user = cursor.fetchone()
if user:
print(f" [不安全] 找到用户: {
user}")
return user
else:
print(f" [不安全] 未找到用户: {
username_input}")
return None
except Exception as e: # 简化错误处理
print(f" [不安全] 查询出错: {
e}")
return None
# 攻击场景:
# 1. 正常输入: username_input = "alice"
# SQL会是: SELECT user_id, username, email FROM users WHERE username = 'alice' (正常)
# 2. 恶意输入1 (绕过认证): username_input = "admin' -- "
# SQL会是: SELECT user_id, username, email FROM users WHERE username = 'admin' -- '
# 解释:
# - 'admin' 被注入。
# - ' (单引号) 闭合了前面 username = ' 的单引号。
# - -- (两个连字符加空格) 是SQL中的行注释符,它会注释掉原始SQL语句中该符号之后的所有内容。
# - 结果是,WHERE子句变成了 WHERE username = 'admin',原始SQL中可能存在的其他条件或末尾的单引号被注释掉了。
# - 如果存在名为 'admin' 的用户,即使密码错误,也可能登录成功(取决于后续逻辑)。
# 3. 恶意输入2 (获取所有用户数据): username_input = "whatever' OR '1'='1"
# SQL会是: SELECT user_id, username, email FROM users WHERE username = 'whatever' OR '1'='1'
# 解释:
# - WHERE子句变成了 username = 'whatever' OR '1'='1'。
# - 因为 '1'='1' 永远为真 (TRUE),所以整个OR条件也永远为真。
# - 这将导致查询返回 `users` 表中的所有行,无视 `username` 的原始条件。
# 4. 恶意输入3 (执行其他命令,如删表 - 如果权限允许且查询结构允许拼接):
# username_input = "nobody'; DROP TABLE users; -- "
# SQL会是: SELECT user_id, username, email FROM users WHERE username = 'nobody'; DROP TABLE users; -- '
# 解释:
# - 通过分号 (;) 注入了第二条SQL命令 `DROP TABLE users`。
# - 如果数据库连接允许执行多语句 (multi-statement queries,某些连接器或配置下可能允许),
# 并且执行该查询的数据库用户有 `DROP TABLE` 权限,这将导致 `users` 表被删除。
# (假设 pymysql_conn 已连接,并且 users 表已创建并包含数据)
# if __name__ == "__main__":
# if pymysql_conn:
# print("
--- SQL注入攻击演示 ---")
# # 准备users表 (简单版本)
# with pymysql_conn.cursor() as cur:
# cur.execute("DROP TABLE IF EXISTS users")
# cur.execute("CREATE TABLE users (user_id INT AUTO_INCREMENT PRIMARY KEY, username VARCHAR(50) UNIQUE, email VARCHAR(100), password_hash VARCHAR(255))")
# cur.execute("INSERT INTO users (username, email, password_hash) VALUES ('alice', 'alice@example.com', 'hash1'), ('bob', 'bob@example.com', 'hash2'), ('admin_user', 'admin@example.com', 'admin_hash')")
# pymysql_conn.commit()
# print("
尝试正常输入:")
# unsafe_get_user_by_username(pymysql_conn, "alice")
# print("
尝试恶意输入1 (绕过):")
# unsafe_get_user_by_username(pymysql_conn, "admin_user' -- ") # 尝试获取admin_user的信息
# print("
尝试恶意输入2 (获取所有):")
# unsafe_get_user_by_username(pymysql_conn, "x' OR '1'='1") # 获取所有用户信息
# # 恶意输入3 (删表) 通常需要特定条件才能成功,这里不直接运行以防意外
# # print("
尝试恶意输入3 (删表 - 危险,不执行):")
# # username_for_drop = "y'; DROP TABLE users; -- "
# # print(f" 如果执行: SELECT ... WHERE username = '{username_for_drop}'")
# if pymysql_conn: pymysql_conn.close()
危害:
数据泄露: 攻击者可以获取未授权访问的敏感数据(如用户凭证、信用卡信息、个人隐私)。
数据篡改: 攻击者可以修改或删除数据库中的数据。
拒绝服务 (DoS): 通过执行耗尽资源的查询或破坏数据库结构。
服务器接管: 在某些极端情况下,如果数据库服务器配置不当或存在漏洞,SQL注入可能导致攻击者在数据库服务器上执行任意操作系统命令。
破坏数据完整性和一致性。
声誉损害和法律责任。
5.2 参数化查询的内部工作机制 (How Parameterized Queries Work Internally)
参数化查询(也称为预准备语句 – Prepared Statements,尽管在Python DB-API层面不总是显式地分两步)是防御SQL注入的基石。其核心思想是将SQL代码结构与用户提供的数据严格分离。
SQL模板发送:
当使用 cursor.execute("SELECT ... WHERE col = %s", (value,))
时,Python连接器首先将包含占位符(如 %s
)的SQL模板(或称查询结构)发送到MySQL服务器。
例如,发送的是类似 "SELECT user_id, username FROM users WHERE username = ?"
(服务器端可能会将 %s
转换为 ?
或其内部占位符)。
服务器端解析和编译SQL模板:
MySQL服务器接收到这个SQL模板后,对其进行解析、语法检查和生成执行计划的准备工作。此时,服务器知道这是一个期望接收一个参数的查询结构,但还不知道参数的具体值是什么。
参数值独立发送:
然后,连接器将参数元组 (value,)
中的数据独立地、作为数据本身发送到MySQL服务器。这些数据不会被服务器当作SQL代码的一部分来解析。
服务器端绑定参数并执行:
MySQL服务器接收到参数值后,将这些值安全地绑定到之前已准备好的SQL模板的占位符位置,然后执行完整的查询。由于参数值是在SQL结构确定之后才被绑定的,并且是以数据形式处理,所以即使参数值中包含SQL特殊字符(如 '
, ;
, --
),它们也只会被当作普通的字符串数据(或其他数据类型)来对待,而不会改变SQL语句的原始结构。
为什么这能防止SQL注入?
因为攻击者输入的恶意字符(如 ' OR '1'='1
)是在第3步作为数据发送的。当服务器在第4步绑定参数时,它会将整个 "' OR '1'='1"
字符串视为 username
列应该匹配的值,而不是将其解析为SQL逻辑。
所以,查询会变成类似(概念上):
SELECT ... WHERE username = ('' OR '1'='1')
(引号被正确处理为字符串的一部分)
这通常不会匹配任何合法的用户名,从而阻止了攻击。
5.3 不同连接器的参数样式回顾与实践
mysql-connector-python
:
默认支持 format
(%s
) 和 pyformat
(%(name)s
) 风格。
paramstyle
属性值为 'pyformat'
。
尽管其 paramstyle
是 pyformat
,但它也广泛接受 %s
形式的占位符,并且这是其文档和示例中更常见的用法。
# 使用 mysql-connector-python
# import mysql.connector
# config = {'user': 'u', 'password': 'p', 'host': 'h', 'database': 'd'}
# cnx = mysql.connector.connect(**config)
# cursor = cnx.cursor()
# 使用 %s 风格 (推荐)
query_s = "SELECT name FROM products WHERE category_id = %s AND price < %s"
params_s = (10, 50.00) # 元组形式的参数
# cursor.execute(query_s, params_s)
# 使用 %(name)s 风格 (pyformat)
query_named = "UPDATE orders SET status = %(new_status)s WHERE order_id = %(order_id)s"
params_named = {
'new_status': 'SHIPPED', 'order_id': 123} # 字典形式的参数
# cursor.execute(query_named, params_named)
# cursor.close()
# cnx.close()
PyMySQL
:
也主要支持 format
(%s
) 和 pyformat
(%(name)s
) 风格。
paramstyle
属性通常也是 'pyformat'
或 'format'
(具体可能取决于版本或内部实现)。
使用 %s
是最常见的方式。
# 使用 PyMySQL
# import pymysql
# conn_py = pymysql.connect(host='h', user='u', password='p', database='d')
# cursor_py = conn_py.cursor()
# 使用 %s 风格
query_py_s = "DELETE FROM logs WHERE log_date < %s"
params_py_s = ('2023-01-01',) # 单个参数也要是元组
# cursor_py.execute(query_py_s, params_py_s)
# 使用 %(name)s 风格
query_py_named = "SELECT data FROM config WHERE section = %(section_name)s AND key_name = %(key)s"
params_py_named = {
'section_name': 'network', 'key': 'timeout'}
# cursor_py.execute(query_py_named, params_py_named)
# conn_py.commit() # 如果是DML
# cursor_py.close()
# conn_py.close()
mysqlclient
(MySQLdb):
传统上,MySQLdb
的 paramstyle
是 format
。
因此,%s
是其标准的参数占位符。它通常不直接支持 pyformat
的 %(name)s
风格,除非你使用的版本或其包装库(如Django ORM)做了额外处理。
# 使用 mysqlclient (MySQLdb)
# import MySQLdb
# conn_mc = MySQLdb.connect(host='h', user='u', passwd='p', db='d')
# cursor_mc = conn_mc.cursor()
# 主要使用 %s 风格
query_mc_s = "INSERT INTO events (event_type, user_id, event_data) VALUES (%s, %s, %s)"
params_mc_s = ('login', 1001, '{"ip": "192.168.1.10"}')
# cursor_mc.execute(query_mc_s, params_mc_s)
# conn_mc.commit()
# cursor_mc.close()
# conn_mc.close()
企业级建议:
始终使用参数化: 这是最重要的一点。
优先选择 %s
风格: 它是所有主流Python MySQL连接器都良好支持的、最简单、最通用的参数化风格。
%(name)s
风格的适用场景: 当SQL语句较长,参数较多时,使用命名参数可以提高代码的可读性,减少因参数顺序错误导致的bug。但要确保你选择的连接器支持它。
不要信任任何外部输入: 即使是来自内部系统或看起来“安全”的来源的数据,如果它最终会成为SQL查询的一部分,也应该通过参数化传递,或者进行严格的白名单验证和类型检查。
5.4 处理 LIKE
子句中的参数化
当在 LIKE
子句中使用参数化时,通配符 (%
或 _
) 需要作为参数值的一部分传递,而不是在SQL模板中。
def search_products_by_name_safely(connection, search_term, connector_name="PyMySQL"):
"""安全地使用参数化查询 LIKE 子句搜索产品。"""
print(f"
--- {
connector_name}: 安全搜索产品名包含 '{
search_term}' ---")
try:
# 根据连接器类型选择合适的游标和错误处理 (简化示例,假设字典游标可用)
if connector_name == "mysql.connector":
cursor = connection.cursor(dictionary=True)
else: # PyMySQL or mysqlclient
cursor_class = getattr(pymysql.cursors if connector_name == "PyMySQL" else DatabaseModule.cursors, "DictCursor", None)
cursor = connection.cursor(cursor_class) if cursor_class else connection.cursor()
# 错误的做法 (直接拼接通配符到SQL模板 - 仍然有风险,虽然比完全不参数化好一点,但不推荐):
# query_unsafe_like = f"SELECT product_name, unit_price FROM products_example WHERE product_name LIKE '%{search_term}%'"
# cursor.execute(query_unsafe_like) # 不推荐!如果search_term包含特殊字符可能仍有问题
# 正确的做法: 将通配符作为参数值的一部分
# 模式1: 包含 (contains)
param_contains = f"%{
search_term}%" # 在Python代码中构建带通配符的参数值
query_contains = "SELECT product_id, product_name, unit_price FROM products_example WHERE product_name LIKE %s"
# 模式2: 开头是 (starts with)
param_startswith = f"{
search_term}%"
query_startswith = "SELECT product_id, product_name, unit_price FROM products_example WHERE product_name LIKE %s"
# 模式3: 结尾是 (ends with)
param_endswith = f"%{
search_term}"
query_endswith = "SELECT product_id, product_name, unit_price FROM products_example WHERE product_name LIKE %s"
print(f" 执行 '包含' 搜索 (参数: '{
param_contains}')")
cursor.execute(query_contains, (param_contains,))
results_contain = cursor.fetchall()
if results_contain:
for row in results_contain: print(f" {
row}")
else:
print(" 未找到匹配 '包含' 条件的产品。")
print(f"
执行 '开头是' 搜索 (参数: '{
param_startswith}')")
cursor.execute(query_startswith, (param_startswith,))
results_startswith = cursor.fetchall()
if results_startswith:
for row in results_startswith: print(f" {
row}")
else:
print(" 未找到匹配 '开头是' 条件的产品。")
except Exception as err: # 通用错误捕获 (具体连接器错误应分开处理)
print(f"{
connector_name}: LIKE 查询失败: {
err}")
finally:
if 'cursor' in locals() and cursor:
cursor.close()
# 准备 products_example 表 (如果不存在)
def setup_products_table_for_like(connection, connector_name="PyMySQL"):
try:
with connection.cursor() as cursor:
cursor.execute("DROP TABLE IF EXISTS products_example")
cursor.execute("""
CREATE TABLE products_example (
product_id INT AUTO_INCREMENT PRIMARY KEY, product_name VARCHAR(100), unit_price DECIMAL(10,2)
) ENGINE=InnoDB CHARACTER SET=utf8mb4
""")
products_data = [
('Laptop Pro X1', 1200.00), ('Wireless Mouse G500', 45.00),
('Mechanical Keyboard K7', 150.00), ('USB-C Hub Pro', 60.00),
('Laptop Stand V2', 25.00), ('Pro Gaming Mousepad', 20.00)
]
insert_query = "INSERT INTO products_example (product_name, unit_price) VALUES (%s, %s)"
cursor.executemany(insert_query, products_data) # 批量插入
connection.commit()
print(f"{
connector_name}: 'products_example' 表已创建并填充。")
except Exception as e:
print(f"{
connector_name}: 设置 products_example 表失败: {
e}")
# if __name__ == "__main__":
# 假设 pymysql_conn 或 mysql_conn (来自 mysql.connector) 已连接
# (需要确保这些连接对象在使用前已经成功初始化)
# # 使用 PyMySQL 示例
# if pymysql_conn:
# setup_products_table_for_like(pymysql_conn, "PyMySQL")
# search_products_by_name_safely(pymysql_conn, "Pro", "PyMySQL")
# search_products_by_name_safely(pymysql_conn, "Mouse", "PyMySQL")
# if pymysql_conn: pymysql_conn.close()
# # 使用 mysql-connector-python 示例 (需要先建立 mysql_conn)
# # config_mc = {'user': 'u', 'password': 'p', 'host': 'h', 'database': 'd'}
# # mysql_conn_official = mysql.connector.connect(**config_mc)
# # if mysql_conn_official:
# # setup_products_table_for_like(mysql_conn_official, "mysql.connector")
# # search_products_by_name_safely(mysql_conn_official, "Laptop", "mysql.connector")
# # if mysql_conn_official: mysql_conn_official.close()
代码解释:
param_contains = f"%{search_term}%"
: 关键点。SQL的 LIKE
通配符 %
是在Python字符串中与用户输入 search_term
结合的。
query_contains = "... WHERE product_name LIKE %s"
: SQL模板本身只包含一个 %s
占位符。
cursor.execute(query_contains, (param_contains,))
: 将构造好的、包含通配符的字符串 param_contains
作为单个数据参数传递给数据库。数据库驱动程序会确保这个参数值被正确地转义和引用,即使 search_term
本身包含特殊字符(如单引号),也不会破坏SQL结构。
这种方法既安全,又能灵活地实现不同的 LIKE
匹配模式(包含、开头是、结尾是)。
5.5 处理 IN
子句中的参数化
IN
子句允许你匹配列值是否在一个给定的值列表中。当这个列表是动态的(例如,来自用户选择的多个ID),参数化会稍微复杂一些,因为SQL模板中的占位符数量需要与参数列表的长度匹配。
错误的做法 (直接拼接):
# ids_list = [1, 2, 3]
# query_unsafe_in = f"SELECT * FROM items WHERE item_id IN ({','.join(map(str, ids_list))})" # 非常危险
如果 ids_list
中的元素不是纯数字,或者可以被恶意构造,这会导致SQL注入。
安全的做法: 动态生成占位符。
def get_items_by_ids_safely(connection, item_ids_list, connector_name="PyMySQL"):
"""安全地使用参数化查询 IN 子句获取项目。"""
if not item_ids_list: # 如果ID列表为空
print(f"
--- {
connector_name}: ID列表为空,无需查询 ---")
return []
print(f"
--- {
connector_name}: 安全查询 item_ids IN {
item_ids_list} ---")
results = []
try:
# 根据连接器类型选择合适的游标和错误处理 (简化示例)
if connector_name == "mysql.connector":
cursor = connection.cursor(dictionary=True)
else:
cursor_class = getattr(pymysql.cursors if connector_name == "PyMySQL" else DatabaseModule.cursors, "DictCursor", None)
cursor = connection.cursor(cursor_class) if cursor_class else connection.cursor()
# 1. 为 IN 子句动态生成占位符字符串
# 例如,如果 item_ids_list 是 [1, 2, 3],那么 placeholders 应该是 "%s, %s, %s"
placeholders = ', '.join(['%s'] * len(item_ids_list)) # 生成逗号分隔的 %s 占位符
# 2. 构建完整的SQL查询模板
query = f"SELECT item_id, item_name FROM items WHERE item_id IN ({
placeholders})" # 将占位符嵌入查询
# 注意: 这里使用f-string嵌入placeholders是安全的,因为placeholders是我们自己生成的固定 "%s" 字符串,
# 不是来自用户输入。最终的 item_ids_list 才是作为参数传递的。
print(f" 执行的SQL模板 (占位符): {
query}")
print(f" 传递的参数: {
tuple(item_ids_list)}") # 参数必须是元组或列表
# 3. 执行查询,将 item_ids_list (转换为元组) 作为参数传递
cursor.execute(query, tuple(item_ids_list)) # 将ID列表转换为元组作为参数传递
fetched_items = cursor.fetchall()
if fetched_items:
print(" 找到的项目:")
for item in fetched_items:
results.append(item)
print(f" {
item}")
else:
print(" 未找到任何匹配ID的项目。")
except Exception as err:
print(f"{
connector_name}: IN 子句查询失败: {
err}")
finally:
if 'cursor' in locals() and cursor:
cursor.close()
return results
# 准备 items 表
def setup_items_table(connection, connector_name="PyMySQL"):
try:
with connection.cursor() as cursor:
cursor.execute("DROP TABLE IF EXISTS items")
cursor.execute("""
CREATE TABLE items (item_id INT PRIMARY KEY, item_name VARCHAR(100))
ENGINE=InnoDB CHARACTER SET=utf8mb4
""")
items_data = [(1, 'Apple'), (2, 'Banana'), (3, 'Cherry'), (4, 'Date'), (5, 'Elderberry')]
insert_query = "INSERT INTO items (item_id, item_name) VALUES (%s, %s)"
cursor.executemany(insert_query, items_data)
connection.commit()
print(f"{
connector_name}: 'items' 表已创建并填充。")
except Exception as e:
print(f"{
connector_name}: 设置 items 表失败: {
e}")
# if __name__ == "__main__":
# 假设 pymysql_conn 或 mysql_conn 已连接
# # 使用 PyMySQL 示例
# if pymysql_conn:
# setup_items_table(pymysql_conn, "PyMySQL")
# get_items_by_ids_safely(pymysql_conn, [1, 3, 5], "PyMySQL")
# get_items_by_ids_safely(pymysql_conn, [2, 4], "PyMySQL")
# get_items_by_ids_safely(pymysql_conn, [99], "PyMySQL") # 测试不存在的ID
# get_items_by_ids_safely(pymysql_conn, [], "PyMySQL") # 测试空列表
# if pymysql_conn: pymysql_conn.close()
# # 使用 mysql-connector-python 示例
# # if mysql_conn_official:
# # setup_items_table(mysql_conn_official, "mysql.connector")
# # get_items_by_ids_safely(mysql_conn_official, [1, 2], "mysql.connector")
# # if mysql_conn_official: mysql_conn_official.close()
代码解释:
placeholders = ', '.join(['%s'] * len(item_ids_list))
:
['%s'] * len(item_ids_list)
: 创建一个包含 len(item_ids_list)
个 '%s'
字符串的列表。例如,如果 item_ids_list
有3个元素,这将是 ['%s', '%s', '%s']
。
', '.join(...)
: 将列表中的元素用逗号和空格连接起来,得到如 "%s, %s, %s"
的字符串。
query = f"SELECT ... WHERE item_id IN ({placeholders})"
:
这里使用 f-string 将我们自己安全生成的 placeholders
字符串嵌入到SQL模板中是安全的,因为 placeholders
只包含 ,
和 %s
,不包含任何来自外部的、可能有害的数据。
cursor.execute(query, tuple(item_ids_list))
:
将原始的 item_ids_list
(转换为元组)作为参数传递。连接器会将这个元组中的每个值依次绑定到SQL模板中的每个 %s
。
企业级考量 (IN
子句):
参数数量限制: MySQL(以及大多数数据库)对单个SQL语句中可以使用的参数数量(或 IN
列表中的元素数量)有限制。这个限制通常很大(数千个),但如果 item_ids_list
可能包含成千上万个ID,直接构建一个巨大的 IN
子句可能会达到这个限制或导致性能问题(解析长SQL、优化器处理等)。
MySQL 的 max_allowed_packet
也会影响能发送的SQL语句的大小。
对于非常大的ID列表,考虑分批处理,或者将ID列表插入到一个临时表中,然后使用 JOIN
进行查询。
# 伪代码:使用临时表处理大量ID
# def get_items_by_large_ids_via_temp_table(connection, large_id_list):
# with connection.cursor() as cursor:
# # 1. 创建一个临时表 (会话级别,连接关闭后自动删除)
# cursor.execute("CREATE TEMPORARY TABLE temp_ids (id INT PRIMARY KEY)")
# # 2. 将ID批量插入临时表
# id_tuples = [(id_val,) for id_val in large_id_list] # [(1,), (2,), ...]
# cursor.executemany("INSERT INTO temp_ids (id) VALUES (%s)", id_tuples)
# connection.commit() # 确保临时表数据写入 (如果需要跨多个语句或事务阶段)
#
# # 3. JOIN 查询
# query = "SELECT i.item_id, i.item_name FROM items i JOIN temp_ids t ON i.item_id = t.id"
# cursor.execute(query)
# results = cursor.fetchall()
# # 4. 临时表会自动清理,或显式 DROP TEMPORARY TABLE temp_ids;
# return results
性能: 巨大的 IN
列表也可能导致查询优化器选择次优的执行计划。使用临时表 JOIN
对于非常大的列表通常性能更稳定。
5.6 其他需要注意的参数化场景
ORDER BY
和 GROUP BY
子句:
不能直接参数化列名: ORDER BY %s
或 GROUP BY %s
通常是不安全或不工作的。数据库驱动程序会将 %s
替换为一个字符串字面量,例如 ORDER BY 'column_name'
,这通常会导致按常量字符串排序(无实际效果)或语法错误。
安全处理动态排序/分组:
白名单验证: 如果排序列名或分组列名来自用户输入,必须将其与一个预定义的、安全的列名列表(白名单)进行比较。只有当输入的列名在白名单中时,才将其直接(但安全地,例如确保没有注入)拼接到SQL语句中。
映射: 将用户输入映射到实际的列名。
def get_sorted_employees_safely(connection, sort_column_input, sort_order_input="ASC"):
"""安全地按动态列排序员工。"""
print(f"
--- 安全动态排序: 列='{
sort_column_input}', 顺序='{
sort_order_input}' ---")
# 1. 白名单验证排序列
allowed_sort_columns = {
"first_name", "last_name", "hire_date", "salary"}
if sort_column_input not in allowed_sort_columns:
print(f" 错误: 无效的排序列 '{
sort_column_input}'。将使用默认排序。")
sort_column_sql = "emp_no" # 默认排序列
else:
sort_column_sql = sort_column_input # 确认是安全的列名
# 2. 白名单验证排序顺序
sort_order_sql = "ASC" # 默认升序
if sort_order_input.upper() == "DESC":
sort_order_sql = "DESC"
try:
with connection.cursor(dictionary=True) as cursor:
# 这里拼接 sort_column_sql 和 sort_order_sql 是安全的,因为它们已经过白名单验证
query = f"SELECT emp_no, first_name, last_name, hire_date, salary FROM employees ORDER BY {
sort_column_sql} {
sort_order_sql} LIMIT 5"
print(f" 执行SQL: {
query}")
cursor.execute(query) # 注意:这里没有外部参数了,因为列名和顺序已安全嵌入
for row in cursor:
print(f" {
row}")
except Exception as e:
print(f" 动态排序查询失败: {
e}")
# if __name__ == "__main__":
# # 假设 pymysql_conn 已连接且 employees 表有数据
# if pymysql_conn:
# get_sorted_employees_safely(pymysql_conn, "salary", "DESC")
# get_sorted_employees_safely(pymysql_conn, "last_name")
# get_sorted_employees_safely(pymysql_conn, "invalid_column", "ASC") # 测试无效列名
# get_sorted_employees_safely(pymysql_conn, "hire_date", "WRONG_ORDER") # 测试无效顺序
# if pymysql_conn: pymysql_conn.close()
LIMIT
和 OFFSET
子句:
LIMIT %s
和 OFFSET %s
(或 LIMIT %s, %s
) 是可以安全参数化的,因为这些子句期望的是数字。
def get_paginated_employees(connection, page_number, page_size):
"""获取分页的员工数据。"""
offset = (page_number - 1) * page_size
print(f"
--- 分页查询: 第 {
page_number} 页, 每页 {
page_size} 条 (OFFSET {
offset}) ---")
try:
with connection.cursor(dictionary=True) as cursor:
query = "SELECT emp_no, first_name, last_name FROM employees ORDER BY emp_no LIMIT %s OFFSET %s"
# 或者 LIMIT %s, %s (如果是两个参数的LIMIT形式,则 (offset, page_size))
# 这里使用标准的 LIMIT count OFFSET offset
cursor.execute(query, (page_size, offset)) # 参数顺序: 先是LIMIT的数量,然后是OFFSET
for row in cursor:
print(f" {
row}")
except Exception as e:
print(f" 分页查询失败: {
e}")
# if __name__ == "__main__":
# if pymysql_conn:
# get_paginated_employees(pymysql_conn, 1, 2) # 第1页,每页2条
# get_paginated_employees(pymysql_conn, 2, 2) # 第2页,每页2条
# if pymysql_conn: pymysql_conn.close()
表名和列名:
不能参数化表名或列名。例如 SELECT %s FROM %s WHERE ...
是行不通的。占位符是为数据值准备的,不是为SQL结构(如表名、列名、数据库名、SQL关键字)准备的。
如果需要动态指定表名或列名(例如,在ORM或元数据驱动的工具中),必须:
严格白名单验证: 确保传入的表名/列名是预期的、合法的、已知的。
转义/引用标识符: 如果表名或列名可能包含特殊字符或与SQL关键字冲突,需要使用数据库特定的标识符引用字符(MySQL中是反引号 `
)。Python连接器通常不自动为表名/列名执行此操作。
# 不安全的动态表名 (仅用于说明,不推荐)
# table_name_input = "users" # 或恶意输入 "users; DROP DATABASE students; --"
# query = f"SELECT * FROM `{table_name_input}`" # 即使用反引号,如果输入是恶意的,仍可能出问题
# 更安全的方式:白名单
def query_dynamic_table_safely(connection, table_key, column_key, filter_value):
allowed_tables = {
"users_main": "users", "audit": "audit_log"}
allowed_columns = {
"users_main": {
"id", "name", "email"}, "audit": {
"log_id", "action", "timestamp"}}
if table_key not in allowed_tables or column_key not in allowed_columns.get(table_key, set()):
print("错误: 无效的表名或列名。")
return
actual_table_name = allowed_tables[table_key]
actual_column_name = column_key # 已经是安全的了
# 注意:这里仍然是字符串格式化,但因为表名和列名来自白名单,所以风险受控。
# 参数化仍然用于 WHERE 子句的值。
query = f"SELECT * FROM `{
actual_table_name}` WHERE `{
actual_column_name}` = %s"
print(f" 安全动态查询: {
query} (参数: {
filter_value})")
# ... (执行 cursor.execute(query, (filter_value,))) ...
参数化查询是Python与MySQL安全交互的绝对核心。深刻理解其原理、正确使用不同场景下的参数化方法,是构建健壮、安全的企业级应用的基础。
六、Python 中使用 asyncio
与 aiomysql
进行异步 MySQL 操作 (Asynchronous MySQL Operations with asyncio
and aiomysql
in Python)
在现代 Web 应用和网络服务中,I/O 操作(如数据库查询、API 调用、文件读写)往往是性能瓶颈。传统的同步编程模型下,当一个 I/O 操作发生时,整个线程会被阻塞,直到操作完成。这意味着在等待数据库返回结果时,CPU 资源被闲置,无法处理其他请求,在高并发场景下会导致严重的性能问题。
异步编程,特别是 Python 中的 asyncio
框架,提供了一种解决方案。它允许程序在等待一个 I/O 操作完成时,切换去执行其他任务,而不是完全阻塞。当 I/O 操作准备就绪时,程序可以回来处理结果。这种协作式多任务处理方式能够显著提高应用的吞吐量和响应能力。
aiomysql
是一个流行的 Python 库,它为 PyMySQL
提供了一个异步接口,使其能够与 asyncio
无缝集成,从而实现对 MySQL 数据库的异步访问。
6.1. asyncio
基础回顾 (A Quick Recap of asyncio
Fundamentals)
在深入 aiomysql
之前,简要回顾一下 asyncio
的核心概念至关重要:
事件循环 (Event Loop): asyncio
的核心,负责调度和执行异步任务、处理 I/O 事件和回调。
协程 (Coroutines): 使用 async def
定义的特殊函数。协程可以暂停执行并将控制权交还给事件循环,以便在等待 I/O 操作时执行其他任务。使用 await
关键字来调用另一个协程或等待一个 Future
对象。
Future 和 Task: Future
对象代表一个最终会产生结果的异步操作。Task
是 Future
的一个子类,用于在事件循环中调度和运行协程。
async
/await
语法: async
用于声明一个协程函数,await
用于暂停协程的执行,等待一个 awaitable
对象(如另一个协程、Future
或 Task
)完成。
6.2. 安装 aiomysql
(Installing aiomysql
)
与安装其他 Python 包类似,可以使用 pip 来安装 aiomysql
:
pip install aiomysql
这个命令会下载并安装 aiomysql
及其依赖项(通常包括 PyMySQL
)。
6.3. 建立异步连接 (Establishing Asynchronous Connections)
使用 aiomysql
连接到 MySQL 数据库与使用同步库(如 PyMySQL
或 mysql-connector-python
)在概念上相似,但操作是异步的。
6.3.1. 单个异步连接
import asyncio
import aiomysql
# 定义一个异步函数来创建和测试连接
async def connect_to_mysql():
"""
这个异步函数演示了如何使用 aiomysql 创建一个到 MySQL 数据库的异步连接。
"""
conn = None # 初始化连接变量,以便在 finally 块中可用
try:
# 使用 aiomysql.connect() 异步创建连接
# 注意这里的 await 关键字,表示这是一个异步操作,需要等待其完成
conn = await aiomysql.connect(
host='127.0.0.1', # MySQL 服务器主机地址
port=3306, # MySQL 服务器端口号
user='your_user', # MySQL 用户名
password='your_password', # MySQL 密码
db='your_database', # 要连接的数据库名称
loop=asyncio.get_event_loop() # 可选:显式传递事件循环,通常aiomysql会自动获取
)
print("成功通过 aiomysql 异步连接到 MySQL!")
# 获取一个异步游标对象
# 注意:创建游标本身不是异步操作,但后续使用游标执行SQL是异步的
cur = await conn.cursor()
# 执行一个简单的异步查询
await cur.execute("SELECT VERSION();") # await 用于等待数据库执行查询
version_result = await cur.fetchone() # await 用于等待获取查询结果
print(f"MySQL 版本: {
version_result[0]}")
# 关闭游标
await cur.close() # 关闭游标也是一个异步操作(尽管在aiomysql中通常是瞬时的)
except aiomysql.Error as e:
print(f"aiomysql 连接或操作错误: {
e}")
except Exception as e:
print(f"发生了预料之外的错误: {
e}")
finally:
if conn:
# 关闭连接
conn.close() # 关闭连接。在aiomysql中,close() 本身不是协程,但它会确保异步资源被清理
print("aiomysql 连接已关闭。")
# 主异步函数
async def main():
"""
主异步函数,用于运行 connect_to_mysql 协程。
"""
await connect_to_mysql()
# 运行主异步函数
if __name__ == "__main__":
# 获取事件循环并运行 main() 协程直到完成
asyncio.run(main())
代码解释:
import asyncio
和 import aiomysql
: 导入必要的库。
async def connect_to_mysql():
: 定义一个协程函数。
conn = await aiomysql.connect(...)
: 这是建立连接的核心。await
关键字表明这是一个异步调用,程序会在这里暂停,直到连接建立或发生错误,期间事件循环可以处理其他任务。连接参数与同步库类似。
loop=asyncio.get_event_loop()
: 虽然 aiomysql
通常能自动找到当前事件循环,但显式传递有时在特定场景下是必要的或更清晰。
cur = await conn.cursor()
: 获取一个异步游标。虽然 conn.cursor()
本身可能不是一个耗时的异步操作,但 aiomysql
的设计要求使用 await
来获取游标,以保持 API 的一致性,并为未来可能的异步游标创建逻辑做准备。
await cur.execute("SELECT VERSION();")
: 异步执行 SQL 查询。
version_result = await cur.fetchone()
: 异步获取查询结果。
await cur.close()
: 异步关闭游标。
conn.close()
: 关闭连接。在 aiomysql
中,conn.close()
方法本身不是一个协程(即不需要 await
),但它会触发底层的清理工作,确保所有与连接相关的异步资源得到妥善释放。
async def main()
: 封装了我们的主要异步逻辑。
asyncio.run(main())
: Python 3.7+ 中运行顶层异步程序的标准方式。它负责创建事件循环、运行传入的协程直到完成,并管理事件循环的生命周期。
这个例子展示了最基本的异步连接和查询流程。在真实的企业应用中,直接为每个请求创建和关闭连接效率低下。因此,连接池是必不可少的。
6.3.2. 使用异步连接池 (aiomysql.create_pool
)
连接池预先创建并维护一定数量的数据库连接,应用程序可以按需从池中获取连接,使用完毕后归还给池,而不是频繁地创建和销毁连接。这大大减少了连接建立的开销,提高了性能和资源利用率。
aiomysql
提供了 create_pool
函数来创建异步连接池。
import asyncio
import aiomysql
# 数据库配置
DB_CONFIG = {
'host': '127.0.0.1',
'port': 3306,
'user': 'your_user',
'password': 'your_password',
'db': 'your_database',
}
# 全局变量,用于存储连接池
# 在实际应用中,通常在应用启动时初始化
db_pool = None
async def init_db_pool():
"""
异步初始化 MySQL 连接池。
这个函数应该在应用程序启动时调用一次。
"""
global db_pool
if db_pool is None:
try:
# 使用 aiomysql.create_pool() 异步创建连接池
# minsize: 池中保持的最小空闲连接数
# maxsize: 池中允许的最大连接数
# loop: 事件循环
# autocommit: 设置为 True 表示每个SQL语句执行后自动提交,默认为 False
db_pool = await aiomysql.create_pool(
minsize=5,
maxsize=20,
**DB_CONFIG, # 将字典解包作为关键字参数传递
loop=asyncio.get_event_loop(),
autocommit=True # 示例:启用自动提交,生产环境根据需求调整
)
print("aiomysql 连接池初始化成功。")
print(f"连接池大小: min={
db_pool.minsize}, max={
db_pool.maxsize}, current_size={
db_pool.size}, free={
db_pool.freesize}")
except Exception as e:
print(f"初始化 aiomysql 连接池失败: {
e}")
# 在实际应用中,这里可能需要更健壮的错误处理,例如重试或退出应用
db_pool = None # 确保如果失败,db_pool 仍然是 None
async def close_db_pool():
"""
异步关闭 MySQL 连接池。
这个函数应该在应用程序关闭时调用。
"""
global db_pool
if db_pool:
db_pool.close() # 关闭池,释放所有连接的请求会开始
await db_pool.wait_closed() # 等待所有连接真正关闭
print("aiomysql 连接池已关闭。")
db_pool = None
async def execute_query_from_pool(query: str, args=None):
"""
从连接池获取连接并异步执行查询。
参数:
query (str): 要执行的 SQL 查询语句。
args (tuple, optional): 查询参数 (用于参数化查询)。默认为 None。
返回:
查询结果 (如果适用),否则为 None。
"""
if db_pool is None:
print("错误:数据库连接池未初始化。")
return None
# 使用 async with 语句从连接池获取连接
# 当离开 async with 块时,连接会自动归还给池
async with db_pool.acquire() as conn:
# 从获取的连接创建异步游标
# 同样使用 async with 来确保游标被正确关闭
async with conn.cursor(aiomysql.DictCursor) as cur: # 使用 DictCursor 以字典形式获取结果
try:
# 异步执行查询
await cur.execute(query, args)
print(f"查询 '{
query}' (参数: {
args}) 执行成功。")
# 根据查询类型决定是否获取结果
if query.strip().upper().startswith("SELECT"):
# 如果是 SELECT 查询,获取所有结果
result = await cur.fetchall()
return result
else:
# 对于 INSERT, UPDATE, DELETE 等操作,可以获取影响的行数
affected_rows = cur.rowcount
print(f"影响行数: {
affected_rows}")
# 对于非 SELECT 查询,通常不需要显式提交,因为我们在池创建时设置了 autocommit=True
# 如果 autocommit=False,则需要在此处调用 await conn.commit()
return affected_rows # 或者返回 True 表示成功
except aiomysql.Error as e:
print(f"数据库操作错误: {
e}")
# 如果 autocommit=False 且发生错误,可能需要 await conn.rollback()
return None
except Exception as e:
print(f"执行查询时发生意外错误: {
e}")
return None
async def worker_task(task_id: int):
"""
一个模拟的工作任务,它从连接池执行查询。
"""
print(f"任务 {
task_id} 开始执行...")
# 示例:插入数据
insert_query = "INSERT INTO employees (name, department, salary) VALUES (%s, %s, %s) ON DUPLICATE KEY UPDATE salary = VALUES(salary);"
employee_name = f"员工_{
task_id}"
department = "技术部" if task_id % 2 == 0 else "市场部"
salary = 5000 + task_id * 100
await execute_query_from_pool(insert_query, (employee_name, department, salary))
# 示例:查询数据
select_query = "SELECT name, salary FROM employees WHERE department = %s ORDER BY salary DESC LIMIT %s;"
target_department = "技术部"
limit = 3
results = await execute_query_from_pool(select_query, (target_department, limit))
if results:
print(f"任务 {
task_id} 查询到部门 '{
target_department}' 薪资最高的 {
limit} 名员工:")
for row in results:
print(f" - 姓名: {
row['name']}, 薪资: {
row['salary']}")
print(f"任务 {
task_id} 执行完毕。")
print(f"当前连接池状态: size={
db_pool.size}, free={
db_pool.freesize}")
async def main_with_pool():
"""
演示使用连接池的主异步函数。
"""
# 确保表存在 (仅为演示目的,实际应用中表结构应提前管理好)
await init_db_pool() # 在所有操作之前初始化连接池
if not db_pool:
print("无法初始化连接池,程序退出。")
return
# 模拟一个简单的表结构
create_table_query = """
CREATE TABLE IF NOT EXISTS employees (
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(100) NOT NULL,
department VARCHAR(50),
salary DECIMAL(10, 2),
hire_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE KEY unique_name (name)
) ENGINE=InnoDB;
"""
await execute_query_from_pool(create_table_query)
print("员工表 'employees' 已确保存在或创建。")
# 创建并运行多个并发任务
# 这些任务将共享同一个连接池
num_tasks = 10
tasks = [worker_task(i) for i in range(1, num_tasks + 1)]
await asyncio.gather(*tasks) # 并发运行所有 worker_task
# 所有任务完成后,关闭连接池
await close_db_pool()
if __name__ == "__main__":
asyncio.run(main_with_pool())
代码解释:
DB_CONFIG
: 将数据库连接参数存储在字典中,方便管理。
db_pool = None
: 全局变量,用于持有连接池对象。
async def init_db_pool()
:
global db_pool
: 声明 db_pool
为全局变量。
db_pool = await aiomysql.create_pool(...)
: 异步创建连接池。
minsize
: 池中维护的最小空闲连接数。池会尝试保持至少这么多的连接可用。
maxsize
: 池中允许的最大连接数。当所有连接都在使用中且达到此数量时,新的获取连接请求可能需要等待。
**DB_CONFIG
: 将 DB_CONFIG
字典解包,传递主机、用户、密码等参数。
loop
: 传递事件循环。
autocommit=True
: 设置为 True
后,每个 SQL 语句执行后会自动提交,无需显式调用 conn.commit()
。这简化了代码,但对于需要精细事务控制的场景,应设为 False
。
打印连接池状态信息:db_pool.size
(当前池中总连接数), db_pool.freesize
(当前池中空闲连接数)。
async def close_db_pool()
:
db_pool.close()
: 开始关闭连接池的过程。此调用会立即返回,但池不会立即关闭,它会等待所有已获取的连接被归还。
await db_pool.wait_closed()
: 异步等待,直到池中的所有连接都已实际关闭。
async def execute_query_from_pool(query: str, args=None)
:
async with db_pool.acquire() as conn:
: 这是从池中获取连接的关键。db_pool.acquire()
是一个协程,返回一个连接对象。async with
语句确保在代码块结束时(无论正常结束还是发生异常),连接都会通过 conn.release()
(async with
自动处理)归还给池。
async with conn.cursor(aiomysql.DictCursor) as cur:
: 从获取的连接创建游标。aiomysql.DictCursor
使查询结果以字典形式返回(列名为键),更易于使用。同样,async with
确保游标在使用完毕后自动关闭 (await cur.close()
)。
await cur.execute(query, args)
: 异步执行参数化的查询。
await cur.fetchall()
: 如果是 SELECT
查询,异步获取所有结果。
cur.rowcount
: 对于 INSERT
, UPDATE
, DELETE
等操作,可以通过 cur.rowcount
获取受影响的行数。
事务注意: 如果 autocommit=False
(在 create_pool
时设置),则在执行修改数据的操作后,需要显式调用 await conn.commit()
来提交事务,或在发生错误时调用 await conn.rollback()
来回滚。
async def worker_task(task_id: int)
: 模拟一个并发执行的任务单元。它会调用 execute_query_from_pool
来执行数据库操作。这展示了多个异步任务如何共享同一个连接池。
async def main_with_pool()
:
await init_db_pool()
: 在程序开始时初始化连接池。
create_table_query
: 一个创建示例表的 DDL 语句。
tasks = [worker_task(i) for i in range(1, num_tasks + 1)]
: 创建多个 worker_task
协程。
await asyncio.gather(*tasks)
: 并发运行列表中的所有协程。asyncio.gather
会等待所有任务完成后再继续。
await close_db_pool()
: 在程序结束前关闭连接池。
if __name__ == "__main__": asyncio.run(main_with_pool())
: 运行主程序。
这个连接池的例子更接近真实应用。连接池的参数(minsize
, maxsize
, timeout
等)需要根据应用的具体负载和 MySQL 服务器的能力进行仔细调优。
6.4. 执行异步查询 (Executing Asynchronous Queries)
一旦获得了异步游标 (cur
),执行查询的方式与同步库非常相似,主要区别在于每个可能引起 I/O 等待的操作都需要 await
。
6.4.1. 执行 SELECT
查询并获取结果
import asyncio
import aiomysql
# (假设 DB_CONFIG 和 db_pool 已按前例设置和初始化)
# (假设 employees 表已创建并有一些数据)
async def fetch_employee_data(department_name: str, min_salary: float):
"""
异步查询指定部门和最低薪资的员工信息。
"""
if db_pool is None:
print("错误:数据库连接池未初始化。")
return []
query = """
SELECT id, name, department, salary, hire_date
FROM employees
WHERE department = %s AND salary >= %s
ORDER BY salary DESC;
"""
args = (department_name, min_salary)
async with db_pool.acquire() as conn:
# 使用 SSCursor (Server-Side Cursor) 来处理可能的大结果集,逐行流式获取
# 注意:aiomysql 的 SSCursor 行为可能与 mysql-connector-python 的不完全一致,
# 并且对于非常大的结果集,仍然需要小心内存管理。
# 对于普通大小的结果集,aiomysql.Cursor 或 aiomysql.DictCursor 通常足够。
# 这里我们为了演示多样性,使用 aiomysql.DictCursor,它更常用。
async with conn.cursor(aiomysql.DictCursor) as cur:
try:
print(f"执行查询: {
query} 参数: {
args}")
await cur.execute(query, args)
# 获取单行结果
# first_employee = await cur.fetchone()
# if first_employee:
# print(f"第一位符合条件的员工: {first_employee}")
# 获取所有结果
employees = await cur.fetchall()
print(f"查询到 {
len(employees)} 位符合条件的员工。")
# for emp in employees:
# print(f" ID: {emp['id']}, 姓名: {emp['name']}, 部门: {emp['department']}, 薪资: {emp['salary']}")
return employees
# 或者,逐行处理 (如果结果集非常大,这可能更优)
# print("逐行处理结果:")
# results = []
# async for row in cur: # aiomysql 的游标支持异步迭代
# print(f" 处理员工: {row['name']}")
# results.append(row)
# return results
except aiomysql.Error as e:
print(f"查询员工数据时出错: {
e}")
return []
except Exception as e:
print(f"意外错误: {
e}")
return []
async def main_select_example():
await init_db_pool()
if not db_pool: return
# 创建一些示例数据
await execute_query_from_pool(
"INSERT INTO employees (name, department, salary) VALUES (%s, %s, %s) ON DUPLICATE KEY UPDATE salary = VALUES(salary);",
("张三", "技术部", 12000.00)
)
await execute_query_from_pool(
"INSERT INTO employees (name, department, salary) VALUES (%s, %s, %s) ON DUPLICATE KEY UPDATE salary = VALUES(salary);",
("李四", "技术部", 15000.00)
)
await execute_query_from_pool(
"INSERT INTO employees (name, department, salary) VALUES (%s, %s, %s) ON DUPLICATE KEY UPDATE salary = VALUES(salary);",
("王五", "市场部", 11000.00)
)
await execute_query_from_pool(
"INSERT INTO employees (name, department, salary) VALUES (%s, %s, %s) ON DUPLICATE KEY UPDATE salary = VALUES(salary);",
("赵六", "技术部", 9000.00)
)
print("
--- 查询技术部薪资大于等于10000的员工 ---")
tech_employees = await fetch_employee_data("技术部", 10000.00)
for emp in tech_employees:
print(f" -> 姓名: {
emp['name']}, 薪资: {
emp['salary']}")
print("
--- 查询市场部薪资大于等于5000的员工 ---")
market_employees = await fetch_employee_data("市场部", 5000.00)
for emp in market_employees:
print(f" -> 姓名: {
emp['name']}, 薪资: {
emp['salary']}")
await close_db_pool()
if __name__ == "__main__":
# 运行前确保 employees 表已存在 (可复用上一个例子的 main_with_pool 中的建表逻辑)
# 简单起见,这里直接调用,但更好的做法是分离应用逻辑和一次性设置
async def setup_and_run():
await init_db_pool()
if not db_pool:
print("无法初始化连接池,程序退出。")
return
create_table_query = """
CREATE TABLE IF NOT EXISTS employees (
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(100) NOT NULL,
department VARCHAR(50),
salary DECIMAL(10, 2),
hire_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE KEY unique_name (name)
) ENGINE=InnoDB;
"""
# execute_query_from_pool 内部会从池中获取连接
async with db_pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute(create_table_query)
print("员工表 'employees' 已确保存在或创建。")
await close_db_pool() # 关闭初始化时打开的池
# 现在运行select的例子
await main_select_example()
asyncio.run(setup_and_run())
代码解释:
fetch_employee_data
:
定义了参数化的 SELECT
查询。
使用 aiomysql.DictCursor
,这样 cur.fetchall()
返回的是字典列表,cur.fetchone()
返回单个字典。
await cur.execute(query, args)
: 异步执行。
await cur.fetchall()
: 异步获取所有匹配的行。
异步迭代: aiomysql
的游标对象实现了异步迭代协议 (__aiter__
和 __anext__
),因此可以使用 async for row in cur:
来逐行处理结果。这对于非常大的结果集特别有用,可以避免一次性将所有数据加载到内存中。上面的代码中注释掉了这部分,但这是一个重要的特性。
main_select_example
:
调用 init_db_pool()
初始化连接池。
插入一些示例数据以便查询。这里复用了之前定义的 execute_query_from_pool
函数,但需要注意它内部的 print
语句。
调用 fetch_employee_data
执行查询并打印结果。
调用 close_db_pool()
关闭连接池。
setup_and_run
和 if __name__ == "__main__":
:
由于 fetch_employee_data
依赖于 init_db_pool
和 close_db_pool
,并且可能依赖于 employees
表的存在,setup_and_run
协程用于协调这些步骤。它首先确保表存在,然后运行主要的 SELECT
示例。
注意,init_db_pool
和 close_db_pool
被多次调用。在更复杂的应用中,这些通常是应用生命周期的一部分,init_db_pool
在启动时调用一次,close_db_pool
在关闭时调用一次。
获取结果的方法:
await cur.fetchone()
: 获取下一行结果。如果没有更多行,则返回 None
。
await cur.fetchall()
: 获取所有剩余的行。返回一个元组的元组 (默认游标) 或字典列表 (DictCursor
)。如果结果集为空,返回一个空元组/列表。
await cur.fetchmany(size=cur.arraysize)
: 获取指定数量的行。arraysize
是游标的一个属性,默认为1。
async for row in cur:
: 通过异步迭代逐行获取。
6.4.2. 执行 INSERT
, UPDATE
, DELETE
操作
这些操作的执行方式与 SELECT
类似,但通常我们更关心的是操作是否成功以及影响了多少行。如果连接池或连接未设置为 autocommit=True
,则需要显式提交事务。
import asyncio
import aiomysql
from datetime import datetime
# (假设 DB_CONFIG, db_pool, init_db_pool, close_db_pool 已定义)
# (假设 employees 表已创建)
async def add_new_employee(name: str, department: str, salary: float, hire_date: datetime = None):
"""
异步向 employees 表插入一条新员工记录。
如果连接池的 autocommit=False,则需要显式事务处理。
为了演示,假设我们的连接池配置为 autocommit=True。
"""
if db_pool is None:
print("错误:数据库连接池未初始化。")
return None
query = """
INSERT INTO employees (name, department, salary, hire_date)
VALUES (%s, %s, %s, %s);
"""
# 如果 hire_date 未提供,则使用数据库的默认值 (CURRENT_TIMESTAMP) 或 Python 生成的当前时间
# 为了能插入 NULL 以便让数据库使用默认值,需要传递 None
effective_hire_date = hire_date if hire_date else datetime.now() # 或者直接传 None
args = (name, department, salary, effective_hire_date)
async with db_pool.acquire() as conn:
async with conn.cursor() as cur: # 不需要 DictCursor,因为 INSERT 不返回复杂结构
try:
await cur.execute(query, args)
# 如果 autocommit=False (在 create_pool 中设置), 需要在这里提交:
# await conn.commit()
# print("事务已提交。")
inserted_id = cur.lastrowid # 获取最后插入行的自增 ID
affected_rows = cur.rowcount # 获取影响的行数 (对于 INSERT 通常是 1)
print(f"成功插入员工: {
name}, ID: {
inserted_id}, 影响行数: {
affected_rows}")
return inserted_id
except aiomysql.IntegrityError as e: # 例如,违反唯一约束
print(f"插入员工 '{
name}' 失败 (数据完整性错误): {
e}")
# 如果 autocommit=False, 可能需要回滚:
# await conn.rollback()
# print("事务已回滚。")
return None
except aiomysql.Error as e:
print(f"插入员工 '{
name}' 时数据库操作错误: {
e}")
# if conn and not conn.autocommit: await conn.rollback()
return None
except Exception as e:
print(f"插入员工 '{
name}' 时发生意外错误: {
e}")
# if conn and not conn.autocommit: await conn.rollback()
return None
async def update_employee_salary(name: str, new_salary: float):
"""
异步更新指定员工的薪资。
"""
if db_pool is None: return 0
query = "UPDATE employees SET salary = %s WHERE name = %s;"
args = (new_salary, name)
async with db_pool.acquire() as conn:
async with conn.cursor() as cur:
try:
await cur.execute(query, args)
# if not conn.autocommit: await conn.commit()
affected_rows = cur.rowcount
print(f"更新员工 '{
name}' 薪资为 {
new_salary}。影响行数: {
affected_rows}")
return affected_rows
except Exception as e:
print(f"更新员工 '{
name}' 薪资时发生错误: {
e}")
# if conn and not conn.autocommit: await conn.rollback()
return 0
async def delete_employee(name: str):
"""
异步删除指定员工。
"""
if db_pool is None: return 0
query = "DELETE FROM employees WHERE name = %s;"
args = (name,)
async with db_pool.acquire() as conn:
async with conn.cursor() as cur:
try:
await cur.execute(query, args)
# if not conn.autocommit: await conn.rollback()
affected_rows = cur.rowcount
print(f"删除员工 '{
name}'。影响行数: {
affected_rows}")
return affected_rows
except Exception as e:
print(f"删除员工 '{
name}' 时发生错误: {
e}")
# if conn and not conn.autocommit: await conn.rollback()
return 0
async def main_dml_example():
# 这里假设 init_db_pool 和 close_db_pool 的定义与之前连接池示例一致
# 并且连接池创建时 autocommit=True
await init_db_pool()
if not db_pool: return
# 清理并创建表 (确保从干净状态开始)
async with db_pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute("DROP TABLE IF EXISTS employees;")
await cur.execute("""
CREATE TABLE employees (
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(100) NOT NULL UNIQUE,
department VARCHAR(50),
salary DECIMAL(10, 2),
hire_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP
) ENGINE=InnoDB;
""")
print("员工表 'employees' 已重建。")
# 添加新员工
emp_id1 = await add_new_employee("刘备", "管理层", 25000.00, datetime(2023, 1, 10))
emp_id2 = await add_new_employee("关羽", "战斗部", 22000.00) # 使用默认 hire_date
emp_id3 = await add_new_employee("张飞", "战斗部", 21000.00)
# 尝试添加一个同名员工 (应失败,因为 name 字段有唯一约束)
await add_new_employee("刘备", "后勤部", 10000.00)
# 更新员工薪资
await update_employee_salary("关羽", 23000.00)
await update_employee_salary("诸葛亮", 30000.00) # 尝试更新不存在的员工
# 查询当前所有员工
print("
--- 当前员工列表 ---")
all_employees = await execute_query_from_pool("SELECT name, department, salary, DATE_FORMAT(hire_date, '%Y-%m-%d') as formatted_hire_date FROM employees;") # execute_query_from_pool 来自之前的例子
if all_employees:
for emp in all_employees:
print(f" {
emp}")
# 删除员工
await delete_employee("张飞")
await delete_employee("孙权") # 尝试删除不存在的员工
print("
--- 删除张飞后的员工列表 ---")
remaining_employees = await execute_query_from_pool("SELECT name, department, salary FROM employees;")
if remaining_employees:
for emp in remaining_employees:
print(f" {
emp}")
await close_db_pool()
if __name__ == "__main__":
# 为了能复用 execute_query_from_pool,我们需要其定义
# 在实际项目中,这些函数会组织在模块中
DB_CONFIG = {
'host': '127.0.0.1', 'port': 3306, 'user': 'your_user',
'password': 'your_password', 'db': 'your_database',
}
db_pool = None
async def init_db_pool_local(): # 避免与全局冲突,或确保全局db_pool被此函数设置
global db_pool
if db_pool is None:
db_pool = await aiomysql.create_pool(
minsize=2, maxsize=5, **DB_CONFIG,
loop=asyncio.get_event_loop(), autocommit=True # 关键:设置为True
)
print("本地DML示例连接池初始化成功 (autocommit=True)。")
async def close_db_pool_local():
global db_pool
if db_pool:
db_pool.close()
await db_pool.wait_closed()
print("本地DML示例连接池已关闭。")
db_pool = None # 重置,以便下次运行可以重新初始化
# 重新定义 execute_query_from_pool 以便此脚本独立运行
async def execute_query_from_pool(query: str, args=None):
if db_pool is None: return None
async with db_pool.acquire() as conn:
async with conn.cursor(aiomysql.DictCursor) as cur:
await cur.execute(query, args)
if query.strip().upper().startswith("SELECT"):
return await cur.fetchall()
else:
return cur.rowcount
# 更新 main_dml_example 以使用本地化的池管理函数
async def main_dml_example_runner():
await init_db_pool_local()
if not db_pool:
print("连接池未能初始化 (DML)。")
return
try:
await main_dml_example_content() # 将原 main_dml_example 逻辑放入新函数
finally:
await close_db_pool_local()
async def main_dml_example_content(): # 原 main_dml_example 的内容
async with db_pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute("DROP TABLE IF EXISTS employees;")
await cur.execute("""
CREATE TABLE employees (
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(100) NOT NULL UNIQUE,
department VARCHAR(50),
salary DECIMAL(10, 2),
hire_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP
) ENGINE=InnoDB;
""")
print("员工表 'employees' 已重建。")
emp_id1 = await add_new_employee("刘备", "管理层", 25000.00, datetime(2023, 1, 10))
emp_id2 = await add_new_employee("关羽", "战斗部", 22000.00)
emp_id3 = await add_new_employee("张飞", "战斗部", 21000.00)
await add_new_employee("刘备", "后勤部", 10000.00)
await update_employee_salary("关羽", 23000.00)
await update_employee_salary("诸葛亮", 30000.00)
print("
--- 当前员工列表 ---")
all_employees = await execute_query_from_pool("SELECT name, department, salary, DATE_FORMAT(hire_date, '%Y-%m-%d') as formatted_hire_date FROM employees;")
if all_employees:
for emp in all_employees: print(f" {
emp}")
await delete_employee("张飞")
await delete_employee("孙权")
print("
--- 删除张飞后的员工列表 ---")
remaining_employees = await execute_query_from_pool("SELECT name, department, salary FROM employees;")
if remaining_employees:
for emp in remaining_employees: print(f" {
emp}")
# 将 init_db_pool 和 close_db_pool 的调用移到 main_dml_example_runner 内部
# 并重命名 main_dml_example 为 main_dml_example_content
# 这是为了确保池的正确生命周期管理在这个特定示例中。
# 在实际应用中,池的生命周期通常与整个应用的生命周期绑定。
asyncio.run(main_dml_example_runner())
代码解释:
add_new_employee
:
cur.lastrowid
: 在执行 INSERT
语句后,如果表有自增主键,可以通过 cur.lastrowid
获取刚插入行的 ID。
cur.rowcount
: 对于 INSERT
, UPDATE
, DELETE
,cur.rowcount
返回受操作影响的行数。
错误处理: 捕捉 aiomysql.IntegrityError
可以处理像唯一键冲突这样的特定数据库错误。
事务: 注释中提到了如果 autocommit=False
,则需要 await conn.commit()
和 await conn.rollback()
。由于我们的连接池示例设置了 autocommit=True
,所以这里不需要显式调用。
update_employee_salary
和 delete_employee
: 逻辑与 add_new_employee
类似,执行相应的 SQL 语句并返回影响的行数。
main_dml_example_content
:
演示了 DML 操作的完整流程:重建表、插入数据(包括一次预期的失败插入)、更新数据(包括一次对不存在记录的更新尝试)、查询数据、删除数据(包括一次对不存在记录的删除尝试)。
DATE_FORMAT(hire_date, '%Y-%m-%d')
: 在 SQL 查询中格式化日期,使其更易读。
独立运行的调整:
为了使 main_dml_example
部分能够独立运行并管理自己的连接池,添加了 init_db_pool_local
、close_db_pool_local
和 main_dml_example_runner
。这是为了演示的便利性,实际项目中,连接池管理通常是更集中的。
autocommit=True
是关键,它简化了单个 DML 操作,因为每个成功的 execute
都会被自动提交。
6.4.3. 参数化查询 (Parameterized Queries)
如前面的例子所示,aiomysql
(继承自 PyMySQL
) 使用 %s
作为占位符进行参数化查询。参数以元组或列表的形式作为第二个参数传递给 cur.execute()
。
# ... (在协程函数内部,已获取 cur)
user_input_name = "危险的' OR '1'='1" # 一个潜在的SQL注入尝试
user_input_salary = 1000.0
# 安全的方式:使用参数化查询
query = "SELECT * FROM employees WHERE name = %s AND salary > %s;"
args = (user_input_name, user_input_salary)
await cur.execute(query, args) # aiomysql/PyMySQL 会正确处理特殊字符,防止注入
# 不安全的方式:字符串拼接 (绝对禁止!)
# unsafe_query = f"SELECT * FROM employees WHERE name = '{user_input_name}' AND salary > {user_input_salary};"
# await cur.execute(unsafe_query) # 极易受到SQL注入攻击!
始终使用参数化查询是防止 SQL 注入攻击的最重要手段。 aiomysql
底层的 PyMySQL
会负责对传入的参数进行适当的转义和处理,确保它们被当作数据值而不是 SQL 代码片段来对待。
七、aiomysql
中的异步事务管理 (Asynchronous Transaction Management in aiomysql
)
事务是一系列数据库操作,这些操作要么全部成功执行,要么在任何一步失败时全部回滚到初始状态,从而保证数据的原子性 (Atomicity)、一致性 (Consistency)、隔离性 (Isolation) 和持久性 (Durability) – 即 ACID 属性。在异步环境中,事务管理需要特别注意,因为协程的并发执行可能引入新的复杂性。
7.1. 显式事务控制 (Explicit Transaction Control)
默认情况下,当你通过 aiomysql.create_pool
创建连接池或 aiomysql.connect
创建单个连接时,可以通过 autocommit
参数控制事务行为。
autocommit=True
(某些驱动或配置下的默认值,或如我们之前示例中显式设置的): 每个 SQL 语句执行后都会被视为一个独立的事务并自动提交。这简化了简单操作,但对于需要多条语句原子执行的场景则不适用。
autocommit=False
: 这是进行显式事务管理的必要设置。在此模式下,数据库操作在执行后不会立即生效,除非显式调用 commit()
。
7.1.1. 配置 autocommit=False
对于连接池:
import asyncio
import aiomysql
DB_CONFIG_NO_AUTOCOMMIT = {
'host': '127.0.0.1',
'port': 3306,
'user': 'your_user',
'password': 'your_password',
'db': 'your_database_tx', # 使用一个专门用于事务测试的数据库
}
# 全局变量,用于存储事务型连接池
tx_db_pool = None
async def init_tx_db_pool():
"""
异步初始化 MySQL 连接池,并禁用自动提交 (autocommit=False)。
"""
global tx_db_pool
if tx_db_pool is None:
try:
print("正在初始化事务型连接池 (autocommit=False)...")
tx_db_pool = await aiomysql.create_pool(
minsize=2,
maxsize=10,
**DB_CONFIG_NO_AUTOCOMMIT,
loop=asyncio.get_event_loop(),
autocommit=False # 关键:禁用自动提交以启用手动事务控制
)
print("事务型连接池 (autocommit=False) 初始化成功。")
print(f"连接池状态: min={
tx_db_pool.minsize}, max={
tx_db_pool.maxsize}")
except Exception as e:
print(f"初始化事务型连接池失败: {
e}")
tx_db_pool = None
async def close_tx_db_pool():
"""
异步关闭事务型 MySQL 连接池。
"""
global tx_db_pool
if tx_db_pool:
tx_db_pool.close()
await tx_db_pool.wait_closed()
print("事务型连接池已关闭。")
tx_db_pool = None
代码解释:
DB_CONFIG_NO_AUTOCOMMIT
: 数据库连接配置,建议为事务测试使用独立的数据库或表前缀,避免干扰其他数据。
tx_db_pool
: 用于存储配置为 autocommit=False
的连接池。
init_tx_db_pool()
:
autocommit=False
: 这是核心设置。当从这个池中获取连接后,所有的数据库操作都需要显式的 commit
或 rollback
。
close_tx_db_pool()
: 关闭连接池的辅助函数。
7.1.2. 事务操作:begin
, commit
, rollback
一旦获得了 autocommit=False
的连接,就可以使用以下协程方法来控制事务:
await conn.begin()
: 显式开始一个新的事务。虽然在 autocommit=False
模式下,第一个数据操作语句会自动开始一个事务(如果当前没有活动事务),但显式调用 await conn.begin()
是一个良好的编程习惯,它使得事务的边界更加清晰。对于某些数据库系统和驱动,它甚至是必需的。
await conn.commit()
: 提交当前事务,将自上次提交/回滚或事务开始以来所做的所有更改永久保存到数据库中。
await conn.rollback()
: 回滚当前事务,撤销自上次提交/回滚或事务开始以来所做的所有更改,使数据库恢复到事务开始前的状态。
企业场景:银行转账操作
这是一个经典的事务场景。假设我们有一个 accounts
表,包含用户账户和余额。从一个账户向另一个账户转账需要两个 UPDATE
操作:一个减少源账户余额,一个增加目标账户余额。这两个操作必须要么都成功,要么都失败。
-- 用于事务演示的表结构
CREATE TABLE IF NOT EXISTS accounts (
account_id VARCHAR(50) PRIMARY KEY,
owner_name VARCHAR(100) NOT NULL,
balance DECIMAL(15, 2) NOT NULL CHECK (balance >= 0) -- 余额不能为负
) ENGINE=InnoDB;
INSERT IGNORE INTO accounts (account_id, owner_name, balance) VALUES
('ACC001', 'Alice', 1000.00),
('ACC002', 'Bob', 500.00),
('ACC003', 'Charlie_Insufficient', 50.00);
现在,让我们用 aiomysql
实现这个转账操作,并进行详尽的事务控制:
import asyncio
import aiomysql
# --- 前面定义的 DB_CONFIG_NO_AUTOCOMMIT, tx_db_pool, init_tx_db_pool, close_tx_db_pool ---
# (确保这些已在您的环境中定义并可访问)
# DB_CONFIG_NO_AUTOCOMMIT = { 'host': '127.0.0.1', ... 'db': 'your_database_tx' }
# tx_db_pool = None
# async def init_tx_db_pool(): ...
# async def close_tx_db_pool(): ...
async def transfer_funds(from_account_id: str, to_account_id: str, amount: float):
"""
异步执行资金转账操作,包含完整的事务控制。
参数:
from_account_id (str): 转出账户ID。
to_account_id (str): 转入账户ID。
amount (float): 转账金额,必须为正数。
返回:
bool: 转账成功返回 True,否则返回 False。
"""
if tx_db_pool is None:
print("错误:事务型数据库连接池未初始化。")
return False
if amount <= 0:
print("错误:转账金额必须为正数。")
return False
if from_account_id == to_account_id:
print("错误:不能给自己转账。") # 业务逻辑校验
return False
# 从连接池获取一个连接,用于执行整个事务
# 注意:这里的连接将保持 autocommit=False 的状态
async with tx_db_pool.acquire() as conn:
# 从连接获取游标
async with conn.cursor(aiomysql.DictCursor) as cur:
try:
# 1. 显式开始事务 (良好实践)
await conn.begin()
print(f"事务开始:从 {
from_account_id} 转账 {
amount} 到 {
to_account_id}")
# 2. 检查转出账户余额是否充足
await cur.execute(
"SELECT balance FROM accounts WHERE account_id = %s FOR UPDATE;",
(from_account_id,)
)
# "FOR UPDATE" 会对选定的行加锁,防止其他事务在此期间修改这些行,
# 这对于防止并发条件下的余额检查和更新之间出现问题至关重要。
# 注意:不是所有场景都需要 FOR UPDATE,过度使用可能导致锁竞争。
from_account = await cur.fetchone()
if not from_account:
print(f"错误:转出账户 {
from_account_id} 不存在。")
await conn.rollback() # 回滚事务
print("事务已回滚。")
return False
if from_account['balance'] < amount:
print(f"错误:转出账户 {
from_account_id} 余额不足。当前余额: {
from_account['balance']}, 需要: {
amount}")
await conn.rollback() # 回滚事务
print("事务已回滚。")
return False
# 3. 检查转入账户是否存在 (可选,取决于业务需求)
await cur.execute(
"SELECT account_id FROM accounts WHERE account_id = %s FOR UPDATE;",
(to_account_id,)
)
# 对目标账户也加锁,如果后续操作非常依赖其状态,或者防止其被删除等。
to_account_exists = await cur.fetchone()
if not to_account_exists:
print(f"错误:转入账户 {
to_account_id} 不存在。")
await conn.rollback() # 回滚事务
print("事务已回滚。")
return False
# 4. 从转出账户扣款
update_from_query = "UPDATE accounts SET balance = balance - %s WHERE account_id = %s;"
await cur.execute(update_from_query, (amount, from_account_id))
if cur.rowcount != 1: # 确保确实有一行被更新了
print(f"错误:从账户 {
from_account_id} 扣款失败或账户状态异常。")
await conn.rollback()
print("事务已回滚 (扣款影响行数非1)。")
return False
print(f"账户 {
from_account_id} 已扣款 {
amount}。")
# --- 模拟一个可能发生的错误,以测试回滚逻辑 ---
# if to_account_id == "ACC_FAIL_TRANSFER": # 特定账户ID用于触发模拟失败
# raise Exception("模拟向目标账户增款时发生意外故障!")
# --- 模拟结束 ---
# 5. 向转入账户拨款
update_to_query = "UPDATE accounts SET balance = balance + %s WHERE account_id = %s;"
await cur.execute(update_to_query, (amount, to_account_id))
if cur.rowcount != 1: # 确保确实有一行被更新了
print(f"错误:向账户 {
to_account_id} 拨款失败或账户状态异常。")
await conn.rollback()
print("事务已回滚 (拨款影响行数非1)。")
return False
print(f"账户 {
to_account_id} 已收款 {
amount}。")
# 6. 所有操作成功,提交事务
await conn.commit()
print(f"转账成功!事务已提交。从 {
from_account_id} 转账 {
amount} 到 {
to_account_id}")
return True
except aiomysql.MySQLError as db_err: # 捕获 aiomysql 特有的数据库错误
print(f"数据库操作错误,事务回滚: {
db_err}")
if conn.connected and not conn.closed: # 检查连接状态以安全回滚
try:
await conn.rollback()
print("事务因数据库错误已回滚。")
except Exception as rb_err:
print(f"回滚事务时发生错误: {
rb_err}")
return False
except Exception as e: # 捕获其他任何Python异常
print(f"发生意外错误,事务回滚: {
e}")
if conn.connected and not conn.closed:
try:
await conn.rollback()
print("事务因意外错误已回滚。")
except Exception as rb_err:
print(f"回滚事务时发生错误: {
rb_err}")
return False
async def check_balances(*account_ids):
"""
异步查询并打印指定账户的余额。
"""
if tx_db_pool is None:
print("错误:事务型数据库连接池未初始化。")
return
async with tx_db_pool.acquire() as conn:
async with conn.cursor(aiomysql.DictCursor) as cur:
# 在 autocommit=False 的连接上执行 SELECT 通常不需要显式事务
# 但如果希望读取的是已提交的数据(而不是当前未提交事务中的数据,这取决于隔离级别)
# 则此查询本身也应在事务之外,或者确保之前的事务已结束。
# 对于简单的余额检查,通常读取的是最新提交状态。
# 如果在事务中执行 SELECT,它将遵循该事务的隔离级别。
print("
--- 当前账户余额 ---")
for acc_id in account_ids:
await cur.execute("SELECT owner_name, balance FROM accounts WHERE account_id = %s;", (acc_id,))
account = await cur.fetchone()
if account:
print(f" 账户: {
acc_id} ({
account['owner_name']}), 余额: {
account['balance']}")
else:
print(f" 账户: {
acc_id} 未找到。")
async def main_transaction_example():
"""
主函数,演示事务性转账操作。
"""
# 初始化事务型连接池
await init_tx_db_pool()
if not tx_db_pool:
print("事务型连接池初始化失败,程序退出。")
return
# 准备演示表 (确保表存在且有初始数据)
async with tx_db_pool.acquire() as conn:
async with conn.cursor() as cur:
# 因为 autocommit=False, DDL 和初始数据插入也需要 commit
await conn.begin() # 开始一个事务来设置环境
await cur.execute("DROP TABLE IF EXISTS accounts;") # 清理旧表
await cur.execute("""
CREATE TABLE IF NOT EXISTS accounts (
account_id VARCHAR(50) PRIMARY KEY,
owner_name VARCHAR(100) NOT NULL,
balance DECIMAL(15, 2) NOT NULL CHECK (balance >= 0)
) ENGINE=InnoDB;
""")
print("表 'accounts' 已创建/重建。")
initial_data = [
('ACC001', 'Alice', 1000.00),
('ACC002', 'Bob', 500.00),
('ACC003', 'Charlie_Insufficient', 50.00),
('ACC004', 'David_Receiver', 200.00),
('ACC_FAIL_TRANSFER', 'Eve_Fail', 100.00) # 用于模拟失败的账户
]
for acc_id, owner, bal in initial_data:
await cur.execute(
"INSERT INTO accounts (account_id, owner_name, balance) VALUES (%s, %s, %s) ON DUPLICATE KEY UPDATE balance = VALUES(balance);",
(acc_id, owner, bal)
)
await conn.commit() # 提交表结构和初始数据
print("初始账户数据已插入并提交。")
# 场景1: 成功转账
print("
--- 场景1: Alice 向 Bob 转账 100.00 ---")
await check_balances('ACC001', 'ACC002')
success1 = await transfer_funds('ACC001', 'ACC002', 100.00)
print(f"转账操作结果: {
'成功' if success1 else '失败'}")
await check_balances('ACC001', 'ACC002')
# 场景2: 余额不足导致转账失败
print("
--- 场景2: Charlie 向 David 转账 200.00 (余额不足) ---")
await check_balances('ACC003', 'ACC004')
success2 = await transfer_funds('ACC003', 'ACC004', 200.00)
print(f"转账操作结果: {
'成功' if success2 else '失败'}")
await check_balances('ACC003', 'ACC004') # 余额应保持不变
# 场景3: 转出账户不存在
print("
--- 场景3: 不存在的账户 NONEXIST001 向 Bob 转账 50.00 ---")
await check_balances('NONEXIST001', 'ACC002')
success3 = await transfer_funds('NONEXIST001', 'ACC002', 50.00)
print(f"转账操作结果: {
'成功' if success3 else '失败'}")
await check_balances('ACC002') # Bob的余额应保持不变
# 场景4: 转账过程中发生模拟的意外错误 (如果启用了模拟错误代码)
# print("
--- 场景4: Alice 向 Eve_Fail 转账 50.00 (模拟拨款失败) ---")
# await check_balances('ACC001', 'ACC_FAIL_TRANSFER')
# success4 = await transfer_funds('ACC001', 'ACC_FAIL_TRANSFER', 50.00)
# print(f"转账操作结果: {'成功' if success4 else '失败'}")
# await check_balances('ACC001', 'ACC_FAIL_TRANSFER') # 双方余额都应保持不变,因为事务会回滚
# 场景5: 转账金额为0或负数 (业务逻辑校验)
print("
--- 场景5: Alice 向 Bob 转账 -50.00 (无效金额) ---")
success5 = await transfer_funds('ACC001', 'ACC002', -50.00)
print(f"转账操作结果: {
'成功' if success5 else '失败'}")
await check_balances('ACC001', 'ACC002') # 余额应保持不变
# 清理并关闭连接池
await close_tx_db_pool()
if __name__ == "__main__":
asyncio.run(main_transaction_example())
代码解释 transfer_funds
函数:
获取连接: async with tx_db_pool.acquire() as conn:
从配置为 autocommit=False
的池中获取连接。
获取游标: async with conn.cursor(aiomysql.DictCursor) as cur:
。
await conn.begin()
: 显式启动一个事务。这是一个好习惯,能清晰地标记事务的开始。
余额检查与 FOR UPDATE
:
SELECT balance FROM accounts WHERE account_id = %s FOR UPDATE;
: 在读取转出账户余额时,使用了 FOR UPDATE
子句。
作用: FOR UPDATE
会对查询匹配到的行(在这里是 from_account_id
对应的行)施加一个排他锁(X锁)。这意味着在当前事务提交或回滚之前,其他尝试读取这些行(如果也用 FOR UPDATE
或 LOCK IN SHARE MODE
)或尝试更新/删除这些行的事务将会被阻塞。
原因: 这是为了防止并发场景下的“丢失更新”或“脏读”问题。例如,如果没有 FOR UPDATE
,两个并发的转账请求可能同时读取到足够的余额,然后都进行扣款,导致余额透支。FOR UPDATE
确保了在检查余额和执行扣款之间,该账户的余额不会被其他事务修改。
条件判断与回滚:
如果账户不存在或余额不足,打印错误信息,然后调用 await conn.rollback()
来撤销事务开始以来的所有操作(在这个阶段,实际上还没有执行任何修改操作,但回滚是标准流程)。
执行 UPDATE
:
UPDATE accounts SET balance = balance - %s WHERE account_id = %s;
UPDATE accounts SET balance = balance + %s WHERE account_id = %s;
cur.rowcount != 1
: 检查每个 UPDATE
语句是否确实影响了预期的行数(通常是1行)。如果不是,可能意味着账户ID错误,或者账户在此期间被意外删除(尽管 FOR UPDATE
应该能部分防止这种情况),这通常表示一个异常状态,应该回滚事务。
模拟错误: 代码中注释掉的部分 if to_account_id == "ACC_FAIL_TRANSFER": raise Exception(...)
可以用来测试当事务中间发生Python异常时的回滚逻辑。
await conn.commit()
: 如果所有操作都成功执行(余额充足,账户存在,更新操作影响了正确的行数),则调用此方法将所有更改永久写入数据库。
异常处理与回滚:
try...except aiomysql.MySQLError as db_err:
: 捕获特定于数据库驱动的错误。
try...except Exception as e:
: 捕获任何其他Python层面可能发生的异常。
在任何捕获到异常的情况下,都会尝试调用 await conn.rollback()
来确保数据的一致性。
if conn.connected and not conn.closed:
: 在尝试回滚前检查连接状态是一个好习惯,避免在已关闭的连接上操作。
代码解释 main_transaction_example
函数:
初始化与清理: 调用 init_tx_db_pool()
。在开始时,它会清理并重建 accounts
表,并插入一些初始数据。注意,由于连接池是 autocommit=False
,所以 DDL 和初始 INSERT
操作也需要包裹在 conn.begin()
和 conn.commit()
之间。
场景演示: 通过多个转账尝试来演示事务的各种行为:成功、因余额不足失败、因账户不存在失败等。
check_balances
: 一个辅助函数,用于在操作前后查询并显示账户余额,以验证事务的效果。
关闭连接池: await close_tx_db_pool()
。
这个转账示例清晰地展示了在 aiomysql
中如何通过 begin
, commit
, rollback
以及结合错误处理来实现可靠的异步事务。FOR UPDATE
的使用是处理并发修改的关键点,但它也可能增加锁竞争,需要根据实际应用的并发模式和性能要求来权衡。
7.2. 使用异步上下文管理器进行事务管理 (Transactions with Asynchronous Context Managers)
虽然显式的 begin
, commit
, rollback
调用是有效的,但它们可能会使代码显得冗长,并且容易忘记在所有可能的代码路径(包括异常路径)中正确地调用 commit
或 rollback
。Python 的异步上下文管理器 (async with
) 提供了一种更优雅、更安全的方式来管理资源,包括事务。
我们可以创建一个自定义的异步上下文管理器来封装事务逻辑。
import asyncio
import aiomysql
# --- 前面定义的 DB_CONFIG_NO_AUTOCOMMIT, tx_db_pool, init_tx_db_pool, close_tx_db_pool ---
class AsyncTransactionManager:
"""
一个异步上下文管理器,用于简化 aiomysql 中的事务处理。
它从连接池获取连接,并自动处理事务的开始、提交和回滚。
"""
def __init__(self, pool, cursor_type=aiomysql.DictCursor):
"""
初始化事务管理器。
参数:
pool: aiomysql 连接池 (必须配置为 autocommit=False)。
cursor_type: 希望游标返回结果的类型,例如 aiomysql.DictCursor 或 aiomysql.Cursor。
"""
if pool is None or pool.autocommit is True:
raise ValueError("连接池必须提供且配置为 autocommit=False")
self._pool = pool
self._conn = None # 将在 __aenter__ 中获取
self._cursor_type = cursor_type
self.cur = None # 游标将在 __aenter__ 中创建并对外暴露
async def __aenter__(self):
"""
异步进入上下文时调用。获取连接,开始事务,并返回游标。
"""
self._conn = await self._pool.acquire() # 从池中获取连接
try:
await self._conn.begin() # 显式开始事务
self.cur = await self._conn.cursor(self._cursor_type) # 创建游标
print("AsyncTransactionManager: 事务开始,游标已创建。")
return self.cur # 返回游标供 'as' 子句使用
except Exception as e:
# 如果在 __aenter__ 中发生错误(例如 begin() 失败),需要释放连接
if self._conn:
self._pool.release(self._conn) # 归还连接给池
self._conn = None
print(f"AsyncTransactionManager: __aenter__ 失败: {
e}")
raise # 重新抛出异常
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""
异步退出上下文时调用。根据是否发生异常来提交或回滚事务,并关闭游标和连接。
参数:
exc_type: 异常类型 (如果上下文中没有异常,则为 None)。
exc_val: 异常实例 (如果上下文中没有异常,则为 None)。
exc_tb: 异常的 traceback (如果上下文中没有异常,则为 None)。
"""
if self.cur:
await self.cur.close() # 关闭游标
print("AsyncTransactionManager: 游标已关闭。")
if not self._conn: # 如果连接在 aenter 中未能成功获取
return False # 指示不抑制异常 (如果存在)
try:
if exc_type is not None: # 如果在 'async with' 块内发生了异常
print(f"AsyncTransactionManager: 检测到异常 ({
exc_type}),正在回滚事务...")
await self._conn.rollback()
print("AsyncTransactionManager: 事务已回滚。")
else: # 如果没有异常发生
print("AsyncTransactionManager: 未检测到异常,正在提交事务...")
await self._conn.commit()
print("AsyncTransactionManager: 事务已提交。")
except Exception as e_tx:
# 如果提交或回滚本身失败,这是一个严重的问题
print(f"AsyncTransactionManager: 在提交/回滚事务时发生错误: {
e_tx}")
# 即使提交/回滚失败,也要确保连接被释放
# 决定是否抑制原始异常 (exc_type) 还是这个新的异常 (e_tx)
# 通常,不应抑制原始异常,除非这个新异常更关键
if exc_type is None: # 如果原始块没有异常,但提交/回滚有,则抛出新的
raise e_tx
# 否则,原始异常 (exc_type) 将被 async with 语句重新抛出
finally:
if self._conn:
self._pool.release(self._conn) # 无论如何,都要将连接归还给池
self._conn = None
print("AsyncTransactionManager: 连接已释放回池。")
# 返回 False 表示不抑制在 'async with' 块中发生的异常 (如果有的话)
# 如果返回 True,则会抑制异常 (通常不推荐,除非明确处理了它)
return False
async def transfer_funds_with_cm(from_account_id: str, to_account_id: str, amount: float):
"""
使用 AsyncTransactionManager 上下文管理器执行资金转账。
"""
if tx_db_pool is None: return False
if amount <= 0: return False
if from_account_id == to_account_id: return False
try:
# 使用异步上下文管理器
async with AsyncTransactionManager(tx_db_pool) as cur: # cur 就是 __aenter__ 返回的游标
print(f"上下文管理器事务:从 {
from_account_id} 转账 {
amount} 到 {
to_account_id}")
# 1. 检查转出账户余额 (逻辑与之前类似,但现在在 'async with' 块内)
await cur.execute(
"SELECT balance FROM accounts WHERE account_id = %s FOR UPDATE;",
(from_account_id,)
)
from_account = await cur.fetchone()
if not from_account:
print(f"错误:转出账户 {
from_account_id} 不存在。")
# 不需要手动 rollback,上下文管理器会处理
raise ValueError(f"转出账户 {
from_account_id} 不存在") # 抛出异常触发回滚
if from_account['balance'] < amount:
print(f"错误:转出账户 {
from_account_id} 余额不足。")
raise ValueError(f"账户 {
from_account_id} 余额不足") # 抛出异常触发回滚
# 2. 检查转入账户是否存在
await cur.execute(
"SELECT account_id FROM accounts WHERE account_id = %s FOR UPDATE;",
(to_account_id,)
)
if not await cur.fetchone():
print(f"错误:转入账户 {
to_account_id} 不存在。")
raise ValueError(f"转入账户 {
to_account_id} 不存在")
# 3. 从转出账户扣款
await cur.execute(
"UPDATE accounts SET balance = balance - %s WHERE account_id = %s;",
(amount, from_account_id)
)
if cur.rowcount != 1:
raise RuntimeError(f"从账户 {
from_account_id} 扣款失败")
print(f"账户 {
from_account_id} 已通过上下文管理器扣款 {
amount}。")
# 模拟一个可能发生的错误
# if to_account_id == "ACC_FAIL_TRANSFER_CM":
# raise Exception("模拟上下文管理器内拨款失败!")
# 4. 向转入账户拨款
await cur.execute(
"UPDATE accounts SET balance = balance + %s WHERE account_id = %s;",
(amount, to_account_id)
)
if cur.rowcount != 1:
raise RuntimeError(f"向账户 {
to_account_id} 拨款失败")
print(f"账户 {
to_account_id} 已通过上下文管理器收款 {
amount}。")
# 如果代码执行到这里没有抛出任何异常,__aexit__ 将会自动提交事务
# 如果 async with 块成功完成(没有未捕获的异常)
print(f"上下文管理器转账成功!从 {
from_account_id} 转账 {
amount} 到 {
to_account_id}")
return True
except ValueError as ve: # 捕获我们自己定义的业务逻辑错误
print(f"转账业务逻辑错误 (上下文管理器): {
ve}")
return False
except RuntimeError as rte: # 捕获我们自己定义的运行时错误
print(f"转账运行时错误 (上下文管理器): {
rte}")
return False
except aiomysql.MySQLError as db_err: # 捕获数据库特定错误
print(f"数据库操作错误 (上下文管理器): {
db_err}")
return False
except Exception as e: # 捕获其他任何意外异常
print(f"意外错误 (上下文管理器): {
e}")
return False
async def main_context_manager_example():
await init_tx_db_pool() # 使用之前定义的事务型连接池初始化
if not tx_db_pool:
print("事务型连接池 (上下文管理器示例) 初始化失败,程序退出。")
return
# 确保表和数据存在 (与之前 main_transaction_example 中的设置类似)
async with tx_db_pool.acquire() as conn:
async with conn.cursor() as cur:
await conn.begin()
await cur.execute("DROP TABLE IF EXISTS accounts;")
await cur.execute("""
CREATE TABLE IF NOT EXISTS accounts (
account_id VARCHAR(50) PRIMARY KEY,
owner_name VARCHAR(100) NOT NULL,
balance DECIMAL(15, 2) NOT NULL CHECK (balance >= 0)
) ENGINE=InnoDB;
""")
initial_data = [
('ACC_CM_001', 'Alice_CM', 1000.00),
('ACC_CM_002', 'Bob_CM', 500.00),
('ACC_CM_003', 'Charlie_CM_Insufficient', 50.00),
('ACC_FAIL_TRANSFER_CM', 'Eve_CM_Fail', 100.00)
]
for acc_id, owner, bal in initial_data:
await cur.execute(
"INSERT INTO accounts (account_id, owner_name, balance) VALUES (%s, %s, %s) ON DUPLICATE KEY UPDATE balance = VALUES(balance);",
(acc_id, owner, bal)
)
await conn.commit()
print("初始账户数据已为上下文管理器示例插入并提交。")
print("
--- 场景CM 1: Alice_CM 向 Bob_CM 转账 100.00 (成功) ---")
await check_balances('ACC_CM_001', 'ACC_CM_002') # check_balances 复用之前的定义
res_cm1 = await transfer_funds_with_cm('ACC_CM_001', 'ACC_CM_002', 100.00)
print(f"上下文管理器转账结果: {
'成功' if res_cm1 else '失败'}")
await check_balances('ACC_CM_001', 'ACC_CM_002')
print("
--- 场景CM 2: Charlie_CM 向 Bob_CM 转账 200.00 (余额不足) ---")
await check_balances('ACC_CM_003', 'ACC_CM_002')
res_cm2 = await transfer_funds_with_cm('ACC_CM_003', 'ACC_CM_002', 200.00)
print(f"上下文管理器转账结果: {
'成功' if res_cm2 else '失败'}")
await check_balances('ACC_CM_003', 'ACC_CM_002') # 余额应不变
# print("
--- 场景CM 3: Alice_CM 向 Eve_CM_Fail 转账 50.00 (模拟内部错误) ---")
# await check_balances('ACC_CM_001', 'ACC_FAIL_TRANSFER_CM')
# res_cm3 = await transfer_funds_with_cm('ACC_CM_001', 'ACC_FAIL_TRANSFER_CM', 50.00)
# print(f"上下文管理器转账结果: {'成功' if res_cm3 else '失败'}")
# await check_balances('ACC_CM_001', 'ACC_FAIL_TRANSFER_CM') # 余额应不变
await close_tx_db_pool()
if __name__ == "__main__":
# 为了独立运行此示例,需要包含或模拟 init_tx_db_pool, close_tx_db_pool, check_balances
# 和 DB_CONFIG_NO_AUTOCOMMIT, tx_db_pool
# (此处假设它们已在全局或可访问范围内定义,如同一个脚本中的前一部分)
DB_CONFIG_NO_AUTOCOMMIT = {
'host': '127.0.0.1', 'port': 3306, 'user': 'your_user',
'password': 'your_password', 'db': 'your_database_tx_cm', # 使用新库
}
tx_db_pool = None
# (复用/重定义 init_tx_db_pool, close_tx_db_pool, check_balances 以便脚本可独立运行)
async def init_tx_db_pool():
global tx_db_pool
if tx_db_pool is None:
try:
tx_db_pool = await aiomysql.create_pool(
minsize=1, maxsize=5, **DB_CONFIG_NO_AUTOCOMMIT,
loop=asyncio.get_event_loop(), autocommit=False
)
print("事务型连接池 (CM example) 初始化成功。")
except Exception as e:
print(f"初始化事务型连接池 (CM example) 失败: {
e}")
tx_db_pool = None
async def close_tx_db_pool():
global tx_db_pool
if tx_db_pool:
tx_db_pool.close()
await tx_db_pool.wait_closed()
print("事务型连接池 (CM example) 已关闭。")
tx_db_pool = None
async def check_balances(*account_ids): # 简化版,实际应复用之前的
if tx_db_pool is None: return
async with tx_db_pool.acquire() as conn:
async with conn.cursor(aiomysql.DictCursor) as cur:
print("
--- 当前账户余额 (CM Example) ---")
for acc_id in account_ids:
await cur.execute("SELECT owner_name, balance FROM accounts WHERE account_id = %s;", (acc_id,))
account = await cur.fetchone()
if account: print(f" 账户: {
acc_id}, 余额: {
account['balance']}")
else: print(f" 账户: {
acc_id} 未找到。")
asyncio.run(main_context_manager_example())
代码解释 AsyncTransactionManager
:
__init__(self, pool, cursor_type)
:
接收一个已配置为 autocommit=False
的 aiomysql
连接池。
如果池未提供或 autocommit
为 True
,则抛出 ValueError
。
存储池和期望的游标类型。
async def __aenter__(self)
:
self._conn = await self._pool.acquire()
: 从池中异步获取一个数据库连接。
await self._conn.begin()
: 显式开始一个新事务。
self.cur = await self._conn.cursor(self._cursor_type)
: 创建一个游标。
return self.cur
: 将游标返回,这样在 async with ... as cur:
语句中,cur
就可以直接使用了。
错误处理: 如果在 __aenter__
(例如 begin()
失败)中发生错误,它会确保释放已获取的连接,然后重新抛出异常,这样 async with
块就不会执行。
async def __aexit__(self, exc_type, exc_val, exc_tb)
:
当 async with
块执行完毕或块内发生未捕获的异常时,此方法会被调用。
exc_type
, exc_val
, exc_tb
: 这三个参数包含了异常信息。如果 async with
块正常完成(没有异常),它们都将是 None
。
关闭游标: if self.cur: await self.cur.close()
。
事务处理:
if exc_type is not None:
: 如果 async with
块内部发生了异常,意味着事务应该回滚。调用 await self._conn.rollback()
。
else:
: 如果没有异常发生,意味着事务中的所有操作都成功了(或者至少没有抛出Python异常),调用 await self._conn.commit()
。
错误处理中的错误处理: 如果 commit()
或 rollback()
本身失败,它会打印错误。
finally
块: 无论 commit
或 rollback
是否成功,self._pool.release(self._conn)
都会被调用,以确保连接总是被归还到池中。这是至关重要的,否则连接池中的连接会耗尽。
return False
: 表示如果 async with
块内发生了异常,__aexit__
方法不会抑制(swallow)这个异常,异常会继续向上传播。这是标准行为。如果你想抑制异常(例如,因为你已经完全处理了它),你可以返回 True
。
transfer_funds_with_cm
函数:
基本逻辑与 transfer_funds
相同,但事务的开始、提交、回滚以及连接和游标的管理都由 AsyncTransactionManager
自动处理。
如果发生业务逻辑错误(如余额不足),函数会 raise ValueError(...)
。这个异常会被 AsyncTransactionManager
的 __aexit__
捕获,导致事务回滚。
代码更加简洁,因为不需要显式的 conn.begin()
, conn.commit()
, conn.rollback()
调用,也不需要在每个错误路径上手动管理连接释放。
外层的 try...except
块现在主要用于捕获并记录由 AsyncTransactionManager
传播出来的或在管理器之外发生的错误,并返回一个表示操作成功与否的布尔值。
优势:
代码更简洁: 减少了事务管理的样板代码。
更安全: 自动处理提交和回滚,减少了因忘记调用这些方法而导致数据不一致的风险。连接和游标的释放也得到了保证。
已关注业务逻辑: 开发者可以更专注于实现核心的数据库操作逻辑。
何时选择:
对于任何涉及多步数据库修改且需要原子性的操作,使用事务上下文管理器通常是更推荐的方式。
如果需要非常细致的、非典型的事务控制流程(例如,在事务中多次提交部分工作,这通常不推荐),则可能需要显式控制。
7.3. 异步事务中的保存点 (Savepoints in Asynchronous Transactions)
事务保存点 (Savepoints) 提供了一种在事务内部创建“检查点”的机制。如果后续操作失败,你可以选择回滚到某个保存点,而不是回滚整个事务。这对于实现更复杂的、可能包含可选操作或多阶段提交逻辑的事务非常有用。
aiomysql
(通过其底层的 PyMySQL
) 支持 SQL标准的保存点命令:
await conn.savepoint(name: str)
: 在当前事务中创建一个名为 name
的保存点。
await conn.rollback_savepoint(name: str)
: 回滚到名为 name
的保存点。此操作会撤销在该保存点之后执行的所有更改,但保存点本身仍然存在,事务也仍然活动。在该保存点之前所做的更改不受影响。
await conn.release_savepoint(name: str)
: 销毁名为 name
的保存点。这通常在不再需要回滚到该保存点时执行,可以释放一些数据库资源。注意:在某些数据库系统中,回滚到某个保存点后,其后的保存点可能会自动被销毁。提交整个事务或回滚整个事务也会销毁所有保存点。
企业场景:复杂订单处理流程
假设一个订单处理系统,创建订单涉及多个步骤:
创建订单主记录 (核心操作,必须成功)。
尝试扣减商品库存 (核心操作,必须成功)。
可选:为用户应用优惠券 (如果失败,不应导致整个订单失败,但应记录下来)。
可选:记录用户积分变更 (如果失败,也不应导致整个订单失败)。
最终确认订单。
如果核心操作(1和2)失败,整个事务回滚。如果可选操作(3或4)失败,我们可能只想撤销该可选操作本身,然后继续处理订单,或者标记该可选操作失败并继续。
import asyncio
import aiomysql
# --- 假设 DB_CONFIG_NO_AUTOCOMMIT, tx_db_pool, init_tx_db_pool, close_tx_db_pool,
# --- AsyncTransactionManager, check_balances (修改为检查库存和优惠券) 已定义 ---
# 模拟表结构 (简化)
# accounts 表 (复用,假设有用户账户用于支付)
# products 表
# CREATE TABLE IF NOT EXISTS products (
# product_id VARCHAR(50) PRIMARY KEY,
# name VARCHAR(100),
# stock INT UNSIGNED NOT NULL,
# price DECIMAL(10,2) NOT NULL
# ) ENGINE=InnoDB;
# orders 表
# CREATE TABLE IF NOT EXISTS orders (
# order_id VARCHAR(50) PRIMARY KEY,
# user_account_id VARCHAR(50),
# total_amount DECIMAL(10,2),
# status VARCHAR(20) DEFAULT 'PENDING', -- PENDING, PROCESSING, COMPLETED, FAILED_COUPON, FAILED_POINTS
# created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
# FOREIGN KEY (user_account_id) REFERENCES accounts(account_id)
# ) ENGINE=InnoDB;
# order_items 表
# CREATE TABLE IF NOT EXISTS order_items (
# item_id INT AUTO_INCREMENT PRIMARY KEY,
# order_id VARCHAR(50),
# product_id VARCHAR(50),
# quantity INT,
# price_per_unit DECIMAL(10,2),
# FOREIGN KEY (order_id) REFERENCES orders(order_id),
# FOREIGN KEY (product_id) REFERENCES products(product_id)
# ) ENGINE=InnoDB;
# coupons 表
# CREATE TABLE IF NOT EXISTS coupons (
# coupon_code VARCHAR(50) PRIMARY KEY,
# discount_amount DECIMAL(10,2),
# is_valid BOOLEAN DEFAULT TRUE
# ) ENGINE=InnoDB;
async def process_order_with_savepoints(
user_id: str,
product_details: list, # [{'product_id': 'P001', 'quantity': 1}, ...]
coupon_code: str = None
):
"""
使用保存点处理复杂订单流程。
"""
if tx_db_pool is None: return "连接池未初始化"
order_id = f"ORD_{
asyncio.get_running_loop().time()}" # 简易订单ID
final_order_status = "PROCESSING" # 初始期望状态
# 使用我们之前定义的 AsyncTransactionManager
# 注意:conn 对象本身在 AsyncTransactionManager 内部,如果需要直接调用 conn.savepoint 等,
# Manager 需要暴露 conn 或者提供包装方法。
# 为简单起见,我们这里直接在获取连接后操作,不使用 AsyncTransactionManager,
# 以便清晰展示 savepoint 的用法。如果用 Manager,Manager 需要扩展。
async with tx_db_pool.acquire() as conn: # 确保 autocommit=False
async with conn.cursor(aiomysql.DictCursor) as cur:
try:
await conn.begin() # 开始主事务
print(f"订单 {
order_id}: 主事务开始。")
# 1. 创建订单主记录
total_amount_calculated = 0 # 稍后计算
await cur.execute(
"INSERT INTO orders (order_id, user_account_id, total_amount, status) VALUES (%s, %s, %s, %s);",
(order_id, user_id, 0, 'PENDING_CALCULATION') # 初始总金额为0,状态待计算
)
if cur.rowcount != 1:
raise RuntimeError("创建订单主记录失败。")
print(f"订单 {
order_id}: 主记录已创建。")
# 2. 处理订单项并扣减库存 (核心)
for item in product_details:
pid, qty = item['product_id'], item['quantity']
# 获取产品价格和当前库存 (加锁以确保一致性)
await cur.execute(
"SELECT price, stock FROM products WHERE product_id = %s FOR UPDATE;", (pid,)
)
product_info = await cur.fetchone()
if not product_info:
raise ValueError(f"产品 {
pid} 不存在。")
if product_info['stock'] < qty:
raise ValueError(f"产品 {
pid} 库存不足 (需要 {
qty}, 现有 {
product_info['stock']})。")
# 扣减库存
await cur.execute(
"UPDATE products SET stock = stock - %s WHERE product_id = %s;", (qty, pid)
)
if cur.rowcount != 1:
raise RuntimeError(f"扣减产品 {
pid} 库存失败。")
item_price = product_info['price']
total_amount_calculated += item_price * qty
# 插入订单项
await cur.execute(
"INSERT INTO order_items (order_id, product_id, quantity, price_per_unit) VALUES (%s, %s, %s, %s);",
(order_id, pid, qty, item_price)
)
print(f"订单 {
order_id}: 产品 {
pid} (数量 {
qty}) 已添加,库存已扣减。")
# 更新订单总金额
await cur.execute(
"UPDATE orders SET total_amount = %s, status = 'PENDING_PAYMENT' WHERE order_id = %s;",
(total_amount_calculated, order_id)
)
print(f"订单 {
order_id}: 总金额 {
total_amount_calculated} 已计算。")
# --- 可选操作:应用优惠券 ---
if coupon_code:
savepoint_coupon = "sp_coupon_application"
await conn.savepoint(savepoint_coupon) # 创建优惠券应用的保存点
print(f"订单 {
order_id}: 创建保存点 '{
savepoint_coupon}'。")
try:
await cur.execute(
"SELECT discount_amount, is_valid FROM coupons WHERE coupon_code = %s FOR UPDATE;",
(coupon_code,)
)
coupon_info = await cur.fetchone()
if not coupon_info or not coupon_info['is_valid']:
raise ValueError(f"优惠券 '{
coupon_code}' 无效或不存在。")
discount = coupon_info['discount_amount']
# 假设优惠券使用后失效
await cur.execute("UPDATE coupons SET is_valid = FALSE WHERE coupon_code = %s;", (coupon_code,))
if cur.rowcount != 1:
raise RuntimeError(f"更新优惠券 '{
coupon_code}' 状态失败。")
total_amount_calculated -= discount
if total_amount_calculated < 0: total_amount_calculated = 0 # 防止负金额
await cur.execute(
"UPDATE orders SET total_amount = %s, status = 'PENDING_PAYMENT_COUPON_APPLIED' WHERE order_id = %s;",
(total_amount_calculated, order_id)
)
print(f"订单 {
order_id}: 优惠券 '{
coupon_code}' 应用成功,折扣 {
discount}。新总额: {
total_amount_calculated}")
await conn.release_savepoint(savepoint_coupon) # 成功,释放保存点 (可选)
print(f"订单 {
order_id}: 释放保存点 '{
savepoint_coupon}'。")
except (ValueError, RuntimeError) as e_coupon:
print(f"订单 {
order_id}: 应用优惠券 '{
coupon_code}' 失败: {
e_coupon}。回滚到 '{
savepoint_coupon}'。")
await conn.rollback_savepoint(savepoint_coupon) # 回滚优惠券操作
# 即使优惠券失败,订单本身可能仍然有效,但状态需要更新
final_order_status = "PENDING_PAYMENT_COUPON_FAILED"
await cur.execute( # 更新订单状态以反映优惠券失败
"UPDATE orders SET status = %s WHERE order_id = %s;",
(final_order_status, order_id)
)
print(f"订单 {
order_id}: 优惠券操作已回滚,订单状态更新为 '{
final_order_status}'。")
# 注意:此时 total_amount_calculated 仍然是应用优惠券前的金额,因为回滚了。
# 需要重新获取或基于之前的值。为简化,我们假设主流程继续。
# --- 假设还有其他可选操作,如积分 (类似逻辑,使用另一个savepoint) ---
# 最终提交或根据情况更新订单状态
if final_order_status.endswith("_FAILED"): # 如果有可选操作失败了
print(f"订单 {
order_id}: 由于可选操作失败,最终状态为 '{
final_order_status}'。")
else:
final_order_status = "PROCESSING" # 如果一切顺利
await cur.execute(
"UPDATE orders SET status = %s WHERE order_id = %s;",
(final_order_status, order_id)
)
print(f"订单 {
order_id}: 所有操作完成。准备提交主事务。最终状态: '{
final_order_status}'")
await conn.commit() # 提交整个事务
print(f"订单 {
order_id}: 主事务已提交。")
return f"订单 {
order_id} 处理完成,状态: {
final_order_status}, 总金额: {
total_amount_calculated if not final_order_status.endswith('_FAILED') else '查看订单详情'}"
except (ValueError, RuntimeError, aiomysql.MySQLError) as e_main:
print(f"订单 {
order_id}: 主事务发生错误: {
e_main}。正在回滚整个事务...")
if conn.connected and not conn.closed: # 确保连接有效
await conn.rollback()
print(f"订单 {
order_id}: 主事务已回滚。")
return f"订单 {
order_id} 处理失败: {
e_main}"
except Exception as e_unexpected:
print(f"订单 {
order_id}: 发生意外错误: {
e_unexpected}。正在回滚整个事务...")
if conn.connected and not conn.closed:
await conn.rollback()
print(f"订单 {
order_id}: 主事务因意外错误已回滚。")
return f"订单 {
order_id} 处理失败 (意外): {
e_unexpected}"
async def setup_savepoint_demo_tables():
if tx_db_pool is None: await init_tx_db_pool() # 确保池已初始化
async with tx_db_pool.acquire() as conn:
async with conn.cursor() as cur:
await conn.begin()
await cur.execute("DROP TABLE IF EXISTS order_items;")
await cur.execute("DROP TABLE IF EXISTS orders;")
await cur.execute("DROP TABLE IF EXISTS coupons;")
await cur.execute("DROP TABLE IF EXISTS products;")
# accounts 表通常已存在,但确保它存在
await cur.execute("""
CREATE TABLE IF NOT EXISTS accounts (
account_id VARCHAR(50) PRIMARY KEY, owner_name VARCHAR(100) NOT NULL,
balance DECIMAL(15, 2) NOT NULL CHECK (balance >= 0)
) ENGINE=InnoDB;
""")
await cur.execute("""
CREATE TABLE IF NOT EXISTS products (
product_id VARCHAR(50) PRIMARY KEY, name VARCHAR(100),
stock INT UNSIGNED NOT NULL, price DECIMAL(10,2) NOT NULL
) ENGINE=InnoDB;
""")
await cur.execute("""
CREATE TABLE IF NOT EXISTS coupons (
coupon_code VARCHAR(50) PRIMARY KEY, discount_amount DECIMAL(10,2),
is_valid BOOLEAN DEFAULT TRUE
) ENGINE=InnoDB;
""")
await cur.execute("""
CREATE TABLE IF NOT EXISTS orders (
order_id VARCHAR(50) PRIMARY KEY, user_account_id VARCHAR(50),
total_amount DECIMAL(10,2), status VARCHAR(50) DEFAULT 'PENDING',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (user_account_id) REFERENCES accounts(account_id)
) ENGINE=InnoDB;
""")
await cur.execute("""
CREATE TABLE IF NOT EXISTS order_items (
item_id INT AUTO_INCREMENT PRIMARY KEY, order_id VARCHAR(50),
product_id VARCHAR(50), quantity INT, price_per_unit DECIMAL(10,2),
FOREIGN KEY (order_id) REFERENCES orders(order_id),
FOREIGN KEY (product_id) REFERENCES products(product_id)
) ENGINE=InnoDB;
""")
# 插入初始数据
users = [('USER001', 'Test User 1', 5000.00), ('USER002', 'Test User 2', 300.00)]
products_data = [('P001', 'Laptop', 10, 1200.00), ('P002', 'Mouse', 50, 25.00), ('P003', 'Keyboard', 30, 75.00)]
coupons_data = [('SAVE10', 10.00, True), ('INVALID20', 20.00, False), ('BIG50', 50.00, True)]
for acc_id, owner, bal in users: await cur.execute("INSERT IGNORE INTO accounts VALUES (%s,%s,%s)", (acc_id,owner,bal))
for pid, name, stock, price in products_data: await cur.execute("INSERT IGNORE INTO products VALUES (%s,%s,%s,%s)", (pid,name,stock,price))
for code, disc, valid in coupons_data: await cur.execute("INSERT IGNORE INTO coupons VALUES (%s,%s,%s)", (code,disc,valid))
await conn.commit()
print("演示保存点所需的表和初始数据已设置/重置。")
async def main_savepoint_example():
global tx_db_pool # 确保我们可以访问和修改全局的连接池
tx_db_pool = None # 重置池,以便 init_tx_db_pool 可以创建一个新的
DB_CONFIG_NO_AUTOCOMMIT['db'] = 'your_database_tx_sp' # 使用特定数据库
await init_tx_db_pool() # 初始化 `autocommit=False` 的连接池
if not tx_db_pool: return
await setup_savepoint_demo_tables()
print("
--- 场景SP 1: 成功订单,使用有效优惠券 'SAVE10' ---")
order1_items = [{
'product_id': 'P001', 'quantity': 1}, {
'product_id': 'P002', 'quantity': 2}]
result1 = await process_order_with_savepoints('USER001', order1_items, coupon_code='SAVE10')
print(f"场景SP 1 结果: {
result1}")
print("
--- 场景SP 2: 成功订单,使用无效优惠券 'INVALID20' (优惠券部分应回滚) ---")
order2_items = [{
'product_id': 'P003', 'quantity': 1}]
result2 = await process_order_with_savepoints('USER001', order2_items, coupon_code='INVALID20')
print(f"场景SP 2 结果: {
result2}")
print("
--- 场景SP 3: 订单因库存不足失败 (主事务应回滚) ---")
order3_items = [{
'product_id': 'P001', 'quantity': 20}] # P001 库存只有10
result3 = await process_order_with_savepoints('USER002', order3_items, coupon_code='BIG50')
print(f"场景SP 3 结果: {
result3}")
print("
--- 场景SP 4: 成功订单,不使用优惠券 ---")
order4_items = [{
'product_id': 'P002', 'quantity': 1}]
result4 = await process_order_with_savepoints('USER002', order4_items)
print(f"场景SP 4 结果: {
result4}")
# 检查最终库存和优惠券状态
async with tx_db_pool.acquire() as conn:
async with conn.cursor(aiomysql.DictCursor) as cur:
print("
--- 最终产品库存 ---")
await cur.execute("SELECT product_id, stock FROM products;")
for row in await cur.fetchall(): print(f" {
row['product_id']}: {
row['stock']}")
print("
--- 最终优惠券状态 ---")
await cur.execute("SELECT coupon_code, is_valid FROM coupons;")
for row in await cur.fetchall(): print(f" {
row['coupon_code']}: {
'有效' if row['is_valid'] else '失效'}")
await close_tx_db_pool()
if __name__ == "__main__":
asyncio.run(main_savepoint_example())
代码解释 process_order_with_savepoints
:
主事务: await conn.begin()
启动整个订单处理事务。
核心操作: 创建订单记录、处理订单项、扣减库存。这些操作如果失败,会抛出异常,导致整个事务回滚(在最外层的 except
块中处理)。
保存点 sp_coupon_application
:
await conn.savepoint(savepoint_coupon)
: 在尝试应用优惠券之前创建一个保存点。
try...except
块包裹了优惠券应用逻辑。
如果优惠券应用成功:
await conn.release_savepoint(savepoint_coupon)
: 显式释放保存点。虽然不是严格必需(事务结束时所有保存点都会消失),但这是良好实践。
如果优惠券应用失败 (例如,优惠券无效或更新数据库时出错):
await conn.rollback_savepoint(savepoint_coupon)
: 将事务状态回滚到 sp_coupon_application
创建的时刻。这意味着对 orders
表(总金额)和 coupons
表(有效性)所做的与此优惠券相关的更改都被撤销了。
核心订单操作(如库存扣减)不受此保存点回滚的影响。
final_order_status
被更新以反映优惠券失败,订单继续处理。
提交或回滚主事务:
如果所有核心操作都成功(并且任何可选操作失败后都已妥善处理,例如通过保存点回滚并更新状态),则 await conn.commit()
提交整个事务。
如果任何主流程中的操作(非保存点保护的)或未被捕获的异常发生,最外层的 except
块会执行 await conn.rollback()
来回滚整个事务。
main_savepoint_example
和 setup_savepoint_demo_tables
:
setup_savepoint_demo_tables
创建了更复杂的表结构 (products
, coupons
, orders
, order_items
) 并填充了初始数据,以支持保存点场景。
main_savepoint_example
演示了不同情况:
订单成功且优惠券成功。
订单核心成功,但优惠券无效(优惠券操作回滚,订单继续)。
订单因核心操作(库存不足)失败(整个事务回滚)。
订单成功且不使用优惠券。
最后检查数据库状态以验证操作的正确性。
使用保存点的注意事项:
复杂性: 保存点增加了事务逻辑的复杂性。只在确实需要部分回滚的场景下使用。
命名: 保存点名称在事务内必须唯一。
数据库支持: 确保你的 MySQL 版本和存储引擎 (通常是 InnoDB) 支持保存点。
性能: 过多地创建和回滚保存点可能会有轻微的性能开销。
与 AsyncTransactionManager
的集成: 如果要将保存点与我们之前创建的 AsyncTransactionManager
一起使用,AsyncTransactionManager
需要进行扩展,例如提供一个方法来访问底层的 conn
对象,或者直接在 AsyncTransactionManager
上实现 savepoint
, rollback_savepoint
, release_savepoint
的包装方法。
通过这两个深入的事务管理示例(显式控制和上下文管理器)以及保存点的应用,我们已经覆盖了在 Python 和 aiomysql
中进行健壮的异步数据库操作的关键方面。确保数据的一致性是任何企业级应用的核心要求。
暂无评论内容