API 参考¶
天勤接口的PYTHON封装, 提供以下功能
- 连接行情和交易服务器或天勤终端的websocket扩展接口, 接收行情及交易推送数据
- 在内存中存储管理一份完整的业务数据(行情+交易), 并在接收到新数据包时更新内存数据
- 通过一批函数接口, 支持用户代码访问业务数据
- 发送交易指令
- PYTHON SDK使用文档: https://doc.shinnytech.com/pysdk/latest/
- 天勤行情终端下载: https://www.shinnytech.com/tianqin
- 天勤使用文档: https://doc.shinnytech.com/tq/latest/
-
class
tqsdk.api.
TqAccount
(broker_id, account_id, password)¶ 天勤实盘类
-
class
tqsdk.api.
TqApi
(account=None, url=None, backtest=None, debug=None, loop=None)¶ 天勤接口及数据管理类.
通常情况下, 一个线程中应该只有一个TqApi的实例, 它负责维护网络连接, 接收行情及账户数据, 并在内存中维护业务数据截面
-
cancel_order
(order_or_order_id)¶ 发送撤单指令
- Args:
- order_or_order_id (str/dict): 拟撤委托单的 dict 或 单号
Example:
# 挂价开3手 DCE.m1809 多仓, 如果价格变化则撤单重下,直到全部成交 from tqsdk import TqApi, TqSim api = TqApi(TqSim()) quote = api.get_quote("DCE.m1809") order = {} while True: api.wait_update() # 当行情有变化且当前挂单价格不优时,则撤单 if order and api.is_changing(quote) and order["status"] == "ALIVE" and quote["bid_price1"] > order["limit_price"]: print("价格改变,撤单重下") api.cancel_order(order) # 当委托单已撤或还没有下单时则下单 if (not order and api.is_changing(quote)) or (api.is_changing(order) and order["volume_left"] != 0 and order["status"] == "FINISHED"): print("下单: 价格 %f" % quote["bid_price1"]) order = api.insert_order(symbol="DCE.m1809", direction="BUY", offset="OPEN", volume=order.get("volume_left", 3), limit_price=quote["bid_price1"]) if api.is_changing(order): print("单状态: %s, 已成交: %d 手" % (order["status"], order["volume_orign"] - order["volume_left"])) # 预计的输出是这样的: 下单: 价格 3117.000000 单状态: ALIVE, 已成交: 0 手 价格改变,撤单重下 下单: 价格 3118.000000 单状态: ALIVE, 已成交: 0 手 单状态: FINISHED, 已成交: 3 手 ...
-
close
()¶ 关闭天勤接口实例并释放相应资源
Example:
# m1901开多3手 from tqsdk import TqApi, TqSim from contextlib import closing with closing(TqApi(TqSim())) as api: api.insert_order(symbol="DCE.m1901", direction="BUY", offset="OPEN", volume=3)
-
create_task
(coro)¶ 创建一个task
一个task就是一个协程,task的调度是在 wait_update 函数中完成的,如果代码从来没有调用 wait_update,则task也得不到执行
- Args:
- coro (coroutine): 需要创建的协程
Example:
# 一个简单的task import asyncio from tqsdk import TqApi, TqSim async def hello(): await asyncio.sleep(3) print("hello world") api = TqApi(TqSim()) api.create_task(hello()) while True: api.wait_update() #以上代码将在3秒后输出 hello world
-
draw_box
(base_k_dataframe, x1, y1, x2, y2, id=None, board='MAIN', bg_color=0, color=4294901760, width=1)¶ 配合天勤使用时, 在天勤的行情图上绘制一个矩形
- Args:
base_k_dataframe (pandas.DataFrame): 基础K线数据序列, 要绘制的K线将出现在这个K线图上. 需要画图的数据以附加列的形式存在
x1 (int): 矩形左上角的 X 坐标, 以K线的序列号表示
y1 (float): 矩形左上角的 Y 坐标
x2 (int): 矩形左上角的 X 坐标, 以K线的序列号表示
y2 (float): 矩形左上角的 Y 坐标
id (str): ID, 可选. 以相同ID多次调用本函数, 后一次调用将覆盖前一次调用的效果
board (str): 选择图板, 可选, 缺省为 "MAIN" 表示绘制在主图
bg_color (ARGB): 填充颜色, 可选, 缺省为 空
color (ARGB): 边框颜色, 可选, 缺省为 红色
width (int): 边框宽度, 可选, 缺省为 1
Example:
# 给主图最后5根K线加一个方框 klines = api.get_kline_serial("SHFE.cu1905", 86400) api.draw_box(klines, x1=-5, y1=klines.iloc[-5]["close"], x2=-1, y2=klines.iloc[-1]["close"], width=1, color=0xFF0000FF, bg_color=0x8000FF00)
-
draw_line
(base_k_dataframe, x1, y1, x2, y2, id=None, board='MAIN', line_type='LINE', color=4294901760, width=1)¶ 配合天勤使用时, 在天勤的行情图上绘制一个直线/线段/射线
- Args:
base_k_dataframe (pandas.DataFrame): 基础K线数据序列, 要绘制的K线将出现在这个K线图上. 需要画图的数据以附加列的形式存在
x1 (int): 第一个点的 X 坐标, 以K线的序列号表示
y1 (float): 第一个点的 Y 坐标
x2 (int): 第二个点的 X 坐标, 以K线的序列号表示
y2 (float): 第二个点的 Y 坐标
id (str): 字符串ID, 可选. 以相同ID多次调用本函数, 后一次调用将覆盖前一次调用的效果
board (str): 选择图板, 可选, 缺省为 "MAIN" 表示绘制在主图
line_type ("LINE" | "SEG" | "RAY"): 画线类型, 可选, 默认为 LINE. LINE=直线, SEG=线段, RAY=射线
color (ARGB): 线颜色, 可选, 缺省为 红色
width (int): 线宽度, 可选, 缺省为 1
-
draw_text
(base_k_dataframe, text, x=None, y=None, id=None, board='MAIN', color=4294901760)¶ 配合天勤使用时, 在天勤的行情图上绘制一个字符串
- Args:
base_k_dataframe (pandas.DataFrame): 基础K线数据序列, 要绘制的K线将出现在这个K线图上. 需要画图的数据以附加列的形式存在
text (str): 要显示的字符串
x (int): X 坐标, 以K线的序列号表示. 可选, 缺省为对齐最后一根K线,
y (float): Y 坐标. 可选, 缺省为最后一根K线收盘价
id (str): 字符串ID, 可选. 以相同ID多次调用本函数, 后一次调用将覆盖前一次调用的效果
board (str): 选择图板, 可选, 缺省为 "MAIN" 表示绘制在主图
color (ARGB): 文本颜色, 可选, 缺省为红色.
Example:
# 在主图最近K线的最低处标一个"最低"文字 klines = api.get_kline_serial("SHFE.cu1905", 86400) indic = np.where(klines["low"] == klines["low"].min())[0] value = klines["low"].min() api.draw_text(klines, "测试413423", x=indic, y=value, color=0xFF00FF00)
-
get_account
()¶ 获取用户账户资金信息
- Returns:
dict: 本函数总是返回一个如下结构所示的包含用户账户资金信息的dict的引用. 每当其中信息改变时, 此dict会自动更新.
注意: 在 tqsdk 还没有收到账户数据包时, 此对象中各项内容为NaN
Example:
# 获取当前浮动盈亏 from tqsdk import TqApi, TqSim api = TqApi(TqSim()) account = api.get_account() while True: api.wait_update() print(account["float_profit"]) # 预计的输出是这样的: 2180.0 2080.0 2080.0 ...
-
get_kline_serial
(symbol, duration_seconds, data_length=200, chart_id=None)¶ 获取k线序列数据
请求指定合约及周期的K线数据. 序列数据会随着时间推进自动更新
- Args:
symbol (str): 指定合约代码.
duration_seconds (int): K线数据周期,以秒为单位。例如: 1分钟线为60,1小时线为3600,日线为86400
data_length (int): 需要获取的序列长度。每个序列最大支持请求 8964 个数据
chart_id (str): [可选]指定序列id, 默认由 api 自动生成
- Returns:
pandas.DataFrame: 本函数总是返回一个 pandas.DataFrame 实例. 行数=data_length, 包含以下列:
id: int, 1234 (k线序列号) datetime: int, 1501080715000000000 (K线起点时间(按北京时间),自unix epoch(1970-01-01 00:00:00 GMT)以来的纳秒数) open: float, 51450.0 (K线起始时刻的最新价) high: float, 51450.0 (K线时间范围内的最高价) low: float, 51450.0 (K线时间范围内的最低价) close: float, 51450.0 (K线结束时刻的最新价) volume: int, 11 (K线时间范围内的成交量) open_oi: int, 27354 (K线起始时刻的持仓量) close_oi: int, 27355 (K线结束时刻的持仓量)
Example:
# 获取 SHFE.cu1812 的1分钟线 from tqsdk import TqApi, TqSim api = TqApi(TqSim()) k_serial = api.get_kline_serial("SHFE.cu1812", 60) while True: api.wait_update() print(k_serial.iloc[-1]["close"]) # 预计的输出是这样的: 50970.0 50970.0 50960.0 ...
-
get_order
(order_id=None)¶ 获取用户委托单信息
- Args:
- order_id (str): [可选]单号, 默认返回所有委托单
- Returns:
dict: 当指定了order_id时, 返回一个如下结构所示的包含指定order_id委托单信息的引用. 每当其中信息改变时, 此dict会自动更新.
不带order_id参数调用get_order函数, 将返回包含用户所有委托单的一个嵌套dict, 其中每个元素的key为合约代码, value为上述格式的dict
注意: 在 tqsdk 还没有收到委托单信息时, 此对象中各项内容为空
Example:
# 获取当前总挂单手数 from tqsdk import TqApi, TqSim api = TqApi(TqSim()) orders = api.get_order() while True: api.wait_update() print(sum(o["volume_left"] for oid, o in orders.items() if not oid.startswith("_") and o["status"] == "ALIVE")) # 预计的输出是这样的: 3 3 0 ...
-
get_position
(symbol=None)¶ 获取用户持仓信息
- Args:
- symbol (str): [可选]合约代码, 默认返回所有持仓
- Returns:
dict: 当指定了symbol时, 返回一个如下结构所示的包含指定symbol持仓信息的引用. 每当其中信息改变时, 此dict会自动更新.
不带symbol参数调用 get_position 函数, 将返回包含用户所有持仓的一个嵌套dict, 其中每个元素的key为合约代码, value为上述格式的dict
注意: 在 tqsdk 还没有收到持仓信息时, 此对象中各项内容为空或0
Example:
# 获取 DCE.m1809 当前浮动盈亏 from tqsdk import TqApi, TqSim api = TqApi(TqSim()) position = api.get_position("DCE.m1809") while True: api.wait_update() print(position["float_profit_long"] + position["float_profit_short"]) # 预计的输出是这样的: 300.0 300.0 330.0 ...
-
get_quote
(symbol)¶ 获取指定合约的盘口行情.
- Args:
- symbol (str): 指定合约代码。注意:天勤接口从0.8版本开始,合约代码格式变更为 交易所代码.合约代码 的格式. 可用的交易所代码如下:
- CFFEX: 中金所
- SHFE: 上期所
- DCE: 大商所
- CZCE: 郑商所
- INE: 能源交易所(原油)
- Returns:
dict: 返回一个如下结构所示的 dict 对象的引用, 当行情更新时, 此对象的内容会被自动更新
注意: 在 tqsdk 还没有收到行情数据包时, 此对象中各项内容为 NaN 或 0
Example:
# 获取 SHFE.cu1812 合约的报价 from tqsdk import TqApi, TqSim api = TqApi(TqSim()) quote = api.get_quote("SHFE.cu1812") while True: api.wait_update() print(quote["last_price"]) #以上代码将输出 nan nan 24575.0 24575.0 ...
-
get_tick_serial
(symbol, data_length=200, chart_id=None)¶ 获取tick序列数据
请求指定合约的Tick序列数据. 序列数据会随着时间推进自动更新
- Args:
symbol (str): 指定合约代码.
data_length (int): 需要获取的序列长度。每个序列最大支持请求 8964 个数据
chart_id (str): [可选]指定序列id, 默认由 api 自动生成
- Returns:
pandas.DataFrame: 本函数总是返回一个 pandas.DataFrame 实例. 行数=data_length, 包含以下列:
id: int, tick序列号 datetime: int, 1501074872000000000 (tick从交易所发出的时间(按北京时间),自unix epoch(1970-01-01 00:00:00 GMT)以来的纳秒数) last_price: float, 3887.0 (最新价) average: float, 3820.0 (当日均价) highest: float, 3897.0 (当日最高价) lowest: float, 3806.0 (当日最低价) ask_price1: float, 3886.0 (卖一价) ask_volume1: int, 3 (卖一量) bid_price1: float, 3881.0 (买一价) bid_volume1: int, 18 (买一量) volume: int 7823 (当日成交量) amount: float, 19237841.0 (成交额) open_interest: int, 1941 (持仓量)
Example:
# 获取 SHFE.cu1812 的Tick序列 from tqsdk import TqApi, TqSim api = TqApi(TqSim()) serial = api.get_tick_serial("SHFE.cu1812") while True: api.wait_update() print(serial.iloc[-1]["bid_price1"], serial.iloc[-1]["ask_price1"]) # 预计的输出是这样的: 50860.0 51580.0 50860.0 51580.0 50820.0 51580.0 ...
-
insert_order
(symbol, direction, offset, volume, limit_price=None, order_id=None)¶ 发送下单指令
- Args:
symbol (str): 拟下单的合约symbol, 格式为 交易所代码.合约代码, 例如 "SHFE.cu1801"
direction (str): "BUY" 或 "SELL"
offset (str): "OPEN", "CLOSE" 或 "CLOSETODAY"
volume (int): 需要下单的手数
limit_price (float): [可选]下单价格, 默认市价单
order_id (str): [可选]指定下单单号, 默认由 api 自动生成
- Returns:
- dict: 本函数总是返回一个如下结构所示的包含委托单信息的dict的引用. 每当order中信息改变时, 此dict会自动更新.
Example:
# 市价开3手 DCE.m1809 多仓 from tqsdk import TqApi, TqSim api = TqApi(TqSim()) order = api.insert_order(symbol="DCE.m1809", direction="BUY", offset="OPEN", volume=3) while True: api.wait_update() print("单状态: %s, 已成交: %d 手" % (order["status"], order["volume_orign"] - order["volume_left"])) # 预计的输出是这样的: 单状态: ALIVE, 已成交: 0 手 单状态: ALIVE, 已成交: 0 手 单状态: FINISHED, 已成交: 3 手 ...
-
is_changing
(obj, key=None)¶ 判定obj最近是否有更新
当业务数据更新导致 wait_update 返回后可以使用该函数判断本次业务数据更新是否包含特定obj或其中某个字段
- Args:
obj (any): 任意业务对象, 包括 get_quote 返回的 quote, get_kline_serial 返回的 k_serial, get_account 返回的 account 等
- key (str/list of str): [可选]需要判断的字段,默认不指定
- 不指定: 当该obj下的任意字段有更新时返回True, 否则返回 False.
- str: 当该obj下的指定字段有更新时返回True, 否则返回 False.
- list of str: 当该obj下的指定字段中的任何一个字段有更新时返回True, 否则返回 False.
- Returns:
- bool: 如果本次业务数据更新包含了待判定的数据则返回 True, 否则返回 False.
Example:
# 追踪 SHFE.cu1812 的最新价更新 from tqsdk import TqApi, TqSim api = TqApi(TqSim()) quote = api.get_quote("SHFE.cu1812") while True: api.wait_update() if api.is_changing(quote, "last_price"): print(quote["last_price"]) # 以上代码运行后的输出是这样的: 51800.0 51810.0 51800.0 ...
-
is_serial_ready
(obj)¶ 判断是否已经从服务器收到了所有订阅的数据
- Returns:
- bool: 返回 True 表示已经从服务器收到了所有订阅的数据
Example:
# 判断是否已经从服务器收到了最后 3000 根 SHFE.cu1812 的分钟线数据 from tqsdk import TqApi, TqSim api = TqApi(TqSim()) k_serial = api.get_kline_serial("SHFE.cu1812", 60, data_length=3000) while True: api.wait_update() print(api.is_serial_ready(k_serial)) # 预计的输出是这样的: False False True True ...
-
register_update_notify
(obj=None, chan=None)¶ 注册一个channel以便接受业务数据更新通知
调用此函数将返回一个channel, 当obj更新时会通知该channel
推荐使用 async with api.register_update_notify() as update_chan 来注册更新通知
如果直接调用 update_chan = api.register_update_notify() 则使用完成后需要调用 await update_chan.close() 避免资源泄漏
- Args:
obj (any/list of any): [可选]任意业务对象, 包括 get_quote 返回的 quote, get_kline_serial 返回的 k_serial, get_account 返回的 account 等。默认不指定,监控所有业务对象
chan (TqChan): [可选]指定需要注册的channel。默认不指定,由本函数创建
Example:
# 获取 SHFE.cu1812 合约的报价 from tqsdk import TqApi, TqSim async def demo(): quote = api.get_quote("SHFE.cu1812") async with api.register_update_notify(quote) as update_chan: async for _ in update_chan: print(quote["last_price"]) api = TqApi(TqSim()) api.create_task(demo()) while True: api.wait_update() #以上代码将输出 nan 51850.0 51850.0 51690.0 ...
-
wait_update
(deadline=None)¶ 等待业务数据更新
调用此函数将阻塞当前线程, 等待天勤主进程发送业务数据更新并返回
- Args:
- deadline (float): [可选]指定截止时间,自unix epoch(1970-01-01 00:00:00 GMT)以来的秒数(time.time())。默认没有超时(无限等待)
- Returns:
- bool: 如果收到业务数据更新则返回 True, 如果到截止时间依然没有收到业务数据更新则返回 False
注意: 由于存在网络延迟, 因此有数据更新不代表之前发出的所有请求都被处理了, 例如:
from tqsdk import TqApi, TqSim api = TqApi(TqSim()) quote = api.get_quote("SHFE.cu1812") api.wait_update() print(quote["datetime"]) 可能输出 ""(空字符串), 表示还没有收到该合约的行情
-
-
class
tqsdk.api.
TqChan
(api, last_only=False)¶ 用于协程间通讯的channel
-
close
()¶ 关闭channel
关闭后send将不起作用,recv在收完剩余数据后会立即返回None
-
recv
()¶ 异步接收channel中的数据,如果channel中没有数据则一直等待
- Returns:
- any: 收到的数据,如果channel已被关闭则会立即收到None
-
recv_latest
(latest)¶ 尝试立即接收channel中的最后一个数据
- Args:
- latest (any): 如果当前channel中没有数据或已关闭则返回该对象
- Returns:
- any: channel中的最后一个数据
-
recv_nowait
()¶ 尝试立即接收channel中的数据
- Returns:
- any: 收到的数据,如果channel已被关闭则会立即收到None
- Raises:
- asyncio.QueueFull: 如果channel中没有数据则会抛出 asyncio.QueueEmpty
-
send
(item)¶ 异步发送数据到channel中
- Args:
- item (any): 待发送的对象
-
send_nowait
(item)¶ 尝试立即发送数据到channel中
- Args:
- item (any): 待发送的对象
- Raises:
- asyncio.QueueFull: 如果channel已满则会抛出 asyncio.QueueFull
-
-
class
tqsdk.lib.
InsertOrderTask
(api, symbol, direction, offset, volume, limit_price=None, order_chan=None, trade_chan=None)¶ 下单task
-
class
tqsdk.lib.
InsertOrderUntilAllTradedTask
(api, symbol, direction, offset, volume, price='ACTIVE', trade_chan=None)¶ 追价下单task, 该task会在行情变化后自动撤单重下,直到全部成交
-
class
tqsdk.lib.
TargetPosTask
(api, symbol, price='ACTIVE', init_pos=None, offset_priority='今昨, 开', trade_chan=None)¶ 目标持仓 task, 该 task 可以将指定合约调整到目标头寸
-
set_target_volume
(volume)¶ 设置目标持仓手数
- Args:
- volume (int): 目标持仓手数,正数表示多头,负数表示空头,0表示空仓
Example:
# 设置 rb1810 持仓为多头5手 from tqsdk import TqApi, TqSim, TargetPosTask api = TqApi(TqSim()) target_pos = TargetPosTask(api, "SHFE.rb1810") while True: api.wait_update() target_pos.set_target_volume(5)
-
-
class
tqsdk.sim.
TqSim
(init_balance=1000000.0, account_id='TQSIM')¶ 天勤模拟交易类
限价单要求报单价格达到或超过对手盘价格才能成交, 成交价为报单价格, 如果没有对手盘(涨跌停)则无法成交
市价单使用对手盘价格成交, 如果没有对手盘(涨跌停)则自动撤单
模拟交易不会有部分成交的情况, 要成交就是全部成交
-
class
tqsdk.backtest.
TqBacktest
(start_dt, end_dt)¶ 天勤回测类
将该类传入 TqApi 的构造函数, 则策略就会进入回测模式
回测模式下 k线会在刚创建出来时和结束时分别更新一次, 在这之间 k线是不会更新的
- 回测模式下 quote 的更新频率由所订阅的 tick 和 k线周期确定:
- 只要订阅了 tick, 则对应合约的 quote 就会使用 tick 生成, 更新频率也和 tick 一致, 但只有下字段:
- datetime/ask&bid_price1/ask&bid_volume1/last_price/highest/lowest/average/volume/amount/open_interest/ price_tick/price_decs/volume_multiple/max&min_limit&market_order_volume/underlying_symbol/strike_price
- 如果没有订阅 tick, 但是订阅了 k线, 则对应合约的 quote 会使用 k线生成, 更新频率和 k线的周期一致, 如果订阅了某个合约的多个周期的 k线, 则任一个周期的 k线有更新时, quote 都会更新. 使用 k线生成的 quote 的盘口由收盘价分别加/减一个最小变动单位, 并且 highest/lowest/average/amount 始终为 nan, volume 始终为0
- 如果即没有订阅 tick, 也没有订阅 k线或订阅的 k线周期大于分钟线, 则 TqBacktest 会自动订阅分钟线来生成 quote
模拟交易要求报单价格大于等于对手盘价格才会成交, 例如下买单, 要求价格大于等于卖一价才会成交, 如果不能立即成交则会等到下次行情更新再重新判断
回测模式下 wait_update 每次最多推进一个行情时间
回测结束后会抛出 BacktestFinished 例外
-
exception
tqsdk.exceptions.
BacktestFinished
¶ 回测结束会抛出此例外