Примеры
Больше примеров доступно здесь.
Получение и вывод в консоль свечей с часовым интервалом за год
import os
from datetime import timedelta
from tinkoff.invest import CandleInterval, Client
from tinkoff.invest.schemas import CandleSource
from tinkoff.invest.utils import now
TOKEN = os.environ["INVEST_TOKEN"]
def main():
    """Print every hourly candle of the last 365 days for one instrument.

    Returns:
        int: always ``0``.
    """
    with Client(TOKEN) as api:
        year_ago = now() - timedelta(days=365)
        hourly_candles = api.get_all_candles(
            instrument_id="BBG004730N88",
            from_=year_ago,
            interval=CandleInterval.CANDLE_INTERVAL_HOUR,
            candle_source_type=CandleSource.CANDLE_SOURCE_UNSPECIFIED,
        )
        for hourly_candle in hourly_candles:
            print(hourly_candle)

    return 0


if __name__ == "__main__":
    main()
Асинхронная функция получения и вывода в консоль свечей с часовым интервалом за год
import asyncio
import os
from datetime import timedelta
from tinkoff.invest import AsyncClient, CandleInterval
from tinkoff.invest.schemas import CandleSource
from tinkoff.invest.utils import now
TOKEN = os.environ["INVEST_TOKEN"]
async def main():
    """Asynchronously print hourly exchange candles of the last 365 days."""
    async with AsyncClient(TOKEN) as api:
        year_ago = now() - timedelta(days=365)
        hourly_candles = api.get_all_candles(
            instrument_id="BBG004730N88",
            from_=year_ago,
            interval=CandleInterval.CANDLE_INTERVAL_HOUR,
            candle_source_type=CandleSource.CANDLE_SOURCE_EXCHANGE,
        )
        async for hourly_candle in hourly_candles:
            print(hourly_candle)


if __name__ == "__main__":
    asyncio.run(main())
Асинхронная функция получения и вывода счетов пользователя
import asyncio
import os
from tinkoff.invest import AsyncClient
TOKEN = os.environ["INVEST_TOKEN"]
async def main():
    """Fetch the accounts of the token owner asynchronously and print them."""
    async with AsyncClient(TOKEN) as api:
        accounts_response = await api.users.get_accounts()
        print(accounts_response)


if __name__ == "__main__":
    asyncio.run(main())
Асинхронная функция получения и вывода минутных свечей
examples/async_retrying_client.py
import asyncio
import logging
import os
from datetime import timedelta
from tinkoff.invest import CandleInterval
from tinkoff.invest.retrying.aio.client import AsyncRetryingClient
from tinkoff.invest.retrying.settings import RetryClientSettings
from tinkoff.invest.utils import now
logging.basicConfig(format="%(asctime)s %(levelname)s:%(message)s", level=logging.DEBUG)
TOKEN = os.environ["INVEST_TOKEN"]
retry_settings = RetryClientSettings(use_retry=True, max_retry_attempt=2)
async def main():
    """Print one-minute candles of the last 301 days via the retrying client.

    The client is configured by the module-level ``retry_settings``.
    """
    async with AsyncRetryingClient(TOKEN, settings=retry_settings) as api:
        history_start = now() - timedelta(days=301)
        minute_candles = api.get_all_candles(
            figi="BBG000B9XRY4",
            from_=history_start,
            interval=CandleInterval.CANDLE_INTERVAL_1_MIN,
        )
        async for minute_candle in minute_candles:
            print(minute_candle)


if __name__ == "__main__":
    asyncio.run(main())
Подписка на стрим котировок по минутным свечам и вывод получаемых свечей в консоль
examples/async_stream_client.py
import asyncio
import os
from tinkoff.invest import (
AsyncClient,
CandleInstrument,
MarketDataRequest,
SubscribeCandlesRequest,
SubscriptionAction,
SubscriptionInterval,
)
TOKEN = os.environ["INVEST_TOKEN"]
async def main():
    """Subscribe to one-minute candles over the raw stream and print messages."""

    async def request_iterator():
        """Yield a single subscribe request, then idle forever."""
        minute_candles = CandleInstrument(
            figi="BBG004730N88",
            interval=SubscriptionInterval.SUBSCRIPTION_INTERVAL_ONE_MINUTE,
        )
        subscribe = SubscribeCandlesRequest(
            subscription_action=SubscriptionAction.SUBSCRIPTION_ACTION_SUBSCRIBE,
            instruments=[minute_candles],
        )
        yield MarketDataRequest(subscribe_candles_request=subscribe)
        # The generator never finishes: it sleeps in one-second steps,
        # presumably so the request side of the stream stays open.
        while True:
            await asyncio.sleep(1)

    async with AsyncClient(TOKEN) as api:
        responses = api.market_data_stream.market_data_stream(request_iterator())
        async for response in responses:
            print(response)


if __name__ == "__main__":
    asyncio.run(main())
Отмена всех выставленных поручений
import logging
import os
from tinkoff.invest import Client
TOKEN = os.environ["INVEST_TOKEN"]
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
def main():
    """Log the orders of the first account, cancel all of them, log again."""
    with Client(TOKEN) as api:
        first_account, *_ = api.users.get_accounts().accounts
        account_id = first_account.id

        orders_before = api.orders.get_orders(account_id=account_id)
        logger.info("Orders: %s", orders_before)

        api.cancel_all_orders(account_id=account_id)

        orders_after = api.orders.get_orders(account_id=account_id)
        logger.info("Orders: %s", orders_after)


if __name__ == "__main__":
    main()
Функция получения и вывода счетов клиента
import os
from tinkoff.invest import Client
TOKEN = os.environ["INVEST_TOKEN"]
def main():
    """Fetch the accounts of the token owner and print the response."""
    with Client(TOKEN) as api:
        accounts_response = api.users.get_accounts()
        print(accounts_response)


if __name__ == "__main__":
    main()
Загрузка и вывод всех часовых свечей по инструменту
examples/download_all_candles.py
import logging
import os
from datetime import timedelta
from pathlib import Path
from tinkoff.invest import CandleInterval, Client
from tinkoff.invest.caching.market_data_cache.cache import MarketDataCache
from tinkoff.invest.caching.market_data_cache.cache_settings import (
MarketDataCacheSettings,
)
from tinkoff.invest.utils import now
TOKEN = os.environ["INVEST_TOKEN"]
logging.basicConfig(format="%(levelname)s: %(message)s", level=logging.DEBUG)
def main():
    """Print time and completeness of hourly candles read through the cache.

    Candles of the last day are requested via ``MarketDataCache``, which is
    configured to use the ``market_data_cache`` directory.

    Returns:
        int: always ``0``.
    """
    with Client(TOKEN) as api:
        cache_settings = MarketDataCacheSettings(
            base_cache_dir=Path("market_data_cache")
        )
        cache = MarketDataCache(settings=cache_settings, services=api)

        day_ago = now() - timedelta(days=1)
        cached_candles = cache.get_all_candles(
            figi="BBG004730N88",
            from_=day_ago,
            interval=CandleInterval.CANDLE_INTERVAL_HOUR,
        )
        for cached_candle in cached_candles:
            print(cached_candle.time, cached_candle.is_complete)

    return 0


if __name__ == "__main__":
    main()
Асинхронная подписка на стрим минутных свечей
examples/easy_async_stream_client.py
import asyncio
import os
from tinkoff.invest import (
AsyncClient,
CandleInstrument,
InfoInstrument,
MarketDataResponse,
SubscriptionInterval,
TradeInstrument,
)
from tinkoff.invest.async_services import AsyncMarketDataStreamManager
TOKEN = os.environ["INVEST_TOKEN"]
async def main():
    """Subscribe to minute candles and trades, then print stream messages.

    After every received message an info subscription for the same
    instrument is requested; the stream is stopped as soon as a message
    carries a ``subscribe_info_response``.
    """
    async with AsyncClient(TOKEN) as api:
        stream: AsyncMarketDataStreamManager = api.create_market_data_stream()

        minute_candles = CandleInstrument(
            figi="BBG004730N88",
            interval=SubscriptionInterval.SUBSCRIPTION_INTERVAL_ONE_MINUTE,
        )
        stream.candles.waiting_close().subscribe([minute_candles])

        trades = TradeInstrument(figi="BBG004730N88")
        stream.trades.subscribe([trades])

        async for response in stream:
            message: MarketDataResponse = response
            print(message)
            stream.info.subscribe([InfoInstrument(figi="BBG004730N88")])
            if message.subscribe_info_response:
                stream.stop()


if __name__ == "__main__":
    asyncio.run(main())
Простая подписка на стрим минутных свечей
examples/easy_stream_client.py
import os
from tinkoff.invest import (
CandleInstrument,
Client,
InfoInstrument,
SubscriptionInterval,
)
from tinkoff.invest.services import MarketDataStreamManager
TOKEN = os.environ["INVEST_TOKEN"]
def main():
    """Subscribe to minute candles via the stream manager and print messages.

    After every received message an info subscription for the same
    instrument is requested; the stream is stopped as soon as a message
    carries a ``subscribe_info_response``.
    """
    with Client(TOKEN) as api:
        stream: MarketDataStreamManager = api.create_market_data_stream()

        minute_candles = CandleInstrument(
            figi="BBG004730N88",
            interval=SubscriptionInterval.SUBSCRIPTION_INTERVAL_ONE_MINUTE,
        )
        stream.candles.waiting_close().subscribe([minute_candles])

        for message in stream:
            print(message)
            stream.info.subscribe([InfoInstrument(figi="BBG004730N88")])
            if message.subscribe_info_response:
                stream.stop()


if __name__ == "__main__":
    main()
Получение списка операций и их постраничный вывод
examples/get_operations_by_cursor.py
import os
from pprint import pprint
from tinkoff.invest import Client, GetOperationsByCursorRequest
token = os.environ["INVEST_TOKEN"]
with Client(token) as client:
    # Operations are requested for the first account in the response.
    accounts = client.users.get_accounts()
    account_id = accounts.accounts[0].id

    def get_request(cursor=""):
        """Build a one-item page request that starts at ``cursor``."""
        request_fields = {
            "account_id": account_id,
            "instrument_id": "BBG004730N88",
            "cursor": cursor,
            "limit": 1,
        }
        return GetOperationsByCursorRequest(**request_fields)

    # First page: requested with the default empty cursor.
    operations = client.operations.get_operations_by_cursor(get_request())
    print(operations)

    # Follow ``next_cursor`` for at most 10 further pages.
    depth = 10
    while True:
        if not operations.has_next or depth <= 0:
            break
        request = get_request(cursor=operations.next_cursor)
        operations = client.operations.get_operations_by_cursor(request)
        pprint(operations)
        depth -= 1
Функция кэширования инструментов
import logging
import os
from pprint import pprint
from tinkoff.invest import Client, InstrumentIdType
from tinkoff.invest.caching.instruments_cache.instruments_cache import InstrumentsCache
from tinkoff.invest.caching.instruments_cache.settings import InstrumentsCacheSettings
TOKEN = os.environ["INVEST_TOKEN"]
logging.basicConfig(level=logging.INFO)
def main():
    """Compare an ETF fetched from the server with the one from the cache.

    The last ETF of the ``etfs()`` listing is looked up by UID twice: through
    the instruments service and through ``InstrumentsCache``.

    Raises:
        Exception: when the string forms of the two results differ.
    """
    with Client(TOKEN) as api:
        etf = api.instruments.etfs().instruments[-1]
        pprint(etf)

        # Both lookups receive exactly the same arguments.
        lookup = {
            "id_type": InstrumentIdType.INSTRUMENT_ID_TYPE_UID,
            "class_code": etf.class_code,
            "id": etf.uid,
        }

        from_server = api.instruments.etf_by(**lookup)
        pprint(from_server)

        cache = InstrumentsCache(
            settings=InstrumentsCacheSettings(),
            instruments_service=api.instruments,
        )
        from_cache = cache.etf_by(**lookup)
        pprint(from_cache)

        if str(from_server) != str(from_cache):
            raise Exception("cache miss")


if __name__ == "__main__":
    main()
Функция получения списка инструментов, подходящих под строку query
import os
from tinkoff.invest import Client
TOKEN = os.environ["INVEST_TOKEN"]
def main():
    """Print every instrument returned for the query ``BBG001M2SC01``."""
    with Client(TOKEN) as api:
        found = api.instruments.find_instrument(query="BBG001M2SC01")
        for instrument in found.instruments:
            print(instrument)


if __name__ == "__main__":
    main()
Функция логирования ошибок
import logging
import os
from tinkoff.invest import Client, RequestError
TOKEN = os.environ["INVEST_TOKEN"]
logging.basicConfig(format="%(asctime)s %(levelname)s:%(message)s", level=logging.INFO)
logger = logging.getLogger(__name__)
def main():
    """Request margin attributes for account ``"123"`` and log any API error.

    A ``RequestError`` is logged with its code and, when metadata is
    attached, its tracking id.
    """
    with Client(TOKEN) as api:
        # The account list is fetched but its value is not used.
        _ = api.users.get_accounts().accounts

        try:
            api.users.get_margin_attributes(account_id="123")
        except RequestError as err:
            if err.metadata:
                tracking_id = err.metadata.tracking_id
            else:
                tracking_id = ""
            logger.error("Error tracking_id=%s code=%s", tracking_id, str(err.code))


if __name__ == "__main__":
    main()
Подписка на стрим портфолио и вывод информации
examples/porfolio_stream_client.py
import os
from tinkoff.invest import Client
TOKEN = os.environ["INVEST_TOKEN"]
def main():
    """Print every message of the portfolio stream for all accounts."""
    with Client(TOKEN) as api:
        accounts_response = api.users.get_accounts()
        account_ids = [account.id for account in accounts_response.accounts]

        updates = api.operations_stream.portfolio_stream(
            accounts=account_ids, ping_delay_ms=60_000
        )
        for portfolio_update in updates:
            print(portfolio_update)


if __name__ == "__main__":
    main()
Подписка на стрим позиций и вывод информации
import os
from tinkoff.invest import Client
TOKEN = os.environ["INVEST_TOKEN"]
def main():
    """Print every message of the positions stream for all accounts.

    The stream is opened with ``with_initial_positions=True``.
    """
    with Client(TOKEN) as api:
        accounts_response = api.users.get_accounts()
        account_ids = [account.id for account in accounts_response.accounts]

        updates = api.operations_stream.positions_stream(
            accounts=account_ids, with_initial_positions=True
        )
        for positions_update in updates:
            print(positions_update)


if __name__ == "__main__":
    main()
Функция получения и вывода минутных свечей
import logging
import os
from datetime import timedelta
from tinkoff.invest import CandleInterval
from tinkoff.invest.retrying.settings import RetryClientSettings
from tinkoff.invest.retrying.sync.client import RetryingClient
from tinkoff.invest.utils import now
logging.basicConfig(format="%(asctime)s %(levelname)s:%(message)s", level=logging.DEBUG)
TOKEN = os.environ["INVEST_TOKEN"]
retry_settings = RetryClientSettings(use_retry=True, max_retry_attempt=2)
# Print one-minute candles of the last 301 days; the client is configured
# by ``retry_settings``.
with RetryingClient(TOKEN, settings=retry_settings) as client:
    history_start = now() - timedelta(days=301)
    minute_candles = client.get_all_candles(
        figi="BBG000B9XRY4",
        from_=history_start,
        interval=CandleInterval.CANDLE_INTERVAL_1_MIN,
    )
    for candle in minute_candles:
        print(candle)
Получение и вывод информации об аккаунте пользователя в песочнице
import os
from tinkoff.invest.sandbox.client import SandboxClient
TOKEN = os.environ["INVEST_TOKEN"]
def main():
    """Print the user info returned by the sandbox client."""
    with SandboxClient(TOKEN) as sandbox_api:
        user_info = sandbox_api.users.get_info()
        print(user_info)


if __name__ == "__main__":
    main()
Подписка на стрим минутных свечей и их вывод
import os
import time
from tinkoff.invest import (
CandleInstrument,
Client,
MarketDataRequest,
SubscribeCandlesRequest,
SubscriptionAction,
SubscriptionInterval,
)
from tinkoff.invest.schemas import CandleSource
TOKEN = os.environ["INVEST_TOKEN"]
def main():
    """Subscribe to one-minute candles over the raw stream and print messages.

    The subscription is sent with ``waiting_close=True`` and the exchange
    candle source.
    """

    def request_iterator():
        """Yield a single subscribe request, then idle forever."""
        minute_candles = CandleInstrument(
            figi="BBG004730N88",
            interval=SubscriptionInterval.SUBSCRIPTION_INTERVAL_ONE_MINUTE,
        )
        subscribe = SubscribeCandlesRequest(
            waiting_close=True,
            subscription_action=SubscriptionAction.SUBSCRIPTION_ACTION_SUBSCRIBE,
            candle_source_type=CandleSource.CANDLE_SOURCE_EXCHANGE,
            instruments=[minute_candles],
        )
        yield MarketDataRequest(subscribe_candles_request=subscribe)
        # The generator never finishes: it sleeps in one-second steps,
        # presumably so the request side of the stream stays open.
        while True:
            time.sleep(1)

    with Client(TOKEN) as api:
        responses = api.market_data_stream.market_data_stream(request_iterator())
        for response in responses:
            print(response)


if __name__ == "__main__":
    main()
Создание тейк-профит стоп-ордера
examples/wiseplat_create_take_profit_stop_order.py
"""Example - How to create takeprofit buy order."""
import logging
import os
from decimal import Decimal
from tinkoff.invest import (
Client,
InstrumentIdType,
StopOrderDirection,
StopOrderExpirationType,
StopOrderType,
)
from tinkoff.invest.exceptions import InvestError
from tinkoff.invest.utils import decimal_to_quotation, quotation_to_decimal
TOKEN = os.environ["INVEST_TOKEN"]
logging.basicConfig(format="%(asctime)s %(levelname)s:%(message)s", level=logging.DEBUG)
logger = logging.getLogger(__name__)
def _digits_after_point(increment: Decimal) -> int:
    """Return how many decimal places are needed to print ``increment``."""
    # normalize() strips trailing zeros, so the exponent is the negated number
    # of significant fractional digits (it is positive for 10, 100, ...).
    return max(0, -increment.normalize().as_tuple().exponent)


def _round_to_increment(price: Decimal, increment: Decimal) -> Decimal:
    """Return the multiple of ``increment`` that is closest to ``price``."""
    return round(price / increment) * increment


def main():
    """Example - How to create takeprofit buy order.

    Takes the first account, reads the last price of the instrument, lowers
    it by ``percent_down`` percent, aligns the result to the instrument's
    minimal price increment and posts a good-till-cancel take-profit buy
    stop order at that price. A failed post is logged, not raised.
    """
    with Client(TOKEN) as client:
        response = client.users.get_accounts()
        account, *_ = response.accounts
        account_id = account.id
        logger.info("Orders: %s", client.orders.get_orders(account_id=account_id))

        figi = "BBG004730ZJ9"  # BBG004730ZJ9 - VTBR / BBG004730N88 - SBER

        # getting the last price for instrument
        last_price = quotation_to_decimal(
            client.market_data.get_last_prices(figi=[figi]).last_prices[0].price
        )
        print(f"{figi}, last price = {last_price}")

        # setting the percentage by which the takeprofit stop order
        # should be set below the last price
        percent_down = 5

        # calculation of the price for takeprofit stop order; the ratio is
        # built from exact Decimals because Decimal(5 / 100) would carry the
        # binary-float representation error of 0.05 into the price
        ratio = Decimal(percent_down) / Decimal(100)
        calculated_price = last_price - last_price * ratio
        print(f"calculated_price = {calculated_price}")

        # getting the min price increment and the number of digits after point
        min_price_increment = quotation_to_decimal(
            client.instruments.get_instrument_by(
                id_type=InstrumentIdType.INSTRUMENT_ID_TYPE_FIGI, id=figi
            ).instrument.min_price_increment
        )
        number_digits_after_point = _digits_after_point(min_price_increment)
        print(
            f"min_price_increment = {min_price_increment}, "
            f"number_digits_after_point={number_digits_after_point}"
        )

        # calculation of the price for instrument which is
        # divisible to min price increment
        calculated_price = _round_to_increment(calculated_price, min_price_increment)
        print(
            f"let's send stop order at price = "
            f"{calculated_price:.{number_digits_after_point}f} divisible to "
            f"min price increment {min_price_increment}"
        )

        # creating takeprofit buy order
        stop_order_type = StopOrderType.STOP_ORDER_TYPE_TAKE_PROFIT
        direction = StopOrderDirection.STOP_ORDER_DIRECTION_BUY
        exp_type = StopOrderExpirationType.STOP_ORDER_EXPIRATION_TYPE_GOOD_TILL_CANCEL
        try:
            # calculated_price is already a Decimal, no re-wrapping needed
            response = client.stop_orders.post_stop_order(
                figi=figi,
                quantity=1,
                price=decimal_to_quotation(calculated_price),
                stop_price=decimal_to_quotation(calculated_price),
                direction=direction,
                account_id=account_id,
                expiration_type=exp_type,
                stop_order_type=stop_order_type,
                expire_date=None,
            )
            print(response)
            print("stop_order_id=", response.stop_order_id)
        except InvestError as error:
            logger.error("Posting trade takeprofit order failed. Exception: %s", error)


if __name__ == "__main__":
    main()
Отмена всех выставленных стоп-ордеров
examples/wiseplat_cancel_all_stop_orders.py
"""Example - How to cancel all stop orders."""
import logging
import os
from tinkoff.invest import Client
from tinkoff.invest.exceptions import InvestError
TOKEN = os.environ["INVEST_TOKEN"]
logging.basicConfig(format="%(asctime)s %(levelname)s:%(message)s", level=logging.DEBUG)
logger = logging.getLogger(__name__)
def main():
    """Example - How to cancel all stop orders.

    Cancels every stop order of the first account one by one and logs the
    stop orders before and after. An ``InvestError`` is logged, not raised.
    """
    with Client(TOKEN) as api:
        first_account, *_ = api.users.get_accounts().accounts
        account_id = first_account.id

        try:
            active = api.stop_orders.get_stop_orders(account_id=account_id)
            logger.info("Stop Orders: %s", active)

            for stop_order in active.stop_orders:
                stop_order_id = stop_order.stop_order_id
                api.stop_orders.cancel_stop_order(
                    account_id=account_id, stop_order_id=stop_order_id
                )
                logger.info("Stop Order: %s was canceled.", stop_order_id)

            remaining = api.stop_orders.get_stop_orders(account_id=account_id)
            logger.info("Orders: %s", remaining)
        except InvestError as error:
            logger.error("Failed to cancel all orders. Error: %s", error)


if __name__ == "__main__":
    main()
Получение figi для тикера
examples/wiseplat_get_figi_for_ticker.py
"""Example - How to get figi by name of ticker."""
import logging
import os
from pandas import DataFrame
from tinkoff.invest import Client, SecurityTradingStatus
from tinkoff.invest.services import InstrumentsService
from tinkoff.invest.utils import quotation_to_decimal
TOKEN = os.environ["INVEST_TOKEN"]
logging.basicConfig(format="%(asctime)s %(levelname)s:%(message)s", level=logging.DEBUG)
logger = logging.getLogger(__name__)
def main():
    """Example - How to get figi by name of ticker.

    Collects shares, bonds, etfs, currencies and futures into one pandas
    ``DataFrame``, selects the rows whose ticker equals ``ticker`` and prints
    the figi and the full first matching row. When nothing matches, an error
    is logged and the function returns.
    """
    ticker = "VTBR"  # "BRH3" "SBER" "VTBR"

    with Client(TOKEN) as client:
        instruments: InstrumentsService = client.instruments
        tickers = []
        for method in ["shares", "bonds", "etfs", "currencies", "futures"]:
            for item in getattr(instruments, method)().instruments:
                min_price_increment = quotation_to_decimal(item.min_price_increment)
                # Decimal places needed to print the increment: normalize()
                # drops trailing zeros, the exponent then gives the count.
                # Counting the characters of ``nano`` is wrong for nano == 0
                # and for increments such as 0.25.
                scale = max(0, -min_price_increment.normalize().as_tuple().exponent)
                tickers.append(
                    {
                        "name": item.name,
                        "ticker": item.ticker,
                        "class_code": item.class_code,
                        "figi": item.figi,
                        "uid": item.uid,
                        "type": method,
                        "min_price_increment": min_price_increment,
                        "scale": scale,
                        "lot": item.lot,
                        "trading_status": str(
                            SecurityTradingStatus(item.trading_status).name
                        ),
                        "api_trade_available_flag": item.api_trade_available_flag,
                        "currency": item.currency,
                        "exchange": item.exchange,
                        "buy_available_flag": item.buy_available_flag,
                        "sell_available_flag": item.sell_available_flag,
                        "short_enabled_flag": item.short_enabled_flag,
                        "klong": quotation_to_decimal(item.klong),
                        "kshort": quotation_to_decimal(item.kshort),
                    }
                )

        tickers_df = DataFrame(tickers)

        ticker_df = tickers_df[tickers_df["ticker"] == ticker]
        if ticker_df.empty:
            logger.error("There is no such ticker: %s", ticker)
            return

        figi = ticker_df["figi"].iloc[0]
        print(f"\nTicker {ticker} have figi={figi}\n")
        print(f"Additional info for this {ticker} ticker:")
        print(ticker_df.iloc[0])


if __name__ == "__main__":
    main()
Получение / установка баланса для песочницы. Получение / закрытие всех песочниц. Создание новой песочницы.
examples/wiseplat_set_get_sandbox_balance.py
""" Example - How to set/get balance for sandbox account.
How to get/close all sandbox accounts.
How to open new sandbox account. """
import logging
import os
from datetime import datetime
from decimal import Decimal
from tinkoff.invest import MoneyValue
from tinkoff.invest.sandbox.client import SandboxClient
from tinkoff.invest.utils import decimal_to_quotation, quotation_to_decimal
TOKEN = os.environ["INVEST_TOKEN"]
logging.basicConfig(format="%(asctime)s %(levelname)s:%(message)s", level=logging.DEBUG)
logger = logging.getLogger(__name__)
def add_money_sandbox(client, account_id, money, currency="rub"):
    """Pay ``money`` into a sandbox account and return the service response.

    Args:
        client: client object exposing ``sandbox.sandbox_pay_in``.
        account_id: id of the sandbox account to top up.
        money: amount; it is passed through ``Decimal(money)``.
        currency: currency code of the amount, ``"rub"`` by default.
    """
    quotation = decimal_to_quotation(Decimal(money))
    amount = MoneyValue(
        units=quotation.units, nano=quotation.nano, currency=currency
    )
    return client.sandbox.sandbox_pay_in(account_id=account_id, amount=amount)
def main():
    """Walk through the sandbox account life cycle.

    Closes every existing sandbox account, opens a new one, pays in
    2 000 000 twice and, in between, logs positions, orders, portfolio,
    operations and withdraw limits. The new account is closed at the end.
    """
    with SandboxClient(TOKEN) as client:
        # get all sandbox accounts
        existing_accounts = client.users.get_accounts()
        print(existing_accounts)

        # close all sandbox accounts
        for existing_account in existing_accounts.accounts:
            client.sandbox.close_sandbox_account(account_id=existing_account.id)

        # open new sandbox account
        opened_account = client.sandbox.open_sandbox_account()
        account_id = opened_account.account_id
        print(account_id)

        # add initial 2 000 000 to sandbox account
        print(add_money_sandbox(client=client, account_id=account_id, money=2000000))

        positions = client.operations.get_positions(account_id=account_id)
        logger.info("positions: %s", positions)

        # positions are requested again and the first money entry is printed
        first_money = client.operations.get_positions(account_id=account_id).money[0]
        print("money: ", float(quotation_to_decimal(first_money)))

        orders = client.orders.get_orders(account_id=account_id)
        logger.info("orders: %s", orders)

        positions = client.operations.get_positions(account_id=account_id)
        logger.info("positions: %s", positions)

        portfolio = client.operations.get_portfolio(account_id=account_id)
        logger.info("portfolio: %s", portfolio)

        operations = client.operations.get_operations(
            account_id=account_id,
            from_=datetime(2023, 1, 1),
            to=datetime(2023, 2, 5),
        )
        logger.info("operations: %s", operations)

        withdraw_limits = client.operations.get_withdraw_limits(account_id=account_id)
        logger.info("withdraw_limits: %s", withdraw_limits)

        # add + 2 000 000 to sandbox account, total is 4 000 000
        print(add_money_sandbox(client=client, account_id=account_id, money=2000000))

        positions = client.operations.get_positions(account_id=account_id)
        logger.info("positions: %s", positions)

        # close new sandbox account
        close_response = client.sandbox.close_sandbox_account(account_id=account_id)
        print(close_response)


if __name__ == "__main__":
    main()
Пример live стратегии для нескольких тикеров. Вывод OHLCV для каждой сформировавшейся свечи.
examples/wiseplat_live_strategy_print_ohlcv.py
"""
This code is an example of applying Trading Strategy for several Tickers.
The strategy in this code is for demonstration only purposes
- it outputs OHLCV values.
Author: Oleg Shpagin, my github: https://github.com/WISEPLAT
"""
import asyncio
import logging
import os
from datetime import timedelta
from typing import List, Optional
from tinkoff.invest import AioRequestError, AsyncClient, CandleInterval, HistoricCandle
from tinkoff.invest.async_services import AsyncServices
from tinkoff.invest.utils import now
TOKEN = os.environ["INVEST_TOKEN"]
logging.basicConfig(format="%(asctime)s %(levelname)s:%(message)s", level=logging.DEBUG)
logger = logging.getLogger(__name__)
class LogOnlyCandlesStrategy:
    """Demo strategy for one instrument that only collects and logs candles.

    You can put here your methods for your strategy.
    """

    def __init__(
        self,
        figi: str,
        timeframe: CandleInterval,
        days_back: int,
        check_interval: int,
        client: Optional[AsyncServices],
    ):
        """Store the strategy parameters.

        Args:
            figi: figi of the instrument the strategy works with.
            timeframe: candle interval requested from the API.
            days_back: how many days of history are requested on each cycle.
            check_interval: pause between cycles, passed to ``asyncio.sleep``.
            client: async API client used for every request.
        """
        self.account_id = None  # filled in by start()
        self.figi = figi
        self.timeframe = timeframe
        self.days_back = days_back
        self.check_interval = check_interval
        self.client = client
        self.candles: List[HistoricCandle] = []

    async def get_historical_data(self):
        """Collect completed candles of the instrument into ``self.candles``.

        Requests all the candles of ``timeframe`` from ``days_back`` days ago
        to now. Every completed candle that is not stored yet is appended and
        logged. Nothing is returned.
        """
        logger.debug(
            "Start getting historical data for %s days back from now. figi=%s",
            self.days_back,
            self.figi,
        )
        async for candle in self.client.get_all_candles(
            figi=self.figi,
            from_=now() - timedelta(days=self.days_back),
            to=now(),
            interval=self.timeframe,
        ):
            # The cheap flag is tested first; the membership test scans the list.
            if candle.is_complete and candle not in self.candles:
                self.candles.append(candle)
                logger.debug("Found %s - figi=%s", candle, self.figi)

    async def ensure_market_open(self):
        """Wait until the instrument is available for trading.

        The trading status is polled every 60 seconds until both
        ``market_order_available_flag`` and ``api_trade_available_flag``
        are set.
        """
        trading_status = await self.client.market_data.get_trading_status(
            figi=self.figi
        )
        while not (
            trading_status.market_order_available_flag
            and trading_status.api_trade_available_flag
        ):
            logger.debug("Waiting for the market to open. figi=%s", self.figi)
            await asyncio.sleep(60)
            trading_status = await self.client.market_data.get_trading_status(
                figi=self.figi
            )

    async def main_cycle(self):
        """Main cycle for live strategy.

        Runs forever: waits for the market, refreshes the candles, then
        sleeps for ``check_interval``. An ``AioRequestError`` is logged and
        the cycle continues.
        """
        while True:
            try:
                await self.ensure_market_open()
                await self.get_historical_data()

                # put your strategy code here for live
                # to generate signals for buying or selling tickers
                logger.debug(
                    "- live mode: run some strategy code to buy or sell - figi=%s",
                    self.figi,
                )
            except AioRequestError as are:
                logger.error("Client error %s", are)

            await asyncio.sleep(self.check_interval)

    async def start(self):
        """Strategy starts from this function.

        Resolves the account id when it is not set yet and then enters
        ``main_cycle``. When the account id cannot be obtained - the request
        fails or no account is returned - the problem is logged and the
        strategy stops without raising.
        """
        if self.account_id is None:
            try:
                accounts = (await self.client.users.get_accounts()).accounts
            except AioRequestError as are:
                logger.error("Error taking account id. Stopping strategy. %s", are)
                return
            if not accounts:
                # An empty list used to escape as IndexError from pop(),
                # bypassing the handler above.
                logger.error(
                    "Error taking account id. Stopping strategy. %s",
                    "no accounts available",
                )
                return
            # Last account, as pop() selected before.
            self.account_id = accounts[-1].id
        await self.main_cycle()
async def run_strategy(portfolio, timeframe, days_back, check_interval):
    """Start one ``LogOnlyCandlesStrategy`` task per instrument and await all.

    Args:
        portfolio: iterable of figi values, one strategy is created for each.
        timeframe: candle interval passed to every strategy.
        days_back: history depth in days passed to every strategy.
        check_interval: pause between cycles passed to every strategy.
    """
    async with AsyncClient(token=TOKEN, app_name="TinkoffApp") as client:
        # All strategies share the same client.
        strategy_tasks = [
            asyncio.create_task(
                LogOnlyCandlesStrategy(
                    figi=instrument,
                    timeframe=timeframe,
                    days_back=days_back,
                    check_interval=check_interval,
                    client=client,
                ).start()
            )
            for instrument in portfolio
        ]
        await asyncio.gather(*strategy_tasks)
if __name__ == "__main__":
    sber_figi = "BBG004730N88"
    vtbr_figi = "BBG004730ZJ9"
    portfolio = {sber_figi, vtbr_figi}
    timeframe = CandleInterval.CANDLE_INTERVAL_1_MIN
    days_back = 1
    check_interval = 10  # seconds to check interval for new completed candle

    # asyncio.run() creates and closes the event loop itself; calling
    # asyncio.get_event_loop() without a running loop is deprecated.
    asyncio.run(
        run_strategy(
            portfolio=portfolio,
            timeframe=timeframe,
            days_back=days_back,
            check_interval=check_interval,
        )
    )