上一篇我介绍了Go生态圈的sqlx库。 Rust生态圈也有一个知名的sqlx库 ,今天给大家介绍一下。这两个没有什么关联啊,纯粹属于名称相同而已。
概览 sqlx是一个为Rust语言提供的功能齐全的数据库访问和查询构建器库。它支持多种数据库,包括PostgreSQL、MySQL、SQLite等。sqlx的设计目标是成为Rust中最直观、高效且类型安全的数据库客户端。
真正的异步 。从头开始使用 async/await 构建,以实现最大的并发性。
编译时检查查询 (如果你需要的话)。请注意,sqlx 不是 ORM。
与数据库无关 。支持 PostgreSQL、MySQL、MariaDB、SQLite。
纯 Rust 。Postgres 和 MySQL/MariaDB 驱动程序是使用零不安全的代码以纯 Rust 编写的。
与运行时无关 。在不同的运行时(async-std/tokio/actix)和 TLS 后端(native-tls,rustls)上运行。
SQLite 驱动程序使用 libsqlite3 C 库。
sqlx 除非启用了 sqlite 功能,否则会使用 #![forbid(unsafe_code)]。SQLite 驱动程序通过 libsqlite3-sys 直接调用 SQLite3 API,这需要使用不安全的代码。
另外,它还有以下特性:
跨平台。作为原生的 Rust 代码,sqlx 可以在任何支持 Rust 的平台上编译。
内建的连接池功能,使用 sqlx::Pool。
行流式处理。数据从数据库异步读取并按需解码。
自动的语句准备和缓存。当使用高级查询 API(sqlx::query)时,语句将按连接进行准备和缓存。
简单的(未准备)查询执行,包括将结果获取到与高级 API 使用的相同 Row 类型。支持批量执行并返回所有语句的结果。
传输层安全性(TLS)在支持的平台(MySQL、MariaDB 和 PostgreSQL)上可用。
使用 LISTEN 和 NOTIFY 以支持 PostgreSQL 异步通知。
支持保存点的嵌套事务。
任何数据库驱动程序,允许在运行时更改数据库驱动程序。AnyPool 根据 URL 方案连接到指定的驱动程序。
sqlx 支持编译时检查的查询。然而,它并不是通过提供一个 Rust API 或 DSL(特定领域语言)来构建查询来实现这一点的。相反,它提供了宏,这些宏接受常规的 SQL 作为输入,并确保其对于您的数据库是有效的。其工作原理是,sqlx 在编译时连接到您的开发数据库,让数据库本身验证(并返回一些有关)您的 SQL 查询的信息。这有一些可能令人惊讶的含义:
由于 sqlx 不需要解析 SQL 字符串本身,因此可以使用开发数据库接受的任何语法(包括数据库扩展添加的内容)
由于数据库允许您检索的查询信息量不同,从查询宏获得的 SQL 验证的程度取决于数据库。
它不是一个ORM库,你如果想使用ORM库,可以参考ormx 和SeaORM 。
安装 sqlx支持多种异步运行时,你可以通过选择不同的特性来使用不同的异步运行时。目前支持的异步运行时有async-std, tokio和 actix(其实是tokio的别名)。还支持TLS的连接:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 # Cargo.toml [dependencies] # P挑选下面的一行引入sqlx: # tokio (no TLS) sqlx = { version = "0.7" , features = [ "runtime-tokio" ] } # tokio + native-tls sqlx = { version = "0.7" , features = [ "runtime-tokio" , "tls-native-tls" ] } # tokio + rustls sqlx = { version = "0.7" , features = [ "runtime-tokio" , "tls-rustls" ] } # async -std (no TLS) sqlx = { version = "0.7" , features = [ "runtime-async-std" ] } # async -std + native-tls sqlx = { version = "0.7" , features = [ "runtime-async-std" , "tls-native-tls" ] } # async -std + rustls sqlx = { version = "0.7" , features = [ "runtime-async-std" , "tls-rustls" ] }
如果你引入了多个异步运行时,默认首选tokio。
同时你也需要引入所需的数据库特性:
1 2 3 4 sqlx = { version = "0.7" , features = [ "postgres" ] } sqlx = { version = "0.7" , features = [ "mysql" ] } sqlx = { version = "0.7" , features = [ "sqlite" ] } sqlx = { version = "0.7" , features = [ "any" ] }
以及一些其他的关于数据类型的特性等,比如chrono、uuid、time、bstr、bigdecimal、rust_decimal、ipnetwork等。
derive支持derive类型的宏,如FromRow, Type, Encode, Decode.
macros 增加了对 query*! 宏的支持,该宏允许进行编译时检查的查询。
一个简单的sqlx示例如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 use sqlx::postgres::PgPoolOptions;#[tokio::main] async fn main () -> Result <(), sqlx::Error> { let pool = MySqlPoolOptions::new () .max_connections (5 ) .connect ("mysql://root:password@localhost/test" ).await ?; let row : (i64 ,) = sqlx::query_as ("SELECT ?" ) .bind (150_i64 ) .fetch_one (&pool).await ?; assert_eq! (row.0 , 150 ); Ok (()) }
连接数据库 sqlx支持多种不同的方式来连接数据库。最常见和推荐的是使用连接池。
建立连接池 连接池可以显著提高应用程序的性能和并发能力。通过重用连接,减少了建立新连接的开销。
1 2 3 4 5 6 7 8 9 10 11 use sqlx::postgres::PgPoolOptions;#[tokio::main] async fn main () -> Result <(), sqlx::Error> { let pool = PgPoolOptions::new () .max_connections (5 ) .connect ("postgres://postgres:@localhost" ) .await ?; Ok (()) }
上面的代码创建了一个最大5个连接的PostgreSQL连接池。PgPoolOptions提供了各种配置选项。
比如你可以不通过dsn字符串,而是通过方法进行用户名和密码设置:
1 2 3 4 5 6 7 8 let conn = PgConnectOptions::new () .host ("secret-host" ) .port (2525 ) .username ("secret-user" ) .password ("secret-password" ) .ssl_mode (PgSslMode::Require) .connect () .await ?;
甚至可以在解析dsn字符串后再修改特定的参数,如下面的mysql示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 use sqlx::{Connection, ConnectOptions};use sqlx::mysql::{MySqlConnectOptions, MySqlConnection, MySqlPool, MySqlSslMode};let conn = MySqlConnection::connect ("mysql://root:password@localhost/db" ).await ?;let conn = MySqlConnectOptions::new () .host ("localhost" ) .username ("root" ) .password ("password" ) .database ("db" ) .connect ().await ?; let mut opts : MySqlConnectOptions = "mysql://root:password@localhost/db" .parse ()?;opts.log_statements (log::LevelFilter::Trace); let pool = MySqlPool::connect_with (&opts).await ?;
单个连接 有时您可能只需要一个简单的单连接,而不需要连接池。
1 2 3 4 5 6 7 8 9 10 use sqlx::postgres::PgConnOptions;#[tokio::main] async fn main () -> Result <(), sqlx::Error> { let conn = PgConnOptions::new () .connect ("postgres://postgres:@localhost" ) .await ?; Ok (()) }
查询 在 SQL 中,查询可以分为预编译(参数化)或未预编译(简单)的。预编译查询会缓存其查询计划,使用二进制通信模式(降低带宽和更快的解码速度),并利用参数来避免 SQL 注入。未预编译的查询是简单的,并且仅用于无法使用预编译语句的情况,例如各种数据库命令(如 PRAGMA、SET 或 BEGIN)。
sqlx 支持使用这两种类型的查询进行所有操作。在 sqlx 中,一个 &str 被当作未预编译的查询来处理,而 Query 或 QueryAs 结构体被当作预编译的查询来处理。
在其他语言中,预编译就是 prepared statement,未预编译就是 unprepared statement。
1 2 3 conn.execute ("BEGIN" ).await ?; conn.execute (sqlx::query ("DELETE FROM table" )).await ?;
我们应该尽可能使用高级查询接口。为了使这更加容易,这些类型上有终结器(finalizers),这样就不需要使用执行器(executor)来包装它们。 换句话说,sqlx 提供了高级查询接口,这些接口使得与数据库的交互更加简洁和直观。这些接口被设计为可以独立工作,而不需要显式地创建一个执行器对象来执行查询。终结器(在这里可能指的是一些内部机制或方法)确保了这些高级接口在使用后可以正确地清理和关闭相关资源,从而简化了开发者的工作。
1 2 sqlx::query ("DELETE FROM table" ).execute (&mut conn).await ?; sqlx::query ("DELETE FROM table" ).execute (&pool).await ?;
在 sqlx 中,执行查询(execute)的终结器会返回受影响的行数(如果有的话),并丢弃所有接收到的结果。此外,还提供了 fetch、fetch_one、fetch_optional 和 fetch_all 方法来接收结果。
sqlx::query 返回的 Query 类型, 它会从数据库中返回 Row<'conn>。可以使用 row.get() 方法通过索引或名称访问列值。由于 Row 保持了对连接的不可变借用,因此一次只能存在一个 Row。
fetch 查询的终结器返回一个类似流的类型,该类型会遍历结果集中的行。你可以通过迭代这个流来访问每一行数据。这通常用于处理查询结果集中有多行数据的情况。
1 2 3 4 5 6 7 8 9 10 11 use futures::TryStreamExt;let mut rows = sqlx::query ("SELECT * FROM users WHERE email = ?" ) .bind (email) .fetch (&mut conn); while let Some (row) = rows.try_next ().await ? { let email : &str = row.try_get ("email" )?; }
为了将row映射到领域类型,可以使用以下两种模式之一:
手工映射
1 2 3 4 5 let mut stream = sqlx::query ("SELECT * FROM users" ) .map (|row: PgRow| { }) .fetch (&mut conn);
使用query_as和bind方法
1 2 3 4 5 6 7 #[derive(sqlx::FromRow)] struct User { name: String , id: i64 }let mut stream = sqlx::query_as::<_, User>("SELECT * FROM users WHERE email = ? OR name = ?" ) .bind (user_email) .bind (user_name) .fetch (&mut conn);
除了使用类似流的类型来遍历结果集之外(fetch),我们还可以使用 fetch_one 或 fetch_optional 来从数据库中请求一个必需的或可选的结果。
fetch_one: 这个方法会尝试从结果集中获取第一行数据。如果结果集为空(即没有数据),那么 fetch_one 通常会返回一个错误。这个方法适用于你期望查询结果只有一行数据的情况。
fetch_optional: 这个方法类似于 fetch_one,但它返回一个可选的结果(Option<Row> 或 Option<T>,如果使用了类型映射)。如果结果集为空,它将返回 None 而不是错误。这使得它在处理可能返回零行或多行数据的查询时更加灵活,但你又只关心第一行数据(如果存在)的情况下特别有用。
使用这两个方法可以帮助你更直接地处理那些只返回单个结果(或可能不返回结果)的查询
原生查询和参数化查询 sqlx支持执行原生SQL查询,也支持使用绑定参数进行参数化查询,后者有助于防止SQL注入攻击。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 #[tokio::main] async fn main () -> Result <(), sqlx::Error> { let pool = let name : String = sqlx::query_scalar ("SELECT name FROM users WHERE id = 1" ) .fetch_one (&pool) .await ?; let count : (i64 ,) = sqlx::query_as ("SELECT COUNT(*) FROM users WHERE email LIKE $1" ) .bind ("%@example.com" ) .fetch_one (&pool) .await ?; Ok (()) }
流式查询 sqlx支持流式查询,这意味着你可以在查询结果返回时立即处理它们,而不需要等待整个结果集加载完毕。这对于处理大量数据或需要实时处理数据的情况非常有用。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 use sqlx::postgres::PgRow;#[tokio::main] async fn main () -> Result <(), sqlx::Error> { let pool = let mut rows = sqlx::query ("SELECT id, name FROM users" ) .fetch (&pool); while let Some (row) = rows.try_next ().await ? { let id : i32 = row.try_get (0 )?; let name : &str = row.try_get (1 )?; println! ("{} {}" , id, name); } Ok (()) }
查询结果映射到Rust数据结构 最常见的查询方式是将结果映射到一个Rust数据结构,比如结构体或元组结构体。sqlx会自动将数据库列映射到结构体字段。
比如下面这个例子是查询一个用户的信息:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 use sqlx::FromRow;#[derive(FromRow)] struct User { id: i32 , name: String , email: String , } #[tokio::main] async fn main () -> Result <(), sqlx::Error> { let pool = let user = sqlx::query_as::<_, User>("SELECT id, name, email FROM users WHERE id = $1" ) .bind (42 ) .fetch_one (&pool) .await ?; println! ("{:?}" , user); Ok (()) }
又比如下面这个例子查询一组书籍的信息:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 use sqlx::postgres::PgPool;use sqlx::FromRow;#[derive(FromRow)] struct Book { id: i32 , title: String , author: String , } #[tokio::main] async fn main () -> Result <(), sqlx::Error> { let pool = PgPool::connect ("postgres://postgres:@localhost" ).await ?; let books = sqlx::query_as::<_, Book>("SELECT * FROM books" ) .fetch_all (&pool) .await ?; for book in books { println! ("{} - {} ({})" , book.id, book.title, book.author); } Ok (()) }
执行语句 除了查询,sqlx还支持执行其他SQL语句,如INSERT、UPDATE和DELETE等。它提供了多种执行这些语句的方法,包括支持事务。
执行语句 最简单的执行语句方式是使用execute函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 #[tokio::main] async fn main () -> Result <(), sqlx::Error> { let pool = let inserted_rows = sqlx::query ("INSERT INTO users (name, email) VALUES ($1, $2)" ) .bind ("User1" ).bind ("user1@example.com" ) .execute (&pool) .await ? .rows_affected (); println! ("Inserted {} rows" , inserted_rows); Ok (()) }
上面这个例子是插入一行数据到users表中,并打印出插入的行数。
不要被sqlx::query这个名字所误导,它不仅仅用于查询,还可以用于执行其他SQL语句。
下面是一个插入/更新/删除数据的示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 use sqlx::postgres::PgPool;#[derive(Debug)] struct User { id: i32 , name: String , email: String , } #[tokio::main] async fn main () -> Result <(), sqlx::Error> { let pool = PgPool::connect ("postgres://postgres:@localhost" ).await ?; let user = User { id: 0 , name: "NewUser" .into (), email: "new@example.com" .into () }; let id = sqlx::query ("INSERT INTO users (name, email) VALUES ($1, $2) RETURNING id" ) .bind (&user.name).bind (&user.email) .fetch_one (&pool) .await ? .get (0 ); println! ("Inserted user with id: {}" , id); let updated_rows = sqlx::query ("UPDATE users SET email=$1 WHERE id=$2" ) .bind ("updated@example.com" ).bind (id) .execute (&pool) .await ? .rows_affected (); println! ("Updated {} rows" , updated_rows); let deleted_rows = sqlx::query ("DELETE FROM users WHERE id=$1" ) .bind (id) .execute (&pool) .await ? .rows_affected (); println! ("Deleted {} rows" , deleted_rows); Ok (()) }
事务 sqlx支持事务,你可以使用transaction方法来执行一个事务。 要执行多个语句作为一个原子事务,您可以使用begin、commit和rollback函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 #[tokio::main] async fn main () -> Result <(), sqlx::Error> { let pool = let mut tx = pool.begin ().await ?; sqlx::query ("UPDATE users SET email=$1 WHERE id=$2" ) .bind ("new@email.com" ).bind (42 ) .execute (&mut tx) .await ?; sqlx::query ("DELETE FROM users WHERE id=$1" ) .bind (43 ) .execute (&mut tx) .await ?; tx.commit ().await ?; Ok (()) }
上面的示例首先开始一个新事务,然后执行两个语句,最后提交事务。如果中间任何一步失败,可以调用rollback回滚整个事务。
面是一个使用sqlx中事务和回滚(rollback)的示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 use sqlx::postgres::PgPool;use sqlx::Error;#[tokio::main] async fn main () -> Result <(), Error> { let pool = PgPool::connect ("postgres://postgres:@localhost" ).await ?; let mut transaction = pool.begin ().await ?; sqlx::query ("UPDATE accounts SET balance = balance - $1 WHERE id = $2" ) .bind (100.0 ) .bind (1 ) .execute (&mut transaction) .await ?; sqlx::query ("UPDATE accounts SET balance = balance + $1 WHERE id = $2" ) .bind (100.0 ) .bind (2 ) .execute (&mut transaction) .await ?; if should_rollback () { transaction.rollback ().await ?; println! ("Transaction rolled back" ); } else { transaction.commit ().await ?; println! ("Transaction committed" ); } Ok (()) } fn should_rollback () -> bool { rand::thread_rng ().gen_bool (0.5 ) }
在这个示例中,我们首先使用pool.begin()开始一个新的事务。然后,我们执行两个查询,分别从一个账户扣除100元,并将这100元转账到另一个账户。 接下来,我们调用should_rollback()函数来模拟一个错误情况。如果should_rollback()返回true,我们就调用transaction.rollback().await?来回滚整个事务。否则,我们调用transaction.commit().await?来提交事务。
在真实情况下,您可能会在遇到某些异常或错误时触发回滚,例如:
违反了某些业务规则或数据完整性约束
发生了意外的异常或错误
用户取消或中断了操作
出于某些原因,整个事务需要被回滚
通过使用事务和回滚,您可以确保数据库中的更改要么全部成功,要么完全回滚,从而保持数据的一致性和完整性。这对于处理敏感操作或需要多个步骤的复杂操作非常重要。
连接池和并发 sqlx内置了连接池支持,这使得它天生就支持高效的并发查询。通过连接池,可以避免为每个查询创建新连接的开销。
连接池管理 sqlx中的连接池由PgPool之类的类型表示。您可以直接创建一个连接池实例,也可以使用PgPoolOptions来定制配置:
1 2 3 4 5 6 use sqlx::postgres::PgPoolOptions;let pool = PgPoolOptions::new () .max_connections (10 ) .connect ("postgres://postgres:@localhost" ) .await ?;
上面的代码创建了一个最大连接数为10的PostgreSQL连接池。PgPoolOptions提供了各种配置选项,如最大连接数、最小连接数、连接超时等。
并发查询 由于sqlx内置了连接池,因此并发查询变得非常简单。你只需要在多个异步任务中并行执行查询即可:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 use sqlx::postgres::PgPool;use std::time::Instant;#[tokio::main] async fn main () -> Result <(), sqlx::Error> { let pool = PgPool::connect ("postgres://postgres:@localhost" ).await ?; let tasks = (0 ..10 ) .map (|_| { let pool = pool.clone (); tokio::spawn (async move { let now = Instant::now (); let _ = sqlx::query ("SELECT pg_sleep(1)" ).execute (&pool).await ; println! ("Task completed in {:?}" , now.elapsed ()); }) }) .collect::<Vec <_>>(); for task in tasks { task.await ?; } Ok (()) }
上面的代码创建了一个包含10个任务的并发查询。每个任务都会执行一个简单的查询,然后打印出执行时间。通过并发查询,您可以同时执行多个查询,从而提高查询效率。
下面是一个更实际的示例,模拟了并发处理多个Web请求的场景:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 use sqlx::postgres::{PgPool, PgRow};use std::io;#[tokio::main] async fn main () -> Result <(), sqlx::Error> { let pool = PgPool::connect ("postgres://postgres:@localhost" ).await ?; let requests = vec! [ "SELECT * FROM users WHERE id = $1" , "SELECT * FROM products WHERE category = $1" , "SELECT * FROM orders WHERE user_id = $1" , ]; for request in requests { let rows = sqlx::query (request) .bind (42 ) .fetch_all (&pool) .await ?; for row in rows { print_row (row); } println! (); } Ok (()) } fn print_row (row: PgRow) { let cols = row.columns (); let values : Vec <&str > = row.get_refs (cols).into_iter ().map (|v| v.unwrap ()).collect (); println! ("{}" , values.join (", " )); }
在这个示例中,我们模拟了处理多个Web请求的场景。我们定义了一个包含多个查询的请求列表,然后并发执行这些查询。每个查询都会返回一组行数据,我们将这些行数据打印出来。通过并发查询,我们可以同时处理多个请求,从而提高系统的性能和效率。
JSON支持 现代数据库广泛支持 JSON 数据类型,sqlx也为此提供了非常好的支持。您可以方便地查询JSON类型以及将查询结果映射为 JSON。
查询 JSON 类型 在数据库中,JSON 类型通常被存储为文本。sqlx允许你直接查询和处理 JSON 数据:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 #[tokio::main] async fn main () -> Result <(), sqlx::Error> { let pool = let json_data : serde_json::Value = sqlx::query (r#" SELECT '[{"id": 1, "name": "Product 1"}, {"id": 2, "name": "Product 2"}]'::json "# ) .fetch_one (&pool) .await ? .get (0 ); println! ("{:?}" , json_data); Ok (()) }
这个例子查询了一个JSON数组,并将其直接映射为 serde_json::Value。
将查询结果映射为JSON 您还可以将常规的查询结果映射为JSON格式。这对于构建API或与前端交互非常有用。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 use serde::{Serialize, Deserialize};#[derive(Deserialize, Serialize)] struct Product { id: i32 , name: String , price: f64 , } #[tokio::main] async fn main () -> Result <(), sqlx::Error> { let pool = let products : Vec <Product> = sqlx::query_as ( "SELECT id, name, price FROM products" ) .fetch_all (&pool) .await ?; let json = serde_json::to_string (&products)?; println! ("{}" , json); Ok (()) }
这个例子查询了产品列表,并使用serde_json将其序列化为JSON格式。
使用 PostgreSQL 的 JSON 类型 这是一个更全面的示例,展示了如何在 PostgreSQL 中使用 JSON 类型:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 use serde::{Deserialize, Serialize};use sqlx::postgres::PgPool;use sqlx::types::JsonValue;#[derive(Serialize, Deserialize)] struct User { id: i32 , name: String , profile: JsonValue, } #[tokio::main] async fn main () -> Result <(), sqlx::Error> { let pool = PgPool::connect ("postgres://postgres:@localhost" ).await ?; let profile = serde_json::json!({ "bio" : "Software Engineer" , "interests" : ["coding" , "reading" ] }); let user = User { id: 0 , name: "NewUser" .into (), profile: profile.into (), }; let id = sqlx::query ("INSERT INTO users (name, profile) VALUES ($1, $2) RETURNING id" ) .bind (&user.name) .bind (&user.profile) .fetch_one (&pool) .await ? .get (0 ); let user : User = sqlx::query_as ("SELECT id, name, profile FROM users WHERE id = $1" ) .bind (id) .fetch_one (&pool) .await ?; println! ("{:?}" , user); Ok (()) }
在这个例子中,我们首先使用serde_json创建了一个JSON 值,作为用户配置文件。然后,我们将这个JSON 值 插入到数据库中。最后,我们查询用户并将配置文件作为 JsonValue 类型获取。
通知和监听 sqlx提供了在数据库中监听通知(NOTIFY/LISTEN)的功能,这使得构建基于事件的、实时应用程序成为可能。
数据库通知 数据库通知是一种机制,允许应用程序在数据库中发生某些事件时接收通知。这种功能在构建基于事件的系统(如聊天应用程序或实时仪表板)时非常有用。
1 2 -- 在数据库中触发通知 NOTIFY channel_name, 'hell o';
使用监听器 sqlx通过DatabaseNotification结构体来表示接收到的通知。您可以在应用程序中设置一个监听器,以接收并处理这些通知。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 use sqlx::postgres::PgListener;#[tokio::main] async fn main () -> Result <(), sqlx::Error> { let listener = PgListener::bind ("postgresql://localhost/" ).await ?; listener.listen ("channel_name" ).await ?; loop { let notification = listener.recv ().await ?; println! ( "Received notification: {} ({})" , notification.payload, notification.payload_pretty (), ); } }
上面的示例创建了一个PostgreSQL监听器,并开始监听名为channel_name的通道。当接收到通知时,它会打印出通知的有效负载。
这是一个更完整的示例,展示了如何在PostgreSQL中设置通知并在应用程序中监听它们:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 use sqlx::postgres::{PgPool, PgListener};#[tokio::main] async fn main () -> Result <(), sqlx::Error> { let pool = PgPool::connect ("postgres://postgres:@localhost" ).await ?; let listener = PgListener::bind ("postgresql://localhost/" ).await ?; sqlx::query ("LISTEN channel_name" ).execute (&pool).await ?; sqlx::query ("NOTIFY channel_name, 'hello'" ).execute (&pool).await ?; if let Some (notification) = listener.recv ().await ? { println! ("Received notification: {}" , notification.payload); } Ok (()) }
在这个例子中,我们首先创建了一个PostgreSQL监听器,并在数据库中设置了一个名为channel_name的通知通道。然后,我们在另一个连接上触发了一个通知。最后,监听器接收到通知并打印出了它的有效负载。
测试 编写测试对于任何健壮的软件系统都是必不可少的,sqlx也不例外。幸运的是,sqlx提供了多种方式来测试与数据库交互的代码。
测试连接 最基本的测试是确保您的应用程序能够成功连接到数据库。您可以使用sqlx提供的try_connect函数进行测试:
1 2 3 4 5 6 7 use sqlx::PgPool;#[tokio::test] async fn test_connection () { let pool = PgPool::try_connect ("postgres://postgres:@localhost" ).await .unwrap (); }
测试查询 您还可以测试查询,以确保它们能够正确地执行并返回预期的结果。您可以使用query和query_as函数来测试查询:
1 2 3 4 5 6 7 8 9 10 11 12 13 use sqlx::PgPool;#[tokio::test] async fn test_query () { let pool = PgPool::connect ("postgres://postgres:@localhost" ).await .unwrap (); let row : (i64 ,) = sqlx::query_as ("SELECT 1" ) .fetch_one (&pool) .await .unwrap (); assert_eq! (row.0 , 1 ); }
使用内存数据库 sqlx支持使用内存数据库进行测试,例如SQLite内存数据库。这种方式快速、轻量,非常适合单元测试。
1 2 3 4 5 #[tokio::test] async fn test_query () { let pool = sqlx::SqlitePool::connect (":memory:" ).await .unwrap (); }
对于更全面的集成测试,您可以在测试用例中创建一个临时的测试数据库,执行所需的操作,然后在测试结束时清理该数据库。这种方式更接近真实的生产环境。
使用mock数据库 如msql-srv、opensrv-clickhouse、opensrv-mysql、
下面是一个使用集成测试数据库进行测试的例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 use sqlx::postgres::{PgPool, PgRow};#[tokio::test] async fn test_user_operations () { let pool = create_test_pool ().await ; sqlx::query ("CREATE TABLE users (id SERIAL PRIMARY KEY, name TEXT, email TEXT)" ) .execute (&pool) .await .unwrap (); let name = "Test User" .to_owned (); let email = "test@example.com" .to_owned (); let id = insert_user (&pool, &name, &email).await ; let row : PgRow = sqlx::query_as ("SELECT id, name, email FROM users WHERE id = $1" ) .bind (id) .fetch_one (&pool) .await .unwrap (); assert_eq! (row.get::<i32 , _>(0 ), id); assert_eq! (row.get::<String , _>(1 ), name); assert_eq! (row.get::<String , _>(2 ), email); } async fn insert_user (pool: &PgPool, name: &str , email: &str ) -> i32 { sqlx::query ("INSERT INTO users (name, email) VALUES ($1, $2) RETURNING id" ) .bind (name) .bind (email) .fetch_one (pool) .await .unwrap () .get (0 ) } async fn create_test_pool () -> PgPool { let db_name = "test_database" ; let pool = PgPool::connect (&format! ("postgres://postgres:@localhost/{}" , db_name)) .await .unwrap (); sqlx::query (&format! ("DROP DATABASE IF EXISTS {}" , db_name)) .execute (&pool) .await .unwrap (); sqlx::query (&format! ("CREATE DATABASE {}" , db_name)) .execute (&pool) .await .unwrap (); pool }
在这个示例中,我们首先创建了一个专用的测试数据库。然后我们在这个数据库中创建了一个users表,并进行了插入、查询等操作,最后验证了查询结果。
高级主题 除了基础功能外,sqlx还提供了一些高级功能,如自定义类型映射、编译时检查和性能分析等,可以进一步提高您的生产力和应用程序的性能。
自定义类型映射 sqlx允许您定义自定义的数据类型映射规则,将数据库中的数据类型映射到Rust中的类型。这对于处理一些特殊的数据类型或实现自定义的逻辑非常有用。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 use sqlx::types::Type;use sqlx::postgres::{PgTypeInfo, PgValueRef};struct MyType (String );impl Type <PgTypeInfo> for MyType { fn type_info () -> PgTypeInfo { PgTypeInfo::with_name ("mytype" ) } fn readable_name () -> String { "MyType" .into () } } impl <'r > PgValueRef<'r > for MyType { fn from_pg_value (value: Option <&'r [u8 ]>) -> Option <MyType> { value.map (|bytes| MyType (String ::from_utf8_lossy (bytes).into_owned ())) } fn to_pg_value (&self ) -> Option <Vec <u8 >> { Some (self .0 .as_bytes ().to_vec ()) } }
在这个例子中,我们定义了一个名为MyType的自定义数据类型,并实现了Type和PgValueRef trait。这样,我们就可以将数据库中的mytype类型映射到Rust中的MyType类型。
编译时检查 sqlx提供了一些宏和编译时检查功能,可以在编译时捕获一些错误,而不是在运行时才发现。这有助于提高代码质量和安全性。
1 2 3 4 5 6 7 8 9 10 11 use sqlx::query;#[rustfmt::skip] let query = query!( " SELECT id, name, email FROM users WHERE id = ? " , 42 );
上面的query!宏可以在编译时检查SQL语句的语法错误,并验证绑定参数的数量和类型。这样可以避免在运行时才发现这些问题。
类似的宏还有query_as!、query_scalar!、query_file!、query_file!、query_file_scalar!以及它们的变种query_xxx_unchecked!。
执行时间 你可以通过计时来分析查询的性能,并根据结果进行优化。sqlx提供了一些工具来帮助您分析查询的性能,如log_statements、log_slow_statements等。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 use sqlx::query;use sqlx::postgres::PgQueryAs;#[tokio::main] async fn main () -> Result <(), sqlx::Error> { let pool = let query = query_as!( User, r#" SELECT id, name, email FROM users WHERE id = $1 "# ); for attempt in 0 ..5 { let time = std::time::Instant::now (); let _users : Vec <User> = query.bind (42 ).fetch_all (&pool).await ?; let elapsed = time.elapsed (); println! ("Query attempt {attempt} took: {elapsed:?}" ); } Ok (()) }
打印日志 ConnectOptions提供了两个设置日志的方法:
log_statements: 使用指定的级别打印执行语句
log_slow_statements: 使用指定的级别打印执行时间超过指定阈值的SQL语句。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 use sqlx::postgres::PgPoolOptions;#[tokio::main] async fn main () -> Result <(), sqlx::Error> { let pool = PgPoolOptions::new () .max_connections (5 ) .log_statements (log::LevelFilter::Debug ) .log_slow_statements (log::LevelFilter::Warn, std::time::Duration::from_millis (100 )) .connect ("postgres://postgres:@localhost" ) .await ?; let row : (i64 ,) = sqlx::query_as ("SELECT 42" ) .fetch_one (&pool) .await ?; println! ("Result: {}" , row.0 ); Ok (()) }
最佳实践和故障排除 再啰嗦几句。
在使用sqlx时,遵循一些最佳实践可以帮助您编写更加安全、高效和可维护的代码。此外,掌握一些常见错误和故障排除技巧也很有帮助。
sqlx最佳实践
使用参数化查询: 始终使用带参数的查询,而不是字符串插值。这可以防止SQL注入攻击。
监控连接池指标: 监控连接池的指标,如活跃连接数、获取连接等待时间等,以确保连接池配置正确。
避免ORM: sqlx是一个查询构建器,而不是完整的对象关系映射(ORM)库。尽量避免在sqlx中复制ORM功能。
使用流式查询: 对于大型查询结果集,使用流式查询可以避免一次性加载所有数据到内存中。
利用编译时检查: 使用sqlx提供的query!和query_as!宏,可以在编译时捕获SQL语法错误和类型不匹配等问题。
测试覆盖: 为您的数据库交互代码编写单元测试和集成测试,以确保正确性和稳定性。
常见错误和故障排除
连接池耗尽: 如果出现"连接池耗尽"错误,可能是因为并发请求过多或连接池配置不当导致的。检查连接池指标并适当调整max_connections。
死锁: 在事务中执行多个查询时,可能会遇到死锁情况。确保正确使用事务,并实现重试逻辑。
类型不匹配: 如果遇到"无法将PostgreSQL类型映射到Rust类型"之类的错误,检查您的结构体字段类型是否与数据库列类型匹配。
SQL语法错误: 如果出现SQL语法错误,首先检查您是否使用了参数化查询。如果使用了query!宏,也可能是宏解析出现了问题。
查询性能差: 如果查询性能较差,可以使用sqlx提供的查询追踪功能分析查询执行情况,并优化慢查询。如果频繁创建连接,检查连接池配置是否合理,比如min_connections是否过小
生产就绪建议
启用日志记录: 在生产环境中合理启用sqlx的日志记录,以便更好地调试和监控应用程序。
监控指标: 监控数据库和连接池指标,如查询执行时间、错误率、连接池利用率等。
进行负载测试: 在部署之前,对您的应用程序进行全面的负载测试,以确保其能够在生产环境中良好运行。
实施安全最佳实践: 遵循安全最佳实践,如使用参数化查询、限制数据库权限、加密敏感数据等。
准备故障转移计划: 制定数据库故障转移计划,以确保应用程序在数据库出现故障时能够正常运行。
持续集成和交付: 将sqlx集成测试纳入您的持续集成和交付流程,以确保代码质量。
sqlx生态 有一些其他数据库的扩展和支持,比如sqlx-rxqlite、sqlx-clickhouse-ext。
sqlx-crud提供常见的数据库操作的CRUD操作的derive宏:
1 2 3 4 5 6 7 8 9 10 11 12 use sqlx::FromRow;use sqlx_crud::SqlxCrud;#[derive(Debug, FromRow, SqlxCrud)] struct User { user_id: i32 , name: String , } if let Some (user) = User::by_id (&pool, 42 ) { println! ("Found user user_id=42: {:?}" , user); }
sqlx-error提供了对sqlx::Error的包装。
当然还有一些其他的库,不过当前关注度还不是很高。