# Apache DataFusion

Apache DataFusion is an extensible query engine written in Rust that uses Apache Arrow as its in-memory format. It provides a fast, embeddable SQL and DataFrame API for building custom analytical systems and database platforms. DataFusion features a full query planner, a columnar streaming execution engine, and partitioned data sources, enabling developers to quickly bootstrap query capabilities and customize nearly every aspect, including data sources, functions, query languages, and custom operators. The engine is designed for high performance with vectorized execution, multi-threading, and efficient memory management.

DataFusion supports multiple file formats out of the box (Parquet, CSV, JSON, Avro) and provides both SQL and DataFrame APIs for query construction. Its modular architecture allows developers to extend functionality through custom user-defined functions (UDFs), user-defined aggregate functions (UDAFs), user-defined window functions (UDWFs), and custom data sources, making it ideal for building domain-specific query engines, data pipelines, and new database platforms.

## SQL Query Execution

Execute SQL queries against various data sources including in-memory tables, Parquet files, CSV files, and custom data sources.

```rust
use datafusion::prelude::*;
use datafusion::arrow::array::{UInt8Array, UInt64Array};
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::MemTable;
use std::sync::Arc;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // Create a SessionContext
    let ctx = SessionContext::new();

    // Create in-memory data
    let schema = SchemaRef::new(Schema::new(vec![
        Field::new("id", DataType::UInt8, false),
        Field::new("bank_account", DataType::UInt64, true),
    ]));
    let id_array = UInt8Array::from(vec![1, 2, 3]);
    let account_array = UInt64Array::from(vec![9000, 5000, 7500]);
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(id_array), Arc::new(account_array)],
    )?;

    // Register as a table
    let mem_table = MemTable::try_new(schema, vec![vec![batch]])?;
    ctx.register_table("users", Arc::new(mem_table))?;

    // Execute SQL query
    let df = ctx.sql("SELECT * FROM users WHERE bank_account > 6000").await?;

    // Collect results into RecordBatches (collect() consumes the DataFrame, so clone first)
    let _results = df.clone().collect().await?;

    // Or stream the results to stdout
    df.show().await?;

    Ok(())
}
```

## DataFrame API for Data Manipulation

Programmatically construct queries using a DataFrame API similar to Apache Spark, supporting filtering, selection, aggregation, and joins.

```rust
use datafusion::prelude::*;
use datafusion::arrow::array::{ArrayRef, StringArray, Int32Array};
use datafusion::arrow::record_batch::RecordBatch;
use std::sync::Arc;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();

    // Create in-memory data using arrays
    let names: ArrayRef = Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie", "David"]));
    let ages: ArrayRef = Arc::new(Int32Array::from(vec![25, 30, 35, 40]));
    let batch = RecordBatch::try_from_iter(vec![("name", names), ("age", ages)])?;

    // Register the batch as a table
    ctx.register_batch("people", batch)?;

    // Use DataFrame API to query
    let df = ctx.table("people").await?
        .filter(col("age").gt(lit(30)))?
        .select_columns(&["name", "age"])?;

    // Execute and display
    df.show().await?;

    // Alternatively, use the dataframe! macro for quick creation
    let df2 = dataframe!(
        "product" => ["Laptop", "Phone", "Tablet"],
        "price" => [1200, 800, 400]
    )?;
    df2.show().await?;

    Ok(())
}
```
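The DataFrame API also covers aggregation and joins, which the example above does not show. Below is a minimal sketch of both on two small in-memory tables; the table and column names are illustrative, and the `avg` import path is the one used in the subquery example later in this document.

```rust
use datafusion::prelude::*;
use datafusion::arrow::array::{ArrayRef, Int32Array, StringArray};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::common::JoinType;
use datafusion::functions_aggregate::average::avg;
use std::sync::Arc;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();

    // "employees" table: name, dept_id, salary
    let names: ArrayRef = Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie"]));
    let dept_ids: ArrayRef = Arc::new(Int32Array::from(vec![1, 1, 2]));
    let salaries: ArrayRef = Arc::new(Int32Array::from(vec![90_000, 80_000, 70_000]));
    let employees = RecordBatch::try_from_iter(vec![
        ("name", names),
        ("dept_id", dept_ids),
        ("salary", salaries),
    ])?;
    ctx.register_batch("employees", employees)?;

    // "departments" table: id, dept_name
    let ids: ArrayRef = Arc::new(Int32Array::from(vec![1, 2]));
    let dept_names: ArrayRef = Arc::new(StringArray::from(vec!["Engineering", "Sales"]));
    let departments = RecordBatch::try_from_iter(vec![("id", ids), ("dept_name", dept_names)])?;
    ctx.register_batch("departments", departments)?;

    // Inner-join employees to departments, then compute the average salary per department
    let df = ctx.table("employees").await?
        .join(
            ctx.table("departments").await?,
            JoinType::Inner,
            &["dept_id"],
            &["id"],
            None,
        )?
        .aggregate(
            vec![col("dept_name")],
            vec![avg(col("salary")).alias("avg_salary")],
        )?;

    df.show().await?;
    Ok(())
}
```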
## Reading Parquet Files

Read and query Parquet files with automatic schema inference and predicate pushdown for efficient data access.

```rust
use datafusion::prelude::*;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::ListingOptions;
use std::sync::Arc;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();

    // Simple approach: read_parquet with default options
    let df = ctx.read_parquet(
        "/path/to/data.parquet",
        ParquetReadOptions::default()
    ).await?;

    // Display schema information
    df.clone().describe().await?.show().await?;

    // Query the parquet data
    df.select_columns(&["id", "name", "timestamp"])?
        .filter(col("id").gt(lit(100)))?
        .show()
        .await?;

    // Advanced: Register directory with multiple parquet files
    let file_format = ParquetFormat::default().with_enable_pruning(true);
    let listing_options = ListingOptions::new(Arc::new(file_format))
        .with_file_extension("parquet");

    ctx.register_listing_table(
        "large_dataset",
        "file:///data/parquet_files/",
        listing_options,
        None,
        None,
    ).await?;

    // Query across all files
    let results = ctx.sql("SELECT COUNT(*) FROM large_dataset WHERE status = 'active'")
        .await?
        .collect()
        .await?;

    Ok(())
}
```

## Reading and Writing CSV Files

Process CSV files with custom schema specification and configurable parsing options.

```rust
use datafusion::prelude::*;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::common::config::CsvOptions;
use datafusion::common::parsers::CompressionTypeVariant;
use datafusion::dataframe::DataFrameWriteOptions;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();

    // Read CSV with automatic schema inference
    let df = ctx.read_csv("data.csv", CsvReadOptions::default()).await?;
    df.show().await?;

    // Read CSV with explicit schema
    let schema = Schema::new(vec![
        Field::new("id", DataType::Utf8, false),
        Field::new("timestamp", DataType::Utf8, false),
        Field::new("value", DataType::Int32, true),
        Field::new("score", DataType::Float32, true),
    ]);

    let csv_options = CsvReadOptions {
        schema: Some(&schema),
        has_header: true,
        delimiter: b',',
        ..Default::default()
    };
    let df = ctx.read_csv("data.csv", csv_options).await?;

    // Process and write to compressed CSV
    df.filter(col("value").gt(lit(50)))?
        .write_csv(
            "./output_csv/",
            DataFrameWriteOptions::new(),
            Some(CsvOptions::default().with_compression(CompressionTypeVariant::GZIP)),
        )
        .await?;

    // Query CSV directly using SQL with enable_url_table
    let dyn_ctx = ctx.enable_url_table();
    let results = dyn_ctx
        .sql("SELECT id, value FROM 'data.csv' WHERE value > 100")
        .await?
        .collect()
        .await?;

    Ok(())
}
```
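Besides Parquet and CSV, DataFusion reads newline-delimited JSON and Avro out of the box. A minimal sketch follows; the file paths and the `value` column are placeholders, and Avro support typically requires enabling DataFusion's `avro` cargo feature.

```rust
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();

    // Newline-delimited JSON with automatic schema inference
    let json_df = ctx.read_json("events.ndjson", NdJsonReadOptions::default()).await?;
    json_df.show().await?;

    // Avro files are read the same way (enable the `avro` feature in Cargo.toml)
    let avro_df = ctx.read_avro("events.avro", AvroReadOptions::default()).await?;
    avro_df.filter(col("value").is_not_null())?.show().await?;

    Ok(())
}
```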
## Writing Data to Multiple Formats

Export DataFrame results to tables, Parquet, CSV, or JSON formats with configurable options.

```rust
use datafusion::prelude::*;
use datafusion::dataframe::DataFrameWriteOptions;
use datafusion::arrow::array::StringViewArray;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::catalog::MemTable;
use std::sync::Arc;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();

    // Create sample data
    let array = StringViewArray::from(vec!["alpha", "beta", "gamma"]);
    let schema = Arc::new(Schema::new(vec![
        Field::new("data", DataType::Utf8View, false)
    ]));
    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)])?;

    let mem_table = MemTable::try_new(schema, vec![vec![batch]])?;
    ctx.register_table("source_data", Arc::new(mem_table))?;
    let df = ctx.table("source_data").await?;

    // Create external table for insertion
    ctx.sql(
        "CREATE EXTERNAL TABLE output_table(data VARCHAR) STORED AS PARQUET LOCATION './output_table/'"
    ).await?.collect().await?;

    // Write to table (INSERT INTO equivalent)
    df.clone().write_table("output_table", DataFrameWriteOptions::new()).await?;

    // Write to Parquet
    df.clone().write_parquet(
        "./output_parquet/",
        DataFrameWriteOptions::new(),
        None,
    ).await?;

    // Write to CSV
    df.clone().write_csv(
        "./output_csv/",
        DataFrameWriteOptions::new(),
        None,
    ).await?;

    // Write to JSON
    df.write_json(
        "./output_json/",
        DataFrameWriteOptions::new(),
        None,
    ).await?;

    Ok(())
}
```

## Custom User-Defined Functions (UDF)

Define and register custom scalar functions for use in both SQL and DataFrame APIs.

```rust
use datafusion::prelude::*;
use datafusion::arrow::array::{ArrayRef, Float64Array, Float32Array};
use datafusion::arrow::datatypes::DataType;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::logical_expr::{ColumnarValue, Volatility};
use datafusion::common::cast::as_float64_array;
use std::sync::Arc;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();

    // Create test data
    let base: ArrayRef = Arc::new(Float32Array::from(vec![2.1, 3.1, 4.1, 5.1]));
    let exp: ArrayRef = Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0, 4.0]));
    let batch = RecordBatch::try_from_iter(vec![("base", base), ("exponent", exp)])?;
    ctx.register_batch("numbers", batch)?;

    // Define the UDF implementation
    let pow_impl = Arc::new(|args: &[ColumnarValue]| {
        assert_eq!(args.len(), 2);
        let args = ColumnarValue::values_to_arrays(args)?;
        let base = as_float64_array(&args[0]).expect("cast failed");
        let exponent = as_float64_array(&args[1]).expect("cast failed");

        let result = base
            .iter()
            .zip(exponent.iter())
            .map(|(b, e)| match (b, e) {
                (Some(b), Some(e)) => Some(b.powf(e)),
                _ => None,
            })
            .collect::<Float64Array>();

        Ok(ColumnarValue::from(Arc::new(result) as ArrayRef))
    });

    // Create and register the UDF
    let pow_udf = create_udf(
        "pow",
        vec![DataType::Float64, DataType::Float64],
        DataType::Float64,
        Volatility::Immutable,
        pow_impl,
    );
    ctx.register_udf(pow_udf.clone());

    // Use in DataFrame API
    let df = ctx.table("numbers").await?;
    let result_df = df.select(vec![
        col("base"),
        col("exponent"),
        pow_udf.call(vec![col("base"), col("exponent")]).alias("result"),
    ])?;
    result_df.show().await?;

    // Use in SQL
    let sql_result = ctx
        .sql("SELECT base, exponent, pow(base, exponent) as result FROM numbers")
        .await?;
    sql_result.show().await?;

    Ok(())
}
```
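DataFusion also supports user-defined aggregate functions (UDAFs). As a companion to the scalar UDF above, here is a sketch of a geometric-mean UDAF modeled on DataFusion's `simple_udaf` example; the `Accumulator` method receivers (`&mut self` on `state`/`evaluate`) and the `create_udaf` signature shown here match recent DataFusion releases and may differ slightly in older versions.

```rust
use datafusion::prelude::*;
use datafusion::arrow::array::{ArrayRef, Float64Array};
use datafusion::arrow::datatypes::DataType;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::common::cast::{as_float64_array, as_uint64_array};
use datafusion::common::ScalarValue;
use datafusion::error::Result;
use datafusion::logical_expr::{Accumulator, Volatility};
use std::sync::Arc;

/// Accumulator state for a geometric mean: running product and row count.
#[derive(Debug)]
struct GeometricMean {
    prod: f64,
    n: u64,
}

impl GeometricMean {
    fn new() -> Self {
        Self { prod: 1.0, n: 0 }
    }
}

impl Accumulator for GeometricMean {
    // Intermediate state shipped between partitions: [product, count]
    fn state(&mut self) -> Result<Vec<ScalarValue>> {
        Ok(vec![ScalarValue::from(self.prod), ScalarValue::from(self.n)])
    }

    // Fold one batch of input values into the running state
    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
        let arr = as_float64_array(&values[0])?;
        for v in arr.iter().flatten() {
            self.prod *= v;
            self.n += 1;
        }
        Ok(())
    }

    // Merge partial states produced by other partitions
    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
        let prods = as_float64_array(&states[0])?;
        let counts = as_uint64_array(&states[1])?;
        for (p, c) in prods.iter().flatten().zip(counts.iter().flatten()) {
            self.prod *= p;
            self.n += c;
        }
        Ok(())
    }

    // Produce the final value
    fn evaluate(&mut self) -> Result<ScalarValue> {
        Ok(ScalarValue::from(self.prod.powf(1.0 / self.n as f64)))
    }

    fn size(&self) -> usize {
        std::mem::size_of_val(self)
    }
}

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    let values: ArrayRef = Arc::new(Float64Array::from(vec![2.0, 8.0, 4.0]));
    let batch = RecordBatch::try_from_iter(vec![("x", values)])?;
    ctx.register_batch("samples", batch)?;

    // Wrap the accumulator in an AggregateUDF and register it
    let geo_mean = create_udaf(
        "geo_mean",
        vec![DataType::Float64],                               // input type
        Arc::new(DataType::Float64),                           // return type
        Volatility::Immutable,
        Arc::new(|_| Ok(Box::new(GeometricMean::new()))),      // accumulator factory
        Arc::new(vec![DataType::Float64, DataType::UInt64]),   // state types
    );
    ctx.register_udaf(geo_mean);

    // geometric mean of [2, 8, 4] is 4
    ctx.sql("SELECT geo_mean(x) FROM samples").await?.show().await?;
    Ok(())
}
```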
## Custom Data Sources

Implement a custom TableProvider to integrate external data sources with DataFusion's query engine.

```rust
use datafusion::prelude::*;
use datafusion::datasource::{provider_as_source, TableProvider, TableType};
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::arrow::array::{UInt8Builder, UInt64Builder};
use datafusion::physical_plan::{ExecutionPlan, SendableRecordBatchStream};
use datafusion::physical_plan::memory::MemoryStream;
use datafusion::logical_expr::LogicalPlanBuilder;
use datafusion::error::Result;
use datafusion::catalog::Session;
use datafusion::execution::context::TaskContext;
use async_trait::async_trait;
use std::sync::Arc;
use std::any::Any;

#[derive(Clone, Debug)]
struct User {
    id: u8,
    bank_account: u64,
}

#[derive(Clone, Debug)]
pub struct CustomDataSource {
    users: Vec<User>,
}

impl CustomDataSource {
    fn new() -> Self {
        Self {
            users: vec![
                User { id: 1, bank_account: 9000 },
                User { id: 2, bank_account: 100 },
                User { id: 3, bank_account: 1000 },
            ],
        }
    }

    fn schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::UInt8, false),
            Field::new("bank_account", DataType::UInt64, false),
        ]))
    }

    fn to_record_batch(&self) -> Result<RecordBatch> {
        let mut id_builder = UInt8Builder::new();
        let mut account_builder = UInt64Builder::new();
        for user in &self.users {
            id_builder.append_value(user.id);
            account_builder.append_value(user.bank_account);
        }
        let batch = RecordBatch::try_new(
            Self::schema(),
            vec![
                Arc::new(id_builder.finish()),
                Arc::new(account_builder.finish()),
            ],
        )?;
        Ok(batch)
    }
}

#[async_trait]
impl TableProvider for CustomDataSource {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        Self::schema()
    }

    fn table_type(&self) -> TableType {
        TableType::Base
    }

    async fn scan(
        &self,
        _state: &dyn Session,
        projection: Option<&Vec<usize>>,
        _filters: &[Expr],
        _limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let batch = self.to_record_batch()?;
        let schema = Self::schema();
        // Implementation would return a custom ExecutionPlan
        unimplemented!("See full example for ExecutionPlan implementation")
    }
}

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    let custom_source = CustomDataSource::new();

    // Build logical plan with custom datasource
    let logical_plan = LogicalPlanBuilder::scan_with_filters(
        "accounts",
        provider_as_source(Arc::new(custom_source)),
        None,
        vec![],
    )?.build()?;

    // Create DataFrame and query
    let df = DataFrame::new(ctx.state(), logical_plan)
        .select_columns(&["id", "bank_account"])?
        .filter(col("bank_account").gt(lit(500u64)))?;

    let results = df.collect().await?;
    println!("Found {} users with accounts > 500", results[0].num_rows());

    Ok(())
}
```
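The `scan` method above is intentionally left unimplemented. For a purely in-memory provider, one common way to complete it is to wrap the provider's batches in a `MemoryExec` and return that as the physical plan. The sketch below builds and executes such a plan standalone; the `scan_plan` helper and the single-column table are illustrative, and newer DataFusion releases deprecate `MemoryExec` in favor of `MemorySourceConfig`.

```rust
use datafusion::arrow::array::UInt64Array;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::physical_plan::memory::MemoryExec;
use datafusion::physical_plan::{collect, ExecutionPlan};
use datafusion::prelude::*;
use std::sync::Arc;

/// Build the kind of plan a TableProvider::scan could return for in-memory data:
/// the provider's batches wrapped in a MemoryExec, with the scan's projection applied.
fn scan_plan(
    batch: RecordBatch,
    projection: Option<&Vec<usize>>,
) -> Result<Arc<dyn ExecutionPlan>> {
    let schema = batch.schema();
    let exec = MemoryExec::try_new(&[vec![batch]], schema, projection.cloned())?;
    Ok(Arc::new(exec))
}

#[tokio::main]
async fn main() -> Result<()> {
    // One batch standing in for CustomDataSource::to_record_batch()
    let schema = Arc::new(Schema::new(vec![Field::new(
        "bank_account",
        DataType::UInt64,
        false,
    )]));
    let batch = RecordBatch::try_new(
        schema,
        vec![Arc::new(UInt64Array::from(vec![9000, 100, 1000]))],
    )?;

    // Execute the plan directly with the session's TaskContext, as DataFusion would
    let ctx = SessionContext::new();
    let plan = scan_plan(batch, None)?;
    let results = collect(plan, ctx.task_ctx()).await?;
    println!("scanned {} rows", results[0].num_rows());

    Ok(())
}
```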
## Subqueries with DataFrame API

Execute scalar, IN, and EXISTS subqueries programmatically using the DataFrame API.

```rust
use datafusion::prelude::*;
use datafusion::functions_aggregate::average::avg;
use datafusion::functions_aggregate::min_max::max;
use datafusion::common::ScalarValue;
use datafusion::arrow::datatypes::DataType;
use std::sync::Arc;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();

    // Register test data tables
    ctx.register_csv("t1", "data/table1.csv", CsvReadOptions::default()).await?;
    ctx.register_csv("t2", "data/table2.csv", CsvReadOptions::default()).await?;

    // Scalar subquery:
    // SELECT c1, c2 FROM t1 WHERE (SELECT avg(t2.c2) FROM t2 WHERE t1.c1 = t2.c1) > 0
    let scalar_subquery_df = ctx.table("t1").await?
        .filter(
            scalar_subquery(Arc::new(
                ctx.table("t2").await?
                    .filter(out_ref_col(DataType::Utf8, "t1.c1").eq(col("t2.c1")))?
                    .aggregate(vec![], vec![avg(col("t2.c2"))])?
                    .select(vec![avg(col("t2.c2"))])?
                    .into_unoptimized_plan(),
            )).gt(lit(0u8))
        )?
        .select(vec![col("t1.c1"), col("t1.c2")])?
        .limit(0, Some(3))?;
    scalar_subquery_df.show().await?;

    // IN subquery:
    // SELECT c1, c2 FROM t1 WHERE c2 IN (SELECT max(c2) FROM t2 WHERE c1 > 0)
    let in_subquery_df = ctx.table("t1").await?
        .filter(in_subquery(
            col("t1.c2"),
            Arc::new(
                ctx.table("t2").await?
                    .filter(col("t2.c1").gt(lit(ScalarValue::UInt8(Some(0)))))?
                    .aggregate(vec![], vec![max(col("t2.c2"))])?
                    .select(vec![max(col("t2.c2"))])?
                    .into_unoptimized_plan(),
            ),
        ))?
        .select(vec![col("t1.c1"), col("t1.c2")])?
        .limit(0, Some(3))?;
    in_subquery_df.show().await?;

    // EXISTS subquery:
    // SELECT c1, c2 FROM t1 WHERE EXISTS (SELECT c2 FROM t2 WHERE t1.c1 = t2.c1)
    let exists_subquery_df = ctx.table("t1").await?
        .filter(exists(Arc::new(
            ctx.table("t2").await?
                .filter(out_ref_col(DataType::Utf8, "t1.c1").eq(col("t2.c1")))?
                .select(vec![col("t2.c2")])?
                .into_unoptimized_plan(),
        )))?
        .select(vec![col("t1.c1"), col("t1.c2")])?
        .limit(0, Some(3))?;
    exists_subquery_df.show().await?;

    Ok(())
}
```

## Summary

Apache DataFusion serves as a powerful foundation for building analytical systems, offering production-ready query execution with extensive customization capabilities. Primary use cases include embedding a SQL engine into Rust applications, building domain-specific query engines for specialized workloads, creating ETL pipelines with complex transformations, and developing new database platforms that leverage DataFusion's query optimization and execution infrastructure. The engine excels in scenarios requiring high-performance analytics on structured data, particularly when working with columnar formats like Parquet and Arrow.

Integration patterns typically involve creating a SessionContext as the primary entry point, registering data sources (in-memory tables, files, or custom providers), and executing queries via SQL or DataFrame APIs. For advanced use cases, developers can extend DataFusion by implementing custom TableProviders for new data sources, registering UDFs/UDAFs/UDWFs for domain-specific computations, creating custom logical optimizer rules, and implementing custom physical execution plans. The modular architecture ensures that extensions integrate seamlessly with built-in optimizations like predicate pushdown, projection pruning, and partition-aware execution.
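As a closing sketch of that integration pattern, the entry point itself can be tuned before any sources are registered; the option values and the `events.csv` path below are illustrative.

```rust
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // Configure the primary entry point before registering data sources
    let config = SessionConfig::new()
        .with_target_partitions(8)      // degree of parallelism for partitioned execution
        .with_batch_size(8192)          // rows per RecordBatch in the streaming engine
        .with_information_schema(true); // expose information_schema for catalog queries
    let ctx = SessionContext::new_with_config(config);

    ctx.register_csv("events", "events.csv", CsvReadOptions::default()).await?;
    ctx.sql("SELECT COUNT(*) FROM events").await?.show().await?;
    Ok(())
}
```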