Try Live
Add Docs
Rankings
Pricing
Docs
Install
Install
Docs
Pricing
More...
More...
Try Live
Rankings
Enterprise
Create API Key
Add Docs
WFDB
https://github.com/mit-lcp/wfdb-python
Admin
WFDB is a Python package that provides tools for reading, writing, and processing physiologic
...
Tokens:
44,035
Snippets:
297
Trust Score:
9.1
Update:
2 weeks ago
Context
Skills
Chat
Benchmark
87.3
Suggestions
Latest
Show doc for...
Code
Info
Show Results
Context Summary (auto-generated)
Raw
Copy
Link
# WFDB Python Package WFDB (Waveform Database) is a Python library that provides comprehensive tools for reading, writing, and processing physiologic signal and annotation data. The package implements open standards for storing, sharing, and analyzing biomedical signals such as ECGs, EEGs, and other physiological recordings. It has been widely used for decades to support biomedical research, clinical studies, and education. The library is heavily inspired by the original WFDB Software Package and replicates many of its command-line APIs in Python. Core I/O functionality follows the Waveform Database (WFDB) specifications for data interchange. Key features include native streaming from PhysioNet databases, support for multi-segment records, annotation handling, signal visualization with ECG grids, QRS detection algorithms, and format conversion utilities (CSV, EDF, MATLAB, WAV). ## Reading WFDB Records ### wfdb.rdrecord Read a WFDB record and return signal data with all record descriptors as a Record object. This is the primary function for loading physiologic signals. ```python import wfdb # Read a complete record from local files record = wfdb.rdrecord('sample-data/100') print(f"Record: {record.record_name}") print(f"Sampling frequency: {record.fs} Hz") print(f"Number of signals: {record.n_sig}") print(f"Signal length: {record.sig_len} samples") print(f"Signal names: {record.sig_name}") print(f"Units: {record.units}") print(f"Signal shape: {record.p_signal.shape}") # Read specific channels and sample range record = wfdb.rdrecord('sample-data/100', sampfrom=0, sampto=3000, channels=[0, 1]) # Stream data directly from PhysioNet record = wfdb.rdrecord('100', pn_dir='mitdb', sampto=10000) # Read digital values instead of physical units record = wfdb.rdrecord('sample-data/100', physical=False) print(f"Digital signal dtype: {record.d_signal.dtype}") ``` ### wfdb.rdsamp Simplified function to read signals and basic metadata. Returns a tuple of (signals, fields) instead of a Record object. ```python import wfdb import numpy as np # Read signals and fields directly signals, fields = wfdb.rdsamp('sample-data/100', sampto=10000) print(f"Signal array shape: {signals.shape}") print(f"Sampling frequency: {fields['fs']}") print(f"Signal names: {fields['sig_name']}") print(f"Units: {fields['units']}") # Read specific channels by name signals, fields = wfdb.rdsamp('100', pn_dir='mitdb', channel_names=['MLII', 'V5'], sampfrom=1000, sampto=5000) # Calculate basic statistics for i, name in enumerate(fields['sig_name']): print(f"{name}: mean={np.mean(signals[:,i]):.3f}, std={np.std(signals[:,i]):.3f}") ``` ### wfdb.rdheader Read only the header file without loading signal data. Useful for inspecting record metadata. ```python import wfdb # Read header from local file header = wfdb.rdheader('sample-data/100') print(f"Record name: {header.record_name}") print(f"Sampling frequency: {header.fs} Hz") print(f"Number of signals: {header.n_sig}") print(f"Signal length: {header.sig_len}") print(f"Base date: {header.base_date}") print(f"Base time: {header.base_time}") print(f"Comments: {header.comments}") # Read header from PhysioNet header = wfdb.rdheader('100', pn_dir='mitdb') # Read multi-segment record header with segment info header = wfdb.rdheader('p000878-2137-10-26-16-57', pn_dir='mimic3wdb/matched/p00/p000878', rd_segments=True) print(f"Segments: {header.seg_name}") print(f"Segment lengths: {header.seg_len}") ``` ## Writing WFDB Records ### wfdb.wrsamp Write signal data to WFDB format files (header .hea and data .dat files). ```python import wfdb import numpy as np # Create synthetic ECG-like data fs = 360 # Sampling frequency duration = 10 # seconds t = np.linspace(0, duration, fs * duration) signal1 = 0.5 * np.sin(2 * np.pi * 1.2 * t) + 0.1 * np.random.randn(len(t)) signal2 = 0.3 * np.sin(2 * np.pi * 1.2 * t + np.pi/4) + 0.1 * np.random.randn(len(t)) signals = np.column_stack([signal1, signal2]) # Write record with physical signals wfdb.wrsamp( record_name='my_ecg_record', fs=fs, units=['mV', 'mV'], sig_name=['Lead_I', 'Lead_II'], p_signal=signals, fmt=['16', '16'], comments=['Synthetic ECG data', 'Created with WFDB Python'] ) # Write with specific ADC parameters wfdb.wrsamp( record_name='my_record_custom', fs=500, units=['mV', 'mV'], sig_name=['ECG1', 'ECG2'], p_signal=signals[:5000], fmt=['212', '212'], adc_gain=[200, 200], baseline=[0, 0], write_dir='/tmp/output' ) ``` ### Record.wrsamp Write records using the Record object's instance method for more control. ```python import wfdb import numpy as np # Create a Record object manually record = wfdb.Record( record_name='custom_record', fs=250, n_sig=2, sig_len=5000, file_name=['custom_record.dat', 'custom_record.dat'], fmt=['16', '16'], adc_gain=[200, 200], baseline=[0, 0], units=['mV', 'mV'], sig_name=['ECG', 'Resp'], p_signal=np.random.randn(5000, 2) * 0.5 ) # Write using instance method record.wrsamp(write_dir='/tmp/output') # Read and modify existing record, then save record = wfdb.rdrecord('sample-data/100', sampto=10000) record.record_name = '100_modified' record.comments = ['Modified record'] record.wrsamp(write_dir='/tmp/output') ``` ## Reading and Writing Annotations ### wfdb.rdann Read annotation files containing beat labels, rhythm changes, and other markers. ```python import wfdb # Read all annotations ann = wfdb.rdann('sample-data/100', 'atr') print(f"Number of annotations: {len(ann.sample)}") print(f"Annotation samples: {ann.sample[:10]}") print(f"Annotation symbols: {ann.symbol[:10]}") # Read annotations for a specific sample range ann = wfdb.rdann('sample-data/100', 'atr', sampfrom=0, sampto=300000) # Stream annotations from PhysioNet ann = wfdb.rdann('100', 'atr', pn_dir='mitdb') # Get summary of annotation labels ann = wfdb.rdann('sample-data/100', 'atr', summarize_labels=True) print(ann.contained_labels) # Access auxiliary notes for i, (sample, symbol, note) in enumerate(zip(ann.sample[:5], ann.symbol[:5], ann.aux_note[:5])): print(f"Sample {sample}: {symbol} - {note}") ``` ### wfdb.wrann Write annotation files with beat locations and labels. ```python import wfdb import numpy as np # Write basic annotations sample_locs = np.array([100, 450, 800, 1150, 1500]) symbols = ['N', 'N', 'V', 'N', 'N'] wfdb.wrann( record_name='my_record', extension='atr', sample=sample_locs, symbol=symbols ) # Write annotations with auxiliary notes wfdb.wrann( record_name='my_record', extension='atr', sample=sample_locs, symbol=symbols, aux_note=[None, None, 'PVC detected', None, None], fs=360 ) # Copy and modify annotations ann = wfdb.rdann('sample-data/100', 'atr', sampto=5000) wfdb.wrann( record_name='100_copy', extension='atr', sample=ann.sample, symbol=ann.symbol, write_dir='/tmp/output' ) ``` ### wfdb.show_ann_labels and wfdb.show_ann_classes Display standard WFDB annotation labels and classes. ```python import wfdb # Show all standard annotation labels wfdb.show_ann_labels() # Output includes: N (Normal beat), V (PVC), A (Atrial premature), etc. # Show annotation classes (file extensions) wfdb.show_ann_classes() # Output includes: atr (Reference annotations), qrs (QRS detections), etc. ``` ## Downloading from PhysioNet ### wfdb.dl_database Download complete databases from PhysioNet. ```python import wfdb import os # Download a small database wfdb.dl_database('ahadb', dl_dir='./downloaded_data') # Download specific records wfdb.dl_database( 'mitdb', dl_dir='./mitdb_subset', records=['100', '101', '102'], annotators=['atr'] ) # Resume interrupted download (checks existing files) wfdb.dl_database( 'ltafdb', dl_dir='./ltafdb', overwrite=False # Skip already downloaded files ) ``` ### wfdb.dl_files Download specific files from a PhysioNet database. ```python import wfdb import os # Download specific files wfdb.dl_files( db='mitdb', dl_dir='./specific_files', files=['100.hea', '100.dat', '100.atr'] ) # Download documentation wfdb.dl_files( db='mitdb', dl_dir='./docs', files=['RECORDS', 'ANNOTATORS'] ) ``` ### wfdb.get_dbs and wfdb.get_record_list Query available databases and records. ```python import wfdb # Get list of all PhysioNet databases databases = wfdb.get_dbs() for db_name, description in databases[:5]: print(f"{db_name}: {description}") # Get list of records in a database records = wfdb.get_record_list('mitdb') print(f"MIT-BIH database has {len(records)} records") print(f"First 5 records: {records[:5]}") # Get records from a nested database records = wfdb.get_record_list('mimic3wdb/matched/p00/p000878') ``` ## Plotting Signals and Annotations ### wfdb.plot_wfdb Plot WFDB Record and Annotation objects with ECG grids and annotation symbols. ```python import wfdb # Read record and annotations record = wfdb.rdrecord('sample-data/100', sampto=3000) annotation = wfdb.rdann('sample-data/100', 'atr', sampto=3000) # Basic plot wfdb.plot_wfdb(record=record, annotation=annotation) # Plot with ECG grid and annotation symbols wfdb.plot_wfdb( record=record, annotation=annotation, plot_sym=True, time_units='seconds', title='MIT-BIH Record 100', figsize=(12, 6), ecg_grids='all' ) # Plot record only wfdb.plot_wfdb( record=record, time_units='seconds', title='ECG Signal', figsize=(10, 4) ) # Get figure object for further customization fig = wfdb.plot_wfdb( record=record, return_fig=True ) fig.savefig('ecg_plot.png', dpi=150) ``` ### wfdb.plot_items Low-level plotting function for custom signal and annotation arrays. ```python import wfdb import numpy as np # Read data record = wfdb.rdrecord('sample-data/100', sampto=3000) ann = wfdb.rdann('sample-data/100', 'atr', sampto=3000) # Plot with custom styling wfdb.plot_items( signal=record.p_signal, ann_samp=[ann.sample, ann.sample], # Annotations for each channel ann_sym=[ann.symbol, ann.symbol], fs=record.fs, time_units='seconds', sig_name=record.sig_name, sig_units=record.units, title='Custom ECG Plot', figsize=(12, 6), ecg_grids='all', sig_style=['b-', 'g-'], ann_style=['r*', 'r*'] ) # Plot synthetic data t = np.linspace(0, 5, 1000) signal = np.sin(2 * np.pi * t) peaks = np.array([100, 300, 500, 700, 900]) wfdb.plot_items( signal=signal, ann_samp=[peaks], fs=200, time_units='seconds', title='Synthetic Signal with Peaks' ) ``` ## Signal Processing ### QRS Detection with XQRS Detect QRS complexes using the XQRS algorithm optimized for ECG signals. ```python import wfdb from wfdb import processing # Read ECG signal sig, fields = wfdb.rdsamp('sample-data/100', channels=[0]) # Create XQRS detector and run detection xqrs = processing.XQRS(sig=sig[:, 0], fs=fields['fs']) xqrs.detect() print(f"Detected {len(xqrs.qrs_inds)} QRS complexes") print(f"First 10 QRS locations: {xqrs.qrs_inds[:10]}") # Plot results wfdb.plot_items( signal=sig, ann_samp=[xqrs.qrs_inds], fs=fields['fs'], time_units='seconds', title='XQRS Detection Results' ) # Use convenience function qrs_inds = processing.xqrs_detect( sig=sig[:, 0], fs=fields['fs'], verbose=True ) # Custom configuration conf = processing.XQRS.Conf( hr_init=80, # Initial heart rate estimate hr_max=200, # Maximum heart rate hr_min=40, # Minimum heart rate qrs_width=0.1 # Expected QRS width in seconds ) xqrs = processing.XQRS(sig=sig[:, 0], fs=fields['fs'], conf=conf) xqrs.detect() ``` ### QRS Detection with GQRS Alternative QRS detection algorithm from the original WFDB package. ```python import wfdb from wfdb import processing # Read record record = wfdb.rdrecord('sample-data/100', channels=[0]) # Detect QRS using physical signal qrs_locs = processing.gqrs_detect( sig=record.p_signal[:, 0], fs=record.fs ) print(f"Detected {len(qrs_locs)} QRS complexes") # Detect using digital signal (exact replication of original algorithm) record_dig = wfdb.rdrecord('sample-data/100', channels=[0], physical=False) qrs_locs_dig = processing.gqrs_detect( d_sig=record_dig.d_signal[:, 0], fs=record_dig.fs, adc_gain=record_dig.adc_gain[0], adc_zero=record_dig.adc_zero[0] ) ``` ### Heart Rate Computation Calculate heart rate from QRS detections. ```python import wfdb from wfdb import processing # Detect QRS sig, fields = wfdb.rdsamp('sample-data/100', channels=[0]) qrs_inds = processing.xqrs_detect(sig=sig[:, 0], fs=fields['fs']) # Calculate R-R intervals rr_intervals = processing.calc_rr( qrs_locs=qrs_inds, fs=fields['fs'], rr_units='seconds' ) print(f"Mean R-R interval: {rr_intervals.mean():.3f} seconds") # Calculate mean heart rate mean_hr = processing.calc_mean_hr( rr=rr_intervals, rr_units='seconds' ) print(f"Mean heart rate: {mean_hr:.1f} BPM") # Calculate instantaneous heart rate heart_rate = processing.compute_hr( sig_len=len(sig), qrs_inds=qrs_inds, fs=fields['fs'] ) # Get R-R intervals from annotation file rr = processing.ann2rr('sample-data/100', 'atr', format='s', as_array=True) print(f"First 5 R-R intervals: {rr[:5]}") ``` ### Peak Correction and Detection Refine detected peak locations and find local peaks. ```python import wfdb from wfdb import processing # Read signal sig, fields = wfdb.rdsamp('sample-data/100', channels=[0], sampto=10000) # Initial QRS detection qrs_inds = processing.xqrs_detect(sig=sig[:, 0], fs=fields['fs']) # Correct peak locations to local maxima corrected_peaks = processing.correct_peaks( sig=sig[:, 0], peak_inds=qrs_inds, search_radius=int(0.05 * fields['fs']), # 50ms search window smooth_window_size=int(0.02 * fields['fs']), # 20ms smoothing peak_dir='up' # Look for upward peaks ) # Find all local peaks in signal local_peaks = processing.find_local_peaks( sig=sig[:, 0], radius=int(0.1 * fields['fs']) # 100ms neighborhood ) print(f"Found {len(local_peaks)} local peaks") ``` ### Signal Resampling Resample signals and annotations to different sampling frequencies. ```python import wfdb from wfdb import processing # Read signal and annotation sig, fields = wfdb.rdsamp('sample-data/100', channels=[0], sampto=10000) ann = wfdb.rdann('sample-data/100', 'atr', sampto=10000) # Resample signal only resampled_sig, resampled_t = processing.resample_sig( x=sig[:, 0], fs=fields['fs'], fs_target=250 # Target 250 Hz ) print(f"Original samples: {len(sig)}, Resampled: {len(resampled_sig)}") # Resample signal with annotations resampled_sig, resampled_ann = processing.resample_singlechan( x=sig[:, 0], ann=ann, fs=fields['fs'], fs_target=250 ) print(f"Original annotations: {len(ann.sample)}") print(f"Resampled annotations: {len(resampled_ann.sample)}") # Resample annotation indices only new_ann_samples = processing.resample_ann( ann_sample=ann.sample, fs=fields['fs'], fs_target=250 ) ``` ### Annotation Comparison and Evaluation Compare detected annotations against reference annotations. ```python import wfdb from wfdb import processing # Read signal and reference annotations sig, fields = wfdb.rdsamp('sample-data/100', channels=[0]) ann_ref = wfdb.rdann('sample-data/100', 'atr') # Run QRS detection xqrs = processing.XQRS(sig=sig[:, 0], fs=fields['fs']) xqrs.detect() # Compare detections with reference comparitor = processing.Comparitor( ref_sample=ann_ref.sample[1:], # Skip first annotation test_sample=xqrs.qrs_inds, window_width=int(0.1 * fields['fs']), # 100ms tolerance signal=sig[:, 0] ) comparitor.compare() # Print performance metrics comparitor.print_summary() # Plot comparison comparitor.plot(title='QRS Detection Comparison', figsize=(14, 6)) # Use convenience function comparitor = processing.compare_annotations( ref_sample=ann_ref.sample[1:], test_sample=xqrs.qrs_inds, window_width=int(0.1 * fields['fs']), signal=sig[:, 0] ) ``` ## Format Conversions ### CSV to WFDB Convert CSV files containing signal data to WFDB format. ```python from wfdb.io.convert.csv import csv_to_wfdb # Convert CSV with header row csv_to_wfdb( file_name='sample-data/100.csv', fs=360, units='mV' ) # Convert with custom parameters csv_to_wfdb( file_name='my_signals.csv', fs=500, units=['mV', 'mV', 'uV'], sig_name=['ECG1', 'ECG2', 'EMG'], fmt=['16', '16', '16'], comments=['Converted from CSV'], header=True, delimiter=',' ) # Get record object without writing files record = csv_to_wfdb( file_name='my_signals.csv', fs=500, units='mV', record_only=True ) ``` ### EDF to WFDB Read European Data Format (EDF) files into WFDB Record objects. ```python from wfdb.io.convert.edf import read_edf, wfdb_to_edf # Read EDF file record = read_edf('my_recording.edf') print(f"Signals: {record.sig_name}") print(f"Sampling frequency: {record.fs}") # Read EDF from PhysioNet record = read_edf( 'x001_FAROS.edf', pn_dir='simultaneous-measurements/raw_data' ) # Convert WFDB to EDF wfdb_to_edf('100', pn_dir='mitdb', output_filename='100_converted.edf') ``` ### WFDB to MATLAB Export WFDB records to MATLAB .mat format. ```python from wfdb.io.convert.matlab import wfdb_to_mat # Convert record to MATLAB format wfdb_to_mat('100', pn_dir='mitdb') # Creates 100m.mat and 100m.hea files # Convert specific channels and sample range wfdb_to_mat( '100', pn_dir='mitdb', sampfrom=0, sampto=10000, channels=[0] ) ``` ### WAV Conversion Convert between WFDB and WAV audio formats. ```python from wfdb.io.convert.wav import read_wav, wfdb_to_wav # Read WAV file as WFDB Record record = read_wav('sample-data/SC4001E0-PSG.wav') # Convert WFDB to WAV wfdb_to_wav('100', pn_dir='mitdb', output_filename='100_audio.wav') ``` ## Record Object Methods ### Digital-Analog Conversion Convert between physical and digital signal representations. ```python import wfdb # Read record in digital format record = wfdb.rdrecord('sample-data/100', physical=False) print(f"Digital signal range: [{record.d_signal.min()}, {record.d_signal.max()}]") # Convert digital to analog (physical) p_signal = record.dac() print(f"Physical signal range: [{p_signal.min():.3f}, {p_signal.max():.3f}]") # In-place conversion record.dac(inplace=True) print(f"Now has p_signal: {record.p_signal is not None}") print(f"d_signal cleared: {record.d_signal is None}") # Convert analog to digital record.adc(inplace=True) ``` ### DataFrame Conversion Convert Record to pandas DataFrame for analysis. ```python import wfdb record = wfdb.rdrecord('sample-data/100', sampto=10000) # Convert to DataFrame df = record.to_dataframe() print(df.head()) print(f"Columns: {df.columns.tolist()}") print(f"Index type: {type(df.index)}") # Use DataFrame for analysis print(f"Signal statistics:\n{df.describe()}") ``` ### Multi-Segment Records Work with multi-segment records that contain multiple continuous segments. ```python import wfdb # Read multi-segment record as single record (default) record = wfdb.rdrecord( 'p000878-2137-10-26-16-57', pn_dir='mimic3wdb/matched/p00/p000878', sampto=10000 ) # Read as MultiRecord object multi_record = wfdb.rdrecord( 'p000878-2137-10-26-16-57', pn_dir='mimic3wdb/matched/p00/p000878', m2s=False # Don't convert multi to single ) print(f"Number of segments: {len(multi_record.seg_name)}") print(f"Segment names: {multi_record.seg_name}") # Convert MultiRecord to single Record single_record = multi_record.multi_to_single(physical=True) # Query signal ranges in variable layout records ranges = multi_record.contained_ranges('II') print(f"Signal 'II' present in ranges: {ranges}") ``` ## Summary WFDB Python is the essential toolkit for working with physiologic signal data in biomedical research and clinical applications. The library excels at reading and writing standardized WFDB format files, enabling seamless data interchange between researchers and healthcare systems worldwide. Its native integration with PhysioNet provides direct access to hundreds of curated databases containing ECG, EEG, and other physiologic recordings. The package's processing capabilities include robust QRS detection algorithms (XQRS and GQRS), heart rate computation, signal resampling, and annotation comparison tools essential for algorithm development and validation. Combined with matplotlib-based visualization supporting standard ECG paper grids, researchers can quickly analyze and present physiologic signals. Format conversion utilities for CSV, EDF, MATLAB, and WAV ensure interoperability with other analysis environments, making WFDB Python a comprehensive solution for the entire physiologic signal analysis workflow.