tqsdk.api - 框架及核心业务

天勤接口的PYTHON封装, 提供以下功能

class tqsdk.api.TqApi(account=None, url=None, backtest=None, debug=None, loop=None, _ins_url=None, _md_url=None, _td_url=None)

天勤接口及数据管理类

该类中所有参数只针对天勤外部IDE编写使用, 在天勤内使用 api = TqApi() 即可指定为当前天勤终端登录用户

通常情况下, 一个线程中应该只有一个TqApi的实例, 它负责维护网络连接, 接收行情及账户数据, 并在内存中维护业务数据截面

创建天勤接口实例

Args:
account (None/TqAccount/TqSim/TqApi): [可选]交易账号:
  • None: 账号将根据命令行参数决定, 默认为 TqSim

  • TqAccount : 使用实盘账号, 直连行情和交易服务器(不通过天勤终端), 需提供期货公司/帐号/密码

  • TqSim : 使用 TqApi 自带的内部模拟账号

  • TqApi : 对 master TqApi 创建一个 slave 副本, 以便在其它线程中使用

url (str): [可选]指定服务器的地址
  • 当 account 为 TqAccount 类型时, 可以通过该参数指定交易服务器地址, 默认使用 wss://opentd.shinnytech.com/trade/user0, 行情始终使用 wss://openmd.shinnytech.com/t/md/front/mobile

  • 当 account 为 TqSim 类型时, 可以通过该参数指定行情服务器地址, 默认使用 wss://openmd.shinnytech.com/t/md/front/mobile

backtest (TqBacktest): [可选]传入 TqBacktest 对象将进入回测模式, 回测模式下会强制使用 account 为 TqSim 并只连接 wss://openmd.shinnytech.com/t/md/front/mobile 接收行情数据

debug(str): [可选] 指定一个日志文件名, 将调试信息输出到指定文件. 默认不输出.

loop(asyncio.AbstractEventLoop): [可选]使用指定的 IOLoop, 默认创建一个新的.

Example:

# 使用实盘帐号直连行情和交易服务器
from tqsdk import TqApi, TqAccount
api = TqApi(TqAccount("H海通期货", "022631", "123456"))

# 使用模拟帐号直连行情服务器
from tqsdk import TqApi, TqSim
api = TqApi(TqSim())

# 进行策略回测
from datetime import date
from tqsdk import TqApi, TqSim, TqBacktest
api = TqApi(TqSim(), backtest=TqBacktest(start_dt=date(2018, 5, 1), end_dt=date(2018, 10, 1)))
copy() → tqsdk.api.TqApi

创建当前TqApi的一个副本. 这个副本可以在另一个线程中使用

Returns:

TqApi: 返回当前TqApi的一个副本. 这个副本可以在另一个线程中使用

close()

关闭天勤接口实例并释放相应资源

Example:

# m1901开多3手
from tqsdk import TqApi, TqSim
from contextlib import closing

with closing(TqApi(TqSim())) as api:
    api.insert_order(symbol="DCE.m1901", direction="BUY", offset="OPEN", volume=3)
get_quote(symbol) → tqsdk.objs.Quote

获取指定合约的盘口行情.

Args:
symbol (str): 指定合约代码。注意:天勤接口从0.8版本开始,合约代码格式变更为 交易所代码.合约代码 的格式. 可用的交易所代码如下:
  • CFFEX: 中金所

  • SHFE: 上期所

  • DCE: 大商所

  • CZCE: 郑商所

  • INE: 能源交易所(原油)

Returns:

Quote: 返回一个盘口行情引用. 其内容将在 wait_update() 时更新.

注意: 在 tqsdk 还没有收到行情数据包时, 此对象中各项内容为 NaN 或 0

Example:

# 获取 SHFE.cu1812 合约的报价
from tqsdk import TqApi, TqSim

api = TqApi(TqSim())
quote = api.get_quote("SHFE.cu1812")
print(quote.last_price)
while api.wait_update():
    print(quote.last_price)

#以上代码将输出
nan
24575.0
24575.0
...
get_kline_serial(symbol, duration_seconds, data_length=200, chart_id=None) → pandas.core.frame.DataFrame

获取k线序列数据

请求指定合约及周期的K线数据. 序列数据会随着时间推进自动更新

Args:

symbol (str): 指定合约代码.

duration_seconds (int): K线数据周期, 以秒为单位。例如: 1分钟线为60,1小时线为3600,日线为86400。 注意: 周期在日线以内时此参数可以任意填写, 在日线以上时只能是日线(86400)的整数倍

data_length (int): 需要获取的序列长度。默认200根, 返回的K线序列数据是从当前最新一根K线开始往回取data_length根。 每个序列最大支持请求 8964 个数据

chart_id (str): [可选]指定序列id, 默认由 api 自动生成

Returns:

pandas.DataFrame: 本函数总是返回一个 pandas.DataFrame 实例. 行数=data_length, 包含以下列:

  • id: int, 1234 (k线序列号)

  • datetime: int, 1501080715000000000 (K线起点时间(按北京时间),自unix epoch(1970-01-01 00:00:00 GMT)以来的纳秒数)

  • open: float, 51450.0 (K线起始时刻的最新价)

  • high: float, 51450.0 (K线时间范围内的最高价)

  • low: float, 51450.0 (K线时间范围内的最低价)

  • close: float, 51450.0 (K线结束时刻的最新价)

  • volume: int, 11 (K线时间范围内的成交量)

  • open_oi: int, 27354 (K线起始时刻的持仓量)

  • close_oi: int, 27355 (K线结束时刻的持仓量)

Example:

# 获取 SHFE.cu1812 的1分钟线
from tqsdk import TqApi, TqSim

api = TqApi(TqSim())
k_serial = api.get_kline_serial("SHFE.cu1812", 60)
while True:
    api.wait_update()
    print(k_serial.iloc[-1].close)

# 预计的输出是这样的:
50970.0
50970.0
50960.0
...
get_tick_serial(symbol, data_length=200, chart_id=None) → pandas.core.frame.DataFrame

获取tick序列数据

请求指定合约的Tick序列数据. 序列数据会随着时间推进自动更新

Args:

symbol (str): 指定合约代码.

data_length (int): 需要获取的序列长度。每个序列最大支持请求 8964 个数据

chart_id (str): [可选]指定序列id, 默认由 api 自动生成

Returns:

pandas.DataFrame: 本函数总是返回一个 pandas.DataFrame 实例. 行数=data_length, 包含以下列:

  • id: int, tick序列号

  • datetime: int, 1501074872000000000 (tick从交易所发出的时间(按北京时间),自unix epoch(1970-01-01 00:00:00 GMT)以来的纳秒数)

  • last_price: float, 3887.0 (最新价)

  • average: float, 3820.0 (当日均价)

  • highest: float, 3897.0 (当日最高价)

  • lowest: float, 3806.0 (当日最低价)

  • ask_price1: float, 3886.0 (卖一价)

  • ask_volume1: int, 3 (卖一量)

  • bid_price1: float, 3881.0 (买一价)

  • bid_volume1: int, 18 (买一量)

  • volume: int 7823 (当日成交量)

  • amount: float, 19237841.0 (成交额)

  • open_interest: int, 1941 (持仓量)

Example:

# 获取 SHFE.cu1812 的Tick序列
from tqsdk import TqApi, TqSim

api = TqApi(TqSim())
serial = api.get_tick_serial("SHFE.cu1812")
while True:
    api.wait_update()
    print(serial.iloc[-1].bid_price1, serial.iloc[-1].ask_price1)

# 预计的输出是这样的:
50860.0 51580.0
50860.0 51580.0
50820.0 51580.0
...
insert_order(symbol, direction, offset, volume, limit_price=None, order_id=None) → tqsdk.objs.Order

发送下单指令. 注意: 指令将在下次调用 wait_update() 时发出

Args:

symbol (str): 拟下单的合约symbol, 格式为 交易所代码.合约代码, 例如 "SHFE.cu1801"

direction (str): "BUY" 或 "SELL"

offset (str): "OPEN", "CLOSE" 或 "CLOSETODAY" (上期所和原油分平今/平昨, 平今用"CLOSETODAY", 平昨用"CLOSE"; 其他交易所直接用"CLOSE" 按照交易所的规则平仓)

volume (int): 需要下单的手数

limit_price (float): [可选]下单价格, 默认市价单 (上期所和原油不支持市价单, 需填写此参数值)

order_id (str): [可选]指定下单单号, 默认由 api 自动生成

Returns:

Order: 返回一个委托单对象引用. 其内容将在 wait_update() 时更新.

Example:

# 市价开3手 DCE.m1809 多仓
from tqsdk import TqApi, TqSim

api = TqApi(TqSim())
order = api.insert_order(symbol="DCE.m1809", direction="BUY", offset="OPEN", volume=3)
while True:
    api.wait_update()
    print("单状态: %s, 已成交: %d 手" % (order.status, order.volume_orign - order.volume_left))

# 预计的输出是这样的:
单状态: ALIVE, 已成交: 0 
单状态: ALIVE, 已成交: 0 
单状态: FINISHED, 已成交: 3 
...
cancel_order(order_or_order_id)

发送撤单指令. 注意: 指令将在下次调用 wait_update() 时发出

Args:

order_or_order_id (str/ Order ): 拟撤委托单或单号

Example:

# 挂价开3手 DCE.m1809 多仓, 如果价格变化则撤单重下,直到全部成交
from tqsdk import TqApi, TqSim

api = TqApi(TqSim())
quote = api.get_quote("DCE.m1809")
order = {}

while True:
    api.wait_update()
    # 当行情有变化且当前挂单价格不优时,则撤单
    if order and api.is_changing(quote) and order.status == "ALIVE"                 and quote.bid_price1 > order.limit_price:
        print("价格改变,撤单重下")
        api.cancel_order(order)
    # 当委托单已撤或还没有下单时则下单
    if (not order and api.is_changing(quote)) or (api.is_changing(order)                 and order.volume_left != 0 and order.status == "FINISHED"):
        print("下单: 价格 %f" % quote.bid_price1)
        order = api.insert_order(symbol="DCE.m1809", direction="BUY", offset="OPEN",                     volume=order.get("volume_left", 3), limit_price=quote.bid_price1)
    if api.is_changing(order):
        print("单状态: %s, 已成交: %d 手" % (order.status, order.volume_orign - order.volume_left))


# 预计的输出是这样的:
下单: 价格 3117.000000
单状态: ALIVE, 已成交: 0 手
价格改变,撤单重下
下单: 价格 3118.000000
单状态: ALIVE, 已成交: 0 手
单状态: FINISHED, 已成交: 3 手
...
get_account() → tqsdk.objs.Account

获取用户账户资金信息

Returns:

Account: 返回一个账户对象引用. 其内容将在 wait_update() 时更新.

Example:

# 获取当前浮动盈亏
from tqsdk import TqApi, TqSim

api = TqApi(TqSim())
account = api.get_account()
print(account.float_profit)

# 预计的输出是这样的:
2180.0
2080.0
2080.0
...
get_position(symbol=None) → Union[tqsdk.objs.Position, tqsdk.objs.Entity]

获取用户持仓信息

Args:

symbol (str): [可选]合约代码, 不填则返回所有持仓

Returns:

Position: 当指定了 symbol 时, 返回一个持仓对象引用. 其内容将在 wait_update() 时更新.

不填 symbol 参数调用本函数, 将返回包含用户所有持仓的一个tqsdk.objs.Entity对象引用, 使用方法与dict一致, 其中每个元素的key为合约代码, value为 Position

为保留一些可供用户查询的历史信息, 如 volume_long_yd(本交易日开盘前的多头持仓手数) 等字段, 因此服务器会返回当天已平仓合约( pos_long 和 pos_short 等字段为0)的持仓信息

Example:

# 获取 DCE.m1809 当前浮动盈亏
from tqsdk import TqApi, TqSim

api = TqApi(TqSim())
position = api.get_position("DCE.m1809")
print(position.float_profit_long + position.float_profit_short)
while api.wait_update():
    print(position.float_profit_long + position.float_profit_short)

# 预计的输出是这样的:
300.0
330.0
...
get_order(order_id=None) → Union[tqsdk.objs.Order, tqsdk.objs.Entity]

获取用户委托单信息

Args:

order_id (str): [可选]单号, 不填单号则返回所有委托单

Returns:

Order: 当指定了order_id时, 返回一个委托单对象引用. 其内容将在 wait_update() 时更新.

不填order_id参数调用本函数, 将返回包含用户所有委托单的一个tqsdk.objs.Entity对象引用, 使用方法与dict一致, 其中每个元素的key为委托单号, value为 Order

注意: 在刚下单后, tqsdk 还没有收到回单信息时, 此对象中各项内容为空

Example:

# 获取当前总挂单手数
from tqsdk import TqApi, TqSim

api = TqApi(TqSim())
orders = api.get_order()
while True:
    api.wait_update()
    print(sum(order.volume_left for oid, order in orders.items() if order.status == "ALIVE"))

# 预计的输出是这样的:
3
3
0
...
get_trade(trade_id=None) → Union[tqsdk.objs.Trade, tqsdk.objs.Entity]

获取用户成交信息

Args:

trade_id (str): [可选]成交号, 不填成交号则返回所有委托单

Returns:

Trade: 当指定了trade_id时, 返回一个成交对象引用. 其内容将在 wait_update() 时更新.

不填trade_id参数调用本函数, 将返回包含用户当前交易日所有成交记录的一个tqsdk.objs.Entity对象引用, 使用方法与dict一致, 其中每个元素的key为成交号, value为 Trade

推荐优先使用 trade_records() 获取某个委托单的相应成交记录, 仅当确有需要时才使用本函数.

wait_update(deadline=None)

等待业务数据更新

调用此函数将阻塞当前线程, 等待天勤主进程发送业务数据更新并返回

注: 它是TqApi中最重要的一个函数, 每次调用它时都会发生这些事:
  • 实际发出网络数据包(如行情订阅指令或交易指令等).

  • 尝试从服务器接收一个数据包, 并用收到的数据包更新内存中的业务数据截面.

  • 让正在运行中的后台任务获得动作机会(如策略程序创建的后台调仓任务只会在wait_update()时发出交易指令).

  • 如果没有收到数据包,则挂起等待.

Args:

deadline (float): [可选]指定截止时间,自unix epoch(1970-01-01 00:00:00 GMT)以来的秒数(time.time())。默认没有超时(无限等待)

Returns:

bool: 如果收到业务数据更新则返回 True, 如果到截止时间依然没有收到业务数据更新则返回 False

注意:
  • 天勤终端里策略日志窗口输出的内容由每次调用wait_update()时发出.

  • 由于存在网络延迟, 因此有数据更新不代表之前发出的所有请求都被处理了, 例如:

    from tqsdk import TqApi, TqSim
    
    api = TqApi(TqSim())
    quote = api.get_quote("SHFE.cu1812")
    api.wait_update()
    print(quote.datetime)
    

可能输出 ""(空字符串), 表示还没有收到该合约的行情

is_changing(obj, key=None) → bool

判定obj最近是否有更新

当业务数据更新导致 wait_update 返回后可以使用该函数判断本次业务数据更新是否包含特定obj或其中某个字段

Args:

obj (any): 任意业务对象, 包括 get_quote 返回的 quote, get_kline_serial 返回的 k_serial, get_account 返回的 account 等

key (str/list of str): [可选]需要判断的字段,默认不指定
  • 不指定: 当该obj下的任意字段有更新时返回True, 否则返回 False.

  • str: 当该obj下的指定字段有更新时返回True, 否则返回 False.

  • list of str: 当该obj下的指定字段中的任何一个字段有更新时返回True, 否则返回 False.

Returns:

bool: 如果本次业务数据更新包含了待判定的数据则返回 True, 否则返回 False.

Example:

# 追踪 SHFE.cu1812 的最新价更新
from tqsdk import TqApi, TqSim

api = TqApi(TqSim())
quote = api.get_quote("SHFE.cu1812")
print(quote.last_price)
while True:
    api.wait_update()
    if api.is_changing(quote, "last_price"):
        print(quote.last_price)

# 以上代码运行后的输出是这样的:
51800.0
51810.0
51800.0
...
is_serial_ready(obj) → bool

判断是否已经从服务器收到了所有订阅的数据

Returns:

bool: 返回 True 表示已经从服务器收到了所有订阅的数据

Example:

# 判断是否已经从服务器收到了最后 3000 根 SHFE.cu1812 的分钟线数据
from tqsdk import TqApi, TqSim

api = TqApi(TqSim())
k_serial = api.get_kline_serial("SHFE.cu1812", 60, data_length=3000)
while True:
    api.wait_update()
    print(api.is_serial_ready(k_serial))

# 预计的输出是这样的:
False
False
True
True
...
create_task(coro)

创建一个task

一个task就是一个协程,task的调度是在 wait_update 函数中完成的,如果代码从来没有调用 wait_update,则task也得不到执行

Args:

coro (coroutine): 需要创建的协程

Example:

# 一个简单的task
import asyncio
from tqsdk import TqApi, TqSim

async def hello():
    await asyncio.sleep(3)
    print("hello world")

api = TqApi(TqSim())
api.create_task(hello())
while True:
    api.wait_update()

#以上代码将在3秒后输出
hello world
register_update_notify(obj=None, chan=None)

注册一个channel以便接受业务数据更新通知

调用此函数将返回一个channel, 当obj更新时会通知该channel

推荐使用 async with api.register_update_notify() as update_chan 来注册更新通知

如果直接调用 update_chan = api.register_update_notify() 则使用完成后需要调用 await update_chan.close() 避免资源泄漏

Args:

obj (any/list of any): [可选]任意业务对象, 包括 get_quote 返回的 quote, get_kline_serial 返回的 k_serial, get_account 返回的 account 等。默认不指定,监控所有业务对象

chan (TqChan): [可选]指定需要注册的channel。默认不指定,由本函数创建

Example:

# 获取 SHFE.cu1812 合约的报价
from tqsdk import TqApi, TqSim

async def demo():
    quote = api.get_quote("SHFE.cu1812")
    async with api.register_update_notify(quote) as update_chan:
        async for _ in update_chan:
            print(quote.last_price)

api = TqApi(TqSim())
api.create_task(demo())
while True:
    api.wait_update()

#以上代码将输出
nan
51850.0
51850.0
51690.0
...
draw_text(base_k_dataframe, text, x=None, y=None, id=None, board='MAIN', color=4294901760)

配合天勤使用时, 在天勤的行情图上绘制一个字符串

Args:

base_k_dataframe (pandas.DataFrame): 基础K线数据序列, 要绘制的K线将出现在这个K线图上. 需要画图的数据以附加列的形式存在

text (str): 要显示的字符串

x (int): X 坐标, 以K线的序列号表示. 可选, 缺省为对齐最后一根K线,

y (float): Y 坐标. 可选, 缺省为最后一根K线收盘价

id (str): 字符串ID, 可选. 以相同ID多次调用本函数, 后一次调用将覆盖前一次调用的效果

board (str): 选择图板, 可选, 缺省为 "MAIN" 表示绘制在主图

color (ARGB): 文本颜色, 可选, 缺省为红色.

Example:

# 在主图最近K线的最低处标一个"最低"文字
klines = api.get_kline_serial("SHFE.cu1905", 86400)
indic = np.where(klines.low == klines.low.min())[0]
value = klines.low.min()
api.draw_text(klines, "测试413423", x=indic, y=value, color=0xFF00FF00)
draw_line(base_k_dataframe, x1, y1, x2, y2, id=None, board='MAIN', line_type='LINE', color=4294901760, width=1)

配合天勤使用时, 在天勤的行情图上绘制一个直线/线段/射线

Args:

base_k_dataframe (pandas.DataFrame): 基础K线数据序列, 要绘制的K线将出现在这个K线图上. 需要画图的数据以附加列的形式存在

x1 (int): 第一个点的 X 坐标, 以K线的序列号表示

y1 (float): 第一个点的 Y 坐标

x2 (int): 第二个点的 X 坐标, 以K线的序列号表示

y2 (float): 第二个点的 Y 坐标

id (str): 字符串ID, 可选. 以相同ID多次调用本函数, 后一次调用将覆盖前一次调用的效果

board (str): 选择图板, 可选, 缺省为 "MAIN" 表示绘制在主图

line_type ("LINE" | "SEG" | "RAY"): 画线类型, 可选, 默认为 LINE. LINE=直线, SEG=线段, RAY=射线

color (ARGB): 线颜色, 可选, 缺省为 红色

width (int): 线宽度, 可选, 缺省为 1

draw_box(base_k_dataframe, x1, y1, x2, y2, id=None, board='MAIN', bg_color=0, color=4294901760, width=1)

配合天勤使用时, 在天勤的行情图上绘制一个矩形

Args:

base_k_dataframe (pandas.DataFrame): 基础K线数据序列, 要绘制的K线将出现在这个K线图上. 需要画图的数据以附加列的形式存在

x1 (int): 矩形左上角的 X 坐标, 以K线的序列号表示

y1 (float): 矩形左上角的 Y 坐标

x2 (int): 矩形右下角的 X 坐标, 以K线的序列号表示

y2 (float): 矩形右下角的 Y 坐标

id (str): ID, 可选. 以相同ID多次调用本函数, 后一次调用将覆盖前一次调用的效果

board (str): 选择图板, 可选, 缺省为 "MAIN" 表示绘制在主图

bg_color (ARGB): 填充颜色, 可选, 缺省为 空

color (ARGB): 边框颜色, 可选, 缺省为 红色

width (int): 边框宽度, 可选, 缺省为 1

Example:

# 给主图最后5根K线加一个方框
klines = api.get_kline_serial("SHFE.cu1905", 86400)
api.draw_box(klines, x1=-5, y1=klines.iloc[-5].close, x2=-1,             y2=klines.iloc[-1].close, width=1, color=0xFF0000FF, bg_color=0x8000FF00)
class tqsdk.api.TqAccount(broker_id, account_id, password, front_broker=None, front_url=None)

天勤实盘类

创建天勤实盘实例

Args:

broker_id (str): 期货公司, 可以在天勤终端中查看期货公司名称

account_id (str): 帐号

password (str): 密码

front_broker(str): [可选]CTP交易前置的Broker ID, 用于连接次席服务器, eg: "2020"

front_url(str): [可选]CTP交易前置地址, 用于连接次席服务器, eg: "tcp://1.2.3.4:1234/"

class tqsdk.api.TqChan(api, last_only=False)

用于协程间通讯的channel

创建channel实例

Args:

last_only (bool): 为True时只存储最后一个发送到channel的对象

async close()

关闭channel

关闭后send将不起作用,recv在收完剩余数据后会立即返回None

async send(item)

异步发送数据到channel中

Args:

item (any): 待发送的对象

send_nowait(item)

尝试立即发送数据到channel中

Args:

item (any): 待发送的对象

Raises:

asyncio.QueueFull: 如果channel已满则会抛出 asyncio.QueueFull

async recv()

异步接收channel中的数据,如果channel中没有数据则一直等待

Returns:

any: 收到的数据,如果channel已被关闭则会立即收到None

recv_nowait()

尝试立即接收channel中的数据

Returns:

any: 收到的数据,如果channel已被关闭则会立即收到None

Raises:

asyncio.QueueFull: 如果channel中没有数据则会抛出 asyncio.QueueEmpty

recv_latest(latest)

尝试立即接收channel中的最后一个数据

Args:

latest (any): 如果当前channel中没有数据或已关闭则返回该对象

Returns:

any: channel中的最后一个数据