【Python】Python接入MySQL数据库

Python 接入 MySQL 数据库

一、Python 与 MySQL 数据库交互概览 (Overview of Python-MySQL Interaction)

1. 为何选择 Python 与 MySQL 结合 (Why Combine Python with MySQL?)

Python 以其简洁的语法、丰富的库生态系统以及在Web开发、数据分析、人工智能、自动化脚本等领域的广泛应用而著称。MySQL 则是世界上最流行的开源关系型数据库之一,以其稳定性、高性能和易用性而受到青睐。将两者结合,可以构建出功能强大、数据驱动的应用程序。

Web 开发: 诸如 Django、Flask、FastAPI 等流行的 Python Web 框架都能够与 MySQL 无缝集成,用于存储用户信息、应用数据、会话管理等。
数据分析与科学: Python 的 Pandas、NumPy、SciPy 等库与 MySQL 结合,可以方便地从数据库中提取数据进行分析、处理和可视化。
自动化与脚本: Python 脚本可以轻松连接 MySQL,执行数据迁移、备份、报表生成、ETL(Extract, Transform, Load)等任务。
企业应用: 许多企业级应用后台使用 Python 处理业务逻辑,并依赖 MySQL 作为其核心数据存储。
易用性与社区支持: Python 提供了多种成熟的 MySQL 连接器(驱动程序),简化了开发过程。同时,Python 和 MySQL 都有庞大而活跃的开发者社区,可以方便地找到解决方案和支持。

2. Python 访问 MySQL 的基本架构 (Basic Architecture of Python Accessing MySQL)

Python 应用程序通过一个称为数据库连接器 (Database Connector)驱动程序 (Driver) 的特定库来与 MySQL 服务器进行通信。这个连接器扮演着翻译官的角色,将 Python 代码中的数据库操作指令转换为 MySQL 服务器能够理解的协议和SQL语句,并将服务器返回的结果转换回 Python 的数据结构。

其基本交互流程如下:

加载连接器: Python 程序导入所需的 MySQL 连接器库。
建立连接 (Connection): Python 程序使用连接器提供的函数,传入必要的参数(如主机名、端口、用户名、密码、数据库名等)来与 MySQL 服务器建立一个网络连接。
创建游标 (Cursor): 一旦连接建立,程序会创建一个游标对象。游标可以看作是数据库操作的一个句柄,它允许你执行SQL语句并处理结果集。
执行 SQL 语句: 通过游标对象,程序可以执行各种 SQL 语句(SELECT, INSERT, UPDATE, DELETE, DDL语句等)。
处理结果:

对于 SELECT 查询,游标可以用来获取查询结果(一行、多行或所有行)。
对于 DML 语句(INSERT, UPDATE, DELETE),可以获取受影响的行数。

事务处理: 对于需要原子性的操作序列,程序可以通过连接对象来管理事务(提交 commit 或回滚 rollback)。
关闭游标和连接: 操作完成后,为了释放资源,程序需要关闭游标和数据库连接。

图片[1] - 【Python】Python接入MySQL数据库 - 宋马
(图片仅为示意,表示Python应用通过Connector与MySQL Server通信)

二、选择合适的 Python MySQL 连接器 (Choosing the Right Python MySQL Connector)

Python 社区提供了多个用于连接 MySQL 的库。选择哪个连接器取决于项目的具体需求,如性能要求、部署环境、Python 版本兼容性、特定特性需求以及个人偏好。

2.1 主流 Python MySQL 连接器

以下是几个最常用和推荐的连接器:

mysql-connector-python (官方连接器)

开发者: Oracle Corporation (MySQL 的母公司)
类型: 纯 Python 实现 (Pure Python)。这意味着它不依赖外部C库(除了可选的C扩展以提高性能),通常更容易安装在各种平台上。
特性:

完全实现了 Python DB-API 2.0 规范 (PEP 249)。
支持 MySQL 的所有最新特性,包括X Plugin (用于文档存储和异步操作)、新的认证插件 (如 caching_sha2_password)。
提供了连接池功能。
支持压缩、SSL连接。
可以不安装 MySQL客户端库。
可选的 C 扩展 (mysql-connector-python-cext) 可以提高性能,但会引入编译依赖。

许可: 通常是 GPLv2 或商业许可(取决于你如何获取和使用)。
安装: pip install mysql-connector-python

PyMySQL

开发者: PyMySQL 社区
类型: 纯 Python 实现。
特性:

几乎完全实现了 Python DB-API 2.0 规范。
轻量级,易于安装和部署,尤其在没有C编译器或受限环境中。
被许多第三方库(如 SQLAlchemy, Django – 作为备选驱动)良好支持。
支持 Python 3.x (Python 2.x 支持已在较旧版本中)。
支持 caching_sha2_password 认证 (需要 cryptography 库)。

许可: MIT License (非常宽松)。
安装: pip install PyMySQL

mysqlclient (MySQLdb 的现代分支)

开发者: mysqlclient 维护者 (基于 MySQLdb)
类型: C 扩展模块。它包装了 MySQL C 客户端库。
特性:

实现了 Python DB-API 2.0 规范。
性能通常被认为是最高的,因为它直接使用C库。
是 Django 框架默认推荐的 MySQL 连接器。
支持 Python 3.x (Python 2.x 支持通过旧版 MySQLdb)。
支持最新的 MySQL 特性(取决于链接的C客户端库版本)。

缺点:

安装可能更复杂: 需要系统上安装了 MySQL 开发库 (C headers and libraries) 和 C 编译器。在某些操作系统(尤其是Windows)上直接通过 pip 安装可能遇到编译问题,可能需要预编译的二进制包 (wheels)。

许可: GPLv2。
安装:

Linux: sudo apt-get install python3-dev default-libmysqlclient-dev build-essential (Debian/Ubuntu) 或 sudo yum install python3-devel mysql-devel gcc (CentOS/RHEL) 然后 pip install mysqlclient
macOS: brew install mysql-client (或 mysql) 然后 pip install mysqlclient
Windows: 通常建议从 Christoph Gohlke’s Unofficial Windows Binaries for Python Extension Packages 下载预编译的 .whl 文件安装,或者使用 pip install mysqlclient (如果环境中配置好了编译工具和库)。

2.2 连接器选择考量因素
特性/考量因素 mysql-connector-python (官方) PyMySQL mysqlclient 中文解释
实现类型 纯Python (可选C扩展) 纯Python C扩展 (包装C库) 纯Python易安装部署,C扩展通常性能更好但安装可能复杂。
性能 中等到高 (使用C扩展时) 中等 通常最高 C扩展因接近底层而快;纯Python实现会有Python解释器开销。
安装便捷性 高 (纯Python部分) 非常高 中等到低 (依赖C库和编译器) 纯Python库通常pip install即可;C扩展可能需要系统依赖。
Python 版本 Python 3.x (旧版支持2.x) Python 3.x (旧版支持2.x) Python 3.x (旧版MySQLdb支持2.x) 主流库都已转向Python 3。
DB-API 2.0 兼容性 完全兼容 几乎完全兼容 完全兼容 DB-API 2.0是Python数据库编程的标准接口。
MySQL 最新特性支持 优秀 (官方维护) 良好 (社区驱动,可能稍有延迟) 良好 (依赖C客户端库版本) 官方连接器通常最快支持MySQL新特性。
依赖 无 (纯Python核心) / C编译器(C扩展) 可选cryptography (用于新认证) MySQL C 开发库, C编译器 依赖越少,部署越简单。
连接池 内置 无内置 (需第三方库) 无内置 (需第三方库) 对于Web应用或高并发服务,连接池很重要。
许可 GPLv2 / 商业 MIT GPLv2 MIT许可最宽松,GPL对商业应用有特定要求。
社区与文档 良好 (官方) 良好 (活跃社区) 良好 (广泛使用) 都有不错的社区支持和文档资源。
Django/SQLAlchemy集成 支持 广泛支持 Django首选, SQLAlchemy支持 主流ORM和框架通常支持多种连接器。

企业级项目选择建议:

追求极致性能且能处理C扩展安装: mysqlclient 往往是首选,尤其在Django项目中。
要求纯Python实现、易于部署、许可宽松: PyMySQL 是一个非常好的选择,并且由于其轻量和纯粹性,在Serverless、容器化等环境中也受欢迎。
需要MySQL官方支持、最新特性、内置连接池且不排斥GPL: mysql-connector-python 是一个可靠的选择。如果性能是瓶颈,可以尝试其C扩展。
初学者或快速原型: PyMySQLmysql-connector-python (纯Python模式) 通常更容易上手。

在后续的示例中,我们将主要使用 mysql-connector-pythonPyMySQL 进行演示,因为它们代表了纯Python实现的便利性,并且功能全面。如果需要,也会提及 mysqlclient 的特定之处。

2.3 安装连接器

使用 pip (Python的包安装器) 来安装这些库。建议在虚拟环境 (virtual environment) 中安装项目依赖。

安装 mysql-connector-python:

pip install mysql-connector-python

或者,如果需要其C扩展以获得更好性能 (可能需要编译环境):

# pip install mysql-connector-python-cext # 通常这个包名不直接用
# 正确的方式是,mysql-connector-python 会尝试自动编译C扩展如果可用
# 如果你想要一个只包含C扩展的版本,你可能需要寻找特定的wheel或从源码编译
# 一般情况下,直接安装 mysql-connector-python 即可,它会按需处理

注意: Oracle 官方有时会提供名为 mysql-connector-python-rf (release candidate for features) 或其他后缀的包,通常应使用稳定版 mysql-connector-python

安装 PyMySQL:

pip install PyMySQL

如果需要支持 caching_sha2_password 认证方式(MySQL 8.0+ 默认),PyMySQL 可能需要 cryptography 库:

pip install PyMySQL[rsa]
# 或者
pip install PyMySQL cryptography

安装 mysqlclient:
确保你已经安装了必要的系统依赖 (如前文所述的C开发库和编译器)。

pip install mysqlclient

如果遇到编译错误,请根据错误信息查找特定于你操作系统的解决方案,或者寻找预编译的wheel包。

三、Python DB-API 2.0 核心规范 (Core Specifications of Python DB-API 2.0)

Python DB-API 2.0 (PEP 249) 定义了一个标准的数据库接口,大多数Python数据库连接器都遵循这个规范。理解它有助于你编写更具移植性的数据库访问代码(尽管不同连接器的某些扩展特性可能不同)。

3.1 规范的目标

一致性: 为不同的数据库提供一个统一的编程接口。
易用性: 提供简单直观的方法来连接数据库、执行SQL和获取结果。
功能性: 包含处理事务、错误、多种数据类型等基本数据库操作所需的功能。

3.2 关键组件和方法

全局变量:

apilevel: 字符串,表示API级别(通常是 “2.0”)。
threadsafety: 整数,表示线程安全级别 (0到3)。
paramstyle: 字符串,表示SQL参数占位符的风格(如 qmark, numeric, named, format, pyformat)。

connect() 函数 (模块级别):
用于建立到数据库的连接。它返回一个连接对象 (Connection Object)

参数通常包括:dsn (数据源名称字符串,可选),user, password, host, database, port 等。不同连接器支持的参数名和具体细节可能略有差异,但核心参数类似。

连接对象 (Connection Object):
代表与数据库的一个会话。

close(): 关闭数据库连接。
commit(): 提交当前事务中所有挂起的操作。
rollback(): 回滚当前事务中所有挂起的操作。
cursor([cursor_type]): 返回一个新的游标对象 (Cursor Object)。可以指定游标类型(如 buffered, dictionary cursor,具体支持依赖连接器)。
Error (异常类): 连接器定义的异常基类。
Warning, InterfaceError, DatabaseError, DataError, OperationalError, IntegrityError, InternalError, ProgrammingError, NotSupportedError (标准异常类,通常是 Connection.Error 的子类)。

游标对象 (Cursor Object):
用于执行SQL语句并管理结果集。

description: 只读属性。在执行 SELECT 查询后,它是一个包含7个元素的序列的序列,每个内部序列描述结果集中的一列(name, type_code, display_size, internal_size, precision, scale, null_ok)。
rowcount: 只读属性。对于 DML 语句(INSERT, UPDATE, DELETE),返回受影响的行数。对于 SELECT 语句,其行为未被严格规定,可能返回 -1 或实际行数(取决于连接器和游标类型,例如buffered cursor可能知道行数)。
callproc(procname [, parameters]): (可选) 调用存储过程。
close(): 关闭游标。
execute(operation [, parameters]): 执行一个数据库操作(SQL语句)。parameters 是一个可选的序列或映射,用于替换 operation 字符串中的参数占位符,以防止SQL注入。
executemany(operation, seq_of_parameters): 对参数序列中的每一项重复执行同一个数据库操作。通常用于批量插入或更新。
fetchone(): 获取查询结果集中的下一行,返回一个序列(通常是元组),或者在没有更多行时返回 None
fetchmany([size=cursor.arraysize]): 获取查询结果集中的下一批行,返回一个序列的序列(列表的列表或元组的列表)。size 参数指定要获取的行数,默认为 cursor.arraysize。如果剩余行数少于 size,则返回所有剩余行。如果没有更多行,返回空序列。
fetchall(): 获取查询结果集中的所有(剩余)行,返回一个序列的序列。
nextset(): (可选) 移动到下一个结果集(如果执行的操作返回多个结果集,例如某些存储过程)。
arraysize: 可读写属性。fetchmany() 默认获取的行数。默认为1。
setinputsizes(sizes): (可选) 声明输入参数的大小。
setoutputsize(size [, column]): (可选) 声明大列(如LOB)的缓冲区大小。

3.3 参数样式 (paramstyle)

DB-API 2.0 规范定义了多种参数化查询的占位符风格,连接器会声明它支持哪种。

qmark: 问号风格。例如:SELECT * FROM users WHERE id = ? AND name = ? 参数:(1, 'Alice')
numeric: 数字,位置风格。例如:SELECT * FROM users WHERE id = :1 AND name = :2 参数:(1, 'Alice')
named: 命名风格。例如:SELECT * FROM users WHERE id = :user_id AND name = :user_name 参数:{'user_id': 1, 'user_name': 'Alice'}
format: ANSI C printf 格式代码。例如:SELECT * FROM users WHERE id = %s AND name = %s 参数:(1, 'Alice')
pyformat: Python 扩展 printf 格式代码。例如:SELECT * FROM users WHERE id = %(user_id)s AND name = %(user_name)s 参数:{'user_id': 1, 'user_name': 'Alice'}

mysql-connector-pythonPyMySQL 通常支持 format (%s) 和 pyformat (%(name)s) 风格。mysqlclient 也类似。 使用 %s 是最常见和推荐的方式,因为它简单且不易出错。

理解 DB-API 2.0 是掌握 Python 数据库编程的基础。即使你将来使用 ORM(如 SQLAlchemy),了解其底层是如何通过 DB-API 与数据库交互也是非常有益的。

四、基本连接与查询操作 (Basic Connection and Query Operations)

现在,我们将通过具体的代码示例来演示如何使用 Python 连接到 MySQL 数据库并执行基本的CRUD(Create, Read, Update, Delete)操作。我们将首先以 mysql-connector-python 为例,然后展示 PyMySQL 的等效实现。

4.1 使用 mysql-connector-python
4.1.1 建立数据库连接

首先,你需要安装 mysql-connector-python

pip install mysql-connector-python

然后,在 Python 脚本中导入并使用它:

import mysql.connector # 导入 mysql.connector 模块
from mysql.connector import errorcode # 导入 errorcode 用于更具体的错误处理

def connect_to_mysql_server_only(db_host, db_user, db_password):
    """尝试只连接到MySQL服务器,不指定数据库。"""
    try:
        cnx = mysql.connector.connect( # 调用 connect 函数建立连接
            host=db_host, # MySQL 服务器主机名或IP地址
            user=db_user, # MySQL 用户名
            password=db_password # MySQL 用户密码
            # port=3306, # 可选,MySQL 服务器端口,默认为3306
            # connection_timeout=10 # 可选,连接超时时间(秒)
        )
        print(f"成功连接到 MySQL 服务器 {
              db_host} (版本: {
              cnx.get_server_info()})") # 打印服务器信息
        return cnx # 返回连接对象
    except mysql.connector.Error as err: # 捕获 mysql.connector 的错误
        if err.errno == errorcode.ER_ACCESS_DENIED_ERROR: # 如果是访问被拒绝错误
            print(f"连接失败: 用户名或密码不正确 (错误码: {
              err.errno}, {
              err.msg})")
        elif err.errno == errorcode.ER_BAD_DB_ERROR: # 如果是数据库不存在错误 (这里不应发生,因为未指定数据库)
            print(f"连接失败: 数据库不存在 (错误码: {
              err.errno}, {
              err.msg})")
        else: # 其他 MySQL 连接错误
            print(f"连接 MySQL 服务器失败: {
              err} (错误码: {
              err.errno})")
        return None # 连接失败返回 None

def connect_to_mysql_database(db_host, db_user, db_password, db_name):
    """尝试连接到MySQL服务器上的特定数据库。"""
    try:
        cnx = mysql.connector.connect( # 调用 connect 函数建立连接
            host=db_host,
            user=db_user,
            password=db_password,
            database=db_name # 指定要连接的数据库名称
            # auth_plugin='mysql_native_password' # 如果遇到认证插件问题 (如caching_sha2_password),可尝试指定
        )
        print(f"成功连接到数据库 '{
              db_name}' 在服务器 {
              db_host}")
        return cnx
    except mysql.connector.Error as err:
        if err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
            print(f"连接失败: 用户名或密码不正确 (错误码: {
              err.errno}, {
              err.msg})")
        elif err.errno == errorcode.ER_BAD_DB_ERROR:
            print(f"连接失败: 数据库 '{
              db_name}' 不存在 (错误码: {
              err.errno}, {
              err.msg})")
        else:
            print(f"连接数据库 '{
              db_name}' 失败: {
              err} (错误码: {
              err.errno})")
        return None

# --- 主程序逻辑 ---
if __name__ == "__main__":
    # 请替换为你的MySQL服务器的实际连接信息
    DB_HOST = "localhost"  # 或者你的MySQL服务器IP
    DB_USER = "your_mysql_user" # 替换为你的MySQL用户名
    DB_PASSWORD = "your_mysql_password" # 替换为你的MySQL密码
    DB_NAME = "your_database_name" # 替换为你要连接的数据库名

    # 示例1: 只连接服务器
    server_connection = connect_to_mysql_server_only(DB_HOST, DB_USER, DB_PASSWORD)
    if server_connection:
        print(f"服务器连接对象: {
              server_connection}")
        server_connection.close() # 操作完成后关闭连接
        print("服务器连接已关闭。")
    print("-" * 30)

    # 示例2: 连接到特定数据库
    db_connection = connect_to_mysql_database(DB_HOST, DB_USER, DB_PASSWORD, DB_NAME)
    if db_connection:
        print(f"数据库连接对象: {
              db_connection}")
        # 在这里可以进行后续的数据库操作 (创建游标、执行SQL等)
        db_connection.close() # 操作完成后关闭连接
        print(f"数据库 '{
              DB_NAME}' 连接已关闭。")

    # 准备一个示例表用于后续操作
    # 为了运行后续示例,请确保你的MySQL中存在名为 your_database_name 的数据库,
    # 并且该用户 your_mysql_user 有权限在该数据库中创建表和执行DML。
    # 你可以使用 phpMyAdmin, MySQL Workbench, 或 MySQL命令行客户端来创建数据库。
    # CREATE DATABASE IF NOT EXISTS your_database_name;

代码解释:

import mysql.connectorfrom mysql.connector import errorcode: 导入必要的模块。errorcode 包含MySQL服务器返回的错误代码常量,便于进行更精确的错误判断。
mysql.connector.connect(...): 这是建立连接的核心函数。

host, user, password, database: 是最基本的连接参数。
其他可选参数如 port, connection_timeout, auth_plugin 等可以根据需要配置。例如,如果MySQL 8.0+ 使用 caching_sha2_password 认证,而你的连接器版本较旧或配置不当,可能需要显式指定 auth_plugin='mysql_native_password'(但这通常是针对旧客户端兼容旧服务器的情况,或反之)。

错误处理: 使用 try...except mysql.connector.Error as err: 来捕获连接过程中可能发生的各种MySQL错误。

err.errno: 错误码,可以与 errorcode 中的常量比较。
err.msg: 错误的文本描述。

cnx.get_server_info(): 获取MySQL服务器的版本信息。
cnx.close(): 非常重要。在完成所有数据库操作后,必须关闭连接以释放服务器和客户端的资源。未关闭的连接可能会导致资源泄露,最终耗尽服务器的最大连接数。

企业级连接配置考量:

配置文件/环境变量: 不应将数据库凭证(用户名、密码)硬编码在源代码中。应使用配置文件 (如 INI, YAML, JSON, .env 文件) 或环境变量来存储这些敏感信息,并在程序启动时读取。

# 示例:从环境变量读取 (更安全)
import os
# DB_USER = os.getenv("DB_USER_PROD")
# DB_PASSWORD = os.getenv("DB_PASS_PROD")

SSL/TLS 加密连接: 对于生产环境,特别是当Python应用和MySQL服务器不在同一台机器或安全网络中时,应使用SSL/TLS加密连接以保护数据传输安全。mysql-connector-python 支持SSL连接,需要在 connect() 函数中配置 ssl_ca, ssl_cert, ssl_key 等参数。

# config = {
              
#     'user': 'your_user',
#     'password': 'your_password',
#     'host': 'your_host',
#     'database': 'your_db',
#     'ssl_ca': '/path/to/ca.pem',       # CA证书路径
#     'ssl_cert': '/path/to/client-cert.pem', # 客户端证书路径 (如果需要双向SSL)
#     'ssl_key': '/path/to/client-key.pem',   # 客户端私钥路径 (如果需要双向SSL)
#     'ssl_verify_cert': True # (MySQL 8.0.27+/Connector 8.0.27+) 强制验证服务器证书,更安全
# }
# cnx = mysql.connector.connect(**config)

连接超时: 设置合理的 connection_timeout 可以防止程序在网络问题时无限期等待。
字符集: 确保Python连接器使用的字符集(通常默认为 utf8mb4 或可配置)与数据库/表的字符集匹配,以避免乱码问题。可以在 connect() 时指定 charset='utf8mb4'
时区: 处理日期时间数据时,要注意Python应用、MySQL服务器、连接器之间的时区设置,避免混淆。

4.1.2 创建游标对象

连接建立后,需要创建一个游标对象来执行SQL。

# (接上文 db_connection 成功建立后)
# if db_connection and db_connection.is_connected(): # 确保连接是活动的
#     try:
#         cursor = db_connection.cursor() # 创建一个默认类型的游标
#         print("默认游标已创建。")
#
#         # 不同类型的游标 (mysql-connector-python 支持)
#         buffered_cursor = db_connection.cursor(buffered=True) # 缓冲游标,获取所有结果到客户端内存
#         print("缓冲游标已创建。当结果集不大时,或需要在获取所有数据后释放服务器资源时有用。")
#
#         dictionary_cursor = db_connection.cursor(dictionary=True) # 字典游标,每行结果作为字典返回 (列名: 值)
#         print("字典游标已创建。方便按列名访问数据。")
#
#         named_tuple_cursor = db_connection.cursor(named_tuple=True) # 命名元组游标,每行结果作为命名元组返回
#         print("命名元组游标已创建。提供类似字典的属性访问,但更轻量。")
#
#     except mysql.connector.Error as err:
#         print(f"创建游标失败: {err}")
#     finally:
#         if 'cursor' in locals() and cursor: cursor.close()
#         if 'buffered_cursor' in locals() and buffered_cursor: buffered_cursor.close()
#         if 'dictionary_cursor' in locals() and dictionary_cursor: dictionary_cursor.close()
#         if 'named_tuple_cursor' in locals() and named_tuple_cursor: named_tuple_cursor.close()
#         if db_connection and db_connection.is_connected():
#             db_connection.close()
#             print("数据库连接已关闭。")

代码解释:

db_connection.cursor(): 返回一个标准的游标对象。默认情况下,对于 SELECT 查询,它可能是非缓冲的,意味着数据逐行从服务器获取。
buffered=True: 创建一个缓冲游标。它会在执行查询后立即将整个结果集从服务器下载到客户端内存中。

优点: 获取数据后,可以立即释放服务器端的资源(与该查询相关的锁或临时表)。可以多次遍历结果集(如果需要)。
缺点: 如果结果集非常大,会消耗大量客户端内存。

dictionary=True: 创建一个字典游标fetchone()fetchall() 等方法返回的每一行都是一个字典,键是列名,值是对应的数据。

# row = dictionary_cursor.fetchone()
# if row:
#     print(row['column_name'])

named_tuple=True: 创建一个命名元组游标。每一行都是一个命名元tuple,可以通过列名作为属性访问,也可以通过索引访问。

# from collections import namedtuple # 命名元组游标内部会使用类似机制
# row = named_tuple_cursor.fetchone()
# if row:
#     print(row.column_name)
#     print(row[0])

游标也需要关闭: cursor.close()。与连接类似,游标也占用资源,使用完毕后应关闭。使用 with 语句管理游标是更好的做法(后续会讲)。

企业级游标选择:

默认游标: 适用于大多数情况,特别是当结果集可能很大,或者只需要逐行处理时。
缓冲游标 (buffered=True): 当结果集确定不大,并且需要在获取数据后快速与服务器断开(或执行其他操作),或者需要多次迭代结果时使用。例如,获取少量配置数据。
字典游标 (dictionary=True): 提高代码可读性,因为可以直接通过列名访问数据,无需记住列的索引顺序。但相比元组,字典会有轻微的额外开销。
命名元组游标 (named_tuple=True): 兼具字典游标的可读性和元组的轻量级特性。

4.1.3 执行 SELECT 查询并获取结果
def create_sample_employees_table(connection):
    """在数据库中创建一个示例 employees 表 (如果不存在)"""
    try:
        cursor = connection.cursor() # 创建游标
        table_description = """
        CREATE TABLE IF NOT EXISTS employees (
            emp_no INT AUTO_INCREMENT PRIMARY KEY,
            first_name VARCHAR(50) NOT NULL,
            last_name VARCHAR(50) NOT NULL,
            hire_date DATE,
            salary DECIMAL(10, 2)
        ) ENGINE=InnoDB CHARACTER SET=utf8mb4
        """
        cursor.execute(table_description) # 执行建表语句
        print("表 'employees' 已检查/创建。")

        # 检查表是否为空,如果为空则插入一些示例数据
        cursor.execute("SELECT COUNT(*) FROM employees") # 查询员工数量
        if cursor.fetchone()[0] == 0: # 如果员工数量为0
            print("表 'employees' 为空,插入示例数据...")
            insert_query = """
            INSERT INTO employees (first_name, last_name, hire_date, salary) VALUES
            ('John', 'Doe', '2020-01-15', 60000.00),
            ('Jane', 'Smith', '2019-03-01', 75000.00),
            ('Alice', 'Johnson', '2021-07-22', 80000.00),
            ('Bob', 'Williams', '2018-06-10', 50000.00),
            ('Charlie', 'Brown', '2020-01-15', 62000.00)
            """
            cursor.execute(insert_query) # 执行插入语句
            connection.commit() # 提交事务 (DML操作后需要)
            print(f"{
              cursor.rowcount} 条示例数据已插入。")
        cursor.close() # 关闭游标
    except mysql.connector.Error as err:
        print(f"创建或填充 'employees' 表失败: {
              err}")

def query_all_employees(connection):
    """查询所有员工信息"""
    print("
--- 查询所有员工 ---")
    try:
        # 使用字典游标方便按列名访问
        cursor = connection.cursor(dictionary=True) # 创建字典游标
        query = "SELECT emp_no, first_name, last_name, hire_date, salary FROM employees"
        cursor.execute(query) # 执行 SELECT 查询

        print(f"查询找到 {
              cursor.rowcount} 行 (注意: rowcount 对SELECT的行为可能不一致,建议迭代判断)")

        # 打印列描述信息 (可选)
        print("列描述:")
        for col_desc in cursor.description: # 遍历列描述
            print(col_desc) # 打印每一列的描述 (name, type_code, etc.)

        print("
员工列表:")
        # 方式1: fetchall() 获取所有行
        # all_rows = cursor.fetchall()
        # if not all_rows:
        # print("没有找到员工数据。")
        # else:
        # for row in all_rows:
        # print(f"  ID: {row['emp_no']}, Name: {row['first_name']} {row['last_name']}, "
        #              f"Hired: {row['hire_date']}, Salary: {row['salary']}")

        # 方式2: 逐行获取 fetchone()
        # print("
员工列表 (逐行获取):")
        # row = cursor.fetchone()
        # if not row:
        # print("  没有找到员工数据。")
        # while row:
        # print(f"  ID: {row['emp_no']}, Name: {row['first_name']} {row['last_name']}, "
        #              f"Hired: {row['hire_date']}, Salary: {row['salary']}")
        #     row = cursor.fetchone()

        # 方式3: 直接迭代游标 (最Pythonic的方式)
        print("
员工列表 (迭代游标):")
        count = 0
        for row in cursor: # 直接迭代游标对象
            count += 1
            print(f"  ID: {
              row['emp_no']}, Name: {
              row['first_name']} {
              row['last_name']}, "
                  f"Hired: {
              row['hire_date']}, Salary: {
              row['salary']}")
        if count == 0:
            print("  没有找到员工数据。")

    except mysql.connector.Error as err:
        print(f"查询员工失败: {
              err}")
    finally:
        if 'cursor' in locals() and cursor: # 检查cursor是否已定义且非None
            cursor.close() # 确保游标被关闭

def query_employees_with_condition(connection, min_salary):
    """查询薪水高于指定值的员工 (参数化查询)"""
    print(f"
--- 查询薪水高于 {
              min_salary} 的员工 ---")
    try:
        cursor = connection.cursor(dictionary=True)
        # 参数化查询: 使用 %s 作为占位符
        query = """
        SELECT emp_no, first_name, last_name, salary
        FROM employees
        WHERE salary > %s
        ORDER BY salary DESC
        """
        cursor.execute(query, (min_salary,)) # 第二个参数是元组,包含要替换占位符的值

        print("符合条件的员工:")
        fetched_rows = cursor.fetchall() # 获取所有结果
        if not fetched_rows:
            print(f"  没有找到薪水高于 {
              min_salary} 的员工。")
        else:
            for row in fetched_rows:
                print(f"  ID: {
              row['emp_no']}, Name: {
              row['first_name']} {
              row['last_name']}, Salary: {
              row['salary']}")

    except mysql.connector.Error as err:
        print(f"条件查询员工失败: {
              err}")
    finally:
        if 'cursor' in locals() and cursor:
            cursor.close()

# --- 主程序逻辑 (续) ---
if __name__ == "__main__":
    # (前面的连接代码)
    DB_HOST = "localhost"
    DB_USER = "your_mysql_user"
    DB_PASSWORD = "your_mysql_password"
    DB_NAME = "your_database_name"

    db_connection = connect_to_mysql_database(DB_HOST, DB_USER, DB_PASSWORD, DB_NAME)

    if db_connection and db_connection.is_connected():
        create_sample_employees_table(db_connection) # 创建/准备示例表

        query_all_employees(db_connection) # 执行查询所有员工
        query_employees_with_condition(db_connection, 60000.00) # 执行条件查询

        db_connection.close() # 完成所有操作后关闭连接
        print(f"
数据库 '{
              DB_NAME}' 连接已彻底关闭。")

代码解释:

create_sample_employees_table(): 一个辅助函数,用于创建示例表并填充数据,确保后续查询可以执行。

CREATE TABLE IF NOT EXISTS ...: 避免表已存在时报错。
cursor.execute("SELECT COUNT(*)...")cursor.fetchone()[0]: 用于检查表是否为空。fetchone() 返回一行,如果表为空或查询无结果,它返回 None;否则返回一个元组(对于默认游标)或字典(对于字典游标)。这里假设默认游标,所以用 [0] 取第一个元素(即COUNT(*)的值)。
connection.commit(): 对于 INSERT, UPDATE, DELETE 等修改数据的操作,执行后必须调用 connection.commit() 才能将更改永久保存到数据库。 否则,在连接关闭或事务回滚时,这些更改会丢失。MySQL 默认可能是自动提交模式(autocommit=True),但这取决于服务器和连接器配置。显式提交是良好实践。

query_all_employees():

cursor = connection.cursor(dictionary=True): 使用字典游标。
cursor.execute(query): 执行 SELECT 语句。
cursor.description: 打印列信息。每个元组包含 (name, type_code, display_size, internal_size, precision, scale, null_ok)。
获取结果的方式:

cursor.fetchall(): 一次性获取所有行,返回一个列表,列表中的每个元素是代表一行的字典(因为是字典游标)。适合结果集不大时。
cursor.fetchone(): 每次获取一行,直到返回 None。适合逐行处理,或结果集非常大时,避免一次性加载到内存。
直接迭代游标 (for row in cursor:): 这是最 Pythonic 且通常最高效的方式来处理多行结果。它在内部可能类似于 while True: row = cursor.fetchone(); if not row: break; ...

query_employees_with_condition(): 演示了参数化查询 (Parameterized Query)

query = "SELECT ... WHERE salary > %s ...": SQL 语句中使用 %s 作为占位符。
cursor.execute(query, (min_salary,)): execute 方法的第二个参数是一个元组(或列表),其元素会按顺序替换SQL语句中的 %s 占位符。这是防止 SQL 注入攻击的正确方法。 绝不要使用Python的字符串格式化(如 f-string 或 % 操作符)将变量直接拼接到SQL语句中。

即使只有一个参数,也要传入一个单元素的元组,例如 (value,)

finally 块: 确保无论 try 块中是否发生异常,游标 (cursor.close()) 和连接 (connection.close()) 都会被关闭。这是非常重要的资源管理实践。

4.1.4 执行 INSERT, UPDATE, DELETE 操作 (DML)
def insert_new_employee(connection, first_name, last_name, hire_date, salary):
    """插入一个新员工记录 (参数化)"""
    print(f"
--- 插入新员工: {
              first_name} {
              last_name} ---")
    new_emp_no = None
    try:
        cursor = connection.cursor()
        insert_query = """
        INSERT INTO employees (first_name, last_name, hire_date, salary)
        VALUES (%s, %s, %s, %s)
        """
        employee_data = (first_name, last_name, hire_date, salary) # 准备要插入的数据元组
        cursor.execute(insert_query, employee_data) # 执行插入语句,传入数据

        connection.commit() # !! 提交事务以保存更改 !!
        new_emp_no = cursor.lastrowid # 获取最后插入行的 AUTO_INCREMENT ID (如果主键是自增的)
        print(f"员工 '{
              first_name} {
              last_name}' 成功插入。受影响行数: {
              cursor.rowcount}, 新员工ID: {
              new_emp_no}")
    except mysql.connector.Error as err:
        print(f"插入员工失败: {
              err}")
        if connection.is_connected(): # 如果连接仍然有效,可以选择回滚
            print("尝试回滚事务...")
            connection.rollback() # 回滚未提交的更改
    finally:
        if 'cursor' in locals() and cursor:
            cursor.close()
    return new_emp_no # 返回新员工的ID

def update_employee_salary(connection, emp_no, new_salary):
    """更新指定员工的薪水 (参数化)"""
    print(f"
--- 更新员工 ID {
              emp_no} 的薪水为 {
              new_salary} ---")
    try:
        cursor = connection.cursor()
        update_query = "UPDATE employees SET salary = %s WHERE emp_no = %s"
        cursor.execute(update_query, (new_salary, emp_no)) # 执行更新语句,传入参数

        connection.commit() # 提交事务
        if cursor.rowcount > 0:
            print(f"员工 ID {
              emp_no} 的薪水已成功更新。受影响行数: {
              cursor.rowcount}")
        else:
            print(f"未找到员工 ID {
              emp_no},或薪水未改变。受影响行数: {
              cursor.rowcount}")
    except mysql.connector.Error as err:
        print(f"更新员工薪水失败: {
              err}")
        if connection.is_connected(): connection.rollback()
    finally:
        if 'cursor' in locals() and cursor:
            cursor.close()

def delete_employee(connection, emp_no):
    """删除指定员工 (参数化)"""
    print(f"
--- 删除员工 ID {
              emp_no} ---")
    try:
        cursor = connection.cursor()
        delete_query = "DELETE FROM employees WHERE emp_no = %s"
        cursor.execute(delete_query, (emp_no,)) # 执行删除语句,传入参数

        connection.commit() # 提交事务
        if cursor.rowcount > 0:
            print(f"员工 ID {
              emp_no} 已成功删除。受影响行数: {
              cursor.rowcount}")
        else:
            print(f"未找到员工 ID {
              emp_no} 进行删除。受影响行数: {
              cursor.rowcount}")
    except mysql.connector.Error as err:
        print(f"删除员工失败: {
              err}")
        if connection.is_connected(): connection.rollback()
    finally:
        if 'cursor' in locals() and cursor:
            cursor.close()

# --- 主程序逻辑 (续) ---
if __name__ == "__main__":
    # (前面的连接和建表示例代码)
    DB_HOST = "localhost"
    DB_USER = "your_mysql_user"
    DB_PASSWORD = "your_mysql_password"
    DB_NAME = "your_database_name"

    db_connection = connect_to_mysql_database(DB_HOST, DB_USER, DB_PASSWORD, DB_NAME)

    if db_connection and db_connection.is_connected():
        create_sample_employees_table(db_connection) # 确保表存在

        # 执行DML操作
        new_id = insert_new_employee(db_connection, "Peter", "Jones", "2023-01-10", 95000.00)
        if new_id:
            update_employee_salary(db_connection, new_id, 98000.00)
            # query_all_employees(db_connection) # 可以取消注释查看更新后的结果
            delete_employee(db_connection, new_id)
            # query_all_employees(db_connection) # 可以取消注释查看删除后的结果
        else:
            print("由于插入失败,后续的更新和删除操作未执行。")

        # 演示插入一个不存在的ID的更新和删除
        update_employee_salary(db_connection, 9999, 100000.00) # 更新不存在的员工
        delete_employee(db_connection, 9999) # 删除不存在的员工

        db_connection.close()
        print(f"
数据库 '{
              DB_NAME}' DML操作演示完成,连接已关闭。")

代码解释:

参数化查询: 所有DML语句 (INSERT, UPDATE, DELETE) 都使用了参数化查询 (%s 占位符和 cursor.execute(query, params)),这是防止SQL注入的最佳实践。
connection.commit(): 在执行任何会修改数据的SQL语句后,必须调用 connection.commit() 来使更改生效。否则,当连接关闭或程序结束时,这些更改可能会丢失(取决于MySQL服务器的autocommit设置和连接器的默认行为,但显式提交总是更安全)。
connection.rollback(): 在 except 块中,如果发生错误,可以调用 connection.rollback() 来撤销当前事务中尚未提交的所有更改。这有助于保持数据的一致性。
cursor.lastrowid: 对于 INSERT 操作,如果插入的表有一个 AUTO_INCREMENT 主键,cursor.lastrowid 属性会返回最后插入行的ID。这对于获取新生成的记录ID非常有用。注意:如果一次 INSERT 插入了多行,或者主键不是自增的,其行为可能不同或不可靠。
cursor.rowcount:

对于 INSERT, UPDATE, DELETErowcount 返回受这些操作影响的行数。
对于 SELECT,其行为在DB-API 2.0中没有严格定义,可能返回-1,或者在某些情况下(如使用缓冲游标)返回获取到的行数。不应完全依赖它来判断 SELECT 是否有结果,更好的方式是检查 fetchone()/fetchall() 的返回值。

4.1.5 使用 with 语句管理连接和游标 (Context Managers)

Python 的 with 语句提供了一种更优雅、更安全的方式来管理资源(如文件句柄、网络连接、数据库连接和游标),它可以确保资源在使用完毕后即使发生异常也能被正确释放。

mysql-connector-python 的连接对象和游标对象都支持上下文管理协议。

def query_with_context_manager(db_config):
    """使用 with 语句管理连接和游标"""
    print("
--- 使用 with 语句查询员工 (薪水 > 70000) ---")
    query = """
    SELECT emp_no, first_name, last_name, salary
    FROM employees
    WHERE salary > %s
    ORDER BY first_name
    """
    min_salary = 70000.00
    results = [] # 用于存储查询结果

    try:
        # 'with' 语句确保连接在使用后自动关闭,即使发生错误
        with mysql.connector.connect(**db_config) as connection: # db_config 是一个包含连接参数的字典
            print("数据库连接已通过 'with' 语句打开。")
            # 'with' 语句确保游标在使用后自动关闭
            with connection.cursor(dictionary=True) as cursor:
                print("游标已通过 'with' 语句创建。")
                cursor.execute(query, (min_salary,))
                for row in cursor:
                    results.append(row)
                    print(f"  Fetched: ID={
              row['emp_no']}, Name={
              row['first_name']}, Salary={
              row['salary']}")
            print("游标已通过 'with' 语句自动关闭。")
        print("数据库连接已通过 'with' 语句自动关闭。")

        if not results:
            print(f"  没有找到薪水高于 {
              min_salary} 的员工。")

    except mysql.connector.Error as err:
        print(f"使用 'with' 语句查询失败: {
              err}")
    # 注意: connection.commit() 仍然需要显式调用,'with' 语句不自动提交事务。
    # 如果这里有DML操作,且未在with块内commit,则不会保存。

# --- 主程序逻辑 (续) ---
if __name__ == "__main__":
    DB_CONFIG = {
             # 将连接参数组织成字典
        'host': "localhost",
        'user': "your_mysql_user",
        'password': "your_mysql_password",
        'database': "your_database_name",
        'charset': 'utf8mb4' # 明确指定字符集是一个好习惯
    }
    # 假设 employees 表已存在且有数据
    # create_sample_employees_table(mysql.connector.connect(**DB_CONFIG)) # 如果需要确保表存在

    query_with_context_manager(DB_CONFIG)

代码解释:

with mysql.connector.connect(**db_config) as connection::

当进入 with 块时,mysql.connector.connect() 被调用并返回一个连接对象,赋值给 connection
当退出 with 块时(无论是正常结束还是发生异常),连接对象的 __exit__ 方法会被自动调用,该方法会负责关闭连接 (connection.close())。

with connection.cursor(dictionary=True) as cursor::

类似地,游标对象也实现了上下文管理协议。当退出内部的 with 块时,游标会自动关闭 (cursor.close())。

事务管理: with 语句本身不负责自动提交或回滚事务。对于DML操作,你仍然需要在 with connection ... 块内部显式调用 connection.commit()connection.rollback()。如果 with 块因为异常而退出,且你没有 commit,那么未提交的更改(如果事务是活动的)通常会被数据库服务器隐式回滚(当连接断开时),或者你需要更细致地在 except 块中处理 rollback

企业级实践: 强烈推荐使用 with 语句来管理数据库连接和游标,因为它可以减少因忘记关闭资源而导致的资源泄露风险,使代码更整洁、更健壮。

四、基本连接与查询操作

4.2 使用 PyMySQL

PyMySQL 是一个非常受欢迎的纯Python MySQL驱动程序,以其易用性、轻量级和良好的社区支持而闻名。它的API在很大程度上遵循Python DB-API 2.0规范,因此很多操作与 mysql-connector-python 类似,但在某些细节和特定功能上会有所不同。

4.2.1 安装 PyMySQL

首先,确保你已经安装了 PyMySQL。如果需要支持 MySQL 8.0+ 的 caching_sha2_password 认证,建议同时安装 cryptography

pip install PyMySQL
# 或者为了更好的认证支持 (推荐):
pip install PyMySQL[rsa]
# 或者分开安装:
# pip install PyMySQL
# pip install cryptography
4.2.2 建立数据库连接

PyMySQL 的连接方式与 mysql-connector-python 类似,但函数名和一些参数可能略有不同。

import pymysql # 导入 pymysql 模块
import pymysql.cursors # 导入 pymysql.cursors 用于指定游标类型,例如字典游标
import os # 用于从环境变量读取敏感信息 (推荐做法)

def pymysql_connect_to_db(db_host, db_user, db_password, db_name, db_port=3306, db_charset='utf8mb4'):
    """使用 PyMySQL 连接到指定的数据库。"""
    try:
        connection = pymysql.connect( # 调用 pymysql.connect() 函数建立连接
            host=db_host, # MySQL 服务器主机名或IP
            user=db_user, # MySQL 用户名
            password=db_password, # MySQL 密码
            database=db_name, # 要连接的数据库名
            port=db_port, # MySQL 端口,默认为3306
            charset=db_charset, # 指定连接字符集,推荐 utf8mb4 以支持各种字符包括emoji
            connect_timeout=10, # 连接超时时间(秒)
            # cursorclass=pymysql.cursors.DictCursor # 可以在连接时全局指定默认游标类型
            # ssl={'ca': '/path/to/ca.pem'} # SSL连接配置示例,需要提供正确的证书路径
        )
        print(f"PyMySQL: 成功连接到数据库 '{
              db_name}' 在服务器 {
              db_host}:{
              db_port}")
        # PyMySQL 连接对象没有直接的 get_server_info() 方法,可以通过查询获取
        with connection.cursor() as cursor: # 使用 with 管理游标
            cursor.execute("SELECT VERSION()") # 执行查询服务器版本的SQL
            server_version = cursor.fetchone() # 获取查询结果
            if server_version:
                print(f"PyMySQL: MySQL 服务器版本: {
              server_version[0]}") # server_version 是一个元组
        return connection # 返回连接对象
    except pymysql.Error as err: # 捕获 pymysql 的错误
        # PyMySQL 的错误代码和消息格式可能与 mysql.connector 不同
        # err.args[0] 通常是错误码, err.args[1] 是错误消息
        error_code = err.args[0] if len(err.args) > 0 else "N/A"
        error_message = err.args[1] if len(err.args) > 1 else str(err)
        print(f"PyMySQL: 连接数据库 '{
              db_name}' 失败。错误码: {
              error_code}, 消息: {
              error_message}")
        return None # 连接失败返回 None

# --- 主程序逻辑 (PyMySQL) ---
if __name__ == "__main__":
    # 从环境变量读取配置,或者直接在此处设置 (不推荐用于生产)
    DB_HOST_PY = os.getenv("PYMYSQL_DB_HOST", "localhost")
    DB_USER_PY = os.getenv("PYMYSQL_DB_USER", "your_mysql_user")
    DB_PASSWORD_PY = os.getenv("PYMYSQL_DB_PASSWORD", "your_mysql_password")
    DB_NAME_PY = os.getenv("PYMYSQL_DB_NAME", "your_database_name")
    DB_PORT_PY = int(os.getenv("PYMYSQL_DB_PORT", 3306)) # 端口通常是整数

    print("--- PyMySQL 连接测试 ---")
    pymysql_conn = pymysql_connect_to_db(DB_HOST_PY, DB_USER_PY, DB_PASSWORD_PY, DB_NAME_PY, DB_PORT_PY)

    if pymysql_conn:
        print(f"PyMySQL: 连接对象: {
              pymysql_conn}")
        # 在这里进行后续操作
        pymysql_conn.close() # 操作完成后关闭连接
        print(f"PyMySQL: 数据库 '{
              DB_NAME_PY}' 连接已关闭。")

代码解释:

import pymysqlimport pymysql.cursors: 导入必要的 PyMySQL 模块。pymysql.cursors 包含了不同类型的游标类,如 DictCursor
pymysql.connect(...): 用于建立连接。

核心参数 host, user, password, database, portmysql-connector-python 类似。
charset='utf8mb4': 显式指定连接字符集,这是一个非常好的实践,可以避免很多潜在的字符编码问题。
connect_timeout: 连接超时设置。
cursorclass=pymysql.cursors.DictCursor: 这是一个非常有用的选项。如果在连接时设置了 cursorclass,那么通过此连接创建的所有游标默认都会是指定的类型(这里是字典游标),无需在每次 connection.cursor() 时再指定。
ssl: 用于配置SSL连接,传入一个字典包含CA证书、客户端证书/密钥等路径。

获取服务器版本: PyMySQL 连接对象没有像 mysql-connector-python 那样的 get_server_info() 方法。通常需要执行 SELECT VERSION() SQL查询来获取。
错误处理: PyMySQL 的错误对象 err 通常通过 err.args 属性来访问错误码和消息。err.args[0] 是错误码,err.args[1] 是错误消息。这与 mysql-connector-pythonerr.errnoerr.msg 属性不同。
环境变量: 示例中演示了从环境变量读取数据库凭证,这是比硬编码更安全的方式。
connection.close(): 同样,操作完成后必须关闭连接。

4.2.3 创建游标对象 (PyMySQL)

PyMySQL 的游标创建与DB-API规范一致。

# (接上文 pymysql_conn 成功建立后)
# if pymysql_conn:
#     try:
#         # 方式1: 创建默认游标 (返回元组)
#         default_cursor_py = pymysql_conn.cursor()
#         print("PyMySQL: 默认游标已创建 (返回元组)。")
#         default_cursor_py.close()
#
#         # 方式2: 创建字典游标
#         dict_cursor_py = pymysql_conn.cursor(pymysql.cursors.DictCursor) # 传入游标类
#         print("PyMySQL: 字典游标已创建 (返回字典)。")
#         dict_cursor_py.close()
#
#         # 方式3: 如果连接时已指定 cursorclass=pymysql.cursors.DictCursor
#         # dict_cursor_via_class = pymysql_conn.cursor() # 这将直接是一个字典游标
#         # print("PyMySQL: 通过连接时指定的cursorclass创建的字典游标。")
#         # dict_cursor_via_class.close()
#
#         # PyMySQL 还支持其他游标类型,如 SSCursor, SSDictCursor (服务器端游标,用于处理非常大的结果集)
#         # ss_cursor_py = pymysql_conn.cursor(pymysql.cursors.SSCursor)
#         # print("PyMySQL: 服务器端流式游标 (SSCursor) 已创建。")
#         # ss_cursor_py.close()
#
#     except pymysql.Error as err:
#         print(f"PyMySQL: 创建游标失败: {err.args[1] if len(err.args) > 1 else err}")
#     finally:
#         if pymysql_conn:
#             pymysql_conn.close()

代码解释:

pymysql_conn.cursor(): 创建一个默认游标,执行查询后返回的每一行是元组。
pymysql_conn.cursor(pymysql.cursors.DictCursor): 创建一个字典游标。DictCursorpymysql.cursors 模块中的一个类。
服务器端游标 (SSCursor, SSDictCursor):

SSCursor: “Server-Side Cursor” 的缩写。这是一个非缓冲的游标,它不会一次性将所有结果都下载到客户端。相反,数据在迭代时逐行(或按 arraysize 批量)从服务器流式传输到客户端。
SSDictCursor: 功能同 SSCursor,但返回的是字典。
适用场景: 处理可能非常巨大的结果集,无法一次性装入客户端内存。
注意事项: 当使用服务器端游标时,在迭代完所有结果或显式关闭游标之前,连接通常会保持繁忙状态,可能无法用于执行其他查询(除非MySQL服务器和协议版本支持MARS – Multiple Active Result Sets,但这不是PyMySQL服务器端游标的典型用法)。

4.2.4 执行 SELECT 查询并获取结果 (PyMySQL)

查询和获取结果的API与 mysql-connector-python 非常相似,因为它们都遵循DB-API 2.0。

# 使用之前定义的 create_sample_employees_table 函数来准备数据
# (确保已替换 pymysql_connect_to_db 中的连接参数,并且数据库和用户有效)

def pymysql_query_all_employees(connection):
    """使用 PyMySQL 查询所有员工信息"""
    print("
--- PyMySQL: 查询所有员工 ---")
    try:
        # 使用 with 语句管理游标,自动关闭
        with connection.cursor(pymysql.cursors.DictCursor) as cursor: # 创建字典游标
            query = "SELECT emp_no, first_name, last_name, hire_date, salary FROM employees"
            cursor.execute(query) # 执行查询
            print(f"PyMySQL: 查询执行完毕。受影响行数 (cursor.rowcount): {
              cursor.rowcount}") # rowcount对SELECT行为可能不一致

            print("PyMySQL: 列描述:")
            if cursor.description: # 确保description不为None
                for col_desc in cursor.description:
                    print(col_desc) # (name, type_code, display_size, internal_size, precision, scale, null_ok)

            print("
PyMySQL: 员工列表 (迭代游标):")
            count = 0
            for row in cursor: # 直接迭代字典游标
                count += 1
                print(f"  ID: {
              row['emp_no']}, Name: {
              row['first_name']} {
              row['last_name']}, "
                      f"Hired: {
              row['hire_date']}, Salary: {
              row['salary']}")
            if count == 0:
                print("  PyMySQL: 没有找到员工数据。")
    except pymysql.Error as err:
        print(f"PyMySQL: 查询员工失败: {
              err.args[1] if len(err.args) > 1 else err}")

def pymysql_query_employees_with_condition(connection, min_salary):
    """使用 PyMySQL 查询薪水高于指定值的员工 (参数化)"""
    print(f"
--- PyMySQL: 查询薪水高于 {
              min_salary} 的员工 ---")
    try:
        with connection.cursor(pymysql.cursors.DictCursor) as cursor:
            query = """
            SELECT emp_no, first_name, last_name, salary
            FROM employees
            WHERE salary > %s
            ORDER BY salary DESC
            """
            # PyMySQL 使用 %s 作为占位符,参数以元组形式传递
            cursor.execute(query, (min_salary,)) # 执行参数化查询

            print("PyMySQL: 符合条件的员工:")
            fetched_rows = cursor.fetchall() # 获取所有结果
            if not fetched_rows:
                print(f"  PyMySQL: 没有找到薪水高于 {
              min_salary} 的员工。")
            else:
                for row in fetched_rows:
                    print(f"  ID: {
              row['emp_no']}, Name: {
              row['first_name']} {
              row['last_name']}, Salary: {
              row['salary']}")
    except pymysql.Error as err:
        print(f"PyMySQL: 条件查询员工失败: {
              err.args[1] if len(err.args) > 1 else err}")

# --- 主程序逻辑 (PyMySQL SELECT) ---
if __name__ == "__main__":
    # (前面的 PyMySQL 连接参数设置)
    DB_HOST_PY = os.getenv("PYMYSQL_DB_HOST", "localhost")
    DB_USER_PY = os.getenv("PYMYSQL_DB_USER", "your_mysql_user")
    DB_PASSWORD_PY = os.getenv("PYMYSQL_DB_PASSWORD", "your_mysql_password")
    DB_NAME_PY = os.getenv("PYMYSQL_DB_NAME", "your_database_name")
    DB_PORT_PY = int(os.getenv("PYMYSQL_DB_PORT", 3306))

    pymysql_conn = pymysql_connect_to_db(DB_HOST_PY, DB_USER_PY, DB_PASSWORD_PY, DB_NAME_PY, DB_PORT_PY)

    if pymysql_conn:
        # 确保示例表存在并有数据 (可以使用之前为 mysql-connector-python 编写的建表函数,
        # 但需要传入 pymysql_conn 对象,并且该函数内部的 commit/rollback 也需要是 pymysql_conn 的方法)
        # 为保持独立性,这里假设表已通过其他方式创建或通过mysql-connector-python示例创建。
        # 或者我们也可以写一个 PyMySQL 版本的建表函数:
        def pymysql_create_sample_employees_table(connection):
            try:
                with connection.cursor() as cursor:
                    table_description = """
                    CREATE TABLE IF NOT EXISTS employees (
                        emp_no INT AUTO_INCREMENT PRIMARY KEY, first_name VARCHAR(50) NOT NULL,
                        last_name VARCHAR(50) NOT NULL, hire_date DATE, salary DECIMAL(10, 2)
                    ) ENGINE=InnoDB CHARACTER SET=utf8mb4
                    """
                    cursor.execute(table_description)
                    cursor.execute("SELECT COUNT(*) FROM employees")
                    if cursor.fetchone()[0] == 0:
                        insert_query = """
                        INSERT INTO employees (first_name, last_name, hire_date, salary) VALUES
                        ('John', 'Doe', '2020-01-15', 60000.00), ('Jane', 'Smith', '2019-03-01', 75000.00),
                        ('Alice', 'Johnson', '2021-07-22', 80000.00), ('Bob', 'Williams', '2018-06-10', 50000.00),
                        ('Charlie', 'Brown', '2020-01-15', 62000.00)
                        """
                        cursor.execute(insert_query)
                        connection.commit() # PyMySQL 也需要 commit DML
                        print(f"PyMySQL: 示例数据已插入 (受影响行数: {
              cursor.rowcount})。")
                    else:
                        print("PyMySQL: 表 'employees' 已存在数据,未重新插入。")
            except pymysql.Error as err:
                print(f"PyMySQL: 创建或填充 'employees' 表失败: {
              err.args[1] if len(err.args) > 1 else err}")

        pymysql_create_sample_employees_table(pymysql_conn)

        pymysql_query_all_employees(pymysql_conn)
        pymysql_query_employees_with_condition(pymysql_conn, 65000.00)

        pymysql_conn.close()
        print(f"
PyMySQL: 数据库 '{
              DB_NAME_PY}' SELECT演示完成,连接已关闭。")

代码解释:

with connection.cursor(...) as cursor:: 同样,使用 with 语句管理游标是推荐的做法,确保游标在使用完毕后自动关闭。
参数化查询: PyMySQL 也使用 %s 作为参数占位符,并通过 cursor.execute(query, params_tuple) 的方式传递参数,有效防止SQL注入。
cursor.description: 与 mysql-connector-python 一样,提供结果集的列信息。
cursor.fetchall(), cursor.fetchone(), 直接迭代游标:行为与DB-API 2.0规范一致,与 mysql-connector-python 的使用方式相同。
cursor.rowcount: 对于 SELECT 语句,PyMySQLrowcount 通常返回匹配 WHERE 子句的行数(如果是非缓冲游标,可能在所有行被读取前是-1或不准确;如果是缓冲游标,会是准确的)。尽管如此,依赖迭代或 fetchall() 的结果来判断是否有数据是更可靠的做法。

4.2.5 执行 INSERT, UPDATE, DELETE 操作 (DML with PyMySQL)

DML 操作在 PyMySQL 中的处理方式与 mysql-connector-python 非常相似,关键在于参数化查询和事务提交。

def pymysql_insert_new_employee(connection, first_name, last_name, hire_date, salary):
    """使用 PyMySQL 插入一个新员工记录 (参数化)"""
    print(f"
--- PyMySQL: 插入新员工: {
              first_name} {
              last_name} ---")
    new_emp_no_py = None
    try:
        with connection.cursor() as cursor: # 使用 with 管理游标
            insert_query = """
            INSERT INTO employees (first_name, last_name, hire_date, salary)
            VALUES (%s, %s, %s, %s)
            """
            employee_data = (first_name, last_name, hire_date, salary)
            cursor.execute(insert_query, employee_data)

            # PyMySQL 连接对象默认 autocommit 可能是 False (取决于服务器和连接配置)
            # 因此,显式 commit 是必须的
            connection.commit() # !! 提交事务以保存更改 !!

            new_emp_no_py = cursor.lastrowid # 获取最后插入行的 AUTO_INCREMENT ID
            print(f"PyMySQL: 员工 '{
              first_name} {
              last_name}' 成功插入。受影响行数: {
              cursor.rowcount}, 新员工ID: {
              new_emp_no_py}")
    except pymysql.Error as err:
        error_msg = err.args[1] if len(err.args) > 1 else str(err)
        print(f"PyMySQL: 插入员工失败: {
              error_msg}")
        try:
            if connection: # 检查连接对象是否存在且可能仍然可用
                print("PyMySQL: 尝试回滚事务...")
                connection.rollback() # 回滚未提交的更改
        except pymysql.Error as rb_err: # 回滚本身也可能失败
            print(f"PyMySQL: 回滚事务失败: {
              rb_err.args[1] if len(rb_err.args) > 1 else rb_err}")
    return new_emp_no_py

def pymysql_update_employee_salary(connection, emp_no, new_salary):
    """使用 PyMySQL 更新指定员工的薪水 (参数化)"""
    print(f"
--- PyMySQL: 更新员工 ID {
              emp_no} 的薪水为 {
              new_salary} ---")
    try:
        with connection.cursor() as cursor:
            update_query = "UPDATE employees SET salary = %s WHERE emp_no = %s"
            cursor.execute(update_query, (new_salary, emp_no))
            connection.commit()
            if cursor.rowcount > 0:
                print(f"PyMySQL: 员工 ID {
              emp_no} 的薪水已成功更新。受影响行数: {
              cursor.rowcount}")
            else:
                print(f"PyMySQL: 未找到员工 ID {
              emp_no},或薪水未改变。受影响行数: {
              cursor.rowcount}")
    except pymysql.Error as err:
        error_msg = err.args[1] if len(err.args) > 1 else str(err)
        print(f"PyMySQL: 更新员工薪水失败: {
              error_msg}")
        try:
            if connection: connection.rollback()
        except pymysql.Error as rb_err:
            print(f"PyMySQL: 回滚事务失败: {
              rb_err.args[1] if len(rb_err.args) > 1 else rb_err}")


def pymysql_delete_employee(connection, emp_no):
    """使用 PyMySQL 删除指定员工 (参数化)"""
    print(f"
--- PyMySQL: 删除员工 ID {
              emp_no} ---")
    try:
        with connection.cursor() as cursor:
            delete_query = "DELETE FROM employees WHERE emp_no = %s"
            cursor.execute(delete_query, (emp_no,))
            connection.commit()
            if cursor.rowcount > 0:
                print(f"PyMySQL: 员工 ID {
              emp_no} 已成功删除。受影响行数: {
              cursor.rowcount}")
            else:
                print(f"PyMySQL: 未找到员工 ID {
              emp_no} 进行删除。受影响行数: {
              cursor.rowcount}")
    except pymysql.Error as err:
        error_msg = err.args[1] if len(err.args) > 1 else str(err)
        print(f"PyMySQL: 删除员工失败: {
              error_msg}")
        try:
            if connection: connection.rollback()
        except pymysql.Error as rb_err:
            print(f"PyMySQL: 回滚事务失败: {
              rb_err.args[1] if len(rb_err.args) > 1 else rb_err}")

# --- 主程序逻辑 (PyMySQL DML) ---
if __name__ == "__main__":
    # (前面的 PyMySQL 连接参数设置)
    DB_HOST_PY = os.getenv("PYMYSQL_DB_HOST", "localhost")
    DB_USER_PY = os.getenv("PYMYSQL_DB_USER", "your_mysql_user")
    DB_PASSWORD_PY = os.getenv("PYMYSQL_DB_PASSWORD", "your_mysql_password")
    DB_NAME_PY = os.getenv("PYMYSQL_DB_NAME", "your_database_name")
    DB_PORT_PY = int(os.getenv("PYMYSQL_DB_PORT", 3306))

    pymysql_conn = pymysql_connect_to_db(DB_HOST_PY, DB_USER_PY, DB_PASSWORD_PY, DB_NAME_PY, DB_PORT_PY)

    if pymysql_conn:
        # 确保表存在 (可以使用上面定义的 pymysql_create_sample_employees_table)
        # pymysql_create_sample_employees_table(pymysql_conn)

        # 执行DML操作
        py_new_id = pymysql_insert_new_employee(pymysql_conn, "Sarah", "Miller", "2023-02-20", 72000.00)
        if py_new_id:
            pymysql_update_employee_salary(pymysql_conn, py_new_id, 73500.00)
            # pymysql_query_all_employees(pymysql_conn) # 可用于验证
            pymysql_delete_employee(pymysql_conn, py_new_id)
            # pymysql_query_all_employees(pymysql_conn) # 可用于验证
        else:
            print("PyMySQL: 由于插入失败,后续的更新和删除操作未执行。")

        pymysql_conn.close()
        print(f"
PyMySQL: 数据库 '{
              DB_NAME_PY}' DML操作演示完成,连接已关闭。")

代码解释:

事务处理: 与 mysql-connector-python 一样,PyMySQL 对于DML操作也需要显式的 connection.commit()。如果在发生错误时需要撤销更改,则调用 connection.rollback()
cursor.lastrowid: PyMySQL 游标对象也提供 lastrowid 属性来获取最后插入的自增ID。
cursor.rowcount: 对于DML操作,返回受影响的行数。
with connection.cursor() as cursor:: 再次强调,这是管理游标的最佳实践。

4.2.6 PyMySQLmysql-connector-python 的主要区别总结 (基础操作层面)

模块导入: import pymysql vs import mysql.connector
游标类型指定:

PyMySQL: conn.cursor(pymysql.cursors.DictCursor) 或在连接时 cursorclass=pymysql.cursors.DictCursor
mysql-connector-python: conn.cursor(dictionary=True, buffered=True, named_tuple=True) 等参数。

错误对象属性:

PyMySQL: err.args[0] (code), err.args[1] (message)。
mysql-connector-python: err.errno, err.msg

获取服务器信息:

PyMySQL: 通常需要执行 SELECT VERSION()
mysql-connector-python: conn.get_server_info()

SSL 配置: 参数名和结构略有不同。PyMySQL 通常使用 ssl={'ca': ...} 字典。
服务器端游标: PyMySQL 提供了明确的 SSCursorSSDictCursormysql-connector-python 的默认游标在某些模式下可能表现出类似流式的行为,但不如 PyMySQL 的服务器端游标那样显式。

尽管存在这些差异,但由于两者都遵循DB-API 2.0,核心的 execute(), fetchone(), fetchall(), commit(), rollback(), close() 等方法的行为和用法基本一致。

4.3 mysqlclient 简介与基础操作对比

mysqlclientMySQLdb 的一个分支,是一个C扩展模块,通常被认为性能较高。其API也遵循DB-API 2.0。

安装 (回顾): 需要C编译器和MySQL开发库。pip install mysqlclient

连接与使用 (与PyMySQL非常相似):

# import MySQLdb # 这是 mysqlclient 导入时使用的历史名称
# # 或者更现代的用法中,如果你安装了 mysqlclient,有时也可以直接通过
# # import _mysql # 底层C模块,但不推荐直接使用
# # 最常见的是通过 ORM 或框架间接使用,它们会处理导入
# # 如果直接使用,通常是:
# # from MySQLdb import connect, cursors # 假设 MySQLdb 仍然是可用的导入路径

# # 实际上,安装了 mysqlclient 后,它通常会作为 MySQLdb 的替代品。
# # 因此,很多依赖 MySQLdb 的旧代码可以直接运行。
# # 如果你的环境中有 mysqlclient 并且它是 Python 3 的兼容版本:
# try:
#     import MySQLdb as DatabaseModule # 尝试导入 MySQLdb (mysqlclient 会扮演这个角色)
#     from MySQLdb.cursors import DictCursor # 导入字典游标
#     print("mysqlclient (as MySQLdb) 模块已加载。")
# except ImportError:
#     print("无法导入 MySQLdb (mysqlclient)。请确保已正确安装。")
#     DatabaseModule = None # 设置为None,以便后续代码可以检查

# def mysqlclient_connect_to_db(db_host, db_user, db_password, db_name, db_port=3306):
#     if not DatabaseModule:
#         return None
#     try:
#         connection = DatabaseModule.connect( # 使用 MySQLdb.connect()
#             host=db_host,
#             user=db_user,
#             passwd=db_password, # 注意: 参数名是 passwd 而不是 password
#             db=db_name,       # 注意: 参数名是 db 而不是 database
#             port=db_port,
#             charset='utf8mb4'
#             # cursorclass=DictCursor # 也可以在连接时指定
#         )
#         print(f"mysqlclient: 成功连接到数据库 '{db_name}' 在服务器 {db_host}:{db_port}")
#         # 获取版本信息
#         # server_info = connection.get_server_info() # mysqlclient 连接对象有 get_server_info()
#         # print(f"mysqlclient: MySQL 服务器版本: {server_info}")
#         return connection
#     except DatabaseModule.Error as err:
#         # 错误处理方式与 PyMySQL 类似,通常通过 err.args
#         error_code = err.args[0] if len(err.args) > 0 else "N/A"
#         error_message = err.args[1] if len(err.args) > 1 else str(err)
#         print(f"mysqlclient: 连接数据库 '{db_name}' 失败。错误码: {error_code}, 消息: {error_message}")
#         return None

# if __name__ == "__main__":
#     if DatabaseModule:
#         DB_HOST_MC = os.getenv("MYSQLCLIENT_DB_HOST", "localhost")
#         DB_USER_MC = os.getenv("MYSQLCLIENT_DB_USER", "your_mysql_user")
#         DB_PASSWORD_MC = os.getenv("MYSQLCLIENT_DB_PASSWORD", "your_mysql_password") # passwd
#         DB_NAME_MC = os.getenv("MYSQLCLIENT_DB_NAME", "your_database_name") # db
#         DB_PORT_MC = int(os.getenv("MYSQLCLIENT_DB_PORT", 3306))
#
#         print("
--- mysqlclient 连接测试 ---")
#         mc_conn = mysqlclient_connect_to_db(DB_HOST_MC, DB_USER_MC, DB_PASSWORD_MC, DB_NAME_MC, DB_PORT_MC)
#
#         if mc_conn:
#             print(f"mysqlclient: 连接对象: {mc_conn}")
#             try:
#                 with mc_conn.cursor(DictCursor) as cursor: # 使用字典游标
#                     cursor.execute("SELECT * FROM employees WHERE salary > %s LIMIT %s", (55000, 2)) # 参数化查询
#                     print("mysqlclient: 查询薪水 > 55000 (前2条):")
#                     for row in cursor:
#                         print(row)
#                 mc_conn.commit() # 如果有DML则需要
#             except DatabaseModule.Error as err:
#                 print(f"mysqlclient: 查询失败: {err.args[1] if len(err.args) > 1 else err}")
#             finally:
#                 mc_conn.close()
#                 print(f"mysqlclient: 数据库 '{DB_NAME_MC}' 连接已关闭。")
#     else:
#         print("mysqlclient 模块未加载,跳过相关测试。")

mysqlclient (MySQLdb) 的关键区别点:

导入: 通常 import MySQLdbmysqlclient 会安装为这个名字的兼容替代品。
连接参数:

密码参数是 passwd (不是 password)。
数据库名参数是 db (不是 database)。

游标: from MySQLdb.cursors import DictCursor (或其他游标类型)。
错误对象: MySQLdb.Error,错误信息也通常在 err.args 中。
get_server_info(): 连接对象通常有此方法。

由于 mysqlclient 是C扩展,其安装和某些特定行为可能更依赖于底层的 MySQL C API 版本。但在遵循DB-API 2.0的核心操作上,其Python层面的使用方式与其他连接器大同小异。

我们已经覆盖了三种主流Python MySQL连接器的基本连接和操作。这为后续深入探讨更高级的主题,如事务管理、错误处理、数据类型映射、性能优化、连接池等,奠定了坚实的基础。

五、参数化查询与SQL注入防范的深层剖析 (In-depth Analysis of Parameterized Queries and SQL Injection Prevention)

在前面的基础操作中,我们已经强调了使用参数化查询(如 cursor.execute(query, params))来防止SQL注入。现在,我们将更深入地探讨其内部机制、不同参数样式的细微差别、以及在复杂场景下如何正确和安全地构建查询。

5.1 SQL 注入的原理与危害 (Principle and Dangers of SQL Injection)

在详细讨论防范措施之前,必须深刻理解SQL注入是如何发生的以及它能造成多大的破坏。

原理:
SQL注入发生在当应用程序将用户提供(或外部来源)的不可信数据直接拼接或不当嵌入到SQL查询语句中,而没有进行充分的验证和转义时。攻击者可以通过构造特殊的输入,改变原始SQL语句的语义,从而执行恶意的数据库操作。

一个经典的错误示例 (绝对不要这样做!):

# !!! 这是一个非常不安全的示例,仅用于演示SQL注入原理 !!!
def unsafe_get_user_by_username(connection, username_input):
    # 假设 username_input 来自用户在网页表单中的输入
    try:
        with connection.cursor(dictionary=True) as cursor:
            # 直接将用户输入拼接到SQL语句中,极度危险!
            sql_query = f"SELECT user_id, username, email FROM users WHERE username = '{
                username_input}'" # SQL语句拼接
            print(f"  [不安全] 执行的SQL: {
                sql_query}") # 打印执行的SQL (用于观察)
            cursor.execute(sql_query) # 执行拼接后的SQL
            user = cursor.fetchone()
            if user:
                print(f"  [不安全] 找到用户: {
                user}")
                return user
            else:
                print(f"  [不安全] 未找到用户: {
                username_input}")
                return None
    except Exception as e: # 简化错误处理
        print(f"  [不安全] 查询出错: {
                e}")
        return None

# 攻击场景:
# 1. 正常输入: username_input = "alice"
#    SQL会是: SELECT user_id, username, email FROM users WHERE username = 'alice' (正常)

# 2. 恶意输入1 (绕过认证): username_input = "admin' -- "
#    SQL会是: SELECT user_id, username, email FROM users WHERE username = 'admin' -- '
#    解释:
#      - 'admin' 被注入。
#      - ' (单引号) 闭合了前面 username = ' 的单引号。
#      - -- (两个连字符加空格) 是SQL中的行注释符,它会注释掉原始SQL语句中该符号之后的所有内容。
#      - 结果是,WHERE子句变成了 WHERE username = 'admin',原始SQL中可能存在的其他条件或末尾的单引号被注释掉了。
#      - 如果存在名为 'admin' 的用户,即使密码错误,也可能登录成功(取决于后续逻辑)。

# 3. 恶意输入2 (获取所有用户数据): username_input = "whatever' OR '1'='1"
#    SQL会是: SELECT user_id, username, email FROM users WHERE username = 'whatever' OR '1'='1'
#    解释:
#      - WHERE子句变成了 username = 'whatever' OR '1'='1'。
#      - 因为 '1'='1' 永远为真 (TRUE),所以整个OR条件也永远为真。
#      - 这将导致查询返回 `users` 表中的所有行,无视 `username` 的原始条件。

# 4. 恶意输入3 (执行其他命令,如删表 - 如果权限允许且查询结构允许拼接):
#    username_input = "nobody'; DROP TABLE users; -- "
#    SQL会是: SELECT user_id, username, email FROM users WHERE username = 'nobody'; DROP TABLE users; -- '
#    解释:
#      - 通过分号 (;) 注入了第二条SQL命令 `DROP TABLE users`。
#      - 如果数据库连接允许执行多语句 (multi-statement queries,某些连接器或配置下可能允许),
#        并且执行该查询的数据库用户有 `DROP TABLE` 权限,这将导致 `users` 表被删除。

# (假设 pymysql_conn 已连接,并且 users 表已创建并包含数据)
# if __name__ == "__main__":
#     if pymysql_conn:
#         print("
--- SQL注入攻击演示 ---")
#         # 准备users表 (简单版本)
#         with pymysql_conn.cursor() as cur:
#             cur.execute("DROP TABLE IF EXISTS users")
#             cur.execute("CREATE TABLE users (user_id INT AUTO_INCREMENT PRIMARY KEY, username VARCHAR(50) UNIQUE, email VARCHAR(100), password_hash VARCHAR(255))")
#             cur.execute("INSERT INTO users (username, email, password_hash) VALUES ('alice', 'alice@example.com', 'hash1'), ('bob', 'bob@example.com', 'hash2'), ('admin_user', 'admin@example.com', 'admin_hash')")
#             pymysql_conn.commit()

#         print("
尝试正常输入:")
#         unsafe_get_user_by_username(pymysql_conn, "alice")

#         print("
尝试恶意输入1 (绕过):")
#         unsafe_get_user_by_username(pymysql_conn, "admin_user' -- ") # 尝试获取admin_user的信息

#         print("
尝试恶意输入2 (获取所有):")
#         unsafe_get_user_by_username(pymysql_conn, "x' OR '1'='1") # 获取所有用户信息

#         # 恶意输入3 (删表) 通常需要特定条件才能成功,这里不直接运行以防意外
#         # print("
尝试恶意输入3 (删表 - 危险,不执行):")
#         # username_for_drop = "y'; DROP TABLE users; -- "
#         # print(f"  如果执行: SELECT ... WHERE username = '{username_for_drop}'")

#         if pymysql_conn: pymysql_conn.close()

危害:

数据泄露: 攻击者可以获取未授权访问的敏感数据(如用户凭证、信用卡信息、个人隐私)。
数据篡改: 攻击者可以修改或删除数据库中的数据。
拒绝服务 (DoS): 通过执行耗尽资源的查询或破坏数据库结构。
服务器接管: 在某些极端情况下,如果数据库服务器配置不当或存在漏洞,SQL注入可能导致攻击者在数据库服务器上执行任意操作系统命令。
破坏数据完整性和一致性
声誉损害和法律责任

5.2 参数化查询的内部工作机制 (How Parameterized Queries Work Internally)

参数化查询(也称为预准备语句 – Prepared Statements,尽管在Python DB-API层面不总是显式地分两步)是防御SQL注入的基石。其核心思想是将SQL代码结构与用户提供的数据严格分离

SQL模板发送:
当使用 cursor.execute("SELECT ... WHERE col = %s", (value,)) 时,Python连接器首先将包含占位符(如 %s)的SQL模板(或称查询结构)发送到MySQL服务器。
例如,发送的是类似 "SELECT user_id, username FROM users WHERE username = ?" (服务器端可能会将 %s 转换为 ? 或其内部占位符)。

服务器端解析和编译SQL模板:
MySQL服务器接收到这个SQL模板后,对其进行解析、语法检查和生成执行计划的准备工作。此时,服务器知道这是一个期望接收一个参数的查询结构,但还不知道参数的具体值是什么。

参数值独立发送:
然后,连接器将参数元组 (value,) 中的数据独立地、作为数据本身发送到MySQL服务器。这些数据不会被服务器当作SQL代码的一部分来解析。

服务器端绑定参数并执行:
MySQL服务器接收到参数值后,将这些值安全地绑定到之前已准备好的SQL模板的占位符位置,然后执行完整的查询。由于参数值是在SQL结构确定之后才被绑定的,并且是以数据形式处理,所以即使参数值中包含SQL特殊字符(如 ', ;, --),它们也只会被当作普通的字符串数据(或其他数据类型)来对待,而不会改变SQL语句的原始结构。

为什么这能防止SQL注入?
因为攻击者输入的恶意字符(如 ' OR '1'='1)是在第3步作为数据发送的。当服务器在第4步绑定参数时,它会将整个 "' OR '1'='1" 字符串视为 username 列应该匹配的值,而不是将其解析为SQL逻辑。
所以,查询会变成类似(概念上):
SELECT ... WHERE username = ('' OR '1'='1') (引号被正确处理为字符串的一部分)
这通常不会匹配任何合法的用户名,从而阻止了攻击。

5.3 不同连接器的参数样式回顾与实践

mysql-connector-python:

默认支持 format (%s) 和 pyformat (%(name)s) 风格。
paramstyle 属性值为 'pyformat'
尽管其 paramstylepyformat,但它也广泛接受 %s 形式的占位符,并且这是其文档和示例中更常见的用法。

# 使用 mysql-connector-python
# import mysql.connector
# config = {'user': 'u', 'password': 'p', 'host': 'h', 'database': 'd'}
# cnx = mysql.connector.connect(**config)
# cursor = cnx.cursor()

# 使用 %s 风格 (推荐)
query_s = "SELECT name FROM products WHERE category_id = %s AND price < %s"
params_s = (10, 50.00) # 元组形式的参数
# cursor.execute(query_s, params_s)

# 使用 %(name)s 风格 (pyformat)
query_named = "UPDATE orders SET status = %(new_status)s WHERE order_id = %(order_id)s"
params_named = {
              'new_status': 'SHIPPED', 'order_id': 123} # 字典形式的参数
# cursor.execute(query_named, params_named)

# cursor.close()
# cnx.close()

PyMySQL:

也主要支持 format (%s) 和 pyformat (%(name)s) 风格。
paramstyle 属性通常也是 'pyformat''format' (具体可能取决于版本或内部实现)。
使用 %s 是最常见的方式。

# 使用 PyMySQL
# import pymysql
# conn_py = pymysql.connect(host='h', user='u', password='p', database='d')
# cursor_py = conn_py.cursor()

# 使用 %s 风格
query_py_s = "DELETE FROM logs WHERE log_date < %s"
params_py_s = ('2023-01-01',) # 单个参数也要是元组
# cursor_py.execute(query_py_s, params_py_s)

# 使用 %(name)s 风格
query_py_named = "SELECT data FROM config WHERE section = %(section_name)s AND key_name = %(key)s"
params_py_named = {
              'section_name': 'network', 'key': 'timeout'}
# cursor_py.execute(query_py_named, params_py_named)
# conn_py.commit() # 如果是DML

# cursor_py.close()
# conn_py.close()

mysqlclient (MySQLdb):

传统上,MySQLdbparamstyleformat
因此,%s 是其标准的参数占位符。它通常不直接支持 pyformat%(name)s 风格,除非你使用的版本或其包装库(如Django ORM)做了额外处理。

# 使用 mysqlclient (MySQLdb)
# import MySQLdb
# conn_mc = MySQLdb.connect(host='h', user='u', passwd='p', db='d')
# cursor_mc = conn_mc.cursor()

# 主要使用 %s 风格
query_mc_s = "INSERT INTO events (event_type, user_id, event_data) VALUES (%s, %s, %s)"
params_mc_s = ('login', 1001, '{"ip": "192.168.1.10"}')
# cursor_mc.execute(query_mc_s, params_mc_s)
# conn_mc.commit()

# cursor_mc.close()
# conn_mc.close()

企业级建议:

始终使用参数化: 这是最重要的一点。
优先选择 %s 风格: 它是所有主流Python MySQL连接器都良好支持的、最简单、最通用的参数化风格。
%(name)s 风格的适用场景: 当SQL语句较长,参数较多时,使用命名参数可以提高代码的可读性,减少因参数顺序错误导致的bug。但要确保你选择的连接器支持它。
不要信任任何外部输入: 即使是来自内部系统或看起来“安全”的来源的数据,如果它最终会成为SQL查询的一部分,也应该通过参数化传递,或者进行严格的白名单验证和类型检查。

5.4 处理 LIKE 子句中的参数化

当在 LIKE 子句中使用参数化时,通配符 (%_) 需要作为参数值的一部分传递,而不是在SQL模板中。

def search_products_by_name_safely(connection, search_term, connector_name="PyMySQL"):
    """安全地使用参数化查询 LIKE 子句搜索产品。"""
    print(f"
--- {
              connector_name}: 安全搜索产品名包含 '{
              search_term}' ---")
    try:
        # 根据连接器类型选择合适的游标和错误处理 (简化示例,假设字典游标可用)
        if connector_name == "mysql.connector":
            cursor = connection.cursor(dictionary=True)
        else: # PyMySQL or mysqlclient
            cursor_class = getattr(pymysql.cursors if connector_name == "PyMySQL" else DatabaseModule.cursors, "DictCursor", None)
            cursor = connection.cursor(cursor_class) if cursor_class else connection.cursor()


        # 错误的做法 (直接拼接通配符到SQL模板 - 仍然有风险,虽然比完全不参数化好一点,但不推荐):
        # query_unsafe_like = f"SELECT product_name, unit_price FROM products_example WHERE product_name LIKE '%{search_term}%'"
        # cursor.execute(query_unsafe_like) # 不推荐!如果search_term包含特殊字符可能仍有问题

        # 正确的做法: 将通配符作为参数值的一部分
        # 模式1: 包含 (contains)
        param_contains = f"%{
              search_term}%" # 在Python代码中构建带通配符的参数值
        query_contains = "SELECT product_id, product_name, unit_price FROM products_example WHERE product_name LIKE %s"

        # 模式2: 开头是 (starts with)
        param_startswith = f"{
              search_term}%"
        query_startswith = "SELECT product_id, product_name, unit_price FROM products_example WHERE product_name LIKE %s"

        # 模式3: 结尾是 (ends with)
        param_endswith = f"%{
              search_term}"
        query_endswith = "SELECT product_id, product_name, unit_price FROM products_example WHERE product_name LIKE %s"

        print(f"  执行 '包含' 搜索 (参数: '{
              param_contains}')")
        cursor.execute(query_contains, (param_contains,))
        results_contain = cursor.fetchall()
        if results_contain:
            for row in results_contain: print(f"    {
              row}")
        else:
            print("    未找到匹配 '包含' 条件的产品。")

        print(f"
  执行 '开头是' 搜索 (参数: '{
              param_startswith}')")
        cursor.execute(query_startswith, (param_startswith,))
        results_startswith = cursor.fetchall()
        if results_startswith:
            for row in results_startswith: print(f"    {
              row}")
        else:
            print("    未找到匹配 '开头是' 条件的产品。")

    except Exception as err: # 通用错误捕获 (具体连接器错误应分开处理)
        print(f"{
              connector_name}: LIKE 查询失败: {
              err}")
    finally:
        if 'cursor' in locals() and cursor:
            cursor.close()

# 准备 products_example 表 (如果不存在)
def setup_products_table_for_like(connection, connector_name="PyMySQL"):
    try:
        with connection.cursor() as cursor:
            cursor.execute("DROP TABLE IF EXISTS products_example")
            cursor.execute("""
                CREATE TABLE products_example (
                    product_id INT AUTO_INCREMENT PRIMARY KEY, product_name VARCHAR(100), unit_price DECIMAL(10,2)
                ) ENGINE=InnoDB CHARACTER SET=utf8mb4
            """)
            products_data = [
                ('Laptop Pro X1', 1200.00), ('Wireless Mouse G500', 45.00),
                ('Mechanical Keyboard K7', 150.00), ('USB-C Hub Pro', 60.00),
                ('Laptop Stand V2', 25.00), ('Pro Gaming Mousepad', 20.00)
            ]
            insert_query = "INSERT INTO products_example (product_name, unit_price) VALUES (%s, %s)"
            cursor.executemany(insert_query, products_data) # 批量插入
            connection.commit()
            print(f"{
              connector_name}: 'products_example' 表已创建并填充。")
    except Exception as e:
        print(f"{
              connector_name}: 设置 products_example 表失败: {
              e}")

# if __name__ == "__main__":
    # 假设 pymysql_conn 或 mysql_conn (来自 mysql.connector) 已连接
    # (需要确保这些连接对象在使用前已经成功初始化)

    # # 使用 PyMySQL 示例
    # if pymysql_conn:
    #     setup_products_table_for_like(pymysql_conn, "PyMySQL")
    #     search_products_by_name_safely(pymysql_conn, "Pro", "PyMySQL")
    #     search_products_by_name_safely(pymysql_conn, "Mouse", "PyMySQL")
    #     if pymysql_conn: pymysql_conn.close()

    # # 使用 mysql-connector-python 示例 (需要先建立 mysql_conn)
    # # config_mc = {'user': 'u', 'password': 'p', 'host': 'h', 'database': 'd'}
    # # mysql_conn_official = mysql.connector.connect(**config_mc)
    # # if mysql_conn_official:
    # #     setup_products_table_for_like(mysql_conn_official, "mysql.connector")
    # #     search_products_by_name_safely(mysql_conn_official, "Laptop", "mysql.connector")
    # #     if mysql_conn_official: mysql_conn_official.close()

代码解释:

param_contains = f"%{search_term}%": 关键点。SQL的 LIKE 通配符 % 是在Python字符串中与用户输入 search_term 结合的。
query_contains = "... WHERE product_name LIKE %s": SQL模板本身只包含一个 %s 占位符。
cursor.execute(query_contains, (param_contains,)): 将构造好的、包含通配符的字符串 param_contains 作为单个数据参数传递给数据库。数据库驱动程序会确保这个参数值被正确地转义和引用,即使 search_term 本身包含特殊字符(如单引号),也不会破坏SQL结构。
这种方法既安全,又能灵活地实现不同的 LIKE 匹配模式(包含、开头是、结尾是)。

5.5 处理 IN 子句中的参数化

IN 子句允许你匹配列值是否在一个给定的值列表中。当这个列表是动态的(例如,来自用户选择的多个ID),参数化会稍微复杂一些,因为SQL模板中的占位符数量需要与参数列表的长度匹配。

错误的做法 (直接拼接):

# ids_list = [1, 2, 3]
# query_unsafe_in = f"SELECT * FROM items WHERE item_id IN ({','.join(map(str, ids_list))})" # 非常危险

如果 ids_list 中的元素不是纯数字,或者可以被恶意构造,这会导致SQL注入。

安全的做法: 动态生成占位符。

def get_items_by_ids_safely(connection, item_ids_list, connector_name="PyMySQL"):
    """安全地使用参数化查询 IN 子句获取项目。"""
    if not item_ids_list: # 如果ID列表为空
        print(f"
--- {
              connector_name}: ID列表为空,无需查询 ---")
        return []

    print(f"
--- {
              connector_name}: 安全查询 item_ids IN {
              item_ids_list} ---")
    results = []
    try:
        # 根据连接器类型选择合适的游标和错误处理 (简化示例)
        if connector_name == "mysql.connector":
            cursor = connection.cursor(dictionary=True)
        else:
            cursor_class = getattr(pymysql.cursors if connector_name == "PyMySQL" else DatabaseModule.cursors, "DictCursor", None)
            cursor = connection.cursor(cursor_class) if cursor_class else connection.cursor()

        # 1. 为 IN 子句动态生成占位符字符串
        # 例如,如果 item_ids_list 是 [1, 2, 3],那么 placeholders 应该是 "%s, %s, %s"
        placeholders = ', '.join(['%s'] * len(item_ids_list)) # 生成逗号分隔的 %s 占位符

        # 2. 构建完整的SQL查询模板
        query = f"SELECT item_id, item_name FROM items WHERE item_id IN ({
              placeholders})" # 将占位符嵌入查询
        # 注意: 这里使用f-string嵌入placeholders是安全的,因为placeholders是我们自己生成的固定 "%s" 字符串,
        # 不是来自用户输入。最终的 item_ids_list 才是作为参数传递的。

        print(f"  执行的SQL模板 (占位符): {
              query}")
        print(f"  传递的参数: {
              tuple(item_ids_list)}") # 参数必须是元组或列表

        # 3. 执行查询,将 item_ids_list (转换为元组) 作为参数传递
        cursor.execute(query, tuple(item_ids_list)) # 将ID列表转换为元组作为参数传递
        fetched_items = cursor.fetchall()

        if fetched_items:
            print("  找到的项目:")
            for item in fetched_items:
                results.append(item)
                print(f"    {
              item}")
        else:
            print("  未找到任何匹配ID的项目。")

    except Exception as err:
        print(f"{
              connector_name}: IN 子句查询失败: {
              err}")
    finally:
        if 'cursor' in locals() and cursor:
            cursor.close()
    return results

# 准备 items 表
def setup_items_table(connection, connector_name="PyMySQL"):
    try:
        with connection.cursor() as cursor:
            cursor.execute("DROP TABLE IF EXISTS items")
            cursor.execute("""
                CREATE TABLE items (item_id INT PRIMARY KEY, item_name VARCHAR(100))
                ENGINE=InnoDB CHARACTER SET=utf8mb4
            """)
            items_data = [(1, 'Apple'), (2, 'Banana'), (3, 'Cherry'), (4, 'Date'), (5, 'Elderberry')]
            insert_query = "INSERT INTO items (item_id, item_name) VALUES (%s, %s)"
            cursor.executemany(insert_query, items_data)
            connection.commit()
            print(f"{
              connector_name}: 'items' 表已创建并填充。")
    except Exception as e:
        print(f"{
              connector_name}: 设置 items 表失败: {
              e}")

# if __name__ == "__main__":
    # 假设 pymysql_conn 或 mysql_conn 已连接
    # # 使用 PyMySQL 示例
    # if pymysql_conn:
    #     setup_items_table(pymysql_conn, "PyMySQL")
    #     get_items_by_ids_safely(pymysql_conn, [1, 3, 5], "PyMySQL")
    #     get_items_by_ids_safely(pymysql_conn, [2, 4], "PyMySQL")
    #     get_items_by_ids_safely(pymysql_conn, [99], "PyMySQL") # 测试不存在的ID
    #     get_items_by_ids_safely(pymysql_conn, [], "PyMySQL")    # 测试空列表
    #     if pymysql_conn: pymysql_conn.close()

    # # 使用 mysql-connector-python 示例
    # # if mysql_conn_official:
    # #     setup_items_table(mysql_conn_official, "mysql.connector")
    # #     get_items_by_ids_safely(mysql_conn_official, [1, 2], "mysql.connector")
    # #     if mysql_conn_official: mysql_conn_official.close()

代码解释:

placeholders = ', '.join(['%s'] * len(item_ids_list))

['%s'] * len(item_ids_list): 创建一个包含 len(item_ids_list)'%s' 字符串的列表。例如,如果 item_ids_list 有3个元素,这将是 ['%s', '%s', '%s']
', '.join(...): 将列表中的元素用逗号和空格连接起来,得到如 "%s, %s, %s" 的字符串。

query = f"SELECT ... WHERE item_id IN ({placeholders})":

这里使用 f-string 将我们自己安全生成的 placeholders 字符串嵌入到SQL模板中是安全的,因为 placeholders 只包含 , %s,不包含任何来自外部的、可能有害的数据。

cursor.execute(query, tuple(item_ids_list)):

将原始的 item_ids_list(转换为元组)作为参数传递。连接器会将这个元组中的每个值依次绑定到SQL模板中的每个 %s

企业级考量 (IN 子句):

参数数量限制: MySQL(以及大多数数据库)对单个SQL语句中可以使用的参数数量(或 IN 列表中的元素数量)有限制。这个限制通常很大(数千个),但如果 item_ids_list 可能包含成千上万个ID,直接构建一个巨大的 IN 子句可能会达到这个限制或导致性能问题(解析长SQL、优化器处理等)。

MySQL 的 max_allowed_packet 也会影响能发送的SQL语句的大小。
对于非常大的ID列表,考虑分批处理,或者将ID列表插入到一个临时表中,然后使用 JOIN 进行查询。

# 伪代码:使用临时表处理大量ID
# def get_items_by_large_ids_via_temp_table(connection, large_id_list):
#     with connection.cursor() as cursor:
#         # 1. 创建一个临时表 (会话级别,连接关闭后自动删除)
#         cursor.execute("CREATE TEMPORARY TABLE temp_ids (id INT PRIMARY KEY)")
#         # 2. 将ID批量插入临时表
#         id_tuples = [(id_val,) for id_val in large_id_list] # [(1,), (2,), ...]
#         cursor.executemany("INSERT INTO temp_ids (id) VALUES (%s)", id_tuples)
#         connection.commit() # 确保临时表数据写入 (如果需要跨多个语句或事务阶段)
#
#         # 3. JOIN 查询
#         query = "SELECT i.item_id, i.item_name FROM items i JOIN temp_ids t ON i.item_id = t.id"
#         cursor.execute(query)
#         results = cursor.fetchall()
#         # 4. 临时表会自动清理,或显式 DROP TEMPORARY TABLE temp_ids;
#         return results

性能: 巨大的 IN 列表也可能导致查询优化器选择次优的执行计划。使用临时表 JOIN 对于非常大的列表通常性能更稳定。

5.6 其他需要注意的参数化场景

ORDER BYGROUP BY 子句:

不能直接参数化列名: ORDER BY %sGROUP BY %s 通常是不安全或不工作的。数据库驱动程序会将 %s 替换为一个字符串字面量,例如 ORDER BY 'column_name',这通常会导致按常量字符串排序(无实际效果)或语法错误。
安全处理动态排序/分组:

白名单验证: 如果排序列名或分组列名来自用户输入,必须将其与一个预定义的、安全的列名列表(白名单)进行比较。只有当输入的列名在白名单中时,才将其直接(但安全地,例如确保没有注入)拼接到SQL语句中。
映射: 将用户输入映射到实际的列名。

def get_sorted_employees_safely(connection, sort_column_input, sort_order_input="ASC"):
    """安全地按动态列排序员工。"""
    print(f"
--- 安全动态排序: 列='{
                sort_column_input}', 顺序='{
                sort_order_input}' ---")
    # 1. 白名单验证排序列
    allowed_sort_columns = {
              "first_name", "last_name", "hire_date", "salary"}
    if sort_column_input not in allowed_sort_columns:
        print(f"  错误: 无效的排序列 '{
                sort_column_input}'。将使用默认排序。")
        sort_column_sql = "emp_no" # 默认排序列
    else:
        sort_column_sql = sort_column_input # 确认是安全的列名

    # 2. 白名单验证排序顺序
    sort_order_sql = "ASC" # 默认升序
    if sort_order_input.upper() == "DESC":
        sort_order_sql = "DESC"

    try:
        with connection.cursor(dictionary=True) as cursor:
            # 这里拼接 sort_column_sql 和 sort_order_sql 是安全的,因为它们已经过白名单验证
            query = f"SELECT emp_no, first_name, last_name, hire_date, salary FROM employees ORDER BY {
                sort_column_sql} {
                sort_order_sql} LIMIT 5"
            print(f"  执行SQL: {
                query}")
            cursor.execute(query) # 注意:这里没有外部参数了,因为列名和顺序已安全嵌入
            for row in cursor:
                print(f"    {
                row}")
    except Exception as e:
        print(f"  动态排序查询失败: {
                e}")

# if __name__ == "__main__":
    # # 假设 pymysql_conn 已连接且 employees 表有数据
    # if pymysql_conn:
    #     get_sorted_employees_safely(pymysql_conn, "salary", "DESC")
    #     get_sorted_employees_safely(pymysql_conn, "last_name")
    #     get_sorted_employees_safely(pymysql_conn, "invalid_column", "ASC") # 测试无效列名
    #     get_sorted_employees_safely(pymysql_conn, "hire_date", "WRONG_ORDER") # 测试无效顺序
    #     if pymysql_conn: pymysql_conn.close()

LIMITOFFSET 子句:

LIMIT %sOFFSET %s (或 LIMIT %s, %s) 是可以安全参数化的,因为这些子句期望的是数字。

def get_paginated_employees(connection, page_number, page_size):
    """获取分页的员工数据。"""
    offset = (page_number - 1) * page_size
    print(f"
--- 分页查询: 第 {
                page_number} 页, 每页 {
                page_size} 条 (OFFSET {
                offset}) ---")
    try:
        with connection.cursor(dictionary=True) as cursor:
            query = "SELECT emp_no, first_name, last_name FROM employees ORDER BY emp_no LIMIT %s OFFSET %s"
            # 或者 LIMIT %s, %s  (如果是两个参数的LIMIT形式,则 (offset, page_size))
            # 这里使用标准的 LIMIT count OFFSET offset
            cursor.execute(query, (page_size, offset)) # 参数顺序: 先是LIMIT的数量,然后是OFFSET
            for row in cursor:
                print(f"    {
                row}")
    except Exception as e:
        print(f"  分页查询失败: {
                e}")

# if __name__ == "__main__":
    # if pymysql_conn:
    #     get_paginated_employees(pymysql_conn, 1, 2) # 第1页,每页2条
    #     get_paginated_employees(pymysql_conn, 2, 2) # 第2页,每页2条
    #     if pymysql_conn: pymysql_conn.close()

表名和列名:

不能参数化表名或列名。例如 SELECT %s FROM %s WHERE ... 是行不通的。占位符是为数据值准备的,不是为SQL结构(如表名、列名、数据库名、SQL关键字)准备的。
如果需要动态指定表名或列名(例如,在ORM或元数据驱动的工具中),必须:

严格白名单验证: 确保传入的表名/列名是预期的、合法的、已知的。
转义/引用标识符: 如果表名或列名可能包含特殊字符或与SQL关键字冲突,需要使用数据库特定的标识符引用字符(MySQL中是反引号 `)。Python连接器通常不自动为表名/列名执行此操作。

# 不安全的动态表名 (仅用于说明,不推荐)
# table_name_input = "users" # 或恶意输入 "users; DROP DATABASE students; --"
# query = f"SELECT * FROM `{table_name_input}`" # 即使用反引号,如果输入是恶意的,仍可能出问题

# 更安全的方式:白名单
def query_dynamic_table_safely(connection, table_key, column_key, filter_value):
    allowed_tables = {
                  "users_main": "users", "audit": "audit_log"}
    allowed_columns = {
                  "users_main": {
                  "id", "name", "email"}, "audit": {
                  "log_id", "action", "timestamp"}}

    if table_key not in allowed_tables or column_key not in allowed_columns.get(table_key, set()):
        print("错误: 无效的表名或列名。")
        return

    actual_table_name = allowed_tables[table_key]
    actual_column_name = column_key # 已经是安全的了

    # 注意:这里仍然是字符串格式化,但因为表名和列名来自白名单,所以风险受控。
    # 参数化仍然用于 WHERE 子句的值。
    query = f"SELECT * FROM `{
                    actual_table_name}` WHERE `{
                    actual_column_name}` = %s"
    print(f"  安全动态查询: {
                    query} (参数: {
                    filter_value})")
    # ... (执行 cursor.execute(query, (filter_value,))) ...

参数化查询是Python与MySQL安全交互的绝对核心。深刻理解其原理、正确使用不同场景下的参数化方法,是构建健壮、安全的企业级应用的基础。

六、Python 中使用 asyncioaiomysql 进行异步 MySQL 操作 (Asynchronous MySQL Operations with asyncio and aiomysql in Python)

在现代 Web 应用和网络服务中,I/O 操作(如数据库查询、API 调用、文件读写)往往是性能瓶颈。传统的同步编程模型下,当一个 I/O 操作发生时,整个线程会被阻塞,直到操作完成。这意味着在等待数据库返回结果时,CPU 资源被闲置,无法处理其他请求,在高并发场景下会导致严重的性能问题。

异步编程,特别是 Python 中的 asyncio 框架,提供了一种解决方案。它允许程序在等待一个 I/O 操作完成时,切换去执行其他任务,而不是完全阻塞。当 I/O 操作准备就绪时,程序可以回来处理结果。这种协作式多任务处理方式能够显著提高应用的吞吐量和响应能力。

aiomysql 是一个流行的 Python 库,它为 PyMySQL 提供了一个异步接口,使其能够与 asyncio 无缝集成,从而实现对 MySQL 数据库的异步访问。

6.1. asyncio 基础回顾 (A Quick Recap of asyncio Fundamentals)

在深入 aiomysql 之前,简要回顾一下 asyncio 的核心概念至关重要:

事件循环 (Event Loop): asyncio 的核心,负责调度和执行异步任务、处理 I/O 事件和回调。
协程 (Coroutines): 使用 async def 定义的特殊函数。协程可以暂停执行并将控制权交还给事件循环,以便在等待 I/O 操作时执行其他任务。使用 await 关键字来调用另一个协程或等待一个 Future 对象。
Future 和 Task: Future 对象代表一个最终会产生结果的异步操作。TaskFuture 的一个子类,用于在事件循环中调度和运行协程。
async/await 语法: async 用于声明一个协程函数,await 用于暂停协程的执行,等待一个 awaitable 对象(如另一个协程、FutureTask)完成。

6.2. 安装 aiomysql (Installing aiomysql)

与安装其他 Python 包类似,可以使用 pip 来安装 aiomysql

pip install aiomysql

这个命令会下载并安装 aiomysql 及其依赖项(通常包括 PyMySQL)。

6.3. 建立异步连接 (Establishing Asynchronous Connections)

使用 aiomysql 连接到 MySQL 数据库与使用同步库(如 PyMySQLmysql-connector-python)在概念上相似,但操作是异步的。

6.3.1. 单个异步连接
import asyncio
import aiomysql

# 定义一个异步函数来创建和测试连接
async def connect_to_mysql():
    """
    这个异步函数演示了如何使用 aiomysql 创建一个到 MySQL 数据库的异步连接。
    """
    conn = None  # 初始化连接变量,以便在 finally 块中可用
    try:
        # 使用 aiomysql.connect() 异步创建连接
        # 注意这里的 await 关键字,表示这是一个异步操作,需要等待其完成
        conn = await aiomysql.connect(
            host='127.0.0.1',       # MySQL 服务器主机地址
            port=3306,              # MySQL 服务器端口号
            user='your_user',       # MySQL 用户名
            password='your_password', # MySQL 密码
            db='your_database',     # 要连接的数据库名称
            loop=asyncio.get_event_loop() # 可选:显式传递事件循环,通常aiomysql会自动获取
        )
        print("成功通过 aiomysql 异步连接到 MySQL!")

        # 获取一个异步游标对象
        # 注意:创建游标本身不是异步操作,但后续使用游标执行SQL是异步的
        cur = await conn.cursor()

        # 执行一个简单的异步查询
        await cur.execute("SELECT VERSION();") # await 用于等待数据库执行查询
        version_result = await cur.fetchone()    # await 用于等待获取查询结果
        print(f"MySQL 版本: {
              version_result[0]}")

        # 关闭游标
        await cur.close() # 关闭游标也是一个异步操作(尽管在aiomysql中通常是瞬时的)

    except aiomysql.Error as e:
        print(f"aiomysql 连接或操作错误: {
              e}")
    except Exception as e:
        print(f"发生了预料之外的错误: {
              e}")
    finally:
        if conn:
            # 关闭连接
            conn.close() # 关闭连接。在aiomysql中,close() 本身不是协程,但它会确保异步资源被清理
            print("aiomysql 连接已关闭。")

# 主异步函数
async def main():
    """
    主异步函数,用于运行 connect_to_mysql 协程。
    """
    await connect_to_mysql()

# 运行主异步函数
if __name__ == "__main__":
    # 获取事件循环并运行 main() 协程直到完成
    asyncio.run(main())

代码解释:

import asyncioimport aiomysql: 导入必要的库。
async def connect_to_mysql():: 定义一个协程函数。
conn = await aiomysql.connect(...): 这是建立连接的核心。await 关键字表明这是一个异步调用,程序会在这里暂停,直到连接建立或发生错误,期间事件循环可以处理其他任务。连接参数与同步库类似。

loop=asyncio.get_event_loop(): 虽然 aiomysql 通常能自动找到当前事件循环,但显式传递有时在特定场景下是必要的或更清晰。

cur = await conn.cursor(): 获取一个异步游标。虽然 conn.cursor() 本身可能不是一个耗时的异步操作,但 aiomysql 的设计要求使用 await 来获取游标,以保持 API 的一致性,并为未来可能的异步游标创建逻辑做准备。
await cur.execute("SELECT VERSION();"): 异步执行 SQL 查询。
version_result = await cur.fetchone(): 异步获取查询结果。
await cur.close(): 异步关闭游标。
conn.close(): 关闭连接。在 aiomysql 中,conn.close() 方法本身不是一个协程(即不需要 await),但它会触发底层的清理工作,确保所有与连接相关的异步资源得到妥善释放。
async def main(): 封装了我们的主要异步逻辑。
asyncio.run(main()): Python 3.7+ 中运行顶层异步程序的标准方式。它负责创建事件循环、运行传入的协程直到完成,并管理事件循环的生命周期。

这个例子展示了最基本的异步连接和查询流程。在真实的企业应用中,直接为每个请求创建和关闭连接效率低下。因此,连接池是必不可少的。

6.3.2. 使用异步连接池 (aiomysql.create_pool)

连接池预先创建并维护一定数量的数据库连接,应用程序可以按需从池中获取连接,使用完毕后归还给池,而不是频繁地创建和销毁连接。这大大减少了连接建立的开销,提高了性能和资源利用率。

aiomysql 提供了 create_pool 函数来创建异步连接池。

import asyncio
import aiomysql

# 数据库配置
DB_CONFIG = {
            
    'host': '127.0.0.1',
    'port': 3306,
    'user': 'your_user',
    'password': 'your_password',
    'db': 'your_database',
}

# 全局变量,用于存储连接池
# 在实际应用中,通常在应用启动时初始化
db_pool = None

async def init_db_pool():
    """
    异步初始化 MySQL 连接池。
    这个函数应该在应用程序启动时调用一次。
    """
    global db_pool
    if db_pool is None:
        try:
            # 使用 aiomysql.create_pool() 异步创建连接池
            # minsize: 池中保持的最小空闲连接数
            # maxsize: 池中允许的最大连接数
            # loop: 事件循环
            # autocommit: 设置为 True 表示每个SQL语句执行后自动提交,默认为 False
            db_pool = await aiomysql.create_pool(
                minsize=5,
                maxsize=20,
                **DB_CONFIG, # 将字典解包作为关键字参数传递
                loop=asyncio.get_event_loop(),
                autocommit=True # 示例:启用自动提交,生产环境根据需求调整
            )
            print("aiomysql 连接池初始化成功。")
            print(f"连接池大小: min={
              db_pool.minsize}, max={
              db_pool.maxsize}, current_size={
              db_pool.size}, free={
              db_pool.freesize}")
        except Exception as e:
            print(f"初始化 aiomysql 连接池失败: {
              e}")
            # 在实际应用中,这里可能需要更健壮的错误处理,例如重试或退出应用
            db_pool = None # 确保如果失败,db_pool 仍然是 None

async def close_db_pool():
    """
    异步关闭 MySQL 连接池。
    这个函数应该在应用程序关闭时调用。
    """
    global db_pool
    if db_pool:
        db_pool.close() # 关闭池,释放所有连接的请求会开始
        await db_pool.wait_closed() # 等待所有连接真正关闭
        print("aiomysql 连接池已关闭。")
        db_pool = None

async def execute_query_from_pool(query: str, args=None):
    """
    从连接池获取连接并异步执行查询。

    参数:
        query (str): 要执行的 SQL 查询语句。
        args (tuple, optional): 查询参数 (用于参数化查询)。默认为 None。

    返回:
        查询结果 (如果适用),否则为 None。
    """
    if db_pool is None:
        print("错误:数据库连接池未初始化。")
        return None

    # 使用 async with 语句从连接池获取连接
    # 当离开 async with 块时,连接会自动归还给池
    async with db_pool.acquire() as conn:
        # 从获取的连接创建异步游标
        # 同样使用 async with 来确保游标被正确关闭
        async with conn.cursor(aiomysql.DictCursor) as cur: # 使用 DictCursor 以字典形式获取结果
            try:
                # 异步执行查询
                await cur.execute(query, args)
                print(f"查询 '{
              query}' (参数: {
              args}) 执行成功。")

                # 根据查询类型决定是否获取结果
                if query.strip().upper().startswith("SELECT"):
                    # 如果是 SELECT 查询,获取所有结果
                    result = await cur.fetchall()
                    return result
                else:
                    # 对于 INSERT, UPDATE, DELETE 等操作,可以获取影响的行数
                    affected_rows = cur.rowcount
                    print(f"影响行数: {
              affected_rows}")
                    # 对于非 SELECT 查询,通常不需要显式提交,因为我们在池创建时设置了 autocommit=True
                    # 如果 autocommit=False,则需要在此处调用 await conn.commit()
                    return affected_rows # 或者返回 True 表示成功

            except aiomysql.Error as e:
                print(f"数据库操作错误: {
              e}")
                # 如果 autocommit=False 且发生错误,可能需要 await conn.rollback()
                return None
            except Exception as e:
                print(f"执行查询时发生意外错误: {
              e}")
                return None

async def worker_task(task_id: int):
    """
    一个模拟的工作任务,它从连接池执行查询。
    """
    print(f"任务 {
              task_id} 开始执行...")
    # 示例:插入数据
    insert_query = "INSERT INTO employees (name, department, salary) VALUES (%s, %s, %s) ON DUPLICATE KEY UPDATE salary = VALUES(salary);"
    employee_name = f"员工_{
              task_id}"
    department = "技术部" if task_id % 2 == 0 else "市场部"
    salary = 5000 + task_id * 100
    await execute_query_from_pool(insert_query, (employee_name, department, salary))

    # 示例:查询数据
    select_query = "SELECT name, salary FROM employees WHERE department = %s ORDER BY salary DESC LIMIT %s;"
    target_department = "技术部"
    limit = 3
    results = await execute_query_from_pool(select_query, (target_department, limit))
    if results:
        print(f"任务 {
              task_id} 查询到部门 '{
              target_department}' 薪资最高的 {
              limit} 名员工:")
        for row in results:
            print(f"  - 姓名: {
              row['name']}, 薪资: {
              row['salary']}")
    print(f"任务 {
              task_id} 执行完毕。")
    print(f"当前连接池状态: size={
              db_pool.size}, free={
              db_pool.freesize}")


async def main_with_pool():
    """
    演示使用连接池的主异步函数。
    """
    # 确保表存在 (仅为演示目的,实际应用中表结构应提前管理好)
    await init_db_pool() # 在所有操作之前初始化连接池
    if not db_pool:
        print("无法初始化连接池,程序退出。")
        return

    # 模拟一个简单的表结构
    create_table_query = """
    CREATE TABLE IF NOT EXISTS employees (
        id INT AUTO_INCREMENT PRIMARY KEY,
        name VARCHAR(100) NOT NULL,
        department VARCHAR(50),
        salary DECIMAL(10, 2),
        hire_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        UNIQUE KEY unique_name (name)
    ) ENGINE=InnoDB;
    """
    await execute_query_from_pool(create_table_query)
    print("员工表 'employees' 已确保存在或创建。")

    # 创建并运行多个并发任务
    # 这些任务将共享同一个连接池
    num_tasks = 10
    tasks = [worker_task(i) for i in range(1, num_tasks + 1)]
    await asyncio.gather(*tasks) # 并发运行所有 worker_task

    # 所有任务完成后,关闭连接池
    await close_db_pool()

if __name__ == "__main__":
    asyncio.run(main_with_pool())

代码解释:

DB_CONFIG: 将数据库连接参数存储在字典中,方便管理。
db_pool = None: 全局变量,用于持有连接池对象。
async def init_db_pool():

global db_pool: 声明 db_pool 为全局变量。
db_pool = await aiomysql.create_pool(...): 异步创建连接池。

minsize: 池中维护的最小空闲连接数。池会尝试保持至少这么多的连接可用。
maxsize: 池中允许的最大连接数。当所有连接都在使用中且达到此数量时,新的获取连接请求可能需要等待。
**DB_CONFIG: 将 DB_CONFIG 字典解包,传递主机、用户、密码等参数。
loop: 传递事件循环。
autocommit=True: 设置为 True 后,每个 SQL 语句执行后会自动提交,无需显式调用 conn.commit()。这简化了代码,但对于需要精细事务控制的场景,应设为 False

打印连接池状态信息:db_pool.size (当前池中总连接数), db_pool.freesize (当前池中空闲连接数)。

async def close_db_pool():

db_pool.close(): 开始关闭连接池的过程。此调用会立即返回,但池不会立即关闭,它会等待所有已获取的连接被归还。
await db_pool.wait_closed(): 异步等待,直到池中的所有连接都已实际关闭。

async def execute_query_from_pool(query: str, args=None):

async with db_pool.acquire() as conn:: 这是从池中获取连接的关键。db_pool.acquire() 是一个协程,返回一个连接对象。async with 语句确保在代码块结束时(无论正常结束还是发生异常),连接都会通过 conn.release()async with 自动处理)归还给池。
async with conn.cursor(aiomysql.DictCursor) as cur:: 从获取的连接创建游标。aiomysql.DictCursor 使查询结果以字典形式返回(列名为键),更易于使用。同样,async with 确保游标在使用完毕后自动关闭 (await cur.close())。
await cur.execute(query, args): 异步执行参数化的查询。
await cur.fetchall(): 如果是 SELECT 查询,异步获取所有结果。
cur.rowcount: 对于 INSERT, UPDATE, DELETE 等操作,可以通过 cur.rowcount 获取受影响的行数。
事务注意: 如果 autocommit=False (在 create_pool 时设置),则在执行修改数据的操作后,需要显式调用 await conn.commit() 来提交事务,或在发生错误时调用 await conn.rollback() 来回滚。

async def worker_task(task_id: int): 模拟一个并发执行的任务单元。它会调用 execute_query_from_pool 来执行数据库操作。这展示了多个异步任务如何共享同一个连接池。
async def main_with_pool():

await init_db_pool(): 在程序开始时初始化连接池。
create_table_query: 一个创建示例表的 DDL 语句。
tasks = [worker_task(i) for i in range(1, num_tasks + 1)]: 创建多个 worker_task 协程。
await asyncio.gather(*tasks): 并发运行列表中的所有协程。asyncio.gather 会等待所有任务完成后再继续。
await close_db_pool(): 在程序结束前关闭连接池。

if __name__ == "__main__": asyncio.run(main_with_pool()): 运行主程序。

这个连接池的例子更接近真实应用。连接池的参数(minsize, maxsize, timeout 等)需要根据应用的具体负载和 MySQL 服务器的能力进行仔细调优。

6.4. 执行异步查询 (Executing Asynchronous Queries)

一旦获得了异步游标 (cur),执行查询的方式与同步库非常相似,主要区别在于每个可能引起 I/O 等待的操作都需要 await

6.4.1. 执行 SELECT 查询并获取结果
import asyncio
import aiomysql

# (假设 DB_CONFIG 和 db_pool 已按前例设置和初始化)
# (假设 employees 表已创建并有一些数据)

async def fetch_employee_data(department_name: str, min_salary: float):
    """
    异步查询指定部门和最低薪资的员工信息。
    """
    if db_pool is None:
        print("错误:数据库连接池未初始化。")
        return []

    query = """
        SELECT id, name, department, salary, hire_date
        FROM employees
        WHERE department = %s AND salary >= %s
        ORDER BY salary DESC;
    """
    args = (department_name, min_salary)

    async with db_pool.acquire() as conn:
        # 使用 SSCursor (Server-Side Cursor) 来处理可能的大结果集,逐行流式获取
        # 注意:aiomysql 的 SSCursor 行为可能与 mysql-connector-python 的不完全一致,
        # 并且对于非常大的结果集,仍然需要小心内存管理。
        # 对于普通大小的结果集,aiomysql.Cursor 或 aiomysql.DictCursor 通常足够。
        # 这里我们为了演示多样性,使用 aiomysql.DictCursor,它更常用。
        async with conn.cursor(aiomysql.DictCursor) as cur:
            try:
                print(f"执行查询: {
              query} 参数: {
              args}")
                await cur.execute(query, args)

                # 获取单行结果
                # first_employee = await cur.fetchone()
                # if first_employee:
                #     print(f"第一位符合条件的员工: {first_employee}")

                # 获取所有结果
                employees = await cur.fetchall()
                print(f"查询到 {
              len(employees)} 位符合条件的员工。")
                # for emp in employees:
                # print(f"  ID: {emp['id']}, 姓名: {emp['name']}, 部门: {emp['department']}, 薪资: {emp['salary']}")
                return employees

                # 或者,逐行处理 (如果结果集非常大,这可能更优)
                # print("逐行处理结果:")
                # results = []
                # async for row in cur: # aiomysql 的游标支持异步迭代
                #     print(f"  处理员工: {row['name']}")
                #     results.append(row)
                # return results


            except aiomysql.Error as e:
                print(f"查询员工数据时出错: {
              e}")
                return []
            except Exception as e:
                print(f"意外错误: {
              e}")
                return []

async def main_select_example():
    await init_db_pool()
    if not db_pool: return

    # 创建一些示例数据
    await execute_query_from_pool(
        "INSERT INTO employees (name, department, salary) VALUES (%s, %s, %s) ON DUPLICATE KEY UPDATE salary = VALUES(salary);",
        ("张三", "技术部", 12000.00)
    )
    await execute_query_from_pool(
        "INSERT INTO employees (name, department, salary) VALUES (%s, %s, %s) ON DUPLICATE KEY UPDATE salary = VALUES(salary);",
        ("李四", "技术部", 15000.00)
    )
    await execute_query_from_pool(
        "INSERT INTO employees (name, department, salary) VALUES (%s, %s, %s) ON DUPLICATE KEY UPDATE salary = VALUES(salary);",
        ("王五", "市场部", 11000.00)
    )
    await execute_query_from_pool(
        "INSERT INTO employees (name, department, salary) VALUES (%s, %s, %s) ON DUPLICATE KEY UPDATE salary = VALUES(salary);",
        ("赵六", "技术部", 9000.00)
    )

    print("
--- 查询技术部薪资大于等于10000的员工 ---")
    tech_employees = await fetch_employee_data("技术部", 10000.00)
    for emp in tech_employees:
        print(f"  -> 姓名: {
              emp['name']}, 薪资: {
              emp['salary']}")

    print("
--- 查询市场部薪资大于等于5000的员工 ---")
    market_employees = await fetch_employee_data("市场部", 5000.00)
    for emp in market_employees:
        print(f"  -> 姓名: {
              emp['name']}, 薪资: {
              emp['salary']}")

    await close_db_pool()

if __name__ == "__main__":
    # 运行前确保 employees 表已存在 (可复用上一个例子的 main_with_pool 中的建表逻辑)
    # 简单起见,这里直接调用,但更好的做法是分离应用逻辑和一次性设置
    async def setup_and_run():
        await init_db_pool()
        if not db_pool:
            print("无法初始化连接池,程序退出。")
            return

        create_table_query = """
        CREATE TABLE IF NOT EXISTS employees (
            id INT AUTO_INCREMENT PRIMARY KEY,
            name VARCHAR(100) NOT NULL,
            department VARCHAR(50),
            salary DECIMAL(10, 2),
            hire_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            UNIQUE KEY unique_name (name)
        ) ENGINE=InnoDB;
        """
        # execute_query_from_pool 内部会从池中获取连接
        async with db_pool.acquire() as conn:
            async with conn.cursor() as cur:
                await cur.execute(create_table_query)
        print("员工表 'employees' 已确保存在或创建。")
        await close_db_pool() # 关闭初始化时打开的池

        # 现在运行select的例子
        await main_select_example()

    asyncio.run(setup_and_run())

代码解释:

fetch_employee_data:

定义了参数化的 SELECT 查询。
使用 aiomysql.DictCursor,这样 cur.fetchall() 返回的是字典列表,cur.fetchone() 返回单个字典。
await cur.execute(query, args): 异步执行。
await cur.fetchall(): 异步获取所有匹配的行。
异步迭代: aiomysql 的游标对象实现了异步迭代协议 (__aiter____anext__),因此可以使用 async for row in cur: 来逐行处理结果。这对于非常大的结果集特别有用,可以避免一次性将所有数据加载到内存中。上面的代码中注释掉了这部分,但这是一个重要的特性。

main_select_example:

调用 init_db_pool() 初始化连接池。
插入一些示例数据以便查询。这里复用了之前定义的 execute_query_from_pool 函数,但需要注意它内部的 print 语句。
调用 fetch_employee_data 执行查询并打印结果。
调用 close_db_pool() 关闭连接池。

setup_and_runif __name__ == "__main__"::

由于 fetch_employee_data 依赖于 init_db_poolclose_db_pool,并且可能依赖于 employees 表的存在,setup_and_run 协程用于协调这些步骤。它首先确保表存在,然后运行主要的 SELECT 示例。
注意,init_db_poolclose_db_pool 被多次调用。在更复杂的应用中,这些通常是应用生命周期的一部分,init_db_pool 在启动时调用一次,close_db_pool 在关闭时调用一次。

获取结果的方法:

await cur.fetchone(): 获取下一行结果。如果没有更多行,则返回 None
await cur.fetchall(): 获取所有剩余的行。返回一个元组的元组 (默认游标) 或字典列表 (DictCursor)。如果结果集为空,返回一个空元组/列表。
await cur.fetchmany(size=cur.arraysize): 获取指定数量的行。arraysize 是游标的一个属性,默认为1。
async for row in cur:: 通过异步迭代逐行获取。

6.4.2. 执行 INSERT, UPDATE, DELETE 操作

这些操作的执行方式与 SELECT 类似,但通常我们更关心的是操作是否成功以及影响了多少行。如果连接池或连接未设置为 autocommit=True,则需要显式提交事务。

import asyncio
import aiomysql
from datetime import datetime

# (假设 DB_CONFIG, db_pool, init_db_pool, close_db_pool 已定义)
# (假设 employees 表已创建)

async def add_new_employee(name: str, department: str, salary: float, hire_date: datetime = None):
    """
    异步向 employees 表插入一条新员工记录。
    如果连接池的 autocommit=False,则需要显式事务处理。
    为了演示,假设我们的连接池配置为 autocommit=True。
    """
    if db_pool is None:
        print("错误:数据库连接池未初始化。")
        return None

    query = """
        INSERT INTO employees (name, department, salary, hire_date)
        VALUES (%s, %s, %s, %s);
    """
    # 如果 hire_date 未提供,则使用数据库的默认值 (CURRENT_TIMESTAMP) 或 Python 生成的当前时间
    # 为了能插入 NULL 以便让数据库使用默认值,需要传递 None
    effective_hire_date = hire_date if hire_date else datetime.now() # 或者直接传 None

    args = (name, department, salary, effective_hire_date)

    async with db_pool.acquire() as conn:
        async with conn.cursor() as cur: # 不需要 DictCursor,因为 INSERT 不返回复杂结构
            try:
                await cur.execute(query, args)
                # 如果 autocommit=False (在 create_pool 中设置), 需要在这里提交:
                # await conn.commit()
                # print("事务已提交。")

                inserted_id = cur.lastrowid # 获取最后插入行的自增 ID
                affected_rows = cur.rowcount # 获取影响的行数 (对于 INSERT 通常是 1)

                print(f"成功插入员工: {
              name}, ID: {
              inserted_id}, 影响行数: {
              affected_rows}")
                return inserted_id
            except aiomysql.IntegrityError as e: # 例如,违反唯一约束
                print(f"插入员工 '{
              name}' 失败 (数据完整性错误): {
              e}")
                # 如果 autocommit=False, 可能需要回滚:
                # await conn.rollback()
                # print("事务已回滚。")
                return None
            except aiomysql.Error as e:
                print(f"插入员工 '{
              name}' 时数据库操作错误: {
              e}")
                # if conn and not conn.autocommit: await conn.rollback()
                return None
            except Exception as e:
                print(f"插入员工 '{
              name}' 时发生意外错误: {
              e}")
                # if conn and not conn.autocommit: await conn.rollback()
                return None

async def update_employee_salary(name: str, new_salary: float):
    """
    异步更新指定员工的薪资。
    """
    if db_pool is None: return 0

    query = "UPDATE employees SET salary = %s WHERE name = %s;"
    args = (new_salary, name)

    async with db_pool.acquire() as conn:
        async with conn.cursor() as cur:
            try:
                await cur.execute(query, args)
                # if not conn.autocommit: await conn.commit()
                affected_rows = cur.rowcount
                print(f"更新员工 '{
              name}' 薪资为 {
              new_salary}。影响行数: {
              affected_rows}")
                return affected_rows
            except Exception as e:
                print(f"更新员工 '{
              name}' 薪资时发生错误: {
              e}")
                # if conn and not conn.autocommit: await conn.rollback()
                return 0

async def delete_employee(name: str):
    """
    异步删除指定员工。
    """
    if db_pool is None: return 0

    query = "DELETE FROM employees WHERE name = %s;"
    args = (name,)

    async with db_pool.acquire() as conn:
        async with conn.cursor() as cur:
            try:
                await cur.execute(query, args)
                # if not conn.autocommit: await conn.rollback()
                affected_rows = cur.rowcount
                print(f"删除员工 '{
              name}'。影响行数: {
              affected_rows}")
                return affected_rows
            except Exception as e:
                print(f"删除员工 '{
              name}' 时发生错误: {
              e}")
                # if conn and not conn.autocommit: await conn.rollback()
                return 0

async def main_dml_example():
    # 这里假设 init_db_pool 和 close_db_pool 的定义与之前连接池示例一致
    # 并且连接池创建时 autocommit=True
    await init_db_pool()
    if not db_pool: return

    # 清理并创建表 (确保从干净状态开始)
    async with db_pool.acquire() as conn:
        async with conn.cursor() as cur:
            await cur.execute("DROP TABLE IF EXISTS employees;")
            await cur.execute("""
                CREATE TABLE employees (
                    id INT AUTO_INCREMENT PRIMARY KEY,
                    name VARCHAR(100) NOT NULL UNIQUE,
                    department VARCHAR(50),
                    salary DECIMAL(10, 2),
                    hire_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                ) ENGINE=InnoDB;
            """)
            print("员工表 'employees' 已重建。")

    # 添加新员工
    emp_id1 = await add_new_employee("刘备", "管理层", 25000.00, datetime(2023, 1, 10))
    emp_id2 = await add_new_employee("关羽", "战斗部", 22000.00) # 使用默认 hire_date
    emp_id3 = await add_new_employee("张飞", "战斗部", 21000.00)

    # 尝试添加一个同名员工 (应失败,因为 name 字段有唯一约束)
    await add_new_employee("刘备", "后勤部", 10000.00)

    # 更新员工薪资
    await update_employee_salary("关羽", 23000.00)
    await update_employee_salary("诸葛亮", 30000.00) # 尝试更新不存在的员工

    # 查询当前所有员工
    print("
--- 当前员工列表 ---")
    all_employees = await execute_query_from_pool("SELECT name, department, salary, DATE_FORMAT(hire_date, '%Y-%m-%d') as formatted_hire_date FROM employees;") # execute_query_from_pool 来自之前的例子
    if all_employees:
        for emp in all_employees:
            print(f"  {
              emp}")

    # 删除员工
    await delete_employee("张飞")
    await delete_employee("孙权") # 尝试删除不存在的员工

    print("
--- 删除张飞后的员工列表 ---")
    remaining_employees = await execute_query_from_pool("SELECT name, department, salary FROM employees;")
    if remaining_employees:
        for emp in remaining_employees:
            print(f"  {
              emp}")

    await close_db_pool()

if __name__ == "__main__":
    # 为了能复用 execute_query_from_pool,我们需要其定义
    # 在实际项目中,这些函数会组织在模块中
    DB_CONFIG = {
            
        'host': '127.0.0.1', 'port': 3306, 'user': 'your_user',
        'password': 'your_password', 'db': 'your_database',
    }
    db_pool = None

    async def init_db_pool_local(): # 避免与全局冲突,或确保全局db_pool被此函数设置
        global db_pool
        if db_pool is None:
            db_pool = await aiomysql.create_pool(
                minsize=2, maxsize=5, **DB_CONFIG,
                loop=asyncio.get_event_loop(), autocommit=True # 关键:设置为True
            )
            print("本地DML示例连接池初始化成功 (autocommit=True)。")

    async def close_db_pool_local():
        global db_pool
        if db_pool:
            db_pool.close()
            await db_pool.wait_closed()
            print("本地DML示例连接池已关闭。")
            db_pool = None # 重置,以便下次运行可以重新初始化

    # 重新定义 execute_query_from_pool 以便此脚本独立运行
    async def execute_query_from_pool(query: str, args=None):
        if db_pool is None: return None
        async with db_pool.acquire() as conn:
            async with conn.cursor(aiomysql.DictCursor) as cur:
                await cur.execute(query, args)
                if query.strip().upper().startswith("SELECT"):
                    return await cur.fetchall()
                else:
                    return cur.rowcount

    # 更新 main_dml_example 以使用本地化的池管理函数
    async def main_dml_example_runner():
        await init_db_pool_local()
        if not db_pool:
            print("连接池未能初始化 (DML)。")
            return
        try:
            await main_dml_example_content() # 将原 main_dml_example 逻辑放入新函数
        finally:
            await close_db_pool_local()

    async def main_dml_example_content(): # 原 main_dml_example 的内容
        async with db_pool.acquire() as conn:
            async with conn.cursor() as cur:
                await cur.execute("DROP TABLE IF EXISTS employees;")
                await cur.execute("""
                    CREATE TABLE employees (
                        id INT AUTO_INCREMENT PRIMARY KEY,
                        name VARCHAR(100) NOT NULL UNIQUE,
                        department VARCHAR(50),
                        salary DECIMAL(10, 2),
                        hire_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                    ) ENGINE=InnoDB;
                """)
                print("员工表 'employees' 已重建。")
        emp_id1 = await add_new_employee("刘备", "管理层", 25000.00, datetime(2023, 1, 10))
        emp_id2 = await add_new_employee("关羽", "战斗部", 22000.00)
        emp_id3 = await add_new_employee("张飞", "战斗部", 21000.00)
        await add_new_employee("刘备", "后勤部", 10000.00)
        await update_employee_salary("关羽", 23000.00)
        await update_employee_salary("诸葛亮", 30000.00)
        print("
--- 当前员工列表 ---")
        all_employees = await execute_query_from_pool("SELECT name, department, salary, DATE_FORMAT(hire_date, '%Y-%m-%d') as formatted_hire_date FROM employees;")
        if all_employees:
            for emp in all_employees: print(f"  {
              emp}")
        await delete_employee("张飞")
        await delete_employee("孙权")
        print("
--- 删除张飞后的员工列表 ---")
        remaining_employees = await execute_query_from_pool("SELECT name, department, salary FROM employees;")
        if remaining_employees:
            for emp in remaining_employees: print(f"  {
              emp}")

    # 将 init_db_pool 和 close_db_pool 的调用移到 main_dml_example_runner 内部
    # 并重命名 main_dml_example 为 main_dml_example_content
    # 这是为了确保池的正确生命周期管理在这个特定示例中。
    # 在实际应用中,池的生命周期通常与整个应用的生命周期绑定。

    asyncio.run(main_dml_example_runner())

代码解释:

add_new_employee:

cur.lastrowid: 在执行 INSERT 语句后,如果表有自增主键,可以通过 cur.lastrowid 获取刚插入行的 ID。
cur.rowcount: 对于 INSERT, UPDATE, DELETEcur.rowcount 返回受操作影响的行数。
错误处理: 捕捉 aiomysql.IntegrityError 可以处理像唯一键冲突这样的特定数据库错误。
事务: 注释中提到了如果 autocommit=False,则需要 await conn.commit()await conn.rollback()。由于我们的连接池示例设置了 autocommit=True,所以这里不需要显式调用。

update_employee_salarydelete_employee: 逻辑与 add_new_employee 类似,执行相应的 SQL 语句并返回影响的行数。
main_dml_example_content:

演示了 DML 操作的完整流程:重建表、插入数据(包括一次预期的失败插入)、更新数据(包括一次对不存在记录的更新尝试)、查询数据、删除数据(包括一次对不存在记录的删除尝试)。
DATE_FORMAT(hire_date, '%Y-%m-%d'): 在 SQL 查询中格式化日期,使其更易读。

独立运行的调整:

为了使 main_dml_example 部分能够独立运行并管理自己的连接池,添加了 init_db_pool_localclose_db_pool_localmain_dml_example_runner。这是为了演示的便利性,实际项目中,连接池管理通常是更集中的。
autocommit=True 是关键,它简化了单个 DML 操作,因为每个成功的 execute 都会被自动提交。

6.4.3. 参数化查询 (Parameterized Queries)

如前面的例子所示,aiomysql (继承自 PyMySQL) 使用 %s 作为占位符进行参数化查询。参数以元组或列表的形式作为第二个参数传递给 cur.execute()

# ... (在协程函数内部,已获取 cur)
user_input_name = "危险的' OR '1'='1" # 一个潜在的SQL注入尝试
user_input_salary = 1000.0

# 安全的方式:使用参数化查询
query = "SELECT * FROM employees WHERE name = %s AND salary > %s;"
args = (user_input_name, user_input_salary)
await cur.execute(query, args) # aiomysql/PyMySQL 会正确处理特殊字符,防止注入

# 不安全的方式:字符串拼接 (绝对禁止!)
# unsafe_query = f"SELECT * FROM employees WHERE name = '{user_input_name}' AND salary > {user_input_salary};"
# await cur.execute(unsafe_query) # 极易受到SQL注入攻击!

始终使用参数化查询是防止 SQL 注入攻击的最重要手段。 aiomysql 底层的 PyMySQL 会负责对传入的参数进行适当的转义和处理,确保它们被当作数据值而不是 SQL 代码片段来对待。

七、aiomysql 中的异步事务管理 (Asynchronous Transaction Management in aiomysql)

事务是一系列数据库操作,这些操作要么全部成功执行,要么在任何一步失败时全部回滚到初始状态,从而保证数据的原子性 (Atomicity)、一致性 (Consistency)、隔离性 (Isolation) 和持久性 (Durability) – 即 ACID 属性。在异步环境中,事务管理需要特别注意,因为协程的并发执行可能引入新的复杂性。

7.1. 显式事务控制 (Explicit Transaction Control)

默认情况下,当你通过 aiomysql.create_pool 创建连接池或 aiomysql.connect 创建单个连接时,可以通过 autocommit 参数控制事务行为。

autocommit=True (某些驱动或配置下的默认值,或如我们之前示例中显式设置的): 每个 SQL 语句执行后都会被视为一个独立的事务并自动提交。这简化了简单操作,但对于需要多条语句原子执行的场景则不适用。
autocommit=False: 这是进行显式事务管理的必要设置。在此模式下,数据库操作在执行后不会立即生效,除非显式调用 commit()

7.1.1. 配置 autocommit=False

对于连接池:

import asyncio
import aiomysql

DB_CONFIG_NO_AUTOCOMMIT = {
            
    'host': '127.0.0.1',
    'port': 3306,
    'user': 'your_user',
    'password': 'your_password',
    'db': 'your_database_tx', # 使用一个专门用于事务测试的数据库
}

# 全局变量,用于存储事务型连接池
tx_db_pool = None

async def init_tx_db_pool():
    """
    异步初始化 MySQL 连接池,并禁用自动提交 (autocommit=False)。
    """
    global tx_db_pool
    if tx_db_pool is None:
        try:
            print("正在初始化事务型连接池 (autocommit=False)...")
            tx_db_pool = await aiomysql.create_pool(
                minsize=2,
                maxsize=10,
                **DB_CONFIG_NO_AUTOCOMMIT,
                loop=asyncio.get_event_loop(),
                autocommit=False # 关键:禁用自动提交以启用手动事务控制
            )
            print("事务型连接池 (autocommit=False) 初始化成功。")
            print(f"连接池状态: min={
              tx_db_pool.minsize}, max={
              tx_db_pool.maxsize}")
        except Exception as e:
            print(f"初始化事务型连接池失败: {
              e}")
            tx_db_pool = None

async def close_tx_db_pool():
    """
    异步关闭事务型 MySQL 连接池。
    """
    global tx_db_pool
    if tx_db_pool:
        tx_db_pool.close()
        await tx_db_pool.wait_closed()
        print("事务型连接池已关闭。")
        tx_db_pool = None

代码解释:

DB_CONFIG_NO_AUTOCOMMIT: 数据库连接配置,建议为事务测试使用独立的数据库或表前缀,避免干扰其他数据。
tx_db_pool: 用于存储配置为 autocommit=False 的连接池。
init_tx_db_pool():

autocommit=False: 这是核心设置。当从这个池中获取连接后,所有的数据库操作都需要显式的 commitrollback

close_tx_db_pool(): 关闭连接池的辅助函数。

7.1.2. 事务操作:begin, commit, rollback

一旦获得了 autocommit=False 的连接,就可以使用以下协程方法来控制事务:

await conn.begin(): 显式开始一个新的事务。虽然在 autocommit=False 模式下,第一个数据操作语句会自动开始一个事务(如果当前没有活动事务),但显式调用 await conn.begin() 是一个良好的编程习惯,它使得事务的边界更加清晰。对于某些数据库系统和驱动,它甚至是必需的。
await conn.commit(): 提交当前事务,将自上次提交/回滚或事务开始以来所做的所有更改永久保存到数据库中。
await conn.rollback(): 回滚当前事务,撤销自上次提交/回滚或事务开始以来所做的所有更改,使数据库恢复到事务开始前的状态。

企业场景:银行转账操作

这是一个经典的事务场景。假设我们有一个 accounts 表,包含用户账户和余额。从一个账户向另一个账户转账需要两个 UPDATE 操作:一个减少源账户余额,一个增加目标账户余额。这两个操作必须要么都成功,要么都失败。

-- 用于事务演示的表结构
CREATE TABLE IF NOT EXISTS accounts (
    account_id VARCHAR(50) PRIMARY KEY,
    owner_name VARCHAR(100) NOT NULL,
    balance DECIMAL(15, 2) NOT NULL CHECK (balance >= 0) -- 余额不能为负
) ENGINE=InnoDB;

INSERT IGNORE INTO accounts (account_id, owner_name, balance) VALUES
('ACC001', 'Alice', 1000.00),
('ACC002', 'Bob', 500.00),
('ACC003', 'Charlie_Insufficient', 50.00);

现在,让我们用 aiomysql 实现这个转账操作,并进行详尽的事务控制:

import asyncio
import aiomysql

# --- 前面定义的 DB_CONFIG_NO_AUTOCOMMIT, tx_db_pool, init_tx_db_pool, close_tx_db_pool ---
# (确保这些已在您的环境中定义并可访问)
# DB_CONFIG_NO_AUTOCOMMIT = { 'host': '127.0.0.1', ... 'db': 'your_database_tx' }
# tx_db_pool = None
# async def init_tx_db_pool(): ...
# async def close_tx_db_pool(): ...


async def transfer_funds(from_account_id: str, to_account_id: str, amount: float):
    """
    异步执行资金转账操作,包含完整的事务控制。

    参数:
        from_account_id (str): 转出账户ID。
        to_account_id (str): 转入账户ID。
        amount (float): 转账金额,必须为正数。

    返回:
        bool: 转账成功返回 True,否则返回 False。
    """
    if tx_db_pool is None:
        print("错误:事务型数据库连接池未初始化。")
        return False
    if amount <= 0:
        print("错误:转账金额必须为正数。")
        return False
    if from_account_id == to_account_id:
        print("错误:不能给自己转账。") # 业务逻辑校验
        return False

    # 从连接池获取一个连接,用于执行整个事务
    # 注意:这里的连接将保持 autocommit=False 的状态
    async with tx_db_pool.acquire() as conn:
        # 从连接获取游标
        async with conn.cursor(aiomysql.DictCursor) as cur:
            try:
                # 1. 显式开始事务 (良好实践)
                await conn.begin()
                print(f"事务开始:从 {
              from_account_id} 转账 {
              amount} 到 {
              to_account_id}")

                # 2. 检查转出账户余额是否充足
                await cur.execute(
                    "SELECT balance FROM accounts WHERE account_id = %s FOR UPDATE;",
                    (from_account_id,)
                )
                # "FOR UPDATE" 会对选定的行加锁,防止其他事务在此期间修改这些行,
                # 这对于防止并发条件下的余额检查和更新之间出现问题至关重要。
                # 注意:不是所有场景都需要 FOR UPDATE,过度使用可能导致锁竞争。

                from_account = await cur.fetchone()
                if not from_account:
                    print(f"错误:转出账户 {
              from_account_id} 不存在。")
                    await conn.rollback() # 回滚事务
                    print("事务已回滚。")
                    return False

                if from_account['balance'] < amount:
                    print(f"错误:转出账户 {
              from_account_id} 余额不足。当前余额: {
              from_account['balance']}, 需要: {
              amount}")
                    await conn.rollback() # 回滚事务
                    print("事务已回滚。")
                    return False

                # 3. 检查转入账户是否存在 (可选,取决于业务需求)
                await cur.execute(
                    "SELECT account_id FROM accounts WHERE account_id = %s FOR UPDATE;",
                    (to_account_id,)
                )
                # 对目标账户也加锁,如果后续操作非常依赖其状态,或者防止其被删除等。
                to_account_exists = await cur.fetchone()
                if not to_account_exists:
                    print(f"错误:转入账户 {
              to_account_id} 不存在。")
                    await conn.rollback() # 回滚事务
                    print("事务已回滚。")
                    return False

                # 4. 从转出账户扣款
                update_from_query = "UPDATE accounts SET balance = balance - %s WHERE account_id = %s;"
                await cur.execute(update_from_query, (amount, from_account_id))
                if cur.rowcount != 1: # 确保确实有一行被更新了
                    print(f"错误:从账户 {
              from_account_id} 扣款失败或账户状态异常。")
                    await conn.rollback()
                    print("事务已回滚 (扣款影响行数非1)。")
                    return False
                print(f"账户 {
              from_account_id} 已扣款 {
              amount}。")


                # --- 模拟一个可能发生的错误,以测试回滚逻辑 ---
                # if to_account_id == "ACC_FAIL_TRANSFER": # 特定账户ID用于触发模拟失败
                #     raise Exception("模拟向目标账户增款时发生意外故障!")
                # --- 模拟结束 ---

                # 5. 向转入账户拨款
                update_to_query = "UPDATE accounts SET balance = balance + %s WHERE account_id = %s;"
                await cur.execute(update_to_query, (amount, to_account_id))
                if cur.rowcount != 1: # 确保确实有一行被更新了
                    print(f"错误:向账户 {
              to_account_id} 拨款失败或账户状态异常。")
                    await conn.rollback()
                    print("事务已回滚 (拨款影响行数非1)。")
                    return False
                print(f"账户 {
              to_account_id} 已收款 {
              amount}。")

                # 6. 所有操作成功,提交事务
                await conn.commit()
                print(f"转账成功!事务已提交。从 {
              from_account_id} 转账 {
              amount} 到 {
              to_account_id}")
                return True

            except aiomysql.MySQLError as db_err: # 捕获 aiomysql 特有的数据库错误
                print(f"数据库操作错误,事务回滚: {
              db_err}")
                if conn.connected and not conn.closed: # 检查连接状态以安全回滚
                    try:
                        await conn.rollback()
                        print("事务因数据库错误已回滚。")
                    except Exception as rb_err:
                        print(f"回滚事务时发生错误: {
              rb_err}")
                return False
            except Exception as e: # 捕获其他任何Python异常
                print(f"发生意外错误,事务回滚: {
              e}")
                if conn.connected and not conn.closed:
                    try:
                        await conn.rollback()
                        print("事务因意外错误已回滚。")
                    except Exception as rb_err:
                        print(f"回滚事务时发生错误: {
              rb_err}")
                return False

async def check_balances(*account_ids):
    """
    异步查询并打印指定账户的余额。
    """
    if tx_db_pool is None:
        print("错误:事务型数据库连接池未初始化。")
        return

    async with tx_db_pool.acquire() as conn:
        async with conn.cursor(aiomysql.DictCursor) as cur:
            # 在 autocommit=False 的连接上执行 SELECT 通常不需要显式事务
            # 但如果希望读取的是已提交的数据(而不是当前未提交事务中的数据,这取决于隔离级别)
            # 则此查询本身也应在事务之外,或者确保之前的事务已结束。
            # 对于简单的余额检查,通常读取的是最新提交状态。
            # 如果在事务中执行 SELECT,它将遵循该事务的隔离级别。
            print("
--- 当前账户余额 ---")
            for acc_id in account_ids:
                await cur.execute("SELECT owner_name, balance FROM accounts WHERE account_id = %s;", (acc_id,))
                account = await cur.fetchone()
                if account:
                    print(f"  账户: {
              acc_id} ({
              account['owner_name']}), 余额: {
              account['balance']}")
                else:
                    print(f"  账户: {
              acc_id} 未找到。")

async def main_transaction_example():
    """
    主函数,演示事务性转账操作。
    """
    # 初始化事务型连接池
    await init_tx_db_pool()
    if not tx_db_pool:
        print("事务型连接池初始化失败,程序退出。")
        return

    # 准备演示表 (确保表存在且有初始数据)
    async with tx_db_pool.acquire() as conn:
        async with conn.cursor() as cur:
            # 因为 autocommit=False, DDL 和初始数据插入也需要 commit
            await conn.begin() # 开始一个事务来设置环境
            await cur.execute("DROP TABLE IF EXISTS accounts;") # 清理旧表
            await cur.execute("""
                CREATE TABLE IF NOT EXISTS accounts (
                    account_id VARCHAR(50) PRIMARY KEY,
                    owner_name VARCHAR(100) NOT NULL,
                    balance DECIMAL(15, 2) NOT NULL CHECK (balance >= 0)
                ) ENGINE=InnoDB;
            """)
            print("表 'accounts' 已创建/重建。")
            initial_data = [
                ('ACC001', 'Alice', 1000.00),
                ('ACC002', 'Bob', 500.00),
                ('ACC003', 'Charlie_Insufficient', 50.00),
                ('ACC004', 'David_Receiver', 200.00),
                ('ACC_FAIL_TRANSFER', 'Eve_Fail', 100.00) # 用于模拟失败的账户
            ]
            for acc_id, owner, bal in initial_data:
                await cur.execute(
                    "INSERT INTO accounts (account_id, owner_name, balance) VALUES (%s, %s, %s) ON DUPLICATE KEY UPDATE balance = VALUES(balance);",
                    (acc_id, owner, bal)
                )
            await conn.commit() # 提交表结构和初始数据
            print("初始账户数据已插入并提交。")

    # 场景1: 成功转账
    print("
--- 场景1: Alice 向 Bob 转账 100.00 ---")
    await check_balances('ACC001', 'ACC002')
    success1 = await transfer_funds('ACC001', 'ACC002', 100.00)
    print(f"转账操作结果: {
              '成功' if success1 else '失败'}")
    await check_balances('ACC001', 'ACC002')

    # 场景2: 余额不足导致转账失败
    print("
--- 场景2: Charlie 向 David 转账 200.00 (余额不足) ---")
    await check_balances('ACC003', 'ACC004')
    success2 = await transfer_funds('ACC003', 'ACC004', 200.00)
    print(f"转账操作结果: {
              '成功' if success2 else '失败'}")
    await check_balances('ACC003', 'ACC004') # 余额应保持不变

    # 场景3: 转出账户不存在
    print("
--- 场景3: 不存在的账户 NONEXIST001 向 Bob 转账 50.00 ---")
    await check_balances('NONEXIST001', 'ACC002')
    success3 = await transfer_funds('NONEXIST001', 'ACC002', 50.00)
    print(f"转账操作结果: {
              '成功' if success3 else '失败'}")
    await check_balances('ACC002') # Bob的余额应保持不变

    # 场景4: 转账过程中发生模拟的意外错误 (如果启用了模拟错误代码)
    # print("
--- 场景4: Alice 向 Eve_Fail 转账 50.00 (模拟拨款失败) ---")
    # await check_balances('ACC001', 'ACC_FAIL_TRANSFER')
    # success4 = await transfer_funds('ACC001', 'ACC_FAIL_TRANSFER', 50.00)
    # print(f"转账操作结果: {'成功' if success4 else '失败'}")
    # await check_balances('ACC001', 'ACC_FAIL_TRANSFER') # 双方余额都应保持不变,因为事务会回滚

    # 场景5: 转账金额为0或负数 (业务逻辑校验)
    print("
--- 场景5: Alice 向 Bob 转账 -50.00 (无效金额) ---")
    success5 = await transfer_funds('ACC001', 'ACC002', -50.00)
    print(f"转账操作结果: {
              '成功' if success5 else '失败'}")
    await check_balances('ACC001', 'ACC002') # 余额应保持不变

    # 清理并关闭连接池
    await close_tx_db_pool()

if __name__ == "__main__":
    asyncio.run(main_transaction_example())

代码解释 transfer_funds 函数:

获取连接: async with tx_db_pool.acquire() as conn: 从配置为 autocommit=False 的池中获取连接。
获取游标: async with conn.cursor(aiomysql.DictCursor) as cur:
await conn.begin(): 显式启动一个事务。这是一个好习惯,能清晰地标记事务的开始。
余额检查与 FOR UPDATE:

SELECT balance FROM accounts WHERE account_id = %s FOR UPDATE;: 在读取转出账户余额时,使用了 FOR UPDATE 子句。
作用: FOR UPDATE 会对查询匹配到的行(在这里是 from_account_id 对应的行)施加一个排他锁(X锁)。这意味着在当前事务提交或回滚之前,其他尝试读取这些行(如果也用 FOR UPDATELOCK IN SHARE MODE)或尝试更新/删除这些行的事务将会被阻塞。
原因: 这是为了防止并发场景下的“丢失更新”或“脏读”问题。例如,如果没有 FOR UPDATE,两个并发的转账请求可能同时读取到足够的余额,然后都进行扣款,导致余额透支。FOR UPDATE 确保了在检查余额和执行扣款之间,该账户的余额不会被其他事务修改。

条件判断与回滚:

如果账户不存在或余额不足,打印错误信息,然后调用 await conn.rollback() 来撤销事务开始以来的所有操作(在这个阶段,实际上还没有执行任何修改操作,但回滚是标准流程)。

执行 UPDATE:

UPDATE accounts SET balance = balance - %s WHERE account_id = %s;
UPDATE accounts SET balance = balance + %s WHERE account_id = %s;
cur.rowcount != 1: 检查每个 UPDATE 语句是否确实影响了预期的行数(通常是1行)。如果不是,可能意味着账户ID错误,或者账户在此期间被意外删除(尽管 FOR UPDATE 应该能部分防止这种情况),这通常表示一个异常状态,应该回滚事务。

模拟错误: 代码中注释掉的部分 if to_account_id == "ACC_FAIL_TRANSFER": raise Exception(...) 可以用来测试当事务中间发生Python异常时的回滚逻辑。
await conn.commit(): 如果所有操作都成功执行(余额充足,账户存在,更新操作影响了正确的行数),则调用此方法将所有更改永久写入数据库。
异常处理与回滚:

try...except aiomysql.MySQLError as db_err:: 捕获特定于数据库驱动的错误。
try...except Exception as e:: 捕获任何其他Python层面可能发生的异常。
在任何捕获到异常的情况下,都会尝试调用 await conn.rollback() 来确保数据的一致性。
if conn.connected and not conn.closed:: 在尝试回滚前检查连接状态是一个好习惯,避免在已关闭的连接上操作。

代码解释 main_transaction_example 函数:

初始化与清理: 调用 init_tx_db_pool()。在开始时,它会清理并重建 accounts 表,并插入一些初始数据。注意,由于连接池是 autocommit=False,所以 DDL 和初始 INSERT 操作也需要包裹在 conn.begin()conn.commit() 之间。
场景演示: 通过多个转账尝试来演示事务的各种行为:成功、因余额不足失败、因账户不存在失败等。
check_balances: 一个辅助函数,用于在操作前后查询并显示账户余额,以验证事务的效果。
关闭连接池: await close_tx_db_pool()

这个转账示例清晰地展示了在 aiomysql 中如何通过 begin, commit, rollback 以及结合错误处理来实现可靠的异步事务。FOR UPDATE 的使用是处理并发修改的关键点,但它也可能增加锁竞争,需要根据实际应用的并发模式和性能要求来权衡。

7.2. 使用异步上下文管理器进行事务管理 (Transactions with Asynchronous Context Managers)

虽然显式的 begin, commit, rollback 调用是有效的,但它们可能会使代码显得冗长,并且容易忘记在所有可能的代码路径(包括异常路径)中正确地调用 commitrollback。Python 的异步上下文管理器 (async with) 提供了一种更优雅、更安全的方式来管理资源,包括事务。

我们可以创建一个自定义的异步上下文管理器来封装事务逻辑。

import asyncio
import aiomysql

# --- 前面定义的 DB_CONFIG_NO_AUTOCOMMIT, tx_db_pool, init_tx_db_pool, close_tx_db_pool ---

class AsyncTransactionManager:
    """
    一个异步上下文管理器,用于简化 aiomysql 中的事务处理。
    它从连接池获取连接,并自动处理事务的开始、提交和回滚。
    """
    def __init__(self, pool, cursor_type=aiomysql.DictCursor):
        """
        初始化事务管理器。

        参数:
            pool: aiomysql 连接池 (必须配置为 autocommit=False)。
            cursor_type: 希望游标返回结果的类型,例如 aiomysql.DictCursor 或 aiomysql.Cursor。
        """
        if pool is None or pool.autocommit is True:
            raise ValueError("连接池必须提供且配置为 autocommit=False")
        self._pool = pool
        self._conn = None  # 将在 __aenter__ 中获取
        self._cursor_type = cursor_type
        self.cur = None    # 游标将在 __aenter__ 中创建并对外暴露

    async def __aenter__(self):
        """
        异步进入上下文时调用。获取连接,开始事务,并返回游标。
        """
        self._conn = await self._pool.acquire() # 从池中获取连接
        try:
            await self._conn.begin() # 显式开始事务
            self.cur = await self._conn.cursor(self._cursor_type) # 创建游标
            print("AsyncTransactionManager: 事务开始,游标已创建。")
            return self.cur # 返回游标供 'as' 子句使用
        except Exception as e:
            # 如果在 __aenter__ 中发生错误(例如 begin() 失败),需要释放连接
            if self._conn:
                self._pool.release(self._conn) # 归还连接给池
                self._conn = None
            print(f"AsyncTransactionManager: __aenter__ 失败: {
              e}")
            raise # 重新抛出异常

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """
        异步退出上下文时调用。根据是否发生异常来提交或回滚事务,并关闭游标和连接。

        参数:
            exc_type: 异常类型 (如果上下文中没有异常,则为 None)。
            exc_val: 异常实例 (如果上下文中没有异常,则为 None)。
            exc_tb: 异常的 traceback (如果上下文中没有异常,则为 None)。
        """
        if self.cur:
            await self.cur.close() # 关闭游标
            print("AsyncTransactionManager: 游标已关闭。")

        if not self._conn: # 如果连接在 aenter 中未能成功获取
            return False # 指示不抑制异常 (如果存在)

        try:
            if exc_type is not None: # 如果在 'async with' 块内发生了异常
                print(f"AsyncTransactionManager: 检测到异常 ({
              exc_type}),正在回滚事务...")
                await self._conn.rollback()
                print("AsyncTransactionManager: 事务已回滚。")
            else: # 如果没有异常发生
                print("AsyncTransactionManager: 未检测到异常,正在提交事务...")
                await self._conn.commit()
                print("AsyncTransactionManager: 事务已提交。")
        except Exception as e_tx:
            # 如果提交或回滚本身失败,这是一个严重的问题
            print(f"AsyncTransactionManager: 在提交/回滚事务时发生错误: {
              e_tx}")
            # 即使提交/回滚失败,也要确保连接被释放
            # 决定是否抑制原始异常 (exc_type) 还是这个新的异常 (e_tx)
            # 通常,不应抑制原始异常,除非这个新异常更关键
            if exc_type is None: # 如果原始块没有异常,但提交/回滚有,则抛出新的
                raise e_tx
            # 否则,原始异常 (exc_type) 将被 async with 语句重新抛出
        finally:
            if self._conn:
                self._pool.release(self._conn) # 无论如何,都要将连接归还给池
                self._conn = None
                print("AsyncTransactionManager: 连接已释放回池。")

        # 返回 False 表示不抑制在 'async with' 块中发生的异常 (如果有的话)
        # 如果返回 True,则会抑制异常 (通常不推荐,除非明确处理了它)
        return False


async def transfer_funds_with_cm(from_account_id: str, to_account_id: str, amount: float):
    """
    使用 AsyncTransactionManager 上下文管理器执行资金转账。
    """
    if tx_db_pool is None: return False
    if amount <= 0: return False
    if from_account_id == to_account_id: return False

    try:
        # 使用异步上下文管理器
        async with AsyncTransactionManager(tx_db_pool) as cur: # cur 就是 __aenter__ 返回的游标
            print(f"上下文管理器事务:从 {
              from_account_id} 转账 {
              amount} 到 {
              to_account_id}")

            # 1. 检查转出账户余额 (逻辑与之前类似,但现在在 'async with' 块内)
            await cur.execute(
                "SELECT balance FROM accounts WHERE account_id = %s FOR UPDATE;",
                (from_account_id,)
            )
            from_account = await cur.fetchone()
            if not from_account:
                print(f"错误:转出账户 {
              from_account_id} 不存在。")
                # 不需要手动 rollback,上下文管理器会处理
                raise ValueError(f"转出账户 {
              from_account_id} 不存在") # 抛出异常触发回滚

            if from_account['balance'] < amount:
                print(f"错误:转出账户 {
              from_account_id} 余额不足。")
                raise ValueError(f"账户 {
              from_account_id} 余额不足") # 抛出异常触发回滚

            # 2. 检查转入账户是否存在
            await cur.execute(
                "SELECT account_id FROM accounts WHERE account_id = %s FOR UPDATE;",
                (to_account_id,)
            )
            if not await cur.fetchone():
                print(f"错误:转入账户 {
              to_account_id} 不存在。")
                raise ValueError(f"转入账户 {
              to_account_id} 不存在")

            # 3. 从转出账户扣款
            await cur.execute(
                "UPDATE accounts SET balance = balance - %s WHERE account_id = %s;",
                (amount, from_account_id)
            )
            if cur.rowcount != 1:
                raise RuntimeError(f"从账户 {
              from_account_id} 扣款失败")

            print(f"账户 {
              from_account_id} 已通过上下文管理器扣款 {
              amount}。")

            # 模拟一个可能发生的错误
            # if to_account_id == "ACC_FAIL_TRANSFER_CM":
            #     raise Exception("模拟上下文管理器内拨款失败!")

            # 4. 向转入账户拨款
            await cur.execute(
                "UPDATE accounts SET balance = balance + %s WHERE account_id = %s;",
                (amount, to_account_id)
            )
            if cur.rowcount != 1:
                raise RuntimeError(f"向账户 {
              to_account_id} 拨款失败")

            print(f"账户 {
              to_account_id} 已通过上下文管理器收款 {
              amount}。")
            # 如果代码执行到这里没有抛出任何异常,__aexit__ 将会自动提交事务

        # 如果 async with 块成功完成(没有未捕获的异常)
        print(f"上下文管理器转账成功!从 {
              from_account_id} 转账 {
              amount} 到 {
              to_account_id}")
        return True

    except ValueError as ve: # 捕获我们自己定义的业务逻辑错误
        print(f"转账业务逻辑错误 (上下文管理器): {
              ve}")
        return False
    except RuntimeError as rte: # 捕获我们自己定义的运行时错误
        print(f"转账运行时错误 (上下文管理器): {
              rte}")
        return False
    except aiomysql.MySQLError as db_err: # 捕获数据库特定错误
        print(f"数据库操作错误 (上下文管理器): {
              db_err}")
        return False
    except Exception as e: # 捕获其他任何意外异常
        print(f"意外错误 (上下文管理器): {
              e}")
        return False

async def main_context_manager_example():
    await init_tx_db_pool() # 使用之前定义的事务型连接池初始化
    if not tx_db_pool:
        print("事务型连接池 (上下文管理器示例) 初始化失败,程序退出。")
        return

    # 确保表和数据存在 (与之前 main_transaction_example 中的设置类似)
    async with tx_db_pool.acquire() as conn:
        async with conn.cursor() as cur:
            await conn.begin()
            await cur.execute("DROP TABLE IF EXISTS accounts;")
            await cur.execute("""
                CREATE TABLE IF NOT EXISTS accounts (
                    account_id VARCHAR(50) PRIMARY KEY,
                    owner_name VARCHAR(100) NOT NULL,
                    balance DECIMAL(15, 2) NOT NULL CHECK (balance >= 0)
                ) ENGINE=InnoDB;
            """)
            initial_data = [
                ('ACC_CM_001', 'Alice_CM', 1000.00),
                ('ACC_CM_002', 'Bob_CM', 500.00),
                ('ACC_CM_003', 'Charlie_CM_Insufficient', 50.00),
                ('ACC_FAIL_TRANSFER_CM', 'Eve_CM_Fail', 100.00)
            ]
            for acc_id, owner, bal in initial_data:
                await cur.execute(
                    "INSERT INTO accounts (account_id, owner_name, balance) VALUES (%s, %s, %s) ON DUPLICATE KEY UPDATE balance = VALUES(balance);",
                    (acc_id, owner, bal)
                )
            await conn.commit()
            print("初始账户数据已为上下文管理器示例插入并提交。")

    print("
--- 场景CM 1: Alice_CM 向 Bob_CM 转账 100.00 (成功) ---")
    await check_balances('ACC_CM_001', 'ACC_CM_002') # check_balances 复用之前的定义
    res_cm1 = await transfer_funds_with_cm('ACC_CM_001', 'ACC_CM_002', 100.00)
    print(f"上下文管理器转账结果: {
              '成功' if res_cm1 else '失败'}")
    await check_balances('ACC_CM_001', 'ACC_CM_002')

    print("
--- 场景CM 2: Charlie_CM 向 Bob_CM 转账 200.00 (余额不足) ---")
    await check_balances('ACC_CM_003', 'ACC_CM_002')
    res_cm2 = await transfer_funds_with_cm('ACC_CM_003', 'ACC_CM_002', 200.00)
    print(f"上下文管理器转账结果: {
              '成功' if res_cm2 else '失败'}")
    await check_balances('ACC_CM_003', 'ACC_CM_002') # 余额应不变

    # print("
--- 场景CM 3: Alice_CM 向 Eve_CM_Fail 转账 50.00 (模拟内部错误) ---")
    # await check_balances('ACC_CM_001', 'ACC_FAIL_TRANSFER_CM')
    # res_cm3 = await transfer_funds_with_cm('ACC_CM_001', 'ACC_FAIL_TRANSFER_CM', 50.00)
    # print(f"上下文管理器转账结果: {'成功' if res_cm3 else '失败'}")
    # await check_balances('ACC_CM_001', 'ACC_FAIL_TRANSFER_CM') # 余额应不变

    await close_tx_db_pool()

if __name__ == "__main__":
    # 为了独立运行此示例,需要包含或模拟 init_tx_db_pool, close_tx_db_pool, check_balances
    # 和 DB_CONFIG_NO_AUTOCOMMIT, tx_db_pool
    # (此处假设它们已在全局或可访问范围内定义,如同一个脚本中的前一部分)
    DB_CONFIG_NO_AUTOCOMMIT = {
            
        'host': '127.0.0.1', 'port': 3306, 'user': 'your_user',
        'password': 'your_password', 'db': 'your_database_tx_cm', # 使用新库
    }
    tx_db_pool = None

    # (复用/重定义 init_tx_db_pool, close_tx_db_pool, check_balances 以便脚本可独立运行)
    async def init_tx_db_pool():
        global tx_db_pool
        if tx_db_pool is None:
            try:
                tx_db_pool = await aiomysql.create_pool(
                    minsize=1, maxsize=5, **DB_CONFIG_NO_AUTOCOMMIT,
                    loop=asyncio.get_event_loop(), autocommit=False
                )
                print("事务型连接池 (CM example) 初始化成功。")
            except Exception as e:
                print(f"初始化事务型连接池 (CM example) 失败: {
              e}")
                tx_db_pool = None
    async def close_tx_db_pool():
        global tx_db_pool
        if tx_db_pool:
            tx_db_pool.close()
            await tx_db_pool.wait_closed()
            print("事务型连接池 (CM example) 已关闭。")
            tx_db_pool = None
    async def check_balances(*account_ids): # 简化版,实际应复用之前的
        if tx_db_pool is None: return
        async with tx_db_pool.acquire() as conn:
            async with conn.cursor(aiomysql.DictCursor) as cur:
                print("
--- 当前账户余额 (CM Example) ---")
                for acc_id in account_ids:
                    await cur.execute("SELECT owner_name, balance FROM accounts WHERE account_id = %s;", (acc_id,))
                    account = await cur.fetchone()
                    if account: print(f"  账户: {
              acc_id}, 余额: {
              account['balance']}")
                    else: print(f"  账户: {
              acc_id} 未找到。")

    asyncio.run(main_context_manager_example())

代码解释 AsyncTransactionManager:

__init__(self, pool, cursor_type):

接收一个已配置为 autocommit=Falseaiomysql 连接池。
如果池未提供或 autocommitTrue,则抛出 ValueError
存储池和期望的游标类型。

async def __aenter__(self):

self._conn = await self._pool.acquire(): 从池中异步获取一个数据库连接。
await self._conn.begin(): 显式开始一个新事务。
self.cur = await self._conn.cursor(self._cursor_type): 创建一个游标。
return self.cur: 将游标返回,这样在 async with ... as cur: 语句中,cur 就可以直接使用了。
错误处理: 如果在 __aenter__(例如 begin() 失败)中发生错误,它会确保释放已获取的连接,然后重新抛出异常,这样 async with 块就不会执行。

async def __aexit__(self, exc_type, exc_val, exc_tb):

async with 块执行完毕或块内发生未捕获的异常时,此方法会被调用。
exc_type, exc_val, exc_tb: 这三个参数包含了异常信息。如果 async with 块正常完成(没有异常),它们都将是 None
关闭游标: if self.cur: await self.cur.close()
事务处理:

if exc_type is not None:: 如果 async with 块内部发生了异常,意味着事务应该回滚。调用 await self._conn.rollback()
else:: 如果没有异常发生,意味着事务中的所有操作都成功了(或者至少没有抛出Python异常),调用 await self._conn.commit()

错误处理中的错误处理: 如果 commit()rollback() 本身失败,它会打印错误。
finally: 无论 commitrollback 是否成功,self._pool.release(self._conn) 都会被调用,以确保连接总是被归还到池中。这是至关重要的,否则连接池中的连接会耗尽。
return False: 表示如果 async with 块内发生了异常,__aexit__ 方法不会抑制(swallow)这个异常,异常会继续向上传播。这是标准行为。如果你想抑制异常(例如,因为你已经完全处理了它),你可以返回 True

transfer_funds_with_cm 函数:

基本逻辑与 transfer_funds 相同,但事务的开始、提交、回滚以及连接和游标的管理都由 AsyncTransactionManager 自动处理。
如果发生业务逻辑错误(如余额不足),函数会 raise ValueError(...)。这个异常会被 AsyncTransactionManager__aexit__ 捕获,导致事务回滚。
代码更加简洁,因为不需要显式的 conn.begin(), conn.commit(), conn.rollback() 调用,也不需要在每个错误路径上手动管理连接释放。
外层的 try...except 块现在主要用于捕获并记录由 AsyncTransactionManager 传播出来的或在管理器之外发生的错误,并返回一个表示操作成功与否的布尔值。

优势:

代码更简洁: 减少了事务管理的样板代码。
更安全: 自动处理提交和回滚,减少了因忘记调用这些方法而导致数据不一致的风险。连接和游标的释放也得到了保证。
已关注业务逻辑: 开发者可以更专注于实现核心的数据库操作逻辑。

何时选择:

对于任何涉及多步数据库修改且需要原子性的操作,使用事务上下文管理器通常是更推荐的方式。
如果需要非常细致的、非典型的事务控制流程(例如,在事务中多次提交部分工作,这通常不推荐),则可能需要显式控制。

7.3. 异步事务中的保存点 (Savepoints in Asynchronous Transactions)

事务保存点 (Savepoints) 提供了一种在事务内部创建“检查点”的机制。如果后续操作失败,你可以选择回滚到某个保存点,而不是回滚整个事务。这对于实现更复杂的、可能包含可选操作或多阶段提交逻辑的事务非常有用。

aiomysql (通过其底层的 PyMySQL) 支持 SQL标准的保存点命令:

await conn.savepoint(name: str): 在当前事务中创建一个名为 name 的保存点。
await conn.rollback_savepoint(name: str): 回滚到名为 name 的保存点。此操作会撤销在该保存点之后执行的所有更改,但保存点本身仍然存在,事务也仍然活动。在该保存点之前所做的更改不受影响。
await conn.release_savepoint(name: str): 销毁名为 name 的保存点。这通常在不再需要回滚到该保存点时执行,可以释放一些数据库资源。注意:在某些数据库系统中,回滚到某个保存点后,其后的保存点可能会自动被销毁。提交整个事务或回滚整个事务也会销毁所有保存点。

企业场景:复杂订单处理流程

假设一个订单处理系统,创建订单涉及多个步骤:

创建订单主记录 (核心操作,必须成功)。
尝试扣减商品库存 (核心操作,必须成功)。
可选:为用户应用优惠券 (如果失败,不应导致整个订单失败,但应记录下来)。
可选:记录用户积分变更 (如果失败,也不应导致整个订单失败)。
最终确认订单。

如果核心操作(1和2)失败,整个事务回滚。如果可选操作(3或4)失败,我们可能只想撤销该可选操作本身,然后继续处理订单,或者标记该可选操作失败并继续。

import asyncio
import aiomysql

# --- 假设 DB_CONFIG_NO_AUTOCOMMIT, tx_db_pool, init_tx_db_pool, close_tx_db_pool,
# --- AsyncTransactionManager, check_balances (修改为检查库存和优惠券) 已定义 ---

# 模拟表结构 (简化)
# accounts 表 (复用,假设有用户账户用于支付)
# products 表
# CREATE TABLE IF NOT EXISTS products (
#     product_id VARCHAR(50) PRIMARY KEY,
#     name VARCHAR(100),
#     stock INT UNSIGNED NOT NULL,
#     price DECIMAL(10,2) NOT NULL
# ) ENGINE=InnoDB;
# orders 表
# CREATE TABLE IF NOT EXISTS orders (
#     order_id VARCHAR(50) PRIMARY KEY,
#     user_account_id VARCHAR(50),
#     total_amount DECIMAL(10,2),
#     status VARCHAR(20) DEFAULT 'PENDING', -- PENDING, PROCESSING, COMPLETED, FAILED_COUPON, FAILED_POINTS
#     created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
#     FOREIGN KEY (user_account_id) REFERENCES accounts(account_id)
# ) ENGINE=InnoDB;
# order_items 表
# CREATE TABLE IF NOT EXISTS order_items (
#     item_id INT AUTO_INCREMENT PRIMARY KEY,
#     order_id VARCHAR(50),
#     product_id VARCHAR(50),
#     quantity INT,
#     price_per_unit DECIMAL(10,2),
#     FOREIGN KEY (order_id) REFERENCES orders(order_id),
#     FOREIGN KEY (product_id) REFERENCES products(product_id)
# ) ENGINE=InnoDB;
# coupons 表
# CREATE TABLE IF NOT EXISTS coupons (
#     coupon_code VARCHAR(50) PRIMARY KEY,
#     discount_amount DECIMAL(10,2),
#     is_valid BOOLEAN DEFAULT TRUE
# ) ENGINE=InnoDB;


async def process_order_with_savepoints(
    user_id: str,
    product_details: list, # [{'product_id': 'P001', 'quantity': 1}, ...]
    coupon_code: str = None
):
    """
    使用保存点处理复杂订单流程。
    """
    if tx_db_pool is None: return "连接池未初始化"

    order_id = f"ORD_{
              asyncio.get_running_loop().time()}" # 简易订单ID
    final_order_status = "PROCESSING" # 初始期望状态

    # 使用我们之前定义的 AsyncTransactionManager
    # 注意:conn 对象本身在 AsyncTransactionManager 内部,如果需要直接调用 conn.savepoint 等,
    # Manager 需要暴露 conn 或者提供包装方法。
    # 为简单起见,我们这里直接在获取连接后操作,不使用 AsyncTransactionManager,
    # 以便清晰展示 savepoint 的用法。如果用 Manager,Manager 需要扩展。

    async with tx_db_pool.acquire() as conn: # 确保 autocommit=False
        async with conn.cursor(aiomysql.DictCursor) as cur:
            try:
                await conn.begin() # 开始主事务
                print(f"订单 {
              order_id}: 主事务开始。")

                # 1. 创建订单主记录
                total_amount_calculated = 0 # 稍后计算
                await cur.execute(
                    "INSERT INTO orders (order_id, user_account_id, total_amount, status) VALUES (%s, %s, %s, %s);",
                    (order_id, user_id, 0, 'PENDING_CALCULATION') # 初始总金额为0,状态待计算
                )
                if cur.rowcount != 1:
                    raise RuntimeError("创建订单主记录失败。")
                print(f"订单 {
              order_id}: 主记录已创建。")

                # 2. 处理订单项并扣减库存 (核心)
                for item in product_details:
                    pid, qty = item['product_id'], item['quantity']
                    # 获取产品价格和当前库存 (加锁以确保一致性)
                    await cur.execute(
                        "SELECT price, stock FROM products WHERE product_id = %s FOR UPDATE;", (pid,)
                    )
                    product_info = await cur.fetchone()
                    if not product_info:
                        raise ValueError(f"产品 {
              pid} 不存在。")
                    if product_info['stock'] < qty:
                        raise ValueError(f"产品 {
              pid} 库存不足 (需要 {
              qty}, 现有 {
              product_info['stock']})。")

                    # 扣减库存
                    await cur.execute(
                        "UPDATE products SET stock = stock - %s WHERE product_id = %s;", (qty, pid)
                    )
                    if cur.rowcount != 1:
                        raise RuntimeError(f"扣减产品 {
              pid} 库存失败。")

                    item_price = product_info['price']
                    total_amount_calculated += item_price * qty
                    # 插入订单项
                    await cur.execute(
                        "INSERT INTO order_items (order_id, product_id, quantity, price_per_unit) VALUES (%s, %s, %s, %s);",
                        (order_id, pid, qty, item_price)
                    )
                    print(f"订单 {
              order_id}: 产品 {
              pid} (数量 {
              qty}) 已添加,库存已扣减。")

                # 更新订单总金额
                await cur.execute(
                    "UPDATE orders SET total_amount = %s, status = 'PENDING_PAYMENT' WHERE order_id = %s;",
                    (total_amount_calculated, order_id)
                )
                print(f"订单 {
              order_id}: 总金额 {
              total_amount_calculated} 已计算。")

                # --- 可选操作:应用优惠券 ---
                if coupon_code:
                    savepoint_coupon = "sp_coupon_application"
                    await conn.savepoint(savepoint_coupon) # 创建优惠券应用的保存点
                    print(f"订单 {
              order_id}: 创建保存点 '{
              savepoint_coupon}'。")
                    try:
                        await cur.execute(
                            "SELECT discount_amount, is_valid FROM coupons WHERE coupon_code = %s FOR UPDATE;",
                            (coupon_code,)
                        )
                        coupon_info = await cur.fetchone()
                        if not coupon_info or not coupon_info['is_valid']:
                            raise ValueError(f"优惠券 '{
              coupon_code}' 无效或不存在。")

                        discount = coupon_info['discount_amount']
                        # 假设优惠券使用后失效
                        await cur.execute("UPDATE coupons SET is_valid = FALSE WHERE coupon_code = %s;", (coupon_code,))
                        if cur.rowcount != 1:
                             raise RuntimeError(f"更新优惠券 '{
              coupon_code}' 状态失败。")

                        total_amount_calculated -= discount
                        if total_amount_calculated < 0: total_amount_calculated = 0 # 防止负金额

                        await cur.execute(
                            "UPDATE orders SET total_amount = %s, status = 'PENDING_PAYMENT_COUPON_APPLIED' WHERE order_id = %s;",
                            (total_amount_calculated, order_id)
                        )
                        print(f"订单 {
              order_id}: 优惠券 '{
              coupon_code}' 应用成功,折扣 {
              discount}。新总额: {
              total_amount_calculated}")
                        await conn.release_savepoint(savepoint_coupon) # 成功,释放保存点 (可选)
                        print(f"订单 {
              order_id}: 释放保存点 '{
              savepoint_coupon}'。")
                    except (ValueError, RuntimeError) as e_coupon:
                        print(f"订单 {
              order_id}: 应用优惠券 '{
              coupon_code}' 失败: {
              e_coupon}。回滚到 '{
              savepoint_coupon}'。")
                        await conn.rollback_savepoint(savepoint_coupon) # 回滚优惠券操作
                        # 即使优惠券失败,订单本身可能仍然有效,但状态需要更新
                        final_order_status = "PENDING_PAYMENT_COUPON_FAILED"
                        await cur.execute( # 更新订单状态以反映优惠券失败
                            "UPDATE orders SET status = %s WHERE order_id = %s;",
                            (final_order_status, order_id)
                        )
                        print(f"订单 {
              order_id}: 优惠券操作已回滚,订单状态更新为 '{
              final_order_status}'。")
                        # 注意:此时 total_amount_calculated 仍然是应用优惠券前的金额,因为回滚了。
                        # 需要重新获取或基于之前的值。为简化,我们假设主流程继续。

                # --- 假设还有其他可选操作,如积分 (类似逻辑,使用另一个savepoint) ---

                # 最终提交或根据情况更新订单状态
                if final_order_status.endswith("_FAILED"): # 如果有可选操作失败了
                     print(f"订单 {
              order_id}: 由于可选操作失败,最终状态为 '{
              final_order_status}'。")
                else:
                    final_order_status = "PROCESSING" # 如果一切顺利
                    await cur.execute(
                        "UPDATE orders SET status = %s WHERE order_id = %s;",
                        (final_order_status, order_id)
                    )
                print(f"订单 {
              order_id}: 所有操作完成。准备提交主事务。最终状态: '{
              final_order_status}'")
                await conn.commit() # 提交整个事务
                print(f"订单 {
              order_id}: 主事务已提交。")
                return f"订单 {
              order_id} 处理完成,状态: {
              final_order_status}, 总金额: {
              total_amount_calculated if not final_order_status.endswith('_FAILED') else '查看订单详情'}"

            except (ValueError, RuntimeError, aiomysql.MySQLError) as e_main:
                print(f"订单 {
              order_id}: 主事务发生错误: {
              e_main}。正在回滚整个事务...")
                if conn.connected and not conn.closed: # 确保连接有效
                    await conn.rollback()
                    print(f"订单 {
              order_id}: 主事务已回滚。")
                return f"订单 {
              order_id} 处理失败: {
              e_main}"
            except Exception as e_unexpected:
                print(f"订单 {
              order_id}: 发生意外错误: {
              e_unexpected}。正在回滚整个事务...")
                if conn.connected and not conn.closed:
                    await conn.rollback()
                    print(f"订单 {
              order_id}: 主事务因意外错误已回滚。")
                return f"订单 {
              order_id} 处理失败 (意外): {
              e_unexpected}"

async def setup_savepoint_demo_tables():
    if tx_db_pool is None: await init_tx_db_pool() # 确保池已初始化
    async with tx_db_pool.acquire() as conn:
        async with conn.cursor() as cur:
            await conn.begin()
            await cur.execute("DROP TABLE IF EXISTS order_items;")
            await cur.execute("DROP TABLE IF EXISTS orders;")
            await cur.execute("DROP TABLE IF EXISTS coupons;")
            await cur.execute("DROP TABLE IF EXISTS products;")
            # accounts 表通常已存在,但确保它存在
            await cur.execute("""
                CREATE TABLE IF NOT EXISTS accounts (
                    account_id VARCHAR(50) PRIMARY KEY, owner_name VARCHAR(100) NOT NULL,
                    balance DECIMAL(15, 2) NOT NULL CHECK (balance >= 0)
                ) ENGINE=InnoDB;
            """)
            await cur.execute("""
                CREATE TABLE IF NOT EXISTS products (
                    product_id VARCHAR(50) PRIMARY KEY, name VARCHAR(100),
                    stock INT UNSIGNED NOT NULL, price DECIMAL(10,2) NOT NULL
                ) ENGINE=InnoDB;
            """)
            await cur.execute("""
                CREATE TABLE IF NOT EXISTS coupons (
                    coupon_code VARCHAR(50) PRIMARY KEY, discount_amount DECIMAL(10,2),
                    is_valid BOOLEAN DEFAULT TRUE
                ) ENGINE=InnoDB;
            """)
            await cur.execute("""
                CREATE TABLE IF NOT EXISTS orders (
                    order_id VARCHAR(50) PRIMARY KEY, user_account_id VARCHAR(50),
                    total_amount DECIMAL(10,2), status VARCHAR(50) DEFAULT 'PENDING',
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    FOREIGN KEY (user_account_id) REFERENCES accounts(account_id)
                ) ENGINE=InnoDB;
            """)
            await cur.execute("""
                CREATE TABLE IF NOT EXISTS order_items (
                    item_id INT AUTO_INCREMENT PRIMARY KEY, order_id VARCHAR(50),
                    product_id VARCHAR(50), quantity INT, price_per_unit DECIMAL(10,2),
                    FOREIGN KEY (order_id) REFERENCES orders(order_id),
                    FOREIGN KEY (product_id) REFERENCES products(product_id)
                ) ENGINE=InnoDB;
            """)
            # 插入初始数据
            users = [('USER001', 'Test User 1', 5000.00), ('USER002', 'Test User 2', 300.00)]
            products_data = [('P001', 'Laptop', 10, 1200.00), ('P002', 'Mouse', 50, 25.00), ('P003', 'Keyboard', 30, 75.00)]
            coupons_data = [('SAVE10', 10.00, True), ('INVALID20', 20.00, False), ('BIG50', 50.00, True)]
            for acc_id, owner, bal in users: await cur.execute("INSERT IGNORE INTO accounts VALUES (%s,%s,%s)", (acc_id,owner,bal))
            for pid, name, stock, price in products_data: await cur.execute("INSERT IGNORE INTO products VALUES (%s,%s,%s,%s)", (pid,name,stock,price))
            for code, disc, valid in coupons_data: await cur.execute("INSERT IGNORE INTO coupons VALUES (%s,%s,%s)", (code,disc,valid))
            await conn.commit()
            print("演示保存点所需的表和初始数据已设置/重置。")

async def main_savepoint_example():
    global tx_db_pool # 确保我们可以访问和修改全局的连接池
    tx_db_pool = None # 重置池,以便 init_tx_db_pool 可以创建一个新的
    DB_CONFIG_NO_AUTOCOMMIT['db'] = 'your_database_tx_sp' # 使用特定数据库
    await init_tx_db_pool() # 初始化 `autocommit=False` 的连接池
    if not tx_db_pool: return

    await setup_savepoint_demo_tables()

    print("
--- 场景SP 1: 成功订单,使用有效优惠券 'SAVE10' ---")
    order1_items = [{
            'product_id': 'P001', 'quantity': 1}, {
            'product_id': 'P002', 'quantity': 2}]
    result1 = await process_order_with_savepoints('USER001', order1_items, coupon_code='SAVE10')
    print(f"场景SP 1 结果: {
              result1}")

    print("
--- 场景SP 2: 成功订单,使用无效优惠券 'INVALID20' (优惠券部分应回滚) ---")
    order2_items = [{
            'product_id': 'P003', 'quantity': 1}]
    result2 = await process_order_with_savepoints('USER001', order2_items, coupon_code='INVALID20')
    print(f"场景SP 2 结果: {
              result2}")

    print("
--- 场景SP 3: 订单因库存不足失败 (主事务应回滚) ---")
    order3_items = [{
            'product_id': 'P001', 'quantity': 20}] # P001 库存只有10
    result3 = await process_order_with_savepoints('USER002', order3_items, coupon_code='BIG50')
    print(f"场景SP 3 结果: {
              result3}")

    print("
--- 场景SP 4: 成功订单,不使用优惠券 ---")
    order4_items = [{
            'product_id': 'P002', 'quantity': 1}]
    result4 = await process_order_with_savepoints('USER002', order4_items)
    print(f"场景SP 4 结果: {
              result4}")

    # 检查最终库存和优惠券状态
    async with tx_db_pool.acquire() as conn:
        async with conn.cursor(aiomysql.DictCursor) as cur:
            print("
--- 最终产品库存 ---")
            await cur.execute("SELECT product_id, stock FROM products;")
            for row in await cur.fetchall(): print(f"  {
              row['product_id']}: {
              row['stock']}")
            print("
--- 最终优惠券状态 ---")
            await cur.execute("SELECT coupon_code, is_valid FROM coupons;")
            for row in await cur.fetchall(): print(f"  {
              row['coupon_code']}: {
              '有效' if row['is_valid'] else '失效'}")

    await close_tx_db_pool()


if __name__ == "__main__":
    asyncio.run(main_savepoint_example())

代码解释 process_order_with_savepoints:

主事务: await conn.begin() 启动整个订单处理事务。
核心操作: 创建订单记录、处理订单项、扣减库存。这些操作如果失败,会抛出异常,导致整个事务回滚(在最外层的 except 块中处理)。
保存点 sp_coupon_application:

await conn.savepoint(savepoint_coupon): 在尝试应用优惠券之前创建一个保存点。
try...except 块包裹了优惠券应用逻辑。
如果优惠券应用成功:

await conn.release_savepoint(savepoint_coupon): 显式释放保存点。虽然不是严格必需(事务结束时所有保存点都会消失),但这是良好实践。

如果优惠券应用失败 (例如,优惠券无效或更新数据库时出错):

await conn.rollback_savepoint(savepoint_coupon): 将事务状态回滚到 sp_coupon_application 创建的时刻。这意味着对 orders 表(总金额)和 coupons 表(有效性)所做的与此优惠券相关的更改都被撤销了。
核心订单操作(如库存扣减)不受此保存点回滚的影响。
final_order_status 被更新以反映优惠券失败,订单继续处理。

提交或回滚主事务:

如果所有核心操作都成功(并且任何可选操作失败后都已妥善处理,例如通过保存点回滚并更新状态),则 await conn.commit() 提交整个事务。
如果任何主流程中的操作(非保存点保护的)或未被捕获的异常发生,最外层的 except 块会执行 await conn.rollback() 来回滚整个事务。

main_savepoint_examplesetup_savepoint_demo_tables:

setup_savepoint_demo_tables 创建了更复杂的表结构 (products, coupons, orders, order_items) 并填充了初始数据,以支持保存点场景。
main_savepoint_example 演示了不同情况:

订单成功且优惠券成功。
订单核心成功,但优惠券无效(优惠券操作回滚,订单继续)。
订单因核心操作(库存不足)失败(整个事务回滚)。
订单成功且不使用优惠券。

最后检查数据库状态以验证操作的正确性。

使用保存点的注意事项:

复杂性: 保存点增加了事务逻辑的复杂性。只在确实需要部分回滚的场景下使用。
命名: 保存点名称在事务内必须唯一。
数据库支持: 确保你的 MySQL 版本和存储引擎 (通常是 InnoDB) 支持保存点。
性能: 过多地创建和回滚保存点可能会有轻微的性能开销。
AsyncTransactionManager 的集成: 如果要将保存点与我们之前创建的 AsyncTransactionManager 一起使用,AsyncTransactionManager 需要进行扩展,例如提供一个方法来访问底层的 conn 对象,或者直接在 AsyncTransactionManager 上实现 savepoint, rollback_savepoint, release_savepoint 的包装方法。

通过这两个深入的事务管理示例(显式控制和上下文管理器)以及保存点的应用,我们已经覆盖了在 Python 和 aiomysql 中进行健壮的异步数据库操作的关键方面。确保数据的一致性是任何企业级应用的核心要求。

© 版权声明
THE END
如果内容对您有所帮助,就支持一下吧!
点赞0 分享
评论 抢沙发

请登录后发表评论

    暂无评论内容