# vk-teams-async-bot

vk-teams-async-bot is an async Python framework for building bots on the VK Teams platform. It wraps the VK Teams Bot API (27 API methods) and provides an event-driven architecture based on long polling, a dispatcher with decorator-based handler registration, composable filters, finite state machine (FSM) support for multi-step dialogs, middleware hooks, and dependency injection. The framework requires Python 3.11+ and is built on aiohttp and Pydantic.

The framework uses typed event models, automatic retry with exponential backoff, and graceful shutdown handling. It supports in-memory storage for simple use cases and Redis storage for production deployments that need session persistence across restarts. Events are processed in parallel; when FSM storage is configured, events for the same user/chat combination are serialized automatically.

## Bot Initialization and Lifecycle

The `Bot` class is the main entry point that manages the HTTP session, polling loop, and lifecycle hooks. It must be used as an async context manager to properly initialize and clean up resources.

```python
import asyncio
import os

from vk_teams_async_bot import Bot, Dispatcher, NewMessageEvent

bot = Bot(
    bot_token=os.environ["BOT_TOKEN"],
    url="https://myteam.mail.ru",  # VK Teams server URL
    timeout=30,  # HTTP request timeout (seconds)
    poll_time=15,  # Long polling timeout (seconds)
    max_concurrent_handlers=100,  # Max parallel handler executions
    shutdown_timeout=30.0,  # Graceful shutdown timeout (seconds)
)
dp = Dispatcher()


@bot.on_startup
async def on_start(bot: Bot):
    info = await bot.get_self()
    print(f"Bot started: {info.first_name} (@{info.nick})")


@bot.on_shutdown
async def on_stop(bot: Bot):
    print("Bot stopped")


@dp.message()
async def echo(event: NewMessageEvent, bot: Bot):
    await bot.send_text(chat_id=event.chat.chat_id, text=event.text or "")


async def main():
    async with bot:
        await bot.start_polling(dp)


if __name__ == "__main__":
    asyncio.run(main())
```

## Sending Text Messages

The `send_text` method sends a text message to a chat with optional formatting, keyboard, reply, or forward parameters.

```python
from vk_teams_async_bot import (
    Bot, ParseMode, InlineKeyboardMarkup, KeyboardButton, StyleKeyboard
)

# Simple text message
result = await bot.send_text(
    chat_id="user@example.com",
    text="Hello, World!"
)
print(f"Message ID: {result.msg_id}")

# Markdown formatted message
await bot.send_text(
    chat_id="user@example.com",
    text="*bold* _italic_ `code`",
    parse_mode=ParseMode.MARKDOWNV2
)

# HTML formatted message
await bot.send_text(
    chat_id="user@example.com",
    text="<b>bold</b> <i>italic</i> <code>code</code>",
    parse_mode=ParseMode.HTML
)

# Message with inline keyboard
kb = InlineKeyboardMarkup(buttons_in_row=2)
kb.add(
    KeyboardButton(text="Yes", callback_data="confirm_yes", style=StyleKeyboard.PRIMARY),
    KeyboardButton(text="No", callback_data="confirm_no", style=StyleKeyboard.ATTENTION),
    KeyboardButton(text="Documentation", url="https://teams.vk.com/botapi/"),
)
await bot.send_text(
    chat_id="user@example.com",
    text="Do you confirm?",
    inline_keyboard_markup=kb
)

# Reply to a message
await bot.send_text(
    chat_id="user@example.com",
    text="This is a reply",
    reply_msg_id="6752793278973034123"
)

# Forward a message
await bot.send_text(
    chat_id="user@example.com",
    text="Forwarded message",
    forward_chat_id="other@example.com",
    forward_msg_id="6752793278973034456"
)
```

## Sending and Downloading Files

The `send_file`, `send_voice`, `get_file_info`, and `download_file` methods handle file uploads and downloads.

```python
from pathlib import Path

# Send file from disk
result = await bot.send_file(
    chat_id="user@example.com",
    file="documents/report.pdf",
    caption="Monthly report"
)
print(f"File ID: {result.file_id}")

# Send file from memory (tuple format)
file_content = b"Hello, this is file content"
result = await bot.send_file(
    chat_id="user@example.com",
    file=("hello.txt", file_content, "text/plain"),
    caption="Dynamic file"
)

# Re-send previously uploaded file by ID
await bot.send_file(
    chat_id="another_user@example.com",
    file_id=result.file_id
)

# Send voice message
await bot.send_voice(
    chat_id="user@example.com",
    file="audio/message.ogg"
)

# Get file info and download
file_info = await bot.get_file_info(file_id="ABC123XYZ")
print(f"Filename: {file_info.filename}, Size: {file_info.size}")
file_data = await bot.download_file(file_info.url)
Path("downloaded_file.pdf").write_bytes(file_data)
```

## Editing and Deleting Messages

The `edit_text` and `delete_messages` methods modify or remove existing messages.

```python
# Edit message text
await bot.edit_text(
    chat_id="user@example.com",
    msg_id="6752793278973034123",
    text="Updated message content"
)

# Edit message with new keyboard
kb = InlineKeyboardMarkup(buttons_in_row=1)
kb.add(KeyboardButton(text="Done", callback_data="done"))
await bot.edit_text(
    chat_id="user@example.com",
    msg_id="6752793278973034123",
    text="Task completed!",
    inline_keyboard_markup=kb
)

# Delete single message
await bot.delete_messages(
    chat_id="user@example.com",
    msg_id="6752793278973034123"
)

# Delete multiple messages
await bot.delete_messages(
    chat_id="user@example.com",
    msg_id=["6752793278973034123", "6752793278973034124", "6752793278973034125"]
)
```

## Handling Callback Queries

The `answer_callback_query` method responds to inline keyboard button presses, and the `@dp.callback_query()` decorator registers handlers for these events.

```python
from vk_teams_async_bot import (
    Bot, Dispatcher, CallbackQueryEvent, CallbackDataFilter,
    InlineKeyboardMarkup, KeyboardButton, StyleKeyboard
)

dp = Dispatcher()


@dp.callback_query(CallbackDataFilter("confirm_yes"))
async def on_confirm_yes(event: CallbackQueryEvent, bot: Bot):
    # Show popup notification
    await bot.answer_callback_query(
        query_id=event.query_id,
        text="Confirmed!",
        show_alert=False
    )
    # Update the message
    await bot.edit_text(
        chat_id=event.chat.chat_id,
        msg_id=event.message.msg_id,
        text="You confirmed the action."
    )


@dp.callback_query(CallbackDataFilter("confirm_no"))
async def on_confirm_no(event: CallbackQueryEvent, bot: Bot):
    # Show alert dialog
    await bot.answer_callback_query(
        query_id=event.query_id,
        text="Action cancelled",
        show_alert=True
    )


@dp.callback_query(CallbackDataFilter("open_url"))
async def on_open_url(event: CallbackQueryEvent, bot: Bot):
    # Open URL in browser
    await bot.answer_callback_query(
        query_id=event.query_id,
        url="https://example.com/details"
    )
```

## Command Handlers

The `CommandFilter` and `@dp.command()` decorator provide convenient ways to handle bot commands.

```python
from vk_teams_async_bot import Bot, Dispatcher, NewMessageEvent, CommandFilter

dp = Dispatcher()


# Using @dp.command() shortcut
@dp.command("start")
async def cmd_start(event: NewMessageEvent, bot: Bot):
    await bot.send_text(
        chat_id=event.chat.chat_id,
        text="Welcome! Use /help to see available commands."
    )


@dp.command("help")
async def cmd_help(event: NewMessageEvent, bot: Bot):
    help_text = """
Available commands:
/start - Start the bot
/help - Show this help
/settings - Bot settings
/status - Check status
"""
    await bot.send_text(chat_id=event.chat.chat_id, text=help_text)


# Using CommandFilter explicitly with additional filters
from vk_teams_async_bot import ChatTypeFilter, ChatType


@dp.message(CommandFilter("admin"), ChatTypeFilter(ChatType.GROUP))
async def cmd_admin(event: NewMessageEvent, bot: Bot):
    # This command only works in group chats
    await bot.send_text(chat_id=event.chat.chat_id, text="Admin command executed")
```

## Filter Composition

Filters can be combined using `&` (AND), `|` (OR), and `~` (NOT) operators to create complex matching logic.

```python
from vk_teams_async_bot import (
    Bot, Dispatcher, NewMessageEvent,
    CommandFilter, RegexpFilter, TextFilter, FileFilter, VoiceFilter,
    StickerFilter, ChatTypeFilter, ChatType, FromUserFilter,
    MentionFilter, ReplyFilter, ForwardFilter
)

dp = Dispatcher()


# Match files that are NOT voice messages
@dp.message(FileFilter() & ~VoiceFilter())
async def on_file_not_voice(event: NewMessageEvent, bot: Bot):
    await bot.send_text(
        chat_id=event.chat.chat_id,
        text="Received a file (not voice)"
    )


# Match regex pattern OR specific command
@dp.message(RegexpFilter(r"order\s+\d+") | CommandFilter("order"))
async def on_order(event: NewMessageEvent, bot: Bot):
    await bot.send_text(
        chat_id=event.chat.chat_id,
        text=f"Processing order: {event.text}"
    )


# Match text in private chats only
@dp.message(TextFilter() & ChatTypeFilter(ChatType.PRIVATE))
async def on_private_text(event: NewMessageEvent, bot: Bot):
    await bot.send_text(chat_id=event.chat.chat_id, text="Private message received")


# Match replies from specific user
@dp.message(ReplyFilter() & FromUserFilter("admin@example.com"))
async def on_admin_reply(event: NewMessageEvent, bot: Bot):
    await bot.send_text(chat_id=event.chat.chat_id, text="Admin replied")


# Match forwarded messages with mentions
@dp.message(ForwardFilter() & MentionFilter())
async def on_forwarded_mention(event: NewMessageEvent, bot: Bot):
    await bot.send_text(chat_id=event.chat.chat_id, text="Forwarded message with mention")


# Match stickers in group chats
@dp.message(StickerFilter() & ChatTypeFilter(ChatType.GROUP))
async def on_group_sticker(event: NewMessageEvent, bot: Bot):
    await bot.send_text(chat_id=event.chat.chat_id, text="Nice sticker!")
```

## Custom Filters

Create custom filters by inheriting from `FilterBase` and implementing the `__call__` method.

```python
from vk_teams_async_bot import FilterBase, NewMessageEvent, BaseEvent


class LongMessageFilter(FilterBase):
    """Match messages longer than a specified length."""

    def __init__(self, min_length: int = 100):
        self.min_length = min_length

    def __call__(self, event: BaseEvent) -> bool:
        if isinstance(event, NewMessageEvent) and event.text:
            return len(event.text) >= self.min_length
        return False


class KeywordFilter(FilterBase):
    """Match messages containing any of the specified keywords."""

    def __init__(self, keywords: list[str], case_sensitive: bool = False):
        self.keywords = keywords
        self.case_sensitive = case_sensitive

    def __call__(self, event: BaseEvent) -> bool:
        if not isinstance(event, NewMessageEvent) or not event.text:
            return False
        text = event.text if self.case_sensitive else event.text.lower()
        keywords = self.keywords if self.case_sensitive else [k.lower() for k in self.keywords]
        return any(kw in text for kw in keywords)


# Usage
@dp.message(LongMessageFilter(200))
async def on_long_message(event: NewMessageEvent, bot: Bot):
    await bot.send_text(chat_id=event.chat.chat_id, text="That's a long message!")


@dp.message(KeywordFilter(["help", "support", "issue"]))
async def on_support_request(event: NewMessageEvent, bot: Bot):
    await bot.send_text(chat_id=event.chat.chat_id, text="How can we help you?")
```

## Finite State Machine (FSM) for Multi-Step Dialogs

The FSM system enables multi-step conversational flows
with state and data persistence per user/chat combination.

```python
from vk_teams_async_bot import (
    Bot, Dispatcher, NewMessageEvent, CommandFilter,
    FSMContext, MemoryStorage, State, StatesGroup, StateFilter
)


class OrderForm(StatesGroup):
    waiting_name = State()
    waiting_phone = State()
    waiting_address = State()
    waiting_confirm = State()


storage = MemoryStorage()
dp = Dispatcher(storage=storage)


@dp.command("order")
async def start_order(event: NewMessageEvent, bot: Bot, fsm_context: FSMContext):
    await fsm_context.set_state(OrderForm.waiting_name)
    await bot.send_text(chat_id=event.chat.chat_id, text="Please enter your name:")


@dp.message(StateFilter(OrderForm.waiting_name, storage))
async def process_name(event: NewMessageEvent, bot: Bot, fsm_context: FSMContext):
    await fsm_context.update_data(name=event.text)
    await fsm_context.set_state(OrderForm.waiting_phone)
    await bot.send_text(chat_id=event.chat.chat_id, text="Enter your phone number:")


@dp.message(StateFilter(OrderForm.waiting_phone, storage))
async def process_phone(event: NewMessageEvent, bot: Bot, fsm_context: FSMContext):
    await fsm_context.update_data(phone=event.text)
    await fsm_context.set_state(OrderForm.waiting_address)
    await bot.send_text(chat_id=event.chat.chat_id, text="Enter delivery address:")


@dp.message(StateFilter(OrderForm.waiting_address, storage))
async def process_address(event: NewMessageEvent, bot: Bot, fsm_context: FSMContext):
    await fsm_context.update_data(address=event.text)
    data = await fsm_context.get_data()
    summary = f"""
Order Summary:
Name: {data['name']}
Phone: {data['phone']}
Address: {data['address']}

Confirm order? (yes/no)
"""
    await fsm_context.set_state(OrderForm.waiting_confirm)
    await bot.send_text(chat_id=event.chat.chat_id, text=summary)


@dp.message(StateFilter(OrderForm.waiting_confirm, storage))
async def process_confirm(event: NewMessageEvent, bot: Bot, fsm_context: FSMContext):
    if event.text and event.text.lower() == "yes":
        data = await fsm_context.get_data()
        await bot.send_text(
            chat_id=event.chat.chat_id,
            text=f"Order confirmed! We'll deliver to {data['address']}."
        )
    else:
        await bot.send_text(chat_id=event.chat.chat_id, text="Order cancelled.")
    # Clear state and data once the dialog is finished, whatever the answer
    await fsm_context.clear()


@dp.command("cancel")
async def cancel_order(event: NewMessageEvent, bot: Bot, fsm_context: FSMContext):
    current_state = await fsm_context.get_state()
    if current_state:
        await fsm_context.clear()
        await bot.send_text(chat_id=event.chat.chat_id, text="Order cancelled.")
    else:
        await bot.send_text(chat_id=event.chat.chat_id, text="Nothing to cancel.")
```

## Redis Storage for Production FSM

Use `RedisStorage` for production deployments with persistence across bot restarts and multi-process support.

```python
from vk_teams_async_bot import Bot, Dispatcher, RedisStorage

# Using Redis URL (storage manages connection)
storage = RedisStorage(
    redis_url="redis://localhost:6379/0",
    key_prefix="mybot",
    state_ttl=3600  # Sessions expire after 1 hour of inactivity
)
dp = Dispatcher(storage=storage)

# Or with existing Redis connection
from redis.asyncio import Redis

redis_client = Redis.from_url("redis://localhost:6379/0")
storage = RedisStorage(
    redis=redis_client,
    key_prefix="mybot",
    state_ttl=1800
)
dp = Dispatcher(storage=storage)

# Note: When using redis_url, storage closes connection automatically
# When using redis instance, you manage the connection lifecycle
```

## Middleware

Middleware intercepts events before and after handler execution, enabling cross-cutting concerns like logging, access control, and data injection.

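The wrapping order described above can be illustrated without the framework. The following is a minimal standard-library sketch, not the library's implementation: `wrap`, `logging_mw`, `block_mw`, `final_handler`, and `run` are hypothetical names, and the only thing assumed from this README is the `(handler, event, data)` call shape and the rule that the first middleware added runs first.

```python
import asyncio
from typing import Any, Awaitable, Callable

Handler = Callable[[Any, dict], Awaitable[Any]]
Middleware = Callable[[Handler, Any, dict], Awaitable[Any]]


def wrap(handler: Handler, middlewares: list[Middleware]) -> Handler:
    """Wrap handler so that the first middleware in the list runs outermost."""
    wrapped = handler
    for mw in reversed(middlewares):
        def make(mw: Middleware, nxt: Handler) -> Handler:
            async def call(event: Any, data: dict) -> Any:
                return await mw(nxt, event, data)
            return call
        wrapped = make(mw, wrapped)
    return wrapped


async def logging_mw(handler: Handler, event: Any, data: dict) -> Any:
    data["trace"].append("log:before")
    result = await handler(event, data)
    data["trace"].append("log:after")
    return result


async def block_mw(handler: Handler, event: Any, data: dict) -> Any:
    if event == "blocked":
        data["trace"].append("blocked")
        return None  # Short-circuit: the inner handler is never called
    return await handler(event, data)


async def final_handler(event: Any, data: dict) -> Any:
    data["trace"].append(f"handled:{event}")
    return event.upper()


def run(event: str) -> tuple[Any, list[str]]:
    """Send one event through the chain and return (result, trace)."""
    data: dict = {"trace": []}
    chain = wrap(final_handler, [logging_mw, block_mw])
    result = asyncio.run(chain(event, data))
    return result, data["trace"]
```

In this sketch a short-circuiting middleware still lets the outer middleware finish its "after" step, because the outer one simply awaits whatever the inner layer returns.
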
```python
from typing import Any

from vk_teams_async_bot import (
    BaseMiddleware, Bot, Dispatcher, NewMessageEvent, CommandFilter
)
from vk_teams_async_bot.types.event import BaseEvent


class LoggingMiddleware(BaseMiddleware):
    """Log all incoming events."""

    async def __call__(self, handler, event, data):
        print(f"-> Event received: {event.type}")
        result = await handler(event, data)
        print(f"<- Event processed: {event.type}")
        return result


class AccessControlMiddleware(BaseMiddleware):
    """Restrict bot access to allowed chats."""

    def __init__(self, allowed_chats: list[str]):
        self.allowed_chats = allowed_chats

    async def __call__(self, handler, event, data):
        if isinstance(event, BaseEvent) and hasattr(event, "chat"):
            if event.chat.chat_id not in self.allowed_chats:
                bot = data["bot"]
                await bot.send_text(
                    chat_id=event.chat.chat_id,
                    text="Access denied. Contact administrator."
                )
                return  # Short-circuit, don't call handler
        return await handler(event, data)


class UserDataMiddleware(BaseMiddleware):
    """Inject user role into handler data."""

    def __init__(self, roles: dict[str, str]):
        self.roles = roles

    async def __call__(self, handler, event, data):
        if isinstance(event, BaseEvent) and hasattr(event, "from_"):
            user_id = event.from_.user_id
            data["user_role"] = self.roles.get(user_id, "guest")
        return await handler(event, data)


# Register middleware (order matters - first added runs first)
dp = Dispatcher()
dp.add_middleware(LoggingMiddleware())
dp.add_middleware(AccessControlMiddleware(["allowed@chat.agent", "admin@chat.agent"]))
dp.add_middleware(UserDataMiddleware({"admin@example.com": "admin"}))


@dp.command("admin")
async def admin_command(event: NewMessageEvent, bot: Bot, user_role: str):
    if user_role == "admin":
        await bot.send_text(chat_id=event.chat.chat_id, text="Admin command executed")
    else:
        await bot.send_text(chat_id=event.chat.chat_id, text="Permission denied")
```

## Session Timeout Middleware

The built-in `SessionTimeoutMiddleware` automatically clears stale FSM
sessions for `MemoryStorage`.

```python
from vk_teams_async_bot import (
    Bot, Dispatcher, MemoryStorage, SessionTimeoutMiddleware
)

storage = MemoryStorage()
timeout_mw = SessionTimeoutMiddleware(storage, timeout=300)  # 5 minutes
dp = Dispatcher(storage=storage)
dp.add_middleware(timeout_mw)

bot = Bot(bot_token="TOKEN")


@bot.on_shutdown
async def cleanup(bot: Bot):
    await timeout_mw.close()

# Note: For RedisStorage, use state_ttl parameter instead
# Redis handles expiration automatically
```

## Dependency Injection

Register factory functions in `bot.depends` to automatically inject dependencies into handlers based on type annotations.

```python
import asyncio
from typing import Annotated

import aiohttp

from vk_teams_async_bot import Bot, Dispatcher, NewMessageEvent

bot = Bot(bot_token="TOKEN")
dp = Dispatcher()


# Sync factory - returns value directly
def get_config():
    return {"api_url": "https://api.example.com", "timeout": 30}


# Async factory - returns value directly
async def get_api_client():
    return {"client_id": "abc123"}


# Async generator factory - with cleanup
async def get_http_session():
    session = aiohttp.ClientSession()
    try:
        yield session
    finally:
        await session.close()


# Database connection with cleanup
async def get_database():
    # create_db_connection() is not defined here; it stands in for your own connection helper
    conn = await create_db_connection()
    try:
        yield conn
    finally:
        await conn.close()


# Register factories
bot.depends.extend([get_config, get_api_client, get_http_session, get_database])


@dp.message()
async def handler(
    event: NewMessageEvent,
    bot: Bot,
    config: get_config,  # Injected by return type
    api: get_api_client,  # Async factory
    session: Annotated[aiohttp.ClientSession, get_http_session],  # Generator with Annotated
    db: Annotated[object, get_database],  # Database connection
):
    async with session.get(config["api_url"]) as resp:
        data = await resp.json()
    await bot.send_text(chat_id=event.chat.chat_id, text=f"API response: {data}")


async def main():
    async with bot:
        await bot.start_polling(dp)
```

## Chat Management

Methods for managing
chat properties, members, and moderation.

```python
from vk_teams_async_bot import Bot, ChatAction

# Get chat information
chat_info = await bot.get_chat_info(chat_id="group@chat.agent")
print(f"Chat: {chat_info.title}, Members: {chat_info.members_count}")

# Get chat admins
admins = await bot.get_chat_admins(chat_id="group@chat.agent")
for admin in admins.admins:
    print(f"Admin: {admin.user_id}")

# Get chat members with pagination
members = await bot.get_chat_members(chat_id="group@chat.agent")
for member in members.members:
    print(f"Member: {member.user_id}")
if members.cursor:
    # Fetch next page
    more_members = await bot.get_chat_members(chat_id="group@chat.agent", cursor=members.cursor)

# Set chat properties
await bot.set_chat_title(chat_id="group@chat.agent", title="New Chat Title")
await bot.set_chat_about(chat_id="group@chat.agent", about="Chat description")
await bot.set_chat_rules(chat_id="group@chat.agent", rules="Be respectful")

# Set chat avatar
await bot.set_chat_avatar(chat_id="group@chat.agent", image="avatar.png")

# Send typing indicator
await bot.send_chat_actions(chat_id="user@example.com", actions=[ChatAction.TYPING])

# Pin/unpin messages
await bot.pin_message(chat_id="group@chat.agent", msg_id="6752793278973034123")
await bot.unpin_message(chat_id="group@chat.agent", msg_id="6752793278973034123")

# Block/unblock users
await bot.block_user(chat_id="group@chat.agent", user_id="user@example.com", del_last_messages=True)
await bot.unblock_user(chat_id="group@chat.agent", user_id="user@example.com")

# Get blocked/pending users
blocked = await bot.get_blocked_users(chat_id="group@chat.agent")
pending = await bot.get_pending_users(chat_id="group@chat.agent")

# Approve/reject pending users
await bot.resolve_pending(chat_id="group@chat.agent", approve=True, user_id="user@example.com")
await bot.resolve_pending(chat_id="group@chat.agent", approve=False, everyone=True)

# Remove members
await bot.delete_chat_members(
    chat_id="group@chat.agent",
    members=["user1@example.com", "user2@example.com"],
)

# On-premise only: Create chat and add members
chat = await bot.create_chat(
    name="New Group",
    about="Group description",
    members=["user1@example.com", "user2@example.com"],
    public=False
)
await bot.add_chat_members(chat_id=chat.chat_id, members=["user3@example.com"])
```

## Event Handlers

Register handlers for different event types using dispatcher decorators.

```python
from vk_teams_async_bot import (
    Bot, Dispatcher,
    NewMessageEvent, EditedMessageEvent, DeletedMessageEvent,
    PinnedMessageEvent, UnpinnedMessageEvent,
    NewChatMembersEvent, LeftChatMembersEvent, CallbackQueryEvent
)

dp = Dispatcher()


@dp.message()
async def on_new_message(event: NewMessageEvent, bot: Bot):
    print(f"New message from {event.from_.user_id}: {event.text}")


@dp.edited_message()
async def on_edited(event: EditedMessageEvent, bot: Bot):
    print(f"Message {event.msg_id} edited: {event.text}")


@dp.deleted_message()
async def on_deleted(event: DeletedMessageEvent, bot: Bot):
    print(f"Message {event.msg_id} deleted")


@dp.pinned_message()
async def on_pinned(event: PinnedMessageEvent, bot: Bot):
    print(f"Message pinned by {event.from_.user_id}")


@dp.unpinned_message()
async def on_unpinned(event: UnpinnedMessageEvent, bot: Bot):
    print(f"Message {event.msg_id} unpinned")


@dp.new_chat_members()
async def on_new_members(event: NewChatMembersEvent, bot: Bot):
    names = [m.first_name for m in event.new_members]
    await bot.send_text(
        chat_id=event.chat.chat_id,
        text=f"Welcome {', '.join(names)}!"
    )


@dp.left_chat_members()
async def on_left_members(event: LeftChatMembersEvent, bot: Bot):
    names = [m.first_name for m in event.left_members]
    print(f"Members left: {', '.join(names)}")


@dp.callback_query()
async def on_callback(event: CallbackQueryEvent, bot: Bot):
    print(f"Button pressed: {event.callback_data}")
    await bot.answer_callback_query(query_id=event.query_id)
```

## Error Handling and Retry Policy

Configure automatic retry with exponential backoff and handle API errors gracefully.

```python
from vk_teams_async_bot import (
    Bot, Dispatcher, NewMessageEvent,
    VKTeamsError, APIError, RateLimitError,
    ServerError, NetworkError, TimeoutError
)
from vk_teams_async_bot.client.retry import RetryPolicy

# Configure retry policy
bot = Bot(
    bot_token="TOKEN",
    retry_policy=RetryPolicy(
        max_retries=3,  # Maximum retry attempts
        base_delay=1.0,  # Initial delay in seconds
        max_delay=30.0,  # Maximum delay between retries
        jitter=True  # Add randomization to prevent thundering herd
    )
)
dp = Dispatcher()


# Handle errors in handlers (most specific exceptions first)
@dp.command("risky")
async def risky_operation(event: NewMessageEvent, bot: Bot):
    try:
        await bot.send_text(chat_id=event.chat.chat_id, text="Attempting...")
    except RateLimitError as e:
        print(f"Rate limited, retry after: {e.retry_after}s")
        await bot.send_text(chat_id=event.chat.chat_id, text="Too many requests, try later")
    except ServerError as e:
        print(f"Server error {e.status_code}: {e.description}")
    except NetworkError:
        print("Network connection failed")
    except TimeoutError:
        print("Request timed out")
    except APIError as e:
        print(f"API error {e.status_code}: {e.description}")
    except VKTeamsError as e:
        print(f"VK Teams error: {e}")
```

## Inline Keyboard Building

Build complex keyboard layouts with rows and button styles.

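As a rough mental model of automatic row wrapping and keyboard concatenation, the layout logic can be sketched with plain lists. This is an illustration under assumptions, not the library's code: `wrap_rows` and `combine` are hypothetical helpers, and buttons are represented by their labels only.

```python
def wrap_rows(buttons: list[str], buttons_in_row: int) -> list[list[str]]:
    """Split a flat list of button labels into rows of at most buttons_in_row."""
    if buttons_in_row < 1:
        raise ValueError("buttons_in_row must be at least 1")
    return [
        buttons[i:i + buttons_in_row]
        for i in range(0, len(buttons), buttons_in_row)
    ]


def combine(first: list[list[str]], second: list[list[str]]) -> list[list[str]]:
    """Stack the rows of two keyboards, first keyboard on top."""
    return [list(row) for row in first + second]


rows = wrap_rows(["1", "2", "3", "4", "5"], buttons_in_row=3)
stacked = combine([["Title"]], [["Back"]])
```

With five buttons and three per row, the last row is simply shorter; no padding is added in this sketch.
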
```python
from vk_teams_async_bot import InlineKeyboardMarkup, KeyboardButton, StyleKeyboard

# Basic keyboard with automatic row wrapping
kb = InlineKeyboardMarkup(buttons_in_row=3)
kb.add(
    KeyboardButton(text="1", callback_data="num_1"),
    KeyboardButton(text="2", callback_data="num_2"),
    KeyboardButton(text="3", callback_data="num_3"),
    KeyboardButton(text="4", callback_data="num_4"),
    KeyboardButton(text="5", callback_data="num_5"),
)
# Result: [[1, 2, 3], [4, 5]]

# Manual row control
kb = InlineKeyboardMarkup()
kb.row(
    KeyboardButton(text="Option A", callback_data="a", style=StyleKeyboard.PRIMARY),
    KeyboardButton(text="Option B", callback_data="b", style=StyleKeyboard.PRIMARY),
)
kb.row(
    KeyboardButton(text="Cancel", callback_data="cancel", style=StyleKeyboard.ATTENTION),
)
# Result: [[Option A, Option B], [Cancel]]

# Mixed callback and URL buttons
kb = InlineKeyboardMarkup(buttons_in_row=2)
kb.add(
    KeyboardButton(text="Confirm", callback_data="confirm", style=StyleKeyboard.PRIMARY),
    KeyboardButton(text="Help", url="https://help.example.com"),
)

# Combine keyboards
header = InlineKeyboardMarkup()
header.row(KeyboardButton(text="Title", callback_data="title"))
footer = InlineKeyboardMarkup()
footer.row(KeyboardButton(text="Back", callback_data="back"))
combined = header + footer  # Combines all rows
```

## Bot Information

Retrieve information about the bot itself.

```python
# Get bot info
info = await bot.get_self()
print(f"Bot ID: {info.user_id}")
print(f"Name: {info.first_name} {info.last_name or ''}")
print(f"Nick: @{info.nick}")
print(f"About: {info.about}")
```

The vk-teams-async-bot framework is ideal for building conversational bots, automation tools, and integrations with VK Teams. Its event-driven architecture with long polling makes it suitable for real-time messaging applications, while FSM support enables complex multi-step workflows like order forms, surveys, and customer support flows.
For production deployments, use RedisStorage to persist FSM state across restarts and enable horizontal scaling. The middleware system allows for clean separation of concerns like authentication, logging, and rate limiting. Dependency injection simplifies testing and resource management by allowing handlers to declare their dependencies through type annotations rather than managing connections directly.
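
To make the dependency injection idea concrete, here is a minimal standard-library sketch of how `Annotated[T, factory]` parameters could be resolved, with generator factories cleaned up after the handler returns. It is an assumption-laden illustration rather than the framework's resolver: `call_with_dependencies`, `demo`, and the factories inside it are hypothetical names.

```python
import asyncio
import inspect
from contextlib import AsyncExitStack, asynccontextmanager
from typing import Annotated, Any, get_args, get_origin, get_type_hints


async def call_with_dependencies(handler, **known: Any) -> Any:
    """Call handler, building each Annotated[T, factory] argument from its factory."""
    hints = get_type_hints(handler, include_extras=True)
    async with AsyncExitStack() as stack:
        kwargs: dict[str, Any] = {}
        for name in inspect.signature(handler).parameters:
            if name in known:  # Values supplied by the caller, e.g. the event
                kwargs[name] = known[name]
                continue
            hint = hints.get(name)
            if get_origin(hint) is not Annotated:
                raise TypeError(f"cannot resolve parameter {name!r}")
            factory = get_args(hint)[1]
            if inspect.isasyncgenfunction(factory):
                # Generator factory: code after `yield` runs when the stack closes
                manager = asynccontextmanager(factory)()
                kwargs[name] = await stack.enter_async_context(manager)
            elif inspect.iscoroutinefunction(factory):
                kwargs[name] = await factory()
            else:
                kwargs[name] = factory()
        return await handler(**kwargs)


def demo() -> tuple[str, list[str]]:
    """Run one handler call and return (result, order of lifecycle steps)."""
    log: list[str] = []

    async def get_session():
        log.append("open")
        try:
            yield {"name": "session"}
        finally:
            log.append("close")

    def get_config():
        return {"timeout": 30}

    async def handler(
        text: str,
        session: Annotated[dict, get_session],
        config: Annotated[dict, get_config],
    ) -> str:
        log.append("handle")
        return f"{text}:{session['name']}:{config['timeout']}"

    result = asyncio.run(call_with_dependencies(handler, text="hi"))
    return result, log
```

Because the handler only names its dependencies in the signature, a test can call it directly with stand-in objects and skip the factories entirely.
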