### scDataset with HuggingFace Datasets

Source: https://scdataset.github.io/stable/quickstart

Shows how to integrate scDataset with datasets from the HuggingFace `datasets` library. The example loads a dataset split and passes it directly to scDataset.

```python
from datasets import load_dataset
from scdataset import scDataset, Streaming

dataset_hf = load_dataset("your/dataset", split="train")
dataset = scDataset(dataset_hf, Streaming(), batch_size=64)
```

--------------------------------

### Minimal scDataset Example with DataLoader

Source: https://scdataset.github.io/stable/quickstart

Demonstrates the simplest way to use scDataset as a drop-in replacement for existing datasets: initialize scDataset with a data source and a streaming strategy, then pass it to PyTorch's DataLoader. Set `batch_size=None` in the DataLoader, because batching is handled by scDataset.

```python
from scdataset import scDataset, Streaming
from torch.utils.data import DataLoader
import numpy as np

# Your existing data (numpy array, AnnData, HuggingFace Dataset, etc.)
data = np.random.randn(1000, 100)  # 1000 samples, 100 features

# Create scDataset with streaming strategy
dataset = scDataset(data, Streaming(), batch_size=64, fetch_factor=16)

# Use with DataLoader (note: batch_size=None)
loader = DataLoader(dataset, batch_size=None, num_workers=4, prefetch_factor=17)

for batch in loader:
    print(f"Batch shape: {batch.shape}")  # (64, 100)
    # Your training code here
    break
```

--------------------------------

### Example Configuration for DataLoader - scdataset

Source: https://scdataset.github.io/stable/quickstart

Configure the DataLoader for efficient data loading with prefetching and multi-worker processing. This example sets `fetch_factor`, `num_workers`, and `prefetch_factor` for GPU training.
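
As a rough check on what the settings in this section cost in RAM (`batch_size=64`, `fetch_factor=256`, `num_workers=8`), the sketch below applies the estimate used in the `suggest_parameters` source listed elsewhere in these docs: memory per fetch is `batch_size * fetch_factor * sample_size`, multiplied by `num_workers` and by 2 for the prefetch buffer. The helper name `estimate_loader_memory_bytes` and the 2,000-feature float32 sample are assumptions made for illustration; neither is part of scDataset.

```python
def estimate_loader_memory_bytes(batch_size, fetch_factor, num_workers, sample_size_bytes):
    """Approximate RAM held by all workers, including the prefetch buffer."""
    memory_per_fetch = batch_size * fetch_factor * sample_size_bytes
    return memory_per_fetch * num_workers * 2  # * 2 for the prefetch buffer


# Hypothetical sample: 2,000 float32 features = 8,000 bytes per cell
sample_size_bytes = 2000 * 4

total = estimate_loader_memory_bytes(
    batch_size=64,
    fetch_factor=256,
    num_workers=8,
    sample_size_bytes=sample_size_bytes,
)
print(f"Estimated total: {total / 1024**3:.2f} GiB")  # Estimated total: 1.95 GiB
```

If the estimate exceeds the RAM you can spare, lowering `fetch_factor` or `num_workers` reduces it proportionally.
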
```python
dataset = scDataset(
    data,
    BlockShuffling(block_size=256),
    batch_size=64,
    fetch_factor=256,  # Large fetch for efficiency
)

loader = DataLoader(
    dataset,
    batch_size=None,
    num_workers=8,        # 4-12 workers typically optimal
    prefetch_factor=257,  # fetch_factor + 1
    pin_memory=True,      # For GPU training
)
```

--------------------------------

### Install and Import Libraries for scDataset

Source: https://scdataset.github.io/stable/notebooks/tahoe_tutorial

Installs the required Python packages (scipy, scikit-learn, tqdm, torch, anndata, and scDataset) and imports the libraries used for data manipulation, machine learning, and deep learning.

```python
# Install required packages (uncomment if running in a fresh environment)
# %pip install scipy scikit-learn tqdm torch anndata scDataset

# Import libraries
import numpy as np
import torch
from torch import nn
from torch.utils.data import DataLoader
from tqdm import tqdm
from scipy import sparse
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder

import anndata as ad
from anndata.experimental import AnnCollection
from scdataset import scDataset, Streaming, BlockShuffling
```

--------------------------------

### Install scDataset from GitHub

Source: https://scdataset.github.io/stable/installation

Installs the latest development version of scDataset directly from its GitHub repository using pip. This is useful for accessing the newest features or contributing to the project.

```bash
pip install git+https://github.com/scDataset/scDataset.git
```

--------------------------------

### scDataset with AnnData Objects

Source: https://scdataset.github.io/stable/quickstart

Demonstrates integrating scDataset with AnnData objects. It covers direct use of the expression matrix (`adata.X`) and a custom `fetch_callback` for more complex scenarios, such as accessing specific parts of the AnnData object or performing on-the-fly transformations.

```python
import anndata as ad
import scanpy as sc
from scdataset import scDataset, Streaming

# Load your single-cell data
adata = sc.datasets.pbmc3k()

# Use the expression matrix
dataset = scDataset(adata.X, Streaming(), batch_size=64)

# Or create a custom fetch callback for more complex data
def fetch_adata(collection, indices):
    return collection[indices].X.toarray()

dataset = scDataset(adata, Streaming(), batch_size=64, fetch_callback=fetch_adata)
```

--------------------------------

### Setup scDataset for Train/Test Splits with Sampling Strategies

Source: https://scdataset.github.io/stable/notebooks/tahoe_tutorial

Configures scDataset for training and testing by wrapping an AnnCollection.
It uses BlockShuffling for randomized access to the training data and Streaming for deterministic access to the test data. Parameters such as `batch_size` and `fetch_factor` can be tuned for your hardware.

```python
# Set up scDataset for train and test splits
batch_size = 64
fetch_factor = 16
num_workers = 12

# Training split with block shuffling for randomization
train_strategy = BlockShuffling(block_size=8, indices=train_idx)
scdata_train = scDataset(
    data_collection=collection,
    strategy=train_strategy,
    batch_size=batch_size,
    fetch_factor=fetch_factor,
    fetch_transform=fetch_transform,
    batch_transform=batch_transform,
)
train_loader = DataLoader(
    scdata_train,
    batch_size=None,
    num_workers=num_workers,
    prefetch_factor=fetch_factor + 1,
    persistent_workers=True,
    pin_memory=True,
)

# Test split with streaming for deterministic evaluation
test_strategy = Streaming(indices=test_idx)
scdata_test = scDataset(
    data_collection=collection,
    strategy=test_strategy,
    batch_size=batch_size,
    fetch_factor=fetch_factor,
    fetch_transform=fetch_transform,
    batch_transform=batch_transform,
)
test_loader = DataLoader(
    scdata_test,
    batch_size=None,
    num_workers=num_workers,
    prefetch_factor=fetch_factor + 1,
    persistent_workers=True,
    pin_memory=True,
)
```

--------------------------------

### Initialize scDataset with NumPy data and Streaming strategy

Source: https://scdataset.github.io/stable/generated/scdataset.scDataset

This example initializes scDataset with a NumPy array as the data collection and the Streaming strategy for sampling. It creates the dataset and reads the number of batches from `len(dataset)`.
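
The batch count reported in this example follows from simple arithmetic: 1000 samples at `batch_size=32` give 31 full batches plus one partial batch of 8. The sketch below reproduces that count with ceiling division. It assumes the final partial batch is kept, which is consistent with the reported length of 32 but is not taken from the scDataset source.

```python
import math

n_samples, batch_size = 1000, 32

# Ceiling division: a trailing partial batch still counts as one batch
n_batches = math.ceil(n_samples / batch_size)

# Size of the final (partial) batch
last_batch = n_samples - (n_batches - 1) * batch_size

print(n_batches, last_batch)  # 32 8
```
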
```python
>>> from scdataset import scDataset
>>> from scdataset.strategy import Streaming
>>> import numpy as np

>>> # Simple streaming dataset
>>> data = np.random.randn(1000, 50)  # 1000 samples, 50 features
>>> strategy = Streaming()
>>> dataset = scDataset(data, strategy, batch_size=32)
>>> len(dataset)  # Number of batches
32
```

--------------------------------

### Basic AnnCollection Setup for Lazy Concatenation

Source: https://scdataset.github.io/stable/examples

Illustrates the basic setup of AnnCollection from `anndata.experimental` to lazily concatenate multiple AnnData objects. This approach is memory-efficient for large datasets stored across multiple files.

```python
import anndata as ad
from anndata.experimental import AnnCollection
import numpy as np
import scipy.sparse as sp
from functools import partial
from scdataset import scDataset, BlockShuffling, MultiIndexable
from torch.utils.data import DataLoader

# Load multiple AnnData files (backed mode for memory efficiency)
file_paths = [
    "plate_1.h5ad",
    "plate_2.h5ad",
    "plate_3.h5ad"
]
adatas = [ad.read_h5ad(f, backed='r') for f in file_paths]

# Create AnnCollection for lazy concatenation
collection = AnnCollection(adatas)
print(f"Total cells: {len(collection)}")
print(f"Number of genes: {collection.shape[1]}")
```

--------------------------------

### Get System Info and Suggest Parameters (Python)

Source: https://scdataset.github.io/stable/_modules/scdataset/experimental/auto_config

This function tries to import `psutil` to gather system information: available RAM, total RAM, and CPU count. If `psutil` is not installed, it issues a warning and falls back to conservative defaults. It then computes suggested values for `num_workers`, `fetch_factor`, and several block sizes from the system information and the data characteristics, and estimates the memory usage of the resulting configuration.
```python
import os
import warnings


def suggest_parameters(
    data_collection,
    min_workers=1,
    max_workers=None,
    batch_size=1,
    target_ram_fraction=0.75,
    fetch_transform=None,
    batch_transform=None,
    fetch_callback=None,
    batch_callback=None,
    verbose=True,
):
    """
    Suggests optimal parameters for scDataset based on system resources and data.

    Parameters
    ----------
    data_collection : object
        The data collection object used to estimate sample size.
    min_workers : int, optional
        Minimum number of workers to suggest, by default 1.
    max_workers : int, optional
        Maximum number of workers to suggest. If None, it defaults to CPU count.
    batch_size : int, optional
        The batch size used in data loading, by default 1.
    target_ram_fraction : float, optional
        The fraction of available RAM to target for memory usage, by default 0.75.
    fetch_transform : callable, optional
        A transformation function applied during fetching.
    batch_transform : callable, optional
        A transformation function applied to batches.
    fetch_callback : callable, optional
        A callback function executed after fetching.
    batch_callback : callable, optional
        A callback function executed after batching.
    verbose : bool, optional
        If True, prints detailed suggestions, by default True.

    Returns
    -------
    dict
        A dictionary containing suggested parameters: num_workers, fetch_factor,
        prefetch_factor, block_size_conservative, block_size_balanced,
        block_size_aggressive, estimated_memory_per_fetch_mb,
        estimated_total_memory_mb, and system_info.

    Warns
    -----
    UserWarning
        If psutil is not available, uses conservative defaults.
    """
    result = {}
    system_info = {}

    # Try to get system information
    try:
        import psutil

        available_ram = psutil.virtual_memory().available
        total_ram = psutil.virtual_memory().total
        cpu_count = os.cpu_count() or 4
        system_info["available_ram_gb"] = available_ram / (1024**3)
        system_info["total_ram_gb"] = total_ram / (1024**3)
        system_info["cpu_count"] = cpu_count
        has_psutil = True
    except ImportError:
        # psutil is optional: warn and fall back instead of raising
        warnings.warn(
            "psutil not installed. Using conservative defaults. "
            "Install psutil for better parameter suggestions: pip install psutil",
            stacklevel=2,
        )
        # Conservative defaults
        available_ram = 8 * 1024**3  # Assume 8GB available
        total_ram = 16 * 1024**3  # Assume 16GB total
        cpu_count = 4
        system_info["available_ram_gb"] = "unknown (psutil not installed)"
        system_info["total_ram_gb"] = "unknown (psutil not installed)"
        system_info["cpu_count"] = cpu_count
        has_psutil = False

    # Calculate num_workers
    if max_workers is None:
        max_workers = cpu_count
    num_workers = min(max(cpu_count // 2, min_workers), max_workers)
    result["num_workers"] = num_workers

    # Estimate sample size (applying transforms/callbacks for accurate estimation)
    # Placeholder for estimate_sample_size function
    sample_size = estimate_sample_size(
        data_collection,
        fetch_transform=fetch_transform,
        batch_transform=batch_transform,
        fetch_callback=fetch_callback,
        batch_callback=batch_callback,
    )
    system_info["estimated_sample_size_bytes"] = sample_size

    # Calculate maximum fetch_factor based on RAM constraint
    # Formula: 2 * batch_size * fetch_factor * num_workers * sample_size < target_ram_fraction * available_ram
    # The factor of 2 accounts for prefetch_factor = fetch_factor + 1 (prefetch buffer doubles memory)
    target_ram = target_ram_fraction * available_ram
    if sample_size > 0 and batch_size > 0 and num_workers > 0:
        # Account for prefetch doubling memory (factor of 2)
        max_fetch_factor = int(
            target_ram / (2 * batch_size * num_workers * sample_size)
        )
        # Clamp to reasonable range
        fetch_factor = max(1, min(max_fetch_factor, 256))
    else:
        fetch_factor = 8  # Default fallback
    result["fetch_factor"] = fetch_factor

    # Calculate block sizes
    result["block_size_conservative"] = max(1, fetch_factor // 2)
    result["block_size_balanced"] = max(1, fetch_factor)
    result["block_size_aggressive"] = max(1, fetch_factor * 2)

    # Prefetch factor should be fetch_factor + 1 for optimal performance
    result["prefetch_factor"] = fetch_factor + 1

    # Calculate estimated memory usage (includes prefetch buffer - hence * 2)
    memory_per_fetch = batch_size * fetch_factor * sample_size
    memory_total = memory_per_fetch * num_workers * 2  # * 2 for prefetch buffer
    result["estimated_memory_per_fetch_mb"] = memory_per_fetch / (1024**2)
    result["estimated_total_memory_mb"] = memory_total / (1024**2)
    result["system_info"] = system_info

    if verbose:
        print("=" * 60)
        print("scDataset Parameter Suggestions")
        print("=" * 60)
        print()
        print("System Information:")
        if has_psutil:
            print(f" Available RAM: {system_info['available_ram_gb']:.1f} GB")
            print(f" Total RAM: {system_info['total_ram_gb']:.1f} GB")
        else:
            print(" RAM info: Not available (install psutil)")
        print(f" CPU cores: {system_info['cpu_count']}")
        print(
            f" Estimated sample size: {sample_size:,} bytes ({sample_size/1024:.1f} KB)"
        )
        print()
        print("Suggested Parameters:")
        print(f" num_workers: {num_workers}")
        print(f" fetch_factor: {fetch_factor}")
        print(f" prefetch_factor: {result['prefetch_factor']}")
        print()
        print("Block Size Options (choose based on your needs):")
        print(f" block_size_conservative: {result['block_size_conservative']}")
        print(" └─ More randomness, good for training")
        print(f" block_size_balanced: {result['block_size_balanced']}")
        print(" └─ Balanced randomness and throughput (recommended)")
        print(f" block_size_aggressive: {result['block_size_aggressive']}")
        print(" └─ Maximum throughput, less randomness")
        print()
        print("Memory Estimates (includes prefetch buffer):")
        print(f" Memory per fetch: {result['estimated_memory_per_fetch_mb']:.2f} MB")
        print(f" Total estimated memory: {result['estimated_total_memory_mb']:.2f} MB")

    return result


# Placeholder for the estimate_sample_size function, as it's not provided.
# In a real scenario, this function would be defined elsewhere or imported.
def estimate_sample_size(data_collection, **kwargs):
    # This is a dummy implementation. Replace with actual logic.
    # It should return the estimated size of a sample in bytes.
    print("Estimating sample size...")
    # Example: return a fixed value or calculate based on data_collection
    return 1024 * 10  # Assuming 10 KB sample size for demonstration


# Example Usage (assuming you have a data_collection object):
# class MockDataCollection:
#     pass
# data = MockDataCollection()
# suggestions = suggest_parameters(data)
# print(suggestions)
```

--------------------------------

### Basic DDP Setup with scDataset and PyTorch

Source: https://scdataset.github.io/stable/ddp

This snippet demonstrates the basic setup for Distributed Data Parallel (DDP) training using scDataset and PyTorch. It initializes the distributed environment, creates the scDataset instance with automatic rank detection, sets up the DataLoader, and wraps the model with DDP. The data loading function `load_adata()`, the fetch callback `my_fetch_fn`, and the model `YourModel()` must be defined elsewhere.
```python
import os
import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader
from scdataset import scDataset, BlockShuffling


def setup_distributed():
    """Initialize distributed training."""
    dist.init_process_group(backend="nccl")
    rank = dist.get_rank()
    world_size = dist.get_world_size()
    return rank, world_size


def main():
    rank, world_size = setup_distributed()
    local_rank = int(os.environ["LOCAL_RANK"])
    torch.cuda.set_device(local_rank)

    # Load your data source
    adata = load_adata()  # Your data loading function

    # Create scDataset - it automatically detects rank and world_size
    dataset = scDataset(
        adata,
        BlockShuffling(block_size=64),
        batch_size=128,
        fetch_factor=64,
        fetch_callback=my_fetch_fn  # Your fetch callback, defined elsewhere
    )

    # Create DataLoader (no DistributedSampler needed!)
    loader = DataLoader(
        dataset,
        batch_size=None,  # Batching handled by scDataset
        num_workers=4,
        prefetch_factor=65  # fetch_factor + 1
    )

    # Standard DDP model setup
    model = YourModel().to(local_rank)
    model = DDP(model, device_ids=[local_rank])

    num_epochs = 10  # Placeholder value; set this for your run
    for epoch in range(num_epochs):
        for batch in loader:
            batch = batch.to(local_rank)
            # Training code here
            pass

    dist.destroy_process_group()


if __name__ == "__main__":
    main()
```

--------------------------------

### Basic HuggingFace Dataset Loading with scDataset

Source: https://scdataset.github.io/stable/examples

Demonstrates loading a HuggingFace dataset and integrating it with scDataset using a custom batch callback.
```python
from datasets import load_dataset
from torch.utils.data import DataLoader
from scdataset import scDataset, Streaming

# Load a HuggingFace dataset
hf_dataset = load_dataset("imdb", split="train[:1000]")

# Custom batch callback for HuggingFace datasets
def extract_hf_batch(fetched_data, batch_indices):
    """Extract a batch from HuggingFace dataset fetched data."""
    batch = {}
    for key, values in fetched_data.items():
        batch[key] = [values[i] for i in batch_indices]
    return batch

# Create dataset with custom batch callback
dataset = scDataset(
    hf_dataset,
    Streaming(),
    batch_size=64,
    fetch_factor=16,
    batch_callback=extract_hf_batch
)

for batch in DataLoader(dataset, batch_size=None, num_workers=4, prefetch_factor=17):
    # batch will be a dictionary with dataset features
    print("Batch keys:", batch.keys())
    print("Batch size:", len(batch['text']))
    break
```

--------------------------------

### Complete Distributed Training Example with scDataset and DDP

Source: https://scdataset.github.io/stable/ddp

A full Python script demonstrating distributed training using scDataset, PyTorch DDP, and standard PyTorch components. It covers initialization, data loading, model setup, and the training loop.
```python
import os
import torch
import torch.nn as nn
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader
import anndata as ad
from scdataset import scDataset, BlockShuffling
from scdataset.transforms import adata_to_mindex


def train():
    # Initialize distributed
    dist.init_process_group(backend="nccl")
    local_rank = int(os.environ["LOCAL_RANK"])
    torch.cuda.set_device(local_rank)

    # Load data (each rank loads independently)
    adata = ad.read_h5ad("large_dataset.h5ad", backed='r')

    # Create dataset - DDP handled automatically
    dataset = scDataset(
        adata,
        BlockShuffling(block_size=32),
        fetch_factor=32,
        batch_size=512,
        fetch_callback=lambda d, idx: adata_to_mindex(d[idx])
    )

    loader = DataLoader(dataset, batch_size=None, num_workers=4, prefetch_factor=33)

    # Model setup
    model = nn.Sequential(
        nn.Linear(adata.n_vars, 256),
        nn.ReLU(),
        nn.Linear(256, 128)
    ).to(local_rank)
    model = DDP(model, device_ids=[local_rank])

    optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
    criterion = nn.MSELoss()

    # Training loop - shuffling changes automatically each epoch!
    for epoch in range(10):
        for batch_idx, batch in enumerate(loader):
            batch = batch.to(local_rank)
            output = model(batch)
            loss = criterion(output, batch[:, :128])  # Reconstruction

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            if batch_idx % 100 == 0 and local_rank == 0:
                print(f"Epoch {epoch}, Batch {batch_idx}, Loss: {loss.item():.4f}")

    dist.destroy_process_group()


if __name__ == "__main__":
    train()
```

--------------------------------

### Install scDataset using pip

Source: https://scdataset.github.io/stable/index

Installs the scDataset library from the Python Package Index (PyPI). This is the recommended method for most users.
```bash
pip install scDataset
```

--------------------------------

### On-the-Fly Normalization with scDataset

Source: https://scdataset.github.io/stable/examples

Demonstrates applying log1p normalization and standardization to batches on the fly using scDataset's `batch_transform`.

```python
import numpy as np
from scdataset import scDataset, BlockShuffling

def log_normalize(batch):
    # Apply log1p normalization per batch
    return np.log1p(batch)

def standardize_genes(batch):
    # Standardize genes (features) across batch
    return (batch - batch.mean(axis=0)) / (batch.std(axis=0) + 1e-8)

dataset = scDataset(
    data,
    BlockShuffling(block_size=16),
    batch_size=64,
    batch_transform=lambda x: standardize_genes(log_normalize(x))
)
```

--------------------------------

### scDataset with AnnCollection (Multiple Files)

Source: https://scdataset.github.io/stable/quickstart

Illustrates how to use scDataset with `AnnCollection` for datasets spanning multiple AnnData files. It loads the files in backed mode for memory efficiency and uses the `adata_to_mindex` transform to materialize the backed data, which suits large-scale single-cell analysis.
```python
import anndata as ad
from anndata.experimental import AnnCollection
from scdataset import scDataset, BlockShuffling
from scdataset.transforms import adata_to_mindex
from torch.utils.data import DataLoader

# Load multiple AnnData files in backed mode (memory-efficient)
adatas = [
    ad.read_h5ad("plate1.h5ad", backed='r'),
    ad.read_h5ad("plate2.h5ad", backed='r'),
]
collection = AnnCollection(adatas)

# Create dataset with adata_to_mindex to materialize backed data
dataset = scDataset(
    collection,
    BlockShuffling(block_size=32),
    batch_size=64,
    fetch_factor=32,
    fetch_transform=adata_to_mindex  # Calls to_adata() internally
)

loader = DataLoader(dataset, batch_size=None, num_workers=8, prefetch_factor=33)
```

--------------------------------

### Training Loop Example

Source: https://scdataset.github.io/stable/examples

A basic training loop structure demonstrating epoch and batch iteration for training and validation steps.

```python
for epoch in range(num_epochs):
    # Training
    for batch in train_loader:
        train_step(batch)

    # Validation
    for batch in val_loader:
        val_step(batch)
```

--------------------------------

### scDataset with NumPy Arrays

Source: https://scdataset.github.io/stable/quickstart

Shows how to use scDataset with NumPy arrays as the data source. The array is passed directly to the scDataset constructor along with a sampling strategy.

```python
import numpy as np
from scdataset import scDataset, Streaming

data = np.random.randn(5000, 2000)
dataset = scDataset(data, Streaming(), batch_size=64)
```

--------------------------------

### Distributed Data Parallel (DDP) Usage Example

Source: https://scdataset.github.io/stable/_modules/scdataset/scdataset

Demonstrates how to use scDataset in a Distributed Data Parallel (DDP) training setup. It shows how to initialize the process group and how the dataset either detects the DDP parameters automatically or accepts them explicitly via `rank` and `world_size`.
```python
# In DDP training script:
# import torch.distributed as dist
# dist.init_process_group(...)
# dataset = scDataset(data, strategy, batch_size=32)  # Auto-detects DDP

# Or manually specify:
# dataset = scDataset(data, strategy, batch_size=32, rank=0, world_size=4)
```

--------------------------------

### Data Augmentation with scDataset

Source: https://scdataset.github.io/stable/examples

Shows how to implement data augmentation techniques, such as adding noise and gene dropout, on batches using scDataset's `batch_transform`.

```python
import numpy as np
from scdataset import scDataset, BlockShuffling

def add_noise(batch, noise_level=0.1):
    # Add Gaussian noise for data augmentation
    noise = np.random.normal(0, noise_level, batch.shape)
    return batch + noise

def dropout_genes(batch, dropout_rate=0.1):
    # Randomly set some genes to zero
    mask = np.random.random(batch.shape) > dropout_rate
    return batch * mask

def augment_batch(batch):
    batch = add_noise(batch)
    batch = dropout_genes(batch)
    return batch.astype(np.float32)

dataset = scDataset(
    data,
    BlockShuffling(block_size=16),
    batch_size=64,
    batch_transform=augment_batch
)
```

--------------------------------

### Initialize scDataset with custom batch transformation

Source: https://scdataset.github.io/stable/generated/scdataset.scDataset

This example initializes scDataset with a custom batch transformation function. The `normalize_batch` function is applied to each batch before it is yielded.

```python
>>> # With custom transforms
>>> def normalize_batch(batch):
...     return (batch - batch.mean()) / batch.std()
>>> dataset = scDataset(
...     data, strategy, batch_size=32,
...     batch_transform=normalize_batch
... )
```

--------------------------------

### Iterate through batches from scDataset

Source: https://scdataset.github.io/stable/generated/scdataset.scDataset

This example iterates through the batches provided by an scDataset instance and prints the shape of the first batch.
```python
>>> # Iterate through batches
>>> for batch in dataset:
...     print(batch.shape)
...     break
(32, 50)
```

--------------------------------

### Custom Processing for HuggingFace Data with scDataset

Source: https://scdataset.github.io/stable/examples

Illustrates custom processing of HuggingFace dataset batches into NumPy arrays of features and labels using scDataset.

```python
import numpy as np
from scdataset import scDataset, BlockShuffling

def extract_hf_batch(fetched_data, batch_indices):
    """Extract a batch from HuggingFace dataset fetched data."""
    batch = {}
    for key, values in fetched_data.items():
        batch[key] = [values[i] for i in batch_indices]
    return batch

def process_hf_batch(batch_dict):
    """Process HuggingFace batch into numpy arrays."""
    # Extract and process specific features
    features = np.array(batch_dict['expression'])
    labels = np.array(batch_dict['cell_type_id'])
    return {
        'features': features.astype(np.float32),
        'labels': labels.astype(np.int64)
    }

dataset = scDataset(
    hf_dataset,
    BlockShuffling(block_size=16),
    batch_size=64,
    batch_callback=extract_hf_batch,
    batch_transform=process_hf_batch
)
```

--------------------------------

### Applying Data Transforms - scdataset

Source: https://scdataset.github.io/stable/quickstart

Apply custom data transformations at different stages of the data loading pipeline. This example defines a preprocessing function that runs on fetched data before batching and a normalization function that runs on each batch.
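
To make the two stages concrete, here is a NumPy-only sketch of the order in which such hooks would run, assuming `fetch_transform` is applied once to each fetched chunk of `batch_size * fetch_factor` samples and `batch_transform` once to each batch cut from that chunk. The function `iterate_batches` is a hypothetical stand-in written for illustration; it does not reproduce scDataset's internals.

```python
import numpy as np


def iterate_batches(data, batch_size, fetch_factor, fetch_transform=None, batch_transform=None):
    """Yield batches from `data`, fetching batch_size * fetch_factor rows at a time."""
    fetch_size = batch_size * fetch_factor
    for start in range(0, len(data), fetch_size):
        chunk = data[start:start + fetch_size]  # one "fetch"
        if fetch_transform is not None:
            chunk = fetch_transform(chunk)  # runs once per fetch
        for offset in range(0, len(chunk), batch_size):
            batch = chunk[offset:offset + batch_size]
            if batch_transform is not None:
                batch = batch_transform(batch)  # runs once per batch
            yield batch


data = np.random.randn(1000, 100)
batches = list(
    iterate_batches(
        data,
        batch_size=64,
        fetch_factor=4,
        fetch_transform=lambda x: x.astype(np.float32),
        batch_transform=lambda b: (b - b.mean()) / b.std(),
    )
)
print(len(batches), batches[0].shape, batches[0].dtype)
```

Work that is cheap to vectorize over many rows (such as a dtype cast) fits the fetch stage, while statistics that should be computed per batch belong in the batch stage.
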
```python
import numpy as np
from scdataset import scDataset, Streaming

def preprocess_fetch(data):
    # Apply to fetched data before batching
    return data.astype(np.float32)

def normalize_batch(batch):
    # Apply per-batch normalization
    return (batch - batch.mean()) / batch.std()

dataset = scDataset(
    data,
    Streaming(),
    batch_size=64,
    fetch_transform=preprocess_fetch,
    batch_transform=normalize_batch
)
```

--------------------------------

### Log Training Progress with Batch Loss

Source: https://scdataset.github.io/stable/notebooks/tahoe_tutorial

These snippets illustrate logging the training progress of a model,
specifically showing the current training status (percentage, iterations, time) and the loss for individual batches. This is useful for monitoring training convergence. ```text Training: 0%| | 7/68512 [00:04<9:25:04, 2.02it/s] Train batch 0: loss = 3.9899 Training: 2%|▏ | 1050/68512 [00:22<19:27, 57.77it/s] Train batch 1000: loss = 0.3440 Training: 3%|▎ | 2042/68512 [00:40<13:59, 79.20it/s] Train batch 2000: loss = 0.0654 Training: 4%|▍ | 3070/68512 [00:57<07:00, 155.47it/s] Train batch 3000: loss = 0.0337 Training: 6%|▌ | 4019/68512 [01:15<08:11, 131.25it/s] Train batch 4000: loss = 0.1021 Training: 7%|▋ | 5074/68512 [01:36<17:49, 59.33it/s] Train batch 5000: loss = 0.4971 Training: 9%|▉ | 6078/68512 [01:53<13:07, 79.33it/s] Train batch 6000: loss = 0.1427 Training: 10%|█ | 7046/68512 [02:11<12:39, 80.96it/s] Train batch 7000: loss = 0.0129 Training: 12%|█▏ | 8049/68512 [02:28<09:09, 110.06it/s] Train batch 8000: loss = 0.1937 Training: 13%|█▎ | 9013/68512 [02:46<07:25, 133.53it/s] Train batch 9000: loss = 0.3482 Training: 15%|█▍ | 10053/68512 [03:06<17:54, 54.43it/s] Train batch 10000: loss = 0.0561 Training: 16%|█▌ | 11042/68512 [03:24<13:05, 73.19it/s] Train batch 11000: loss = 0.0025 Training: 18%|█▊ | 12058/68512 [03:42<09:05, 103.56it/s] Train batch 12000: loss = 0.0513 Training: 19%|█▉ | 13040/68512 [03:59<07:20, 125.81it/s] Train batch 13000: loss = 0.5847 Training: 20%|██ | 13998/68512 [04:17<08:27, 107.43it/s] Train batch 14000: loss = 0.1122 Training: 22%|██▏ | 15064/68512 [04:37<14:59, 59.45it/s] Train batch 15000: loss = 0.4273 Training: 23%|██▎ | 16057/68512 [04:55<11:03, 79.10it/s] Train batch 16000: loss = 0.2817 Training: 25%|██▍ | 17074/68512 [05:13<06:52, 124.59it/s] Train batch 17000: loss = 0.1180 Training: 26%|██▋ | 18038/68512 [05:30<06:43, 124.95it/s] Train batch 18000: loss = 1.0325 Training: 28%|██▊ | 18996/68512 [05:48<06:43, 122.75it/s] Train batch 19000: loss = 0.0683 Training: 29%|██▉ | 20063/68512 [06:08<13:23, 60.33it/s] Train batch 
20000: loss = 0.0278 Training: 31%|███ | 21055/68512 [06:26<09:51, 80.28it/s] Train batch 21000: loss = 0.2712 Training: 32%|███▏ | 22042/68512 [06:44<07:30, 103.09it/s] Train batch 22000: loss = 0.0053 Training: 34%|███▎ | 23025/68512 [07:01<06:25, 118.01it/s] Train batch 23000: loss = 0.1565 Training: 35%|███▍ | 23977/68512 [07:19<06:59, 106.27it/s] Train batch 24000: loss = 0.0108 Training: 37%|███▋ | 25058/68512 [07:40<12:04, 59.95it/s] Train batch 25000: loss = 0.2106 Training: 38%|███▊ | 26054/68512 [07:57<08:41, 81.48it/s] Train batch 26000: loss = 0.0147 Training: 39%|███▉ | 27042/68512 [08:15<06:30, 106.25it/s] Train batch 27000: loss = 0.3865 Training: 41%|████ | 28020/68512 [08:32<05:19, 126.71it/s] Train batch 28000: loss = 0.9068 Training: 42%|████▏ | 29047/68512 [08:53<15:08, 43.45it/s] Train batch 29000: loss = 0.1300 Training: 44%|████▍ | 30047/68512 [09:11<10:44, 59.64it/s] Train batch 30000: loss = 0.4086 Training: 45%|████▌ | 31051/68512 [09:28<07:38, 81.75it/s] Train batch 31000: loss = 0.0032 Training: 47%|████▋ | 32061/68512 [09:46<05:28, 110.96it/s] Train batch 32000: loss = 0.2427 Training: 48%|████▊ | 33019/68512 [10:03<05:29, 107.56it/s] Train batch 33000: loss = 0.5098 Training: 50%|████▉ | 34043/68512 [10:24<13:14, 43.40it/s] Train batch 34000: loss = 1.5931 Training: 51%|█████ | 35070/68512 [10:42<06:53, 80.96it/s] Train batch 35000: loss = 0.2432 Training: 53%|█████▎ | 36051/68512 [11:00<06:31, 82.90it/s] Train batch 36000: loss = 0.6859 Training: 54%|█████▍ | 37051/68512 [11:17<04:08, 126.49it/s] Train batch 37000: loss = 0.5389 Training: 55%|█████▌ | 38017/68512 [11:35<04:00, 126.80it/s] Train batch 38000: loss = 0.5876 ``` -------------------------------- ### Estimate Sample Size with NumPy Array Source: https://scdataset.github.io/stable/generated/scdataset.experimental.estimate_sample_size This example demonstrates how to estimate the memory size of a single sample from a NumPy array. 
It initializes a NumPy array and then calls `estimate_sample_size` to get the estimated size in bytes.

```python
import numpy as np
from scdataset.experimental import estimate_sample_size

data = np.random.randn(1000, 2000)  # 1000 samples, 2000 features
size = estimate_sample_size(data)
print(f"Estimated sample size: {size} bytes")
```

--------------------------------

### scDataset Initialization with Different Strategies

Source: https://scdataset.github.io/stable/ddp

Demonstrates initializing scDataset with each of the available sampling strategies: Streaming, BlockShuffling, BlockWeightedSampling, and ClassBalancedSampling. All of them automatically partition data across GPUs when used with DDP.

```python
import scanpy as sc
from scdataset import scDataset, Streaming, BlockShuffling, BlockWeightedSampling, ClassBalancedSampling

# Assume adata is a loaded AnnData object and w, l are weights and labels respectively
# adata = sc.read_h5ad('your_data.h5ad')
# w = [...]  # your weights
# l = [...]  # your labels

# Simple streaming
dataset1 = scDataset(adata, Streaming(), ...)

# Block shuffling
dataset2 = scDataset(adata, BlockShuffling(block_size=64), ...)

# Weighted sampling
dataset3 = scDataset(adata, BlockWeightedSampling(weights=w), ...)

# Class-balanced
dataset4 = scDataset(adata, ClassBalancedSampling(labels=l), ...)

# All automatically partition data across GPUs when run with torchrun
```

--------------------------------

### Automatic Parameter Suggestion - scdataset

Source: https://scdataset.github.io/stable/quickstart

Use the `suggest_parameters` function to automatically tune scdataset parameters for optimal performance. It takes the data collection and batch size as input and returns a dictionary of suggested parameters.
```python
from scdataset.experimental import suggest_parameters

params = suggest_parameters(data_collection, batch_size=64)
print(params)
# {'fetch_factor': 64, 'block_size': 32, 'num_workers': 8}
```

--------------------------------

### Initialize and Use BlockShuffling Strategy

Source: https://scdataset.github.io/stable/generated/scdataset.strategy.BlockShuffling

Demonstrates initializing the BlockShuffling strategy with a block size and the `drop_last` parameter, then generating shuffled indices from a given range. With `drop_last=True`, the last incomplete block is discarded, so fewer indices are returned than were passed in.

```python
>>> from scdataset import BlockShuffling
>>> strategy = BlockShuffling(block_size=2, drop_last=True)
>>> indices = strategy.get_indices(range(5), seed=42)
>>> len(indices)  # Drops the last incomplete block
4
```

--------------------------------

### scDataset Block Shuffling Sampling Strategy

Source: https://scdataset.github.io/stable/quickstart

Shows how to use the BlockShuffling strategy in scDataset. This strategy shuffles data in blocks, which can improve I/O performance while still introducing a degree of randomness into the data loading process.

```python
from scdataset import scDataset, BlockShuffling

# Shuffle in blocks for better I/O while maintaining some randomness
# (data is the array or collection created earlier in the quickstart)
strategy = BlockShuffling(block_size=16)
dataset = scDataset(data, strategy, batch_size=64)
```

--------------------------------

### Use bionemo_to_tensor as a fetch_callback in scDataset

Source: https://scdataset.github.io/stable/generated/scdataset.transforms.bionemo_to_tensor

Demonstrates how to pass the `bionemo_to_tensor` function as a `fetch_callback` when initializing an scDataset. This setup allows scDataset to process data from a BioNeMo SingleCellMemMapDataset, converting sparse matrices to dense tensors for analysis. Ensure the bionemo-scdl package is installed.
```python
from scdataset import scDataset, BlockShuffling
from scdataset.transforms import bionemo_to_tensor
from bionemo.scdl.io.single_cell_memmap_dataset import SingleCellMemMapDataset

bionemo_data = SingleCellMemMapDataset(data_path='/path/to/data')

dataset = scDataset(
    bionemo_data,
    BlockShuffling(),
    batch_size=64,
    fetch_callback=bionemo_to_tensor
)
```

--------------------------------

### scDataset Class Balanced Sampling Strategy

Source: https://scdataset.github.io/stable/quickstart

Explains the ClassBalancedSampling strategy for scDataset, which automatically balances the sampling of different classes. This is particularly useful for classification tasks with imbalanced datasets, where it keeps rare classes from being under-sampled over time.

```python
import numpy as np
from scdataset import scDataset, ClassBalancedSampling

# Automatically balance classes
# (data is the array or collection created earlier in the quickstart)
labels = np.random.choice(['A', 'B', 'C'], size=len(data))
strategy = ClassBalancedSampling(labels, total_size=10000)
dataset = scDataset(data, strategy, batch_size=64)
```

--------------------------------

### Launching Distributed Training with torchrun

Source: https://scdataset.github.io/stable/ddp

These commands illustrate how to launch distributed training using the `torchrun` utility. The first command is for single-node, multi-GPU training and specifies the number of processes per node. The second entry is a placeholder for multi-node training, which requires additional configuration for node communication.

```bash
# Single Node, Multiple GPUs:
torchrun --nproc_per_node=4 train.py

# Multiple Nodes:
# (Requires further configuration for node communication)
```

--------------------------------

### scDataset Streaming Sampling Strategy

Source: https://scdataset.github.io/stable/quickstart

Illustrates the use of the Streaming sampling strategy in scDataset.
It covers both sequential access without shuffling and sequential access with buffer-level shuffling, which is similar to behavior seen in libraries like Ray Data and WebDataset.

```python
from scdataset import Streaming

# Sequential access without shuffling
strategy = Streaming()
dataset = scDataset(data, strategy, batch_size=64)

# Sequential access with buffer-level shuffling (similar to Ray Data/WebDataset)
strategy = Streaming(shuffle=True)
dataset = scDataset(data, strategy, batch_size=64)
# This shuffles batches within each fetch buffer while maintaining
# sequential order between buffers
```

--------------------------------

### PyTorch Lightning DataModule for scDataset

Source: https://scdataset.github.io/stable/examples

Provides a custom PyTorch Lightning DataModule for efficiently loading and managing scDataset data. It includes setup for training and validation datasets using different data strategies (BlockShuffling, Streaming) and configures PyTorch DataLoaders for optimal performance.
```python
import numpy as np
import pytorch_lightning as pl
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader

from scdataset import scDataset, BlockShuffling, Streaming


class SingleCellDataModule(pl.LightningDataModule):
    def __init__(self, data_path, batch_size=64, num_workers=4):
        super().__init__()
        self.data_path = data_path
        self.batch_size = batch_size
        self.num_workers = num_workers

    def setup(self, stage=None):
        # Load your data (load_data is a placeholder for your own loading function)
        self.data = load_data(self.data_path)

        # Split indices
        indices = np.arange(len(self.data))
        train_idx, val_idx = train_test_split(indices, test_size=0.2)

        # Create datasets
        self.train_dataset = scDataset(
            self.data,
            BlockShuffling(block_size=16, indices=train_idx),
            batch_size=self.batch_size
        )
        self.val_dataset = scDataset(
            self.data,
            Streaming(indices=val_idx),
            batch_size=self.batch_size
        )

    def train_dataloader(self):
        return DataLoader(
            self.train_dataset,
            batch_size=None,  # scDataset already yields full batches
            num_workers=self.num_workers,
            prefetch_factor=self.train_dataset.fetch_factor + 1
        )

    def val_dataloader(self):
        return DataLoader(
            self.val_dataset,
            batch_size=None,
            num_workers=self.num_workers,
            prefetch_factor=self.val_dataset.fetch_factor + 1
        )
```

--------------------------------

### Distributed Training Setup with torchrun

Source: https://scdataset.github.io/stable/ddp

Commands to launch distributed training on multiple nodes using torchrun. Specify the number of processes per node, the total number of nodes, the rank of the current node, the master IP address, and the master port.

```bash
# --master_addr is left blank here; set it to the master node's IP address

# On node 0 (master):
torchrun --nproc_per_node=4 --nnodes=2 --node_rank=0 \
    --master_addr= --master_port=29500 train.py

# On node 1:
torchrun --nproc_per_node=4 --nnodes=2 --node_rank=1 \
    --master_addr= --master_port=29500 train.py
```

--------------------------------

### scDataset Weighted Sampling Strategy

Source: https://scdataset.github.io/stable/quickstart

Demonstrates the BlockWeightedSampling strategy for scDataset, which draws samples according to custom weights. This is useful when certain samples should be prioritized, such as rare classes in imbalanced datasets.
It also allows specifying a total number of samples to draw per epoch.

```python
import numpy as np
from scdataset import scDataset, BlockWeightedSampling

# Sample with custom weights (e.g., higher weight for rare samples)
weights = np.random.rand(len(data))  # Custom weights per sample
strategy = BlockWeightedSampling(
    weights=weights,
    total_size=10000,  # Generate 10000 samples per epoch
    block_size=16
)
dataset = scDataset(data, strategy, batch_size=64)
```
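
The random weights in the snippet above are placeholders. As a minimal sketch of the "higher weight for rare samples" idea, per-sample weights can be derived from class labels with inverse-frequency weighting using plain NumPy. The `labels` array and its class proportions are hypothetical and chosen only for illustration, and the normalization step is an assumption rather than a documented requirement of `BlockWeightedSampling`.

```python
import numpy as np

# Hypothetical labels for illustration: class "C" is rare.
labels = np.array(["A"] * 700 + ["B"] * 250 + ["C"] * 50)

# Count each class and map the counts back onto every sample.
classes, inverse, counts = np.unique(
    labels, return_inverse=True, return_counts=True
)

# Inverse-frequency weighting: samples from rare classes get larger weights.
weights = 1.0 / counts[inverse]

# Normalize so the weights sum to 1 (an assumption, not a library requirement).
weights /= weights.sum()

# Total weight carried by each class; inverse-frequency weighting equalizes it.
class_mass = np.bincount(inverse, weights=weights)
for name, mass in zip(classes, class_mass):
    print(f"{name}: {mass:.3f}")

# The resulting array could then be passed on as in the snippet above, e.g.:
# strategy = BlockWeightedSampling(weights=weights, total_size=10000, block_size=16)
```

With these weights every class carries the same total probability mass, so a rare class is drawn about as often as a common one. When that is the only goal, the ClassBalancedSampling strategy shown earlier takes labels directly and avoids computing weights by hand.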