### Clone and Run Starter App Source: https://github.com/openai/chatkit-python/blob/main/docs/quickstart.md Clone the starter app repository and install dependencies to get a basic ChatKit app running. ```sh git clone https://github.com/openai/openai-chatkit-starter-app.git cd openai-chatkit-starter-app/chatkit npm install npm run dev ``` -------------------------------- ### Install ChatKit Python Package Source: https://github.com/openai/chatkit-python/blob/main/docs/quickstart.md Install the Python package for ChatKit to set up the server-side component. ```sh pip install openai-chatkit ``` -------------------------------- ### Install ChatKit React Bindings Source: https://github.com/openai/chatkit-python/blob/main/docs/quickstart.md Install the necessary npm package for integrating ChatKit with a React application. ```sh npm install @openai/chatkit-react ``` -------------------------------- ### Postgres Store Implementation for ChatKit Source: https://github.com/openai/chatkit-python/blob/main/docs/guides/respond-to-user-message.md Example implementation of a `Store` interface using PostgreSQL to manage threads and items. It includes schema initialization and methods for loading and saving thread metadata. ```python class MyPostgresStore(Store[RequestContext]): def __init__(self, conninfo: str) -> None: self._conninfo = conninfo self._init_schema() def _init_schema(self) -> None: with self._connection() as conn, conn.cursor() as cur: cur.execute( """ CREATE TABLE IF NOT EXISTS threads ( id TEXT PRIMARY KEY, user_id TEXT NOT NULL, created_at TIMESTAMPTZ NOT NULL, data JSONB NOT NULL ); """ ) cur.execute( """ CREATE TABLE IF NOT EXISTS items ( id TEXT PRIMARY KEY, thread_id TEXT NOT NULL REFERENCES threads (id) ON DELETE CASCADE, user_id TEXT NOT NULL, created_at TIMESTAMPTZ NOT NULL, data JSONB NOT NULL ); """ ) conn.commit() async def load_thread( self, thread_id: str, context: RequestContext ) -> ThreadMetadata: with self._connection() as conn, conn.cursor(row_factory=tuple_row) as cur: cur.execute( "SELECT data FROM threads WHERE id = %s AND user_id = %s", (thread_id, context.user_id), ) row = cur.fetchone() if row is None: raise NotFoundError(f"Thread {thread_id} not found") return ThreadMetadata.model_validate(row[0]) async def save_thread( self, thread: ThreadMetadata, context: RequestContext ) -> None: payload = thread.model_dump(mode="json") with self._connection() as conn, conn.cursor() as cur: cur.execute( """ INSERT INTO threads (id, user_id, created_at, data) VALUES (%s, %s, %s, %s) """, (thread.id, context.user_id, thread.created_at, payload), ) conn.commit() # Implement the remaining Store methods following the same pattern. ``` -------------------------------- ### Implement In-Memory ChatKit Store Source: https://github.com/openai/chatkit-python/blob/main/docs/quickstart.md Provides an in-memory store implementation for ChatKit servers. This is useful for the quickstart to keep examples minimal and for testing. It omits persistence and attachment handling, which are required for production. ```python from collections import defaultdict from chatkit.store import NotFoundError, Store from chatkit.types import Attachment, Page, ThreadItem, ThreadMetadata class MyChatKitStore(Store[dict]): def __init__(self): self.threads: dict[str, ThreadMetadata] = {} self.items: dict[str, list[ThreadItem]] = defaultdict(list) async def load_thread(self, thread_id: str, context: dict) -> ThreadMetadata: if thread_id not in self.threads: raise NotFoundError(f"Thread {thread_id} not found") return self.threads[thread_id] async def save_thread(self, thread: ThreadMetadata, context: dict) -> None: self.threads[thread.id] = thread async def load_threads( self, limit: int, after: str | None, order: str, context: dict ) -> Page[ThreadMetadata]: threads = list(self.threads.values()) return self._paginate( threads, after, limit, order, sort_key=lambda t: t.created_at, cursor_key=lambda t: t.id ) async def load_thread_items( self, thread_id: str, after: str | None, limit: int, order: str, context: dict ) -> Page[ThreadItem]: items = self.items.get(thread_id, []) return self._paginate( items, after, limit, order, sort_key=lambda i: i.created_at, cursor_key=lambda i: i.id ) async def add_thread_item( self, thread_id: str, item: ThreadItem, context: dict ) -> None: self.items[thread_id].append(item) async def save_item( self, thread_id: str, item: ThreadItem, context: dict ) -> None: items = self.items[thread_id] for idx, existing in enumerate(items): if existing.id == item.id: items[idx] = item return items.append(item) async def load_item( self, thread_id: str, item_id: str, context: dict ) -> ThreadItem: for item in self.items.get(thread_id, []): if item.id == item_id: return item raise NotFoundError(f"Item {item_id} not found in thread {thread_id}") async def delete_thread(self, thread_id: str, context: dict) -> None: self.threads.pop(thread_id, None) self.items.pop(thread_id, None) async def delete_thread_item( self, thread_id: str, item_id: str, context: dict ) -> None: self.items[thread_id] = [ item for item in self.items.get(thread_id, []) if item.id != item_id ] def _paginate(self, rows: list, after: str | None, limit: int, order: str, sort_key, cursor_key): sorted_rows = sorted(rows, key=sort_key, reverse=order == "desc") start = 0 if after: for idx, row in enumerate(sorted_rows): if cursor_key(row) == after: start = idx + 1 break data = sorted_rows[start : start + limit] has_more = start + limit < len(sorted_rows) next_after = cursor_key(data[-1]) if has_more and data else None return Page(data=data, has_more=has_more, after=next_after) # Attachments are intentionally not implemented for the quickstart async def save_attachment( self, attachment: Attachment, context: dict ) -> None: raise NotImplementedError() async def load_attachment( self, attachment_id: str, context: dict ) -> Attachment: raise NotImplementedError() async def delete_attachment(self, attachment_id: str, context: dict) -> None: raise NotImplementedError() ``` -------------------------------- ### Handle Actions on the Client Source: https://github.com/openai/chatkit-python/blob/main/docs/guides/build-interactive-responses-with-widgets.md Provide the 'widgets.onAction' callback when initializing ChatKit on the client. This example shows handling a 'save_profile' action and optionally forwarding a follow-up action to the server. ```ts const chatkit = useChatKit({ // ... widgets: { onAction: async (action, widgetItem) => { if (action.type === "save_profile") { const result = await saveProfile(action.payload); // Optionally invoke a server action after client-side work completes. await chatkit.sendCustomAction( { type: "save_profile_complete", payload: {...result, user_id: action.payload.user_id}, }, widgetItem.id, ); } }, }, }); ``` -------------------------------- ### Implement Localization with gettext Source: https://github.com/openai/chatkit-python/blob/main/docs/guides/prepare-your-app-for-production.md This example demonstrates how to use the `gettext` library to load translations based on the provided locale and translate strings for UI elements and error messages within a function tool. ```python from pathlib import Path import gettext from agents import RunContextWrapper, function_tool from chatkit.agents import AgentContext LOCALE_DIR = Path(__file__).with_suffix("").parent / "locales" _translations: dict[str, gettext.NullTranslations] = {} def get_translations(locale: str) -> gettext.NullTranslations: """Return a gettext translation object for the given locale.""" if locale not in _translations: _translations[locale] = gettext.translation( "messages", # your .po/.mo domain localedir=LOCALE_DIR, languages=[locale], fallback=True, ) return _translations[locale] @function_tool() async def load_document( ctx: RunContextWrapper[AgentContext], document_id: str, ): locale = ctx.context.request_context.locale _ = get_translations(locale).gettext await ctx.context.stream_progress( icon="document", text=_("Loading document…"), ) doc = await get_document_by_id(document_id) if not doc: raise ValueError(_("We couldn’t find that document.")) return doc ``` -------------------------------- ### Send Messages with sendUserMessage Source: https://github.com/openai/chatkit-python/blob/main/docs/guides/let-your-app-draft-and-send-messages.md Initiate a chat turn directly using `sendUserMessage`. This example shows sending a canned support message and starting a new thread. ```tsx export function Inbox() { const { control, sendUserMessage, setThreadId, } = useChatKit({ // ... }); const handleHelpClick = () => { // Send a canned message from a fresh thread sendUserMessage({text: "I need help with my billing.", newThread: true}); }; return ( <> ); } ``` -------------------------------- ### ChatKitServer and FastAPI Endpoint Source: https://context7.com/openai/chatkit-python/llms.txt This example demonstrates how to implement a minimal ChatKit server using an in-memory store and expose it via a FastAPI endpoint. The `MyChatKitServer` subclass overrides the `respond` method to provide a simple greeting. The FastAPI app handles incoming requests, processes them using the server, and returns either a streaming response for SSE or a JSON response. ```APIDOC ## POST /chatkit ### Description Handles incoming chat requests, processes them using the ChatKit server, and returns either a streaming response (Server-Sent Events) for ongoing conversations or a JSON response for completed ones. ### Method POST ### Endpoint /chatkit ### Request Body - **(bytes)** - Required - Raw JSON payload representing the chat request. ### Response #### Success Response (200 OK) - **(StreamingResponse)** - For streaming responses (SSE). - **(Response)** - For non-streaming JSON responses. ### Request Example ```python # Example of sending a request to the /chatkit endpoint # This would typically be handled by the ChatKit.js frontend ``` ### Response Example #### Streaming Response (SSE) ``` event: thread_item_created data: {"type": "assistant_message", "thread_id": "...", "id": "...", "created_at": "...", "content": [{"type": "text", "text": "Hello from ChatKit!"}]} event: thread_item_done data: {"type": "thread_item_done", "item_id": "..."} ``` #### JSON Response ```json { "response": "..." } ``` ``` -------------------------------- ### Implement ChatKitServer Respond Method Source: https://github.com/openai/chatkit-python/blob/main/docs/guides/respond-to-user-message.md Subclass ChatKitServer and implement the respond method to yield streaming events for a user's turn. This example provides a basic text response. ```python from collections.abc import AsyncIterator from datetime import datetime from chatkit.server import ChatKitServer from chatkit.types import ( AssistantMessageContent, AssistantMessageItem, ThreadItemDoneEvent, ThreadMetadata, ThreadStreamEvent, UserMessageItem, ) class MyChatKitServer(ChatKitServer[MyRequestContext]): async def respond( self, thread: ThreadMetadata, input: UserMessageItem | None, context: MyRequestContext, ) -> AsyncIterator[ThreadStreamEvent]: # Replace this with your inference pipeline. yield ThreadItemDoneEvent( item=AssistantMessageItem( thread_id=thread.id, id=self.store.generate_item_id("message", thread, context), created_at=datetime.now(), content=[AssistantMessageContent(text="Hi there!")], ) ) ``` -------------------------------- ### Handle Actions on the Server Source: https://github.com/openai/chatkit-python/blob/main/docs/guides/build-interactive-responses-with-widgets.md Implement the 'ChatKitServer.action' method to process incoming actions. This example handles a 'send_message' action, stores it as a hidden context item, and streams an updated widget. ```python from datetime import datetime from chatkit.server import ChatKitServer, stream_widget from chatkit.types import HiddenContextItem, WidgetItem class MyChatKitServer(ChatKitServer[RequestContext]): async def action(self, thread, action, sender, context): if action.type == "send_message": await send_to_chat(action.payload["text"]) # Record the user action so the model can see it on the next turn. hidden = HiddenContextItem( id="generated-item-id", thread_id=thread.id, created_at=datetime.now(), content=f"User sent message: {action.payload['text']}", ) # HiddenContextItems need to be manually saved because ChatKitServer # only auto-saves streamed items, and HiddenContextItem should never be streamed to the client. await self.store.add_thread_item(thread.id, hidden, context) # Stream an updated widget back to the client. updated_widget = build_message_widget(text=action.payload["text"]) async for event in stream_widget( thread, updated_widget, generate_id=lambda item_type: self.store.generate_item_id( item_type, thread, context ), ): yield event ``` -------------------------------- ### Define a Button with an onClickAction Source: https://github.com/openai/chatkit-python/blob/main/docs/guides/build-interactive-responses-with-widgets.md Configure actions directly in the widget definition using components like Button.onClickAction. This example shows a button that triggers a 'send_message' action. ```jsx