### Connect and Execute Simple Queries with pgwire Client in Rust Source: https://context7.com/sunng87/pgwire/llms.txt This Rust code snippet demonstrates how to use the `pgwire` client to establish a connection to a PostgreSQL server and execute simple queries. It configures the client, handles server parameters, and processes query results, including iterating over data rows. Dependencies include `pgwire` and `tokio`. ```rust use std::sync::Arc; use pgwire::api::client::auth::DefaultStartupHandler; use pgwire::api::client::query::DefaultSimpleQueryHandler; use pgwire::api::client::ClientInfo; use pgwire::tokio::client::PgWireClient; #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> { let config = Arc::new( "host=127.0.0.1 port=5432 user=myuser dbname=mydb password=mypass" .parse() .unwrap(), ); let startup_handler = DefaultStartupHandler::new(); let mut client = PgWireClient::connect(config, startup_handler, None).await?; println!("Connected. Server parameters: {:?}", client.server_parameters()); let query_handler = DefaultSimpleQueryHandler::new(); let mut results = client .simple_query(query_handler, "SELECT id, name FROM users WHERE id < 10") .await?; if let Some(result) = results.pop() { let mut reader = result.into_data_rows_reader(); while let Some(mut row) = reader.next_row() { let id: Option<i32> = row.next_value(); let name: Option<String> = row.next_value(); println!("ID: {:?}, Name: {:?}", id, name); } } Ok(()) } ``` -------------------------------- ### Manage Transaction State in Rust using pgwire Source: https://context7.com/sunng87/pgwire/llms.txt This Rust code illustrates how to manage transaction states (BEGIN, COMMIT, ROLLBACK) at the wire protocol level using `pgwire`. It implements the `SimpleQueryHandler` to intercept and respond to transaction-related commands, allowing for custom transaction logic. Dependencies include `pgwire` and `async-trait`.
```rust use pgwire::api::query::SimpleQueryHandler; use pgwire::api::results::{Response, Tag}; use pgwire::error::PgWireResult; #[async_trait::async_trait] impl SimpleQueryHandler for MyTransactionHandler { async fn do_query<C>(&self, _client: &mut C, query: &str) -> PgWireResult<Vec<Response>> where C: pgwire::api::ClientInfo + Unpin + Send + Sync, { let upper_query = query.to_uppercase(); if upper_query.starts_with("BEGIN") || upper_query.starts_with("START TRANSACTION") { Ok(vec![Response::TransactionStart(Tag::new("BEGIN"))]) } else if upper_query.starts_with("COMMIT") { // Perform commit logic Ok(vec![Response::TransactionEnd(Tag::new("COMMIT"))]) } else if upper_query.starts_with("ROLLBACK") { // Perform rollback logic Ok(vec![Response::TransactionEnd(Tag::new("ROLLBACK"))]) } else if upper_query.starts_with("SELECT") { // Execute query within transaction // Transaction state is tracked automatically by pgwire Ok(vec![Response::Execution(Tag::new("SELECT").with_rows(0))]) } else { Ok(vec![Response::Execution(Tag::new("OK"))]) } } } struct MyTransactionHandler; ``` -------------------------------- ### Rust: Implement Simple Query Handler for PostgreSQL Server Source: https://context7.com/sunng87/pgwire/llms.txt This Rust code demonstrates how to implement a simple query handler for a PostgreSQL server using the pgwire library. It defines a custom handler that processes text-based SELECT queries, returning schema and data rows, or an 'OK' tag for other commands. Dependencies include `pgwire`, `async-trait`, and `tokio`.
```Rust use std::sync::Arc; use async_trait::async_trait; use futures::stream; use tokio::net::TcpListener; use pgwire::api::auth::StartupHandler; use pgwire::api::query::SimpleQueryHandler; use pgwire::api::results::{DataRowEncoder, FieldInfo, QueryResponse, Response, Tag}; use pgwire::api::{ClientInfo, PgWireServerHandlers, Type}; use pgwire::error::PgWireResult; use pgwire::tokio::process_socket; struct MyQueryHandler; #[async_trait] impl SimpleQueryHandler for MyQueryHandler { async fn do_query<C>(&self, _client: &mut C, query: &str) -> PgWireResult<Vec<Response>> where C: ClientInfo + Unpin + Send + Sync, { if query.to_uppercase().starts_with("SELECT") { let schema = Arc::new(vec![ FieldInfo::new("id".into(), None, None, Type::INT4, pgwire::api::results::FieldFormat::Text), FieldInfo::new("name".into(), None, None, Type::VARCHAR, pgwire::api::results::FieldFormat::Text), ]); let mut encoder = DataRowEncoder::new(schema.clone()); encoder.encode_field(&1i32).unwrap(); encoder.encode_field(&"Alice").unwrap(); let row1 = encoder.take_row(); encoder.encode_field(&2i32).unwrap(); encoder.encode_field(&"Bob").unwrap(); let row2 = encoder.take_row(); let data_rows = stream::iter(vec![Ok(row1), Ok(row2)]); Ok(vec![Response::Query(QueryResponse::new(schema, data_rows))]) } else { Ok(vec![Response::Execution(Tag::new("OK").with_rows(1))]) } } } impl PgWireServerHandlers for MyQueryHandler { fn simple_query_handler(&self) -> Arc<impl SimpleQueryHandler> { Arc::new(MyQueryHandler) } fn startup_handler(&self) -> Arc<impl StartupHandler> { Arc::new(MyQueryHandler) } } #[async_trait] impl StartupHandler for MyQueryHandler { async fn on_startup<C>( &self, _client: &mut C, _message: pgwire::messages::PgWireFrontendMessage, ) -> PgWireResult<()> { Ok(()) } } #[tokio::main] async fn main() { let handler = Arc::new(MyQueryHandler); let listener = TcpListener::bind("127.0.0.1:5432").await.unwrap(); loop { let (socket, _) = listener.accept().await.unwrap(); let handler = handler.clone(); tokio::spawn(async move { process_socket(socket, None,
handler).await }); } } ``` -------------------------------- ### Rust Extended Query Handler for Prepared Statements Source: https://context7.com/sunng87/pgwire/llms.txt Implements the Extended Query Handler for pgwire, enabling prepared statements with parameter binding. It defines how to process queries, describe statements and portals, and handle parameter extraction and encoding for extended query operations. Requires `pgwire`, `async-trait`, and `futures` crates. ```rust use std::sync::Arc; use async_trait::async_trait; use futures::stream; use pgwire::api::portal::{Format, Portal}; use pgwire::api::query::ExtendedQueryHandler; use pgwire::api::results::{DataRowEncoder, DescribePortalResponse, DescribeStatementResponse, FieldInfo, QueryResponse, Response}; use pgwire::api::stmt::{NoopQueryParser, StoredStatement}; use pgwire::api::{ClientInfo, Type}; use pgwire::error::PgWireResult; struct MyExtendedQueryHandler { query_parser: Arc<NoopQueryParser>, } #[async_trait] impl ExtendedQueryHandler for MyExtendedQueryHandler { type Statement = String; type QueryParser = NoopQueryParser; fn query_parser(&self) -> Arc<Self::QueryParser> { self.query_parser.clone() } async fn do_query<C>( &self, _client: &mut C, portal: &Portal<Self::Statement>, _max_rows: usize, ) -> PgWireResult<Response> where C: ClientInfo + Unpin + Send + Sync, { let query = &portal.statement.statement; if query.contains("$1") { // Extract parameter value let id_param = portal.parameter::<i32>(0, &Type::INT4)?; let schema = Arc::new(vec![ FieldInfo::new("id".into(), None, None, Type::INT4, pgwire::api::results::FieldFormat::Binary), FieldInfo::new("value".into(), None, None, Type::VARCHAR, pgwire::api::results::FieldFormat::Text), ]); let mut encoder = DataRowEncoder::new(schema.clone()); encoder.encode_field(&id_param).unwrap(); encoder.encode_field(&format!("Value for id {}", id_param)).unwrap(); let row = encoder.take_row(); let data_rows = stream::iter(vec![Ok(row)]); Ok(Response::Query(QueryResponse::new(schema, data_rows))) } else { Ok(Response::Execution(
pgwire::api::results::Tag::new("SELECT").with_rows(0) )) } } async fn do_describe_statement<C>( &self, _client: &mut C, _target: &StoredStatement<Self::Statement>, ) -> PgWireResult<DescribeStatementResponse> where C: ClientInfo + Unpin + Send + Sync, { let param_types = vec![Type::INT4]; let fields = vec![ FieldInfo::new("id".into(), None, None, Type::INT4, pgwire::api::results::FieldFormat::Binary), FieldInfo::new("value".into(), None, None, Type::VARCHAR, pgwire::api::results::FieldFormat::Text), ]; Ok(DescribeStatementResponse::new(param_types, fields)) } async fn do_describe_portal<C>( &self, _client: &mut C, _portal: &Portal<Self::Statement>, ) -> PgWireResult<DescribePortalResponse> where C: ClientInfo + Unpin + Send + Sync, { let fields = vec![ FieldInfo::new("id".into(), None, None, Type::INT4, pgwire::api::results::FieldFormat::Binary), FieldInfo::new("value".into(), None, None, Type::VARCHAR, pgwire::api::results::FieldFormat::Text), ]; Ok(DescribePortalResponse::new(fields)) } } ``` -------------------------------- ### Rust MD5 Password Authentication with Custom Source Source: https://context7.com/sunng87/pgwire/llms.txt Configures MD5 password-based authentication for pgwire using a custom authentication source. It demonstrates how to implement the `AuthSource` trait to provide passwords based on username, including hashing with a salt. This requires `pgwire` and `async-trait` crates.
```rust use std::sync::Arc; use async_trait::async_trait; use pgwire::api::auth::md5pass::{hash_md5_password, Md5PasswordAuthStartupHandler}; use pgwire::api::auth::{AuthSource, DefaultServerParameterProvider, LoginInfo, Password}; use pgwire::error::PgWireResult; #[derive(Debug)] struct MyAuthSource; #[async_trait] impl AuthSource for MyAuthSource { async fn get_password(&self, login_info: &LoginInfo) -> PgWireResult<Password> { let salt = vec![0x01, 0x02, 0x03, 0x04]; let username = login_info.user().unwrap_or("default"); // In production, fetch password from database let cleartext_password = match username { "admin" => "admin123", "user" => "user456", _ => "defaultpass", }; let hashed = hash_md5_password(username, cleartext_password, &salt); Ok(Password::new(Some(salt), hashed.as_bytes().to_vec())) } } fn create_auth_handler() -> Arc<Md5PasswordAuthStartupHandler<MyAuthSource, DefaultServerParameterProvider>> { let auth_source = Arc::new(MyAuthSource); let mut params = DefaultServerParameterProvider::default(); params.server_version = "14.0-mydb-1.0".to_owned(); Arc::new(Md5PasswordAuthStartupHandler::new( auth_source, Arc::new(params), )) } ``` -------------------------------- ### Enable SSL/TLS Support in Rust pgwire Server Source: https://context7.com/sunng87/pgwire/llms.txt Configures a pgwire server to support encrypted connections using SSL/TLS. This involves loading a server certificate and private key, building a `rustls::ServerConfig`, and then passing this configuration to the `process_socket` function. It requires `tokio` for async operations and `rustls` for TLS.
```rust use std::sync::Arc; use tokio::net::TcpListener; use tokio_rustls::rustls::{self, pki_types::PrivateKeyDer}; use pgwire::api::PgWireServerHandlers; use pgwire::tokio::process_socket; #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> { // Load certificate and private key let cert_data = std::fs::read("server.crt")?; let key_data = std::fs::read("server.key")?; let certs = rustls_pemfile::certs(&mut cert_data.as_slice()) .collect::<Result<Vec<_>, _>>()?; let key = rustls_pemfile::private_key(&mut key_data.as_slice())? .ok_or("No private key found")?; let config = rustls::ServerConfig::builder() .with_no_client_auth() .with_single_cert(certs, key)?; let tls_config = Arc::new(config); let handler = Arc::new(MyServerHandlers); let listener = TcpListener::bind("127.0.0.1:5432").await?; loop { let (socket, _) = listener.accept().await?; let handler = handler.clone(); let tls_config = tls_config.clone(); tokio::spawn(async move { process_socket(socket, Some(tls_config), handler).await }); } } struct MyServerHandlers; impl PgWireServerHandlers for MyServerHandlers { // Implement handlers... } ``` -------------------------------- ### Handle COPY FROM STDIN for Data Import in Rust Source: https://context7.com/sunng87/pgwire/llms.txt This snippet demonstrates how to implement a CopyHandler in Rust to manage COPY FROM STDIN operations. It handles receiving CopyData messages, buffering them, and processing them upon CopyDone. Error handling for CopyFail is also included. Dependencies include `pgwire`, `async-trait`, and `tokio`.
```rust use std::sync::Arc; use async_trait::async_trait; use pgwire::api::copy::CopyHandler; use pgwire::api::query::SimpleQueryHandler; use pgwire::api::results::{CopyResponse, Response}; use pgwire::api::ClientInfo; use pgwire::error::PgWireResult; use pgwire::messages::copy::{CopyData, CopyDone, CopyFail}; struct MyCopyHandler { buffer: Arc<tokio::sync::Mutex<Vec<u8>>>, } #[async_trait] impl SimpleQueryHandler for MyCopyHandler { async fn do_query<C>(&self, _client: &mut C, query: &str) -> PgWireResult<Vec<Response>> where C: ClientInfo + Unpin + Send + Sync, { if query.to_uppercase().contains("COPY") { // Initiate COPY IN mode // format: 0=text, columns: 2, column_formats: [0,0] for text Ok(vec![Response::CopyIn(CopyResponse::new(0, 2, vec![0, 0]))]) } else { Ok(vec![]) } } } #[async_trait] impl CopyHandler for MyCopyHandler { async fn on_copy_data<C>(&self, _client: &mut C, copy_data: CopyData) -> PgWireResult<()> where C: ClientInfo + Unpin + Send + Sync, { let mut buffer = self.buffer.lock().await; buffer.extend_from_slice(&copy_data.data); println!("Received {} bytes", copy_data.data.len()); Ok(()) } async fn on_copy_done<C>(&self, _client: &mut C, _done: CopyDone) -> PgWireResult<()> where C: ClientInfo + Unpin + Send + Sync, { let buffer = self.buffer.lock().await; println!("Copy complete. Total bytes: {}", buffer.len()); // Process buffered data here Ok(()) } async fn on_copy_fail<C>(&self, _client: &mut C, fail: CopyFail) -> pgwire::error::PgWireError where C: ClientInfo + Unpin + Send + Sync, { println!("Copy failed: {}", fail.message); pgwire::error::PgWireError::UserError(Box::new( pgwire::error::ErrorInfo::new( "ERROR".to_owned(), "XX000".to_owned(), format!("COPY failed: {}", fail.message), ) )) } } ``` -------------------------------- ### Configure SCRAM-SHA-256 Authentication in Rust Source: https://context7.com/sunng87/pgwire/llms.txt Sets up SCRAM-SHA-256 authentication by defining a custom AuthSource. This handler generates salted and hashed passwords for user authentication.
In production, these hashed passwords should be securely stored in a database. It uses the `pgwire` crate for SCRAM-SHA-256 implementation. ```rust use std::sync::Arc; use async_trait::async_trait; use pgwire::api::auth::sasl::scram::{SASLScramAuthStartupHandler, ScramSha256}; use pgwire::api::auth::{AuthSource, DefaultServerParameterProvider, LoginInfo, Password}; use pgwire::error::PgWireResult; #[derive(Debug)] struct ScramAuthSource; #[async_trait] impl AuthSource for ScramAuthSource { async fn get_password(&self, login_info: &LoginInfo) -> PgWireResult<Password> { let username = login_info.user().unwrap_or("default"); // Return SCRAM-SHA-256 hashed password with salt // In production: store salted+hashed password in database let iterations = 4096; let salt = b"random_salt_bytes"; let cleartext_password = match username { "admin" => "secure_admin_password", _ => "default_password", }; // Hash password with SCRAM-SHA-256 let password_hash = ScramSha256::hash_password( cleartext_password.as_bytes(), salt, iterations, ); Ok(Password::new(Some(salt.to_vec()), password_hash)) } } fn create_scram_handler() -> Arc<SASLScramAuthStartupHandler<ScramAuthSource, DefaultServerParameterProvider>> { let auth_source = Arc::new(ScramAuthSource); let params = Arc::new(DefaultServerParameterProvider::default()); Arc::new(SASLScramAuthStartupHandler::new(auth_source, params)) } ``` === COMPLETE CONTENT === This response contains all available snippets from this library. No additional content exists. Do not make further requests.