# CUDA Python

CUDA Python is NVIDIA's official Python SDK for accessing the CUDA platform, providing comprehensive Pythonic interfaces to the CUDA Runtime, Driver, and compiler toolchain. The project is structured as a metapackage containing multiple independently versioned subpackages: `cuda.core` for idiomatic high-level access, `cuda.bindings` for low-level C API bindings, and `cuda.pathfinder` for locating CUDA components in the Python environment. It supports both CUDA 12.x and 13.x, enabling developers to write GPU-accelerated applications entirely in Python.

The primary goal of CUDA Python is to flatten the learning curve for GPU development by providing Pythonic abstractions while maintaining full access to CUDA's capabilities. The `cuda.core` module offers high-level classes for device management, stream handling, memory allocation, kernel compilation, and graph-based execution. Meanwhile, `cuda.bindings` provides direct access to the CUDA Driver, Runtime, NVRTC, nvJitLink, NVVM, and cuFile APIs for users requiring fine-grained control over GPU operations.

## Device Management

The `Device` class is the entry point for cuda.core features, representing a GPU and managing CUDA contexts. It provides singleton objects for each device, ensuring interoperability across multiple libraries in the same process.

```python
from cuda.core import Device

# Get device by ID (0 is the default)
dev = Device(0)

# Initialize device and set as current
dev.set_current()

# Access device properties
print(f"Device: {dev.name}")
print(f"Compute Capability: {dev.compute_capability}")  # e.g., (9, 0)
print(f"Architecture: sm_{dev.arch}")  # e.g., "sm_90"
print(f"UUID: {dev.uuid}")
print(f"PCI Bus ID: {dev.pci_bus_id}")

# Query device properties
props = dev.properties
print(f"Multiprocessors: {props.multiprocessor_count}")
print(f"Max threads per block: {props.max_threads_per_block}")
print(f"Shared memory per block: {props.max_shared_memory_per_block} bytes")
print(f"Warp size: {props.warp_size}")
print(f"Memory pools supported: {props.memory_pools_supported}")

# Check peer access between devices
dev0 = Device(0)
dev1 = Device(1)
if dev0.can_access_peer(dev1):
    print("P2P access enabled between device 0 and 1")

# Synchronize all operations on device
dev.sync()

# Get all available devices
all_devices = Device.get_all_devices()
for d in all_devices:
    print(f"Device {d.device_id}: {d.name}")
```

## Stream Management

Streams represent queues of GPU operations executed in order. Work within a single stream executes sequentially, while work across streams can execute concurrently.
```python
from cuda.core import Device, Stream, StreamOptions

dev = Device()
dev.set_current()

# Create a new stream with default options (non-blocking)
stream = dev.create_stream()

# Create stream with custom options
options = StreamOptions(
    nonblocking=True,  # Does not synchronize with NULL stream
    priority=0,        # Lower number = higher priority
)
stream = dev.create_stream(options=options)

# Stream properties
print(f"Is nonblocking: {stream.is_nonblocking}")
print(f"Priority: {stream.priority}")
print(f"Handle: {stream.handle}")
print(f"Device: {stream.device}")

# Synchronize stream (wait for all work to complete)
stream.sync()

# Wait for an event or another stream
other_stream = dev.create_stream()
stream.wait(other_stream)  # Wait for other_stream's work

# Record an event on the stream
event = stream.record()

# Access default streams
from cuda.core import LEGACY_DEFAULT_STREAM, PER_THREAD_DEFAULT_STREAM

# Create stream from foreign handle (e.g., from CuPy)
import cupy as cp
cupy_stream = cp.cuda.Stream()
foreign_handle = int(cupy_stream.ptr)
stream = Stream.from_handle(foreign_handle)

# Clean up
stream.close()
```

## Event Management

Events record points in stream execution for synchronization, timing, and dependency management.

```python
from cuda.core import Device, EventOptions

dev = Device()
dev.set_current()
stream = dev.create_stream()

# Create event with timing enabled
event_opts = EventOptions(
    enable_timing=True,      # Enable timing data collection
    busy_waited_sync=False,  # Use spin-wait vs blocking sync
    ipc_enabled=False,       # Enable inter-process communication
)
event = dev.create_event(options=event_opts)

# Record events for timing
start = stream.record(options=EventOptions(enable_timing=True))
# ... perform GPU operations ...
end = stream.record(options=EventOptions(enable_timing=True))
end.sync()

# Calculate elapsed time (in milliseconds)
elapsed_ms = end - start
print(f"Elapsed time: {elapsed_ms} ms")

# Wait for event to complete
event.sync()

# Check if event completed (non-blocking)
is_done = event.is_done

# Stream waits for event
stream.wait(event)

# IPC events for multi-process communication
ipc_event = stream.record(options=EventOptions(ipc_enabled=True))
descriptor = ipc_event.get_ipc_descriptor()
# Send descriptor to another process...

# Clean up
event.close()
stream.close()
```

## Memory Management

CUDA Python provides various memory resources for different use cases, including device memory, pinned (page-locked) host memory, and managed memory.
```python
from cuda.core import (
    Device,
    Buffer,
    DeviceMemoryResource,
    PinnedMemoryResource,
    LegacyPinnedMemoryResource,
    ManagedMemoryResource,
)
import numpy as np

dev = Device()
dev.set_current()
stream = dev.create_stream()

# Allocate device memory using default memory resource
buf = dev.allocate(size=1024, stream=stream)
print(f"Buffer handle: {buf.handle}")
print(f"Buffer size: {buf.size} bytes")

# Access memory resource directly
device_mr = dev.memory_resource
device_buf = device_mr.allocate(4096, stream=stream)

# Pinned (page-locked) memory - accessible from GPU with zero-copy
pinned_mr = LegacyPinnedMemoryResource()
pinned_buf = pinned_mr.allocate(4096, stream=stream)

# New-style pinned memory with async operations
pinned_mr_new = PinnedMemoryResource()
pinned_buf_new = pinned_mr_new.allocate(4096, stream=stream)

# Managed memory (unified memory) - automatically migrated
managed_mr = ManagedMemoryResource()
managed_buf = managed_mr.allocate(4096, stream=stream)

# Copy data between buffers
src_buf = dev.allocate(1024, stream=stream)
dst_buf = dev.allocate(1024, stream=stream)
src_buf.copy_to(dst_buf, stream=stream)

# Copy from host to device
host_data = np.array([1.0, 2.0, 3.0, 4.0], dtype=np.float32)
src_buf.copy_from(host_data, stream=stream)

# Fill buffer with a value
buf.fill(value=0, stream=stream)        # Zero-fill
buf.fill(value=b'\xff', stream=stream)  # Fill with byte pattern

# DLPack interop (NumPy 2.1+, CuPy)
import cupy as cp
device_array = cp.from_dlpack(device_buf)
pinned_array = np.from_dlpack(pinned_buf)

# Create buffer from existing pointer (non-owning)
external_ptr = some_library.get_gpu_pointer()
buf = Buffer.from_handle(ptr=external_ptr, size=4096)

# IPC for multi-process buffer sharing
descriptor = buf.get_ipc_descriptor()
# In another process:
imported_buf = Buffer.from_ipc_descriptor(device_mr, descriptor, stream)

# Deallocate (async on stream)
buf.close(stream)

# Context manager for automatic cleanup
with dev.allocate(1024, stream=stream) as buf:
    # Use buffer...
    pass  # Automatically closed

stream.close()
```

## Program Compilation

The `Program` class compiles CUDA C++, PTX, or NVVM IR source code into executable object code using NVRTC or NVVM backends.
```python
from cuda.core import Device, Program, ProgramOptions
import sys

dev = Device()
dev.set_current()

# CUDA C++ kernel source
cuda_code = """
template<typename T>
__global__ void vector_add(const T* A, const T* B, T* C, size_t N) {
    const unsigned int tid = threadIdx.x + blockIdx.x * blockDim.x;
    for (size_t i = tid; i < N; i += gridDim.x * blockDim.x) {
        C[i] = A[i] + B[i];
    }
}
"""

# Configure compilation options
options = ProgramOptions(
    name="vector_add_program",
    arch=f"sm_{dev.arch}",          # Target architecture
    std="c++17",                    # C++ standard
    relocatable_device_code=False,  # Enable for separate compilation
    debug=False,                    # Debug symbols
    lineinfo=False,                 # Line number info
    ftz=False,                      # Flush denormals to zero
    prec_sqrt=True,                 # Precise sqrt
    prec_div=True,                  # Precise division
    fma=True,                       # Fused multiply-add
    use_fast_math=False,            # Fast but less precise math
    max_register_count=None,        # Limit registers per thread
    define_macro=[                  # Preprocessor defines
        "DEBUG",
        ("VERSION", "2"),
    ],
    include_path=["/path/to/headers"],
)

# Create program from C++ source
prog = Program(cuda_code, code_type="c++", options=options)

# Compile to different target types
# "cubin" - GPU binary, "ptx" - Parallel Thread Execution, "ltoir" - LTO IR
object_code = prog.compile(
    target_type="cubin",
    name_expressions=("vector_add<float>", "vector_add<double>"),  # Template instantiations
    logs=sys.stdout,  # Optional: capture compilation logs
)

print(f"Backend used: {prog.backend}")  # "NVRTC", "NVVM", or a linker backend

# Compile PTX source directly
ptx_code = """
.version 8.0
.target sm_90
.address_size 64
.visible .entry simple_kernel(.param .u64 ptr)
{
    ret;
}
"""
ptx_prog = Program(ptx_code, code_type="ptx")
ptx_obj = ptx_prog.compile("cubin")

# NVVM IR compilation (requires cuda-bindings >= 12.9)
nvvm_code = """
target datalayout = "e-p:64:64:64-i1:8:8-..."
target triple = "nvptx64-nvidia-cuda"

define void @simple_kernel() {
  ret void
}
"""
nvvm_prog = Program(nvvm_code, code_type="nvvm", options=ProgramOptions(
    use_libdevice=True,  # Load NVIDIA's math library
))
nvvm_obj = nvvm_prog.compile("ptx")

# Clean up
prog.close()
```

## ObjectCode and Kernel Loading

`ObjectCode` represents compiled GPU code, and `Kernel` represents a function that can be launched on the GPU.
```python
from cuda.core import Device, ObjectCode, Kernel

dev = Device()
dev.set_current()

# Load cubin from file
obj = ObjectCode.from_cubin("/path/to/kernel.cubin")

# Load from bytes
with open("/path/to/kernel.cubin", "rb") as f:
    cubin_data = f.read()
obj = ObjectCode.from_cubin(cubin_data, name="my_kernels")

# Load from PTX
obj = ObjectCode.from_ptx(ptx_source, symbol_mapping={
    "kernel_name": "_Z11kernel_namePfS_m",  # Map names to mangled symbols
})

# Other formats
obj = ObjectCode.from_fatbin(fatbin_data)
obj = ObjectCode.from_ltoir(ltoir_data)
obj = ObjectCode.from_library(library_data)

# Get kernel from object code
kernel = obj.get_kernel("vector_add")

# Kernel attributes
attrs = kernel.attributes
print(f"Max threads per block: {attrs.max_threads_per_block()}")
print(f"Shared memory: {attrs.shared_size_bytes()} bytes")
print(f"Registers per thread: {attrs.num_regs()}")
print(f"Local memory: {attrs.local_size_bytes()} bytes")

# Kernel occupancy calculations
occ = kernel.occupancy
max_blocks = occ.max_active_blocks_per_multiprocessor(
    block_size=256, dynamic_shared_memory_size=0
)
print(f"Max blocks per SM: {max_blocks}")

# Get optimal block size
result = occ.max_potential_block_size(
    dynamic_shared_memory_needed=0,
    block_size_limit=0,  # 0 = no limit
)
print(f"Optimal grid: {result.min_grid_size}, block: {result.max_block_size}")

# Object code properties
print(f"Code type: {obj.code_type}")
print(f"Name: {obj.name}")
print(f"Handle: {obj.handle}")
```

## Kernel Launch

The `launch` function executes kernels on the GPU with the specified configuration.

```python
from cuda.core import Device, LaunchConfig, Program, ProgramOptions, launch
import cupy as cp

dev = Device()
dev.set_current()
stream = dev.create_stream()

# Compile kernel
code = """
extern "C" __global__ void saxpy(float a, const float* x, const float* y,
                                 float* out, size_t N) {
    unsigned int tid = threadIdx.x + blockIdx.x * blockDim.x;
    if (tid < N) {
        out[tid] = a * x[tid] + y[tid];
    }
}
"""
prog = Program(code, code_type="c++", options=ProgramOptions(arch=f"sm_{dev.arch}"))
obj = prog.compile("cubin")
kernel = obj.get_kernel("saxpy")

# Prepare data
N = 10000
a = 2.0
x = cp.random.random(N, dtype=cp.float32)
y = cp.random.random(N, dtype=cp.float32)
out = cp.empty(N, dtype=cp.float32)

# Configure launch
block_size = 256
grid_size = (N + block_size - 1) // block_size
config = LaunchConfig(
    grid=grid_size,    # Number of blocks (can be int or tuple)
    block=block_size,  # Threads per block (can be int or tuple)
    shmem_size=0,      # Dynamic shared memory in bytes
)

# Launch kernel
launch(stream, config, kernel,
       cp.float32(a),  # Scalar argument
       x.data.ptr,     # Device pointer (from CuPy)
       y.data.ptr,
       out.data.ptr,
       cp.uint64(N))

stream.sync()

# Multi-dimensional grid/block
config_3d = LaunchConfig(
    grid=(16, 16, 1),  # 3D grid of blocks
    block=(8, 8, 4),   # 3D block of threads
)

# Thread Block Clusters (Hopper+ GPUs, sm_90+)
config_cluster = LaunchConfig(
    grid=(4, 4, 1),     # Grid of clusters
    cluster=(2, 2, 1),  # Blocks per cluster
    block=(128, 1, 1),  # Threads per block
)

# Cooperative kernel launch
config_coop = LaunchConfig(
    grid=grid_size,
    block=block_size,
    cooperative_launch=True,  # Enable cooperative groups
)

stream.close()
```

## CUDA Graphs

CUDA Graphs capture a sequence of operations and replay them with reduced launch overhead. This is ideal for repetitive workloads.
```python
from cuda.core import Device, LaunchConfig, Program, ProgramOptions, launch

dev = Device()
dev.set_current()
stream = dev.create_stream()

# Compile kernels
code = """
extern "C" __global__ void kernel_a(float* data, size_t N) {
    unsigned int tid = threadIdx.x + blockIdx.x * blockDim.x;
    if (tid < N) data[tid] += 1.0f;
}
extern "C" __global__ void kernel_b(float* data, size_t N) {
    unsigned int tid = threadIdx.x + blockIdx.x * blockDim.x;
    if (tid < N) data[tid] *= 2.0f;
}
"""
prog = Program(code, code_type="c++", options=ProgramOptions(arch=f"sm_{dev.arch}"))
obj = prog.compile("cubin")
kernel_a = obj.get_kernel("kernel_a")
kernel_b = obj.get_kernel("kernel_b")

# Allocate memory
import cupy as cp
data = cp.zeros(1000, dtype=cp.float32)
N = cp.uint64(data.size)
config = LaunchConfig(grid=4, block=256)

# Create graph builder from stream
graph_builder = stream.create_graph_builder()
# Or from device: graph_builder = dev.create_graph_builder()

# Begin capturing operations
graph_builder.begin_building()

# Add operations to graph
launch(graph_builder, config, kernel_a, data.data.ptr, N)
launch(graph_builder, config, kernel_b, data.data.ptr, N)

# Memory operations can also be captured
buf = dev.allocate(1024, stream=graph_builder)
buf.fill(0, stream=graph_builder)

# Complete and instantiate graph
graph_def = graph_builder.end_building()
graph = graph_def.complete()

# Upload graph to stream (optional optimization)
graph.upload(stream)

# Execute graph (replay all captured operations)
for _ in range(100):  # Efficient repeated execution
    graph.launch(stream)
stream.sync()

# Debug: print graph structure
from cuda.core.graph import GraphDebugPrintOptions
debug_opts = GraphDebugPrintOptions(
    verbose=True,
    kernel_node_params=True,
)
graph_def.debug_print(options=debug_opts)

# Clean up
buf.close(stream)
graph.close()
graph_builder.close()
stream.close()
```

## Linker

The `Linker` combines multiple object codes into a single executable, supporting relocatable device code.

```python
from cuda.core import Device, Program, ProgramOptions, Linker, LinkerOptions, ObjectCode

dev = Device()
dev.set_current()

# Compile separate modules with relocatable device code
code_a = """
extern "C" __device__ float helper_func(float x) { return x * x; }
"""
code_b = """
extern "C" __device__ float helper_func(float x);
extern "C" __global__ void main_kernel(float* data, size_t N) {
    unsigned int tid = threadIdx.x + blockIdx.x * blockDim.x;
    if (tid < N) data[tid] = helper_func(data[tid]);
}
"""

options = ProgramOptions(
    arch=f"sm_{dev.arch}",
    relocatable_device_code=True,  # Required for separate compilation
)
prog_a = Program(code_a, code_type="c++", options=options)
obj_a = prog_a.compile("cubin")
prog_b = Program(code_b, code_type="c++", options=options)
obj_b = prog_b.compile("cubin")

# Link object codes together
linker_options = LinkerOptions(
    name="linked_program",
    arch=f"sm_{dev.arch}",
    link_time_optimization=False,
    debug=False,
)
linker = Linker(obj_a, obj_b, options=linker_options)
linked_obj = linker.link(target_type="cubin")

# Get logs
print(f"Info log: {linker.get_info_log()}")
print(f"Error log: {linker.get_error_log()}")
print(f"Backend: {linker.backend}")  # "nvJitLink" or "driver"

# Get kernel from linked code
kernel = linked_obj.get_kernel("main_kernel")

linker.close()
```

## Low-Level Bindings (cuda.bindings)

For direct access to CUDA C APIs, use the low-level bindings module.
```python
from cuda.bindings import driver, runtime, nvrtc

# Driver API example
driver.cuInit(0)
device_count = driver.cuDeviceGetCount()[1]
print(f"Found {device_count} CUDA devices")

device = driver.cuDeviceGet(0)[1]
name = driver.cuDeviceGetName(256, device)[1]
print(f"Device name: {name.decode()}")

# Create context
context = driver.cuCtxCreate(0, device)[1]

# Runtime API example
runtime.cudaSetDevice(0)
props = runtime.cudaGetDeviceProperties(0)[1]
print(f"Device: {props.name.decode()}")

# Allocate memory via runtime
ptr = runtime.cudaMalloc(1024)[1]
runtime.cudaMemset(ptr, 0, 1024)
runtime.cudaFree(ptr)

# NVRTC compilation
code = """extern "C" __global__ void kernel() {}"""
prog = nvrtc.nvrtcCreateProgram(code.encode(), b"kernel.cu", 0, [], [])[1]
nvrtc.nvrtcCompileProgram(prog, 0, [])
ptx_size = nvrtc.nvrtcGetPTXSize(prog)[1]
ptx = b" " * ptx_size        # Allocate a host buffer of the reported size
nvrtc.nvrtcGetPTX(prog, ptx)  # NVRTC writes the PTX into the buffer
nvrtc.nvrtcDestroyProgram(prog)

# Clean up
driver.cuCtxDestroy(context)
```

## System Utilities

The `cuda.core.system` module provides system-level information via NVML.

```python
from cuda.core import system

# Get number of available devices
num_devices = system.get_num_devices()
print(f"Number of CUDA devices: {num_devices}")

# Get driver version
driver_ver = system.driver_version
print(f"CUDA Driver version: {driver_ver}")

# Access system-level device info (requires cuda-bindings 13.1.2+ or 12.9.6+)
from cuda.core import Device
dev = Device(0)
dev.set_current()

# Convert to system device for NVML access
sys_dev = dev.to_system_device()
```

## Summary

CUDA Python serves as the comprehensive Python interface for NVIDIA GPU computing, offering both high-level Pythonic abstractions through `cuda.core` and low-level direct API access through `cuda.bindings`. The high-level API simplifies common workflows like device initialization, memory management, kernel compilation, and graph-based execution, while the low-level bindings provide full access to the CUDA Driver, Runtime, NVRTC, NVVM, and nvJitLink APIs for advanced use cases. The unified memory model built on the `Buffer` and `MemoryResource` classes enables seamless interoperability with NumPy, CuPy, and other array libraries through DLPack.

For production GPU applications, CUDA Python integrates naturally with the broader Python data science ecosystem. Typical integration patterns include: using CuPy arrays as kernel arguments while managing execution through cuda.core streams; leveraging CUDA Graphs for repetitive workloads in training loops; implementing custom GPU operators for PyTorch or JAX using the compilation API; and building multi-GPU applications with explicit memory management and P2P communication. The first of these patterns is sketched below. The separation between `cuda.core` and `cuda.bindings` allows developers to start with high-level APIs and gradually incorporate low-level control as needed for performance optimization.
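To make that first pattern concrete, here is a minimal sketch that combines pieces already shown in earlier sections: a kernel compiled with `Program`, a `cuda.core` stream driving execution, and a CuPy array supplying the device memory. The `scale` kernel, its launch parameters, and the array sizes are illustrative choices, not part of either library's API.

```python
from cuda.core import Device, LaunchConfig, Program, ProgramOptions, launch
import cupy as cp

# Set up the device and a dedicated stream for our work
dev = Device()
dev.set_current()
stream = dev.create_stream()

# Compile a small elementwise kernel (same workflow as the compilation section)
code = """
extern "C" __global__ void scale(float* data, float factor, size_t N) {
    unsigned int tid = threadIdx.x + blockIdx.x * blockDim.x;
    if (tid < N) data[tid] *= factor;
}
"""
prog = Program(code, code_type="c++", options=ProgramOptions(arch=f"sm_{dev.arch}"))
kernel = prog.compile("cubin").get_kernel("scale")

# CuPy owns the array; cuda.core owns the execution
data = cp.arange(1 << 20, dtype=cp.float32)
dev.sync()  # Simple ordering: let CuPy's allocation/init work finish first

# Run the custom kernel on the cuda.core stream, passing the CuPy pointer
config = LaunchConfig(grid=(data.size + 255) // 256, block=256)
launch(stream, config, kernel, data.data.ptr, cp.float32(2.0), cp.uint64(data.size))
stream.sync()

# Back in CuPy: the scaled result is visible to normal array operations
print(data[:4])  # first four elements, now doubled
```

The design point of the pattern is the division of labor: the array library handles allocation and post-processing, while cuda.core handles compilation and launch, so custom kernels can be dropped into an existing CuPy-based pipeline without rewriting its memory management.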