# vk-teams-async-bot
vk-teams-async-bot is an async Python framework for building bots on the VK Teams platform. It provides a comprehensive wrapper around the VK Teams Bot API with 27 API methods, an event-driven architecture using long polling, a dispatcher with decorator-based handler registration, composable filters, finite state machine (FSM) support for multi-step dialogs, middleware hooks, and dependency injection. The framework requires Python 3.11+ and is built on aiohttp and Pydantic.
The framework follows a clean architecture with typed event models, automatic retry with exponential backoff, and graceful shutdown handling. It supports both in-memory storage for simple use cases and Redis storage for production deployments with session persistence across restarts. Events are processed in parallel with automatic serialization for the same user/chat combination when FSM storage is configured.
## Bot Initialization and Lifecycle
The `Bot` class is the main entry point that manages the HTTP session, polling loop, and lifecycle hooks. It must be used as an async context manager to properly initialize and clean up resources.
```python
import asyncio
import os
from vk_teams_async_bot import Bot, Dispatcher, NewMessageEvent
bot = Bot(
bot_token=os.environ["BOT_TOKEN"],
url="https://myteam.mail.ru", # VK Teams server URL
timeout=30, # HTTP request timeout (seconds)
poll_time=15, # Long polling timeout (seconds)
max_concurrent_handlers=100, # Max parallel handler executions
shutdown_timeout=30.0, # Graceful shutdown timeout (seconds)
)
dp = Dispatcher()
@bot.on_startup
async def on_start(bot: Bot):
info = await bot.get_self()
print(f"Bot started: {info.first_name} (@{info.nick})")
@bot.on_shutdown
async def on_stop(bot: Bot):
print("Bot stopped")
@dp.message()
async def echo(event: NewMessageEvent, bot: Bot):
await bot.send_text(chat_id=event.chat.chat_id, text=event.text or "")
async def main():
async with bot:
await bot.start_polling(dp)
if __name__ == "__main__":
asyncio.run(main())
```
## Sending Text Messages
The `send_text` method sends a text message to a chat with optional formatting, keyboard, reply, or forward parameters.
```python
from vk_teams_async_bot import (
Bot, ParseMode, InlineKeyboardMarkup, KeyboardButton, StyleKeyboard
)
# Simple text message
result = await bot.send_text(
chat_id="user@example.com",
text="Hello, World!"
)
print(f"Message ID: {result.msg_id}")
# Markdown formatted message
await bot.send_text(
chat_id="user@example.com",
text="*bold* _italic_ `code`",
parse_mode=ParseMode.MARKDOWNV2
)
# HTML formatted message
await bot.send_text(
chat_id="user@example.com",
text="bold italic code",
parse_mode=ParseMode.HTML
)
# Message with inline keyboard
kb = InlineKeyboardMarkup(buttons_in_row=2)
kb.add(
KeyboardButton(text="Yes", callback_data="confirm_yes", style=StyleKeyboard.PRIMARY),
KeyboardButton(text="No", callback_data="confirm_no", style=StyleKeyboard.ATTENTION),
KeyboardButton(text="Documentation", url="https://teams.vk.com/botapi/"),
)
await bot.send_text(
chat_id="user@example.com",
text="Do you confirm?",
inline_keyboard_markup=kb
)
# Reply to a message
await bot.send_text(
chat_id="user@example.com",
text="This is a reply",
reply_msg_id="6752793278973034123"
)
# Forward a message
await bot.send_text(
chat_id="user@example.com",
text="Forwarded message",
forward_chat_id="other@example.com",
forward_msg_id="6752793278973034456"
)
```
## Sending and Downloading Files
The `send_file`, `send_voice`, `get_file_info`, and `download_file` methods handle file uploads and downloads.
```python
from pathlib import Path
# Send file from disk
result = await bot.send_file(
chat_id="user@example.com",
file="documents/report.pdf",
caption="Monthly report"
)
print(f"File ID: {result.file_id}")
# Send file from memory (tuple format)
file_content = b"Hello, this is file content"
result = await bot.send_file(
chat_id="user@example.com",
file=("hello.txt", file_content, "text/plain"),
caption="Dynamic file"
)
# Re-send previously uploaded file by ID
await bot.send_file(
chat_id="another_user@example.com",
file_id=result.file_id
)
# Send voice message
await bot.send_voice(
chat_id="user@example.com",
file="audio/message.ogg"
)
# Get file info and download
file_info = await bot.get_file_info(file_id="ABC123XYZ")
print(f"Filename: {file_info.filename}, Size: {file_info.size}")
file_data = await bot.download_file(file_info.url)
Path("downloaded_file.pdf").write_bytes(file_data)
```
## Editing and Deleting Messages
The `edit_text` and `delete_messages` methods modify or remove existing messages.
```python
# Edit message text
await bot.edit_text(
chat_id="user@example.com",
msg_id="6752793278973034123",
text="Updated message content"
)
# Edit message with new keyboard
kb = InlineKeyboardMarkup(buttons_in_row=1)
kb.add(KeyboardButton(text="Done", callback_data="done"))
await bot.edit_text(
chat_id="user@example.com",
msg_id="6752793278973034123",
text="Task completed!",
inline_keyboard_markup=kb
)
# Delete single message
await bot.delete_messages(
chat_id="user@example.com",
msg_id="6752793278973034123"
)
# Delete multiple messages
await bot.delete_messages(
chat_id="user@example.com",
msg_id=["6752793278973034123", "6752793278973034124", "6752793278973034125"]
)
```
## Handling Callback Queries
The `answer_callback_query` method responds to inline keyboard button presses, and the `@dp.callback_query()` decorator registers handlers for these events.
```python
from vk_teams_async_bot import (
Bot, Dispatcher, CallbackQueryEvent, CallbackDataFilter,
InlineKeyboardMarkup, KeyboardButton, StyleKeyboard
)
dp = Dispatcher()
@dp.callback_query(CallbackDataFilter("confirm_yes"))
async def on_confirm_yes(event: CallbackQueryEvent, bot: Bot):
# Show popup notification
await bot.answer_callback_query(
query_id=event.query_id,
text="Confirmed!",
show_alert=False
)
# Update the message
await bot.edit_text(
chat_id=event.chat.chat_id,
msg_id=event.message.msg_id,
text="You confirmed the action."
)
@dp.callback_query(CallbackDataFilter("confirm_no"))
async def on_confirm_no(event: CallbackQueryEvent, bot: Bot):
# Show alert dialog
await bot.answer_callback_query(
query_id=event.query_id,
text="Action cancelled",
show_alert=True
)
@dp.callback_query(CallbackDataFilter("open_url"))
async def on_open_url(event: CallbackQueryEvent, bot: Bot):
# Open URL in browser
await bot.answer_callback_query(
query_id=event.query_id,
url="https://example.com/details"
)
```
## Command Handlers
The `CommandFilter` and `@dp.command()` decorator provide convenient ways to handle bot commands.
```python
from vk_teams_async_bot import Bot, Dispatcher, NewMessageEvent, CommandFilter
dp = Dispatcher()
# Using @dp.command() shortcut
@dp.command("start")
async def cmd_start(event: NewMessageEvent, bot: Bot):
await bot.send_text(
chat_id=event.chat.chat_id,
text="Welcome! Use /help to see available commands."
)
@dp.command("help")
async def cmd_help(event: NewMessageEvent, bot: Bot):
help_text = """
Available commands:
/start - Start the bot
/help - Show this help
/settings - Bot settings
/status - Check status
"""
await bot.send_text(chat_id=event.chat.chat_id, text=help_text)
# Using CommandFilter explicitly with additional filters
from vk_teams_async_bot import ChatTypeFilter, ChatType
@dp.message(CommandFilter("admin"), ChatTypeFilter(ChatType.GROUP))
async def cmd_admin(event: NewMessageEvent, bot: Bot):
# This command only works in group chats
await bot.send_text(chat_id=event.chat.chat_id, text="Admin command executed")
```
## Filter Composition
Filters can be combined using `&` (AND), `|` (OR), and `~` (NOT) operators to create complex matching logic.
```python
from vk_teams_async_bot import (
Bot, Dispatcher, NewMessageEvent,
CommandFilter, RegexpFilter, TextFilter,
FileFilter, VoiceFilter, StickerFilter,
ChatTypeFilter, ChatType, FromUserFilter,
MentionFilter, ReplyFilter, ForwardFilter
)
dp = Dispatcher()
# Match files that are NOT voice messages
@dp.message(FileFilter() & ~VoiceFilter())
async def on_file_not_voice(event: NewMessageEvent, bot: Bot):
await bot.send_text(
chat_id=event.chat.chat_id,
text="Received a file (not voice)"
)
# Match regex pattern OR specific command
@dp.message(RegexpFilter(r"order\s+\d+") | CommandFilter("order"))
async def on_order(event: NewMessageEvent, bot: Bot):
await bot.send_text(
chat_id=event.chat.chat_id,
text=f"Processing order: {event.text}"
)
# Match text in private chats only
@dp.message(TextFilter() & ChatTypeFilter(ChatType.PRIVATE))
async def on_private_text(event: NewMessageEvent, bot: Bot):
await bot.send_text(chat_id=event.chat.chat_id, text="Private message received")
# Match replies from specific user
@dp.message(ReplyFilter() & FromUserFilter("admin@example.com"))
async def on_admin_reply(event: NewMessageEvent, bot: Bot):
await bot.send_text(chat_id=event.chat.chat_id, text="Admin replied")
# Match forwarded messages with mentions
@dp.message(ForwardFilter() & MentionFilter())
async def on_forwarded_mention(event: NewMessageEvent, bot: Bot):
await bot.send_text(chat_id=event.chat.chat_id, text="Forwarded message with mention")
# Match stickers in group chats
@dp.message(StickerFilter() & ChatTypeFilter(ChatType.GROUP))
async def on_group_sticker(event: NewMessageEvent, bot: Bot):
await bot.send_text(chat_id=event.chat.chat_id, text="Nice sticker!")
```
## Custom Filters
Create custom filters by inheriting from `FilterBase` and implementing the `__call__` method.
```python
from vk_teams_async_bot import FilterBase, NewMessageEvent, BaseEvent
class LongMessageFilter(FilterBase):
"""Match messages longer than a specified length."""
def __init__(self, min_length: int = 100):
self.min_length = min_length
def __call__(self, event: BaseEvent) -> bool:
if isinstance(event, NewMessageEvent) and event.text:
return len(event.text) >= self.min_length
return False
class KeywordFilter(FilterBase):
"""Match messages containing any of the specified keywords."""
def __init__(self, keywords: list[str], case_sensitive: bool = False):
self.keywords = keywords
self.case_sensitive = case_sensitive
def __call__(self, event: BaseEvent) -> bool:
if not isinstance(event, NewMessageEvent) or not event.text:
return False
text = event.text if self.case_sensitive else event.text.lower()
keywords = self.keywords if self.case_sensitive else [k.lower() for k in self.keywords]
return any(kw in text for kw in keywords)
# Usage
@dp.message(LongMessageFilter(200))
async def on_long_message(event: NewMessageEvent, bot: Bot):
await bot.send_text(chat_id=event.chat.chat_id, text="That's a long message!")
@dp.message(KeywordFilter(["help", "support", "issue"]))
async def on_support_request(event: NewMessageEvent, bot: Bot):
await bot.send_text(chat_id=event.chat.chat_id, text="How can we help you?")
```
## Finite State Machine (FSM) for Multi-Step Dialogs
The FSM system enables multi-step conversational flows with state and data persistence per user/chat combination.
```python
from vk_teams_async_bot import (
Bot, Dispatcher, NewMessageEvent, CommandFilter,
FSMContext, MemoryStorage, State, StatesGroup, StateFilter
)
class OrderForm(StatesGroup):
waiting_name = State()
waiting_phone = State()
waiting_address = State()
waiting_confirm = State()
storage = MemoryStorage()
dp = Dispatcher(storage=storage)
@dp.command("order")
async def start_order(event: NewMessageEvent, bot: Bot, fsm_context: FSMContext):
await fsm_context.set_state(OrderForm.waiting_name)
await bot.send_text(chat_id=event.chat.chat_id, text="Please enter your name:")
@dp.message(StateFilter(OrderForm.waiting_name, storage))
async def process_name(event: NewMessageEvent, bot: Bot, fsm_context: FSMContext):
await fsm_context.update_data(name=event.text)
await fsm_context.set_state(OrderForm.waiting_phone)
await bot.send_text(chat_id=event.chat.chat_id, text="Enter your phone number:")
@dp.message(StateFilter(OrderForm.waiting_phone, storage))
async def process_phone(event: NewMessageEvent, bot: Bot, fsm_context: FSMContext):
await fsm_context.update_data(phone=event.text)
await fsm_context.set_state(OrderForm.waiting_address)
await bot.send_text(chat_id=event.chat.chat_id, text="Enter delivery address:")
@dp.message(StateFilter(OrderForm.waiting_address, storage))
async def process_address(event: NewMessageEvent, bot: Bot, fsm_context: FSMContext):
await fsm_context.update_data(address=event.text)
data = await fsm_context.get_data()
summary = f"""
Order Summary:
Name: {data['name']}
Phone: {data['phone']}
Address: {data['address']}
Confirm order? (yes/no)
"""
await fsm_context.set_state(OrderForm.waiting_confirm)
await bot.send_text(chat_id=event.chat.chat_id, text=summary)
@dp.message(StateFilter(OrderForm.waiting_confirm, storage))
async def process_confirm(event: NewMessageEvent, bot: Bot, fsm_context: FSMContext):
if event.text and event.text.lower() == "yes":
data = await fsm_context.get_data()
await bot.send_text(
chat_id=event.chat.chat_id,
text=f"Order confirmed! We'll deliver to {data['address']}."
)
else:
await bot.send_text(chat_id=event.chat.chat_id, text="Order cancelled.")
await fsm_context.clear()
@dp.command("cancel")
async def cancel_order(event: NewMessageEvent, bot: Bot, fsm_context: FSMContext):
current_state = await fsm_context.get_state()
if current_state:
await fsm_context.clear()
await bot.send_text(chat_id=event.chat.chat_id, text="Order cancelled.")
else:
await bot.send_text(chat_id=event.chat.chat_id, text="Nothing to cancel.")
```
## Redis Storage for Production FSM
Use `RedisStorage` for production deployments with persistence across bot restarts and multi-process support.
```python
from vk_teams_async_bot import Bot, Dispatcher, RedisStorage
# Using Redis URL (storage manages connection)
storage = RedisStorage(
redis_url="redis://localhost:6379/0",
key_prefix="mybot",
state_ttl=3600 # Sessions expire after 1 hour of inactivity
)
dp = Dispatcher(storage=storage)
# Or with existing Redis connection
from redis.asyncio import Redis
redis_client = Redis.from_url("redis://localhost:6379/0")
storage = RedisStorage(
redis=redis_client,
key_prefix="mybot",
state_ttl=1800
)
dp = Dispatcher(storage=storage)
# Note: When using redis_url, storage closes connection automatically
# When using redis instance, you manage the connection lifecycle
```
## Middleware
Middleware intercepts events before and after handler execution, enabling cross-cutting concerns like logging, access control, and data injection.
```python
from typing import Any
from vk_teams_async_bot import (
BaseMiddleware, Bot, Dispatcher, NewMessageEvent, CommandFilter
)
from vk_teams_async_bot.types.event import BaseEvent
class LoggingMiddleware(BaseMiddleware):
"""Log all incoming events."""
async def __call__(self, handler, event, data):
print(f"-> Event received: {event.type}")
result = await handler(event, data)
print(f"<- Event processed: {event.type}")
return result
class AccessControlMiddleware(BaseMiddleware):
"""Restrict bot access to allowed chats."""
def __init__(self, allowed_chats: list[str]):
self.allowed_chats = allowed_chats
async def __call__(self, handler, event, data):
if isinstance(event, BaseEvent) and hasattr(event, "chat"):
if event.chat.chat_id not in self.allowed_chats:
bot = data["bot"]
await bot.send_text(
chat_id=event.chat.chat_id,
text="Access denied. Contact administrator."
)
return # Short-circuit, don't call handler
return await handler(event, data)
class UserDataMiddleware(BaseMiddleware):
"""Inject user role into handler data."""
def __init__(self, roles: dict[str, str]):
self.roles = roles
async def __call__(self, handler, event, data):
if isinstance(event, BaseEvent) and hasattr(event, "from_"):
user_id = event.from_.user_id
data["user_role"] = self.roles.get(user_id, "guest")
return await handler(event, data)
# Register middleware (order matters - first added runs first)
dp = Dispatcher()
dp.add_middleware(LoggingMiddleware())
dp.add_middleware(AccessControlMiddleware(["allowed@chat.agent", "admin@chat.agent"]))
dp.add_middleware(UserDataMiddleware({"admin@example.com": "admin"}))
@dp.command("admin")
async def admin_command(event: NewMessageEvent, bot: Bot, user_role: str):
if user_role == "admin":
await bot.send_text(chat_id=event.chat.chat_id, text="Admin command executed")
else:
await bot.send_text(chat_id=event.chat.chat_id, text="Permission denied")
```
## Session Timeout Middleware
The built-in `SessionTimeoutMiddleware` automatically clears stale FSM sessions for `MemoryStorage`.
```python
from vk_teams_async_bot import (
Bot, Dispatcher, MemoryStorage, SessionTimeoutMiddleware
)
storage = MemoryStorage()
timeout_mw = SessionTimeoutMiddleware(storage, timeout=300) # 5 minutes
dp = Dispatcher(storage=storage)
dp.add_middleware(timeout_mw)
bot = Bot(bot_token="TOKEN")
@bot.on_shutdown
async def cleanup(bot: Bot):
await timeout_mw.close()
# Note: For RedisStorage, use state_ttl parameter instead
# Redis handles expiration automatically
```
## Dependency Injection
Register factory functions in `bot.depends` to automatically inject dependencies into handlers based on type annotations.
```python
import asyncio
from typing import Annotated
import aiohttp
from vk_teams_async_bot import Bot, Dispatcher, NewMessageEvent
bot = Bot(bot_token="TOKEN")
dp = Dispatcher()
# Sync factory - returns value directly
def get_config():
return {"api_url": "https://api.example.com", "timeout": 30}
# Async factory - returns value directly
async def get_api_client():
return {"client_id": "abc123"}
# Async generator factory - with cleanup
async def get_http_session():
session = aiohttp.ClientSession()
try:
yield session
finally:
await session.close()
# Database connection with cleanup
async def get_database():
conn = await create_db_connection()
try:
yield conn
finally:
await conn.close()
# Register factories
bot.depends.extend([get_config, get_api_client, get_http_session, get_database])
@dp.message()
async def handler(
event: NewMessageEvent,
bot: Bot,
config: get_config, # Injected by return type
api: get_api_client, # Async factory
session: Annotated[aiohttp.ClientSession, get_http_session], # Generator with Annotated
db: Annotated[object, get_database], # Database connection
):
async with session.get(config["api_url"]) as resp:
data = await resp.json()
await bot.send_text(chat_id=event.chat.chat_id, text=f"API response: {data}")
async def main():
async with bot:
await bot.start_polling(dp)
```
## Chat Management
Methods for managing chat properties, members, and moderation.
```python
from vk_teams_async_bot import Bot, ChatAction
# Get chat information
chat_info = await bot.get_chat_info(chat_id="group@chat.agent")
print(f"Chat: {chat_info.title}, Members: {chat_info.members_count}")
# Get chat admins
admins = await bot.get_chat_admins(chat_id="group@chat.agent")
for admin in admins.admins:
print(f"Admin: {admin.user_id}")
# Get chat members with pagination
members = await bot.get_chat_members(chat_id="group@chat.agent")
for member in members.members:
print(f"Member: {member.user_id}")
if members.cursor:
# Fetch next page
more_members = await bot.get_chat_members(chat_id="group@chat.agent", cursor=members.cursor)
# Set chat properties
await bot.set_chat_title(chat_id="group@chat.agent", title="New Chat Title")
await bot.set_chat_about(chat_id="group@chat.agent", about="Chat description")
await bot.set_chat_rules(chat_id="group@chat.agent", rules="Be respectful")
# Set chat avatar
await bot.set_chat_avatar(chat_id="group@chat.agent", image="avatar.png")
# Send typing indicator
await bot.send_chat_actions(chat_id="user@example.com", actions=[ChatAction.TYPING])
# Pin/unpin messages
await bot.pin_message(chat_id="group@chat.agent", msg_id="6752793278973034123")
await bot.unpin_message(chat_id="group@chat.agent", msg_id="6752793278973034123")
# Block/unblock users
await bot.block_user(chat_id="group@chat.agent", user_id="user@example.com", del_last_messages=True)
await bot.unblock_user(chat_id="group@chat.agent", user_id="user@example.com")
# Get blocked/pending users
blocked = await bot.get_blocked_users(chat_id="group@chat.agent")
pending = await bot.get_pending_users(chat_id="group@chat.agent")
# Approve/reject pending users
await bot.resolve_pending(chat_id="group@chat.agent", approve=True, user_id="user@example.com")
await bot.resolve_pending(chat_id="group@chat.agent", approve=False, everyone=True)
# Remove members
await bot.delete_chat_members(chat_id="group@chat.agent", members=["user1@example.com", "user2@example.com"])
# On-premise only: Create chat and add members
chat = await bot.create_chat(
name="New Group",
about="Group description",
members=["user1@example.com", "user2@example.com"],
public=False
)
await bot.add_chat_members(chat_id=chat.chat_id, members=["user3@example.com"])
```
## Event Handlers
Register handlers for different event types using dispatcher decorators.
```python
from vk_teams_async_bot import (
Bot, Dispatcher,
NewMessageEvent, EditedMessageEvent, DeletedMessageEvent,
PinnedMessageEvent, UnpinnedMessageEvent,
NewChatMembersEvent, LeftChatMembersEvent, CallbackQueryEvent
)
dp = Dispatcher()
@dp.message()
async def on_new_message(event: NewMessageEvent, bot: Bot):
print(f"New message from {event.from_.user_id}: {event.text}")
@dp.edited_message()
async def on_edited(event: EditedMessageEvent, bot: Bot):
print(f"Message {event.msg_id} edited: {event.text}")
@dp.deleted_message()
async def on_deleted(event: DeletedMessageEvent, bot: Bot):
print(f"Message {event.msg_id} deleted")
@dp.pinned_message()
async def on_pinned(event: PinnedMessageEvent, bot: Bot):
print(f"Message pinned by {event.from_.user_id}")
@dp.unpinned_message()
async def on_unpinned(event: UnpinnedMessageEvent, bot: Bot):
print(f"Message {event.msg_id} unpinned")
@dp.new_chat_members()
async def on_new_members(event: NewChatMembersEvent, bot: Bot):
names = [m.first_name for m in event.new_members]
await bot.send_text(
chat_id=event.chat.chat_id,
text=f"Welcome {', '.join(names)}!"
)
@dp.left_chat_members()
async def on_left_members(event: LeftChatMembersEvent, bot: Bot):
names = [m.first_name for m in event.left_members]
print(f"Members left: {', '.join(names)}")
@dp.callback_query()
async def on_callback(event: CallbackQueryEvent, bot: Bot):
print(f"Button pressed: {event.callback_data}")
await bot.answer_callback_query(query_id=event.query_id)
```
## Error Handling and Retry Policy
Configure automatic retry with exponential backoff and handle API errors gracefully.
```python
from vk_teams_async_bot import (
Bot, VKTeamsError, APIError, RateLimitError,
ServerError, NetworkError, TimeoutError
)
from vk_teams_async_bot.client.retry import RetryPolicy
# Configure retry policy
bot = Bot(
bot_token="TOKEN",
retry_policy=RetryPolicy(
max_retries=3, # Maximum retry attempts
base_delay=1.0, # Initial delay in seconds
max_delay=30.0, # Maximum delay between retries
jitter=True # Add randomization to prevent thundering herd
)
)
# Handle errors in handlers
@dp.command("risky")
async def risky_operation(event: NewMessageEvent, bot: Bot):
try:
await bot.send_text(chat_id=event.chat.chat_id, text="Attempting...")
except RateLimitError as e:
print(f"Rate limited, retry after: {e.retry_after}s")
await bot.send_text(chat_id=event.chat.chat_id, text="Too many requests, try later")
except ServerError as e:
print(f"Server error {e.status_code}: {e.description}")
except NetworkError:
print("Network connection failed")
except TimeoutError:
print("Request timed out")
except APIError as e:
print(f"API error {e.status_code}: {e.description}")
except VKTeamsError as e:
print(f"VK Teams error: {e}")
```
## Inline Keyboard Building
Build complex keyboard layouts with rows and button styles.
```python
from vk_teams_async_bot import InlineKeyboardMarkup, KeyboardButton, StyleKeyboard
# Basic keyboard with automatic row wrapping
kb = InlineKeyboardMarkup(buttons_in_row=3)
kb.add(
KeyboardButton(text="1", callback_data="num_1"),
KeyboardButton(text="2", callback_data="num_2"),
KeyboardButton(text="3", callback_data="num_3"),
KeyboardButton(text="4", callback_data="num_4"),
KeyboardButton(text="5", callback_data="num_5"),
)
# Result: [[1, 2, 3], [4, 5]]
# Manual row control
kb = InlineKeyboardMarkup()
kb.row(
KeyboardButton(text="Option A", callback_data="a", style=StyleKeyboard.PRIMARY),
KeyboardButton(text="Option B", callback_data="b", style=StyleKeyboard.PRIMARY),
)
kb.row(
KeyboardButton(text="Cancel", callback_data="cancel", style=StyleKeyboard.ATTENTION),
)
# Result: [[Option A, Option B], [Cancel]]
# Mixed callback and URL buttons
kb = InlineKeyboardMarkup(buttons_in_row=2)
kb.add(
KeyboardButton(text="Confirm", callback_data="confirm", style=StyleKeyboard.PRIMARY),
KeyboardButton(text="Help", url="https://help.example.com"),
)
# Combine keyboards
header = InlineKeyboardMarkup()
header.row(KeyboardButton(text="Title", callback_data="title"))
footer = InlineKeyboardMarkup()
footer.row(KeyboardButton(text="Back", callback_data="back"))
combined = header + footer # Combines all rows
```
## Bot Information
Retrieve information about the bot itself.
```python
# Get bot info
info = await bot.get_self()
print(f"Bot ID: {info.user_id}")
print(f"Name: {info.first_name} {info.last_name or ''}")
print(f"Nick: @{info.nick}")
print(f"About: {info.about}")
```
The vk-teams-async-bot framework is ideal for building conversational bots, automation tools, and integrations with VK Teams. Its event-driven architecture with long polling makes it suitable for real-time messaging applications, while FSM support enables complex multi-step workflows like order forms, surveys, and customer support flows.
For production deployments, use RedisStorage to persist FSM state across restarts and enable horizontal scaling. The middleware system allows for clean separation of concerns like authentication, logging, and rate limiting. Dependency injection simplifies testing and resource management by allowing handlers to declare their dependencies through type annotations rather than managing connections directly.