### Clone and Run Starter App
Source: https://github.com/openai/chatkit-python/blob/main/docs/quickstart.md
Clone the starter app repository and install dependencies to get a basic ChatKit app running.
```sh
git clone https://github.com/openai/openai-chatkit-starter-app.git
cd openai-chatkit-starter-app/chatkit
npm install
npm run dev
```
--------------------------------
### Install ChatKit Python Package
Source: https://github.com/openai/chatkit-python/blob/main/docs/quickstart.md
Install the Python package for ChatKit to set up the server-side component.
```sh
pip install openai-chatkit
```
--------------------------------
### Install ChatKit React Bindings
Source: https://github.com/openai/chatkit-python/blob/main/docs/quickstart.md
Install the necessary npm package for integrating ChatKit with a React application.
```sh
npm install @openai/chatkit-react
```
--------------------------------
### Postgres Store Implementation for ChatKit
Source: https://github.com/openai/chatkit-python/blob/main/docs/guides/respond-to-user-message.md
Example implementation of a `Store` interface using PostgreSQL to manage threads and items. It includes schema initialization and methods for loading and saving thread metadata.
```python
class MyPostgresStore(Store[RequestContext]):
def __init__(self, conninfo: str) -> None:
self._conninfo = conninfo
self._init_schema()
def _init_schema(self) -> None:
with self._connection() as conn, conn.cursor() as cur:
cur.execute(
"""
CREATE TABLE IF NOT EXISTS threads (
id TEXT PRIMARY KEY,
user_id TEXT NOT NULL,
created_at TIMESTAMPTZ NOT NULL,
data JSONB NOT NULL
);
"""
)
cur.execute(
"""
CREATE TABLE IF NOT EXISTS items (
id TEXT PRIMARY KEY,
thread_id TEXT NOT NULL
REFERENCES threads (id)
ON DELETE CASCADE,
user_id TEXT NOT NULL,
created_at TIMESTAMPTZ NOT NULL,
data JSONB NOT NULL
);
"""
)
conn.commit()
async def load_thread(
self, thread_id: str, context: RequestContext
) -> ThreadMetadata:
with self._connection() as conn, conn.cursor(row_factory=tuple_row) as cur:
cur.execute(
"SELECT data FROM threads WHERE id = %s AND user_id = %s",
(thread_id, context.user_id),
)
row = cur.fetchone()
if row is None:
raise NotFoundError(f"Thread {thread_id} not found")
return ThreadMetadata.model_validate(row[0])
async def save_thread(
self, thread: ThreadMetadata, context: RequestContext
) -> None:
payload = thread.model_dump(mode="json")
with self._connection() as conn, conn.cursor() as cur:
cur.execute(
"""
INSERT INTO threads (id, user_id, created_at, data)
VALUES (%s, %s, %s, %s)
""",
(thread.id, context.user_id, thread.created_at, payload),
)
conn.commit()
# Implement the remaining Store methods following the same pattern.
```
--------------------------------
### Implement In-Memory ChatKit Store
Source: https://github.com/openai/chatkit-python/blob/main/docs/quickstart.md
Provides an in-memory store implementation for ChatKit servers. This is useful for the quickstart to keep examples minimal and for testing. It omits persistence and attachment handling, which are required for production.
```python
from collections import defaultdict
from chatkit.store import NotFoundError, Store
from chatkit.types import Attachment, Page, ThreadItem, ThreadMetadata
class MyChatKitStore(Store[dict]):
def __init__(self):
self.threads: dict[str, ThreadMetadata] = {}
self.items: dict[str, list[ThreadItem]] = defaultdict(list)
async def load_thread(self, thread_id: str, context: dict) -> ThreadMetadata:
if thread_id not in self.threads:
raise NotFoundError(f"Thread {thread_id} not found")
return self.threads[thread_id]
async def save_thread(self, thread: ThreadMetadata, context: dict) -> None:
self.threads[thread.id] = thread
async def load_threads(
self, limit: int, after: str | None, order: str, context: dict
) -> Page[ThreadMetadata]:
threads = list(self.threads.values())
return self._paginate(
threads, after, limit, order, sort_key=lambda t: t.created_at, cursor_key=lambda t: t.id
)
async def load_thread_items(
self, thread_id: str, after: str | None, limit: int, order: str, context: dict
) -> Page[ThreadItem]:
items = self.items.get(thread_id, [])
return self._paginate(
items, after, limit, order, sort_key=lambda i: i.created_at, cursor_key=lambda i: i.id
)
async def add_thread_item(
self, thread_id: str, item: ThreadItem, context: dict
) -> None:
self.items[thread_id].append(item)
async def save_item(
self, thread_id: str, item: ThreadItem, context: dict
) -> None:
items = self.items[thread_id]
for idx, existing in enumerate(items):
if existing.id == item.id:
items[idx] = item
return
items.append(item)
async def load_item(
self, thread_id: str, item_id: str, context: dict
) -> ThreadItem:
for item in self.items.get(thread_id, []):
if item.id == item_id:
return item
raise NotFoundError(f"Item {item_id} not found in thread {thread_id}")
async def delete_thread(self, thread_id: str, context: dict) -> None:
self.threads.pop(thread_id, None)
self.items.pop(thread_id, None)
async def delete_thread_item(
self, thread_id: str, item_id: str, context: dict
) -> None:
self.items[thread_id] = [
item for item in self.items.get(thread_id, []) if item.id != item_id
]
def _paginate(self, rows: list, after: str | None, limit: int, order: str, sort_key, cursor_key):
sorted_rows = sorted(rows, key=sort_key, reverse=order == "desc")
start = 0
if after:
for idx, row in enumerate(sorted_rows):
if cursor_key(row) == after:
start = idx + 1
break
data = sorted_rows[start : start + limit]
has_more = start + limit < len(sorted_rows)
next_after = cursor_key(data[-1]) if has_more and data else None
return Page(data=data, has_more=has_more, after=next_after)
# Attachments are intentionally not implemented for the quickstart
async def save_attachment(
self, attachment: Attachment, context: dict
) -> None:
raise NotImplementedError()
async def load_attachment(
self, attachment_id: str, context: dict
) -> Attachment:
raise NotImplementedError()
async def delete_attachment(self, attachment_id: str, context: dict) -> None:
raise NotImplementedError()
```
--------------------------------
### Handle Actions on the Client
Source: https://github.com/openai/chatkit-python/blob/main/docs/guides/build-interactive-responses-with-widgets.md
Provide the 'widgets.onAction' callback when initializing ChatKit on the client. This example shows handling a 'save_profile' action and optionally forwarding a follow-up action to the server.
```ts
const chatkit = useChatKit({
// ...
widgets: {
onAction: async (action, widgetItem) => {
if (action.type === "save_profile") {
const result = await saveProfile(action.payload);
// Optionally invoke a server action after client-side work completes.
await chatkit.sendCustomAction(
{
type: "save_profile_complete",
payload: {...result, user_id: action.payload.user_id},
},
widgetItem.id,
);
}
},
},
});
```
--------------------------------
### Implement Localization with gettext
Source: https://github.com/openai/chatkit-python/blob/main/docs/guides/prepare-your-app-for-production.md
This example demonstrates how to use the `gettext` library to load translations based on the provided locale and translate strings for UI elements and error messages within a function tool.
```python
from pathlib import Path
import gettext
from agents import RunContextWrapper, function_tool
from chatkit.agents import AgentContext
LOCALE_DIR = Path(__file__).with_suffix("").parent / "locales"
_translations: dict[str, gettext.NullTranslations] = {}
def get_translations(locale: str) -> gettext.NullTranslations:
"""Return a gettext translation object for the given locale."""
if locale not in _translations:
_translations[locale] = gettext.translation(
"messages", # your .po/.mo domain
localedir=LOCALE_DIR,
languages=[locale],
fallback=True,
)
return _translations[locale]
@function_tool()
async def load_document(
ctx: RunContextWrapper[AgentContext],
document_id: str,
):
locale = ctx.context.request_context.locale
_ = get_translations(locale).gettext
await ctx.context.stream_progress(
icon="document",
text=_("Loading document…"),
)
doc = await get_document_by_id(document_id)
if not doc:
raise ValueError(_("We couldn’t find that document."))
return doc
```
--------------------------------
### Send Messages with sendUserMessage
Source: https://github.com/openai/chatkit-python/blob/main/docs/guides/let-your-app-draft-and-send-messages.md
Initiate a chat turn directly using `sendUserMessage`. This example shows sending a canned support message and starting a new thread.
```tsx
export function Inbox() {
const {
control,
sendUserMessage,
setThreadId,
} = useChatKit({
// ...
});
const handleHelpClick = () => {
// Send a canned message from a fresh thread
sendUserMessage({text: "I need help with my billing.", newThread: true});
};
return (
<>
>
);
}
```
--------------------------------
### ChatKitServer and FastAPI Endpoint
Source: https://context7.com/openai/chatkit-python/llms.txt
This example demonstrates how to implement a minimal ChatKit server using an in-memory store and expose it via a FastAPI endpoint. The `MyChatKitServer` subclass overrides the `respond` method to provide a simple greeting. The FastAPI app handles incoming requests, processes them using the server, and returns either a streaming response for SSE or a JSON response.
```APIDOC
## POST /chatkit
### Description
Handles incoming chat requests, processes them using the ChatKit server, and returns either a streaming response (Server-Sent Events) for ongoing conversations or a JSON response for completed ones.
### Method
POST
### Endpoint
/chatkit
### Request Body
- **(bytes)** - Required - Raw JSON payload representing the chat request.
### Response
#### Success Response (200 OK)
- **(StreamingResponse)** - For streaming responses (SSE).
- **(Response)** - For non-streaming JSON responses.
### Request Example
```python
# Example of sending a request to the /chatkit endpoint
# This would typically be handled by the ChatKit.js frontend
```
### Response Example
#### Streaming Response (SSE)
```
event: thread_item_created
data: {"type": "assistant_message", "thread_id": "...", "id": "...", "created_at": "...", "content": [{"type": "text", "text": "Hello from ChatKit!"}]}
event: thread_item_done
data: {"type": "thread_item_done", "item_id": "..."}
```
#### JSON Response
```json
{
"response": "..."
}
```
```
--------------------------------
### Implement ChatKitServer Respond Method
Source: https://github.com/openai/chatkit-python/blob/main/docs/guides/respond-to-user-message.md
Subclass ChatKitServer and implement the respond method to yield streaming events for a user's turn. This example provides a basic text response.
```python
from collections.abc import AsyncIterator
from datetime import datetime
from chatkit.server import ChatKitServer
from chatkit.types import (
AssistantMessageContent,
AssistantMessageItem,
ThreadItemDoneEvent,
ThreadMetadata,
ThreadStreamEvent,
UserMessageItem,
)
class MyChatKitServer(ChatKitServer[MyRequestContext]):
async def respond(
self,
thread: ThreadMetadata,
input: UserMessageItem | None,
context: MyRequestContext,
) -> AsyncIterator[ThreadStreamEvent]:
# Replace this with your inference pipeline.
yield ThreadItemDoneEvent(
item=AssistantMessageItem(
thread_id=thread.id,
id=self.store.generate_item_id("message", thread, context),
created_at=datetime.now(),
content=[AssistantMessageContent(text="Hi there!")],
)
)
```
--------------------------------
### Handle Actions on the Server
Source: https://github.com/openai/chatkit-python/blob/main/docs/guides/build-interactive-responses-with-widgets.md
Implement the 'ChatKitServer.action' method to process incoming actions. This example handles a 'send_message' action, stores it as a hidden context item, and streams an updated widget.
```python
from datetime import datetime
from chatkit.server import ChatKitServer, stream_widget
from chatkit.types import HiddenContextItem, WidgetItem
class MyChatKitServer(ChatKitServer[RequestContext]):
async def action(self, thread, action, sender, context):
if action.type == "send_message":
await send_to_chat(action.payload["text"])
# Record the user action so the model can see it on the next turn.
hidden = HiddenContextItem(
id="generated-item-id",
thread_id=thread.id,
created_at=datetime.now(),
content=f"User sent message: {action.payload['text']}",
)
# HiddenContextItems need to be manually saved because ChatKitServer
# only auto-saves streamed items, and HiddenContextItem should never be streamed to the client.
await self.store.add_thread_item(thread.id, hidden, context)
# Stream an updated widget back to the client.
updated_widget = build_message_widget(text=action.payload["text"])
async for event in stream_widget(
thread,
updated_widget,
generate_id=lambda item_type: self.store.generate_item_id(
item_type, thread, context
),
):
yield event
```
--------------------------------
### Define a Button with an onClickAction
Source: https://github.com/openai/chatkit-python/blob/main/docs/guides/build-interactive-responses-with-widgets.md
Configure actions directly in the widget definition using components like Button.onClickAction. This example shows a button that triggers a 'send_message' action.
```jsx
```
--------------------------------
### Streaming Agent Responses and Handling Guardrails
Source: https://github.com/openai/chatkit-python/blob/main/docs/guides/respond-to-user-message.md
Stream agent responses to the client using `stream_agent_response`. This example shows how to catch `InputGuardrailTripwireTriggered` and `OutputGuardrailTripwireTriggered` exceptions and yield appropriate `ErrorEvent`s.
```python
from agents import (
InputGuardrailTripwireTriggered,
OutputGuardrailTripwireTriggered,
Runner,
)
from chatkit.agents import stream_agent_response
from chatkit.types import ErrorEvent
result = Runner.run_streamed(
assistant_agent,
input_items,
context=agent_context,
)
try:
async for event in stream_agent_response(agent_context, result):
yield event
except InputGuardrailTripwireTriggered:
yield ErrorEvent(message="We blocked that message for safety.")
except OutputGuardrailTripwireTriggered:
yield ErrorEvent(
message="The assistant response was blocked.",
allow_retry=False,
)
```
--------------------------------
### Implement ChatKitServer Transcription
Source: https://github.com/openai/chatkit-python/blob/main/docs/guides/accept-rich-user-input.md
Implement the transcribe method in ChatKitServer to handle audio input from the client and return a TranscriptionResult. This example uses the OpenAI Audio API for transcription.
```python
async def transcribe(self, audio_input: AudioInput, context: RequestContext) -> TranscriptionResult:
ext = {
"audio/webm": "webm",
"audio/mp4": "m4a",
"audio/ogg": "ogg",
}.get(audio_input.media_type)
if not ext:
raise HTTPException(status_code=400, detail="Unexpected audio format")
audio_file = io.BytesIO(audio_input.data)
audio_file.name = f"audio.{ext}"
transcription = client.audio.transcriptions.create(
model="gpt-4o-transcribe",
file=audio_file
)
return TranscriptionResult(text=transcription.text)
```
--------------------------------
### Build and Render a Widget
Source: https://context7.com/openai/chatkit-python/llms.txt
Load a widget template and build a widget with dynamic data. This is typically done once at startup.
```python
order_template = WidgetTemplate.from_file("widgets/order_status.widget")
class MyChatKitServer(ChatKitServer[dict]):
async def respond(self, thread, input_user_message, context):
order = await fetch_order(context["order_id"])
widget = order_template.build(OrderData(
order_id=order.id,
status=order.status,
items=order.line_items,
))
yield ThreadItemDoneEvent(
item=WidgetItem(
id=self.store.generate_item_id("message", thread, context),
thread_id=thread.id,
created_at=datetime.now(),
widget=widget,
copy_text=f"Order {order.id}: {order.status}",
)
)
```
--------------------------------
### Load and build widget templates
Source: https://github.com/openai/chatkit-python/blob/main/docs/guides/build-interactive-responses-with-widgets.md
Load a `.widget` file using `WidgetTemplate.from_file` and hydrate it with runtime data using `.build()`. Placeholders in the template are rendered before streaming.
```python
from chatkit.widgets import WidgetTemplate
message_template = WidgetTemplate.from_file("widgets/channel_message.widget")
def build_message_widget(user_name: str, message: str):
# Replace this helper with whatever your integration uses to build widgets.
return message_template.build(
{
"user_name": user_name,
"message": message,
}
)
```
--------------------------------
### Get ChatKit Commands with useChatKit
Source: https://github.com/openai/chatkit-python/blob/main/docs/guides/let-your-app-draft-and-send-messages.md
Destructure commands like `setComposerValue` and `sendUserMessage` from `useChatKit` alongside the `control` object. These commands are safe to call when ChatKit is not loading or streaming.
```tsx
import {ChatKit, useChatKit} from "@openai/chatkit-react";
export function Inbox() {
const {
control,
setComposerValue,
sendUserMessage,
setThreadId,
} = useChatKit({
// ... your normal options (api, history, composer, etc.)
});
return ;
}
```
--------------------------------
### Passing Request Context to Server Process
Source: https://github.com/openai/chatkit-python/blob/main/docs/guides/respond-to-user-message.md
Demonstrates how to construct a request context and pass it to the server.process method, ensuring it flows into subsequent operations.
```python
context = MyRequestContext(user_id="abc123")
result = await server.process(await request.body(), context)
```
--------------------------------
### Enable Partial Image Streaming in Tool Config
Source: https://github.com/openai/chatkit-python/blob/main/docs/guides/stream-generated-images.md
Configure the `ImageGenerationTool` to enable streaming of partial images by setting the `partial_images` parameter in its `tool_config`. This allows users to see progressive previews.
```python
from agents.tool import ImageGenerationTool
image_tool = ImageGenerationTool(
tool_config={"type": "image_generation", "partial_images": 3},
)
```
--------------------------------
### Two-Phase Upload Client Configuration
Source: https://github.com/openai/chatkit-python/blob/main/docs/guides/accept-rich-user-input.md
Configure the client for a two-phase upload strategy, where the client first creates attachment metadata and then uploads the file bytes.
```javascript
{
type: "two_phase",
}
```
--------------------------------
### Direct Upload Client Configuration
Source: https://github.com/openai/chatkit-python/blob/main/docs/guides/accept-rich-user-input.md
Configure the client for direct uploads by specifying the upload strategy and the URL for handling file uploads.
```javascript
{
type: "direct",
uploadUrl: "/files",
}
```
--------------------------------
### Mirror ChatKit State in React with useChatKit
Source: https://github.com/openai/chatkit-python/blob/main/docs/guides/keep-your-app-in-sync-with-chatkit.md
Use the `useChatKit` hook to subscribe to ChatKit's lifecycle events and update your React component's state accordingly. This example demonstrates how to manage thread changes and loading/responding states to control UI elements.
```tsx
import {ChatKit, useChatKit} from "@openai/chatkit-react";
export function Inbox({clientToken}: { clientToken: string }) {
const {
control,
sendUserMessage,
focusComposer,
setThreadId,
} = useChatKit({
// ... your normal options (api, history, composer, etc.)
onThreadChange: ({threadId}) => setActiveThread(threadId),
onThreadLoadStart: () => setIsLoading(true),
onThreadLoadEnd: () => setIsLoading(false),
onResponseStart: () => setIsResponding(true),
onResponseEnd: () => setIsResponding(false),
});
const isBusy = isLoading || isResponding;
return (
<>
!isBusy && setThreadId(undefined)}
onFocusComposer={() => !isBusy && focusComposer()}
onSendQuickMessage={(text) =>
!isBusy && sendUserMessage({text})
}
/>
>
);
}
```
--------------------------------
### Server Tool to Fetch App Context
Source: https://github.com/openai/chatkit-python/blob/main/docs/guides/pass-extra-app-context-to-your-model.md
Implement a server-side tool that fetches application-specific context, like workspace details, based on request context. This allows the model to access dynamic information.
```python
@function_tool(description_override="Fetch the user's workspace context.")
async def get_workspace_context(ctx: RunContextWrapper[AgentContext]):
workspace = await load_workspace(ctx.context.request_context.org_id)
return {
"workspace_id": workspace.id,
"features": workspace.feature_flags,
}
```
--------------------------------
### Route to different tools or agents based on tool choice
Source: https://github.com/openai/chatkit-python/blob/main/docs/guides/let-users-pick-tools-and-models.md
Dynamically select an agent or tool set on the server based on the `tool_choice` ID received from the client's inference options. This allows for specialized handling of different user intents.
```python
if tool_choice == "summarize":
agent = summarization_agent
elif tool_choice == "search_tickets":
agent = ticket_search_agent
else:
agent = default_agent
result = Runner.run_streamed(agent, input_items, context=agent_context)
```
--------------------------------
### Configure tools in the composer UI
Source: https://github.com/openai/chatkit-python/blob/main/docs/guides/let-users-pick-tools-and-models.md
Define the tools available in the composer's tool menu by providing an array of tool configurations when initializing ChatKit on the client. Each tool needs a unique `id`, an `icon`, and a `label`.
```typescript
const chatkit = useChatKit({
// ...
composer: {
tools: [
{
id: "summarize",
icon: "book-open",
label: "Summarize",
placeholderOverride: "Summarize the current page or document.",
},
{
id: "search_tickets",
icon: "search",
label: "Search tickets",
shortLabel: "Search",
placeholderOverride: "Search support tickets for similar issues.",
},
],
},
});
```
--------------------------------
### WidgetTemplate for File-Based Widget Authoring
Source: https://context7.com/openai/chatkit-python/llms.txt
Loads `.widget` files (JSON + Jinja2 template) authored at widgets.chatkit.studio and renders them with data. The preferred way to produce `WidgetRoot` objects.
```python
from chatkit.widgets import WidgetTemplate
from chatkit.types import ThreadItemDoneEvent, WidgetItem
from pydantic import BaseModel
from datetime import datetime
class OrderData(BaseModel):
order_id: str
status: str
items: list[str]
```
--------------------------------
### Minimal ChatKit Python Server
Source: https://github.com/openai/chatkit-python/blob/main/docs/quickstart.md
Create a minimal Python server using `FastAPI` and `ChatKitServer` that responds with a fixed message. This serves as a basic endpoint for the React UI.
```python
# Other imports omitted for brevity; see the starter repo for a runnable file with all imports.
from chatkit.server import ChatKitServer
app = FastAPI()
class MyChatKitServer(ChatKitServer[dict]):
async def respond(
self,
thread: ThreadMetadata,
input_user_message: UserMessageItem | None,
context: dict,
) -> AsyncIterator[ThreadStreamEvent]:
# Streams a fixed "Hello, world!" assistant message
yield ThreadItemDoneEvent(
item=AssistantMessageItem(
thread_id=thread.id,
id=self.store.generate_item_id("message", thread, context),
created_at=datetime.now(),
content=[AssistantMessageContent(text="Hello, world!")],
),
)
# Create your server by passing a store implementation.
# MyChatKitStore is defined in the next section.
server = MyChatKitServer(store=MyChatKitStore())
@app.post("/chatkit")
async def chatkit(request: Request):
result = await server.process(await request.body(), context={})
if isinstance(result, StreamingResult):
return StreamingResponse(result, media_type="text/event-stream")
return Response(content=result.json, media_type="application/json")
```
--------------------------------
### Pair Mentions with Retrieval Tool Calls
Source: https://github.com/openai/chatkit-python/blob/main/docs/guides/accept-rich-user-input.md
Configure your system prompt to instruct the assistant to call a retrieval tool when it encounters an `ARTICLE_TAG`. Ensure that the `tag_to_message_content` method includes the necessary ID for the tool to fetch details.
```python
from agents import Agent, StopAtTools, RunContextWrapper, function_tool
from chatkit.agents import AgentContext
@function_tool(description_override="Fetch full article content by id.")
async def fetch_article(ctx: RunContextWrapper[AgentContext], article_id: str):
article = await load_article_content(article_id)
return {
"title": article.title,
"content": article.body,
"url": article.url,
}
assistant = Agent[AgentContext](
...,
tools=[fetch_article],
)
```
--------------------------------
### Configure ChatKit Client with Domain Key
Source: https://github.com/openai/chatkit-python/blob/main/docs/guides/prepare-your-app-for-production.md
Configure the ChatKit client options with your backend API URL and the domain key obtained from the OpenAI domain allowlist. This ensures that only authorized hostnames can embed your ChatKit experience.
```javascript
const options = {
api: {
url: "https://your-domain.com/api/chatkit",
// Copy this value from the domain allowlist entry.
domainKey: "your-domain-key",
},
};
```
--------------------------------
### Enable Attachments in Client
Source: https://github.com/openai/chatkit-python/blob/main/docs/guides/accept-rich-user-input.md
Turn on attachments in the composer and configure client-side limits for accepted MIME types, count, and size.
```typescript
const chatkit = useChatKit({
// ...
composer: {
attachments: {
enabled: true,
// configure accepted MIME types, count, and size limits here
},
},
});
```
--------------------------------
### Stream a widget from a tool
Source: https://github.com/openai/chatkit-python/blob/main/docs/guides/build-interactive-responses-with-widgets.md
Tools can enqueue widgets using `AgentContext.stream_widget`, which are then forwarded to the client by `stream_agent_response`.
```python
from agents import RunContextWrapper, function_tool
from chatkit.agents import AgentContext
@function_tool(description_override="Display a sample widget to the user.")
async def sample_widget(ctx: RunContextWrapper[AgentContext]):
message_widget = build_message_widget(...)
await ctx.context.stream_widget(message_widget)
```
--------------------------------
### Configure model picker in the composer UI
Source: https://github.com/openai/chatkit-python/blob/main/docs/guides/let-users-pick-tools-and-models.md
Expose a selection of models to the user in the composer by configuring the `models` array during ChatKit initialization. Each model can have an `id`, `label`, `description`, and a `default` flag.
```typescript
const chatkit = useChatKit({
// ...
composer: {
models: [
{
id: "gpt-4.1-mini",
label: "Fast",
description: "Answers right away",
},
{
id: "gpt-4.1",
label: "Quality",
description: "All rounder"
default: true,
},
],
},
});
```
--------------------------------
### Configure ChatKit React App
Source: https://github.com/openai/chatkit-python/blob/main/docs/quickstart.md
Wire up a minimal React app using the `useChatKit` hook. Point `api.url` to your ChatKit server endpoint and provide the domain key.
```tsx
import {ChatKit, useChatKit} from "@openai/chatkit-react";
export function App() {
const chatkit = useChatKit({
api: {
url: "http://localhost:8000/chatkit",
domainKey: "local-dev", // domain keys are optional in dev
},
});
return ;
}
```
--------------------------------
### Two-Phase Upload Attachment Store Implementation
Source: https://github.com/openai/chatkit-python/blob/main/docs/guides/accept-rich-user-input.md
Implement the AttachmentStore for a two-phase upload strategy, generating attachment IDs, issuing multipart upload URLs, persisting metadata, and handling deletions.
```python
attachment_store = BlobAttachmentStore()
server = MyChatKitServer(store=data_store, attachment_store=attachment_store)
class BlobAttachmentStore(AttachmentStore[RequestContext]):
def generate_attachment_id(self, mime_type: str, context: RequestContext) -> str:
return f"att_{uuid4().hex}"
async def create_attachment(
self,
input: AttachmentCreateParams,
context: RequestContext
) -> Attachment:
att_id = self.generate_attachment_id(input.mime_type, context)
upload_url = issue_multipart_upload_url(att_id, input.mime_type) # your blob store
attachment = Attachment(
id=att_id,
mime_type=input.mime_type,
name=input.name,
upload_url=upload_url,
)
await data_store.save_attachment(attachment, context=context)
return attachment
async def delete_attachment(self, attachment_id: str, context: RequestContext) -> None:
await delete_blob(att_id=attachment_id) # your blob store
```
--------------------------------
### Inject Extra Context as Model Input
Source: https://github.com/openai/chatkit-python/blob/main/docs/guides/pass-extra-app-context-to-your-model.md
Prepend a structured context item to the input items before running the agent. This allows direct injection of app/user state into the model's input.
```python
from openai.types.responses import ResponseInputTextParam
extra_context = ResponseInputTextParam(
type="input_text",
text=(
"\n"
f"user_id: {context.user_id}\n"
f"org_id: {context.org_id}\n"
f"plan: {context.plan}\n"
""
),
)
input_items = [extra_context, *input_items]
```
--------------------------------
### Streaming Events from Server Tools
Source: https://github.com/openai/chatkit-python/blob/main/docs/guides/respond-to-user-message.md
Stream progress updates from a server tool using `ctx.context.stream` within the tool function. This allows the client to receive real-time updates during tool execution.
```python
from agents import RunContextWrapper, function_tool
from chatkit.agents import AgentContext
from chatkit.types import ProgressUpdateEvent
@function_tool()
async def load_document(ctx: RunContextWrapper[AgentContext], document_id: str):
await ctx.context.stream(ProgressUpdateEvent(icon="document", text="Loading document..."))
return await get_document_by_id(document_id)
```
--------------------------------
### Implement `ChatKitServer.sync_action` for Instant Widget Updates
Source: https://context7.com/openai/chatkit-python/llms.txt
Override this method for instant, single-item updates from widget actions without a full stream. It returns a `SyncCustomActionResponse`.
```python
from chatkit.actions import Action
from chatkit.types import (
SyncCustomActionResponse, ThreadItemUpdatedEvent, WidgetItem, WidgetRootUpdated,
)
from chatkit.widgets import Card, Text
class MyChatKitServer(ChatKitServer[dict]):
async def sync_action(
self,
thread: ThreadMetadata,
action: Action[str, Any],
sender: WidgetItem | None,
context: dict,
) -> SyncCustomActionResponse:
# action.type identifies which button was clicked
# action.payload carries form data
if action.type == "confirm_booking" and sender:
updated_widget = Card(children=[Text(value=f"Booked! ref={action.payload['ref']}")])
return SyncCustomActionResponse(
updated_item=sender.model_copy(update={"widget": updated_widget})
)
return SyncCustomActionResponse()
```
--------------------------------
### Read tool and model choices on the server
Source: https://github.com/openai/chatkit-python/blob/main/docs/guides/let-users-pick-tools-and-models.md
Access the user's selected tool and model from `UserMessageItem.inference_options` within the `respond` method of your `ChatKitServer`. Fallback to default values if options are not provided.
```python
from chatkit.types import InferenceOptions
class MyChatKitServer(ChatKitServer[RequestContext]):
async def respond(
self,
thread: ThreadMetadata,
input_user_message: UserMessageItem | None,
context: RequestContext,
) -> AsyncIterator[ThreadStreamEvent]:
options = input_user_message and input_user_message.inference_options
model = options.model if options and options.model else "gpt-4.1-mini"
tool_choice = options.tool_choice.id if options and options.tool_choice else None
# Use `model` and `tool_choice` when building your model request...
```
--------------------------------
### Prepare Model Input for Agent
Source: https://github.com/openai/chatkit-python/blob/main/docs/guides/respond-to-user-message.md
Converts loaded thread items into a format suitable for agent input. Use `simple_to_agent_input` for default conversion or subclass `ThreadItemConverter` for custom logic, especially for handling attachments or tags.
```python
from agents import Runner
from chatkit.agents import AgentContext, ThreadItemConverter, simple_to_agent_input
# Option A (defaults):
input_items = await simple_to_agent_input(items)
# Option B (your integration-specific converter):
# input_items = await MyThreadItemConverter().to_agent_input(items)
agent_context = AgentContext(
thread=thread,
store=self.store,
request_context=context,
)
```
--------------------------------
### Generate Model Responses with ChatKit Agents SDK
Source: https://github.com/openai/chatkit-python/blob/main/docs/quickstart.md
Implement a ChatKit server to generate dynamic responses using an OpenAI agent. Ensure OPENAI_API_KEY is set in your environment. This code converts thread items to agent input and streams the agent's response back as ChatKit events.
```python
from agents import Agent, Runner
from chatkit.agents import AgentContext, simple_to_agent_input, stream_agent_response
assistant = Agent(
name="assistant",
instructions="You are a helpful assistant.",
model="gpt-4.1-mini",
)
class MyChatKitServer(ChatKitServer[dict]):
async def respond(
self,
thread: ThreadMetadata,
input_user_message: UserMessageItem | None,
context: dict,
) -> AsyncIterator[ThreadStreamEvent]:
# Convert recent thread items (which includes the user message) to model input
items_page = await self.store.load_thread_items(
thread.id,
after=None,
limit=20,
order="asc",
context=context,
)
input_items = await simple_to_agent_input(items_page.data)
# Stream the run through ChatKit events
agent_context = AgentContext(thread=thread, store=self.store, request_context=context)
result = Runner.run_streamed(assistant, input_items, context=agent_context)
async for event in stream_agent_response(agent_context, result):
yield event
```
--------------------------------
### Implement PostgresStore for ChatKit Persistence
Source: https://context7.com/openai/chatkit-python/llms.txt
This class implements the `Store` interface for persisting threads and items using PostgreSQL. It includes methods for generating thread IDs, loading and saving threads, and adding thread items. Ensure your database schema matches the expected table structures.
```python
import json, uuid
from chatkit.store import Store, NotFoundError
from chatkit.types import Attachment, Page, ThreadItem, ThreadMetadata
class PostgresStore(Store[dict]):
def __init__(self, pool):
self.pool = pool
def generate_thread_id(self, context) -> str:
return f"thr_{uuid.uuid4().hex[:12]}"
async def load_thread(self, thread_id: str, context: dict) -> ThreadMetadata:
async with self.pool.acquire() as conn:
row = await conn.fetchrow(
"SELECT data FROM threads WHERE id=$1 AND user_id=$2",
thread_id, context["user_id"]
)
if not row:
raise NotFoundError(f"Thread {thread_id} not found")
return ThreadMetadata.model_validate(json.loads(row["data"]))
async def save_thread(self, thread: ThreadMetadata, context: dict) -> None:
payload = thread.model_dump_json()
async with self.pool.acquire() as conn:
await conn.execute(
"""INSERT INTO threads (id, user_id, data) VALUES ($1,$2,$3)
ON CONFLICT (id) DO UPDATE SET data=excluded.data""",
thread.id, context["user_id"], payload
)
async def load_thread_items(self, thread_id, after, limit, order, context) -> Page[ThreadItem]:
# Implement with pagination using `after` as a cursor on item id
...
async def add_thread_item(self, thread_id, item: ThreadItem, context) -> None:
payload = item.model_dump_json()
async with self.pool.acquire() as conn:
await conn.execute(
"INSERT INTO items (id, thread_id, data) VALUES ($1,$2,$3)",
item.id, thread_id, payload
)
# Implement remaining abstract methods: save_item, load_item, delete_thread,
# delete_thread_item, load_threads, save_attachment, load_attachment, delete_attachment
```
--------------------------------
### Define Tool to Fetch Full Details
Source: https://github.com/openai/chatkit-python/blob/main/docs/guides/pass-extra-app-context-to-your-model.md
Implement a server or client tool that can retrieve complete details for a given ID. This function is designed to be called by the model when more information is required.
```python
@function_tool(description_override="Fetch full workspace details.")
async def fetch_workspace(ctx: RunContextWrapper[AgentContext], id: str):
workspace = await load_workspace(id)
return {
"id": workspace.id,
"features": workspace.feature_flags,
"limits": workspace.limits,
}
```
--------------------------------
### Control Widget Loading Behavior
Source: https://github.com/openai/chatkit-python/blob/main/docs/guides/build-interactive-responses-with-widgets.md
Use `loadingBehavior` to specify how actions trigger loading states within a widget. Recommended to use `auto` for adaptive behavior.
```jsx
```
--------------------------------
### Load ChatKit.js in HTML
Source: https://github.com/openai/chatkit-python/blob/main/docs/quickstart.md
Include the ChatKit.js script in your index.html file to enable ChatKit functionality in the browser.
```html
```
--------------------------------
### Manually Attach File and URL Annotations
Source: https://github.com/openai/chatkit-python/blob/main/docs/guides/add-annotations.md
Manually attach annotations to `AssistantMessageContent` by creating `Annotation` objects with `FileSource` or `URLSource` and specifying the character index for placement.
```python
from datetime import datetime
from chatkit.types import (
Annotation,
AssistantMessageContent,
AssistantMessageItem,
FileSource,
ThreadItemDoneEvent,
URLSource,
)
text = "Quarterly revenue grew 12% year over year."
antations = [
Annotation(
source=FileSource(filename="q1_report.pdf", title="Q1 Report"),
index=len(text) - 1, # attach near the end of the sentence
),
Annotation(
source=URLSource(
url="https://example.com/press-release",
title="Press release",
),
index=len(text) - 1,
),
]
yield ThreadItemDoneEvent(
item=AssistantMessageItem(
id=self.store.generate_item_id("message", thread, context),
thread_id=thread.id,
created_at=datetime.now(),
content=[AssistantMessageContent(text=text, annotations=annotations)],
)
)
```
--------------------------------
### Implement Client Tool Callback
Source: https://github.com/openai/chatkit-python/blob/main/docs/guides/pass-extra-app-context-to-your-model.md
Implement the `onClientTool` callback in `useChatKit` to handle client-side tool calls. This function retrieves specific client state, like canvas selection, and returns it.
```ts
const chatkit = useChatKit({
// ...
onClientTool: async ({name, params}) => {
if (name === "get_canvas_selection") {
const selection = myCanvas.getSelection();
return {
nodes: selection.map((node) => {
name: node.name,
description: node.description,
}),
};
}
},
});
```
--------------------------------
### Define and Use Typed Widget Actions
Source: https://context7.com/openai/chatkit-python/llms.txt
Define typed actions using `Action` and `ActionConfig` for widget buttons. Handle dispatched actions in the `action` handler by checking the `action.type`.
```python
from typing import Literal
from chatkit.actions import Action
from chatkit.widgets import Card, Button, Col, Text
class BookAppointment(Action[Literal["book_appointment"], dict]):
pass
class CancelAppointment(Action[Literal["cancel_appointment"], dict]):
pass
# In respond() — build a widget with action buttons:
widget = Card(
children=[
Col(children=[
Text(value="Book or cancel your appointment"),
Button(
label="Confirm",
style="primary",
onClickAction=BookAppointment.create({"slot_id": "2024-12-01T10:00"}),
),
Button(
label="Cancel",
style="secondary",
onClickAction=CancelAppointment.create({"reason": "user_request"}),
),
])
]
)
# In action() handler — dispatch on type:
class MyChatKitServer(ChatKitServer[dict]):
async def action(self, thread, action, sender, context):
if action.type == "book_appointment":
slot_id = action.payload["slot_id"]
await schedule_appointment(slot_id)
yield ThreadItemDoneEvent(item=AssistantMessageItem(
thread_id=thread.id,
id=self.store.generate_item_id("message", thread, context),
created_at=datetime.now(),
content=[AssistantMessageContent(text=f"Booked slot {slot_id}!")],
))
```
--------------------------------
### Streaming Text into a Widget with accumulate_text
Source: https://context7.com/openai/chatkit-python/llms.txt
Shows how to use `accumulate_text` to stream model output directly into a widget. This requires setting up an `Agent` and `Runner` to stream events, then passing the stream and a base `Markdown` widget to `accumulate_text`. The resulting generator is then streamed to the client using `stream_widget`.
```python
from agents import Agent, Runner
from chatkit.agents import AgentContext, accumulate_text
from chatkit.server import stream_widget
from chatkit.widgets import Markdown
summary_agent = Agent(name="summariser", instructions="Summarise the text.", model="gpt-4.1-mini")
class MyChatKitServer(ChatKitServer[dict]):
async def respond(self, thread, input_user_message, context):
agent_ctx = AgentContext(thread=thread, store=self.store, request_context=context)
result = Runner.run_streamed(summary_agent, [{"role": "user", "content": "Summarise X"}],
context=agent_ctx)
base = Markdown(id="summary", value="", streaming=True)
widget_gen = accumulate_text(result.stream_events(), base)
async for event in stream_widget(thread, widget_gen):
yield event
# Streams incremental markdown updates inside a widget item
# until the run completes, then marks streaming=False
```
--------------------------------
### Define a Client-Handled Action
Source: https://github.com/openai/chatkit-python/blob/main/docs/guides/build-interactive-responses-with-widgets.md
Specify 'handler: "client"' to route actions to the frontend's 'widgets.onAction' instead of the server. Use this for immediate UI updates or chaining client-side logic.
```jsx
```
--------------------------------
### Stream Client Effects from Tools
Source: https://github.com/openai/chatkit-python/blob/main/docs/guides/update-client-during-response.md
Use `AgentContext.stream` within a tool to send `ClientEffectEvent`. This allows triggering UI actions like highlighting text without creating thread items or pausing the model stream.
```python
from agents import RunContextWrapper, function_tool
from chatkit.agents import AgentContext
from chatkit.types import ClientEffectEvent
@function_tool()
async def highlight_text(ctx: RunContextWrapper[AgentContext], index: int, length: int):
await ctx.context.stream(
ClientEffectEvent(
name="highlight_text",
data={"index": index, "length": length},
)
)
```
--------------------------------
### Add Image Generation Tool to Agent
Source: https://github.com/openai/chatkit-python/blob/main/docs/guides/stream-generated-images.md
Integrate the ImageGenerationTool into your agent's tool list to enable image generation capabilities. This allows the model to respond to requests by creating images.
```python
from agents import Agent
from agents.tool import ImageGenerationTool
agent = Agent(
name="designer",
instructions="Generate images when asked.",
tools=[ImageGenerationTool(tool_config={"type": "image_generation"})],
)
```
--------------------------------
### ChatKitServer.process
Source: https://context7.com/openai/chatkit-python/llms.txt
This method acts as the request dispatcher. It parses raw JSON bytes into a typed `ChatKitReq`, routes the request to the appropriate handler, and returns either a `StreamingResult` (an async iterable of SSE byte chunks) or a `NonStreamingResult` (raw JSON bytes). This is the main method to be called from your HTTP handler.
```APIDOC
## `ChatKitServer.process` — request dispatcher
Parses raw JSON bytes into a typed `ChatKitReq`, routes to the correct handler, and returns either a `StreamingResult` (async iterable of SSE byte chunks) or a `NonStreamingResult` (raw JSON bytes). This is the only method you call from your HTTP handler.
```python
from chatkit.server import StreamingResult, NonStreamingResult
@app.post("/chatkit")
async def chatkit_endpoint(request: Request):
body = await request.body()
result = await server.process(body, context={"user_id": "u_123"})
if isinstance(result, StreamingResult):
# Each chunk is: b"data: \n\n"
return StreamingResponse(result, media_type="text/event-stream")
# Non-streaming: bytes of a JSON object
return Response(content=result.json, media_type="application/json")
```
```
--------------------------------
### Use `ChatKitServer.process` to Dispatch Requests
Source: https://context7.com/openai/chatkit-python/llms.txt
This method parses incoming JSON requests, routes them to the appropriate handler, and returns either a `StreamingResult` or `NonStreamingResult`. It's the main entry point for your HTTP handler.
```python
from chatkit.server import StreamingResult, NonStreamingResult
@app.post("/chatkit")
async def chatkit_endpoint(request: Request):
body = await request.body()
result = await server.process(body, context={"user_id": "u_123"})
if isinstance(result, StreamingResult):
# Each chunk is: b"data: \n\n"
return StreamingResponse(result, media_type="text/event-stream")
# Non-streaming: bytes of a JSON object
return Response(content=result.json, media_type="application/json")
```