https://github.com/pipecat-ai/pipecat
# Pipecat

Pipecat is an open-source Python framework for building real-time voice and multimodal conversational AI agents. It orchestrates audio/video, AI services (STT, TTS, LLM), transports (WebRTC, WebSocket), and conversation pipelines using a frame-based architecture. The framework enables developers to build voice assistants, AI companions, multimodal interfaces, and complex dialog systems with ultra-low latency.

The core architecture revolves around Frame objects flowing through a pipeline of FrameProcessors. Data frames (audio, text, video) and control frames flow downstream from input to output, while acknowledgments and errors flow upstream. Pipecat integrates with 60+ AI service providers including OpenAI, Anthropic, Deepgram, Cartesia, and ElevenLabs, and supports multiple transport layers including Daily WebRTC, LiveKit, and WebSocket connections.

## Pipeline and PipelineTask

The Pipeline class chains FrameProcessors together in sequence. PipelineTask manages pipeline execution, handles lifecycle events, and provides monitoring capabilities including heartbeats and idle detection.
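To make the frame flow concrete before the Pipecat example, here is a plain-Python toy model of processors chained in sequence, with data moving downstream and an error report moving back upstream. It is only a sketch of the idea: it does not use Pipecat, and every name in it (`ToyFrame`, `ToyProcessor`, `FailingSink`, `link`) is hypothetical.

```python
from dataclasses import dataclass
from enum import Enum, auto
from typing import List, Optional


class Direction(Enum):
    DOWNSTREAM = auto()  # input -> output
    UPSTREAM = auto()    # output -> input


@dataclass
class ToyFrame:
    kind: str      # e.g. "text" or "error"
    payload: str


class ToyProcessor:
    """Hypothetical stand-in for a frame processor (not the Pipecat class)."""

    def __init__(self, name: str):
        self.name = name
        self.prev: Optional["ToyProcessor"] = None
        self.next: Optional["ToyProcessor"] = None
        self.seen: List[str] = []  # record of what passed through

    def process(self, frame: ToyFrame, direction: Direction) -> None:
        self.seen.append(f"{direction.name}:{frame.kind}")
        self.push(frame, direction)

    def push(self, frame: ToyFrame, direction: Direction) -> None:
        # Hand the frame to the neighbour in the requested direction, if any.
        target = self.next if direction is Direction.DOWNSTREAM else self.prev
        if target is not None:
            target.process(frame, direction)


class FailingSink(ToyProcessor):
    """Last stage: consumes text frames and reports a failure upstream."""

    def process(self, frame: ToyFrame, direction: Direction) -> None:
        self.seen.append(f"{direction.name}:{frame.kind}")
        if direction is Direction.DOWNSTREAM and frame.kind == "text":
            error = ToyFrame("error", f"could not play: {frame.payload}")
            self.push(error, Direction.UPSTREAM)


def link(processors: List[ToyProcessor]) -> List[ToyProcessor]:
    """Chain processors in order, wiring both directions."""
    for left, right in zip(processors, processors[1:]):
        left.next = right
        right.prev = left
    return processors


source, middle, sink = link(
    [ToyProcessor("source"), ToyProcessor("middle"), FailingSink("sink")]
)
source.process(ToyFrame("text", "hello"), Direction.DOWNSTREAM)
print(source.seen)
print(middle.seen)
print(sink.seen)
```

The real framework layers asynchronous execution, lifecycle events, and monitoring on top of this basic chaining idea, as the Pipecat example for this section shows.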
```python
import asyncio

from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
    LLMContextAggregatorPair,
    LLMUserAggregatorParams,
)
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.frames.frames import LLMRunFrame


async def main():
    # Initialize services
    stt = DeepgramSTTService(api_key="DEEPGRAM_API_KEY")
    llm = OpenAILLMService(
        api_key="OPENAI_API_KEY",
        settings=OpenAILLMService.Settings(
            model="gpt-4.1",
            system_instruction="You are a helpful voice assistant."
        )
    )
    tts = CartesiaTTSService(
        api_key="CARTESIA_API_KEY",
        settings=CartesiaTTSService.Settings(voice="71a7ad14-091c-4e8e-a314-022ece01c121")
    )

    # Create context and aggregators for turn management
    context = LLMContext()
    user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
        context,
        user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer())
    )

    # Build the pipeline
    # NOTE: `transport` is not created in this snippet; see Transport Configuration.
    pipeline = Pipeline([
        transport.input(),      # Audio input from transport
        stt,                    # Speech-to-text
        user_aggregator,        # Aggregate user turns
        llm,                    # LLM processing
        tts,                    # Text-to-speech
        transport.output(),     # Audio output to transport
        assistant_aggregator    # Aggregate assistant responses
    ])

    # Create and run task
    task = PipelineTask(
        pipeline,
        params=PipelineParams(
            enable_metrics=True,
            enable_usage_metrics=True,
            audio_in_sample_rate=16000,
            audio_out_sample_rate=24000
        ),
        idle_timeout_secs=300
    )

    # Event handlers
    @task.event_handler("on_pipeline_started")
    async def on_started(task, frame):
        print("Pipeline started")

    @task.event_handler("on_pipeline_finished")
    async def on_finished(task, frame):
        print("Pipeline finished")

    runner = PipelineRunner(handle_sigint=True)
    await runner.run(task)


asyncio.run(main())
```

## LLMContext and Message Management

LLMContext manages conversation history, tool definitions, and multimedia content for LLM interactions. It provides methods for adding messages, images, and audio to the conversation context.

```python
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.frames.frames import AudioRawFrame

# Create context with tools
weather_function = FunctionSchema(
    name="get_weather",
    description="Get current weather for a location",
    properties={
        "location": {"type": "string", "description": "City name"},
        "unit": {"type": "string", "enum": ["celsius", "fahrenheit"]}
    },
    required=["location"]
)
tools = ToolsSchema(standard_tools=[weather_function])
context = LLMContext(tools=tools)

# Add messages to context
context.add_message({"role": "system", "content": "You are a weather assistant."})
context.add_message({"role": "user", "content": "What's the weather in Paris?"})

# NOTE: the `await` calls below must run inside an async function.
# `image_bytes` and `data` are raw image/audio bytes supplied by the caller.

# Add image to context
image_message = await LLMContext.create_image_message(
    role="user",
    format="RGB",
    size=(640, 480),
    image=image_bytes,
    text="What do you see in this image?"
)
context.add_message(image_message)

# Add audio to context
audio_frames = [AudioRawFrame(audio=data, sample_rate=16000, num_channels=1)]
audio_message = await LLMContext.create_audio_message(
    role="user",
    audio_frames=audio_frames,
    text="Please transcribe this audio"
)
context.add_message(audio_message)

# Get all messages
messages = context.messages  # Returns list of messages
```

## Custom FrameProcessor

FrameProcessor is the base class for all pipeline components. Create custom processors to implement specialized frame handling logic.
```python
from pipecat.pipeline.pipeline import Pipeline
from pipecat.processors.frame_processor import FrameProcessor, FrameDirection
from pipecat.frames.frames import Frame, TextFrame, TranscriptionFrame, MetricsFrame


class TranscriptionLogger(FrameProcessor):
    """Custom processor that logs all transcriptions passing through."""

    def __init__(self, log_prefix: str = "", **kwargs):
        super().__init__(**kwargs)
        self._log_prefix = log_prefix

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        # Always call parent first
        await super().process_frame(frame, direction)

        # Handle specific frame types
        if isinstance(frame, TranscriptionFrame):
            print(f"{self._log_prefix}Transcription: {frame.text}")
        elif isinstance(frame, MetricsFrame):
            print(f"{self._log_prefix}Metrics: {frame.data}")

        # IMPORTANT: Always push frames to maintain pipeline flow
        await self.push_frame(frame, direction)


class TextTransformer(FrameProcessor):
    """Transform text frames before they reach TTS."""

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)

        if isinstance(frame, TextFrame):
            # Transform the text
            transformed_text = frame.text.upper()
            # Create new frame with transformed text
            new_frame = TextFrame(text=transformed_text)
            await self.push_frame(new_frame, direction)
        else:
            await self.push_frame(frame, direction)


# Use in pipeline
# (transport, stt, llm, tts, and the aggregators are created as in the earlier sections)
pipeline = Pipeline([
    transport.input(),
    stt,
    TranscriptionLogger(log_prefix="[USER] "),
    user_aggregator,
    llm,
    TextTransformer(),
    tts,
    transport.output(),
    assistant_aggregator
])
```

## OpenAI LLM Service

OpenAILLMService provides integration with OpenAI's chat completion API with support for function calling, streaming responses, and runtime settings updates.
```python
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.services.llm_service import FunctionCallParams
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.processors.aggregators.llm_context import LLMContext

# Initialize service with settings
llm = OpenAILLMService(
    api_key="OPENAI_API_KEY",
    settings=OpenAILLMService.Settings(
        model="gpt-4.1",
        system_instruction="You are a helpful assistant.",
        temperature=0.7,
        max_tokens=1000
    )
)


# Define and register function handlers
async def get_weather(params: FunctionCallParams):
    location = params.arguments.get("location", "Unknown")
    # Fetch weather data...
    await params.result_callback({
        "location": location,
        "temperature": 72,
        "conditions": "sunny"
    })


async def search_database(params: FunctionCallParams):
    query = params.arguments.get("query", "")
    # Perform search...
    await params.result_callback({"results": ["item1", "item2"]})


# Register functions
llm.register_function("get_weather", get_weather)
llm.register_function("search_database", search_database)

# Register catch-all handler for any function
# (`handle_any_function` is an application-defined handler, not shown here)
llm.register_function(None, lambda params: handle_any_function(params))


# Event handlers
@llm.event_handler("on_function_calls_started")
async def on_function_calls_started(service, function_calls):
    for call in function_calls:
        print(f"Calling function: {call.function_name}")


@llm.event_handler("on_completion_timeout")
async def on_timeout(service):
    print("LLM completion timed out")


# Define tools schema
tools = ToolsSchema(standard_tools=[
    FunctionSchema(
        name="get_weather",
        description="Get weather for a location",
        properties={
            "location": {"type": "string", "description": "City name"}
        },
        required=["location"]
    )
])

# Create context with tools
context = LLMContext(tools=tools)
```

## OpenAI Realtime LLM Service

OpenAIRealtimeLLMService enables speech-to-speech conversations using OpenAI's Realtime API with built-in VAD and turn detection.
```python
from pipecat.pipeline.pipeline import Pipeline
from pipecat.services.openai.realtime.llm import OpenAIRealtimeLLMService
from pipecat.services.openai.realtime.events import (
    AudioConfiguration,
    AudioInput,
    InputAudioTranscription,
    InputAudioNoiseReduction,
    SemanticTurnDetection,
    SessionProperties
)

llm = OpenAIRealtimeLLMService(
    api_key="OPENAI_API_KEY",
    settings=OpenAIRealtimeLLMService.Settings(
        system_instruction="""You are a friendly voice assistant.
        Keep responses concise and conversational.""",
        session_properties=SessionProperties(
            audio=AudioConfiguration(
                input=AudioInput(
                    transcription=InputAudioTranscription(),
                    turn_detection=SemanticTurnDetection(),
                    noise_reduction=InputAudioNoiseReduction(type="near_field")
                )
            )
        )
    )
)

# Register function handlers
# (`fetch_weather` and `book_appointment` are application-defined handlers, not shown here)
llm.register_function("get_weather", fetch_weather)
llm.register_function("book_appointment", book_appointment)

# Simplified pipeline - no separate STT/TTS needed
pipeline = Pipeline([
    transport.input(),
    user_aggregator,
    llm,  # Handles both STT and TTS internally
    transport.output(),
    assistant_aggregator
])
```

## Deepgram STT Service

DeepgramSTTService provides real-time speech-to-text transcription with support for interim results, multiple languages, and custom vocabulary.
```python
from pipecat.services.deepgram.stt import DeepgramSTTService

# Initialize with settings
stt = DeepgramSTTService(
    api_key="DEEPGRAM_API_KEY",
    settings=DeepgramSTTService.Settings(
        model="nova-3-general",
        language="en-US",
        smart_format=True,
        punctuate=True,
        interim_results=True,
        endpointing=300,  # ms of silence before finalizing
        utterance_end_ms=1000
    )
)


# Event handlers
@stt.event_handler("on_connected")
async def on_connected(stt):
    print("Connected to Deepgram")


@stt.event_handler("on_disconnected")
async def on_disconnected(stt):
    print("Disconnected from Deepgram")


@stt.event_handler("on_connection_error")
async def on_error(stt, error):
    print(f"Deepgram error: {error}")


# Use in pipeline
pipeline = Pipeline([
    transport.input(),
    stt,
    user_aggregator,
    llm,
    tts,
    transport.output(),
    assistant_aggregator
])
```

## Cartesia TTS Service

CartesiaTTSService provides high-quality text-to-speech synthesis with support for multiple voices, emotions, and real-time streaming.

```python
from pipecat.services.cartesia.tts import CartesiaTTSService, CartesiaEmotion, GenerationConfig
from pipecat.frames.frames import TTSSpeakFrame

# Initialize with voice and emotion settings
tts = CartesiaTTSService(
    api_key="CARTESIA_API_KEY",
    settings=CartesiaTTSService.Settings(
        voice="71a7ad14-091c-4e8e-a314-022ece01c121",  # Voice ID
        language="en",
        model="sonic-3",
        generation_config=GenerationConfig(
            speed=1.0,
            volume=1.0,
            emotion=CartesiaEmotion.EXCITED
        )
    )
)


# Event handlers
@tts.event_handler("on_connected")
async def on_connected(tts):
    print("Connected to Cartesia")


@tts.event_handler("on_tts_request")
async def on_tts_request(tts, context_id, text):
    print(f"TTS request: {text}")


# Speak text directly (outside normal pipeline flow)
# NOTE: this `await` must run inside an async function.
await tts.queue_frame(TTSSpeakFrame(text="Hello! How can I help you today?"))

# Use in pipeline
pipeline = Pipeline([
    transport.input(),
    stt,
    user_aggregator,
    llm,
    tts,
    transport.output(),
    assistant_aggregator
])
```

## Transport Configuration

Transports handle external I/O for audio and video. Pipecat supports Daily WebRTC, LiveKit, WebSocket, and local transports.

```python
from pipecat.frames.frames import LLMRunFrame
from pipecat.transports.base_transport import TransportParams
from pipecat.transports.daily.transport import DailyParams, DailyTransport
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams, FastAPIWebsocketTransport
from pipecat.audio.filters.krisp_viva_filter import KrispVivaFilter
from pipecat.audio.mixers.soundfile_mixer import SoundfileMixer

# Daily WebRTC transport
daily_transport = DailyTransport(
    room_url="https://your-domain.daily.co/room-name",
    token="your-daily-token",
    bot_name="Assistant",
    params=DailyParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
        audio_in_sample_rate=16000,
        audio_out_sample_rate=24000,
        audio_in_filter=KrispVivaFilter(),  # Noise suppression
        video_in_enabled=True,
        video_out_enabled=True,
        video_out_width=1280,
        video_out_height=720
    )
)


# Event handlers
# (`context` and `task` are the LLMContext and PipelineTask created elsewhere)
@daily_transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
    print(f"Client connected: {client}")
    # Start conversation
    context.add_message({"role": "developer", "content": "Greet the user."})
    await task.queue_frames([LLMRunFrame()])


@daily_transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
    print("Client disconnected")
    await task.cancel()


# WebSocket transport for telephony (Twilio, etc.)
websocket_transport = FastAPIWebsocketTransport(
    params=FastAPIWebsocketParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
        audio_in_sample_rate=8000,  # Telephony sample rate
        audio_out_sample_rate=8000
    )
)

# Use transport in pipeline
pipeline = Pipeline([
    daily_transport.input(),
    stt,
    user_aggregator,
    llm,
    tts,
    daily_transport.output(),
    assistant_aggregator
])
```

## Voice Activity Detection (VAD)

SileroVADAnalyzer provides voice activity detection for determining when users start and stop speaking, enabling natural turn-taking in conversations.

```python
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
    LLMContextAggregatorPair,
    LLMUserAggregatorParams,
    UserTurnStoppedMessage,
    AssistantTurnStoppedMessage
)

# Configure VAD
vad = SileroVADAnalyzer(
    params=VADParams(
        confidence=0.7,  # Speech detection threshold
        start_secs=0.2,  # Time before speech is confirmed
        stop_secs=0.8,   # Silence duration to end turn
        min_volume=0.6   # Minimum audio volume threshold
    )
)

# Create aggregators with VAD
context = LLMContext()
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
    context,
    user_params=LLMUserAggregatorParams(
        vad_analyzer=vad,
        user_turn_stop_timeout=5.0,  # Max wait for user to finish
        user_idle_timeout=30.0,      # Idle detection timeout
        audio_idle_timeout=1.0       # Handle mic mute during speech
    )
)


# Handle turn events
@user_aggregator.event_handler("on_user_turn_stopped")
async def on_user_turn(aggregator, strategy, message: UserTurnStoppedMessage):
    print(f"User said: {message.content}")


@assistant_aggregator.event_handler("on_assistant_turn_stopped")
async def on_assistant_turn(aggregator, message: AssistantTurnStoppedMessage):
    print(f"Assistant said: {message.content}")


@user_aggregator.event_handler("on_user_turn_idle")
async def on_idle(aggregator):
    print("User is idle - could prompt them")
```

## Pipeline Observers

Observers monitor frame flow without modifying the pipeline. Use them for logging, metrics collection, and debugging.

```python
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.observers.base_observer import BaseObserver, FramePushed, FrameProcessed
from pipecat.observers.loggers.transcription_log_observer import TranscriptionLogObserver
from pipecat.observers.loggers.metrics_log_observer import MetricsLogObserver
from pipecat.observers.loggers.llm_log_observer import LLMLogObserver

# Built-in observers
transcription_observer = TranscriptionLogObserver()
metrics_observer = MetricsLogObserver()
llm_observer = LLMLogObserver()


# Custom observer
class CustomObserver(BaseObserver):
    async def on_push_frame(self, data: FramePushed):
        print(f"Frame pushed: {data.frame.name} by {data.processor.name}")

    async def on_process_frame(self, data: FrameProcessed):
        print(f"Frame processed: {data.frame.name}")


# Attach observers to task
task = PipelineTask(
    pipeline,
    params=PipelineParams(enable_metrics=True),
    observers=[
        transcription_observer,
        metrics_observer,
        CustomObserver()
    ]
)
```

## Frames Reference

Frames are the data units that flow through pipelines. Key frame types include data frames for content and control frames for pipeline management.
```python
from pipecat.frames.frames import (
    # Audio frames
    OutputAudioRawFrame,
    InputAudioRawFrame,
    TTSAudioRawFrame,
    TTSSpeakFrame,
    # Text frames
    TextFrame,
    TranscriptionFrame,
    InterimTranscriptionFrame,
    LLMTextFrame,
    # LLM control frames
    LLMRunFrame,
    LLMContextFrame,
    LLMMessagesAppendFrame,
    LLMSetToolsFrame,
    LLMUpdateSettingsFrame,
    # Pipeline control frames
    StartFrame,
    EndFrame,
    CancelFrame,
    StopFrame,
    InterruptionFrame,
    # Speaking state frames
    UserStartedSpeakingFrame,
    UserStoppedSpeakingFrame,
    BotStartedSpeakingFrame,
    BotStoppedSpeakingFrame,
    # Image/Video frames
    ImageRawFrame,
    UserImageRawFrame
)

# Queue frames to the pipeline task
await task.queue_frames([
    LLMRunFrame(),                  # Trigger LLM inference
    TTSSpeakFrame(text="Hello!"),   # Speak text directly
])

# Append messages to context
await task.queue_frames([
    LLMMessagesAppendFrame(messages=[
        {"role": "user", "content": "New user message"}
    ])
])

# Update LLM settings at runtime
await task.queue_frames([
    LLMUpdateSettingsFrame(settings={"temperature": 0.5})
])
```

## Complete Voice Agent Example

A complete example showing a voice agent with function calling, event handling, and proper error management.
```python
import asyncio
import os

from dotenv import load_dotenv
from loguru import logger

from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
    LLMContextAggregatorPair,
    LLMUserAggregatorParams,
)
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.llm_service import FunctionCallParams
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.frames.frames import LLMRunFrame, TTSSpeakFrame
from pipecat.transports.daily.transport import DailyTransport, DailyParams

load_dotenv()


# Function handlers
async def get_weather(params: FunctionCallParams):
    location = params.arguments.get("location", "Unknown")
    await params.result_callback({
        "location": location,
        "temperature": 72,
        "conditions": "sunny"
    })


async def book_appointment(params: FunctionCallParams):
    date = params.arguments.get("date")
    time = params.arguments.get("time")
    await params.result_callback({
        "confirmed": True,
        "date": date,
        "time": time
    })


async def main():
    # Initialize services
    stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
    tts = CartesiaTTSService(
        api_key=os.getenv("CARTESIA_API_KEY"),
        settings=CartesiaTTSService.Settings(voice="71a7ad14-091c-4e8e-a314-022ece01c121")
    )
    llm = OpenAILLMService(
        api_key=os.getenv("OPENAI_API_KEY"),
        settings=OpenAILLMService.Settings(
            model="gpt-4.1",
            system_instruction="""You are a helpful voice assistant for scheduling.
            Keep responses brief and conversational."""
        )
    )

    # Register functions
    llm.register_function("get_weather", get_weather)
    llm.register_function("book_appointment", book_appointment)

    # Speak while processing function calls
    @llm.event_handler("on_function_calls_started")
    async def on_function_calls(service, calls):
        await tts.queue_frame(TTSSpeakFrame("Let me check that for you."))

    # Define tools
    tools = ToolsSchema(standard_tools=[
        FunctionSchema(
            name="get_weather",
            description="Get weather for a location",
            properties={"location": {"type": "string"}},
            required=["location"]
        ),
        FunctionSchema(
            name="book_appointment",
            description="Book an appointment",
            properties={
                "date": {"type": "string", "description": "Date in YYYY-MM-DD"},
                "time": {"type": "string", "description": "Time in HH:MM"}
            },
            required=["date", "time"]
        )
    ])

    # Setup context and aggregators
    context = LLMContext(tools=tools)
    user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
        context,
        user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer())
    )

    # Setup transport
    transport = DailyTransport(
        room_url=os.getenv("DAILY_ROOM_URL"),
        token=os.getenv("DAILY_TOKEN"),
        bot_name="Assistant",
        params=DailyParams(audio_in_enabled=True, audio_out_enabled=True)
    )

    # Build pipeline
    pipeline = Pipeline([
        transport.input(),
        stt,
        user_aggregator,
        llm,
        tts,
        transport.output(),
        assistant_aggregator
    ])

    # Create task
    task = PipelineTask(
        pipeline,
        params=PipelineParams(enable_metrics=True, enable_usage_metrics=True),
        idle_timeout_secs=300
    )

    # Transport events
    @transport.event_handler("on_client_connected")
    async def on_connected(transport, client):
        logger.info("Client connected")
        context.add_message({"role": "developer", "content": "Greet the user warmly."})
        await task.queue_frames([LLMRunFrame()])

    @transport.event_handler("on_client_disconnected")
    async def on_disconnected(transport, client):
        logger.info("Client disconnected")
        await task.cancel()

    # Task events
    @task.event_handler("on_pipeline_error")
    async def on_error(task, frame):
        logger.error(f"Pipeline error: {frame}")

    # Run
    runner = PipelineRunner(handle_sigint=True)
    await runner.run(task)


if __name__ == "__main__":
    asyncio.run(main())
```

## Summary

Pipecat excels at building real-time voice AI applications through its modular pipeline architecture. The framework handles the complexity of coordinating speech recognition, language models, and speech synthesis while providing clean abstractions for developers. Common use cases include voice assistants, customer service bots, interactive storytelling applications, and multimodal AI interfaces that combine voice with video and images.

Integration patterns typically follow a standard flow: transport input captures audio, STT converts speech to text, context aggregators manage conversation history, LLMs generate responses, TTS converts text back to speech, and transport output delivers audio to users. The event-driven architecture enables reactive behaviors like interruption handling, function calling, and dynamic context updates. Developers can extend functionality through custom FrameProcessors and observers while leveraging 60+ pre-built service integrations for rapid development.
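
One way to picture the interruption handling mentioned above is as task cancellation: the bot's speech runs as a background task, and a "user started speaking" event cancels it mid-utterance. The following is a minimal plain-`asyncio` sketch of that idea only. It does not use Pipecat and does not claim to mirror Pipecat's implementation; `speak`, `run_turn`, and `WORDS` are hypothetical names, and the sleeps stand in for audio playback and user timing.

```python
import asyncio
from typing import List

# Hypothetical bot utterance, "played" one word at a time.
WORDS = [f"word{i}" for i in range(20)]


async def speak(words: List[str], spoken: List[str]) -> None:
    """Pretend to play audio: record each word, then wait for its playback time."""
    for word in words:
        spoken.append(word)
        await asyncio.sleep(0.05)  # stand-in for audio playback duration


async def run_turn(interrupt_after: float) -> List[str]:
    """Start the bot speaking, then interrupt it after `interrupt_after` seconds."""
    spoken: List[str] = []
    bot_task = asyncio.create_task(speak(WORDS, spoken))

    # Stand-in for the moment the user starts speaking.
    await asyncio.sleep(interrupt_after)

    # Interrupt: cancel the in-flight speech and wait for it to unwind.
    bot_task.cancel()
    try:
        await bot_task
    except asyncio.CancelledError:
        pass
    return spoken


spoken = asyncio.run(run_turn(0.12))
print(f"Bot spoke {len(spoken)} of {len(WORDS)} words before being interrupted")
```

The point of the sketch is that the bot's output is cut short rather than queued behind the user's speech; how many words get through depends on timing, so no exact count is shown.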