
SQLx 是 Rust 生态中一个独特的数据库工具,它不生成代码,而是在编译时检查 SQL 查询的正确性。自 2019 年发布以来,SQLx 凭借其异步优先的设计和编译时 SQL 验证能力,成为 Rust 异步数据库操作的热门选择。本文介绍如何在 Actix-web 项目中使用 SQLx 进行类型安全的数据库操作。
为什么选择 SQLx
SQLx 采用了一种与众不同的设计理念:直接编写 SQL,但在编译时进行验证。这种设计既保留了 SQL 的灵活性,又提供了类型安全保障。
SQLx 的核心优势在于编译时 SQL 验证。SQL 语法错误、字段名错误、类型不匹配等问题在编译阶段即可发现,无需运行时连接数据库即可进行验证。此外,SQLx 原生支持异步操作,与 Rust 的异步生态完美集成。
需要注意的是,SQLx 需要数据库连接来执行编译时检查(虽然支持离线模式,但体验会有所差异)。对于喜爱直接编写 SQL 的开发者来说,SQLx 提供了最佳的类型安全与灵活性平衡。
项目配置
使用 SQLx 的第一步是配置项目依赖。在 Cargo.toml 中添加以下依赖:
[dependencies]
actix-web = "4.5"
actix-rt = "2.10"
sqlx = { version = "0.7", features = ["runtime-actix-native-tls", "mysql", "chrono", "offline"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
dotenvy = "0.15"
chrono = { version = "0.4", features = ["serde"] }
注意:上述版本号使用语义化版本范围,会自动获取最新的兼容补丁版本。SQLx 的 features 说明:
- runtime-actix-native-tls:使用 Actix 运行时和 native-tls(也可选择 runtime-tokio-native-tls)
- mysql:MySQL 数据库支持
- chrono:日期时间类型支持
- offline:启用离线模式,允许在没有数据库连接时编译
配置数据库连接,在项目根目录创建 .env 文件:
DATABASE_URL=mysql://user:password@localhost/dbname
注意:SQLx 在编译时需要数据库连接来验证 SQL 查询。如果使用离线模式,需要先运行 cargo sqlx prepare 生成查询元数据。
数据库迁移
SQLx 提供了 sqlx-cli 工具来管理数据库迁移。第一安装 CLI 工具:
cargo install sqlx-cli --features mysql
创建迁移:
sqlx migrate add create_users
在生成的
migrations/xxx_create_users.sql 文件中编写以下 SQL:
CREATE TABLE users (
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255) NOT NULL,
email VARCHAR(255) NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);
运行迁移:
sqlx migrate run
回滚迁移:
sqlx migrate revert
连接池配置
SQLx 内置了连接池功能,无需额外的连接池库。连接池配置简单高效:
// src/db.rs
use sqlx::mysql::{MySqlPool, MySqlPoolOptions};
use dotenvy::dotenv;
use std::env;
use std::time::Duration;
pub type DbPool = MySqlPool;
pub async fn create_db_pool() -> Result<DbPool, sqlx::Error> {
dotenv().ok();
let database_url = env::var("DATABASE_URL")
.expect("DATABASE_URL must be set");
MySqlPoolOptions::new()
.max_connections(20) // 最大连接数
.min_connections(5) // 最小连接数
.acquire_timeout(Duration::from_secs(30)) // 获取连接的超时时间
.idle_timeout(Some(Duration::from_secs(600))) // 空闲连接超时
.max_lifetime(Some(Duration::from_secs(1800))) // 连接最大生存时间
.connect(&database_url)
.await
}
项目结构:提议的项目结构如下:
src/
├── main.rs # 主程序入口和路由
├── db.rs # 数据库连接池配置
└── models.rs # 数据模型定义
注意:SQLx 不需要单独的 schema 文件,所有类型信息都从 SQL 查询中推导。
Actix-web 集成:完整的 CRUD 示例
以下是一个完整的 Actix-web + SQLx 的 CRUD 应用示例,涵盖所有基本操作。
数据模型
// src/models.rs
use serde::{Deserialize, Serialize};
use chrono::NaiveDateTime;
#[derive(Debug, Serialize, Deserialize, sqlx::FromRow)]
pub struct User {
pub id: i32,
pub name: String,
pub email: String,
pub created_at: NaiveDateTime,
}
// 请求/响应结构体
#[derive(Deserialize)]
pub struct CreateUserRequest {
pub name: String,
pub email: String,
}
#[derive(Serialize)]
pub struct UserResponse {
pub id: i32,
pub name: String,
pub email: String,
pub created_at: String,
}
#[derive(Deserialize)]
pub struct UpdateUserRequest {
pub name: Option<String>,
pub email: Option<String>,
}
#[derive(Deserialize)]
pub struct UserQueryParams {
pub name: Option<String>,
pub email: Option<String>,
pub limit: Option<i32>,
pub offset: Option<i32>,
}
注意:SQLx 使用 sqlx::FromRow derive 宏自动将查询结果映射到结构体,字段名需要与数据库列名匹配。
完整的应用代码
// src/main.rs
use actix_web::{web, App, HttpServer, HttpResponse, Result as ActixResult};
use serde_json::json;
use dotenvy::dotenv;
mod db;
mod models;
use db::{DbPool, create_db_pool};
use models::*;
// 主函数
#[actix_web::main]
async fn main() -> std::io::Result<()> {
dotenvy().ok();
// 创建连接池
let pool = create_db_pool()
.await
.expect("Failed to create database pool");
println!("服务器启动在 http://127.0.0.1:8080");
HttpServer::new(move || {
App::new()
.app_data(web::Data::new(pool.clone()))
.service(
web::scope("/api/users")
.route("", web::get().to(get_users))
.route("", web::post().to(create_user))
.route("/{id}", web::get().to(get_user))
.route("/{id}", web::put().to(update_user))
.route("/{id}", web::delete().to(delete_user))
)
.route("/health", web::get().to(health_check))
})
.bind("127.0.0.1:8080")?
.run()
.await
}
// 1. 获取用户列表(支持查询参数和分页)
async fn get_users(
pool: web::Data<DbPool>,
query: web::Query<UserQueryParams>,
) -> ActixResult<HttpResponse> {
let limit = query.limit.unwrap_or(10);
let offset = query.offset.unwrap_or(0);
// 使用条件构建查询
let users = if let (Some(name), Some(email)) = (&query.name, &query.email) {
sqlx::query_as::<_, User>(
"SELECT id, name, email, created_at FROM users
WHERE name LIKE ? AND email LIKE ?
ORDER BY created_at DESC LIMIT ? OFFSET ?"
)
.bind(format!("%{}%", name))
.bind(format!("%{}%", email))
.bind(limit)
.bind(offset)
.fetch_all(&**pool)
.await
} else if let Some(name) = &query.name {
sqlx::query_as::<_, User>(
"SELECT id, name, email, created_at FROM users
WHERE name LIKE ?
ORDER BY created_at DESC LIMIT ? OFFSET ?"
)
.bind(format!("%{}%", name))
.bind(limit)
.bind(offset)
.fetch_all(&**pool)
.await
} else if let Some(email) = &query.email {
sqlx::query_as::<_, User>(
"SELECT id, name, email, created_at FROM users
WHERE email LIKE ?
ORDER BY created_at DESC LIMIT ? OFFSET ?"
)
.bind(format!("%{}%", email))
.bind(limit)
.bind(offset)
.fetch_all(&**pool)
.await
} else {
sqlx::query_as::<_, User>(
"SELECT id, name, email, created_at FROM users
ORDER BY created_at DESC LIMIT ? OFFSET ?"
)
.bind(limit)
.bind(offset)
.fetch_all(&**pool)
.await
}
.map_err(|e| actix_web::error::ErrorInternalServerError(format!("数据库查询错误: {}", e)))?;
let response: Vec<UserResponse> = users.into_iter()
.map(|u| UserResponse {
id: u.id,
name: u.name,
email: u.email,
created_at: u.created_at.format("%Y-%m-%d %H:%M:%S").to_string(),
})
.collect();
Ok(HttpResponse::Ok().json(response))
}
// 2. 获取单个用户
async fn get_user(
pool: web::Data<DbPool>,
path: web::Path<i32>,
) -> ActixResult<HttpResponse> {
let user_id = path.into_inner();
let user = sqlx::query_as::<_, User>(
"SELECT id, name, email, created_at FROM users WHERE id = ?"
)
.bind(user_id)
.fetch_optional(&**pool)
.await
.map_err(|e| actix_web::error::ErrorInternalServerError(format!("数据库查询错误: {}", e)))?;
match user {
Some(u) => {
let response = UserResponse {
id: u.id,
name: u.name,
email: u.email,
created_at: u.created_at.format("%Y-%m-%d %H:%M:%S").to_string(),
};
Ok(HttpResponse::Ok().json(response))
}
None => Ok(HttpResponse::NotFound().json(json!({
"error": "User not found"
})))
}
}
// 3. 创建用户
async fn create_user(
pool: web::Data<DbPool>,
req: web::Json<CreateUserRequest>,
) -> ActixResult<HttpResponse> {
let result = sqlx::query(
"INSERT INTO users (name, email) VALUES (?, ?)"
)
.bind(&req.name)
.bind(&req.email)
.execute(&**pool)
.await
.map_err(|e| actix_web::error::ErrorInternalServerError(format!("创建用户失败: {}", e)))?;
let user_id = result.last_insert_id() as i32;
// 获取创建的用户
let user = sqlx::query_as::<_, User>(
"SELECT id, name, email, created_at FROM users WHERE id = ?"
)
.bind(user_id)
.fetch_one(&**pool)
.await
.map_err(|e| actix_web::error::ErrorInternalServerError(format!("查询用户失败: {}", e)))?;
let response = UserResponse {
id: user.id,
name: user.name,
email: user.email,
created_at: user.created_at.format("%Y-%m-%d %H:%M:%S").to_string(),
};
Ok(HttpResponse::Created().json(response))
}
// 4. 更新用户
async fn update_user(
pool: web::Data<DbPool>,
path: web::Path<i32>,
req: web::Json<UpdateUserRequest>,
) -> ActixResult<HttpResponse> {
let user_id = path.into_inner();
// 检查是否有字段要更新
if req.name.is_none() && req.email.is_none() {
return Ok(HttpResponse::BadRequest().json(json!({
"error": "至少需要提供一个要更新的字段"
})));
}
// 先检查用户是否存在
let exists = sqlx::query_scalar::<_, i32>(
"SELECT COUNT(*) FROM users WHERE id = ?"
)
.bind(user_id)
.fetch_one(&**pool)
.await
.map_err(|e| actix_web::error::ErrorInternalServerError(format!("数据库查询错误: {}", e)))?;
if exists == 0 {
return Ok(HttpResponse::NotFound().json(json!({
"error": "User not found"
})));
}
// 构建更新语句
let mut update_fields = Vec::new();
let mut params: Vec<String> = Vec::new();
if let Some(ref name) = req.name {
update_fields.push("name = ?");
params.push(name.clone());
}
if let Some(ref email) = req.email {
update_fields.push("email = ?");
params.push(email.clone());
}
let sql = format!(
"UPDATE users SET {} WHERE id = ?",
update_fields.join(", ")
);
let mut query = sqlx::query(&sql);
for param in params {
query = query.bind(param);
}
query = query.bind(user_id);
query
.execute(&**pool)
.await
.map_err(|e| actix_web::error::ErrorInternalServerError(format!("更新用户失败: {}", e)))?;
// 获取更新后的用户
let user = sqlx::query_as::<_, User>(
"SELECT id, name, email, created_at FROM users WHERE id = ?"
)
.bind(user_id)
.fetch_one(&**pool)
.await
.map_err(|e| actix_web::error::ErrorInternalServerError(format!("查询用户失败: {}", e)))?;
let response = UserResponse {
id: user.id,
name: user.name,
email: user.email,
created_at: user.created_at.format("%Y-%m-%d %H:%M:%S").to_string(),
};
Ok(HttpResponse::Ok().json(response))
}
// 5. 删除用户
async fn delete_user(
pool: web::Data<DbPool>,
path: web::Path<i32>,
) -> ActixResult<HttpResponse> {
let user_id = path.into_inner();
let result = sqlx::query("DELETE FROM users WHERE id = ?")
.bind(user_id)
.execute(&**pool)
.await
.map_err(|e| actix_web::error::ErrorInternalServerError(format!("删除用户失败: {}", e)))?;
if result.rows_affected() == 0 {
Ok(HttpResponse::NotFound().json(json!({
"error": "User not found"
})))
} else {
Ok(HttpResponse::NoContent().finish())
}
}
// 健康检查(连接池监控)
async fn health_check(pool: web::Data<DbPool>) -> ActixResult<HttpResponse> {
// 测试数据库连接
let result = sqlx::query_scalar::<_, i32>("SELECT 1")
.fetch_one(&**pool)
.await;
match result {
Ok(_) => Ok(HttpResponse::Ok().json(json!({
"status": "healthy",
"database": "connected"
}))),
Err(e) => Ok(HttpResponse::ServiceUnavailable().json(json!({
"status": "unhealthy",
"error": format!("数据库连接失败: {}", e)
})))
}
}
注意:上述代码示例假设所有模块都已正确配置。实际使用时,请确保:
- 数据库连接字符串在 .env 文件中正确配置
- 已运行数据库迁移:sqlx migrate run
- 编译时数据库可访问(用于 SQL 验证),或使用离线模式
编译时 SQL 检查
SQLx 的核心特性是在编译时验证 SQL 查询。使用 query! 宏可以在编译时检查 SQL 的正确性:
use sqlx::mysql::MySqlPool;
// 编译时检查的查询
let user = sqlx::query!(
"SELECT id, name, email, created_at FROM users WHERE id = ?",
user_id
)
.fetch_optional(pool)
.await?;
// query! 宏会自动推导返回类型,无需手动指定
if let Some(u) = user {
println!("用户: {} ({})", u.name, u.email);
}
注意:query! 宏需要在编译时连接数据库。如果无法连接,可以使用 query_as! 或启用离线模式。
离线模式
如果需要在没有数据库连接时编译,可以使用离线模式:
- 首次编译时连接数据库,SQLx 会生成查询元数据
- 运行 cargo sqlx prepare 将元数据保存到 .sqlx 目录
- 后续编译时使用 SQLX_OFFLINE=true cargo build 即可离线编译
# 生成离线元数据
cargo sqlx prepare --database-url mysql://user:password@localhost/dbname
# 离线编译
SQLX_OFFLINE=true cargo build
错误处理最佳实践
统一的错误处理机制能够提升代码的可维护性和一致性。以下是推荐的错误处理实现:
// src/errors.rs(可选,用于统一错误处理)
use actix_web::{error::ResponseError, HttpResponse};
use sqlx::Error as SqlxError;
use serde_json::json;
#[derive(Debug)]
pub enum AppError {
DatabaseError(SqlxError),
NotFound,
ValidationError(String),
}
impl From<SqlxError> for AppError {
fn from(err: SqlxError) -> Self {
AppError::DatabaseError(err)
}
}
impl std::fmt::Display for AppError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
AppError::DatabaseError(e) => write!(f, "数据库错误: {}", e),
AppError::NotFound => write!(f, "资源未找到"),
AppError::ValidationError(msg) => write!(f, "验证错误: {}", msg),
}
}
}
impl ResponseError for AppError {
fn error_response(&self) -> HttpResponse {
match self {
AppError::DatabaseError(e) => {
HttpResponse::InternalServerError().json(json!({
"error": format!("数据库错误: {}", e)
}))
}
AppError::NotFound => {
HttpResponse::NotFound().json(json!({
"error": "资源未找到"
}))
}
AppError::ValidationError(msg) => {
HttpResponse::BadRequest().json(json!({
"error": msg
}))
}
}
}
}
// 使用示例
async fn get_user(
pool: web::Data<DbPool>,
path: web::Path<i32>,
) -> Result<HttpResponse, AppError> {
let user_id = path.into_inner();
let user = sqlx::query_as::<_, User>(
"SELECT id, name, email, created_at FROM users WHERE id = ?"
)
.bind(user_id)
.fetch_optional(&**pool)
.await?;
match user {
Some(u) => Ok(HttpResponse::Ok().json(u)),
None => Err(AppError::NotFound),
}
}
API 使用示例
启动服务器后,可通过以下 API 端点进行测试:
# 获取用户列表(支持查询参数)
GET http://127.0.0.1:8080/api/users?name=张&limit=10&offset=0
# 获取单个用户
GET http://127.0.0.1:8080/api/users/1
# 创建用户
POST http://127.0.0.1:8080/api/users
Content-Type: application/json
{
"name": "张三",
"email": "zhangsan@example.com"
}
# 更新用户
PUT http://127.0.0.1:8080/api/users/1
Content-Type: application/json
{
"name": "李四",
"email": "lisi@example.com"
}
# 删除用户
DELETE http://127.0.0.1:8080/api/users/1
# 健康检查
GET http://127.0.0.1:8080/health
总结
SQLx 通过编译时 SQL 验证和原生异步支持,为 Rust 开发者提供了既灵活又类型安全的数据库操作方案。虽然需要在编译时连接数据库(或使用离线模式),但其带来的类型安全和 SQL 验证能力能够显著减少运行时错误。
需要注意的是,SQLx 作为 SQL 查询工具,其核心价值在于直接编写 SQL 的同时享受类型安全。对于熟悉 SQL 的开发者来说,SQLx 提供了最佳的类型安全与灵活性平衡。
对于初次使用 SQLx + Actix-web 的开发者,提议从小项目开始,逐步熟悉编译时 SQL 检查的使用方式,再逐步探索复杂的查询和事务处理。遇到问题时,可以参考 SQLx 官方文档,文档相对完善。
工具只是手段,理解原理才是根本。掌握 SQL 和数据库设计的核心概念,无论使用何种数据库工具都能游刃有余。


















暂无评论内容