### Install dependencies Source: https://tinkoff.github.io/invest-python/CONTRIBUTING Install project dependencies using Makefile. ```makefile make install-poetry make install ``` -------------------------------- ### Async Get and Print User Accounts Source: https://tinkoff.github.io/invest-python/examples Asynchronously retrieves and prints the user's accounts. Requires INVEST_TOKEN environment variable. ```python import asyncio import os from tinkoff.invest import AsyncClient TOKEN = os.environ["INVEST_TOKEN"] async def main(): async with AsyncClient(TOKEN) as client: print(await client.users.get_accounts()) if __name__ == "__main__": asyncio.run(main()) ``` -------------------------------- ### Get and Print Client Accounts Source: https://tinkoff.github.io/invest-python/examples Retrieves and prints the user's accounts using a synchronous client. Requires INVEST_TOKEN environment variable. ```python import os from tinkoff.invest import Client TOKEN = os.environ["INVEST_TOKEN"] def main(): with Client(TOKEN) as client: print(client.users.get_accounts()) if __name__ == "__main__": main() ``` -------------------------------- ### Async Get and Print Hourly Candles for a Year Source: https://tinkoff.github.io/invest-python/examples Asynchronously fetches and prints hourly candle data for a given FIGI over the past year. Requires INVEST_TOKEN environment variable. 
```python import asyncio import os from datetime import timedelta from tinkoff.invest import AsyncClient, CandleInterval from tinkoff.invest.utils import now TOKEN = os.environ["INVEST_TOKEN"] async def main(): async with AsyncClient(TOKEN) as client: async for candle in client.get_all_candles( figi="BBG004730N88", from_=now() - timedelta(days=365), interval=CandleInterval.CANDLE_INTERVAL_HOUR, ): print(candle) if __name__ == "__main__": asyncio.run(main()) ``` -------------------------------- ### Get Operations by Cursor Source: https://tinkoff.github.io/invest-python/examples Retrieves a paginated list of operations for a specific account and instrument. Uses the INVEST_TOKEN environment variable and demonstrates cursor-based pagination. ```python import os from tinkoff.invest import Client, GetOperationsByCursorRequest token = os.environ["INVEST_TOKEN"] with Client(token) as client: accounts = client.users.get_accounts() account_id = accounts.accounts[0].id def get_request(cursor=""): return GetOperationsByCursorRequest( account_id=account_id, instrument_id="BBG004730N88", cursor=cursor, limit=1, ) operations = client.operations.get_operations_by_cursor(get_request()) print(operations) while operations.has_next: request = get_request(cursor=operations.next_cursor) operations = client.operations.get_operations_by_cursor(request) print(operations) ``` -------------------------------- ### Get and Print Hourly Candles for a Year Source: https://tinkoff.github.io/invest-python/examples Fetches and prints hourly candle data for a given FIGI over the past year. Requires INVEST_TOKEN environment variable. 
```python import os from datetime import timedelta from tinkoff.invest import CandleInterval, Client from tinkoff.invest.utils import now TOKEN = os.environ["INVEST_TOKEN"] def main(): with Client(TOKEN) as client: for candle in client.get_all_candles( figi="BBG004730N88", from_=now() - timedelta(days=365), interval=CandleInterval.CANDLE_INTERVAL_HOUR, ): print(candle) return 0 if __name__ == "__main__": main() ``` -------------------------------- ### Get Minute Candles with RetryingClient Source: https://tinkoff.github.io/invest-python/examples Fetches minute interval candles for a given FIGI with retry logic. Configurable retry attempts and usage. ```python import logging import os from datetime import timedelta from tinkoff.invest import CandleInterval from tinkoff.invest.retrying.settings import RetryClientSettings from tinkoff.invest.retrying.sync.client import RetryingClient from tinkoff.invest.utils import now logging.basicConfig(format="%(asctime)s %(levelname)s:%(message)s", level=logging.DEBUG) TOKEN = os.environ["INVEST_TOKEN"] retry_settings = RetryClientSettings(use_retry=True, max_retry_attempt=2) with RetryingClient(TOKEN, settings=retry_settings) as client: for candle in client.get_all_candles( figi="BBG000B9XRY4", from_=now() - timedelta(days=301), interval=CandleInterval.CANDLE_INTERVAL_1_MIN, ): print(candle) ``` -------------------------------- ### Get FIGI for Ticker Source: https://tinkoff.github.io/invest-python/examples Retrieves the FIGI (Financial Instrument Global Identifier) for a given ticker symbol. This example iterates through various instrument types to find the ticker. 
```python """Example - How to get figi by name of ticker.""" import logging import os from pandas import DataFrame from tinkoff.invest import Client, SecurityTradingStatus from tinkoff.invest.services import InstrumentsService from tinkoff.invest.utils import quotation_to_decimal TOKEN = os.environ["INVEST_TOKEN"] logging.basicConfig(format="%(asctime)s %(levelname)s:%(message)s", level=logging.DEBUG) logger = logging.getLogger(__name__) def main(): """Example - How to get figi by name of ticker.""" ticker = "VTBR" # "BRH3" "SBER" "VTBR" with Client(TOKEN) as client: instruments: InstrumentsService = client.instruments tickers = [] for method in ["shares", "bonds", "etfs", "currencies", "futures"]: for item in getattr(instruments, method)().instruments: tickers.append( { "name": item.name, "ticker": item.ticker, "class_code": item.class_code, "figi": item.figi, "uid": item.uid, "type": method, "min_price_increment": quotation_to_decimal( item.min_price_increment ), "scale": 9 - len(str(item.min_price_increment.nano)) + 1, "lot": item.lot, "trading_status": str( SecurityTradingStatus(item.trading_status).name ), "api_trade_available_flag": item.api_trade_available_flag, "currency": item.currency, "exchange": item.exchange, "buy_available_flag": item.buy_available_flag, "sell_available_flag": item.sell_available_flag, "short_enabled_flag": item.short_enabled_flag, "klong": quotation_to_decimal(item.klong), "kshort": quotation_to_decimal(item.kshort), } ) tickers_df = DataFrame(tickers) ticker_df = tickers_df[tickers_df["ticker"] == ticker] if ticker_df.empty: logger.error("There is no such ticker: %s", ticker) return figi = ticker_df["figi"].iloc[0] print(f"\nTicker {ticker} have figi={figi}\n") print(f"Additional info for this {ticker} ticker:") print(ticker_df.iloc[0]) if __name__ == "__main__": main() ``` -------------------------------- ### Get Sandbox User Info Source: https://tinkoff.github.io/invest-python/examples Retrieves user information from the Tinkoff 
Invest sandbox environment. Requires a sandbox token. ```python import os from tinkoff.invest.sandbox.client import SandboxClient TOKEN = os.environ["INVEST_TOKEN"] def main(): with SandboxClient(TOKEN) as client: print(client.users.get_info()) if __name__ == "__main__": main() ``` -------------------------------- ### Get List of Accounts Source: https://tinkoff.github.io/invest-python Use the `Client` to connect to the Tinkoff Invest API and retrieve a list of user accounts. Ensure your token is kept private. ```python from tinkoff.invest import Client TOKEN = 'token' with Client(TOKEN) as client: print(client.users.get_accounts()) ``` -------------------------------- ### Clone repository Source: https://tinkoff.github.io/invest-python/CONTRIBUTING Initial step to download the project source code. ```bash git clone https://github.com/username/invest-python.git ``` -------------------------------- ### Manage Sandbox Account Balance and Accounts Source: https://tinkoff.github.io/invest-python/examples Demonstrates how to open a new sandbox account, add money to it, retrieve positions and operations, and then close the account. Requires an INVEST_TOKEN environment variable. ```python import logging import os from datetime import datetime from decimal import Decimal from tinkoff.invest import MoneyValue from tinkoff.invest.sandbox.client import SandboxClient from tinkoff.invest.utils import decimal_to_quotation, quotation_to_decimal TOKEN = os.environ["INVEST_TOKEN"] logging.basicConfig(format="%(asctime)s %(levelname)s:%(message)s", level=logging.DEBUG) logger = logging.getLogger(__name__) def add_money_sandbox(client, account_id, money, currency="rub"): """Function to add money to sandbox account.""" money = decimal_to_quotation(Decimal(money)) return client.sandbox.sandbox_pay_in( account_id=account_id, amount=MoneyValue(units=money.units, nano=money.nano, currency=currency), ) def main(): """Example - How to set/get balance for sandbox account. 
How to get/close all sandbox accounts. How to open new sandbox account.""" with SandboxClient(TOKEN) as client: # get all sandbox accounts sandbox_accounts = client.users.get_accounts() print(sandbox_accounts) # close all sandbox accounts for sandbox_account in sandbox_accounts.accounts: client.sandbox.close_sandbox_account(account_id=sandbox_account.id) # open new sandbox account sandbox_account = client.sandbox.open_sandbox_account() print(sandbox_account.account_id) account_id = sandbox_account.account_id # add initial 2 000 000 to sandbox account print(add_money_sandbox(client=client, account_id=account_id, money=2000000)) logger.info( "positions: %s", client.operations.get_positions(account_id=account_id) ) print( "money: ", float( quotation_to_decimal( client.operations.get_positions(account_id=account_id).money[0] ) ), ) logger.info("orders: %s", client.orders.get_orders(account_id=account_id)) logger.info( "positions: %s", client.operations.get_positions(account_id=account_id) ) logger.info( "portfolio: %s", client.operations.get_portfolio(account_id=account_id) ) logger.info( "operations: %s", client.operations.get_operations( account_id=account_id, from_=datetime(2023, 1, 1), to=datetime(2023, 2, 5), ), ) logger.info( "withdraw_limits: %s", client.operations.get_withdraw_limits(account_id=account_id), ) # add + 2 000 000 to sandbox account, total is 4 000 000 print(add_money_sandbox(client=client, account_id=account_id, money=2000000)) logger.info( "positions: %s", client.operations.get_positions(account_id=account_id) ) # close new sandbox account sandbox_account = client.sandbox.close_sandbox_account( account_id=sandbox_account.account_id ) print(sandbox_account) if __name__ == "__main__": main() ``` -------------------------------- ### Generate gRPC client Source: https://tinkoff.github.io/invest-python/CONTRIBUTING Commands to download proto files and generate the gRPC client. 
```makefile make download-protos ``` ```makefile make gen-grpc ``` ```makefile make gen-client ``` -------------------------------- ### Run development tasks Source: https://tinkoff.github.io/invest-python/CONTRIBUTING Common development tasks for testing, linting, and formatting. ```makefile make test ``` ```makefile make lint ``` ```makefile make format ``` -------------------------------- ### Simple Subscribe to 1-Minute Candle Stream Source: https://tinkoff.github.io/invest-python/examples Subscribes to a stream of 1-minute candles and info for a given FIGI. Requires INVEST_TOKEN environment variable. ```python import os from tinkoff.invest import ( CandleInstrument, Client, InfoInstrument, SubscriptionInterval, ) from tinkoff.invest.services import MarketDataStreamManager TOKEN = os.environ["INVEST_TOKEN"] def main(): with Client(TOKEN) as client: market_data_stream: MarketDataStreamManager = client.create_market_data_stream() market_data_stream.candles.waiting_close().subscribe( [ CandleInstrument( figi="BBG004730N88", interval=SubscriptionInterval.SUBSCRIPTION_INTERVAL_ONE_MINUTE, ) ] ) for marketdata in market_data_stream: print(marketdata) market_data_stream.info.subscribe([InfoInstrument(figi="BBG004730N88")]) if marketdata.subscribe_info_response: market_data_stream.stop() if __name__ == "__main__": main() ``` -------------------------------- ### Async Subscribe to 1-Minute Candle Stream Source: https://tinkoff.github.io/invest-python/examples Establishes an asynchronous subscription to a stream of 1-minute candles and info for a given FIGI. Requires INVEST_TOKEN environment variable. 
```python import asyncio import os from tinkoff.invest import ( AsyncClient, CandleInstrument, InfoInstrument, SubscriptionInterval, ) from tinkoff.invest.async_services import AsyncMarketDataStreamManager TOKEN = os.environ["INVEST_TOKEN"] async def main(): async with AsyncClient(TOKEN) as client: market_data_stream: AsyncMarketDataStreamManager = ( client.create_market_data_stream() ) market_data_stream.candles.waiting_close().subscribe( [ CandleInstrument( figi="BBG004730N88", interval=SubscriptionInterval.SUBSCRIPTION_INTERVAL_ONE_MINUTE, ) ] ) async for marketdata in market_data_stream: print(marketdata) market_data_stream.info.subscribe([InfoInstrument(figi="BBG004730N88")]) if marketdata.subscribe_info_response: market_data_stream.stop() if __name__ == "__main__": asyncio.run(main()) ``` -------------------------------- ### Configure virtual environment Source: https://tinkoff.github.io/invest-python/CONTRIBUTING Commands to manage virtual environments with Poetry or Python venv. ```bash poetry config virtualenvs.in-project true ``` ```bash python -m venv .venv ``` -------------------------------- ### Download All 1-Minute Candles Source: https://tinkoff.github.io/invest-python/examples Retrieves all available 1-minute candles for a given FIGI using the market data cache. Requires INVEST_TOKEN environment variable. 
```python import logging import os from datetime import timedelta from pathlib import Path from tinkoff.invest import CandleInterval, Client from tinkoff.invest.caching.market_data_cache.cache_settings import ( MarketDataCacheSettings, ) from tinkoff.invest.services import MarketDataCache from tinkoff.invest.utils import now TOKEN = os.environ["INVEST_TOKEN"] logging.basicConfig(format="%(asctime)s %(levelname)s:%(message)s", level=logging.DEBUG) def main(): with Client(TOKEN) as client: settings = MarketDataCacheSettings(base_cache_dir=Path("market_data_cache")) market_data_cache = MarketDataCache(settings=settings, services=client) for candle in market_data_cache.get_all_candles( figi="BBG004730N88", from_=now() - timedelta(days=3), interval=CandleInterval.CANDLE_INTERVAL_1_MIN, ): print(candle.time) return 0 if __name__ == "__main__": main() ``` -------------------------------- ### Cache Instruments with InstrumentsCache Source: https://tinkoff.github.io/invest-python/examples Demonstrates caching instrument data using InstrumentsCache. Ensures data consistency between server and cache. 
```python import logging import os from pprint import pprint from tinkoff.invest import Client, InstrumentIdType from tinkoff.invest.caching.instruments_cache.instruments_cache import InstrumentsCache from tinkoff.invest.caching.instruments_cache.settings import InstrumentsCacheSettings TOKEN = os.environ["INVEST_TOKEN"] logging.basicConfig(level=logging.INFO) def main(): with Client(TOKEN) as client: inst = client.instruments.etfs().instruments[-1] pprint(inst) from_server = client.instruments.etf_by( id_type=InstrumentIdType.INSTRUMENT_ID_TYPE_UID, class_code=inst.class_code, id=inst.uid, ) pprint(from_server) settings = InstrumentsCacheSettings() instruments_cache = InstrumentsCache( settings=settings, instruments_service=client.instruments ) from_cache = instruments_cache.etf_by( id_type=InstrumentIdType.INSTRUMENT_ID_TYPE_UID, class_code=inst.class_code, id=inst.uid, ) pprint(from_cache) if str(from_server) != str(from_cache): raise Exception("cache miss") if __name__ == "__main__": main() ``` -------------------------------- ### Async Stream for Minute Candles Source: https://tinkoff.github.io/invest-python/examples Subscribes to a stream of minute candle quotes and prints the received data. Requires INVEST_TOKEN environment variable. 
```python import asyncio import os from tinkoff.invest import ( AsyncClient, CandleInstrument, MarketDataRequest, SubscribeCandlesRequest, SubscriptionAction, SubscriptionInterval, ) TOKEN = os.environ["INVEST_TOKEN"] async def main(): async def request_iterator(): yield MarketDataRequest( subscribe_candles_request=SubscribeCandlesRequest( subscription_action=SubscriptionAction.SUBSCRIPTION_ACTION_SUBSCRIBE, instruments=[ CandleInstrument( figi="BBG004730N88", interval=SubscriptionInterval.SUBSCRIPTION_INTERVAL_ONE_MINUTE, ) ], ) ) while True: await asyncio.sleep(1) async with AsyncClient(TOKEN) as client: async for marketdata in client.market_data_stream.market_data_stream( request_iterator() ): print(marketdata) if __name__ == "__main__": asyncio.run(main()) ``` -------------------------------- ### Async Retrying Client for Minute Candles Source: https://tinkoff.github.io/invest-python/examples Demonstrates using an asynchronous retrying client to fetch minute candle data for a given FIGI over the past 301 days. Includes basic logging configuration and retry settings. 
```python import asyncio import logging import os from datetime import timedelta from tinkoff.invest import CandleInterval from tinkoff.invest.retrying.aio.client import AsyncRetryingClient from tinkoff.invest.retrying.settings import RetryClientSettings from tinkoff.invest.utils import now logging.basicConfig(format="%(asctime)s %(levelname)s:%(message)s", level=logging.DEBUG) TOKEN = os.environ["INVEST_TOKEN"] retry_settings = RetryClientSettings(use_retry=True, max_retry_attempt=2) async def main(): async with AsyncRetryingClient(TOKEN, settings=retry_settings) as client: async for candle in client.get_all_candles( figi="BBG000B9XRY4", from_=now() - timedelta(days=301), interval=CandleInterval.CANDLE_INTERVAL_1_MIN, ): print(candle) if __name__ == "__main__": asyncio.run(main()) ``` -------------------------------- ### Create feature branch Source: https://tinkoff.github.io/invest-python/CONTRIBUTING Create a new branch for your changes. ```bash git checkout -b branch_name ``` -------------------------------- ### Commit changes Source: https://tinkoff.github.io/invest-python/CONTRIBUTING Stage and commit your local changes. ```bash git add . git commit -m "feat: add new feature" ``` -------------------------------- ### Override Target for Sandbox or Production Source: https://tinkoff.github.io/invest-python Switch between the production ('боевой') and sandbox ('песочница') API environments by setting the `target` parameter in the `Client`. The sandbox is for testing and does not execute real orders. ```python from tinkoff.invest import Client from tinkoff.invest.constants import INVEST_GRPC_API TOKEN = 'token' with Client(TOKEN, target=INVEST_GRPC_API) as client: print(client.users.get_accounts()) ``` -------------------------------- ### Create a take-profit stop order Source: https://tinkoff.github.io/invest-python/examples Calculates a target price based on a percentage drop from the last market price and submits a take-profit buy order. 
Ensures the calculated price is divisible by the instrument's minimum price increment. ```python """Example - How to create takeprofit buy order.""" import logging import os from decimal import Decimal from tinkoff.invest import ( Client, InstrumentIdType, StopOrderDirection, StopOrderExpirationType, StopOrderType, ) from tinkoff.invest.exceptions import InvestError from tinkoff.invest.utils import decimal_to_quotation, quotation_to_decimal TOKEN = os.environ["INVEST_TOKEN"] logging.basicConfig(format="%(asctime)s %(levelname)s:%(message)s", level=logging.DEBUG) logger = logging.getLogger(__name__) def main(): """Example - How to create takeprofit buy order.""" with Client(TOKEN) as client: response = client.users.get_accounts() account, *_ = response.accounts account_id = account.id logger.info("Orders: %s", client.orders.get_orders(account_id=account_id)) figi = "BBG004730ZJ9" # BBG004730ZJ9 - VTBR / BBG004730N88 - SBER # getting the last price for instrument last_price = ( client.market_data.get_last_prices(figi=[figi]).last_prices[0].price ) last_price = quotation_to_decimal(last_price) print(f"figi, last price = {last_price}") # setting the percentage by which the takeprofit stop order # should be set below the last price percent_down = 5 # calculation of the price for takeprofit stop order calculated_price = last_price - last_price * Decimal(percent_down / 100) print(f"calculated_price = {calculated_price}") # getting the min price increment and the number of digits after point min_price_increment = client.instruments.get_instrument_by( id_type=InstrumentIdType.INSTRUMENT_ID_TYPE_FIGI, id=figi ).instrument.min_price_increment number_digits_after_point = 9 - len(str(min_price_increment.nano)) + 1 min_price_increment = quotation_to_decimal(min_price_increment) print( f"min_price_increment = {min_price_increment}, " f"number_digits_after_point={number_digits_after_point}" ) # calculation of the price for instrument which is # divisible to min price increment 
calculated_price = ( round(calculated_price / min_price_increment) * min_price_increment ) print( f"let's send stop order at price = " f"{calculated_price:.{number_digits_after_point}f} divisible to " f"min price increment {min_price_increment}" ) # creating takeprofit buy order stop_order_type = StopOrderType.STOP_ORDER_TYPE_TAKE_PROFIT direction = StopOrderDirection.STOP_ORDER_DIRECTION_BUY exp_type = StopOrderExpirationType.STOP_ORDER_EXPIRATION_TYPE_GOOD_TILL_CANCEL try: response = client.stop_orders.post_stop_order( figi=figi, quantity=1, price=decimal_to_quotation(Decimal(calculated_price)), stop_price=decimal_to_quotation(Decimal(calculated_price)), direction=direction, account_id=account_id, expiration_type=exp_type, stop_order_type=stop_order_type, expire_date=None, ) print(response) print("stop_order_id=", response.stop_order_id) except InvestError as error: logger.error("Posting trade takeprofit order failed. Exception: %s", error) if __name__ == "__main__": main() ``` -------------------------------- ### Push changes Source: https://tinkoff.github.io/invest-python/CONTRIBUTING Upload your local commits to the remote repository. ```bash git push ``` -------------------------------- ### Moving Average Strategy Implementation Source: https://tinkoff.github.io/invest-python/robots This code implements a moving average trading strategy. It requires environment variables for the API token, FIGI, and account ID. The strategy trades for a specified number of iterations and then plots the trading events. 
```python import logging import os from datetime import timedelta from decimal import Decimal from matplotlib import pyplot as plt from tinkoff.invest import CandleInterval, Client from tinkoff.invest.strategies.base.account_manager import AccountManager from tinkoff.invest.strategies.moving_average.plotter import \ MovingAverageStrategyPlotter from tinkoff.invest.strategies.moving_average.signal_executor import \ MovingAverageSignalExecutor from tinkoff.invest.strategies.moving_average.strategy import MovingAverageStrategy from tinkoff.invest.strategies.moving_average.strategy_settings import \ MovingAverageStrategySettings from tinkoff.invest.strategies.moving_average.strategy_state import \ MovingAverageStrategyState from tinkoff.invest.strategies.moving_average.supervisor import \ MovingAverageStrategySupervisor from tinkoff.invest.strategies.moving_average.trader import MovingAverageStrategyTrader logging.basicConfig(format="%(asctime)s %(levelname)s:%(message)s", level=logging.INFO) logger = logging.getLogger(__name__) TOKEN = os.environ["INVEST_TOKEN"] FIGI = os.environ["INVEST_FIGI"] ACCOUNT_ID = os.environ["INVEST_ACCOUNT_ID"] def main(): with Client(TOKEN) as services: settings = MovingAverageStrategySettings( share_id=FIGI, account_id=ACCOUNT_ID, max_transaction_price=Decimal(10000), candle_interval=CandleInterval.CANDLE_INTERVAL_1_MIN, long_period=timedelta(minutes=100), short_period=timedelta(minutes=20), std_period=timedelta(minutes=30), ) account_manager = AccountManager(services=services, strategy_settings=settings) state = MovingAverageStrategyState() strategy = MovingAverageStrategy( settings=settings, account_manager=account_manager, state=state, ) signal_executor = MovingAverageSignalExecutor( services=services, state=state, settings=settings, ) supervisor = MovingAverageStrategySupervisor() trader = MovingAverageStrategyTrader( strategy=strategy, settings=settings, services=services, state=state, signal_executor=signal_executor, 
account_manager=account_manager, supervisor=supervisor, ) plotter = MovingAverageStrategyPlotter(settings=settings) initial_balance = account_manager.get_current_balance() for i in range(5): logger.info("Trade %s", i) trader.trade() current_balance = account_manager.get_current_balance() logger.info("Initial balance %s", initial_balance) logger.info("Current balance %s", current_balance) events = supervisor.get_events() plotter.plot(events) plt.show() ``` -------------------------------- ### Subscribe to one-minute candle stream Source: https://tinkoff.github.io/invest-python/examples Uses a request iterator to subscribe to real-time one-minute candle updates for a specific instrument. Requires an active INVEST_TOKEN environment variable. ```python import os import time from tinkoff.invest import ( CandleInstrument, Client, MarketDataRequest, SubscribeCandlesRequest, SubscriptionAction, SubscriptionInterval, ) TOKEN = os.environ["INVEST_TOKEN"] def main(): def request_iterator(): yield MarketDataRequest( subscribe_candles_request=SubscribeCandlesRequest( waiting_close=True, subscription_action=SubscriptionAction.SUBSCRIPTION_ACTION_SUBSCRIBE, instruments=[ CandleInstrument( figi="BBG004730N88", interval=SubscriptionInterval.SUBSCRIPTION_INTERVAL_ONE_MINUTE, ) ], ) ) while True: time.sleep(1) with Client(TOKEN) as client: for marketdata in client.market_data_stream.market_data_stream( request_iterator() ): print(marketdata) if __name__ == "__main__": main() ``` -------------------------------- ### Find Instrument by Query Source: https://tinkoff.github.io/invest-python/examples Searches for instruments matching a given query string. Useful for finding specific financial instruments. 
```python import os from tinkoff.invest import Client TOKEN = os.environ["INVEST_TOKEN"] def main(): with Client(TOKEN) as client: r = client.instruments.find_instrument(query="BBG001M2SC01") for i in r.instruments: print(i) if __name__ == "__main__": main() ``` -------------------------------- ### Subscribe to Portfolio Stream Source: https://tinkoff.github.io/invest-python/examples Subscribes to real-time portfolio updates for specified accounts and prints received portfolio data. ```python import os from tinkoff.invest import Client TOKEN = os.environ["INVEST_TOKEN"] def main(): with Client(TOKEN) as client: accounts = client.users.get_accounts() for portfolio in client.operations_stream.portfolio_stream( accounts=[acc.id for acc in accounts.accounts] ): print(portfolio) if __name__ == "__main__": main() ``` -------------------------------- ### Commit message format Source: https://tinkoff.github.io/invest-python/CONTRIBUTING Template for Conventional Commits. ```text <type>(<scope>): <short summary> │ │ │ │ │ └─⫸ Summary in present tense. Not capitalized. No period at the end. │ │ │ └─⫸ Commit Scope: grpc, async, mypy, schemas, sandbox │ └─⫸ Commit Type: feat|fix|build|ci|docs|perf|refactor|test|chore ``` -------------------------------- ### Subscribe to Positions Stream Source: https://tinkoff.github.io/invest-python/examples Subscribes to real-time positions updates for specified accounts and prints received position data. ```python import os from tinkoff.invest import Client TOKEN = os.environ["INVEST_TOKEN"] def main(): with Client(TOKEN) as client: response = client.users.get_accounts() accounts = [account.id for account in response.accounts] for response in client.operations_stream.positions_stream(accounts=accounts): print(response) if __name__ == "__main__": main() ``` -------------------------------- ### Cancel All Stop Orders Source: https://tinkoff.github.io/invest-python/examples Use this to cancel all active stop orders for a given account. 
Ensure you have the INVEST_TOKEN environment variable set. ```python """Example - How to cancel all stop orders.""" import logging import os from tinkoff.invest import Client from tinkoff.invest.exceptions import InvestError TOKEN = os.environ["INVEST_TOKEN"] logging.basicConfig(format="%(asctime)s %(levelname)s:%(message)s", level=logging.DEBUG) logger = logging.getLogger(__name__) def main(): """Example - How to cancel all stop orders.""" with Client(TOKEN) as client: response = client.users.get_accounts() account, *_ = response.accounts account_id = account.id try: stop_orders_response = client.stop_orders.get_stop_orders( account_id=account_id ) logger.info("Stop Orders: %s", stop_orders_response) for stop_order in stop_orders_response.stop_orders: client.stop_orders.cancel_stop_order( account_id=account_id, stop_order_id=stop_order.stop_order_id ) logger.info("Stop Order: %s was canceled.", stop_order.stop_order_id) logger.info( "Orders: %s", client.stop_orders.get_stop_orders(account_id=account_id) ) except InvestError as error: logger.error("Failed to cancel all orders. Error: %s", error) if __name__ == "__main__": main() ``` -------------------------------- ### Cancel All Orders Source: https://tinkoff.github.io/invest-python/examples Cancels all outstanding orders for the user's account. It first retrieves the account ID and then calls the cancel_all_orders method. Requires INVEST_TOKEN environment variable. 
```python import logging import os from tinkoff.invest import Client TOKEN = os.environ["INVEST_TOKEN"] logger = logging.getLogger(__name__) logging.basicConfig(level=logging.INFO) def main(): with Client(TOKEN) as client: response = client.users.get_accounts() account, *_ = response.accounts account_id = account.id logger.info("Orders: %s", client.orders.get_orders(account_id=account_id)) client.cancel_all_orders(account_id=account.id) logger.info("Orders: %s", client.orders.get_orders(account_id=account_id)) if __name__ == "__main__": main() ``` -------------------------------- ### Log Request Errors Source: https://tinkoff.github.io/invest-python/examples Logs specific request errors, including tracking ID and error code. Useful for debugging API interactions. ```python import logging import os from tinkoff.invest import Client, RequestError TOKEN = os.environ["INVEST_TOKEN"] logging.basicConfig(format="%(asctime)s %(levelname)s:%(message)s", level=logging.INFO) logger = logging.getLogger(__name__) def main(): with Client(TOKEN) as client: _ = client.users.get_accounts().accounts try: client.users.get_margin_attributes(account_id="123") except RequestError as err: tracking_id = err.metadata.tracking_id if err.metadata else "" logger.error("Error tracking_id=%s code=%s", tracking_id, str(err.code)) if __name__ == "__main__": main() ``` === COMPLETE CONTENT === This response contains all available snippets from this library. No additional content exists. Do not make further requests.