Rust ORM 库选择之 SQLx+Actix-web 集成:编译时检查的 SQL 实践

Rust ORM 库选择之 SQLx+Actix-web 集成:编译时检查的 SQL 实践

SQLx 是 Rust 生态中一个独特的数据库工具,它不生成代码,而是在编译时检查 SQL 查询的正确性。自 2019 年发布以来,SQLx 凭借其异步优先的设计和编译时 SQL 验证能力,成为 Rust 异步数据库操作的热门选择。本文介绍如何在 Actix-web 项目中使用 SQLx 进行类型安全的数据库操作。

为什么选择 SQLx

SQLx 采用了一种与众不同的设计理念:直接编写 SQL,但在编译时进行验证。这种设计既保留了 SQL 的灵活性,又提供了类型安全保障。

SQLx 的核心优势在于编译时 SQL 验证。SQL 语法错误、字段名错误、类型不匹配等问题在编译阶段即可发现,无需运行时连接数据库即可进行验证。此外,SQLx 原生支持异步操作,与 Rust 的异步生态完美集成。

需要注意的是,SQLx 需要数据库连接来执行编译时检查(虽然支持离线模式,但体验会有所差异)。对于喜爱直接编写 SQL 的开发者来说,SQLx 提供了最佳的类型安全与灵活性平衡。

项目配置

使用 SQLx 的第一步是配置项目依赖。在 Cargo.toml 中添加以下依赖:

[dependencies]
actix-web = "4.5"
actix-rt = "2.10"
sqlx = { version = "0.7", features = ["runtime-actix-native-tls", "mysql", "chrono", "offline"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
dotenvy = "0.15"
chrono = { version = "0.4", features = ["serde"] }

注意:上述版本号使用语义化版本范围,会自动获取最新的兼容补丁版本。SQLx 的 features 说明:

  • runtime-actix-native-tls:使用 Actix 运行时和 native-tls(也可选择 runtime-tokio-native-tls)
  • mysql:MySQL 数据库支持
  • chrono:日期时间类型支持
  • offline:启用离线模式,允许在没有数据库连接时编译

配置数据库连接,在项目根目录创建 .env 文件:

DATABASE_URL=mysql://user:password@localhost/dbname

注意:SQLx 在编译时需要数据库连接来验证 SQL 查询。如果使用离线模式,需要先运行 cargo sqlx prepare 生成查询元数据。

数据库迁移

SQLx 提供了 sqlx-cli 工具来管理数据库迁移。第一安装 CLI 工具:

cargo install sqlx-cli --features mysql

创建迁移:

sqlx migrate add create_users

在生成的
migrations/xxx_create_users.sql 文件中编写以下 SQL:

CREATE TABLE users (
    id INT AUTO_INCREMENT PRIMARY KEY,
    name VARCHAR(255) NOT NULL,
    email VARCHAR(255) NOT NULL,
    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);

运行迁移:

sqlx migrate run

回滚迁移:

sqlx migrate revert

连接池配置

SQLx 内置了连接池功能,无需额外的连接池库。连接池配置简单高效:

// src/db.rs
use sqlx::mysql::{MySqlPool, MySqlPoolOptions};
use dotenvy::dotenv;
use std::env;
use std::time::Duration;

pub type DbPool = MySqlPool;

pub async fn create_db_pool() -> Result<DbPool, sqlx::Error> {
    dotenv().ok();
    
    let database_url = env::var("DATABASE_URL")
        .expect("DATABASE_URL must be set");
    
    MySqlPoolOptions::new()
        .max_connections(20)                    // 最大连接数
        .min_connections(5)                     // 最小连接数
        .acquire_timeout(Duration::from_secs(30))  // 获取连接的超时时间
        .idle_timeout(Some(Duration::from_secs(600))) // 空闲连接超时
        .max_lifetime(Some(Duration::from_secs(1800))) // 连接最大生存时间
        .connect(&database_url)
        .await
}

项目结构:提议的项目结构如下:

src/
├── main.rs          # 主程序入口和路由
├── db.rs            # 数据库连接池配置
└── models.rs        # 数据模型定义

注意:SQLx 不需要单独的 schema 文件,所有类型信息都从 SQL 查询中推导。

Actix-web 集成:完整的 CRUD 示例

以下是一个完整的 Actix-web + SQLx 的 CRUD 应用示例,涵盖所有基本操作。

数据模型

// src/models.rs
use serde::{Deserialize, Serialize};
use chrono::NaiveDateTime;

#[derive(Debug, Serialize, Deserialize, sqlx::FromRow)]
pub struct User {
    pub id: i32,
    pub name: String,
    pub email: String,
    pub created_at: NaiveDateTime,
}

// 请求/响应结构体
#[derive(Deserialize)]
pub struct CreateUserRequest {
    pub name: String,
    pub email: String,
}

#[derive(Serialize)]
pub struct UserResponse {
    pub id: i32,
    pub name: String,
    pub email: String,
    pub created_at: String,
}

#[derive(Deserialize)]
pub struct UpdateUserRequest {
    pub name: Option<String>,
    pub email: Option<String>,
}

#[derive(Deserialize)]
pub struct UserQueryParams {
    pub name: Option<String>,
    pub email: Option<String>,
    pub limit: Option<i32>,
    pub offset: Option<i32>,
}

注意:SQLx 使用 sqlx::FromRow derive 宏自动将查询结果映射到结构体,字段名需要与数据库列名匹配。

完整的应用代码

// src/main.rs
use actix_web::{web, App, HttpServer, HttpResponse, Result as ActixResult};
use serde_json::json;
use dotenvy::dotenv;

mod db;
mod models;

use db::{DbPool, create_db_pool};
use models::*;

// 主函数
#[actix_web::main]
async fn main() -> std::io::Result<()> {
    dotenvy().ok();
    
    // 创建连接池
    let pool = create_db_pool()
        .await
        .expect("Failed to create database pool");
    
    println!("服务器启动在 http://127.0.0.1:8080");
    
    HttpServer::new(move || {
        App::new()
            .app_data(web::Data::new(pool.clone()))
            .service(
                web::scope("/api/users")
                    .route("", web::get().to(get_users))
                    .route("", web::post().to(create_user))
                    .route("/{id}", web::get().to(get_user))
                    .route("/{id}", web::put().to(update_user))
                    .route("/{id}", web::delete().to(delete_user))
            )
            .route("/health", web::get().to(health_check))
    })
    .bind("127.0.0.1:8080")?
    .run()
    .await
}

// 1. 获取用户列表(支持查询参数和分页)
async fn get_users(
    pool: web::Data<DbPool>,
    query: web::Query<UserQueryParams>,
) -> ActixResult<HttpResponse> {
    let limit = query.limit.unwrap_or(10);
    let offset = query.offset.unwrap_or(0);
    
    // 使用条件构建查询
    let users = if let (Some(name), Some(email)) = (&query.name, &query.email) {
        sqlx::query_as::<_, User>(
            "SELECT id, name, email, created_at FROM users 
             WHERE name LIKE ? AND email LIKE ? 
             ORDER BY created_at DESC LIMIT ? OFFSET ?"
        )
        .bind(format!("%{}%", name))
        .bind(format!("%{}%", email))
        .bind(limit)
        .bind(offset)
        .fetch_all(&**pool)
        .await
    } else if let Some(name) = &query.name {
        sqlx::query_as::<_, User>(
            "SELECT id, name, email, created_at FROM users 
             WHERE name LIKE ? 
             ORDER BY created_at DESC LIMIT ? OFFSET ?"
        )
        .bind(format!("%{}%", name))
        .bind(limit)
        .bind(offset)
        .fetch_all(&**pool)
        .await
    } else if let Some(email) = &query.email {
        sqlx::query_as::<_, User>(
            "SELECT id, name, email, created_at FROM users 
             WHERE email LIKE ? 
             ORDER BY created_at DESC LIMIT ? OFFSET ?"
        )
        .bind(format!("%{}%", email))
        .bind(limit)
        .bind(offset)
        .fetch_all(&**pool)
        .await
    } else {
        sqlx::query_as::<_, User>(
            "SELECT id, name, email, created_at FROM users 
             ORDER BY created_at DESC LIMIT ? OFFSET ?"
        )
        .bind(limit)
        .bind(offset)
        .fetch_all(&**pool)
        .await
    }
    .map_err(|e| actix_web::error::ErrorInternalServerError(format!("数据库查询错误: {}", e)))?;
    
    let response: Vec<UserResponse> = users.into_iter()
        .map(|u| UserResponse {
            id: u.id,
            name: u.name,
            email: u.email,
            created_at: u.created_at.format("%Y-%m-%d %H:%M:%S").to_string(),
        })
        .collect();
    
    Ok(HttpResponse::Ok().json(response))
}

// 2. 获取单个用户
async fn get_user(
    pool: web::Data<DbPool>,
    path: web::Path<i32>,
) -> ActixResult<HttpResponse> {
    let user_id = path.into_inner();
    
    let user = sqlx::query_as::<_, User>(
        "SELECT id, name, email, created_at FROM users WHERE id = ?"
    )
    .bind(user_id)
    .fetch_optional(&**pool)
    .await
    .map_err(|e| actix_web::error::ErrorInternalServerError(format!("数据库查询错误: {}", e)))?;
    
    match user {
        Some(u) => {
            let response = UserResponse {
                id: u.id,
                name: u.name,
                email: u.email,
                created_at: u.created_at.format("%Y-%m-%d %H:%M:%S").to_string(),
            };
            Ok(HttpResponse::Ok().json(response))
        }
        None => Ok(HttpResponse::NotFound().json(json!({
            "error": "User not found"
        })))
    }
}

// 3. 创建用户
async fn create_user(
    pool: web::Data<DbPool>,
    req: web::Json<CreateUserRequest>,
) -> ActixResult<HttpResponse> {
    let result = sqlx::query(
        "INSERT INTO users (name, email) VALUES (?, ?)"
    )
    .bind(&req.name)
    .bind(&req.email)
    .execute(&**pool)
    .await
    .map_err(|e| actix_web::error::ErrorInternalServerError(format!("创建用户失败: {}", e)))?;
    
    let user_id = result.last_insert_id() as i32;
    
    // 获取创建的用户
    let user = sqlx::query_as::<_, User>(
        "SELECT id, name, email, created_at FROM users WHERE id = ?"
    )
    .bind(user_id)
    .fetch_one(&**pool)
    .await
    .map_err(|e| actix_web::error::ErrorInternalServerError(format!("查询用户失败: {}", e)))?;
    
    let response = UserResponse {
        id: user.id,
        name: user.name,
        email: user.email,
        created_at: user.created_at.format("%Y-%m-%d %H:%M:%S").to_string(),
    };
    
    Ok(HttpResponse::Created().json(response))
}

// 4. 更新用户
async fn update_user(
    pool: web::Data<DbPool>,
    path: web::Path<i32>,
    req: web::Json<UpdateUserRequest>,
) -> ActixResult<HttpResponse> {
    let user_id = path.into_inner();
    
    // 检查是否有字段要更新
    if req.name.is_none() && req.email.is_none() {
        return Ok(HttpResponse::BadRequest().json(json!({
            "error": "至少需要提供一个要更新的字段"
        })));
    }
    
    // 先检查用户是否存在
    let exists = sqlx::query_scalar::<_, i32>(
        "SELECT COUNT(*) FROM users WHERE id = ?"
    )
    .bind(user_id)
    .fetch_one(&**pool)
    .await
    .map_err(|e| actix_web::error::ErrorInternalServerError(format!("数据库查询错误: {}", e)))?;
    
    if exists == 0 {
        return Ok(HttpResponse::NotFound().json(json!({
            "error": "User not found"
        })));
    }
    
    // 构建更新语句
    let mut update_fields = Vec::new();
    let mut params: Vec<String> = Vec::new();
    
    if let Some(ref name) = req.name {
        update_fields.push("name = ?");
        params.push(name.clone());
    }
    
    if let Some(ref email) = req.email {
        update_fields.push("email = ?");
        params.push(email.clone());
    }
    
    let sql = format!(
        "UPDATE users SET {} WHERE id = ?",
        update_fields.join(", ")
    );
    
    let mut query = sqlx::query(&sql);
    for param in params {
        query = query.bind(param);
    }
    query = query.bind(user_id);
    
    query
        .execute(&**pool)
        .await
        .map_err(|e| actix_web::error::ErrorInternalServerError(format!("更新用户失败: {}", e)))?;
    
    // 获取更新后的用户
    let user = sqlx::query_as::<_, User>(
        "SELECT id, name, email, created_at FROM users WHERE id = ?"
    )
    .bind(user_id)
    .fetch_one(&**pool)
    .await
    .map_err(|e| actix_web::error::ErrorInternalServerError(format!("查询用户失败: {}", e)))?;
    
    let response = UserResponse {
        id: user.id,
        name: user.name,
        email: user.email,
        created_at: user.created_at.format("%Y-%m-%d %H:%M:%S").to_string(),
    };
    
    Ok(HttpResponse::Ok().json(response))
}

// 5. 删除用户
async fn delete_user(
    pool: web::Data<DbPool>,
    path: web::Path<i32>,
) -> ActixResult<HttpResponse> {
    let user_id = path.into_inner();
    
    let result = sqlx::query("DELETE FROM users WHERE id = ?")
        .bind(user_id)
        .execute(&**pool)
        .await
        .map_err(|e| actix_web::error::ErrorInternalServerError(format!("删除用户失败: {}", e)))?;
    
    if result.rows_affected() == 0 {
        Ok(HttpResponse::NotFound().json(json!({
            "error": "User not found"
        })))
    } else {
        Ok(HttpResponse::NoContent().finish())
    }
}

// 健康检查(连接池监控)
async fn health_check(pool: web::Data<DbPool>) -> ActixResult<HttpResponse> {
    // 测试数据库连接
    let result = sqlx::query_scalar::<_, i32>("SELECT 1")
        .fetch_one(&**pool)
        .await;
    
    match result {
        Ok(_) => Ok(HttpResponse::Ok().json(json!({
            "status": "healthy",
            "database": "connected"
        }))),
        Err(e) => Ok(HttpResponse::ServiceUnavailable().json(json!({
            "status": "unhealthy",
            "error": format!("数据库连接失败: {}", e)
        })))
    }
}

注意:上述代码示例假设所有模块都已正确配置。实际使用时,请确保:

  1. 数据库连接字符串在 .env 文件中正确配置
  2. 已运行数据库迁移:sqlx migrate run
  3. 编译时数据库可访问(用于 SQL 验证),或使用离线模式

编译时 SQL 检查

SQLx 的核心特性是在编译时验证 SQL 查询。使用 query! 宏可以在编译时检查 SQL 的正确性:

use sqlx::mysql::MySqlPool;

// 编译时检查的查询
let user = sqlx::query!(
    "SELECT id, name, email, created_at FROM users WHERE id = ?",
    user_id
)
.fetch_optional(pool)
.await?;

// query! 宏会自动推导返回类型,无需手动指定
if let Some(u) = user {
    println!("用户: {} ({})", u.name, u.email);
}

注意:query! 宏需要在编译时连接数据库。如果无法连接,可以使用 query_as! 或启用离线模式。

离线模式

如果需要在没有数据库连接时编译,可以使用离线模式:

  1. 首次编译时连接数据库,SQLx 会生成查询元数据
  2. 运行 cargo sqlx prepare 将元数据保存到 .sqlx 目录
  3. 后续编译时使用 SQLX_OFFLINE=true cargo build 即可离线编译
# 生成离线元数据
cargo sqlx prepare --database-url mysql://user:password@localhost/dbname

# 离线编译
SQLX_OFFLINE=true cargo build

错误处理最佳实践

统一的错误处理机制能够提升代码的可维护性和一致性。以下是推荐的错误处理实现:

// src/errors.rs(可选,用于统一错误处理)
use actix_web::{error::ResponseError, HttpResponse};
use sqlx::Error as SqlxError;
use serde_json::json;

#[derive(Debug)]
pub enum AppError {
    DatabaseError(SqlxError),
    NotFound,
    ValidationError(String),
}

impl From<SqlxError> for AppError {
    fn from(err: SqlxError) -> Self {
        AppError::DatabaseError(err)
    }
}

impl std::fmt::Display for AppError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            AppError::DatabaseError(e) => write!(f, "数据库错误: {}", e),
            AppError::NotFound => write!(f, "资源未找到"),
            AppError::ValidationError(msg) => write!(f, "验证错误: {}", msg),
        }
    }
}

impl ResponseError for AppError {
    fn error_response(&self) -> HttpResponse {
        match self {
            AppError::DatabaseError(e) => {
                HttpResponse::InternalServerError().json(json!({
                    "error": format!("数据库错误: {}", e)
                }))
            }
            AppError::NotFound => {
                HttpResponse::NotFound().json(json!({
                    "error": "资源未找到"
                }))
            }
            AppError::ValidationError(msg) => {
                HttpResponse::BadRequest().json(json!({
                    "error": msg
                }))
            }
        }
    }
}

// 使用示例
async fn get_user(
    pool: web::Data<DbPool>,
    path: web::Path<i32>,
) -> Result<HttpResponse, AppError> {
    let user_id = path.into_inner();
    
    let user = sqlx::query_as::<_, User>(
        "SELECT id, name, email, created_at FROM users WHERE id = ?"
    )
    .bind(user_id)
    .fetch_optional(&**pool)
    .await?;
    
    match user {
        Some(u) => Ok(HttpResponse::Ok().json(u)),
        None => Err(AppError::NotFound),
    }
}

API 使用示例

启动服务器后,可通过以下 API 端点进行测试:

# 获取用户列表(支持查询参数)
GET http://127.0.0.1:8080/api/users?name=张&limit=10&offset=0

# 获取单个用户
GET http://127.0.0.1:8080/api/users/1

# 创建用户
POST http://127.0.0.1:8080/api/users
Content-Type: application/json

{
  "name": "张三",
  "email": "zhangsan@example.com"
}

# 更新用户
PUT http://127.0.0.1:8080/api/users/1
Content-Type: application/json

{
  "name": "李四",
  "email": "lisi@example.com"
}

# 删除用户
DELETE http://127.0.0.1:8080/api/users/1

# 健康检查
GET http://127.0.0.1:8080/health

总结

SQLx 通过编译时 SQL 验证和原生异步支持,为 Rust 开发者提供了既灵活又类型安全的数据库操作方案。虽然需要在编译时连接数据库(或使用离线模式),但其带来的类型安全和 SQL 验证能力能够显著减少运行时错误。

需要注意的是,SQLx 作为 SQL 查询工具,其核心价值在于直接编写 SQL 的同时享受类型安全。对于熟悉 SQL 的开发者来说,SQLx 提供了最佳的类型安全与灵活性平衡。

对于初次使用 SQLx + Actix-web 的开发者,提议从小项目开始,逐步熟悉编译时 SQL 检查的使用方式,再逐步探索复杂的查询和事务处理。遇到问题时,可以参考 SQLx 官方文档,文档相对完善。


工具只是手段,理解原理才是根本。掌握 SQL 和数据库设计的核心概念,无论使用何种数据库工具都能游刃有余。

© 版权声明
THE END
如果内容对您有所帮助,就支持一下吧!
点赞0 分享
评论 抢沙发

请登录后发表评论

    暂无评论内容