Apache Arrow
https://github.com/apache/arrow
# Apache Arrow

Apache Arrow is a universal columnar format and multi-language toolbox for fast data interchange and in-memory analytics. It provides a standardized, language-independent columnar memory format for flat and hierarchical data, organized for efficient analytic operations on modern hardware. Arrow enables zero-copy data sharing between processes and systems, making it well suited to high-performance data processing pipelines.

The Arrow ecosystem includes libraries for C++, Python (PyArrow), R, Ruby, Go, Java, JavaScript, Rust, and more. Core components include the Arrow Columnar Format for efficient in-memory representation, the Arrow IPC Format for serialization and interprocess communication, Arrow Flight RPC for high-performance network data transfer, and native readers and writers for popular file formats such as Parquet, CSV, and JSON. This documentation focuses on the Python (PyArrow) and C++ APIs.

## Core Data Types and Arrays

PyArrow provides factory functions for creating Arrow data types, which define how values are stored and interpreted in memory. Arrays are the fundamental data structure holding columnar data.
```python
import pyarrow as pa

# Create data types
int_type = pa.int32()
string_type = pa.string()
timestamp_type = pa.timestamp('ms')
list_type = pa.list_(pa.int64())
struct_type = pa.struct([('x', pa.int32()), ('y', pa.string())])

# Create arrays with type inference
arr = pa.array([1, 2, None, 3])
# Output: <pyarrow.lib.Int64Array object> [1, 2, null, 3]

# Create arrays with an explicit type
arr_typed = pa.array([1, 2, 3], type=pa.uint16())
# Output: <pyarrow.lib.UInt16Array object> [1, 2, 3]

# Array properties
print(f"Type: {arr.type}, Length: {len(arr)}, Null count: {arr.null_count}")
# Output: Type: int64, Length: 4, Null count: 1

# Slicing (zero-copy)
sliced = arr[1:3]
# Output: [2, null]

# Create nested arrays
nested_arr = pa.array([[], None, [1, 2], [None, 1]])
# Type: list<item: int64>

# Create struct arrays
struct_arr = pa.array([{'x': 1, 'y': 'foo'}, {'x': 2, 'y': 'bar'}])

# Create dictionary (categorical) arrays
indices = pa.array([0, 1, 0, 1, 2])
dictionary = pa.array(['foo', 'bar', 'baz'])
dict_array = pa.DictionaryArray.from_arrays(indices, dictionary)
```

## Schema, RecordBatch, and Table

A Schema defines column names and types. A RecordBatch holds multiple equal-length arrays together with a schema. A Table is a logical dataset composed of one or more RecordBatches, which allows tables to be concatenated without copying memory.
```python
import pyarrow as pa

# Create a schema
schema = pa.schema([
    ('id', pa.int64()),
    ('name', pa.string()),
    ('score', pa.float64()),
    ('tags', pa.list_(pa.string()))
])

# Create arrays for each column
ids = pa.array([1, 2, 3, 4])
names = pa.array(['Alice', 'Bob', 'Charlie', None])
scores = pa.array([95.5, 87.0, None, 92.3])
tags = pa.array([['a', 'b'], ['c'], None, ['d', 'e']])

# Create a RecordBatch from arrays
batch = pa.RecordBatch.from_arrays(
    [ids, names, scores, tags],
    schema=schema
)
print(f"Columns: {batch.num_columns}, Rows: {batch.num_rows}")
# Output: Columns: 4, Rows: 4

# Access columns
print(batch['name'])  # or batch[1] or batch.column('name')
# Output: ["Alice", "Bob", "Charlie", null]

# Slice without copying
batch_slice = batch.slice(1, 2)  # Start at row 1, take 2 rows

# Create a Table from RecordBatches
batches = [batch, batch]  # Multiple batches
table = pa.Table.from_batches(batches)
print(f"Table rows: {table.num_rows}")  # 8 rows

# Create a Table directly using pa.table()
table = pa.table({
    'x': [1, 2, 3],
    'y': ['a', 'b', 'c'],
    'z': [1.0, 2.0, 3.0]
})

# Table operations
table_selected = table.select(['x', 'y'])  # Select columns
table_dropped = table.drop(['z'])  # Drop columns
table_appended = table.append_column('w', pa.array([True, False, True]))

# Concatenate tables
combined = pa.concat_tables([table, table])

# Convert to pandas
df = table.to_pandas()

# Create from pandas
table_from_pandas = pa.Table.from_pandas(df)
```

## Reading and Writing Parquet Files

Parquet is a columnar storage format optimized for analytics. PyArrow provides high-performance Parquet I/O with support for compression, partitioning, and predicate pushdown.
```python
import pyarrow as pa
import pyarrow.parquet as pq
import pandas as pd

# Create sample data
df = pd.DataFrame({
    'id': [1, 2, 3, 4, 5],
    'name': ['Alice', 'Bob', 'Charlie', 'Diana', 'Eve'],
    'score': [95.5, 87.0, 92.3, 88.7, 91.2],
    'category': ['A', 'B', 'A', 'B', 'A']
})
table = pa.Table.from_pandas(df)

# Write to Parquet
pq.write_table(table, 'data.parquet')

# Write with compression options
pq.write_table(table, 'data_compressed.parquet',
               compression='snappy',  # or 'gzip', 'brotli', 'zstd', 'lz4'
               use_dictionary=True)

# Read entire Parquet file
table = pq.read_table('data.parquet')
df = table.to_pandas()

# Read specific columns (efficient: only the requested columns are read)
subset = pq.read_table('data.parquet', columns=['id', 'name'])

# Inspect file metadata and row groups
parquet_file = pq.ParquetFile('data.parquet')
print(f"Row groups: {parquet_file.num_row_groups}")
print(f"Schema: {parquet_file.schema}")
print(f"Metadata: {parquet_file.metadata}")

# Read a specific row group (returns a Table)
row_group = parquet_file.read_row_group(0)

# Write a partitioned dataset (one subdirectory per 'category' value)
pq.write_to_dataset(
    table,
    root_path='partitioned_data',
    partition_cols=['category']
)

# Read partitioned dataset
dataset = pq.ParquetDataset('partitioned_data')
table = dataset.read()

# Or use read_table directly on a directory
table = pq.read_table('partitioned_data')

# Read with predicate pushdown (filter at read time)
table = pq.read_table('data.parquet', filters=[('score', '>', 90)])

# Write multiple row groups using ParquetWriter
with pq.ParquetWriter('multi_rowgroup.parquet', table.schema) as writer:
    for _ in range(3):
        writer.write_table(table)
```

## Reading and Writing CSV Files

Arrow provides multi-threaded CSV reading with automatic type inference and customizable parsing options.
```python
import pyarrow as pa
import pyarrow.csv as csv


def process_batch(batch):
    # Placeholder for your own per-batch processing
    print(batch.num_rows)


# Basic CSV reading
table = csv.read_csv('data.csv')
df = table.to_pandas()

# Read with custom options
table = csv.read_csv(
    'data.csv',
    read_options=csv.ReadOptions(
        skip_rows=1,  # Skip the file's header row when providing custom names
        column_names=['id', 'name', 'value'],
        block_size=1024 * 1024  # 1 MB blocks for parallel reading
    ),
    parse_options=csv.ParseOptions(
        delimiter=',',
        quote_char='"',
        escape_char=False,  # False disables escaping
        newlines_in_values=False
    ),
    convert_options=csv.ConvertOptions(
        column_types={
            'id': pa.int64(),
            'value': pa.float64()
        },
        null_values=['', 'NA', 'NULL', 'NaN'],
        strings_can_be_null=True,
        include_columns=['id', 'name']  # Read only specific columns
    )
)

# Read compressed CSV (compression is detected from the file extension)
table = csv.read_csv('data.csv.gz')

# Incremental reading for large files
reader = csv.open_csv('large_file.csv')
for batch in reader:
    process_batch(batch)

# Basic CSV writing
csv.write_csv(table, 'output.csv')

# Write with options
csv.write_csv(
    table,
    'output.csv',
    write_options=csv.WriteOptions(
        include_header=True,
        delimiter=','
    )
)

# Write compressed CSV
with pa.CompressedOutputStream('output.csv.gz', 'gzip') as out:
    csv.write_csv(table, out)

# Incremental writing, one RecordBatch at a time
with csv.CSVWriter('output.csv', table.schema) as writer:
    for batch in table.to_batches():
        writer.write_batch(batch)
```

## Arrow IPC (Feather) Format

The Arrow IPC format enables zero-copy data sharing between processes and languages. The Feather format (version 2) is the Arrow IPC file format with optional compression.
```python
import pyarrow as pa
import pyarrow.ipc as ipc
import pyarrow.feather as feather

# Create sample data
table = pa.table({
    'x': [1, 2, 3, 4, 5],
    'y': ['a', 'b', 'c', 'd', 'e'],
    'z': [1.1, 2.2, 3.3, 4.4, 5.5]
})

# Write to Feather format (recommended)
feather.write_feather(table, 'data.feather')
feather.write_feather(table, 'data_compressed.feather', compression='zstd')

# Read Feather
table = feather.read_table('data.feather')
df = feather.read_feather('data.feather')  # Returns a pandas DataFrame

# Write using the IPC file format
with pa.OSFile('data.arrow', 'wb') as f:
    with ipc.new_file(f, table.schema) as writer:
        writer.write_table(table)

# Read IPC file
with pa.OSFile('data.arrow', 'rb') as f:
    reader = ipc.open_file(f)
    table = reader.read_all()
    # Or read individual batches
    batch = reader.get_batch(0)

# IPC streaming format (for continuous data)
sink = pa.BufferOutputStream()
with ipc.new_stream(sink, table.schema) as writer:
    for batch in table.to_batches():
        writer.write_batch(batch)

# Read from stream
buf = sink.getvalue()
reader = ipc.open_stream(buf)
table = reader.read_all()

# Memory mapping for zero-copy reads
with pa.memory_map('data.arrow', 'r') as source:
    table = ipc.open_file(source).read_all()
```

## Compute Functions

PyArrow provides a rich set of compute functions for element-wise operations, aggregations, filtering, and sorting.
```python
import pyarrow as pa
import pyarrow.compute as pc

# Create sample arrays
arr1 = pa.array([1, 2, 3, 4, 5])
arr2 = pa.array([10, 20, 30, 40, 50])
str_arr = pa.array(['hello', 'WORLD', 'PyArrow', None])

# Arithmetic operations
result = pc.add(arr1, arr2)  # [11, 22, 33, 44, 55]
result = pc.multiply(arr1, pc.scalar(10))  # [10, 20, 30, 40, 50]
# Note: Arrow arrays do not overload arithmetic operators such as + and *;
# use pc.add, pc.multiply, and the other compute functions instead.

# Aggregations
total = pc.sum(arr1)  # <Int64Scalar: 15>
mean = pc.mean(arr1)  # <DoubleScalar: 3.0>
min_max = pc.min_max(arr1)  # StructScalar: [('min', 1), ('max', 5)]

# Comparisons
mask = pc.greater(arr1, pc.scalar(2))  # [false, false, true, true, true]
mask = pc.equal(arr1, arr2)

# String operations
upper = pc.utf8_upper(str_arr)  # ['HELLO', 'WORLD', 'PYARROW', null]
lower = pc.utf8_lower(str_arr)
length = pc.utf8_length(str_arr)
contains = pc.match_substring(str_arr, 'o')

# Filtering
arr = pa.array([1, 2, 3, 4, 5])
mask = pa.array([True, False, True, False, True])
filtered = pc.filter(arr, mask)  # [1, 3, 5]

# Sorting an array in descending order
indices = pc.array_sort_indices(arr1, order='descending')
sorted_arr = pc.take(arr1, indices)  # [5, 4, 3, 2, 1]

# Sorting a table by a column
table = pa.table({'x': [3, 1, 2], 'y': ['c', 'a', 'b']})
sorted_indices = pc.sort_indices(table, sort_keys=[('x', 'ascending')])
sorted_table = pc.take(table, sorted_indices)

# Group-by aggregation
table = pa.table({
    'category': ['A', 'A', 'B', 'B', 'C'],
    'value': [1, 2, 3, 4, 5]
})
result = table.group_by('category').aggregate([
    ('value', 'sum'),
    ('value', 'mean'),
    ('value', 'count')
])
# Returns: category: ['A', 'B', 'C'], value_sum: [3, 7, 5], ...

# Filtering with expressions
filter_expr = (pc.field('value') > 2) & (pc.field('category') == 'B')
filtered_table = table.filter(filter_expr)

# Table joins
left = pa.table({'id': [1, 2, 3], 'value': ['a', 'b', 'c']})
right = pa.table({'id': [2, 3, 4], 'score': [10, 20, 30]})
joined = left.join(right, keys='id', join_type='inner')
```

## Dataset API for Large-Scale Data

The Dataset API enables efficient querying of large, potentially partitioned datasets spread across multiple files.

```python
import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.compute as pc


def process_batch(batch):
    # Placeholder for your own per-batch processing
    print(batch.num_rows)


# Create a dataset from a directory of Parquet files
dataset = ds.dataset('data_directory/', format='parquet')

# Create a dataset with explicit partitioning
dataset = ds.dataset(
    'data_directory/',
    format='parquet',
    partitioning=ds.partitioning(
        pa.schema([('year', pa.int32()), ('month', pa.int32())]),
        flavor='hive'  # e.g. year=2024/month=01/
    )
)

# Scan with projection (column selection)
table = dataset.to_table(columns=['id', 'name', 'value'])

# Scan with a filter (predicate pushdown)
table = dataset.to_table(
    columns=['id', 'name', 'value'],
    filter=(ds.field('value') > 100) & (ds.field('year') == 2024)
)

# Scan to batches for memory efficiency
for batch in dataset.to_batches(batch_size=10000):
    process_batch(batch)

# Scanner with more control
scanner = dataset.scanner(
    columns=['id', 'value'],
    filter=ds.field('value') > 50,
    batch_size=10000
)
for batch in scanner.to_batches():
    process_batch(batch)

# Count matching rows without materializing the data
print(scanner.count_rows())

# Write a partitioned dataset (assumes `table` has a 'category' column)
ds.write_dataset(
    table,
    'output_directory/',
    format='parquet',
    partitioning=ds.partitioning(
        pa.schema([('category', pa.string())]),
        flavor='hive'
    ),
    existing_data_behavior='overwrite_or_ignore'
)

# Create a dataset from multiple sources
dataset = ds.dataset([
    ds.dataset('data_2023/', format='parquet'),
    ds.dataset('data_2024/', format='parquet')
])

# Dataset joins (in-memory datasets built from tables)
table1 = pa.table({'id': [1, 2, 3], 'a': ['x', 'y', 'z']})
table2 = pa.table({'id': [2, 3, 4], 'b': [10.0, 20.0, 30.0]})
ds1 = ds.dataset(table1)
ds2 = ds.dataset(table2)
joined = ds1.join(ds2, keys='id')
```

## Arrow Flight RPC

Arrow Flight is an RPC framework for high-performance data transfer over the network, designed for exchanging Arrow data with minimal serialization overhead.

```python
import pyarrow as pa
import pyarrow.flight as flight

# --- Client example ---
# Connect to a Flight server
client = flight.FlightClient("grpc://localhost:8815")

# List available flights
for flight_info in client.list_flights():
    print(f"Flight: {flight_info.descriptor}")
    print(f"  Total bytes: {flight_info.total_bytes}")
    print(f"  Total records: {flight_info.total_records}")

# Get flight info for a specific dataset
descriptor = flight.FlightDescriptor.for_path("my_dataset")
info = client.get_flight_info(descriptor)

# Retrieve data (DoGet)
ticket = info.endpoints[0].ticket
reader = client.do_get(ticket)
table = reader.read_all()

# Upload data (DoPut)
descriptor = flight.FlightDescriptor.for_path("new_dataset")
writer, metadata_reader = client.do_put(descriptor, table.schema)
writer.write_table(table)
writer.close()

# Execute an action
action = flight.Action("healthcheck", b"")
for result in client.do_action(action):
    print(result.body.to_pybytes())


# --- Server example ---
class MyFlightServer(flight.FlightServerBase):
    def __init__(self, location, **kwargs):
        super().__init__(location, **kwargs)
        self._location = location  # Advertised to clients in FlightEndpoints
        self.tables = {}

    def list_flights(self, context, criteria):
        for name, table in self.tables.items():
            descriptor = flight.FlightDescriptor.for_path(name)
            endpoints = [flight.FlightEndpoint(name.encode(), [self._location])]
            yield flight.FlightInfo(
                table.schema, descriptor, endpoints, table.num_rows, table.nbytes
            )

    def get_flight_info(self, context, descriptor):
        name = descriptor.path[0].decode()
        table = self.tables[name]
        endpoints = [flight.FlightEndpoint(name.encode(), [self._location])]
        return flight.FlightInfo(
            table.schema, descriptor, endpoints, table.num_rows, table.nbytes
        )

    def do_get(self, context, ticket):
        name = ticket.ticket.decode()
        table = self.tables[name]
        return flight.RecordBatchStream(table)

    def do_put(self, context, descriptor, reader, writer):
        name = descriptor.path[0].decode()
        self.tables[name] = reader.read_all()


# Start the server (serve() blocks until the server shuts down)
server = MyFlightServer("grpc://0.0.0.0:8815")
server.serve()
```

## C++ API: Building Arrays and Tables

The C++ API provides the core Arrow implementation with explicit memory management and type safety.

```cpp
#include <arrow/api.h>
#include <arrow/io/api.h>
#include <arrow/ipc/api.h>
#include <parquet/arrow/reader.h>
#include <parquet/arrow/writer.h>

#include <iostream>

arrow::Status RunExample() {
  // Build arrays using ArrayBuilder
  arrow::Int64Builder int_builder;
  ARROW_RETURN_NOT_OK(int_builder.AppendValues({1, 2, 3, 4, 5}));
  std::shared_ptr<arrow::Array> int_array;
  ARROW_ASSIGN_OR_RAISE(int_array, int_builder.Finish());

  arrow::StringBuilder str_builder;
  ARROW_RETURN_NOT_OK(str_builder.AppendValues({"a", "b", "c", "d", "e"}));
  std::shared_ptr<arrow::Array> str_array;
  ARROW_ASSIGN_OR_RAISE(str_array, str_builder.Finish());

  // Create schema
  auto schema = arrow::schema({
      arrow::field("id", arrow::int64()),
      arrow::field("name", arrow::utf8())
  });

  // Create RecordBatch
  auto batch = arrow::RecordBatch::Make(schema, 5, {int_array, str_array});
  std::cout << "RecordBatch:\n" << batch->ToString() << std::endl;

  // Create Table from RecordBatches
  std::vector<std::shared_ptr<arrow::RecordBatch>> batches = {batch, batch};
  ARROW_ASSIGN_OR_RAISE(auto table, arrow::Table::FromRecordBatches(batches));

  // Write to Parquet (the last argument is the maximum number of rows per row group)
  std::shared_ptr<arrow::io::FileOutputStream> outfile;
  ARROW_ASSIGN_OR_RAISE(outfile, arrow::io::FileOutputStream::Open("output.parquet"));
  ARROW_RETURN_NOT_OK(parquet::arrow::WriteTable(
      *table, arrow::default_memory_pool(), outfile, 1024));
  ARROW_RETURN_NOT_OK(outfile->Close());

  // Read from Parquet
  std::shared_ptr<arrow::io::ReadableFile> infile;
  ARROW_ASSIGN_OR_RAISE(infile, arrow::io::ReadableFile::Open("output.parquet"));
  std::unique_ptr<parquet::arrow::FileReader> reader;
  ARROW_ASSIGN_OR_RAISE(reader,
      parquet::arrow::OpenFile(infile, arrow::default_memory_pool()));
  std::shared_ptr<arrow::Table> read_table;
  ARROW_RETURN_NOT_OK(reader->ReadTable(&read_table));

  // Write to IPC/Arrow file
  ARROW_ASSIGN_OR_RAISE(outfile, arrow::io::FileOutputStream::Open("output.arrow"));
  ARROW_ASSIGN_OR_RAISE(auto ipc_writer, arrow::ipc::MakeFileWriter(outfile, schema));
  ARROW_RETURN_NOT_OK(ipc_writer->WriteTable(*table));
  ARROW_RETURN_NOT_OK(ipc_writer->Close());
  ARROW_RETURN_NOT_OK(outfile->Close());

  return arrow::Status::OK();
}

int main() {
  arrow::Status st = RunExample();
  if (!st.ok()) {
    std::cerr << "Error: " << st << std::endl;
    return 1;
  }
  return 0;
}
```

## Pandas Interoperability

PyArrow integrates closely with pandas, enabling efficient conversion between Arrow and pandas data structures.

```python
import pyarrow as pa
import pyarrow.parquet as pq
import pandas as pd
import numpy as np

# Create a pandas DataFrame
df = pd.DataFrame({
    'int_col': [1, 2, 3, None],
    'float_col': [1.1, 2.2, np.nan, 4.4],
    'str_col': ['a', 'b', None, 'd'],
    'bool_col': [True, False, None, True],
    'date_col': pd.date_range('2024-01-01', periods=4)
})

# Convert pandas to Arrow
table = pa.Table.from_pandas(df)

# Convert without preserving the index
table = pa.Table.from_pandas(df, preserve_index=False)

# Convert with an explicit schema
schema = pa.schema([
    ('int_col', pa.int32()),
    ('float_col', pa.float32()),
    ('str_col', pa.string()),
    ('bool_col', pa.bool_()),
    ('date_col', pa.timestamp('ns'))
])
table = pa.Table.from_pandas(df, schema=schema, preserve_index=False)

# Convert Arrow back to pandas
df_back = table.to_pandas()

# Convert with specific options
df_back = table.to_pandas(
    strings_to_categorical=True,  # Convert string columns to categorical
    categories=['str_col'],
    date_as_object=False,
    timestamp_as_object=False
)

# Convert individual arrays
arr = pa.array([1, 2, 3, 4])
series = arr.to_pandas()

# RecordBatch to pandas
batch = pa.RecordBatch.from_pandas(df)
df_from_batch = batch.to_pandas()

# Reduce peak memory by releasing Arrow columns as they are converted.
# The table must not be used after a self_destruct conversion.
df_low_memory = table.to_pandas(self_destruct=True, split_blocks=True)
del table

# Efficient round-trip with Parquet
# Write a pandas DataFrame to Parquet
pq.write_table(pa.Table.from_pandas(df), 'data.parquet')

# Read Parquet directly into pandas
df = pq.read_table('data.parquet').to_pandas()

# Or use pandas directly
df = pd.read_parquet('data.parquet')  # Uses PyArrow under the hood when installed
```

## Memory Management and Buffers

Arrow provides explicit control over memory allocation and buffer management for efficient data handling.

```python
import numpy as np
import pyarrow as pa

# Check memory pool usage
pool = pa.default_memory_pool()
print(f"Backend: {pool.backend_name}")
print(f"Bytes allocated: {pool.bytes_allocated()}")
print(f"Max memory: {pool.max_memory()}")

# Bytes currently allocated from the default memory pool
print(f"Total allocated: {pa.total_allocated_bytes()}")

# Create buffers
buf = pa.allocate_buffer(1024)
print(f"Buffer size: {buf.size}")

# Create a buffer from Python bytes
data = b"Hello, Arrow!"
buf = pa.py_buffer(data)

# Create a buffer from a NumPy array (zero-copy)
arr = np.array([1, 2, 3, 4], dtype=np.int64)
buf = pa.py_buffer(arr)

# Memory-map a file for zero-copy reads
mmap = pa.memory_map('large_file.arrow', 'r')
# Use with an IPC reader
reader = pa.ipc.open_file(mmap)

# Create a memory-mapped file for writing
mmap = pa.create_memory_map('output.bin', 1024 * 1024)  # 1 MB

# Compression (decompression needs the uncompressed size)
raw = b"data to compress"
compressed = pa.compress(raw, codec='zstd')
decompressed = pa.decompress(compressed, decompressed_size=len(raw), codec='zstd')

# Check which compression codecs are available
print(f"Is zstd available: {pa.Codec.is_available('zstd')}")
print(f"Is snappy available: {pa.Codec.is_available('snappy')}")

# Using different memory pool backends
if 'jemalloc' in pa.supported_memory_backends():
    jemalloc_pool = pa.jemalloc_memory_pool()
    arr = pa.array([1, 2, 3], memory_pool=jemalloc_pool)

# Log memory allocations (debugging)
pa.log_memory_allocations(enable=True)
```

## Summary

Apache Arrow provides a unified columnar memory format that enables zero-copy data
sharing and high-performance analytics across programming languages. The primary use cases include building efficient data pipelines that exchange data between systems without serialization overhead, reading and writing columnar file formats such as Parquet for analytics workloads, implementing high-performance data services with Arrow Flight RPC, and integrating with pandas and other data science tools without sacrificing performance.

Integration patterns typically use Arrow as the in-memory data representation layer, with PyArrow serving as the bridge between the Python data science ecosystem (pandas, NumPy) and high-performance storage and compute systems. For large-scale data processing, the Dataset API provides predicate pushdown and column pruning across partitioned datasets. For network data transfer, Arrow Flight sends record batches in the Arrow IPC format, so data does not need to be converted to and from a separate wire format. The C++ implementation is the foundation for several bindings, including PyArrow, R, and Ruby, while languages such as Java, Go, Rust, and JavaScript have their own native implementations; all of them share the same memory format.