交易策略示例

经典策略

Aberration 策略 (难度:初级)

策略说明 https://www.shinnytech.com/blog/aberration/

#!/usr/bin/env python
#  -*- coding: utf-8 -*-
__author__ = "Ringo"

'''
Aberration策略 (难度:初级)
参考: https://www.shinnytech.com/blog/aberration/
注: 该示例策略仅用于功能示范, 实盘时请根据自己的策略/经验进行修改
'''

from tqsdk import TqApi, TqAuth, TargetPosTask
from tqsdk.ta import BOLL

# 设置合约代码
SYMBOL = "DCE.m2105"
api = TqApi(auth=TqAuth("快期账户", "账户密码"))
quote = api.get_quote(SYMBOL)
klines = api.get_kline_serial(SYMBOL, 60 * 60 * 24)
position = api.get_position(SYMBOL)
target_pos = TargetPosTask(api, SYMBOL)


# 使用BOLL指标计算中轨、上轨和下轨,其中26为周期N  ,2为参数p
def boll_line(klines):
    boll = BOLL(klines, 26, 2)
    midline = boll["mid"].iloc[-1]
    topline = boll["top"].iloc[-1]
    bottomline = boll["bottom"].iloc[-1]
    print("策略运行,中轨:%.2f,上轨为:%.2f,下轨为:%.2f" % (midline, topline, bottomline))
    return midline, topline, bottomline


midline, topline, bottomline = boll_line(klines)

while True:
    api.wait_update()
    # 每次生成新的K线时重新计算BOLL指标
    if api.is_changing(klines.iloc[-1], "datetime"):
        midline, topline, bottomline = boll_line(klines)

    # 每次最新价发生变化时进行判断
    if api.is_changing(quote, "last_price"):
        # 判断开仓条件
        if position.pos_long == 0 and position.pos_short == 0:
            # 如果最新价大于上轨,K线上穿上轨,开多仓
            if quote.last_price > topline:
                print("K线上穿上轨,开多仓")
                target_pos.set_target_volume(20)
            # 如果最新价小于轨,K线下穿下轨,开空仓
            elif quote.last_price < bottomline:
                print("K线下穿下轨,开空仓")
                target_pos.set_target_volume(-20)
            else:
                print("当前最新价%.2f,未穿上轨或下轨,不开仓" % quote.last_price)

        # 在多头情况下,空仓条件
        elif position.pos_long > 0:
            # 如果最新价低于中线,多头清仓离场
            if quote.last_price < midline:
                print("最新价低于中线,多头清仓离场")
                target_pos.set_target_volume(0)
            else:
                print("当前多仓,未穿越中线,仓位无变化")

        # 在空头情况下,空仓条件
        elif position.pos_short > 0:
            # 如果最新价高于中线,空头清仓离场
            if quote.last_price > midline:
                print("最新价高于中线,空头清仓离场")
                target_pos.set_target_volume(0)
            else:
                print("当前空仓,未穿越中线,仓位无变化")

Doublema 双均线策略 (难度:初级)

#!/usr/bin/env python
#  -*- coding: utf-8 -*-
__author__ = 'limin'

'''
双均线策略
注: 该示例策略仅用于功能示范, 实盘时请根据自己的策略/经验进行修改
'''
from tqsdk import TqApi, TqAuth, TargetPosTask
from tqsdk.tafunc import ma

SHORT = 30  # 短周期
LONG = 60  # 长周期
SYMBOL = "SHFE.bu2012"  # 合约代码

api = TqApi(auth=TqAuth("快期账户", "账户密码"))
print("策略开始运行")

data_length = LONG + 2  # k线数据长度
# "duration_seconds=60"为一分钟线, 日线的duration_seconds参数为: 24*60*60
klines = api.get_kline_serial(SYMBOL, duration_seconds=60, data_length=data_length)
target_pos = TargetPosTask(api, SYMBOL)

while True:
    api.wait_update()

    if api.is_changing(klines.iloc[-1], "datetime"):  # 产生新k线:重新计算SMA
        short_avg = ma(klines["close"], SHORT)  # 短周期
        long_avg = ma(klines["close"], LONG)  # 长周期

        # 均线下穿,做空
        if long_avg.iloc[-2] < short_avg.iloc[-2] and long_avg.iloc[-1] > short_avg.iloc[-1]:
            target_pos.set_target_volume(-3)
            print("均线下穿,做空")

        # 均线上穿,做多
        if short_avg.iloc[-2] < long_avg.iloc[-2] and short_avg.iloc[-1] > long_avg.iloc[-1]:
            target_pos.set_target_volume(3)
            print("均线上穿,做多")

价格动量策略 (难度:初级)

策略说明 https://www.shinnytech.com/blog/momentum-strategy/

#!/usr/bin/env python
#  -*- coding: utf-8 -*-
__author__ = "Ringo"

'''
价格动量 策略 (难度:初级)
参考: https://www.shinnytech.com/blog/momentum-strategy/
注: 该示例策略仅用于功能示范, 实盘时请根据自己的策略/经验进行修改
'''

from tqsdk import TqApi, TqAuth, TargetPosTask

# 设置指定合约,获取N条K线计算价格动量
SYMBOL = "SHFE.au2012"
N = 15

api = TqApi(auth=TqAuth("快期账户", "账户密码"))
klines = api.get_kline_serial(SYMBOL, 60 * 60 * 24, N)
quote = api.get_quote(SYMBOL)
target_pos = TargetPosTask(api, SYMBOL)
position = api.get_position(SYMBOL)


def AR(kline1):
    """价格动量函数AR,以前N-1日K线计算价格动量ar"""
    spread_ho = sum(kline1.high[:-1] - kline1.open[:-1])
    spread_oc = sum(kline1.open[:-1] - kline1.low[:-1])
    # spread_oc 为0时,设置为最小价格跳动值
    if spread_oc == 0:
        spread_oc = quote.price_tick
    ar = (spread_ho / spread_oc) * 100
    return ar


ar = AR(klines)
print("策略开始启动")

while True:
    api.wait_update()
    # 生成新K线时,重新计算价格动量值ar
    if api.is_changing(klines.iloc[-1], "datetime"):
        ar = AR(klines)
        print("价格动量是:", ar)
    # 每次最新价发生变动时,重新进行判断
    if api.is_changing(quote, "last_price"):
        # 开仓策略
        if position.pos_long == 0 and position.pos_short == 0:
            # 如果ar大于110并且小于150,开多仓
            if 110 < ar < 150:
                print("价值动量超过110,小于150,做多")
                target_pos.set_target_volume(100)
            # 如果ar大于50,小于90,开空仓
            elif 50 < ar < 90:
                print("价值动量大于50,小于90,做空")
                target_pos.set_target_volume(-100)

        # 止损策略,多头下当前ar值小于90则平仓止损,空头下当前ar值大于110则平仓止损
        elif (position.pos_long > 0 and ar < 90) or (position.pos_short > 0 and ar > 110):
            print("止损平仓")
            target_pos.set_target_volume(0)

自动扶梯策略 (难度:初级)

策略说明 https://www.shinnytech.com/blog/escalator/

#!/usr/bin/env python
#  -*- coding: utf-8 -*-
__author__ = "Ringo"

'''
自动扶梯 策略 (难度:初级)
参考: https://www.shinnytech.com/blog/escalator/
注: 该示例策略仅用于功能示范, 实盘时请根据自己的策略/经验进行修改
'''

from tqsdk import TqApi, TqAuth, TargetPosTask
from tqsdk.ta import MA

# 设置合约
SYMBOL = "SHFE.rb2012"
# 设置均线长短周期
MA_SLOW, MA_FAST = 8, 40

api = TqApi(auth=TqAuth("快期账户", "账户密码"))
klines = api.get_kline_serial(SYMBOL, 60 * 60 * 24)
quote = api.get_quote(SYMBOL)
position = api.get_position(SYMBOL)
target_pos = TargetPosTask(api, SYMBOL)


# K线收盘价在这根K线波动范围函数


def kline_range(num):
    kl_range = (klines.iloc[num].close - klines.iloc[num].low) / \
               (klines.iloc[num].high - klines.iloc[num].low)
    return kl_range


# 获取长短均线值
def ma_caculate(klines):
    ma_slow = MA(klines, MA_SLOW).iloc[-1].ma
    ma_fast = MA(klines, MA_FAST).iloc[-1].ma
    return ma_slow, ma_fast


ma_slow, ma_fast = ma_caculate(klines)
print("慢速均线为%.2f,快速均线为%.2f" % (ma_slow, ma_fast))

while True:
    api.wait_update()
    # 每次k线更新,重新计算快慢均线
    if api.is_changing(klines.iloc[-1], "datetime"):
        ma_slow, ma_fast = ma_caculate(klines)
        print("慢速均线为%.2f,快速均线为%.2f" % (ma_slow, ma_fast))

    if api.is_changing(quote, "last_price"):
        # 开仓判断
        if position.pos_long == 0 and position.pos_short == 0:
            # 计算前后两根K线在当时K线范围波幅
            kl_range_cur = kline_range(-2)
            kl_range_pre = kline_range(-3)
            # 开多头判断,最近一根K线收盘价在短期均线和长期均线之上,前一根K线收盘价位于K线波动范围底部25%,最近这根K线收盘价位于K线波动范围顶部25%
            if klines.iloc[-2].close > max(ma_slow, ma_fast) and kl_range_pre <= 0.25 and kl_range_cur >= 0.75:
                print("最新价为:%.2f 开多头" % quote.last_price)
                target_pos.set_target_volume(100)

            # 开空头判断,最近一根K线收盘价在短期均线和长期均线之下,前一根K线收盘价位于K线波动范围顶部25%,最近这根K线收盘价位于K线波动范围底部25%
            elif klines.iloc[-2].close < min(ma_slow, ma_fast) and kl_range_pre >= 0.75 and kl_range_cur <= 0.25:
                print("最新价为:%.2f 开空头" % quote.last_price)
                target_pos.set_target_volume(-100)
            else:
                print("最新价位:%.2f ,未满足开仓条件" % quote.last_price)

        # 多头持仓止损策略
        elif position.pos_long > 0:
            # 在两根K线较低点减一跳,进行多头止损
            kline_low = min(klines.iloc[-2].low, klines.iloc[-3].low)
            if klines.iloc[-1].close <= kline_low - quote.price_tick:
                print("最新价为:%.2f,进行多头止损" % (quote.last_price))
                target_pos.set_target_volume(0)
            else:
                print("多头持仓,当前价格 %.2f,多头离场价格%.2f" %
                      (quote.last_price, kline_low - quote.price_tick))

        # 空头持仓止损策略
        elif position.pos_short > 0:
            # 在两根K线较高点加一跳,进行空头止损
            kline_high = max(klines.iloc[-2].high, klines.iloc[-3].high)
            if klines.iloc[-1].close >= kline_high + quote.price_tick:
                print("最新价为:%.2f 进行空头止损" % quote.last_price)
                target_pos.set_target_volume(0)
            else:
                print("空头持仓,当前价格 %.2f,空头离场价格%.2f" %
                      (quote.last_price, kline_high + quote.price_tick))

菲阿里四价 策略 (难度:初级)

策略说明 https://www.shinnytech.com/blog/fairy-four-price/

#!/usr/bin/env python
#  -*- coding: utf-8 -*-
__author__ = 'limin'

'''
菲阿里四价 策略(日内突破策略, 在每日收盘前对所持合约进行平仓)
参考: https://www.shinnytech.com/blog/fairy-four-price/
注: 该示例策略仅用于功能示范, 实盘时请根据自己的策略/经验进行修改
'''

from tqsdk import TqApi, TqAuth, TargetPosTask
from datetime import datetime
import time

symbol = "SHFE.ni2012"  # 合约代码
close_hour, close_minute = 14, 50  # 平仓时间

api = TqApi(auth=TqAuth("快期账户", "账户密码"))  # 使用模拟帐号直连行情和交易服务器
quote = api.get_quote(symbol)  # 获取指定合约的盘口行情
klines = api.get_kline_serial(symbol, 24 * 60 * 60)  # 获取日线
position = api.get_position(symbol)  # 持仓信息
target_pos = TargetPosTask(api, symbol)  # 目标持仓

top_rail = klines.high.iloc[-2]  # 上轨: 昨日高点
bottom_rail = klines.low.iloc[-2]  # 下轨: 昨日低点
print("上轨:", top_rail, ",下轨:", bottom_rail, ",昨日收盘价:", klines.close.iloc[-2], ",今日开盘价:", klines.open.iloc[-1])

while True:
    api.wait_update()
    if api.is_changing(klines.iloc[-1], "datetime"):  # 如果产生一根新日线 (即到达下一个交易日): 重新获取上下轨
        top_rail = klines.high.iloc[-2]
        bottom_rail = klines.low.iloc[-2]
        print("上轨:", top_rail, ",下轨:", bottom_rail, ",昨日收盘价:", klines.close.iloc[-2], ",今日开盘价:", klines.open.iloc[-1])

    if api.is_changing(quote, "last_price"):  # 如果行情最新价发生变化
        print("当前最新价", quote.last_price)
        # 开仓突破
        if quote.last_price > top_rail and position.pos_long == 0:  # 如果价格突破上轨: 买入开仓
            print("最新价:", quote.last_price, ", 价格突破上轨,买入开仓")
            target_pos.set_target_volume(3)  # 设置目标持仓手数,将指定合约调整到目标头寸
        elif quote.last_price < bottom_rail and position.pos_short == 0:  # 如果价格跌破下轨: 卖出开仓
            print("最新价:", quote.last_price, ", 价格跌破下轨, 卖出开仓")
            target_pos.set_target_volume(-3)

        # 平仓止损: 当价格 向上突破上轨 或 向下突破下轨 后, 再次回破当日开盘价
        if (quote.highest > top_rail and quote.last_price <= quote.open) or (
                quote.lowest < bottom_rail and quote.last_price >= quote.open):
            print("平仓止损")
            target_pos.set_target_volume(0)

    if api.is_changing(quote, "datetime"):
        now_time = datetime.strptime(quote.datetime, "%Y-%m-%d %H:%M:%S.%f")  # 获取当前的行情时间
        if now_time.hour == close_hour and now_time.minute >= close_minute:  # 到达平仓时间: 平仓
            print("临近本交易日收盘: 平仓")
            target_pos.set_target_volume(0)
            deadline = time.time() + 60  # 设置截止时间为当前时间的60秒以后
            while api.wait_update(deadline=deadline):  # 等待60秒
                pass
            api.close()  # 关闭api
            break  # 退出while循环

R-Breaker 交易策略 - 隔夜留仓 (难度:初级)

策略说明 https://www.shinnytech.com/blog/r-breaker/

#!/usr/bin/env python
#  -*- coding: utf-8 -*-
__author__ = 'limin'

'''
R-Breaker策略(隔夜留仓) (难度:初级)
参考: https://www.shinnytech.com/blog/r-breaker
注: 该示例策略仅用于功能示范, 实盘时请根据自己的策略/经验进行修改
'''

from tqsdk import TqApi, TqAuth, TargetPosTask

SYMBOL = "SHFE.au2006"  # 合约代码
STOP_LOSS_PRICE = 10  # 止损点(价格)


def get_index_line(klines):
    '''计算指标线'''
    high = klines.high.iloc[-2]  # 前一日的最高价
    low = klines.low.iloc[-2]  # 前一日的最低价
    close = klines.close.iloc[-2]  # 前一日的收盘价

    pivot = (high + low + close) / 3  # 枢轴点
    b_break = high + 2 * (pivot - low)  # 突破买入价
    s_setup = pivot + (high - low)  # 观察卖出价
    s_enter = 2 * pivot - low  # 反转卖出价
    b_enter = 2 * pivot - high  # 反转买入价
    b_setup = pivot - (high - low)  # 观察买入价
    s_break = low - 2 * (high - pivot)  # 突破卖出价

    print("已计算新标志线, 枢轴点: %f, 突破买入价: %f, 观察卖出价: %f, 反转卖出价: %f, 反转买入价: %f, 观察买入价: %f, 突破卖出价: %f"
          % (pivot, b_break, s_setup, s_enter, b_enter, b_setup, s_break))
    return pivot, b_break, s_setup, s_enter, b_enter, b_setup, s_break


api = TqApi(auth=TqAuth("快期账户", "账户密码"))
quote = api.get_quote(SYMBOL)
klines = api.get_kline_serial(SYMBOL, 24 * 60 * 60)  # 86400: 使用日线
position = api.get_position(SYMBOL)
target_pos = TargetPosTask(api, SYMBOL)
target_pos_value = position.pos_long - position.pos_short  # 目标净持仓数
open_position_price = position.open_price_long if target_pos_value > 0 else position.open_price_short  # 开仓价
pivot, b_break, s_setup, s_enter, b_enter, b_setup, s_break = get_index_line(klines)  # 七条标准线

while True:
    target_pos.set_target_volume(target_pos_value)
    api.wait_update()
    if api.is_changing(klines.iloc[-1], "datetime"):  # 产生新k线,则重新计算7条指标线
        pivot, b_break, s_setup, s_enter, b_enter, b_setup, s_break = get_index_line(klines)

    '''交易规则'''
    if api.is_changing(quote, "last_price"):
        print("最新价: ", quote.last_price)

        # 开仓价与当前行情价之差大于止损点则止损
        if (target_pos_value > 0 and open_position_price - quote.last_price >= STOP_LOSS_PRICE) or \
                (target_pos_value < 0 and quote.last_price - open_position_price >= STOP_LOSS_PRICE):
            target_pos_value = 0  # 平仓

        # 反转:
        if target_pos_value > 0:  # 多头持仓
            if quote.highest > s_setup and quote.last_price < s_enter:
                # 多头持仓,当日内最高价超过观察卖出价后,
                # 盘中价格出现回落,且进一步跌破反转卖出价构成的支撑线时,
                # 采取反转策略,即在该点位反手做空
                print("多头持仓,当日内最高价超过观察卖出价后跌破反转卖出价: 反手做空")
                target_pos_value = -3  # 做空
                open_position_price = quote.last_price
        elif target_pos_value < 0:  # 空头持仓
            if quote.lowest < b_setup and quote.last_price > b_enter:
                # 空头持仓,当日内最低价低于观察买入价后,
                # 盘中价格出现反弹,且进一步超过反转买入价构成的阻力线时,
                # 采取反转策略,即在该点位反手做多
                print("空头持仓,当日最低价低于观察买入价后超过反转买入价: 反手做多")
                target_pos_value = 3  # 做多
                open_position_price = quote.last_price

        # 突破:
        elif target_pos_value == 0:  # 空仓条件
            if quote.last_price > b_break:
                # 在空仓的情况下,如果盘中价格超过突破买入价,
                # 则采取趋势策略,即在该点位开仓做多
                print("空仓,盘中价格超过突破买入价: 开仓做多")
                target_pos_value = 3  # 做多
                open_position_price = quote.last_price
            elif quote.last_price < s_break:
                # 在空仓的情况下,如果盘中价格跌破突破卖出价,
                # 则采取趋势策略,即在该点位开仓做空
                print("空仓,盘中价格跌破突破卖出价: 开仓做空")
                target_pos_value = -3  # 做空
                open_position_price = quote.last_price

R-Breaker 交易策略 - 非隔夜留仓 (难度:初级)

策略说明 https://www.shinnytech.com/blog/r-breaker/

# !/usr/bin/env python
#  -*- coding: utf-8 -*-
__author__ = 'limin'

'''
R-Breaker策略(非隔夜留仓: 在每日收盘前,对所持合约进行平仓)
参考: https://www.shinnytech.com/blog/r-breaker
注: 该示例策略仅用于功能示范, 实盘时请根据自己的策略/经验进行修改
'''

from datetime import datetime
from tqsdk import TqApi, TqAuth, TargetPosTask

SYMBOL = "SHFE.au2006"  # 合约代码
CLOSE_HOUR, CLOSE_MINUTE = 14, 50  # 平仓时间
STOP_LOSS_PRICE = 10  # 止损点(价格)

api = TqApi(auth=TqAuth("快期账户", "账户密码"))
print("策略开始运行")


def get_index_line(klines):
    '''计算指标线'''
    high = klines.high.iloc[-2]  # 前一日的最高价
    low = klines.low.iloc[-2]  # 前一日的最低价
    close = klines.close.iloc[-2]  # 前一日的收盘价
    pivot = (high + low + close) / 3  # 枢轴点
    b_break = high + 2 * (pivot - low)  # 突破买入价
    s_setup = pivot + (high - low)  # 观察卖出价
    s_enter = 2 * pivot - low  # 反转卖出价
    b_enter = 2 * pivot - high  # 反转买入价
    b_setup = pivot - (high - low)  # 观察买入价
    s_break = low - 2 * (high - pivot)  # 突破卖出价
    print("已计算新标志线, 枢轴点: %f, 突破买入价: %f, 观察卖出价: %f, 反转卖出价: %f, 反转买入价: %f, 观察买入价: %f, 突破卖出价: %f"
          % (pivot, b_break, s_setup, s_enter, b_enter, b_setup, s_break))
    return pivot, b_break, s_setup, s_enter, b_enter, b_setup, s_break


quote = api.get_quote(SYMBOL)
klines = api.get_kline_serial(SYMBOL, 24 * 60 * 60)  # 86400: 使用日线
position = api.get_position(SYMBOL)
target_pos = TargetPosTask(api, SYMBOL)
target_pos_value = position.pos_long - position.pos_short  # 目标净持仓数
open_position_price = position.open_price_long if target_pos_value > 0 else position.open_price_short  # 开仓价
pivot, b_break, s_setup, s_enter, b_enter, b_setup, s_break = get_index_line(klines)  # 七条标准线

while True:
    target_pos.set_target_volume(target_pos_value)
    api.wait_update()
    if api.is_changing(klines.iloc[-1], "datetime"):  # 产生新k线,则重新计算7条指标线
        pivot, b_break, s_setup, s_enter, b_enter, b_setup, s_break = get_index_line(klines)

    if api.is_changing(quote, "datetime"):
        now = datetime.strptime(quote.datetime, "%Y-%m-%d %H:%M:%S.%f")
        if now.hour == CLOSE_HOUR and now.minute >= CLOSE_MINUTE:  # 到达平仓时间: 平仓
            print("临近本交易日收盘: 平仓")
            target_pos_value = 0  # 平仓
            pivot = b_break = s_setup = s_enter = b_enter = b_setup = s_break = float("nan")  # 修改各指标线的值, 避免平仓后再次触发

    '''交易规则'''
    if api.is_changing(quote, "last_price"):
        print("最新价: %f" % quote.last_price)

        # 开仓价与当前行情价之差大于止损点则止损
        if (target_pos_value > 0 and open_position_price - quote.last_price >= STOP_LOSS_PRICE) or \
                (target_pos_value < 0 and quote.last_price - open_position_price >= STOP_LOSS_PRICE):
            target_pos_value = 0  # 平仓

        # 反转:
        if target_pos_value > 0:  # 多头持仓
            if quote.highest > s_setup and quote.last_price < s_enter:
                # 多头持仓,当日内最高价超过观察卖出价后,
                # 盘中价格出现回落,且进一步跌破反转卖出价构成的支撑线时,
                # 采取反转策略,即在该点位反手做空
                print("多头持仓,当日内最高价超过观察卖出价后跌破反转卖出价: 反手做空")
                target_pos_value = -3  # 做空
                open_position_price = quote.last_price
        elif target_pos_value < 0:  # 空头持仓
            if quote.lowest < b_setup and quote.last_price > b_enter:
                # 空头持仓,当日内最低价低于观察买入价后,
                # 盘中价格出现反弹,且进一步超过反转买入价构成的阻力线时,
                # 采取反转策略,即在该点位反手做多
                print("空头持仓,当日最低价低于观察买入价后超过反转买入价: 反手做多")
                target_pos_value = 3  # 做多
                open_position_price = quote.last_price

        # 突破:
        elif target_pos_value == 0:  # 空仓条件
            if quote.last_price > b_break:
                # 在空仓的情况下,如果盘中价格超过突破买入价,
                # 则采取趋势策略,即在该点位开仓做多
                print("空仓,盘中价格超过突破买入价: 开仓做多")
                target_pos_value = 3  # 做多
                open_position_price = quote.last_price
            elif quote.last_price < s_break:
                # 在空仓的情况下,如果盘中价格跌破突破卖出价,
                # 则采取趋势策略,即在该点位开仓做空
                print("空仓,盘中价格跌破突破卖出价: 开仓做空")
                target_pos_value = -3  # 做空
                open_position_price = quote.last_price

Dual Thrust 策略 (难度:中级)

策略说明 https://www.shinnytech.com/blog/dual-thrust/

#!/usr/bin/env python
#  -*- coding: utf-8 -*-
__author__ = 'limin'

'''
Dual Thrust策略 (难度:中级)
参考: https://www.shinnytech.com/blog/dual-thrust
注: 该示例策略仅用于功能示范, 实盘时请根据自己的策略/经验进行修改
'''

from tqsdk import TqApi, TqAuth, TargetPosTask

SYMBOL = "DCE.jd2011"  # 合约代码
NDAY = 5  # 天数
K1 = 0.2  # 上轨K值
K2 = 0.2  # 下轨K值

api = TqApi(auth=TqAuth("快期账户", "账户密码"))
print("策略开始运行")

quote = api.get_quote(SYMBOL)
klines = api.get_kline_serial(SYMBOL, 24 * 60 * 60)  # 86400使用日线
target_pos = TargetPosTask(api, SYMBOL)


def dual_thrust(quote, klines):
    current_open = klines.iloc[-1]["open"]
    HH = max(klines.high.iloc[-NDAY - 1:-1])  # N日最高价的最高价
    HC = max(klines.close.iloc[-NDAY - 1:-1])  # N日收盘价的最高价
    LC = min(klines.close.iloc[-NDAY - 1:-1])  # N日收盘价的最低价
    LL = min(klines.low.iloc[-NDAY - 1:-1])  # N日最低价的最低价
    range = max(HH - LC, HC - LL)
    buy_line = current_open + range * K1  # 上轨
    sell_line = current_open - range * K2  # 下轨
    print("当前开盘价: %f, 上轨: %f, 下轨: %f" % (current_open, buy_line, sell_line))
    return buy_line, sell_line


buy_line, sell_line = dual_thrust(quote, klines)  # 获取上下轨

while True:
    api.wait_update()
    if api.is_changing(klines.iloc[-1], ["datetime", "open"]):  # 新产生一根日线或开盘价发生变化: 重新计算上下轨
        buy_line, sell_line = dual_thrust(quote, klines)

    if api.is_changing(quote, "last_price"):
        if quote.last_price > buy_line:  # 高于上轨
            print("高于上轨,目标持仓 多头3手")
            target_pos.set_target_volume(3)  # 交易
        elif quote.last_price < sell_line:  # 低于下轨
            print("低于下轨,目标持仓 空头3手")
            target_pos.set_target_volume(-3)  # 交易
        else:
            print('未穿越上下轨,不调整持仓')

网格交易策略 (难度:中级)

策略说明 https://www.shinnytech.com/blog/grid-trading/

#!/usr/bin/env python
#  -*- coding: utf-8 -*-
__author__ = 'limin'

"""
网格交易策略 (难度:中级)
参考: https://www.shinnytech.com/blog/grid-trading/
注: 该示例策略仅用于功能示范, 实盘时请根据自己的策略/经验进行修改
"""

from functools import reduce
from tqsdk import TqApi, TqAuth, TargetPosTask

SYMBOL = "DCE.jd2011"  # 合约代码
START_PRICE = 4247  # 起始价位
GRID_AMOUNT = 10  # 网格在多头、空头方向的格子(档位)数量

api = TqApi(auth=TqAuth("快期账户", "账户密码"))
grid_region_long = [0.005] * GRID_AMOUNT  # 多头每格价格跌幅(网格密度)
grid_region_short = [0.005] * GRID_AMOUNT  # 空头每格价格涨幅(网格密度)
grid_volume_long = [i for i in range(GRID_AMOUNT + 1)]  # 多头每格持仓手数
grid_volume_short = [i for i in range(GRID_AMOUNT + 1)]  # 空头每格持仓手数
grid_prices_long = [reduce(lambda p, r: p * (1 - r), grid_region_long[:i], START_PRICE) for i in
                    range(GRID_AMOUNT + 1)]  # 多头每格的触发价位列表
grid_prices_short = [reduce(lambda p, r: p * (1 + r), grid_region_short[:i], START_PRICE) for i in
                     range(GRID_AMOUNT + 1)]  # 空头每格的触发价位列表

print("策略开始运行, 起始价位: %f, 多头每格持仓手数:%s, 多头每格的价位:%s, 空头每格的价位:%s" % (
START_PRICE, grid_volume_long, grid_prices_long, grid_prices_short))
quote = api.get_quote(SYMBOL)  # 行情数据
target_pos = TargetPosTask(api, SYMBOL)
position = api.get_position(SYMBOL)  # 持仓信息


def wait_price(layer):
    """等待行情最新价变动到其他档位,则进入下一档位或回退到上一档位; 如果从下一档位回退到当前档位,则设置为当前对应的持仓手数;
        layer : 当前所在第几个档位层次; layer>0 表示多头方向, layer<0 表示空头方向
    """
    if layer > 0 or quote.last_price <= grid_prices_long[1]:  # 是多头方向
        while True:
            api.wait_update()
            # 如果当前档位小于最大档位,并且最新价小于等于下一个档位的价格: 则设置为下一档位对应的手数后进入下一档位层次
            if layer < GRID_AMOUNT and quote.last_price <= grid_prices_long[layer + 1]:
                target_pos.set_target_volume(grid_volume_long[layer + 1])
                print("最新价: %f, 进入: 多头第 %d 档" % (quote.last_price, layer + 1))
                wait_price(layer + 1)
                # 从下一档位回退到当前档位后, 设置回当前对应的持仓手数
                target_pos.set_target_volume(grid_volume_long[layer + 1])
            # 如果最新价大于当前档位的价格: 则回退到上一档位
            if quote.last_price > grid_prices_long[layer]:
                print("最新价: %f, 回退到: 多头第 %d 档" % (quote.last_price, layer))
                return
    elif layer < 0 or quote.last_price >= grid_prices_short[1]:  # 是空头方向
        layer = -layer  # 转为正数便于计算
        while True:
            api.wait_update()
            # 如果当前档位小于最大档位层次,并且最新价大于等于下一个档位的价格: 则设置为下一档位对应的持仓手数后进入下一档位层次
            if layer < GRID_AMOUNT and quote.last_price >= grid_prices_short[layer + 1]:
                target_pos.set_target_volume(-grid_volume_short[layer + 1])
                print("最新价: %f, 进入: 空头第 %d 档" % (quote.last_price, layer + 1))
                wait_price(-(layer + 1))
                # 从下一档位回退到当前档位后, 设置回当前对应的持仓手数
                target_pos.set_target_volume(-grid_volume_short[layer + 1])
            # 如果最新价小于当前档位的价格: 则回退到上一档位
            if quote.last_price < grid_prices_short[layer]:
                print("最新价: %f, 回退到: 空头第 %d 档" % (quote.last_price, layer))
                return


while True:
    api.wait_update()
    wait_price(0)  # 从第0层开始进入网格
    target_pos.set_target_volume(0)

网格交易策略 - 异步代码 (难度:中级)

策略说明 https://www.shinnytech.com/blog/grid-trading/

#!/usr/bin/env python
#  -*- coding: utf-8 -*-
__author__ = 'chengzhi'

"""
网格交易策略
参考: https://www.shinnytech.com/blog/grid-trading/
注: 该示例策略仅用于功能示范, 实盘时请根据自己的策略/经验进行修改
"""

from functools import reduce
from contextlib import closing
from tqsdk import TqApi, TqAuth, TargetPosTask

# 网格计划参数:
symbol = "DCE.jd2011"  # 合约代码
start_price = 4247  # 起始价位
grid_amount = 10  # 网格在多头、空头方向的格子(档位)数量
grid_region_long = [0.005] * grid_amount  # 多头每格价格跌幅(网格密度)
grid_region_short = [0.005] * grid_amount  # 空头每格价格涨幅(网格密度)
grid_volume_long = [1] * grid_amount  # 多头每格交易手数
grid_volume_short = [-1] * grid_amount  # 空头每格交易手数
grid_prices_long = [reduce(lambda p, r: p*(1-r), grid_region_long[:i], start_price) for i in range(grid_amount + 1)]  # 多头每格的触发价位列表, 第一个元素为起始价位
grid_prices_short = [reduce(lambda p, r: p*(1+r), grid_region_short[:i], start_price) for i in range(grid_amount + 1)]  # 空头每格的触发价位列表, 第一个元素为起始价位

print("起始价位:", start_price)
print("多头每格交易量:", grid_volume_long)
print("多头每格的价位:", grid_prices_long)
print("空头每格的价位:", grid_prices_short)

api = TqApi(auth=TqAuth("快期账户", "账户密码"))
quote = api.get_quote(symbol)  # 行情数据
target_pos = TargetPosTask(api, symbol)
target_volume = 0  # 目标持仓手数


async def price_watcher(open_price, close_price, volume):
    """该task在价格触发开仓价时开仓,触发平仓价时平仓"""
    global target_volume
    async with api.register_update_notify(quote) as update_chan:  # 当 quote 有更新时会发送通知到 update_chan 上
        while True:
            async for _ in update_chan:  # 当从 update_chan 上收到行情更新通知时判断是否触发开仓条件
                if (volume > 0 and quote.last_price <= open_price) or (volume < 0 and quote.last_price >= open_price):
                    break
            target_volume += volume
            target_pos.set_target_volume(target_volume)
            print("时间:", quote.datetime, "最新价:", quote.last_price, "开仓", volume, "手", "总仓位:", target_volume, "手")
            async for _ in update_chan:  # 当从 update_chan 上收到行情更新通知时判断是否触发平仓条件
                if (volume > 0 and quote.last_price > close_price) or (volume < 0 and quote.last_price < close_price):
                    break
            target_volume -= volume
            target_pos.set_target_volume(target_volume)
            print("时间:", quote.datetime, "最新价:", quote.last_price, "平仓", volume, "手", "总仓位:", target_volume, "手")


for i in range(grid_amount):
    api.create_task(price_watcher(grid_prices_long[i+1], grid_prices_long[i], grid_volume_long[i]))
    api.create_task(price_watcher(grid_prices_short[i+1], grid_prices_short[i], grid_volume_short[i]))

with closing(api):
    while True:
        api.wait_update()

随机森林 (难度:中级)

#!/usr/bin/env python
#  -*- coding: utf-8 -*-
__author__ = 'limin'

import pandas as pd
import datetime
from contextlib import closing
from tqsdk import TqApi, TqAuth, TqBacktest, BacktestFinished, TargetPosTask
from tqsdk.tafunc import sma, ema2, trma
from sklearn.ensemble import RandomForestClassifier

pd.set_option('display.max_rows', None)  # 设置Pandas显示的行数
pd.set_option('display.width', None)  # 设置Pandas显示的宽度

'''
应用随机森林对某交易日涨跌情况的预测(使用sklearn包)
参考:https://www.joinquant.com/post/1571
注: 该示例策略仅用于功能示范, 实盘时请根据自己的策略/经验进行修改
'''

symbol = "SHFE.ru1811"  # 交易合约代码
close_hour, close_minute = 14, 50  # 预定收盘时间(因为真实收盘后无法进行交易, 所以提前设定收盘时间)


def get_prediction_data(klines, n):
    """获取用于随机森林的n个输入数据(n为数据长度): n天中每天的特征参数及其涨跌情况"""
    close_prices = klines.close[- 30 - n:]  # 获取本交易日及以前的收盘价(此时在预定的收盘时间: 认为本交易日已收盘)
    # 计算所需指标
    sma_data = sma(close_prices, 30, 0.02)[-n:]  # SMA指标, 函数默认时间周期参数:30
    wma_data = ema2(close_prices, 30)[-n:]  # WMA指标
    mom_data = trma(close_prices, 30)[-n:]  # MOM指标
    x_all = list(zip(sma_data, wma_data, mom_data))  # 样本特征组
    y_all = list(klines.close.iloc[i] >= klines.close.iloc[i - 1] for i in list(reversed(range(-1, -n - 1, -1))))  # 样本标签组
    # x_all:            大前天指标 前天指标 昨天指标 (今天指标)
    # y_all:   (大前天)    前天     昨天    今天      -明天-
    # 准备算法需要用到的数据
    x_train = x_all[: -1]  # 训练数据: 特征
    x_predict = x_all[-1]  # 预测数据(用本交易日的指标预测下一交易日的涨跌)
    y_train = y_all[1:]  # 训练数据: 标签 (去掉第一个数据后让其与指标隔一位对齐(例如: 昨天的特征 -> 对应预测今天的涨跌标签))

    return x_train, y_train, x_predict


predictions = []  # 用于记录每次的预测结果(在每个交易日收盘时用收盘数据预测下一交易日的涨跌,并记录在此列表里)
api = TqApi(backtest=TqBacktest(start_dt=datetime.date(2018, 7, 2), end_dt=datetime.date(2018, 9, 26)), auth=TqAuth("快期账户", "账户密码"))
quote = api.get_quote(symbol)
klines = api.get_kline_serial(symbol, duration_seconds=24 * 60 * 60)  # 日线
target_pos = TargetPosTask(api, symbol)
with closing(api):
    try:
        while True:
            while not api.is_changing(klines.iloc[-1], "datetime"):  # 等到达下一个交易日
                api.wait_update()
            while True:
                api.wait_update()
                # 在收盘后预测下一交易日的涨跌情况
                if api.is_changing(quote, "datetime"):
                    now = datetime.datetime.strptime(quote.datetime, "%Y-%m-%d %H:%M:%S.%f")  # 当前quote的时间
                    # 判断是否到达预定收盘时间: 如果到达 则认为本交易日收盘, 此时预测下一交易日的涨跌情况, 并调整为对应仓位
                    if now.hour == close_hour and now.minute >= close_minute:
                        # 1- 获取数据
                        x_train, y_train, x_predict = get_prediction_data(klines, 75)  # 参数1: K线, 参数2:需要的数据长度

                        # 2- 利用机器学习算法预测下一个交易日的涨跌情况
                        # n_estimators 参数: 选择森林里(决策)树的数目; bootstrap 参数: 选择建立决策树时,是否使用有放回抽样
                        clf = RandomForestClassifier(n_estimators=30, bootstrap=True)
                        clf.fit(x_train, y_train)  # 传入训练数据, 进行参数训练
                        predictions.append(bool(clf.predict([x_predict])))  # 传入测试数据进行预测, 得到预测的结果

                        # 3- 进行交易
                        if predictions[-1] == True:  # 如果预测结果为涨: 买入
                            print(quote.datetime, "预测下一交易日为 涨")
                            target_pos.set_target_volume(10)
                        else:  # 如果预测结果为跌: 卖出
                            print(quote.datetime, "预测下一交易日为 跌")
                            target_pos.set_target_volume(-10)
                        break

    except BacktestFinished:  # 回测结束, 获取预测结果,统计正确率
        klines["pre_close"] = klines["close"].shift(1)  # 增加 pre_close(上一交易日的收盘价) 字段
        klines = klines[-len(predictions) + 1:]  # 取出在回测日期内的K线数据
        klines["prediction"] = predictions[:-1]  # 增加预测的本交易日涨跌情况字段(向后移一个数据目的: 将 本交易日对应下一交易日的涨跌 调整为 本交易日对应本交易日的涨跌)
        results = (klines["close"] - klines["pre_close"] >= 0) == klines["prediction"]

        print(klines)
        print("----回测结束----")
        print("预测结果正误:\n", results)
        print("预测结果数目统计: 总计", len(results),"个预测结果")
        print(pd.value_counts(results))
        print("预测的准确率:")
        print((pd.value_counts(results)[True]) / len(results))

海龟交易策略 (难度:中级)

策略说明 https://www.shinnytech.com/blog/turtle/

#!/usr/bin/env python
#  -*- coding: utf-8 -*-
__author__ = 'limin'

'''
海龟策略 (难度:中级)
参考: https://www.shinnytech.com/blog/turtle/
注: 该示例策略仅用于功能示范, 实盘时请根据自己的策略/经验进行修改
'''

import json
import time
from tqsdk import TqApi, TqAuth, TargetPosTask
from tqsdk.ta import ATR


class Turtle:
    def __init__(self, symbol, account=None, auth=None, donchian_channel_open_position=20,
                 donchian_channel_stop_profit=10,
                 atr_day_length=20, max_risk_ratio=0.5):
        self.account = account  # 交易账号
        self.auth = auth  # 快期账户
        self.symbol = symbol  # 合约代码
        self.donchian_channel_open_position = donchian_channel_open_position  # 唐奇安通道的天数周期(开仓)
        self.donchian_channel_stop_profit = donchian_channel_stop_profit  # 唐奇安通道的天数周期(止盈)
        self.atr_day_length = atr_day_length  # ATR计算所用天数
        self.max_risk_ratio = max_risk_ratio  # 最高风险度
        self.state = {
            "position": 0,  # 本策略净持仓数(正数表示多头,负数表示空头,0表示空仓)
            "last_price": float("nan"),  # 上次调仓价
        }

        self.n = 0  # 平均真实波幅(N值)
        self.unit = 0  # 买卖单位
        self.donchian_channel_high = 0  # 唐奇安通道上轨
        self.donchian_channel_low = 0  # 唐奇安通道下轨

        self.api = TqApi(self.account, auth=self.auth)
        self.quote = self.api.get_quote(self.symbol)
        # 由于ATR是路径依赖函数,因此使用更长的数据序列进行计算以便使其值稳定下来
        kline_length = max(donchian_channel_open_position + 1, donchian_channel_stop_profit + 1, atr_day_length * 5)
        self.klines = self.api.get_kline_serial(self.symbol, 24 * 60 * 60, data_length=kline_length)
        self.account = self.api.get_account()
        self.target_pos = TargetPosTask(self.api, self.symbol)

    def recalc_paramter(self):
        # 平均真实波幅(N值)
        self.n = ATR(self.klines, self.atr_day_length)["atr"].iloc[-1]
        # 买卖单位
        self.unit = int((self.account.balance * 0.01) / (self.quote.volume_multiple * self.n))
        # 唐奇安通道上轨:前N个交易日的最高价
        self.donchian_channel_high = max(self.klines.high[-self.donchian_channel_open_position - 1:-1])
        # 唐奇安通道下轨:前N个交易日的最低价
        self.donchian_channel_low = min(self.klines.low[-self.donchian_channel_open_position - 1:-1])
        print("唐其安通道上下轨: %f, %f" % (self.donchian_channel_high, self.donchian_channel_low))
        return True

    def set_position(self, pos):
        self.state["position"] = pos
        self.state["last_price"] = self.quote["last_price"]
        self.target_pos.set_target_volume(self.state["position"])

    def try_open(self):
        """开仓策略"""
        while self.state["position"] == 0:
            self.api.wait_update()
            if self.api.is_changing(self.klines.iloc[-1], "datetime"):  # 如果产生新k线,则重新计算唐奇安通道及买卖单位
                self.recalc_paramter()
            if self.api.is_changing(self.quote, "last_price"):
                print("最新价: %f" % self.quote.last_price)
                if self.quote.last_price > self.donchian_channel_high:  # 当前价>唐奇安通道上轨,买入1个Unit;(持多仓)
                    print("当前价>唐奇安通道上轨,买入1个Unit(持多仓): %d 手" % self.unit)
                    self.set_position(self.state["position"] + self.unit)
                elif self.quote.last_price < self.donchian_channel_low:  # 当前价<唐奇安通道下轨,卖出1个Unit;(持空仓)
                    print("当前价<唐奇安通道下轨,卖出1个Unit(持空仓): %d 手" % self.unit)
                    self.set_position(self.state["position"] - self.unit)

    def try_close(self):
        """交易策略"""
        while self.state["position"] != 0:
            self.api.wait_update()
            if self.api.is_changing(self.quote, "last_price"):
                print("最新价: ", self.quote.last_price)
                if self.state["position"] > 0:  # 持多单
                    # 加仓策略: 如果是多仓且行情最新价在上一次建仓(或者加仓)的基础上又上涨了0.5N,就再加一个Unit的多仓,并且风险度在设定范围内(以防爆仓)
                    if self.quote.last_price >= self.state[
                        "last_price"] + 0.5 * self.n and self.account.risk_ratio <= self.max_risk_ratio:
                        print("加仓:加1个Unit的多仓")
                        self.set_position(self.state["position"] + self.unit)
                    # 止损策略: 如果是多仓且行情最新价在上一次建仓(或者加仓)的基础上又下跌了2N,就卖出全部头寸止损
                    elif self.quote.last_price <= self.state["last_price"] - 2 * self.n:
                        print("止损:卖出全部头寸")
                        self.set_position(0)
                    # 止盈策略: 如果是多仓且行情最新价跌破了10日唐奇安通道的下轨,就清空所有头寸结束策略,离场
                    if self.quote.last_price <= min(self.klines.low[-self.donchian_channel_stop_profit - 1:-1]):
                        print("止盈:清空所有头寸结束策略,离场")
                        self.set_position(0)

                elif self.state["position"] < 0:  # 持空单
                    # 加仓策略: 如果是空仓且行情最新价在上一次建仓(或者加仓)的基础上又下跌了0.5N,就再加一个Unit的空仓,并且风险度在设定范围内(以防爆仓)
                    if self.quote.last_price <= self.state[
                        "last_price"] - 0.5 * self.n and self.account.risk_ratio <= self.max_risk_ratio:
                        print("加仓:加1个Unit的空仓")
                        self.set_position(self.state["position"] - self.unit)
                    # 止损策略: 如果是空仓且行情最新价在上一次建仓(或者加仓)的基础上又上涨了2N,就平仓止损
                    elif self.quote.last_price >= self.state["last_price"] + 2 * self.n:
                        print("止损:卖出全部头寸")
                        self.set_position(0)
                    # 止盈策略: 如果是空仓且行情最新价升破了10日唐奇安通道的上轨,就清空所有头寸结束策略,离场
                    if self.quote.last_price >= max(self.klines.high[-self.donchian_channel_stop_profit - 1:-1]):
                        print("止盈:清空所有头寸结束策略,离场")
                        self.set_position(0)

    def strategy(self):
        """海龟策略"""
        print("等待K线及账户数据...")
        deadline = time.time() + 5
        while not self.recalc_paramter():
            if not self.api.wait_update(deadline=deadline):
                raise Exception("获取数据失败,请确认行情连接正常并已经登录交易账户")
        while True:
            self.try_open()
            self.try_close()


turtle = Turtle("SHFE.au2006")
print("策略开始运行")
try:
    turtle.state = json.load(open("turtle_state.json", "r"))  # 读取数据: 本策略目标净持仓数,上一次开仓价
except FileNotFoundError:
    pass
print("当前持仓数: %d, 上次调仓价: %f" % (turtle.state["position"], turtle.state["last_price"]))
try:
    turtle.strategy()
finally:
    turtle.api.close()
    json.dump(turtle.state, open("turtle_state.json", "w"))  # 保存数据

Volume Weighted Average Price 策略 (难度:高级)

策略说明 https://www.shinnytech.com/blog/vwap/

#!/usr/bin/env python
#  -*- coding: utf-8 -*-
__author__ = 'limin'

'''
Volume Weighted Average Price策略 (难度:高级)
参考: https://www.shinnytech.com/blog/vwap
注: 该示例策略仅用于功能示范, 实盘时请根据自己的策略/经验进行修改
'''

import datetime
from tqsdk import TqApi, TqAuth, TargetPosTask

TIME_CELL = 5 * 60  # 等时长下单的时间单元, 单位: 秒
TARGET_VOLUME = 300  # 目标交易手数 (>0: 多头, <0: 空头)
SYMBOL = "DCE.jd2001"  # 交易合约代码
HISTORY_DAY_LENGTH = 20  # 使用多少天的历史数据用来计算每个时间单元的下单手数
START_HOUR, START_MINUTE = 9, 35  # 计划交易时段起始时间点
END_HOUR, END_MINUTE = 10, 50  # 计划交易时段终点时间点

api = TqApi(auth=TqAuth("快期账户", "账户密码"))
print("策略开始运行")
# 根据 HISTORY_DAY_LENGTH 推算出需要订阅的历史数据长度, 需要注意history_day_length与time_cell的比例关系以避免超过订阅限制
time_slot_start = datetime.time(START_HOUR, START_MINUTE)  # 计划交易时段起始时间点
time_slot_end = datetime.time(END_HOUR, END_MINUTE)  # 计划交易时段终点时间点
klines = api.get_kline_serial(SYMBOL, TIME_CELL, data_length=int(10 * 60 * 60 / TIME_CELL * HISTORY_DAY_LENGTH))
target_pos = TargetPosTask(api, SYMBOL)
position = api.get_position(SYMBOL)  # 持仓信息


def get_kline_time(kline_datetime):
    """获取k线的时间(不包含日期)"""
    kline_time = datetime.datetime.fromtimestamp(kline_datetime // 1000000000).time()  # 每根k线的时间
    return kline_time


def get_market_day(kline_datetime):
    """获取k线所对应的交易日"""
    kline_dt = datetime.datetime.fromtimestamp(kline_datetime // 1000000000)  # 每根k线的日期和时间
    if kline_dt.hour >= 18:  # 当天18点以后: 移到下一个交易日
        kline_dt = kline_dt + datetime.timedelta(days=1)
    while kline_dt.weekday() >= 5:  # 是周六或周日,移到周一
        kline_dt = kline_dt + datetime.timedelta(days=1)
    return kline_dt.date()


# 添加辅助列: time及date, 分别为K线时间的时:分:秒和其所属的交易日
klines["time"] = klines.datetime.apply(lambda x: get_kline_time(x))
klines["date"] = klines.datetime.apply(lambda x: get_market_day(x))

# 获取在预设交易时间段内的所有K线, 即时间位于 time_slot_start 到 time_slot_end 之间的数据
if time_slot_end > time_slot_start:  # 判断是否类似 23:00:00 开始, 01:00:00 结束这样跨天的情况
    klines = klines[(klines["time"] >= time_slot_start) & (klines["time"] <= time_slot_end)]
else:
    klines = klines[(klines["time"] >= time_slot_start) | (klines["time"] <= time_slot_end)]

# 由于可能有节假日导致部分天并没有填满整个预设交易时间段
# 因此去除缺失部分交易时段的日期(即剩下的每个日期都包含预设的交易时间段内所需的全部时间单元)
date_cnt = klines["date"].value_counts()
max_num = date_cnt.max()  # 所有日期中最完整的交易时段长度
need_date = date_cnt[date_cnt == max_num].sort_index().index[-HISTORY_DAY_LENGTH - 1:-1]  # 获取今天以前的预设数目个交易日的日期
df = klines[klines["date"].isin(need_date)]  # 最终用来计算的k线数据

# 计算每个时间单元的成交量占比, 并使用算数平均计算出预测值
datetime_grouped = df.groupby(['date', 'time'])['volume'].sum()  # 将K线的volume按照date、time建立多重索引分组
# 计算每个交易日内的预设交易时间段内的成交量总和(level=0: 表示按第一级索引"data"来分组)后,将每根k线的成交量除以所在交易日内的总成交量,计算其所占比例
volume_percent = datetime_grouped / datetime_grouped.groupby(level=0).sum()
predicted_percent = volume_percent.groupby(level=1).mean()  # 将历史上相同时间单元的成交量占比使用算数平均计算出预测值
print("各时间单元成交量占比: %s" % predicted_percent)

# 计算每个时间单元的成交量预测值
predicted_volume = {}  # 记录每个时间单元需调整的持仓量
percentage_left = 1  # 剩余比例
volume_left = TARGET_VOLUME  # 剩余手数
for index, value in predicted_percent.items():
    volume = round(volume_left * (value / percentage_left))
    predicted_volume[index] = volume
    percentage_left -= value
    volume_left -= volume
print("各时间单元应下单手数: %s" % predicted_volume)

# 交易
current_volume = 0  # 记录已调整持仓量
while True:
    api.wait_update()
    # 新产生一根K线并且在计划交易时间段内: 调整目标持仓量
    if api.is_changing(klines.iloc[-1], "datetime"):
        t = datetime.datetime.fromtimestamp(klines.iloc[-1]["datetime"] // 1000000000).time()
        if t in predicted_volume:
            current_volume += predicted_volume[t]
            print("到达下一时间单元,调整持仓为: %d" % current_volume)
            target_pos.set_target_volume(current_volume)
    # 用持仓信息判断是否完成所有目标交易手数
    if api.is_changing(position, "volume_long") or api.is_changing(position, "volume_short"):
        if position["volume_long"] - position["volume_short"] == TARGET_VOLUME:
            break

api.close()

趋势策略

三重指数平滑平均线 (TRIX) 趋势策略 (难度:初级)

策略说明 https://www.shinnytech.com/articles/trading-strategy/trend-following/trix-strategy

#!/usr/bin/env python
# -*- coding: utf-8 -*-
__author__ = "Chaos"

from datetime import date
import pandas as pd
from tqsdk import TqApi, TqAuth, TqBacktest, TargetPosTask, BacktestFinished
from tqsdk.ta import ATR

# ===== 全局参数设置 =====
SYMBOL = "CFFEX.IC2306"  # 黄金期货合约
POSITION_SIZE = 30  # 基础持仓手数
START_DATE = date(2022, 11, 1)  # 回测开始日期
END_DATE = date(2023, 4, 30)  # 回测结束日期

# TRIX指标参数
TRIX_PERIOD = 12  # TRIX计算周期
SIGNAL_PERIOD = 9  # 信号线计算周期
MA_PERIOD = 60    # 长期移动平均线周期,用于趋势过滤

# 信号阈值参数
SIGNAL_THRESHOLD = 0.05  # TRIX与信号线差值的阈值,避免微小交叉

# 风控参数
ATR_PERIOD = 14  # ATR计算周期
STOP_LOSS_MULTIPLIER = 2.0  # 止损ATR倍数
TAKE_PROFIT_MULTIPLIER = 3.0  # 止盈ATR倍数
MAX_HOLDING_DAYS = 15  # 最大持仓天数

# ===== 全局变量 =====
current_direction = 0  # 当前持仓方向:1=多头,-1=空头,0=空仓
entry_price = 0  # 开仓价格
stop_loss_price = 0  # 止损价格
entry_date = None  # 开仓日期
trade_count = 0  # 交易次数
win_count = 0  # 盈利次数

# ===== TRIX指标计算函数 =====
def calculate_trix(close_prices, period):
    """计算TRIX指标和信号线"""
    # 第一重EMA
    ema1 = close_prices.ewm(span=period, adjust=False).mean()
    # 第二重EMA
    ema2 = ema1.ewm(span=period, adjust=False).mean()
    # 第三重EMA
    ema3 = ema2.ewm(span=period, adjust=False).mean()
    # 计算TRIX
    trix = 100 * (ema3 / ema3.shift(1) - 1)
    # 计算信号线
    signal = trix.rolling(SIGNAL_PERIOD).mean()
    
    return trix, signal

# ===== 策略开始 =====
print("开始运行TRIX指标期货策略...")
print(f"品种: {SYMBOL}, 回测周期: {START_DATE} - {END_DATE}")
print(f"TRIX参数: 周期={TRIX_PERIOD}, 信号线周期={SIGNAL_PERIOD}")

# 创建API实例
api = TqApi(backtest=TqBacktest(start_dt=START_DATE, end_dt=END_DATE),
            auth=TqAuth("快期账号", "快期密码"))

# 订阅合约的K线数据
klines = api.get_kline_serial(SYMBOL, 60 * 60 * 24)  # 日线数据

# 创建目标持仓任务
target_pos = TargetPosTask(api, SYMBOL)

try:
    while True:
        # 等待更新
        api.wait_update()
        
        # 如果K线有更新
        if api.is_changing(klines.iloc[-1], "datetime"):
            # 确保有足够的数据计算指标
            if len(klines) < max(TRIX_PERIOD, SIGNAL_PERIOD, MA_PERIOD, ATR_PERIOD) + 10:
                continue
                
            # 计算TRIX指标和信号线
            klines['trix'], klines['signal'] = calculate_trix(klines.close, TRIX_PERIOD)
            
            # 计算长期移动平均线,用于趋势过滤
            klines['ma'] = klines.close.rolling(window=MA_PERIOD).mean()
            
            # 计算ATR用于设置止损
            atr_data = ATR(klines, ATR_PERIOD)
            
            # 获取最新数据
            current_price = float(klines.close.iloc[-1])
            current_datetime = pd.to_datetime(klines.datetime.iloc[-1], unit='ns')
            current_trix = float(klines.trix.iloc[-1])
            previous_trix = float(klines.trix.iloc[-2])
            current_signal = float(klines.signal.iloc[-1])
            previous_signal = float(klines.signal.iloc[-2])
            current_ma = float(klines.ma.iloc[-1])
            current_atr = float(atr_data.atr.iloc[-1])
            
            # 计算TRIX与信号线的差值
            trix_diff = current_trix - current_signal
            previous_trix_diff = previous_trix - previous_signal
            
            # 输出调试信息
            print(f"日期: {current_datetime.strftime('%Y-%m-%d')}, 价格: {current_price:.2f}")
            print(f"TRIX: {current_trix:.4f}, 信号线: {current_signal:.4f}, 差值: {trix_diff:.4f}")
            
            # ===== 交易逻辑 =====
            
            # 空仓状态 - 寻找开仓机会
            if current_direction == 0:
                # 多头开仓条件:TRIX上穿信号线
                if previous_trix < previous_signal and current_trix > current_signal:
                    current_direction = 1
                    target_pos.set_target_volume(POSITION_SIZE)
                    entry_price = current_price
                    stop_loss_price = entry_price - STOP_LOSS_MULTIPLIER * current_atr
                    take_profit_price = entry_price + TAKE_PROFIT_MULTIPLIER * current_atr
                    print(f"多头开仓: 价格={entry_price}, 止损={stop_loss_price:.2f}, 止盈={take_profit_price:.2f}")
                
                # 空头开仓条件:TRIX下穿信号线
                elif previous_trix > previous_signal and current_trix < current_signal:
                    current_direction = -1
                    target_pos.set_target_volume(-POSITION_SIZE)
                    entry_price = current_price
                    stop_loss_price = entry_price + STOP_LOSS_MULTIPLIER * current_atr
                    take_profit_price = entry_price - TAKE_PROFIT_MULTIPLIER * current_atr
                    print(f"空头开仓: 价格={entry_price}, 止损={stop_loss_price:.2f}, 止盈={take_profit_price:.2f}")
            
            # 多头持仓 - 检查平仓条件
            elif current_direction == 1:
                # 止损条件
                if current_price <= stop_loss_price:
                    profit_pct = (current_price - entry_price) / entry_price * 100
                    target_pos.set_target_volume(0)
                    current_direction = 0
                    print(f"多头止损平仓: 价格={current_price}, 盈亏={profit_pct:.2f}%")
                
                # 止盈条件
                elif current_price >= take_profit_price:
                    profit_pct = (current_price - entry_price) / entry_price * 100
                    target_pos.set_target_volume(0)
                    current_direction = 0
                    print(f"多头止盈平仓: 价格={current_price}, 盈亏={profit_pct:.2f}%")
                
                # 信号平仓:TRIX下穿信号线
                elif previous_trix > previous_signal and current_trix < current_signal:
                    profit_pct = (current_price - entry_price) / entry_price * 100
                    target_pos.set_target_volume(0)
                    current_direction = 0
                    print(f"多头信号平仓: 价格={current_price}, 盈亏={profit_pct:.2f}%")
            
            # 空头持仓 - 检查平仓条件
            elif current_direction == -1:
                # 止损条件
                if current_price >= stop_loss_price:
                    profit_pct = (entry_price - current_price) / entry_price * 100
                    target_pos.set_target_volume(0)
                    current_direction = 0
                    print(f"空头止损平仓: 价格={current_price}, 盈亏={profit_pct:.2f}%")
                
                # 止盈条件
                elif current_price <= take_profit_price:
                    profit_pct = (entry_price - current_price) / entry_price * 100
                    target_pos.set_target_volume(0)
                    current_direction = 0
                    print(f"空头止盈平仓: 价格={current_price}, 盈亏={profit_pct:.2f}%")
                
                # 信号平仓:TRIX上穿信号线
                elif previous_trix < previous_signal and current_trix > current_signal:
                    profit_pct = (entry_price - current_price) / entry_price * 100
                    target_pos.set_target_volume(0)
                    current_direction = 0
                    print(f"空头信号平仓: 价格={current_price}, 盈亏={profit_pct:.2f}%")

except BacktestFinished as e:
    print("回测结束")
    api.close()

基于分形的趋势突破策略 (难度:初级)

策略说明 https://www.shinnytech.com/articles/trading-strategy/trend-following/fractal-trend-strategy

#!/usr/bin/env python
# -*- coding: utf-8 -*-
__author__ = "Chaos"

from datetime import date
import pandas as pd
from tqsdk import TqApi, TqAuth, TqBacktest, TargetPosTask, BacktestFinished
from tqsdk.ta import ATR, MA

# ===== 全局参数设置 =====
SYMBOL = "SHFE.au2306"
POSITION_SIZE = 30  # 每次交易手数
START_DATE = date(2023, 4, 10)  # 回测开始日期
END_DATE = date(2023, 4, 26)  # 回测结束日期

# 1小时K线
KLINE_PERIOD = 60 * 60

# 分形识别参数
FRACTAL_WINDOW = 2  # 分形窗口大小

# 确认指标参数
SHORT_MA_PERIOD = 6  # 短期MA周期
LONG_MA_PERIOD = 12  # 长期MA周期
VOLUME_PERIOD = 5  # 成交量平均周期
ATR_PERIOD = 10  # ATR计算周期

# 风控参数
ATR_MULTIPLIER = 1.2  # 止损ATR乘数
TP_RATIO = 1.8  # 止盈比例

# ===== 全局变量 =====
current_direction = 0  # 当前持仓方向:1=多头,-1=空头,0=空仓
entry_price = 0  # 开仓价格
stop_loss_price = 0  # 止损价格
take_profit_price = 0  # 止盈价格
last_bull_fractal = None  # 最近的牛市分形
last_bear_fractal = None  # 最近的熊市分形


# ===== 分形识别函数 =====
def identify_fractals(klines):
    """识别牛市和熊市分形"""
    bull_fractals = []  # 牛市分形列表 [(index, low_price, high_price), ...]
    bear_fractals = []  # 熊市分形列表 [(index, low_price, high_price), ...]

    # 至少需要2*FRACTAL_WINDOW+1个K线才能形成分形
    if len(klines) < 2 * FRACTAL_WINDOW + 1:
        return bull_fractals, bear_fractals

    # 遍历K线,寻找分形
    start_idx = max(FRACTAL_WINDOW, len(klines) - 20)
    end_idx = len(klines) - FRACTAL_WINDOW

    for i in range(start_idx, end_idx):
        # 牛市分形:中间K线的低点低于两侧K线的低点
        is_bull_fractal = True
        for j in range(1, FRACTAL_WINDOW + 1):
            if klines.low.iloc[i] >= klines.low.iloc[i - j] or klines.low.iloc[i] >= klines.low.iloc[i + j]:
                is_bull_fractal = False
                break

        if is_bull_fractal:
            bull_fractals.append((i, klines.low.iloc[i], klines.high.iloc[i]))

        # 熊市分形:中间K线的高点高于两侧K线的高点
        is_bear_fractal = True
        for j in range(1, FRACTAL_WINDOW + 1):
            if klines.high.iloc[i] <= klines.high.iloc[i - j] or klines.high.iloc[i] <= klines.high.iloc[i + j]:
                is_bear_fractal = False
                break

        if is_bear_fractal:
            bear_fractals.append((i, klines.low.iloc[i], klines.high.iloc[i]))

    return bull_fractals, bear_fractals


# ===== 策略开始 =====
print("开始运行基于分形的趋势突破期货策略...")
print(f"品种: {SYMBOL}, 回测周期: {START_DATE}{END_DATE}")

# 创建API实例
api = TqApi(backtest=TqBacktest(start_dt=START_DATE, end_dt=END_DATE),
            auth=TqAuth("快期账号", "快期密码"))

# 订阅合约的K线数据
klines = api.get_kline_serial(SYMBOL, KLINE_PERIOD)  # 1小时K线

# 创建目标持仓任务
target_pos = TargetPosTask(api, SYMBOL)

try:
    while True:
        # 等待更新
        api.wait_update()

        # 如果K线更新
        if api.is_changing(klines.iloc[-1], "datetime"):
            # 确保有足够的数据计算指标
            if len(klines) < max(SHORT_MA_PERIOD, LONG_MA_PERIOD, ATR_PERIOD) + 5:
                print(f"数据不足,当前K线数量: {len(klines)}")
                continue

            # 计算确认指标
            klines["short_ma"] = MA(klines, SHORT_MA_PERIOD).ma
            klines["long_ma"] = MA(klines, LONG_MA_PERIOD).ma
            klines["volume_avg"] = klines.volume.rolling(VOLUME_PERIOD).mean().fillna(0)
            atr = ATR(klines, ATR_PERIOD).atr

            # 识别分形
            bull_fractals, bear_fractals = identify_fractals(klines)

            # 更新最近的分形
            if bull_fractals:
                last_bull_fractal = bull_fractals[-1]
                print(
                    f"发现新牛市分形: 索引={last_bull_fractal[0]}, 低点={last_bull_fractal[1]}, 高点={last_bull_fractal[2]}")

            if bear_fractals:
                last_bear_fractal = bear_fractals[-1]
                print(
                    f"发现新熊市分形: 索引={last_bear_fractal[0]}, 低点={last_bear_fractal[1]}, 高点={last_bear_fractal[2]}")

            # 获取当前价格和指标数据
            current_price = float(klines.close.iloc[-1])
            current_volume = float(klines.volume.iloc[-1])
            current_short_ma = float(klines.short_ma.iloc[-1])
            current_long_ma = float(klines.long_ma.iloc[-1])
            current_volume_avg = float(klines.volume_avg.iloc[-1] if not pd.isna(klines.volume_avg.iloc[-1]) else 0)
            current_atr = float(atr.iloc[-1] if not pd.isna(atr.iloc[-1]) else 0)

            # 确定当前趋势
            uptrend = current_short_ma > current_long_ma
            downtrend = current_short_ma < current_long_ma

            # 输出当前状态
            current_time = pd.to_datetime(klines.datetime.iloc[-1], unit='ns')
            print(f"时间: {current_time.strftime('%Y-%m-%d %H:%M')}")
            print(f"价格: {current_price}, ATR: {current_atr:.2f}")
            print(f"短期MA: {current_short_ma:.2f}, 长期MA: {current_long_ma:.2f}")
            print(f"趋势方向: {'上升' if uptrend else '下降' if downtrend else '盘整'}")
            print(f"当前成交量: {current_volume}, 平均成交量: {current_volume_avg:.2f}")

            # ===== 交易逻辑 =====

            # 空仓状态 - 寻找开仓机会
            if current_direction == 0:
                # 多头入场信号
                if (last_bull_fractal and
                        last_bull_fractal[0] < len(klines) - 1 and  # 确认分形不是最新K线
                        current_price > last_bull_fractal[2] and  # 价格突破牛市分形高点
                        uptrend):  # 上升趋势确认

                    current_direction = 1
                    entry_price = current_price
                    target_pos.set_target_volume(POSITION_SIZE)

                    # 设置止损和止盈
                    stop_loss_price = last_bull_fractal[1] - current_atr * ATR_MULTIPLIER
                    take_profit_price = entry_price + (entry_price - stop_loss_price) * TP_RATIO

                    print(f"多头开仓: 价格={entry_price}, 止损={stop_loss_price:.2f}, 止盈={take_profit_price:.2f}")

                # 空头入场信号
                elif (last_bear_fractal and
                      last_bear_fractal[0] < len(klines) - 1 and  # 确认分形不是最新K线
                      current_price < last_bear_fractal[1] and  # 价格跌破熊市分形低点
                      downtrend):  # 下降趋势确认

                    current_direction = -1
                    entry_price = current_price
                    target_pos.set_target_volume(-POSITION_SIZE)

                    # 设置止损和止盈
                    stop_loss_price = last_bear_fractal[2] + current_atr * ATR_MULTIPLIER
                    take_profit_price = entry_price - (stop_loss_price - entry_price) * TP_RATIO

                    print(f"空头开仓: 价格={entry_price}, 止损={stop_loss_price:.2f}, 止盈={take_profit_price:.2f}")

            # 多头持仓 - 检查平仓条件
            elif current_direction == 1:
                # 1. 止损条件
                if current_price <= stop_loss_price:
                    profit_pct = (current_price - entry_price) / entry_price * 100
                    target_pos.set_target_volume(0)
                    current_direction = 0
                    print(f"多头止损平仓: 价格={current_price}, 盈亏={profit_pct:.2f}%")

                # 2. 止盈条件
                elif current_price >= take_profit_price:
                    profit_pct = (current_price - entry_price) / entry_price * 100
                    target_pos.set_target_volume(0)
                    current_direction = 0
                    print(f"多头止盈平仓: 价格={current_price}, 盈亏={profit_pct:.2f}%")

                # 3. 价格跌破短期MA
                elif current_price < current_short_ma:
                    profit_pct = (current_price - entry_price) / entry_price * 100
                    target_pos.set_target_volume(0)
                    current_direction = 0
                    print(f"多头MA平仓: 价格={current_price}, 盈亏={profit_pct:.2f}%")

                # 4. 出现新的熊市分形且价格跌破该分形的低点
                elif (last_bear_fractal and
                      last_bear_fractal[0] > klines.index[-10] and  # 确保是较新的分形
                      current_price < last_bear_fractal[1]):
                    profit_pct = (current_price - entry_price) / entry_price * 100
                    target_pos.set_target_volume(0)
                    current_direction = 0
                    print(f"多头分形反转平仓: 价格={current_price}, 盈亏={profit_pct:.2f}%")

            # 空头持仓 - 检查平仓条件
            elif current_direction == -1:
                # 1. 止损条件
                if current_price >= stop_loss_price:
                    profit_pct = (entry_price - current_price) / entry_price * 100
                    target_pos.set_target_volume(0)
                    current_direction = 0
                    print(f"空头止损平仓: 价格={current_price}, 盈亏={profit_pct:.2f}%")

                # 2. 止盈条件
                elif current_price <= take_profit_price:
                    profit_pct = (entry_price - current_price) / entry_price * 100
                    target_pos.set_target_volume(0)
                    current_direction = 0
                    print(f"空头止盈平仓: 价格={current_price}, 盈亏={profit_pct:.2f}%")

                # 3. 价格突破短期MA
                elif current_price > current_short_ma:
                    profit_pct = (entry_price - current_price) / entry_price * 100
                    target_pos.set_target_volume(0)
                    current_direction = 0
                    print(f"空头MA平仓: 价格={current_price}, 盈亏={profit_pct:.2f}%")

                # 4. 出现新的牛市分形且价格突破该分形的高点
                elif (last_bull_fractal and
                      last_bull_fractal[0] > klines.index[-10] and  # 确保是较新的分形
                      current_price > last_bull_fractal[2]):
                    profit_pct = (entry_price - current_price) / entry_price * 100
                    target_pos.set_target_volume(0)
                    current_direction = 0
                    print(f"空头分形反转平仓: 价格={current_price}, 盈亏={profit_pct:.2f}%")

except BacktestFinished as e:
    print("回测结束")
    api.close()

基于钱德动量震荡指标(CMO)的期货量化交易策略 (难度:初级)

策略说明 https://www.shinnytech.com/articles/trading-strategy/trend-following/cmo-strategy

#!/usr/bin/env python
# -*- coding: utf-8 -*-
__author__ = "Chaos"

from datetime import date
import numpy as np
import pandas as pd
from tqsdk import TqApi, TqAuth, TqBacktest, TargetPosTask, BacktestFinished
from tqsdk.ta import ATR

# ===== 全局参数设置 =====
SYMBOL = "DCE.c2309"  # 玉米期货合约
POSITION_SIZE = 500  # 基础持仓手数
START_DATE = date(2023, 2, 1)  # 回测开始日期
END_DATE = date(2023, 7, 31)  # 回测结束日期

# CMO参数设置
CMO_PERIOD = 6  # CMO计算周期
SIGNAL_PERIOD = 4  # CMO信号线周期
CMO_SLOPE_PERIOD = 2  # CMO斜率计算周期
OVERBOUGHT_THRESHOLD = 50  # 超买阈值
OVERSOLD_THRESHOLD = -50  # 超卖阈值
SMA_PERIOD = 10  # 趋势确认移动平均线周期

# 止损止盈参数
FIXED_STOP_LOSS_PCT = 0.008  # 固定止损百分比(0.8%)
TAKE_PROFIT_PCT = 0.015  # 止盈百分比(1.5%)
ATR_STOP_MULTIPLIER = 2.0  # ATR止损乘数
MAX_HOLDING_DAYS = 10  # 最大持仓天数

# ===== 全局变量 =====
current_direction = 0   # 当前持仓方向:1=多头,-1=空头,0=空仓
entry_price = 0         # 开仓价格
stop_loss_price = 0     # 止损价格
take_profit_price = 0   # 止盈价格
entry_date = None       # 开仓日期
entry_position = 0      # 开仓数量

# ===== CMO指标计算函数 =====
def calculate_cmo(close_prices, period=14):
    """计算钱德动量震荡指标(CMO)"""
    delta = close_prices.diff()
    
    # 分离上涨和下跌
    up_sum = np.zeros_like(delta)
    down_sum = np.zeros_like(delta)
    
    # 填充上涨和下跌数组
    up_sum[delta > 0] = delta[delta > 0]
    down_sum[delta < 0] = -delta[delta < 0]  # 注意要取绝对值
    
    # 计算上涨和下跌的滚动总和
    up_rolling_sum = pd.Series(up_sum).rolling(period).sum()
    down_rolling_sum = pd.Series(down_sum).rolling(period).sum()
    
    # 计算CMO值
    cmo = 100 * ((up_rolling_sum - down_rolling_sum) / (up_rolling_sum + down_rolling_sum))
    
    return cmo

# ===== 策略开始 =====
print("开始运行钱德动量震荡指标(CMO)期货策略...")

# 创建API实例
api = TqApi(backtest=TqBacktest(start_dt=START_DATE, end_dt=END_DATE),
            auth=TqAuth("快期账号", "快期密码"))

# 订阅合约的K线数据
klines = api.get_kline_serial(SYMBOL, 60 * 60 * 24)  # 日线数据

# 创建目标持仓任务
target_pos = TargetPosTask(api, SYMBOL)

try:
    while True:
        # 等待更新
        api.wait_update()
        
        # 如果K线有更新
        if api.is_changing(klines.iloc[-1], "datetime"):
            # 确保有足够的数据计算指标
            if len(klines) < max(CMO_PERIOD, SIGNAL_PERIOD, SMA_PERIOD) + 10:
                continue
            
            # 计算CMO及相关指标
            klines['cmo'] = calculate_cmo(klines.close, CMO_PERIOD)
            klines['cmo_signal'] = klines['cmo'].rolling(SIGNAL_PERIOD).mean()  # CMO信号线
            klines['cmo_slope'] = klines['cmo'].diff(CMO_SLOPE_PERIOD)  # CMO斜率
            klines['sma'] = klines.close.rolling(SMA_PERIOD).mean()  # 趋势确认SMA
            
            # 计算ATR用于动态止损
            atr_data = ATR(klines, 14)
            
            # 获取最新数据和前一个交易日数据
            current_price = float(klines.close.iloc[-1])
            current_datetime = pd.to_datetime(klines.datetime.iloc[-1], unit='ns')
            current_cmo = float(klines.cmo.iloc[-1])
            current_cmo_signal = float(klines.cmo_signal.iloc[-1])
            current_cmo_slope = float(klines.cmo_slope.iloc[-1])
            current_sma = float(klines.sma.iloc[-1])
            current_atr = float(atr_data.atr.iloc[-1])
            
            prev_price = float(klines.close.iloc[-2])
            prev_cmo = float(klines.cmo.iloc[-2])
            prev_cmo_signal = float(klines.cmo_signal.iloc[-2])
            
            # 输出调试信息
            print(f"日期: {current_datetime.strftime('%Y-%m-%d')}, 价格: {current_price}, CMO: {current_cmo:.2f}, 信号线: {current_cmo_signal:.2f}, 斜率: {current_cmo_slope:.2f}")
            
            # ===== 交易逻辑 =====
            
            # 空仓状态 - 寻找开仓机会
            if current_direction == 0:
                # 计算多头开仓信号
                # 信号1: 超卖反弹
                long_signal1 = prev_cmo < OVERSOLD_THRESHOLD and current_cmo > OVERSOLD_THRESHOLD and current_price > current_sma and current_cmo_slope > 0
                
                # 信号2: 信号线交叉
                long_signal2 = prev_cmo < prev_cmo_signal and current_cmo > current_cmo_signal and current_price > current_sma and current_cmo > -30
                
                # 信号3: 零轴交叉
                long_signal3 = prev_cmo < 0 and current_cmo > 0 and current_price > current_sma and prev_cmo < -10
                
                # 计算空头开仓信号
                # 信号1: 超买回落
                short_signal1 = prev_cmo > OVERBOUGHT_THRESHOLD and current_cmo < OVERBOUGHT_THRESHOLD and current_price < current_sma and current_cmo_slope < 0
                
                # 信号2: 信号线交叉
                short_signal2 = prev_cmo > prev_cmo_signal and current_cmo < current_cmo_signal and current_price < current_sma and current_cmo < 30
                
                # 信号3: 零轴交叉
                short_signal3 = prev_cmo > 0 and current_cmo < 0 and current_price < current_sma and prev_cmo > 10
                
                # 多头开仓条件
                if long_signal1 or long_signal2 or long_signal3:
                    # 确定信号强度和头寸规模
                    signal_strength = 1
                    # 多个信号同时满足时增加头寸
                    if sum([long_signal1, long_signal2, long_signal3]) > 1:
                        signal_strength = 1.5
                    # 极端CMO值时减少头寸
                    if abs(current_cmo) > 80:
                        signal_strength = 0.7
                    
                    # 设置持仓方向和规模
                    current_direction = 1
                    position_size = round(POSITION_SIZE * signal_strength)
                    entry_position = position_size
                    target_pos.set_target_volume(position_size)
                    
                    # 记录开仓信息
                    entry_price = current_price
                    entry_date = current_datetime
                    
                    # 设置止损价格
                    atr_stop = entry_price - ATR_STOP_MULTIPLIER * current_atr
                    fixed_stop = entry_price * (1 - FIXED_STOP_LOSS_PCT)
                    stop_loss_price = max(atr_stop, fixed_stop)  # 取较严格的止损
                    
                    # 设置止盈价格
                    take_profit_price = entry_price * (1 + TAKE_PROFIT_PCT)
                    
                    # 记录信号类型
                    signal_type = ""
                    if long_signal1: signal_type += "超卖反弹 "
                    if long_signal2: signal_type += "信号线上穿 "
                    if long_signal3: signal_type += "零轴上穿 "
                    
                    print(f"多头开仓: 价格={entry_price}, 手数={position_size}, 信号={signal_type}, 止损={stop_loss_price:.2f}, 止盈={take_profit_price:.2f}")
                
                # 空头开仓条件
                elif short_signal1 or short_signal2 or short_signal3:
                    # 确定信号强度和头寸规模
                    signal_strength = 1
                    # 多个信号同时满足时增加头寸
                    if sum([short_signal1, short_signal2, short_signal3]) > 1:
                        signal_strength = 1.5
                    # 极端CMO值时减少头寸
                    if abs(current_cmo) > 80:
                        signal_strength = 0.7
                    
                    # 设置持仓方向和规模
                    current_direction = -1
                    position_size = round(POSITION_SIZE * signal_strength)
                    entry_position = position_size
                    target_pos.set_target_volume(-position_size)
                    
                    # 记录开仓信息
                    entry_price = current_price
                    entry_date = current_datetime
                    
                    # 设置止损价格
                    atr_stop = entry_price + ATR_STOP_MULTIPLIER * current_atr
                    fixed_stop = entry_price * (1 + FIXED_STOP_LOSS_PCT)
                    stop_loss_price = min(atr_stop, fixed_stop)  # 取较严格的止损
                    
                    # 设置止盈价格
                    take_profit_price = entry_price * (1 - TAKE_PROFIT_PCT)
                    
                    # 记录信号类型
                    signal_type = ""
                    if short_signal1: signal_type += "超买回落 "
                    if short_signal2: signal_type += "信号线下穿 "
                    if short_signal3: signal_type += "零轴下穿 "
                    
                    print(f"空头开仓: 价格={entry_price}, 手数={position_size}, 信号={signal_type}, 止损={stop_loss_price:.2f}, 止盈={take_profit_price:.2f}")
            
            # 多头持仓 - 检查平仓条件
            elif current_direction == 1:
                # 计算持仓天数
                holding_days = (current_datetime - entry_date).days
                
                # 3. 基于CMO信号平仓
                if (prev_cmo > prev_cmo_signal and current_cmo < current_cmo_signal and current_cmo > 30):
                    profit_pct = (current_price - entry_price) / entry_price * 100
                    target_pos.set_target_volume(0)
                    current_direction = 0
                    print(f"多头信号平仓: 价格={current_price}, 盈亏={profit_pct:.2f}%, 持仓天数={holding_days}, 原因=动量减弱")
                
                # 4. 反向信号产生
                elif (prev_cmo > OVERBOUGHT_THRESHOLD and current_cmo < OVERBOUGHT_THRESHOLD and current_cmo_slope < 0) or \
                     (prev_cmo > prev_cmo_signal and current_cmo < current_cmo_signal and current_price < current_sma) or \
                     (prev_cmo > 0 and current_cmo < 0 and prev_cmo > 10):
                    profit_pct = (current_price - entry_price) / entry_price * 100
                    target_pos.set_target_volume(0)
                    current_direction = 0
                    print(f"多头反向平仓: 价格={current_price}, 盈亏={profit_pct:.2f}%, 持仓天数={holding_days}, 原因=反向信号")

            
            # 空头持仓 - 检查平仓条件
            elif current_direction == -1:
                # 计算持仓天数
                holding_days = (current_datetime - entry_date).days

                
                # 3. 基于CMO信号平仓
                if (prev_cmo < prev_cmo_signal and current_cmo > current_cmo_signal and current_cmo < -30):
                    profit_pct = (entry_price - current_price) / entry_price * 100
                    target_pos.set_target_volume(0)
                    current_direction = 0
                    print(f"空头信号平仓: 价格={current_price}, 盈亏={profit_pct:.2f}%, 持仓天数={holding_days}, 原因=动量减弱")
                
                # 4. 反向信号产生
                elif (prev_cmo < OVERSOLD_THRESHOLD and current_cmo > OVERSOLD_THRESHOLD and current_cmo_slope > 0) or \
                     (prev_cmo < prev_cmo_signal and current_cmo > current_cmo_signal and current_price > current_sma) or \
                     (prev_cmo < 0 and current_cmo > 0 and prev_cmo < -10):
                    profit_pct = (entry_price - current_price) / entry_price * 100
                    target_pos.set_target_volume(0)
                    current_direction = 0
                    print(f"空头反向平仓: 价格={current_price}, 盈亏={profit_pct:.2f}%, 持仓天数={holding_days}, 原因=反向信号")


except BacktestFinished as e:
    print("回测结束")
    api.close()

涡旋指标期货量化策略 (难度:初级)

策略说明 https://www.shinnytech.com/articles/trading-strategy/trend-following/vortex-indicator-strategy

#!/usr/bin/env python
# -*- coding: utf-8 -*-
__author__ = "Chaos"

from datetime import date
import numpy as np
import pandas as pd
from tqsdk import TqApi, TqAuth, TqBacktest, TargetPosTask, BacktestFinished
from tqsdk.ta import ATR

# ===== 全局参数设置 =====
SYMBOL = "CFFEX.IC2306"  # 螺纹钢期货合约
POSITION_SIZE = 30  # 固定交易手数
START_DATE = date(2022, 11, 1)  # 回测开始日期
END_DATE = date(2023, 4, 19)  # 回测结束日期

# 涡旋指标参数
VI_PERIOD = 14  # 涡旋指标周期
ATR_PERIOD = 14  # ATR指标周期
ATR_MULTIPLIER = 2.0  # 止损倍数
VI_THRESHOLD = 1.0  # VI值阈值,筛选强度较大的信号

# ===== 全局变量 =====
current_direction = 0   # 当前持仓方向:1=多头,-1=空头,0=空仓
entry_price = 0         # 开仓价格
stop_loss_price = 0     # 止损价格

# ===== 涡旋指标计算函数 =====
def calculate_vortex(df, period=14):
    """计算涡旋指标"""
    # 计算真实范围(TR)
    df['tr'] = np.maximum(
        np.maximum(
            df['high'] - df['low'],
            np.abs(df['high'] - df['close'].shift(1))
        ),
        np.abs(df['low'] - df['close'].shift(1))
    )

    # 计算正向涡旋运动(+VM)
    df['plus_vm'] = np.abs(df['high'] - df['low'].shift(1))

    # 计算负向涡旋运动(-VM)
    df['minus_vm'] = np.abs(df['low'] - df['high'].shift(1))

    # 计算N周期内的总和
    df['tr_sum'] = df['tr'].rolling(window=period).sum()
    df['plus_vm_sum'] = df['plus_vm'].rolling(window=period).sum()
    df['minus_vm_sum'] = df['minus_vm'].rolling(window=period).sum()

    # 计算涡旋指标
    df['plus_vi'] = df['plus_vm_sum'] / df['tr_sum']
    df['minus_vi'] = df['minus_vm_sum'] / df['tr_sum']

    return df

# ===== 策略开始 =====
print("开始运行涡旋指标(VI)期货策略...")

# 创建API实例
api = TqApi(backtest=TqBacktest(start_dt=START_DATE, end_dt=END_DATE),
            auth=TqAuth("快期账号", "快期密码"))

# 订阅合约的日K线数据
klines = api.get_kline_serial(SYMBOL, 60 * 60 * 24)  # 日线数据

# 创建目标持仓任务
target_pos = TargetPosTask(api, SYMBOL)

try:
    while True:
        # 等待更新
        api.wait_update()

        # 如果K线有更新
        if api.is_changing(klines.iloc[-1], "datetime"):
            # 确保有足够的数据计算指标
            if len(klines) < max(VI_PERIOD, ATR_PERIOD) + 5:
                continue

            # 计算涡旋指标
            df = pd.DataFrame(klines)
            df = calculate_vortex(df, VI_PERIOD)

            # 计算ATR
            atr_data = ATR(klines, ATR_PERIOD)
            current_atr = float(atr_data.atr.iloc[-1])

            # 获取最新和前一个周期的数据
            current_price = float(klines.close.iloc[-1])
            current_plus_vi = float(df.plus_vi.iloc[-1])
            current_minus_vi = float(df.minus_vi.iloc[-1])

            prev_plus_vi = float(df.plus_vi.iloc[-2])
            prev_minus_vi = float(df.minus_vi.iloc[-2])

            # 获取当前日期
            current_datetime = pd.to_datetime(klines.datetime.iloc[-1], unit='ns')
            date_str = current_datetime.strftime('%Y-%m-%d')

            # 输出调试信息
            print(f"日期: {date_str}, 价格: {current_price}, +VI: {current_plus_vi:.4f}, -VI: {current_minus_vi:.4f}, ATR: {current_atr:.2f}")

            # ===== 交易逻辑 =====

            # 空仓状态 - 寻找开仓机会
            if current_direction == 0:
                # 多头信号: +VI上穿-VI且+VI > 阈值
                if prev_plus_vi <= prev_minus_vi and current_plus_vi > current_minus_vi and current_plus_vi > VI_THRESHOLD:
                    # 设置入场价格
                    entry_price = current_price

                    # 设置止损价格
                    stop_loss_price = entry_price - ATR_MULTIPLIER * current_atr

                    # 设置持仓方向和目标持仓
                    current_direction = 1
                    target_pos.set_target_volume(POSITION_SIZE)

                    print(f"多头开仓: 价格={entry_price}, 手数={POSITION_SIZE}, 止损价={stop_loss_price:.2f}")

                # 空头信号: -VI上穿+VI且-VI > 阈值
                elif prev_minus_vi <= prev_plus_vi and current_minus_vi > current_plus_vi and current_minus_vi > VI_THRESHOLD:
                    # 设置入场价格
                    entry_price = current_price

                    # 设置止损价格
                    stop_loss_price = entry_price + ATR_MULTIPLIER * current_atr

                    # 设置持仓方向和目标持仓
                    current_direction = -1
                    target_pos.set_target_volume(-POSITION_SIZE)

                    print(f"空头开仓: 价格={entry_price}, 手数={POSITION_SIZE}, 止损价={stop_loss_price:.2f}")

            # 多头持仓 - 检查平仓条件
            elif current_direction == 1:
                # 条件1: 止损触发
                if current_price <= stop_loss_price:
                    profit_pct = (current_price - entry_price) / entry_price * 100
                    target_pos.set_target_volume(0)
                    current_direction = 0
                    print(f"多头止损平仓: 价格={current_price}, 盈亏={profit_pct:.2f}%")

                # 条件2: 信号反转 (-VI上穿+VI)
                elif prev_minus_vi <= prev_plus_vi and current_minus_vi > current_plus_vi:
                    profit_pct = (current_price - entry_price) / entry_price * 100
                    target_pos.set_target_volume(0)
                    current_direction = 0
                    print(f"多头信号平仓: 价格={current_price}, 盈亏={profit_pct:.2f}%")

            # 空头持仓 - 检查平仓条件
            elif current_direction == -1:
                # 条件1: 止损触发
                if current_price >= stop_loss_price:
                    profit_pct = (entry_price - current_price) / entry_price * 100
                    target_pos.set_target_volume(0)
                    current_direction = 0
                    print(f"空头止损平仓: 价格={current_price}, 盈亏={profit_pct:.2f}%")

                # 条件2: 信号反转 (+VI上穿-VI)
                elif prev_plus_vi <= prev_minus_vi and current_plus_vi > current_minus_vi:
                    profit_pct = (entry_price - current_price) / entry_price * 100
                    target_pos.set_target_volume(0)
                    current_direction = 0
                    print(f"空头信号平仓: 价格={current_price}, 盈亏={profit_pct:.2f}%")

except BacktestFinished as e:
    print(f"策略运行异常: {e}")
    api.close()

Hull移动平均线策略 (难度:初级)

策略说明 https://www.shinnytech.com/articles/trading-strategy/trend-following/hull-strategy

#!/usr/bin/env python
# -*- coding: utf-8 -*-
__author__ = "Chaos"

from datetime import date

import numpy as np
from tqsdk import TqApi, TqAuth, TqBacktest, TargetPosTask, BacktestFinished
from tqsdk.ta import ATR

# ===== 全局参数设置 =====
SYMBOL = "SHFE.au2306"  # 黄金期货合约
POSITION_SIZE = 30  # 单次交易手数
START_DATE = date(2022, 11, 1)  # 回测开始日期
END_DATE = date(2023, 4, 1)  # 回测结束日期

# Hull Moving Average 参数
LONG_HMA_PERIOD = 30  # 长周期HMA,用于确定大趋势
SHORT_HMA_PERIOD = 5  # 短周期HMA,用于入场信号

# 止损止盈参数
ATR_PERIOD = 14  # ATR计算周期
STOP_LOSS_ATR = 2.0  # 止损为入场点的2倍ATR
TAKE_PROFIT_ATR = 4.0  # 获利为入场点的4倍ATR
TRAILING_STOP = True  # 使用移动止损

# 新增止盈参数
FIXED_TAKE_PROFIT_PCT = 0.03  # 固定止盈比例(3%)
USE_TRAILING_PROFIT = True  # 是否使用追踪止盈
TRAILING_PROFIT_THRESHOLD = 0.02  # 触发追踪止盈的收益率阈值(2%)
TRAILING_PROFIT_STEP = 0.005  # 追踪止盈回撤幅度(0.5%)

# ===== 全局变量 =====
current_direction = 0   # 当前持仓方向:1=多头,-1=空头,0=空仓
entry_price = 0         # 开仓价格
stop_loss_price = 0     # 止损价格
take_profit_price = 0   # 止盈价格
highest_since_entry = 0 # 入场后的最高价(用于多头追踪止盈)
lowest_since_entry = 0  # 入场后的最低价(用于空头追踪止盈)

# ===== Hull移动平均线相关函数 =====
def wma(series, period):
    """计算加权移动平均线"""
    weights = np.arange(1, period + 1)
    return series.rolling(period).apply(lambda x: np.sum(weights * x) / weights.sum(), raw=True)

def hma(series, period):
    """计算Hull移动平均线"""
    period = int(period)
    if period < 3:
        return series
    
    half_period = period // 2
    sqrt_period = int(np.sqrt(period))
    
    wma1 = wma(series, half_period)
    wma2 = wma(series, period)
    
    raw_hma = 2 * wma1 - wma2
    
    return wma(raw_hma, sqrt_period)

# ===== 策略开始 =====
print("开始运行Hull移动平均线期货策略...")

# 创建API实例
api = TqApi(backtest=TqBacktest(start_dt=START_DATE, end_dt=END_DATE),
            auth=TqAuth("快期账号", "快期密码"))

# 订阅合约的K线数据
klines = api.get_kline_serial(SYMBOL, 60 * 60 * 24)

# 创建目标持仓任务
target_pos = TargetPosTask(api, SYMBOL)

try:
    while True:
        # 等待更新
        api.wait_update()
        
        # 如果K线有更新
        if api.is_changing(klines.iloc[-1], "datetime"):
            # 确保有足够的数据计算指标
            if len(klines) < max(SHORT_HMA_PERIOD, LONG_HMA_PERIOD, ATR_PERIOD) + 10:
                continue
                
            # 计算短期和长期HMA
            klines['short_hma'] = hma(klines.close, SHORT_HMA_PERIOD)
            klines['long_hma'] = hma(klines.close, LONG_HMA_PERIOD)
            
            # 计算ATR用于止损设置
            atr_data = ATR(klines, ATR_PERIOD)
            
            # 获取最新数据
            current_price = float(klines.close.iloc[-1])
            current_short_hma = float(klines.short_hma.iloc[-1])
            current_long_hma = float(klines.long_hma.iloc[-1])
            current_atr = float(atr_data.atr.iloc[-1])
            
            # 获取前一个周期的数据
            prev_short_hma = float(klines.short_hma.iloc[-2])
            prev_long_hma = float(klines.long_hma.iloc[-2])
            
            # 输出调试信息
            print(f"价格: {current_price}, 短期HMA: {current_short_hma:.2f}, 长期HMA: {current_long_hma:.2f}")
            
            # ===== 交易逻辑 =====
            
            # 空仓状态 - 寻找开仓机会
            if current_direction == 0:
                # 多头开仓条件
                if (prev_short_hma <= prev_long_hma and current_short_hma > current_long_hma
                        and current_price > current_long_hma):
                    
                    print(f"多头开仓信号: 短期HMA上穿长期HMA")
                    
                    # 设置持仓和记录开仓价格
                    current_direction = 1
                    target_pos.set_target_volume(POSITION_SIZE)
                    entry_price = current_price
                    
                    # 设置止损价格
                    stop_loss_price = current_long_hma - STOP_LOSS_ATR * current_atr
                    
                    # 设置止盈价格
                    take_profit_price = entry_price * (1 + FIXED_TAKE_PROFIT_PCT)
                    
                    # 重置追踪止盈变量
                    highest_since_entry = entry_price
                    
                    print(f"多头开仓价格: {entry_price}, 止损: {stop_loss_price:.2f}, 止盈: {take_profit_price:.2f}")
                
                # 空头开仓条件
                elif (prev_short_hma >= prev_long_hma and current_short_hma < current_long_hma
                      and current_price < current_long_hma):
                    
                    print(f"空头开仓信号: 短期HMA下穿长期HMA")
                    
                    # 设置持仓和记录开仓价格
                    current_direction = -1
                    target_pos.set_target_volume(-POSITION_SIZE)
                    entry_price = current_price
                    
                    # 设置止损价格
                    stop_loss_price = current_long_hma + STOP_LOSS_ATR * current_atr
                    
                    # 设置止盈价格
                    take_profit_price = entry_price * (1 - FIXED_TAKE_PROFIT_PCT)
                    
                    # 重置追踪止盈变量
                    lowest_since_entry = entry_price
                    
                    print(f"空头开仓价格: {entry_price}, 止损: {stop_loss_price:.2f}, 止盈: {take_profit_price:.2f}")
            
            # 多头持仓 - 检查平仓条件
            elif current_direction == 1:
                # 更新入场后的最高价
                if current_price > highest_since_entry:
                    highest_since_entry = current_price
                
                # 计算固定止损
                fixed_stop_loss = entry_price * (1 - STOP_LOSS_ATR)
                
                # 更新追踪止损(只上移不下移)
                new_stop = current_short_hma - STOP_LOSS_ATR * current_atr
                if new_stop > stop_loss_price:
                    stop_loss_price = new_stop
                    print(f"更新多头止损: {stop_loss_price:.2f}")
                
                # 平仓条件1: 短期HMA下穿长期HMA
                if prev_short_hma >= prev_long_hma and current_short_hma < current_long_hma:
                    profit_pct = (current_price - entry_price) / entry_price * 100
                    target_pos.set_target_volume(0)
                    current_direction = 0
                    print(f"多头平仓: 价格={current_price}, 盈亏={profit_pct:.2f}%")
                
                # 平仓条件2: 价格跌破止损
                elif current_price < stop_loss_price or current_price < fixed_stop_loss:
                    profit_pct = (current_price - entry_price) / entry_price * 100
                    target_pos.set_target_volume(0)
                    current_direction = 0
                    print(f"多头止损: 价格={current_price}, 盈亏={profit_pct:.2f}%")
                
                # 平仓条件3: 价格达到止盈价格
                elif current_price >= take_profit_price:
                    profit_pct = (current_price - entry_price) / entry_price * 100
                    target_pos.set_target_volume(0)
                    current_direction = 0
                    print(f"多头止盈: 价格={current_price}, 盈亏={profit_pct:.2f}%")
            
            # 空头持仓 - 检查平仓条件
            elif current_direction == -1:
                # 更新入场后的最低价
                if current_price < lowest_since_entry or lowest_since_entry == 0:
                    lowest_since_entry = current_price
                
                # 计算固定止损
                fixed_stop_loss = entry_price * (1 + STOP_LOSS_ATR)
                
                # 更新追踪止损(只下移不上移)
                new_stop = current_short_hma + STOP_LOSS_ATR * current_atr
                if new_stop < stop_loss_price or stop_loss_price == 0:
                    stop_loss_price = new_stop
                    print(f"更新空头止损: {stop_loss_price:.2f}")
                
                # 平仓条件1: 短期HMA上穿长期HMA
                if prev_short_hma <= prev_long_hma and current_short_hma > current_long_hma:
                    profit_pct = (entry_price - current_price) / entry_price * 100
                    target_pos.set_target_volume(0)
                    current_direction = 0
                    print(f"空头平仓: 价格={current_price}, 盈亏={profit_pct:.2f}%")
                
                # 平仓条件2: 价格突破止损
                elif current_price > stop_loss_price or current_price > fixed_stop_loss:
                    profit_pct = (entry_price - current_price) / entry_price * 100
                    target_pos.set_target_volume(0)
                    current_direction = 0
                    print(f"空头止损: 价格={current_price}, 盈亏={profit_pct:.2f}%")
                
                # 平仓条件3: 价格达到止盈价格
                elif current_price <= take_profit_price:
                    profit_pct = (entry_price - current_price) / entry_price * 100
                    target_pos.set_target_volume(0)
                    current_direction = 0
                    print(f"空头止盈: 价格={current_price}, 盈亏={profit_pct:.2f}%")

except BacktestFinished as e:
    print("回测结束")
    api.close()

Keltner Channel策略 (难度:初级)

策略说明 https://www.shinnytech.com/articles/trading-strategy/trend-following/keltner-channel-strategy

#!/usr/bin/env python
# -*- coding: utf-8 -*-
__author__ = "Chaos"

from tqsdk import TqApi, TqAuth, TqBacktest, TargetPosTask, BacktestFinished
from datetime import date
import numpy as np
import pandas as pd

# ===== 全局参数设置 =====
SYMBOL = "CFFEX.IC2306"  # 中证500指数期货合约
POSITION_SIZE = 30  # 持仓手数(黄金的合适仓位)
START_DATE = date(2022, 11, 1)  # 回测开始日期
END_DATE = date(2023, 4, 30)  # 回测结束日期

# Keltner Channel参数
EMA_PERIOD = 8  # EMA周期
ATR_PERIOD = 7  # ATR周期
ATR_MULTIPLIER = 1.5  # ATR乘数

# 新增参数 - 趋势确认与止损
SHORT_EMA_PERIOD = 5  # 短期EMA用于趋势确认
STOP_LOSS_PCT = 0.8  # 止损百分比(相对于ATR)
TRAILING_STOP = True  # 使用移动止损

print(f"开始回测 {SYMBOL} 的Keltner Channel策略...")
print(f"参数: EMA周期={EMA_PERIOD}, ATR周期={ATR_PERIOD}, ATR乘数={ATR_MULTIPLIER}")
print(f"额外参数: 短期EMA={SHORT_EMA_PERIOD}, 止损参数={STOP_LOSS_PCT}ATR, 移动止损={TRAILING_STOP}")

try:
    api = TqApi(backtest=TqBacktest(start_dt=START_DATE, end_dt=END_DATE),
                auth=TqAuth("快期账号", "快期密码"))

    # 订阅K线数据
    klines = api.get_kline_serial(SYMBOL, 60 * 60 * 24)  # 日K线
    # 订阅行情获取交易时间
    quote = api.get_quote(SYMBOL)
    target_pos = TargetPosTask(api, SYMBOL)

    # 初始化交易状态
    position = 0  # 当前持仓
    entry_price = 0  # 入场价格
    stop_loss = 0  # 止损价格
    high_since_entry = 0  # 入场后的最高价(用于移动止损)
    low_since_entry = 0  # 入场后的最低价(用于移动止损)
    trend_strength = 0  # 趋势强度

    # 记录交易信息
    trades = []

    while True:
        api.wait_update()

        if api.is_changing(klines):
            # 确保有足够的数据
            if len(klines) < max(EMA_PERIOD, ATR_PERIOD, SHORT_EMA_PERIOD) + 1:
                continue

            # 计算指标
            close = klines.close.values
            high = klines.high.values
            low = klines.low.values

            # 计算中轨(EMA)和短期EMA(用于趋势确认)
            ema = pd.Series(close).ewm(span=EMA_PERIOD, adjust=False).mean().values
            ema_short = pd.Series(close).ewm(span=SHORT_EMA_PERIOD, adjust=False).mean().values

            # 计算趋势方向和强度
            trend_direction = 1 if ema_short[-1] > ema[-1] else -1 if ema_short[-1] < ema[-1] else 0
            trend_strength = abs(ema_short[-1] - ema[-1]) / close[-1] * 100  # 趋势强度百分比

            # 计算ATR
            tr = np.maximum(high - low,
                            np.maximum(
                                np.abs(high - np.roll(close, 1)),
                                np.abs(low - np.roll(close, 1))
                            ))
            atr = pd.Series(tr).rolling(ATR_PERIOD).mean().values
            current_atr = float(atr[-1])

            # 动态调整ATR乘数,根据趋势强度调整通道宽度
            dynamic_multiplier = ATR_MULTIPLIER
            if trend_strength > 0.5:  # 强趋势时使用更窄的通道
                dynamic_multiplier = ATR_MULTIPLIER * 0.8

            # 计算通道上下轨
            upper_band = ema + dynamic_multiplier * atr
            lower_band = ema - dynamic_multiplier * atr

            # 获取当前价格和指标值
            current_price = float(close[-1])
            current_upper = float(upper_band[-1])
            current_lower = float(lower_band[-1])
            current_ema = float(ema[-1])
            current_time = quote.datetime  # 使用quote的datetime获取当前时间

            # 更新入场后的最高/最低价
            if position > 0:
                high_since_entry = max(high_since_entry, current_price)
                # 更新移动止损
                if TRAILING_STOP and high_since_entry > entry_price:
                    trailing_stop = high_since_entry * (1 - STOP_LOSS_PCT * current_atr / current_price)
                    stop_loss = max(stop_loss, trailing_stop)
            elif position < 0:
                low_since_entry = min(low_since_entry, current_price)
                # 更新移动止损
                if TRAILING_STOP and low_since_entry < entry_price:
                    trailing_stop = low_since_entry * (1 + STOP_LOSS_PCT * current_atr / current_price)
                    stop_loss = min(stop_loss if stop_loss > 0 else float('inf'), trailing_stop)

            # 交易逻辑
            if position == 0:  # 空仓
                # 确认趋势方向并突破通道
                if current_price > current_upper and trend_direction > 0:
                    # 增加成交量过滤
                    position = POSITION_SIZE
                    entry_price = current_price
                    high_since_entry = current_price
                    low_since_entry = current_price
                    # 设置初始止损
                    stop_loss = current_price * (1 - STOP_LOSS_PCT * current_atr / current_price)
                    target_pos.set_target_volume(position)
                    print(f"开多仓: 价格={current_price:.2f}, 上轨={current_upper:.2f}, 止损={stop_loss:.2f}")
                    trades.append(("开多", current_time, current_price))

                elif current_price < current_lower and trend_direction < 0:
                    position = -POSITION_SIZE
                    entry_price = current_price
                    high_since_entry = current_price
                    low_since_entry = current_price
                    # 设置初始止损
                    stop_loss = current_price * (1 + STOP_LOSS_PCT * current_atr / current_price)
                    target_pos.set_target_volume(position)
                    print(f"开空仓: 价格={current_price:.2f}, 下轨={current_lower:.2f}, 止损={stop_loss:.2f}")
                    trades.append(("开空", current_time, current_price))

            elif position > 0:  # 持有多头
                # 止损、回落到中轨或趋势转向时平仓
                if (current_price <= stop_loss or
                        current_price <= current_ema or
                        (current_price < current_lower and trend_direction < 0)):
                    profit_pct = (current_price / entry_price - 1) * 100
                    profit_points = current_price - entry_price
                    target_pos.set_target_volume(0)
                    print(f"平多仓: 价格={current_price:.2f}, 盈亏={profit_pct:.2f}%, {profit_points:.2f}点")
                    position = 0
                    entry_price = 0
                    stop_loss = 0
                    trades.append(("平多", current_time, current_price))

            elif position < 0:  # 持有空头
                # 止损、回升到中轨或趋势转向时平仓
                if (current_price >= stop_loss or
                        current_price >= current_ema or
                        (current_price > current_upper and trend_direction > 0)):
                    profit_pct = (entry_price / current_price - 1) * 100
                    profit_points = entry_price - current_price
                    target_pos.set_target_volume(0)
                    print(f"平空仓: 价格={current_price:.2f}, 盈亏={profit_pct:.2f}%, {profit_points:.2f}点")
                    position = 0
                    entry_price = 0
                    stop_loss = 0
                    trades.append(("平空", current_time, current_price))

except BacktestFinished as e:
    print(f"回测完成: {e}")
    api.close()

Qstick趋势策略 (难度:初级)

策略说明 https://www.shinnytech.com/articles/trading-strategy/trend-following/qstick-strategy

#!/usr/bin/env python
# -*- coding: utf-8 -*-
__author__ = "Chaos"

from datetime import date
import pandas as pd
from tqsdk import TqApi, TqAuth, TqBacktest, TargetPosTask, BacktestFinished
from tqsdk.ta import ATR

# ===== 全局参数设置 =====
SYMBOL = "CFFEX.IC2303"  # 中证500指数期货
POSITION_SIZE = 30  # 持仓手数
START_DATE = date(2022, 8, 1)  # 回测开始日期
END_DATE = date(2023, 1, 30)  # 回测结束日期

# Qstick指标参数
QSTICK_PERIOD = 10  # Qstick周期
SMA_PERIOD = 8  # 价格SMA周期,用于确认趋势

# 风控参数
ATR_PERIOD = 14  # ATR计算周期
STOP_LOSS_MULTIPLIER = 2.0  # 止损ATR倍数
MAX_HOLDING_DAYS = 5  # 最大持仓天数

# ===== 全局变量 =====
current_direction = 0  # 当前持仓方向:1=多头,-1=空头,0=空仓
entry_price = 0  # 开仓价格
stop_loss_price = 0  # 止损价格
entry_date = None  # 开仓日期


# ===== Qstick指标计算函数 =====
def calculate_qstick(open_prices, close_prices, period):
    """计算Qstick指标"""
    # 计算收盘价与开盘价的差值
    diff = close_prices - open_prices
    # 计算移动平均
    qstick = diff.rolling(window=period).mean()
    return qstick


# ===== 策略开始 =====
print("开始运行Qstick趋势指标期货策略...")

# 创建API实例
api = TqApi(backtest=TqBacktest(start_dt=START_DATE, end_dt=END_DATE),
            auth=TqAuth("快期账号", "快期密码"))

# 订阅合约的K线数据
klines = api.get_kline_serial(SYMBOL, 60 * 60 * 24)  # 日线数据

# 创建目标持仓任务
target_pos = TargetPosTask(api, SYMBOL)

try:
    while True:
        # 等待更新
        api.wait_update()

        # 如果K线有更新
        if api.is_changing(klines.iloc[-1], "datetime"):
            # 确保有足够的数据计算指标
            if len(klines) < max(QSTICK_PERIOD, SMA_PERIOD, ATR_PERIOD) + 5:
                continue

            # 计算Qstick指标
            klines['qstick'] = calculate_qstick(klines.open, klines.close, QSTICK_PERIOD)

            # 计算价格SMA,用于确认趋势
            klines['price_sma'] = klines.close.rolling(window=SMA_PERIOD).mean()

            # 计算ATR用于设置止损
            atr_data = ATR(klines, ATR_PERIOD)

            # 获取最新数据
            current_price = float(klines.close.iloc[-1])
            current_datetime = pd.to_datetime(klines.datetime.iloc[-1], unit='ns')
            current_qstick = float(klines.qstick.iloc[-1])
            previous_qstick = float(klines.qstick.iloc[-2])
            current_price_sma = float(klines.price_sma.iloc[-1])
            current_atr = float(atr_data.atr.iloc[-1])

            # 输出调试信息
            print(f"日期: {current_datetime.strftime('%Y-%m-%d')}, 价格: {current_price:.2f}")
            print(f"Qstick: {current_qstick:.4f}")

            # ===== 交易逻辑 =====

            # 空仓状态 - 寻找开仓机会
            if current_direction == 0:
                # 多头开仓条件:Qstick从负值向上穿越零轴,价格在SMA上方
                if previous_qstick < 0 and current_qstick > 0 and current_price > current_price_sma:
                    current_direction = 1
                    target_pos.set_target_volume(POSITION_SIZE)
                    entry_price = current_price
                    entry_date = current_datetime

                    # 设置止损价格
                    stop_loss_price = entry_price - STOP_LOSS_MULTIPLIER * current_atr

                    print(f"多头开仓: 价格={entry_price}, 止损={stop_loss_price:.2f}")

                # 空头开仓条件:Qstick从正值向下穿越零轴,价格在SMA下方
                elif previous_qstick > 0 and current_qstick < 0 and current_price < current_price_sma:
                    current_direction = -1
                    target_pos.set_target_volume(-POSITION_SIZE)
                    entry_price = current_price
                    entry_date = current_datetime

                    # 设置止损价格
                    stop_loss_price = entry_price + STOP_LOSS_MULTIPLIER * current_atr

                    print(f"空头开仓: 价格={entry_price}, 止损={stop_loss_price:.2f}")

            # 多头持仓 - 检查平仓条件
            elif current_direction == 1:
                # 计算持仓天数
                holding_days = (current_datetime - entry_date).days

                # 1. 止损条件:价格低于止损价格
                if current_price <= stop_loss_price:
                    profit_pct = (current_price - entry_price) / entry_price * 100
                    target_pos.set_target_volume(0)
                    current_direction = 0
                    print(f"多头止损平仓: 价格={current_price}, 盈亏={profit_pct:.2f}%, 持仓天数={holding_days}")

                # 2. 反向信号平仓:Qstick从正值向下穿越零轴
                elif previous_qstick > 0 and current_qstick < 0:
                    profit_pct = (current_price - entry_price) / entry_price * 100
                    target_pos.set_target_volume(0)
                    current_direction = 0
                    print(f"多头信号平仓: 价格={current_price}, 盈亏={profit_pct:.2f}%, 持仓天数={holding_days}")

                # 3. 时间止损:持仓时间过长
                elif holding_days >= MAX_HOLDING_DAYS:
                    profit_pct = (current_price - entry_price) / entry_price * 100
                    target_pos.set_target_volume(0)
                    current_direction = 0
                    print(f"多头时间平仓: 价格={current_price}, 盈亏={profit_pct:.2f}%, 持仓天数={holding_days}")

            # 空头持仓 - 检查平仓条件
            elif current_direction == -1:
                # 计算持仓天数
                holding_days = (current_datetime - entry_date).days

                # 1. 止损条件:价格高于止损价格
                if current_price >= stop_loss_price:
                    profit_pct = (entry_price - current_price) / entry_price * 100
                    target_pos.set_target_volume(0)
                    current_direction = 0
                    print(f"空头止损平仓: 价格={current_price}, 盈亏={profit_pct:.2f}%, 持仓天数={holding_days}")

                # 2. 反向信号平仓:Qstick从负值向上穿越零轴
                elif previous_qstick < 0 and current_qstick > 0:
                    profit_pct = (entry_price - current_price) / entry_price * 100
                    target_pos.set_target_volume(0)
                    current_direction = 0
                    print(f"空头信号平仓: 价格={current_price}, 盈亏={profit_pct:.2f}%, 持仓天数={holding_days}")

                # 3. 时间止损:持仓时间过长
                elif holding_days >= MAX_HOLDING_DAYS:
                    profit_pct = (entry_price - current_price) / entry_price * 100
                    target_pos.set_target_volume(0)
                    current_direction = 0
                    print(f"空头时间平仓: 价格={current_price}, 盈亏={profit_pct:.2f}%, 持仓天数={holding_days}")

except BacktestFinished as e:
    print("回测结束")
    api.close()

Aroon指标趋势策略 (难度:初级)

策略说明 https://www.shinnytech.com/articles/trading-strategy/trend-following/aroon-strategy

#!/usr/bin/env python
# -*- coding: utf-8 -*-
__author__ = "Chaos"

from tqsdk import TqApi, TqAuth, TqBacktest, TargetPosTask, BacktestFinished
import pandas as pd
from datetime import date

# ===== 全局参数设置 =====
SYMBOL = "SHFE.au2306"  # 黄金期货合约
POSITION_SIZE = 30  # 持仓手数
START_DATE = date(2023, 2, 20)  # 回测开始日期
END_DATE = date(2023, 5, 5)  # 回测结束日期

# Aroon指标参数
AROON_PERIOD = 10  # Aroon计算周期
AROON_UPPER_THRESHOLD = 75  # Aroon上线阈值
AROON_LOWER_THRESHOLD = 25  # Aroon下线阈值

# 风控参数
STOP_LOSS_PCT = 1.2  # 止损百分比

# ===== 全局变量 =====
position = 0  # 当前持仓
entry_price = 0  # 入场价格
trades = []  # 交易记录

# ===== 主程序 =====
print(f"开始回测 {SYMBOL} 的Aroon指标策略...")
print(f"参数: Aroon周期={AROON_PERIOD}, 上阈值={AROON_UPPER_THRESHOLD}, 下阈值={AROON_LOWER_THRESHOLD}")
print(f"回测期间: {START_DATE}{END_DATE}")

try:
    # 创建API实例
    api = TqApi(backtest=TqBacktest(start_dt=START_DATE, end_dt=END_DATE),
                auth=TqAuth("快期账号", "快期密码"))

    # 订阅K线数据
    klines = api.get_kline_serial(SYMBOL, 60 * 60 * 24)  # 日K线
    target_pos = TargetPosTask(api, SYMBOL)

    # 主循环
    while True:
        api.wait_update()

        if api.is_changing(klines.iloc[-1], "datetime"):
            # 确保有足够的数据
            if len(klines) < AROON_PERIOD + 10:
                continue

            # ===== 计算Aroon指标 =====
            # 找出最近N周期内最高价和最低价的位置
            klines['rolling_high'] = klines['high'].rolling(window=AROON_PERIOD).max()
            klines['rolling_low'] = klines['low'].rolling(window=AROON_PERIOD).min()

            # 初始化Aroon Up和Aroon Down数组
            aroon_up = []
            aroon_down = []

            # 遍历计算每个时间点的Aroon值
            for i in range(len(klines)):
                if i < AROON_PERIOD - 1:
                    aroon_up.append(0)
                    aroon_down.append(0)
                    continue

                period_data = klines.iloc[i - AROON_PERIOD + 1:i + 1]
                # 明确指定skipna=True来处理NaN值
                high_idx = period_data['high'].fillna(float('-inf')).argmax()
                low_idx = period_data['low'].fillna(float('inf')).argmin()

                days_since_high = i - (i - AROON_PERIOD + 1 + high_idx)
                days_since_low = i - (i - AROON_PERIOD + 1 + low_idx)

                aroon_up.append(((AROON_PERIOD - days_since_high) / AROON_PERIOD) * 100)
                aroon_down.append(((AROON_PERIOD - days_since_low) / AROON_PERIOD) * 100)

            # 将Aroon值添加到klines
            klines['aroon_up'] = aroon_up
            klines['aroon_down'] = aroon_down

            # 计算Aroon Oscillator (可选)
            klines['aroon_osc'] = klines['aroon_up'] - klines['aroon_down']

            # 获取当前和前一个周期的数据
            current_price = float(klines.close.iloc[-1])
            current_time = pd.to_datetime(klines.datetime.iloc[-1], unit='ns')
            current_aroon_up = float(klines.aroon_up.iloc[-1])
            current_aroon_down = float(klines.aroon_down.iloc[-1])

            prev_aroon_up = float(klines.aroon_up.iloc[-2])
            prev_aroon_down = float(klines.aroon_down.iloc[-2])

            # 输出当前指标值,帮助调试
            print(f"当前K线: {current_time.strftime('%Y-%m-%d')}, 价格: {current_price:.2f}")
            print(f"Aroon Up: {current_aroon_up:.2f}, Aroon Down: {current_aroon_down:.2f}")

            # ===== 止损检查 =====
            if position != 0 and entry_price != 0:
                if position > 0:  # 多头止损
                    profit_pct = (current_price / entry_price - 1) * 100
                    if profit_pct < -STOP_LOSS_PCT:
                        print(f"触发止损: 当前价格={current_price}, 入场价={entry_price}, 亏损={profit_pct:.2f}%")
                        target_pos.set_target_volume(0)
                        trades.append({
                            'type': '止损平多',
                            'time': current_time,
                            'price': current_price,
                            'profit_pct': profit_pct
                        })
                        print(f"止损平多: {current_time}, 价格: {current_price:.2f}, 亏损: {profit_pct:.2f}%")
                        position = 0
                        entry_price = 0
                        continue

                elif position < 0:  # 空头止损
                    profit_pct = (entry_price / current_price - 1) * 100
                    if profit_pct < -STOP_LOSS_PCT:
                        print(f"触发止损: 当前价格={current_price}, 入场价={entry_price}, 亏损={profit_pct:.2f}%")
                        target_pos.set_target_volume(0)
                        trades.append({
                            'type': '止损平空',
                            'time': current_time,
                            'price': current_price,
                            'profit_pct': profit_pct
                        })
                        print(f"止损平空: {current_time}, 价格: {current_price:.2f}, 亏损: {profit_pct:.2f}%")
                        position = 0
                        entry_price = 0
                        continue

            # ===== 交易信号判断 =====
            # 1. Aroon交叉信号
            aroon_cross_up = prev_aroon_up < prev_aroon_down and current_aroon_up > current_aroon_down
            aroon_cross_down = prev_aroon_up > prev_aroon_down and current_aroon_up < current_aroon_down

            # 2. 强势信号
            strong_up = current_aroon_up > AROON_UPPER_THRESHOLD and current_aroon_down < AROON_LOWER_THRESHOLD
            strong_down = current_aroon_down > AROON_UPPER_THRESHOLD and current_aroon_up < AROON_LOWER_THRESHOLD

            # ===== 交易决策 =====
            if position == 0:  # 空仓状态
                # 多头信号
                if aroon_cross_up or strong_up:
                    position = POSITION_SIZE
                    entry_price = current_price
                    target_pos.set_target_volume(position)
                    signal_type = "交叉" if aroon_cross_up else "强势"
                    trades.append({
                        'type': '开多',
                        'time': current_time,
                        'price': current_price,
                        'signal': signal_type
                    })
                    print(f"开多仓: {current_time}, 价格: {current_price:.2f}, 信号: Aroon {signal_type}")

                # 空头信号
                elif aroon_cross_down or strong_down:
                    position = -POSITION_SIZE
                    entry_price = current_price
                    target_pos.set_target_volume(position)
                    signal_type = "交叉" if aroon_cross_down else "强势"
                    trades.append({
                        'type': '开空',
                        'time': current_time,
                        'price': current_price,
                        'signal': signal_type
                    })
                    print(f"开空仓: {current_time}, 价格: {current_price:.2f}, 信号: Aroon {signal_type}")

            elif position > 0:  # 持有多头
                # 平多信号
                if aroon_cross_down:
                    profit_pct = (current_price / entry_price - 1) * 100
                    target_pos.set_target_volume(0)
                    trades.append({
                        'type': '平多',
                        'time': current_time,
                        'price': current_price,
                        'profit_pct': profit_pct
                    })
                    print(f"平多仓: {current_time}, 价格: {current_price:.2f}, 盈亏: {profit_pct:.2f}%")
                    position = 0
                    entry_price = 0

            elif position < 0:  # 持有空头
                # 平空信号
                if aroon_cross_up:
                    profit_pct = (entry_price / current_price - 1) * 100
                    target_pos.set_target_volume(0)
                    trades.append({
                        'type': '平空',
                        'time': current_time,
                        'price': current_price,
                        'profit_pct': profit_pct
                    })
                    print(f"平空仓: {current_time}, 价格: {current_price:.2f}, 盈亏: {profit_pct:.2f}%")
                    position = 0
                    entry_price = 0

except BacktestFinished as e:
    print("回测结束")
    api.close()

量价趋势策略(VPT) (难度:初级)

策略说明 https://www.shinnytech.com/articles/trading-strategy/trend-following/vpt-strategy

#!/usr/bin/env python
# -*- coding: utf-8 -*-
__author__ = "Chaos"

from datetime import date

import numpy as np
from tqsdk import TqApi, TqAuth, TqBacktest, TargetPosTask, BacktestFinished

# ===== 全局参数设置 =====
SYMBOL = "SHFE.au2306"  # 交易合约
POSITION_SIZE = 30  # 持仓手数
START_DATE = date(2023, 1, 15)  # 回测开始日期
END_DATE = date(2023, 5, 15)  # 回测结束日期

# VPT策略参数
VPT_MA_PERIOD = 14  # VPT均线周期
VOLUME_THRESHOLD = 1.5  # 成交量放大倍数阈值

print(f"开始回测 {SYMBOL} 的量价趋势(VPT)策略...")
print(f"参数: VPT均线周期={VPT_MA_PERIOD}, 成交量阈值={VOLUME_THRESHOLD}")

api = None
try:
    api = TqApi(backtest=TqBacktest(start_dt=START_DATE, end_dt=END_DATE),
                auth=TqAuth("快期账号", "快期密码"))

    # 订阅日K线数据
    klines = api.get_kline_serial(SYMBOL, 60 * 60 * 24)
    target_pos = TargetPosTask(api, SYMBOL)

    # 初始化交易状态
    position = 0  # 当前持仓
    entry_price = 0  # 入场价格
    vpt_values = []  # 存储VPT值

    while True:
        api.wait_update()

        if api.is_changing(klines):
            # 确保有足够的数据
            if len(klines) < VPT_MA_PERIOD + 1:
                continue

            # 计算VPT指标
            close = klines.close.values
            volume = klines.volume.values

            # 计算最新的VPT值
            if len(vpt_values) == 0:
                vpt_values.append(volume[-1])
            else:
                price_change_pct = (close[-1] - close[-2]) / close[-2]
                new_vpt = vpt_values[-1] + volume[-1] * price_change_pct
                vpt_values.append(new_vpt)

            # 保持VPT列表长度与K线数据同步
            if len(vpt_values) > len(klines):
                vpt_values.pop(0)

            # 计算VPT均线
            if len(vpt_values) >= VPT_MA_PERIOD:
                vpt_ma = np.mean(vpt_values[-VPT_MA_PERIOD:])

                # 获取当前价格和成交量数据
                current_price = float(close[-1])
                current_volume = float(volume[-1])
                avg_volume = np.mean(volume[-VPT_MA_PERIOD:-1])

                # 判断成交量是否放大
                volume_increased = current_volume > avg_volume * VOLUME_THRESHOLD

                # 交易信号判断
                vpt_trend_up = vpt_values[-1] > vpt_ma
                vpt_trend_down = vpt_values[-1] < vpt_ma

                # 交易逻辑
                if position == 0:  # 空仓
                    if vpt_trend_up and volume_increased and close[-1] > close[-2]:
                        position = POSITION_SIZE
                        entry_price = current_price
                        target_pos.set_target_volume(position)
                        print(f"开多仓: 价格={current_price:.2f}, VPT上穿均线")

                    elif vpt_trend_down and volume_increased and close[-1] < close[-2]:
                        position = -POSITION_SIZE
                        entry_price = current_price
                        target_pos.set_target_volume(position)
                        print(f"开空仓: 价格={current_price:.2f}, VPT下穿均线")

                elif position > 0:  # 持有多头
                    if vpt_trend_down and volume_increased:
                        profit_pct = (current_price / entry_price - 1) * 100
                        target_pos.set_target_volume(0)
                        print(f"平多仓: 价格={current_price:.2f}, 盈亏={profit_pct:.2f}%")
                        position = 0
                        entry_price = 0

                elif position < 0:  # 持有空头
                    if vpt_trend_up and volume_increased:
                        profit_pct = (entry_price / current_price - 1) * 100
                        target_pos.set_target_volume(0)
                        print(f"平空仓: 价格={current_price:.2f}, 盈亏={profit_pct:.2f}%")
                        position = 0
                        entry_price = 0

except BacktestFinished as e:
    print(f"策略运行异常: {e}")
    api.close()

套利策略

PTA产业链套利 (涤纶短纤生产利润)策略 (难度:初级)

策略说明 https://www.shinnytech.com/articles/trading-strategy/arbitrage/pta-spread

#!/usr/bin/env python
# coding=utf-8
__author__ = "Chaos"

from datetime import date
from tqsdk import TqApi, TqAuth, TargetPosTask, TqBacktest, BacktestFinished
import numpy as np
import time

# === 用户参数 ===
# 合约参数
PF = "CZCE.PF409"      # 涤纶短纤期货合约
PTA = "CZCE.TA409"     # PTA期货合约
EG = "DCE.eg2409"      # 乙二醇期货合约
START_DATE = date(2024, 2, 1)   # 回测开始日期
END_DATE = date(2024, 4, 30)     # 回测结束日期

# 套利参数
LOOKBACK_DAYS = 30        # 计算历史价差的回溯天数
STD_THRESHOLD = 2.0       # 标准差阈值,超过此阈值视为套利机会
ORDER_VOLUME = 500         # 涤纶短纤的下单手数
CLOSE_THRESHOLD = 0.5     # 平仓阈值(标准差)

# 生产比例(可根据实际工艺调整)
PF_RATIO = 1
PTA_RATIO = 0.86
EG_RATIO = 0.34

# === 初始化API ===
api = TqApi(backtest=TqBacktest(start_dt=START_DATE, end_dt=END_DATE),
            auth=TqAuth("快期账号", "快期密码"))

# 获取合约行情和K线
pf_quote = api.get_quote(PF)
pta_quote = api.get_quote(PTA)
eg_quote = api.get_quote(EG)

pf_klines = api.get_kline_serial(PF, 60*60*24, LOOKBACK_DAYS)
pta_klines = api.get_kline_serial(PTA, 60*60*24, LOOKBACK_DAYS)
eg_klines = api.get_kline_serial(EG, 60*60*24, LOOKBACK_DAYS)

# 创建目标持仓任务
pf_pos = TargetPosTask(api, PF)
pta_pos = TargetPosTask(api, PTA)
eg_pos = TargetPosTask(api, EG)

# 获取合约乘数
pf_volume_multiple = pf_quote.volume_multiple
pta_volume_multiple = pta_quote.volume_multiple
eg_volume_multiple = eg_quote.volume_multiple

# 初始化状态变量
position_time = 0        # 建仓时间
in_position = False      # 是否有持仓
mean_spread = 0          # 历史价差均值
std_spread = 0           # 历史价差标准差

print(f"策略启动,监控合约: {PF}, {PTA}, {EG}")

# === 主循环 ===
try:
    # 初始计算历史统计值
    spreads = []
    for i in range(len(pf_klines) - 1):
        pf_price = pf_klines.close.iloc[i] * pf_volume_multiple * PF_RATIO
        pta_price = pta_klines.close.iloc[i] * pta_volume_multiple * PTA_RATIO
        eg_price = eg_klines.close.iloc[i] * eg_volume_multiple * EG_RATIO
        
        # 涤纶短纤生产利润 = 涤纶短纤价值 - (PTA成本 + EG成本)
        spread = pf_price - (pta_price + eg_price)
        spreads.append(spread)
    
    mean_spread = np.mean(spreads)
    std_spread = np.std(spreads)
    print(f"历史涤纶短纤利润均值: {mean_spread:.2f}, 标准差: {std_spread:.2f}")

    # 主循环
    while True:
        api.wait_update()
        
        # 当K线数据有变化时进行计算
        if api.is_changing(pf_klines) or api.is_changing(pta_klines) or api.is_changing(eg_klines):
            # 重新计算历史价差统计
            spreads = []
            for i in range(len(pf_klines) - 1):
                pf_price = pf_klines.close.iloc[i] * pf_volume_multiple * PF_RATIO
                pta_price = pta_klines.close.iloc[i] * pta_volume_multiple * PTA_RATIO
                eg_price = eg_klines.close.iloc[i] * eg_volume_multiple * EG_RATIO
                
                spread = pf_price - (pta_price + eg_price)
                spreads.append(spread)
            
            mean_spread = np.mean(spreads)
            std_spread = np.std(spreads)
            
            # 计算当前利润价差
            pf_price = pf_klines.close.iloc[-1] * pf_volume_multiple * PF_RATIO
            pta_price = pta_klines.close.iloc[-1] * pta_volume_multiple * PTA_RATIO
            eg_price = eg_klines.close.iloc[-1] * eg_volume_multiple * EG_RATIO
            
            current_spread = pf_price - (pta_price + eg_price)
            
            # 计算z-score (标准化的价差)
            z_score = (current_spread - mean_spread) / std_spread
            
            print(f"当前涤纶短纤利润: {current_spread:.2f}, Z-score: {z_score:.2f}")
            
            # 获取当前持仓
            pf_position = api.get_position(PF)
            pta_position = api.get_position(PTA)
            eg_position = api.get_position(EG)
            
            current_pf_pos = pf_position.pos_long - pf_position.pos_short
            current_pta_pos = pta_position.pos_long - pta_position.pos_short
            current_eg_pos = eg_position.pos_long - eg_position.pos_short
            
            # 计算实际下单手数(依据比例)
            pta_volume = int(ORDER_VOLUME * PTA_RATIO / PF_RATIO)
            eg_volume = int(ORDER_VOLUME * EG_RATIO / PF_RATIO)
            
            # === 交易信号判断 ===
            if not in_position:  # 如果没有持仓
                if z_score > STD_THRESHOLD:  # 利润显著高于均值
                    # 做空利润:卖出PF,买入PTA和EG
                    print(f"做空利润:卖出PF{ORDER_VOLUME}手,买入PTA{pta_volume}手和EG{eg_volume}手")
                    pf_pos.set_target_volume(-ORDER_VOLUME)
                    pta_pos.set_target_volume(pta_volume)
                    eg_pos.set_target_volume(eg_volume)
                    position_time = time.time()
                    in_position = True
                    
                elif z_score < -STD_THRESHOLD:  # 利润显著低于均值
                    # 做多利润:买入PF,卖出PTA和EG
                    print(f"做多利润:买入PF{ORDER_VOLUME}手,卖出PTA{pta_volume}手和EG{eg_volume}手")
                    pf_pos.set_target_volume(ORDER_VOLUME)
                    pta_pos.set_target_volume(-pta_volume)
                    eg_pos.set_target_volume(-eg_volume)
                    position_time = time.time()
                    in_position = True
            
            else:  # 如果已有持仓
                # 检查是否应当平仓
                if abs(z_score) < CLOSE_THRESHOLD:  # 利润回归正常
                    print("利润回归正常,平仓所有头寸")
                    pf_pos.set_target_volume(0)
                    pta_pos.set_target_volume(0)
                    eg_pos.set_target_volume(0)
                    in_position = False

                
                # 止损逻辑
                if (z_score > STD_THRESHOLD * 1.5 and current_pf_pos > 0) or \
                   (z_score < -STD_THRESHOLD * 1.5 and current_pf_pos < 0):
                    print("止损:利润向不利方向进一步偏离")
                    pf_pos.set_target_volume(0)
                    pta_pos.set_target_volume(0)
                    eg_pos.set_target_volume(0)
                    in_position = False

except BacktestFinished as e:
    print("回测结束")
    api.close()

养殖价差套利策略 (难度:初级)

策略说明 https://www.shinnytech.com/articles/trading-strategy/arbitrage/smash-spread

#!/usr/bin/env python
# coding=utf-8
__author__ = "Chaos"

from datetime import date

from tqsdk import TqApi, TqAuth, TargetPosTask, TqBacktest, BacktestFinished
import numpy as np
import time

# === 用户参数 ===
# 合约参数
LIVE_HOG = "DCE.lh2409"  # 生猪期货合约
CORN = "DCE.c2409"  # 玉米期货合约
SOYMEAL = "DCE.m2409"  # 豆粕期货合约
START_DATE = date(2023, 11, 1)  # 回测开始日期
END_DATE = date(2024, 3, 13)  # 回测结束日期

# 套利参数
LOOKBACK_DAYS = 30  # 计算历史价差的回溯天数
STD_THRESHOLD = 2.0  # 标准差阈值,超过此阈值视为套利机会
ORDER_VOLUME = 100  # 生猪的下单手数
CLOSE_THRESHOLD = 0.5  # 平仓阈值(标准差)

# 饲养利润价差比例 - 生产1吨生猪约需要3吨玉米和0.6吨豆粕
# 可根据实际养殖转化比调整
HOG_RATIO = 1
CORN_RATIO = 3
SOYMEAL_RATIO = 0.6

# === 初始化API ===
api = TqApi(backtest=TqBacktest(start_dt=START_DATE, end_dt=END_DATE),
            auth=TqAuth("快期账号", "快期密码"))

# 获取合约行情和K线
hog_quote = api.get_quote(LIVE_HOG)
corn_quote = api.get_quote(CORN)
meal_quote = api.get_quote(SOYMEAL)

hog_klines = api.get_kline_serial(LIVE_HOG, 60 * 60 * 24, LOOKBACK_DAYS)
corn_klines = api.get_kline_serial(CORN, 60 * 60 * 24, LOOKBACK_DAYS)
meal_klines = api.get_kline_serial(SOYMEAL, 60 * 60 * 24, LOOKBACK_DAYS)

# 创建目标持仓任务
hog_pos = TargetPosTask(api, LIVE_HOG)
corn_pos = TargetPosTask(api, CORN)
meal_pos = TargetPosTask(api, SOYMEAL)

# 获取合约乘数
hog_volume_multiple = hog_quote.volume_multiple
corn_volume_multiple = corn_quote.volume_multiple
meal_volume_multiple = meal_quote.volume_multiple

# 初始化状态变量
position_time = 0  # 建仓时间
in_position = False  # 是否有持仓
mean_spread = 0  # 历史价差均值
std_spread = 0  # 历史价差标准差

print(f"策略启动,监控合约: {LIVE_HOG}, {CORN}, {SOYMEAL}")

# === 主循环 ===
try:
    # 初始计算历史统计值
    spreads = []
    for i in range(len(hog_klines) - 1):
        hog_price = hog_klines.close.iloc[i] * hog_volume_multiple * HOG_RATIO
        corn_price = corn_klines.close.iloc[i] * corn_volume_multiple * CORN_RATIO
        meal_price = meal_klines.close.iloc[i] * meal_volume_multiple * SOYMEAL_RATIO

        # 饲养利润 = 生猪价值 - 饲料成本价值
        spread = hog_price - (corn_price + meal_price)
        spreads.append(spread)

    mean_spread = np.mean(spreads)
    std_spread = np.std(spreads)
    print(f"历史饲养利润均值: {mean_spread:.2f}, 标准差: {std_spread:.2f}")

    # 主循环
    while True:
        api.wait_update()

        # 当K线数据有变化时进行计算
        if api.is_changing(hog_klines) or api.is_changing(corn_klines) or api.is_changing(meal_klines):
            # 重新计算历史价差统计
            spreads = []
            for i in range(len(hog_klines) - 1):
                hog_price = hog_klines.close.iloc[i] * hog_volume_multiple * HOG_RATIO
                corn_price = corn_klines.close.iloc[i] * corn_volume_multiple * CORN_RATIO
                meal_price = meal_klines.close.iloc[i] * meal_volume_multiple * SOYMEAL_RATIO

                spread = hog_price - (corn_price + meal_price)
                spreads.append(spread)

            mean_spread = np.mean(spreads)
            std_spread = np.std(spreads)

            # 计算当前饲养利润价差
            hog_price = hog_klines.close.iloc[-1] * hog_volume_multiple * HOG_RATIO
            corn_price = corn_klines.close.iloc[-1] * corn_volume_multiple * CORN_RATIO
            meal_price = meal_klines.close.iloc[-1] * meal_volume_multiple * SOYMEAL_RATIO

            current_spread = hog_price - (corn_price + meal_price)

            # 计算z-score (标准化的价差)
            z_score = (current_spread - mean_spread) / std_spread

            print(f"当前饲养利润: {current_spread:.2f}, Z-score: {z_score:.2f}")

            # 获取当前持仓
            hog_position = api.get_position(LIVE_HOG)
            corn_position = api.get_position(CORN)
            meal_position = api.get_position(SOYMEAL)

            current_hog_pos = hog_position.pos_long - hog_position.pos_short
            current_corn_pos = corn_position.pos_long - corn_position.pos_short
            current_meal_pos = meal_position.pos_long - meal_position.pos_short

            # 计算实际下单手数(依据比例)
            corn_volume = int(ORDER_VOLUME * CORN_RATIO / HOG_RATIO)
            meal_volume = int(ORDER_VOLUME * SOYMEAL_RATIO / HOG_RATIO)

            # === 交易信号判断 ===
            if not in_position:  # 如果没有持仓
                if z_score > STD_THRESHOLD:  # 饲养利润显著高于均值
                    # 做空饲养利润:卖出生猪,买入玉米和豆粕
                    print(f"做空饲养利润:卖出生猪{ORDER_VOLUME}手,买入玉米{corn_volume}手和豆粕{meal_volume}手")
                    hog_pos.set_target_volume(-ORDER_VOLUME)
                    corn_pos.set_target_volume(corn_volume)
                    meal_pos.set_target_volume(meal_volume)
                    position_time = time.time()
                    in_position = True

                elif z_score < -STD_THRESHOLD:  # 饲养利润显著低于均值
                    # 做多饲养利润:买入生猪,卖出玉米和豆粕
                    print(f"做多饲养利润:买入生猪{ORDER_VOLUME}手,卖出玉米{corn_volume}手和豆粕{meal_volume}手")
                    hog_pos.set_target_volume(ORDER_VOLUME)
                    corn_pos.set_target_volume(-corn_volume)
                    meal_pos.set_target_volume(-meal_volume)
                    position_time = time.time()
                    in_position = True

            else:  # 如果已有持仓
                # 检查是否应当平仓
                if abs(z_score) < CLOSE_THRESHOLD:  # 饲养利润恢复正常
                    print("饲养利润回归正常水平,平仓所有头寸")
                    hog_pos.set_target_volume(0)
                    corn_pos.set_target_volume(0)
                    meal_pos.set_target_volume(0)
                    in_position = False

                # 止损逻辑
                if (z_score > STD_THRESHOLD * 1.5 and current_hog_pos > 0) or \
                        (z_score < -STD_THRESHOLD * 1.5 and current_hog_pos < 0):
                    print("止损:饲养利润向不利方向进一步偏离")
                    hog_pos.set_target_volume(0)
                    corn_pos.set_target_volume(0)
                    meal_pos.set_target_volume(0)
                    in_position = False

except BacktestFinished as e:
    print("回测结束")
    api.close()

双焦(焦煤-焦炭)套利策略 (难度:初级)

策略说明 https://www.shinnytech.com/articles/trading-strategy/arbitrage/jm-j-spread

#!/usr/bin/env python
# coding=utf-8
__author__ = "Chaos"

from datetime import date
from tqsdk import TqApi, TqAuth, TargetPosTask, TqBacktest, BacktestFinished
import numpy as np
import time

# === 用户参数 ===
# 合约参数
J = "DCE.j2409"  # 焦炭期货合约
JM = "DCE.jm2409"  # 焦煤期货合约
START_DATE = date(2023, 11, 1)  # 回测开始日期
END_DATE = date(2024, 4, 30)  # 回测结束日期

# 套利参数
LOOKBACK_DAYS = 30  # 计算历史价差的回溯天数
STD_THRESHOLD = 2.0  # 标准差阈值,超过此阈值视为套利机会
ORDER_VOLUME = 50  # 焦炭的下单手数
CLOSE_THRESHOLD = 0.5  # 平仓阈值(标准差)

# 配比参数(可根据实际工艺调整)
J_RATIO = 10  # 10手焦炭
JM_RATIO = 22  # 22手焦煤(约1.32配比)

# === 初始化API ===
api = TqApi(backtest=TqBacktest(start_dt=START_DATE, end_dt=END_DATE),
            auth=TqAuth("快期账号", "快期密码"))

# 获取合约行情和K线
j_quote = api.get_quote(J)
jm_quote = api.get_quote(JM)

j_klines = api.get_kline_serial(J, 60 * 60 * 24, LOOKBACK_DAYS)
jm_klines = api.get_kline_serial(JM, 60 * 60 * 24, LOOKBACK_DAYS)

# 创建目标持仓任务
j_pos = TargetPosTask(api, J)
jm_pos = TargetPosTask(api, JM)

# 获取合约乘数
j_volume_multiple = j_quote.volume_multiple
jm_volume_multiple = jm_quote.volume_multiple

# 初始化状态变量
position_time = 0  # 建仓时间
in_position = False  # 是否有持仓
mean_spread = 0  # 历史价差均值
std_spread = 0  # 历史价差标准差

print(f"策略启动,监控合约: {J}, {JM}")

# === 主循环 ===
try:
    # 初始计算历史统计值
    spreads = []
    for i in range(len(j_klines) - 1):
        j_value = j_klines.close.iloc[i] * j_volume_multiple * J_RATIO
        jm_value = jm_klines.close.iloc[i] * jm_volume_multiple * JM_RATIO
        spread = j_value - jm_value
        spreads.append(spread)

    mean_spread = np.mean(spreads)
    std_spread = np.std(spreads)
    print(f"历史炼焦利润均值: {mean_spread:.2f}, 标准差: {std_spread:.2f}")

    # 主循环
    while True:
        api.wait_update()

        # 当K线数据有变化时进行计算
        if api.is_changing(j_klines) or api.is_changing(jm_klines):
            # 重新计算历史价差统计
            spreads = []
            for i in range(len(j_klines) - 1):
                j_value = j_klines.close.iloc[i] * j_volume_multiple * J_RATIO
                jm_value = jm_klines.close.iloc[i] * jm_volume_multiple * JM_RATIO
                spread = j_value - jm_value
                spreads.append(spread)

            mean_spread = np.mean(spreads)
            std_spread = np.std(spreads)

            # 计算当前炼焦利润价差
            j_value = j_klines.close.iloc[-1] * j_volume_multiple * J_RATIO
            jm_value = jm_klines.close.iloc[-1] * jm_volume_multiple * JM_RATIO
            current_spread = j_value - jm_value

            # 计算z-score (标准化的价差)
            z_score = (current_spread - mean_spread) / std_spread

            print(f"当前炼焦利润: {current_spread:.2f}, Z-score: {z_score:.2f}")

            # 获取当前持仓
            j_position = api.get_position(J)
            jm_position = api.get_position(JM)

            current_j_pos = j_position.pos_long - j_position.pos_short
            current_jm_pos = jm_position.pos_long - jm_position.pos_short

            # === 交易信号判断 ===
            if not in_position:
                if z_score > STD_THRESHOLD:
                    # 做空炼焦利润:卖出焦炭,买入焦煤
                    print(f"做空炼焦利润:卖出焦炭{ORDER_VOLUME}手,买入焦煤{int(ORDER_VOLUME * JM_RATIO / J_RATIO)}手")
                    j_pos.set_target_volume(-ORDER_VOLUME)
                    jm_pos.set_target_volume(int(ORDER_VOLUME * JM_RATIO / J_RATIO))
                    position_time = time.time()
                    in_position = True
                elif z_score < -STD_THRESHOLD:
                    # 做多炼焦利润:买入焦炭,卖出焦煤
                    print(f"做多炼焦利润:买入焦炭{ORDER_VOLUME}手,卖出焦煤{int(ORDER_VOLUME * JM_RATIO / J_RATIO)}手")
                    j_pos.set_target_volume(ORDER_VOLUME)
                    jm_pos.set_target_volume(-int(ORDER_VOLUME * JM_RATIO / J_RATIO))
                    position_time = time.time()
                    in_position = True

            else:  # 如果已有持仓
                # 检查是否应当平仓
                if abs(z_score) < CLOSE_THRESHOLD:  # 利润回归正常
                    print("利润回归正常,平仓所有头寸")
                    j_pos.set_target_volume(0)
                    jm_pos.set_target_volume(0)
                    in_position = False
                # 止损逻辑
                if (z_score > STD_THRESHOLD * 1.5 and current_j_pos < 0) or \
                        (z_score < -STD_THRESHOLD * 1.5 and current_j_pos > 0):
                    print("止损:利润向不利方向进一步偏离")
                    j_pos.set_target_volume(0)
                    jm_pos.set_target_volume(0)
                    in_position = False

except BacktestFinished as e:
    print("回测结束")
    api.close()

甲醇制烯烃(MTO)产业链套利策略 (难度:初级)

策略说明 https://www.shinnytech.com/articles/trading-strategy/arbitrage/mto-spread

#!/usr/bin/env python
# coding=utf-8
__author__ = "Chaos"

from datetime import date
from tqsdk import TqApi, TqAuth, TargetPosTask, TqBacktest, BacktestFinished
import numpy as np
import time

# === 用户参数 ===
# 合约参数
MA = "CZCE.MA409"      # 甲醇期货合约
L = "DCE.l2409"        # 聚乙烯期货合约
PP = "DCE.pp2409"      # 聚丙烯期货合约
START_DATE = date(2023, 11, 1)   # 回测开始日期
END_DATE = date(2024, 4, 30)     # 回测结束日期

# 套利参数
LOOKBACK_DAYS = 30        # 计算历史价差的回溯天数
STD_THRESHOLD = 2.0       # 标准差阈值,超过此阈值视为套利机会
ORDER_VOLUME = 100         # 聚乙烯的下单手数
CLOSE_THRESHOLD = 0.5     # 平仓阈值(标准差)

# 生产比例(可根据实际工艺调整)
MA_RATIO = 3      # 生产1吨烯烃消耗3吨甲醇
L_RATIO = 1       # 产出1吨聚乙烯
PP_RATIO = 1      # 产出1吨聚丙烯

# === 初始化API ===
api = TqApi(backtest=TqBacktest(start_dt=START_DATE, end_dt=END_DATE),
            auth=TqAuth("快期账号", "快期密码"))

# 获取合约行情和K线
ma_quote = api.get_quote(MA)
l_quote = api.get_quote(L)
pp_quote = api.get_quote(PP)

ma_klines = api.get_kline_serial(MA, 60*60*24, LOOKBACK_DAYS)
l_klines = api.get_kline_serial(L, 60*60*24, LOOKBACK_DAYS)
pp_klines = api.get_kline_serial(PP, 60*60*24, LOOKBACK_DAYS)

# 创建目标持仓任务
ma_pos = TargetPosTask(api, MA)
l_pos = TargetPosTask(api, L)
pp_pos = TargetPosTask(api, PP)

# 获取合约乘数
ma_volume_multiple = ma_quote.volume_multiple
l_volume_multiple = l_quote.volume_multiple
pp_volume_multiple = pp_quote.volume_multiple

# 初始化状态变量
position_time = 0        # 建仓时间
in_position = False      # 是否有持仓
mean_spread = 0          # 历史价差均值
std_spread = 0           # 历史价差标准差

print(f"策略启动,监控合约: {MA}, {L}, {PP}")

# === 主循环 ===
try:
    # 初始计算历史统计值
    spreads = []
    for i in range(len(ma_klines) - 1):
        ma_price = ma_klines.close.iloc[i] * ma_volume_multiple * MA_RATIO
        l_price = l_klines.close.iloc[i] * l_volume_multiple * L_RATIO
        pp_price = pp_klines.close.iloc[i] * pp_volume_multiple * PP_RATIO
        
        # MTO利润 = (L价值 + PP价值) - MA成本
        spread = (l_price + pp_price) - ma_price
        spreads.append(spread)
    
    mean_spread = np.mean(spreads)
    std_spread = np.std(spreads)
    print(f"历史MTO利润均值: {mean_spread:.2f}, 标准差: {std_spread:.2f}")

    # 主循环
    while True:
        api.wait_update()
        
        # 当K线数据有变化时进行计算
        if api.is_changing(ma_klines) or api.is_changing(l_klines) or api.is_changing(pp_klines):
            # 重新计算历史价差统计
            spreads = []
            for i in range(len(ma_klines) - 1):
                ma_price = ma_klines.close.iloc[i] * ma_volume_multiple * MA_RATIO
                l_price = l_klines.close.iloc[i] * l_volume_multiple * L_RATIO
                pp_price = pp_klines.close.iloc[i] * pp_volume_multiple * PP_RATIO
                
                spread = (l_price + pp_price) - ma_price
                spreads.append(spread)
            
            mean_spread = np.mean(spreads)
            std_spread = np.std(spreads)
            
            # 计算当前利润价差
            ma_price = ma_klines.close.iloc[-1] * ma_volume_multiple * MA_RATIO
            l_price = l_klines.close.iloc[-1] * l_volume_multiple * L_RATIO
            pp_price = pp_klines.close.iloc[-1] * pp_volume_multiple * PP_RATIO
            
            current_spread = (l_price + pp_price) - ma_price
            
            # 计算z-score (标准化的价差)
            z_score = (current_spread - mean_spread) / std_spread
            
            print(f"当前MTO利润: {current_spread:.2f}, Z-score: {z_score:.2f}")
            
            # 获取当前持仓
            ma_position = api.get_position(MA)
            l_position = api.get_position(L)
            pp_position = api.get_position(PP)
            
            current_ma_pos = ma_position.pos_long - ma_position.pos_short
            current_l_pos = l_position.pos_long - l_position.pos_short
            current_pp_pos = pp_position.pos_long - pp_position.pos_short
            
            # 计算实际下单手数(依据比例)
            ma_volume = int(ORDER_VOLUME * MA_RATIO / L_RATIO)
            # L和PP按同等手数下单
            
            # === 交易信号判断 ===
            if not in_position:  # 如果没有持仓
                if z_score > STD_THRESHOLD:  # 利润显著高于均值
                    # 做空利润:卖出L和PP,买入MA
                    print(f"做空利润:卖出L{ORDER_VOLUME}手和PP{ORDER_VOLUME}手,买入MA{ma_volume}手")
                    l_pos.set_target_volume(-ORDER_VOLUME)
                    pp_pos.set_target_volume(-ORDER_VOLUME)
                    ma_pos.set_target_volume(ma_volume)
                    position_time = time.time()
                    in_position = True
                    
                elif z_score < -STD_THRESHOLD:  # 利润显著低于均值
                    # 做多利润:买入L和PP,卖出MA
                    print(f"做多利润:买入L{ORDER_VOLUME}手和PP{ORDER_VOLUME}手,卖出MA{ma_volume}手")
                    l_pos.set_target_volume(ORDER_VOLUME)
                    pp_pos.set_target_volume(ORDER_VOLUME)
                    ma_pos.set_target_volume(-ma_volume)
                    position_time = time.time()
                    in_position = True
            
            else:  # 如果已有持仓
                # 检查是否应当平仓
                if abs(z_score) < CLOSE_THRESHOLD:  # 利润回归正常
                    print("利润回归正常,平仓所有头寸")
                    l_pos.set_target_volume(0)
                    pp_pos.set_target_volume(0)
                    ma_pos.set_target_volume(0)
                    in_position = False
                
                # 止损逻辑
                if (z_score > STD_THRESHOLD * 1.5 and current_l_pos < 0) or \
                   (z_score < -STD_THRESHOLD * 1.5 and current_l_pos > 0):
                    print("止损:利润向不利方向进一步偏离")
                    l_pos.set_target_volume(0)
                    pp_pos.set_target_volume(0)
                    ma_pos.set_target_volume(0)
                    in_position = False

except BacktestFinished as e:
    print("回测结束")
    api.close()

铜铝比价套利策略 (难度:初级)

策略说明 https://www.shinnytech.com/articles/trading-strategy/arbitrage/cu-al-spread

from tqsdk import TqApi, TqAuth, TargetPosTask, TqBacktest, BacktestFinished
import numpy as np
from datetime import date

CU = "SHFE.cu2407"
AL = "SHFE.al2407"
START_DATE = date(2023, 11, 1)
END_DATE = date(2024, 4, 30)
LOOKBACK_DAYS = 30
STD_THRESHOLD = 2.0
ORDER_VOLUME = 30
CLOSE_THRESHOLD = 0.5

api = TqApi(backtest=TqBacktest(start_dt=START_DATE, end_dt=END_DATE), auth=TqAuth("快期账号", "快期密码"))

cu_quote = api.get_quote(CU)
al_quote = api.get_quote(AL)
cu_klines = api.get_kline_serial(CU, 60*60*24, LOOKBACK_DAYS)
al_klines = api.get_kline_serial(AL, 60*60*24, LOOKBACK_DAYS)
cu_pos = TargetPosTask(api, CU)
al_pos = TargetPosTask(api, AL)

try:
    # 计算历史铜铝比
    ratios = []
    for i in range(len(cu_klines) - 1):
        cu_price = cu_klines.close.iloc[i] * cu_quote.volume_multiple
        al_price = al_klines.close.iloc[i] * al_quote.volume_multiple
        ratios.append(cu_price / al_price)
    mean_ratio = np.mean(ratios)
    std_ratio = np.std(ratios)
    print(f"历史铜铝比均值: {mean_ratio:.4f}, 标准差: {std_ratio:.4f}")

    in_position = False
    while True:
        api.wait_update()
        if api.is_changing(cu_klines) or api.is_changing(al_klines):
            # 重新计算
            ratios = []
            for i in range(len(cu_klines) - 1):
                cu_price = cu_klines.close.iloc[i] * cu_quote.volume_multiple
                al_price = al_klines.close.iloc[i] * al_quote.volume_multiple
                ratios.append(cu_price / al_price)
            mean_ratio = np.mean(ratios)
            std_ratio = np.std(ratios)
            cu_price = cu_klines.close.iloc[-1] * cu_quote.volume_multiple
            al_price = al_klines.close.iloc[-1] * al_quote.volume_multiple
            current_ratio = cu_price / al_price
            z_score = (current_ratio - mean_ratio) / std_ratio
            print(f"当前铜铝比: {current_ratio:.4f}, Z-score: {z_score:.2f}")

            cu_position = api.get_position(CU)
            al_position = api.get_position(AL)
            current_cu_pos = cu_position.pos_long - cu_position.pos_short
            current_al_pos = al_position.pos_long - al_position.pos_short

            if not in_position:
                if z_score > STD_THRESHOLD:
                    # 做多铜铝比:买入铜,卖出铝
                    print(f"做多铜铝比:买入铜{ORDER_VOLUME}手,卖出铝{ORDER_VOLUME}手")
                    cu_pos.set_target_volume(ORDER_VOLUME)
                    al_pos.set_target_volume(-ORDER_VOLUME)
                    in_position = True
                elif z_score < -STD_THRESHOLD:
                    # 做空铜铝比:卖出铜,买入铝
                    print(f"做空铜铝比:卖出铜{ORDER_VOLUME}手,买入铝{ORDER_VOLUME}手")
                    cu_pos.set_target_volume(-ORDER_VOLUME)
                    al_pos.set_target_volume(ORDER_VOLUME)
                    in_position = True
            else:
                if abs(z_score) < CLOSE_THRESHOLD:
                    print("比率回归正常,平仓所有头寸")
                    cu_pos.set_target_volume(0)
                    al_pos.set_target_volume(0)
                    in_position = False
                # 止损逻辑
                if (z_score > STD_THRESHOLD * 1.5 and current_cu_pos < 0) or \
                   (z_score < -STD_THRESHOLD * 1.5 and current_cu_pos > 0):
                    print("止损:比率向不利方向进一步偏离")
                    cu_pos.set_target_volume(0)
                    al_pos.set_target_volume(0)
                    in_position = False

except BacktestFinished as e:
    print("回测结束")
    api.close()

压榨价差套利策略 (难度:初级)

策略说明 https://www.shinnytech.com/articles/trading-strategy/arbitrage/crush-spread

#!/usr/bin/env python
# coding=utf-8
__author__ = "Chaos"

from datetime import date

from tqsdk import TqApi, TqAuth, TargetPosTask, TqBacktest, BacktestFinished
import numpy as np
import time

# === 用户参数 ===
# 合约参数
SOYBEAN = "DCE.a2409"    # 大豆期货合约
SOYMEAL = "DCE.m2409"    # 豆粕期货合约
SOYOIL = "DCE.y2409"     # 豆油期货合约
START_DATE = date(2023, 11, 1)   # 回测开始日期
END_DATE = date(2024, 4, 30)     # 回测结束日期

# 套利参数
LOOKBACK_DAYS = 30       # 计算历史价差的回溯天数
STD_THRESHOLD = 2.0      # 标准差阈值,超过此阈值视为套利机会
ORDER_VOLUME = 500        # 大豆的下单手数
CLOSE_THRESHOLD = 0.5    # 平仓阈值(标准差)

# 压榨价差比例 - 1吨大豆压榨可得约0.785吨豆粕和0.18吨豆油
# 为了简化,使用10:8:2的整数比例
BEAN_RATIO = 10
MEAL_RATIO = 8
OIL_RATIO = 2

# === 初始化API ===
api = TqApi(backtest=TqBacktest(start_dt=START_DATE, end_dt=END_DATE),
            auth=TqAuth("快期账号", "快期密码"))

# 获取合约行情和K线
bean_quote = api.get_quote(SOYBEAN)
meal_quote = api.get_quote(SOYMEAL)
oil_quote = api.get_quote(SOYOIL)

bean_klines = api.get_kline_serial(SOYBEAN, 60*60*24, LOOKBACK_DAYS)
meal_klines = api.get_kline_serial(SOYMEAL, 60*60*24, LOOKBACK_DAYS)
oil_klines = api.get_kline_serial(SOYOIL, 60*60*24, LOOKBACK_DAYS)

# 创建目标持仓任务
bean_pos = TargetPosTask(api, SOYBEAN)
meal_pos = TargetPosTask(api, SOYMEAL)
oil_pos = TargetPosTask(api, SOYOIL)

# 获取合约乘数
bean_volume_multiple = bean_quote.volume_multiple
meal_volume_multiple = meal_quote.volume_multiple
oil_volume_multiple = oil_quote.volume_multiple

# 初始化状态变量
position_time = 0        # 建仓时间
in_position = False      # 是否有持仓
mean_spread = 0          # 历史价差均值
std_spread = 0           # 历史价差标准差

print(f"策略启动,监控合约: {SOYBEAN}, {SOYMEAL}, {SOYOIL}")

# === 主循环 ===
try:
    # 初始计算历史统计值
    spreads = []
    for i in range(len(bean_klines) - 1):
        bean_price = bean_klines.close.iloc[i] * bean_volume_multiple * BEAN_RATIO
        meal_price = meal_klines.close.iloc[i] * meal_volume_multiple * MEAL_RATIO
        oil_price = oil_klines.close.iloc[i] * oil_volume_multiple * OIL_RATIO
        
        # 压榨价差 = (豆粕价值 + 豆油价值) - 大豆价值
        spread = (meal_price + oil_price) - bean_price
        spreads.append(spread)
    
    mean_spread = np.mean(spreads)
    std_spread = np.std(spreads)
    print(f"历史压榨价差均值: {mean_spread:.2f}, 标准差: {std_spread:.2f}")

    # 主循环
    while True:
        api.wait_update()
        
        # 当K线数据有变化时进行计算
        if api.is_changing(bean_klines) or api.is_changing(meal_klines) or api.is_changing(oil_klines):
            # 重新计算历史价差统计
            spreads = []
            for i in range(len(bean_klines) - 1):
                bean_price = bean_klines.close.iloc[i] * bean_volume_multiple * BEAN_RATIO
                meal_price = meal_klines.close.iloc[i] * meal_volume_multiple * MEAL_RATIO
                oil_price = oil_klines.close.iloc[i] * oil_volume_multiple * OIL_RATIO
                
                spread = (meal_price + oil_price) - bean_price
                spreads.append(spread)
            
            mean_spread = np.mean(spreads)
            std_spread = np.std(spreads)
            
            # 计算当前压榨价差
            bean_price = bean_klines.close.iloc[-1] * bean_volume_multiple * BEAN_RATIO
            meal_price = meal_klines.close.iloc[-1] * meal_volume_multiple * MEAL_RATIO
            oil_price = oil_klines.close.iloc[-1] * oil_volume_multiple * OIL_RATIO
            
            current_spread = (meal_price + oil_price) - bean_price
            
            # 计算z-score (标准化的价差)
            z_score = (current_spread - mean_spread) / std_spread
            
            print(f"当前压榨价差: {current_spread:.2f}, Z-score: {z_score:.2f}")
            
            # 获取当前持仓
            bean_position = api.get_position(SOYBEAN)
            meal_position = api.get_position(SOYMEAL)
            oil_position = api.get_position(SOYOIL)
            
            current_bean_pos = bean_position.pos_long - bean_position.pos_short
            current_meal_pos = meal_position.pos_long - meal_position.pos_short
            current_oil_pos = oil_position.pos_long - oil_position.pos_short
            
            # 计算实际下单手数(依据比例)
            meal_volume = int(ORDER_VOLUME * MEAL_RATIO / BEAN_RATIO)
            oil_volume = int(ORDER_VOLUME * OIL_RATIO / BEAN_RATIO)
            
            # === 交易信号判断 ===
            if not in_position:  # 如果没有持仓
                if z_score > STD_THRESHOLD:  # 价差显著高于均值,压榨利润偏高
                    # 卖出压榨价差:买入大豆,卖出豆粕和豆油
                    print(f"卖出压榨价差:买入大豆{ORDER_VOLUME}手,卖出豆粕{meal_volume}手和豆油{oil_volume}手")
                    bean_pos.set_target_volume(ORDER_VOLUME)
                    meal_pos.set_target_volume(-meal_volume)
                    oil_pos.set_target_volume(-oil_volume)
                    position_time = time.time()
                    in_position = True
                    
                elif z_score < -STD_THRESHOLD:  # 价差显著低于均值,压榨利润偏低
                    # 买入压榨价差:卖出大豆,买入豆粕和豆油
                    print(f"买入压榨价差:卖出大豆{ORDER_VOLUME}手,买入豆粕{meal_volume}手和豆油{oil_volume}手")
                    bean_pos.set_target_volume(-ORDER_VOLUME)
                    meal_pos.set_target_volume(meal_volume)
                    oil_pos.set_target_volume(oil_volume)
                    position_time = time.time()
                    in_position = True
            
            else:  # 如果已有持仓
                # 检查是否应当平仓
                if abs(z_score) < CLOSE_THRESHOLD:  # 价差恢复正常
                    print("价差恢复正常,平仓所有头寸")
                    bean_pos.set_target_volume(0)
                    meal_pos.set_target_volume(0)
                    oil_pos.set_target_volume(0)
                    in_position = False
                
                # 也可以添加止损逻辑
                if (z_score > STD_THRESHOLD * 1.5 and current_bean_pos > 0) or \
                   (z_score < -STD_THRESHOLD * 1.5 and current_bean_pos < 0):
                    print("止损:价差向不利方向进一步偏离")
                    bean_pos.set_target_volume(0)
                    meal_pos.set_target_volume(0)
                    oil_pos.set_target_volume(0)
                    in_position = False

except BacktestFinished as e:
    print("回测结束")
    api.close()

裂解价差套利策略 (难度:初级)

策略说明 https://www.shinnytech.com/articles/trading-strategy/arbitrage/crack-spread

#!/usr/bin/env python
# coding=utf-8
"""
裂解价差均值回归策略
基于原油及其主要炼化产品之间的价格关系在短期内可能偏离其长期均衡水平,并最终回归的假设
"""

from tqsdk import TqApi, TqAuth, TqKq, TargetPosTask, TqBacktest, BacktestFinished
from datetime import date
import numpy as np
import time

# === 用户参数 ===
# 合约参数
CRUDE_OIL = "SHFE.sc2406"  # 原油期货合约
GASOLINE = "SHFE.fu2406"   # 燃料油期货合约
DIESEL = "INE.nr2406"      # 柴油期货合约

# 回测参数
START_DATE = date(2023, 11, 1)  # 回测开始日期
END_DATE = date(2024, 4, 1)     # 回测结束日期

# 裂解比例 - 3:2:1 裂解价差
OIL_RATIO = 3        # 原油比例
GAS_RATIO = 2        # 汽油比例
DIESEL_RATIO = 1     # 柴油比例

# 套利参数
LOOKBACK_DAYS = 60         # 计算历史价差的回溯天数
DEVIATION_THRESHOLD = 1.5  # 偏离阈值(标准差倍数)
OIL_LOTS = 5               # 原油交易手数
CLOSE_AT_MEAN = True       # 是否在价差回归到均值时平仓
MAX_HOLDING_DAYS = 10      # 最大持仓天数

# === 初始化API ===
api = TqApi(backtest=TqBacktest(start_dt=START_DATE, end_dt=END_DATE),
            auth=TqAuth("你的天勤账号", "你的天勤密码"))

# 获取合约行情和K线
crude_quote = api.get_quote(CRUDE_OIL)
gasoline_quote = api.get_quote(GASOLINE)
diesel_quote = api.get_quote(DIESEL)

crude_klines = api.get_kline_serial(CRUDE_OIL, 60*60*24, LOOKBACK_DAYS)
gasoline_klines = api.get_kline_serial(GASOLINE, 60*60*24, LOOKBACK_DAYS)
diesel_klines = api.get_kline_serial(DIESEL, 60*60*24, LOOKBACK_DAYS)

# 创建目标持仓任务
crude_pos = TargetPosTask(api, CRUDE_OIL)
gasoline_pos = TargetPosTask(api, GASOLINE)
diesel_pos = TargetPosTask(api, DIESEL)

# 获取合约乘数
crude_volume_multiple = crude_quote.volume_multiple
gasoline_volume_multiple = gasoline_quote.volume_multiple
diesel_volume_multiple = diesel_quote.volume_multiple

# 计算汽油和柴油的交易手数(基于原油手数和裂解比例)
GAS_LOTS = round(GAS_RATIO / OIL_RATIO * OIL_LOTS)
DIESEL_LOTS = round(DIESEL_RATIO / OIL_RATIO * OIL_LOTS)

# 初始化状态变量
position_time = 0  # 建仓时间
in_position = False  # 是否有持仓
last_trade_direction = ""  # 上次交易方向 "BUY_SPREAD" 或 "SELL_SPREAD"
mean_spread = 0  # 历史价差均值
std_spread = 0  # 历史价差标准差

print(f"裂解价差套利策略启动")
print(f"监控合约: 原油({CRUDE_OIL}) - {OIL_LOTS}手, 汽油({GASOLINE}) - {GAS_LOTS}手, 柴油({DIESEL}) - {DIESEL_LOTS}手")
print(f"裂解比例: {OIL_RATIO}:{GAS_RATIO}:{DIESEL_RATIO}")

# === 主循环 ===
try:
    # 初始计算历史统计值
    spreads = []
    for i in range(len(crude_klines) - 1):
        crude_price = crude_klines.close.iloc[i] * crude_volume_multiple * OIL_RATIO
        gasoline_price = gasoline_klines.close.iloc[i] * gasoline_volume_multiple * GAS_RATIO
        diesel_price = diesel_klines.close.iloc[i] * diesel_volume_multiple * DIESEL_RATIO
        
        # 裂解价差 = (汽油价值 + 柴油价值) - 原油价值
        spread = (gasoline_price + diesel_price) - crude_price
        spreads.append(spread)
    
    mean_spread = np.mean(spreads)
    std_spread = np.std(spreads)
    print(f"历史裂解价差 - 均值: {mean_spread:.2f}, 标准差: {std_spread:.2f}")

    # 主循环
    while True:
        api.wait_update()
        
        # 当K线数据有变化时进行计算
        if api.is_changing(crude_klines) or api.is_changing(gasoline_klines) or api.is_changing(diesel_klines):
            # 重新计算历史价差统计
            spreads = []
            for i in range(len(crude_klines) - 1):
                crude_price = crude_klines.close.iloc[i] * crude_volume_multiple * OIL_RATIO
                gasoline_price = gasoline_klines.close.iloc[i] * gasoline_volume_multiple * GAS_RATIO
                diesel_price = diesel_klines.close.iloc[i] * diesel_volume_multiple * DIESEL_RATIO
                
                spread = (gasoline_price + diesel_price) - crude_price
                spreads.append(spread)
            
            mean_spread = np.mean(spreads)
            std_spread = np.std(spreads)
            
            # 计算当前裂解价差
            crude_price = crude_klines.close.iloc[-1] * crude_volume_multiple * OIL_RATIO
            gasoline_price = gasoline_klines.close.iloc[-1] * gasoline_volume_multiple * GAS_RATIO
            diesel_price = diesel_klines.close.iloc[-1] * diesel_volume_multiple * DIESEL_RATIO
            
            current_spread = (gasoline_price + diesel_price) - crude_price
            
            # 计算z-score (标准化的价差)
            z_score = (current_spread - mean_spread) / std_spread
            
            print(f"当前裂解价差: {current_spread:.2f}, Z-score: {z_score:.2f}, 均值: {mean_spread:.2f}")
            
            # 获取当前持仓
            crude_position = api.get_position(CRUDE_OIL)
            gasoline_position = api.get_position(GASOLINE)
            diesel_position = api.get_position(DIESEL)
            
            current_crude_pos = crude_position.pos_long - crude_position.pos_short
            current_gasoline_pos = gasoline_position.pos_long - gasoline_position.pos_short
            current_diesel_pos = diesel_position.pos_long - diesel_position.pos_short
            
            # === 交易信号判断 ===
            if not in_position:  # 如果没有持仓
                if z_score > DEVIATION_THRESHOLD:  # 价差显著高于均值
                    # 卖出裂解价差:卖出原油,买入汽油和柴油
                    print(f"信号: 卖出裂解价差 (Z-score: {z_score:.2f})")
                    print(f"操作: 卖出原油{OIL_LOTS}手,买入汽油{GAS_LOTS}手,买入柴油{DIESEL_LOTS}手")
                    crude_pos.set_target_volume(-OIL_LOTS)
                    gasoline_pos.set_target_volume(GAS_LOTS)
                    diesel_pos.set_target_volume(DIESEL_LOTS)
                    position_time = time.time()
                    in_position = True
                    last_trade_direction = "SELL_SPREAD"
                    
                elif z_score < -DEVIATION_THRESHOLD:  # 价差显著低于均值
                    # 买入裂解价差:买入原油,卖出汽油和柴油
                    print(f"信号: 买入裂解价差 (Z-score: {z_score:.2f})")
                    print(f"操作: 买入原油{OIL_LOTS}手,卖出汽油{GAS_LOTS}手,卖出柴油{DIESEL_LOTS}手")
                    crude_pos.set_target_volume(OIL_LOTS)
                    gasoline_pos.set_target_volume(-GAS_LOTS)
                    diesel_pos.set_target_volume(-DIESEL_LOTS)
                    position_time = time.time()
                    in_position = True
                    last_trade_direction = "BUY_SPREAD"
            
            elif in_position:  # 如果已有持仓
                # 检查是否应当平仓
                if CLOSE_AT_MEAN:  # 在价差回归均值时平仓
                    if (last_trade_direction == "BUY_SPREAD" and current_spread >= mean_spread) or \
                       (last_trade_direction == "SELL_SPREAD" and current_spread <= mean_spread):
                        print(f"信号: 价差回归均值,平仓所有头寸")
                        print(f"当前价差: {current_spread:.2f}, 均值: {mean_spread:.2f}")
                        crude_pos.set_target_volume(0)
                        gasoline_pos.set_target_volume(0)
                        diesel_pos.set_target_volume(0)
                        in_position = False
                else:  # 在价差回归(穿过阈值)时平仓
                    if (last_trade_direction == "BUY_SPREAD" and z_score >= 0) or \
                       (last_trade_direction == "SELL_SPREAD" and z_score <= 0):
                        print(f"信号: 价差穿过均值,平仓所有头寸")
                        print(f"当前价差: {current_spread:.2f}, Z-score: {z_score:.2f}")
                        crude_pos.set_target_volume(0)
                        gasoline_pos.set_target_volume(0)
                        diesel_pos.set_target_volume(0)
                        in_position = False
                
                # 持仓时间监控
                position_duration = (time.time() - position_time) / (60*60*24)  # 天数
                if position_duration > MAX_HOLDING_DAYS:  # 持仓超过最大天数
                    print(f"警告: 持仓时间已超过{MAX_HOLDING_DAYS}天 ({position_duration:.1f}天)")
                    print(f"强制平仓所有头寸")
                    crude_pos.set_target_volume(0)
                    gasoline_pos.set_target_volume(0)
                    diesel_pos.set_target_volume(0)
                    in_position = False

except BacktestFinished as e:
    print("回测结束")
    api.close()
except KeyboardInterrupt:
    print("用户中断程序执行")
    api.close()
    print("策略已停止运行") 

股指期货跨期套利策略 (难度:初级)

策略说明 https://www.shinnytech.com/articles/trading-strategy/arbitrage/stock-future-cross-period-spread

#!/usr/bin/env python
# -*- coding: utf-8 -*-
__author__ = "Chaos"

from datetime import date

import numpy as np
from tqsdk import TqApi, TqAuth, TqBacktest, TargetPosTask, BacktestFinished

# 参数设置
NEAR_CONTRACT = "CFFEX.IH2101"  # 近月合约
FAR_CONTRACT = "CFFEX.IH2102"  # 远月合约
K = 2  # 标准差倍数
WINDOW = 80  # 计算窗口
LOTS = 20  # 交易手数
START_DATE = date(2020, 12, 21)  # 回测开始日期
END_DATE = date(2021, 1, 15)  # 回测结束日期

# 创建API实例
api = TqApi(backtest=TqBacktest(start_dt=START_DATE, end_dt=END_DATE),
            auth=TqAuth("快期账号", "快期密码"))

# 订阅行情
near_quote = api.get_quote(NEAR_CONTRACT)
far_quote = api.get_quote(FAR_CONTRACT)
near_klines = api.get_kline_serial(NEAR_CONTRACT, 15 * 60, WINDOW * 2)  # 15分钟K线
far_klines = api.get_kline_serial(FAR_CONTRACT, 15 * 60, WINDOW * 2)

# 创建目标持仓任务
near_pos = TargetPosTask(api, NEAR_CONTRACT)
far_pos = TargetPosTask(api, FAR_CONTRACT)

# 持仓状态: 0-无持仓, 1-多价差(买近卖远), -1-空价差(卖近买远)
position_state = 0

print(f"策略启动: {NEAR_CONTRACT}-{FAR_CONTRACT} 跨期套利")
print(f"参数设置: K={K}倍标准差, 窗口={WINDOW}, 交易手数={LOTS}手")

try:
    while True:
        api.wait_update()

        # 检查K线是否更新
        if api.is_changing(near_klines) or api.is_changing(far_klines):
            # 确保有足够的数据
            if len(near_klines) < WINDOW or len(far_klines) < WINDOW:
                continue

            # 计算价差指标
            near_close = near_klines.close.iloc[-WINDOW:]
            far_close = far_klines.close.iloc[-WINDOW:]
            spread = near_close - far_close

            # 计算均值和标准差
            mean = np.mean(spread)
            std = np.std(spread)
            current_spread = near_quote.last_price - far_quote.last_price

            # 计算上下边界
            upper_bound = mean + K * std
            lower_bound = mean - K * std

            print(f"价差: {current_spread:.2f}, 均值: {mean:.2f}, "
                  f"上界: {upper_bound:.2f}, 下界: {lower_bound:.2f}")

            # 交易逻辑
            if position_state == 0:  # 无持仓状态
                if current_spread > upper_bound:  # 做空价差(卖近买远)
                    near_pos.set_target_volume(-LOTS)
                    far_pos.set_target_volume(LOTS)
                    position_state = -1
                    print(f"开仓: 卖出{LOTS}{NEAR_CONTRACT}, 买入{LOTS}{FAR_CONTRACT}")

                elif current_spread < lower_bound:  # 做多价差(买近卖远)
                    near_pos.set_target_volume(LOTS)
                    far_pos.set_target_volume(-LOTS)
                    position_state = 1
                    print(f"开仓: 买入{LOTS}{NEAR_CONTRACT}, 卖出{LOTS}{FAR_CONTRACT}")

            elif position_state == 1:  # 持有多价差
                if current_spread >= mean:  # 平仓获利
                    near_pos.set_target_volume(0)
                    far_pos.set_target_volume(0)
                    position_state = 0
                    print("平仓: 价差回到均值,平仓获利")

            elif position_state == -1:  # 持有空价差
                if current_spread <= mean:  # 平仓获利
                    near_pos.set_target_volume(0)
                    far_pos.set_target_volume(0)
                    position_state = 0
                    print("平仓: 价差回到均值,平仓获利")

except BacktestFinished as e:
    api.close()
    print("回测结束")

钢厂利润套利策略 (难度:初级)

策略说明 https://www.shinnytech.com/articles/trading-strategy/arbitrage/rb-i-jt-spread

from tqsdk import TqApi, TargetPosTask
from tqsdk.tafunc import ma
import numpy as np

api = TqApi()

SYMBOL_rb = "SHFE.rb2001"
SYMBOL_i = "DCE.i2001"
SYMBOL_j = "DCE.j2001"

klines_rb = api.get_kline_serial(SYMBOL_rb, 86400)
klines_i = api.get_kline_serial(SYMBOL_i, 86400)
klines_j = api.get_kline_serial(SYMBOL_j, 86400)

target_pos_rb = TargetPosTask(api, SYMBOL_rb)
target_pos_i = TargetPosTask(api, SYMBOL_i)
target_pos_j = TargetPosTask(api, SYMBOL_j)


# 计算钢厂利润线,并将利润线画到副图
def cal_spread(klines_rb, klines_i, klines_j):
    index_spread = klines_rb.close - 1.6 * klines_i.close - 0.5 * klines_j.close
    # 使用15日均值,与注释保持一致
    ma_spread = ma(index_spread, 15)
    # 计算标准差
    spread_std = np.std(index_spread)
    klines_rb["index_spread"] = index_spread
    klines_rb["index_spread.board"] = "index_spread"

    return index_spread, ma_spread, spread_std


# 初始计算利润线
index_spread, ma_spread, spread_std = cal_spread(klines_rb, klines_i, klines_j)

print("ma_spread是%.2f,index_spread是%.2f,spread_std是%.2f" % (ma_spread.iloc[-1], index_spread.iloc[-1], spread_std))

# 记录当前持仓状态,避免重复发出信号
current_position = 0  # 0表示空仓,1表示多螺纹空焦炭焦煤,-1表示空螺纹多焦炭焦煤

while True:
    api.wait_update()

    # 每次有新日线生成时重新计算利润线
    if api.is_changing(klines_j.iloc[-1], "datetime"):
        index_spread, ma_spread, spread_std = cal_spread(klines_rb, klines_i, klines_j)

        # 计算上下轨
        upper_band = ma_spread.iloc[-1] + 0.5 * spread_std
        lower_band = ma_spread.iloc[-1] - 0.5 * spread_std

        print("ma_spread是%.2f,index_spread是%.2f,spread_std是%.2f" % (
            ma_spread.iloc[-1], index_spread.iloc[-1], spread_std))
        print("上轨是%.2f,下轨是%.2f" % (upper_band, lower_band))

        # 确保有足够的历史数据
        if len(index_spread) >= 2:
            # 1. 检测下穿上轨:前一个K线在上轨之上,当前K线在上轨之下或等于上轨
            if index_spread.iloc[-2] > upper_band and index_spread.iloc[-1] <= upper_band:
                if current_position != -1:  # 避免重复开仓
                    # 价差序列下穿上轨,利润冲高回落进行回复,策略空螺纹钢、多焦煤焦炭
                    target_pos_rb.set_target_volume(-100)
                    target_pos_i.set_target_volume(100)
                    target_pos_j.set_target_volume(100)
                    current_position = -1
                    print("下穿上轨:空螺纹钢、多焦煤焦炭")

            # 2. 检测上穿下轨:前一个K线在下轨之下,当前K线在下轨之上或等于下轨
            elif index_spread.iloc[-2] < lower_band and index_spread.iloc[-1] >= lower_band:
                if current_position != 1:  # 避免重复开仓
                    # 价差序列上穿下轨,利润过低回复上升,策略多螺纹钢、空焦煤焦炭
                    target_pos_rb.set_target_volume(100)
                    target_pos_i.set_target_volume(-100)
                    target_pos_j.set_target_volume(-100)
                    current_position = 1
                    print("上穿下轨:多螺纹钢、空焦煤焦炭")

    # 实时监控价差变化情况
    if api.is_changing(klines_rb.iloc[-1], "close") or api.is_changing(klines_i.iloc[-1], "close") or api.is_changing(
            klines_j.iloc[-1], "close"):
        # 实时更新价差
        current_spread = klines_rb.close.iloc[-1] - 1.6 * klines_i.close.iloc[-1] - 0.5 * klines_j.close.iloc[-1]
        # 可以添加实时监控输出
        # print("当前价差: %.2f" % current_spread)

菜油、豆油、棕榈油多品种套利策略 (难度:初级)

策略说明 https://www.shinnytech.com/articles/trading-strategy/arbitrage/oi-y-p-spreads

#!/usr/bin/env python
#  -*- coding: utf-8 -*-
__author__ = "Ringo"

"""
豆油、棕榈油、菜油套利策略
注: 该示例策略仅用于功能示范, 实盘时请根据自己的策略/经验进行修改
"""
from tqsdk import TqApi, TargetPosTask
from tqsdk.tafunc import ma

# 设定豆油,菜油,棕榈油指定合约
SYMBOL_Y = "DCE.y2001"
SYMBOL_OI = "CZCE.OI001"
SYMBOL_P = "DCE.p2001"

api = TqApi()

klines_y = api.get_kline_serial(SYMBOL_Y, 24 * 60 * 60)
klines_oi = api.get_kline_serial(SYMBOL_OI, 24 * 60 * 60)
klines_p = api.get_kline_serial(SYMBOL_P, 24 * 60 * 60)

target_pos_oi = TargetPosTask(api, SYMBOL_OI)
target_pos_y = TargetPosTask(api, SYMBOL_Y)
target_pos_p = TargetPosTask(api, SYMBOL_P)


# 设置指标计算函数,计算三种合约品种的相对位置,并将指标画在副图
def cal_spread(klines_y, klines_p, klines_oi):
    index_spread = ((klines_y.close - klines_p.close) - (klines_oi.close - klines_y.close)) / (
            klines_oi.close - klines_p.close)
    klines_y["index_spread"] = index_spread
    ma_short = ma(index_spread, 5)
    ma_long = ma(index_spread, 15)
    return index_spread, ma_short, ma_long


index_spread, ma_short, ma_long = cal_spread(klines_y, klines_p, klines_oi)

klines_y["index_spread.board"] = "index_spread"

print("ma_short是%.2f,ma_long是%.2f,index_spread是%.2f" % (ma_short.iloc[-2], ma_long.iloc[-2], index_spread.iloc[-2]))

while True:
    api.wait_update()
    if api.is_changing(klines_y.iloc[-1], "datetime"):
        index_spread, ma_short, ma_long = cal_spread(klines_y, klines_p, klines_oi)
        print("日线更新,ma_short是%.2f,ma_long是%.2f,index_spread是%.2f" % (
            ma_short.iloc[-2], ma_long.iloc[-2], index_spread.iloc[-2]))

    # 指数上涨,短期上穿长期,则认为相对于y,oi被低估,做多oi,做空y
    if (ma_short.iloc[-2] > ma_long.iloc[-2]) and (ma_short.iloc[-3] < ma_long.iloc[-3]) and (
            index_spread.iloc[-2] > 1.02 * ma_short.iloc[-2]):
        target_pos_y.set_target_volume(-100)
        target_pos_oi.set_target_volume(100)
    # 指数下跌,短期下穿长期,则认为相对于y,p被高估,做多y,做空p
    elif (ma_short.iloc[-2] < ma_long.iloc[-2]) and (ma_short.iloc[-3] > ma_long.iloc[-3]) and (
            index_spread.iloc[-2] < 0.98 * ma_short.iloc[-2]):
        target_pos_y.set_target_volume(100)
        target_pos_p.set_target_volume(-100)
    # 现在策略表现平稳,则平仓,赚取之前策略收益
    elif ma_short.iloc[-2] * 0.98 < index_spread.iloc[-2] < ma_long.iloc[-2] * 1.02:
        target_pos_oi.set_target_volume(0)
        target_pos_p.set_target_volume(0)
        target_pos_y.set_target_volume(0)

均值回归策略

卡尔曼滤波配对交易策略 (难度:初级)

策略说明 https://www.shinnytech.com/articles/trading-strategy/mean-reversion/kalman-filter-strategy

import numpy as np
import pandas as pd
from tqsdk import TqApi, TqAuth, TargetPosTask, TqBacktest, BacktestFinished
from datetime import date

# === 全局参数 ===
SYMBOL_Y = "SHFE.ss2209"
SYMBOL_X = "SHFE.ni2209"
OBS_VAR = 0.01
STATE_VAR = 0.0001
INIT_MEAN = 1.0
INIT_VAR = 1.0
WIN = 60
OPEN_H = 2.0
OPEN_L = -2.0
CLOSE_H = 0.5
CLOSE_L = -0.5
STOP_SPREAD = 3.0
MAX_HOLD = 10
POS_PCT = 0.05
INIT_CAP = 10000000

# === 全局变量 ===
state_mean = INIT_MEAN
state_var = INIT_VAR
prices_y, prices_x, hedge_ratios, spreads, zscores = [], [], [], [], []
position = 0
entry_z = 0
entry_time = None
trade_count = 0
win_count = 0
total_profit = 0
total_loss = 0
hold_days = 0
last_day = None

# === API初始化 ===
api = TqApi(backtest=TqBacktest(start_dt=date(2022, 7, 4), end_dt=date(2022, 8, 31)),
                auth=TqAuth("快期账号", "快期密码"))
quote_y = api.get_quote(SYMBOL_Y)
quote_x = api.get_quote(SYMBOL_X)
klines_y = api.get_kline_serial(SYMBOL_Y, 60*60)
klines_x = api.get_kline_serial(SYMBOL_X, 60*60)
target_y = TargetPosTask(api, SYMBOL_Y)
target_x = TargetPosTask(api, SYMBOL_X)

try:
    while True:
        api.wait_update()
        if api.is_changing(klines_y.iloc[-1], "datetime") or api.is_changing(klines_x.iloc[-1], "datetime"):
            price_y = klines_y.iloc[-1]["close"]
            price_x = klines_x.iloc[-1]["close"]
            now = pd.to_datetime(klines_y.iloc[-1]["datetime"], unit="ns")
            today = now.date()
            if last_day and today != last_day and position != 0:
                hold_days += 1
            last_day = today
            prices_y.append(price_y)
            prices_x.append(price_x)
            if len(prices_y) > 10:
                # 卡尔曼滤波
                pred_mean = state_mean
                pred_var = state_var + STATE_VAR
                k_gain = pred_var / (pred_var * price_x**2 + OBS_VAR)
                state_mean = pred_mean + k_gain * (price_y - pred_mean * price_x)
                state_var = (1 - k_gain * price_x) * pred_var
                hedge_ratios.append(state_mean)
                spread = price_y - state_mean * price_x
                spreads.append(spread)
                if len(spreads) >= WIN:
                    recent = spreads[-WIN:]
                    mean = np.mean(recent)
                    std = np.std(recent)
                    z = (spread - mean) / std if std > 0 else 0
                    zscores.append(z)
                    print(f"时间:{now}, Y:{price_y}, X:{price_x}, 对冲比:{state_mean:.4f}, Z:{z:.4f}")
                    # 开仓
                    if position == 0:
                        if z < OPEN_L:
                            lots = int(INIT_CAP * POS_PCT / quote_y.margin)
                            lots_x = int(lots * state_mean * price_y * quote_y.volume_multiple / (price_x * quote_x.volume_multiple))
                            if lots > 0 and lots_x > 0:
                                target_y.set_target_volume(lots)
                                target_x.set_target_volume(-lots_x)
                                position = 1
                                entry_z = z
                                entry_time = now
                                print(f"开多Y空X, Y:{lots}, X:{lots_x}, 入场Z:{z:.4f}")
                        elif z > OPEN_H:
                            lots = int(INIT_CAP * POS_PCT / quote_y.margin)
                            lots_x = int(lots * state_mean * price_y * quote_y.volume_multiple / (price_x * quote_x.volume_multiple))
                            if lots > 0 and lots_x > 0:
                                target_y.set_target_volume(-lots)
                                target_x.set_target_volume(lots_x)
                                position = -1
                                entry_z = z
                                entry_time = now
                                print(f"开空Y多X, Y:{lots}, X:{lots_x}, 入场Z:{z:.4f}")
                    # 平仓
                    else:
                        profit_cond = CLOSE_L < z < CLOSE_H
                        stop_cond = (position == 1 and z < entry_z - STOP_SPREAD) or (position == -1 and z > entry_z + STOP_SPREAD)
                        max_hold = hold_days >= MAX_HOLD
                        if profit_cond or stop_cond or max_hold:
                            target_y.set_target_volume(0)
                            target_x.set_target_volume(0)
                            trade_count += 1
                            pnl = (z - entry_z) * position
                            if pnl > 0:
                                win_count += 1
                                total_profit += pnl
                            else:
                                total_loss -= pnl
                            reason = "回归" if profit_cond else "止损" if stop_cond else "超期"
                            print(f"平仓:{reason}, 出场Z:{z:.4f}, 收益:{pnl:.4f}, 持有天:{hold_days}")
                            position = 0
                            entry_z = 0
                            entry_time = None
                            hold_days = 0

except BacktestFinished as e:
    print("回测结束")
    api.close()

基于距离的配对交易策略 (难度:初级)

策略说明 https://www.shinnytech.com/articles/trading-strategy/mean-reversion/distance-based-strategy

#!/usr/bin/env python
# -*- coding: utf-8 -*-
__author__ = 'Chaos'

from tqsdk import TqApi, TqAuth, TargetPosTask, TqBacktest, BacktestFinished
from datetime import date, datetime
import numpy as np

# === 全局参数 ===
SYMBOL1 = "SHFE.rb2305"
SYMBOL2 = "SHFE.hc2305"
WINDOW = 30
K_THRESHOLD = 2.0
CLOSE_THRESHOLD = 0.5
MAX_HOLD_DAYS = 10
STOP_LOSS_PCT = 0.05
POSITION_LOTS1 = 200      # 合约1固定手数
POSITION_RATIO = 1.0     # 合约2与合约1的数量比例

# === 全局变量 ===
price_data1, price_data2 = [], []
position_long = False
position_short = False
entry_price1 = 0
entry_price2 = 0
position_time = None
trade_ratio = 1
entry_spread = 0
trade_count = 0
win_count = 0
total_profit = 0

# === API初始化 ===
api = TqApi(backtest=TqBacktest(start_dt=date(2023, 2, 1),end_dt=date(2023, 4, 27)),
            auth=TqAuth("快期账号", "快期密码")
)
quote1 = api.get_quote(SYMBOL1)
quote2 = api.get_quote(SYMBOL2)
klines1 = api.get_kline_serial(SYMBOL1, 24*60*60)
klines2 = api.get_kline_serial(SYMBOL2, 24*60*60)
target_pos1 = TargetPosTask(api, SYMBOL1)
target_pos2 = TargetPosTask(api, SYMBOL2)

print(f"策略开始运行,交易品种: {SYMBOL1}{SYMBOL2}")

try:
    while True:
        api.wait_update()
        if api.is_changing(klines1.iloc[-1], "datetime") or api.is_changing(klines2.iloc[-1], "datetime"):
            price_data1.append(klines1.iloc[-1]["close"])
            price_data2.append(klines2.iloc[-1]["close"])
            if len(price_data1) <= WINDOW:
                continue
            if len(price_data1) > WINDOW:
                price_data1 = price_data1[-WINDOW:]
                price_data2 = price_data2[-WINDOW:]
            data1 = np.array(price_data1)
            data2 = np.array(price_data2)
            norm1 = (data1 - np.mean(data1)) / np.std(data1)
            norm2 = (data2 - np.mean(data2)) / np.std(data2)
            spread = norm1 - norm2
            mean_spread = np.mean(spread)
            std_spread = np.std(spread)
            current_spread = spread[-1]
            price_ratio = quote2.last_price / quote1.last_price
            trade_ratio = round(price_ratio * POSITION_RATIO, 2)
            position_lots2 = int(POSITION_LOTS1 * trade_ratio)
            current_time = datetime.fromtimestamp(klines1.iloc[-1]["datetime"] / 1e9)

            # === 平仓逻辑 ===
            if position_long or position_short:
                days_held = (current_time - position_time).days
                if position_long:
                    current_profit = (quote1.last_price - entry_price1) * POSITION_LOTS1 - (quote2.last_price - entry_price2) * position_lots2
                else:
                    current_profit = (entry_price1 - quote1.last_price) * POSITION_LOTS1 - (entry_price2 - quote2.last_price) * position_lots2
                profit_pct = current_profit / (entry_price1 * POSITION_LOTS1)
                close_by_mean = abs(current_spread - mean_spread) < CLOSE_THRESHOLD * std_spread
                close_by_time = days_held >= MAX_HOLD_DAYS
                close_by_stop = profit_pct <= -STOP_LOSS_PCT
                if close_by_mean or close_by_time or close_by_stop:
                    target_pos1.set_target_volume(0)
                    target_pos2.set_target_volume(0)
                    trade_count += 1
                    if profit_pct > 0:
                        win_count += 1
                    total_profit += current_profit
                    reason = "均值回归" if close_by_mean else "时间限制" if close_by_time else "止损"
                    print(f"平仓 - {reason}, 盈亏: {profit_pct:.2%}, 持仓天数: {days_held}")
                    position_long = False
                    position_short = False

            # === 开仓逻辑 ===
            else:
                if current_spread < mean_spread - K_THRESHOLD * std_spread:
                    target_pos1.set_target_volume(POSITION_LOTS1)
                    target_pos2.set_target_volume(-position_lots2)
                    position_long = True
                    position_time = current_time
                    entry_price1 = quote1.last_price
                    entry_price2 = quote2.last_price
                    entry_spread = current_spread
                    print(f"开仓 - 多价差, 合约1: {POSITION_LOTS1}手, 合约2: {-position_lots2}手, 比例: {trade_ratio}")
                elif current_spread > mean_spread + K_THRESHOLD * std_spread:
                    target_pos1.set_target_volume(-POSITION_LOTS1)
                    target_pos2.set_target_volume(position_lots2)
                    position_short = True
                    position_time = current_time
                    entry_price1 = quote1.last_price
                    entry_price2 = quote2.last_price
                    entry_spread = current_spread
                    print(f"开仓 - 空价差, 合约1: {-POSITION_LOTS1}手, 合约2: {position_lots2}手, 比例: {trade_ratio}")

        # 每日统计
        if api.is_changing(klines1.iloc[-1], "datetime"):
            account = api.get_account()
            print(f"日期: {current_time.date()}, 账户权益: {account.balance:.2f}, 可用资金: {account.available:.2f}")
            if trade_count > 0:
                print(f"交易统计 - 总交易: {trade_count}, 胜率: {win_count/trade_count:.2%}, 总盈亏: {total_profit:.2f}")

except BacktestFinished as e:
    print("回测结束")
    api.close()

枢轴点反转策略 (难度:初级)

策略说明 https://www.shinnytech.com/articles/trading-strategy/mean-reversion/pivot-point-strategy

#!/usr/bin/env python
# -*- coding: utf-8 -*-
__author__ = "Chaos"

from datetime import date
from tqsdk import TqApi, TqAuth, TqBacktest, TargetPosTask, BacktestFinished
from tqsdk.tafunc import time_to_str

# ===== 全局参数设置 =====
SYMBOL = "SHFE.cu2309"
POSITION_SIZE = 100
START_DATE = date(2023, 2, 10)  # 回测开始日期
END_DATE = date(2023, 3, 15)  # 回测结束日期

# 策略参数
REVERSAL_CONFIRM = 50  # 反转确认点数
STOP_LOSS_POINTS = 100  # 止损点数

# ===== 全局变量 =====
current_direction = 0  # 当前持仓方向:1=多头,-1=空头,0=空仓
entry_price = 0  # 开仓价格
stop_loss_price = 0  # 止损价格
prev_high = 0  # 前一日最高价
prev_low = 0  # 前一日最低价
prev_close = 0  # 前一日收盘价

# ===== 策略开始 =====
print("开始运行枢轴点反转策略...")

# 创建API实例
api = TqApi(backtest=TqBacktest(start_dt=START_DATE, end_dt=END_DATE),
            auth=TqAuth("快期账号", "快期密码"))

# 订阅合约的K线数据
klines = api.get_kline_serial(SYMBOL, 60 * 60 * 24)  # 日线数据

# 创建目标持仓任务
target_pos = TargetPosTask(api, SYMBOL)

def calculate_pivot_points(high, low, close):
    """计算枢轴点及支撑阻力位"""
    pivot = (high + low + close) / 3
    r1 = (2 * pivot) - low
    s1 = (2 * pivot) - high
    r2 = pivot + (high - low)
    s2 = pivot - (high - low)
    r3 = r1 + (high - low)
    s3 = s1 - (high - low)
    return pivot, r1, r2, r3, s1, s2, s3

try:
    while True:
        # 等待更新
        api.wait_update()

        # 如果K线有更新
        if api.is_changing(klines.iloc[-1], "datetime"):
            # 确保有足够的数据
            if len(klines) < 2:
                continue

            # 获取当前价格和前一日数据
            current_price = klines.close.iloc[-1].item()
            current_high = klines.high.iloc[-1].item()
            current_low = klines.low.iloc[-1].item()
            prev_close = klines.close.iloc[-2].item()

            # 如果是新的一天,更新前一日数据
            if klines.datetime.iloc[-1] != klines.datetime.iloc[-2]:
                prev_high = klines.high.iloc[-2].item()
                prev_low = klines.low.iloc[-2].item()
                prev_close = klines.close.iloc[-2].item()
                print(f"\n新的一天开始:")
                print(f"前一日数据 - 最高价: {prev_high:.2f}, 最低价: {prev_low:.2f}, 收盘价: {prev_close:.2f}")

            # 计算枢轴点及支撑阻力位
            pivot, r1, r2, r3, s1, s2, s3 = calculate_pivot_points(prev_high, prev_low, prev_close)

            # 获取最新数据
            current_timestamp = klines.datetime.iloc[-1]
            current_datetime = time_to_str(current_timestamp)

            # 打印当前状态
            print(f"\n日期: {current_datetime}")
            print(f"当前价格: {current_price:.2f}")
            print(f"枢轴点: {pivot:.2f}")
            print(f"支撑位: S1={s1:.2f}, S2={s2:.2f}, S3={s3:.2f}")
            print(f"阻力位: R1={r1:.2f}, R2={r2:.2f}, R3={r3:.2f}")

            # 打印信号条件
            print("\n多头信号条件:")
            print(f"1. 价格在S1附近: {current_price <= s1 + REVERSAL_CONFIRM and current_price > s1 - REVERSAL_CONFIRM}")
            print(f"2. 价格高于当日最低价: {current_price > klines.low.iloc[-1].item()}")
            print(f"3. 价格高于前一日收盘价: {current_price > prev_close}")

            print("\n空头信号条件:")
            print(f"1. 价格在R1附近: {current_price >= r1 - REVERSAL_CONFIRM and current_price < r1 + REVERSAL_CONFIRM}")
            print(f"2. 价格低于当日最高价: {current_price < klines.high.iloc[-1].item()}")
            print(f"3. 价格低于前一日收盘价: {current_price < prev_close}")

            # ===== 交易逻辑 =====

            # 空仓状态 - 寻找开仓机会
            if current_direction == 0:
                # 多头开仓条件:价格低于S1
                if current_price < s1:
                    current_direction = 1
                    target_pos.set_target_volume(POSITION_SIZE)
                    entry_price = current_price
                    stop_loss_price = entry_price - STOP_LOSS_POINTS
                    print(f"\n多头开仓信号! 开仓价: {entry_price:.2f}, 止损价: {stop_loss_price:.2f}")

                # 空头开仓条件:价格高于R1
                elif current_price > r1:
                    current_direction = -1
                    target_pos.set_target_volume(-POSITION_SIZE)
                    entry_price = current_price
                    stop_loss_price = entry_price + STOP_LOSS_POINTS
                    print(f"\n空头开仓信号! 开仓价: {entry_price:.2f}, 止损价: {stop_loss_price:.2f}")

            # 多头持仓 - 检查平仓条件
            elif current_direction == 1:
                # 止盈条件:价格回到枢轴点或更高
                if current_price >= pivot:
                    profit = (current_price - entry_price) * POSITION_SIZE
                    target_pos.set_target_volume(0)
                    current_direction = 0
                    print(f"多头止盈平仓: 价格={current_price:.2f}, 盈利={profit:.2f}")
                # 止损条件
                elif current_price <= stop_loss_price:
                    loss = (entry_price - current_price) * POSITION_SIZE
                    target_pos.set_target_volume(0)
                    current_direction = 0
                    print(f"多头止损平仓: 价格={current_price:.2f}, 亏损={loss:.2f}")

            # 空头持仓 - 检查平仓条件
            elif current_direction == -1:
                # 止盈条件:价格回到枢轴点或更低
                if current_price <= pivot:
                    profit = (entry_price - current_price) * POSITION_SIZE
                    target_pos.set_target_volume(0)
                    current_direction = 0
                    print(f"空头止盈平仓: 价格={current_price:.2f}, 盈利={profit:.2f}")
                # 止损条件
                elif current_price >= stop_loss_price:
                    loss = (current_price - entry_price) * POSITION_SIZE
                    target_pos.set_target_volume(0)
                    current_direction = 0
                    print(f"空头止损平仓: 价格={current_price:.2f}, 亏损={loss:.2f}")

except BacktestFinished as e:
    print("回测结束")
    api.close()

CCI均值回归策略 (难度:初级)

策略说明 https://www.shinnytech.com/articles/trading-strategy/mean-reversion/cci-strategy

#!/usr/bin/env python
# -*- coding: utf-8 -*-
__author__ = "Chaos"

from datetime import date
from tqsdk import TqApi, TqAuth, TqBacktest, TargetPosTask, BacktestFinished
from tqsdk.tafunc import time_to_str
from tqsdk.ta import CCI

# ===== 全局参数设置 =====
SYMBOL = "SHFE.au2406"  # 黄金期货主力合约
POSITION_SIZE = 50  # 每次交易手数
START_DATE = date(2023, 9, 20)  # 回测开始日期
END_DATE = date(2024, 2, 28)  # 回测结束日期

# CCI参数
CCI_PERIOD = 10  # CCI计算周期
CCI_UPPER = 100  # CCI上轨
CCI_LOWER = -100  # CCI下轨
STOP_LOSS_PERCENT = 0.01  # 止损比例

# ===== 全局变量 =====
current_direction = 0  # 当前持仓方向:1=多头,-1=空头,0=空仓
entry_price = 0  # 开仓价格
stop_loss_price = 0  # 止损价格
last_cci_state = 0  # 上一次CCI状态:1=高于上轨,-1=低于下轨,0=中间区域

# ===== 策略开始 =====
print("开始运行CCI均值回归策略...")

# 创建API实例
api = TqApi(backtest=TqBacktest(start_dt=START_DATE, end_dt=END_DATE),
            auth=TqAuth("快期账号", "快期密码"))

# 订阅合约的K线数据
klines = api.get_kline_serial(SYMBOL, 60 * 60 * 24)  # 日线数据

# 创建目标持仓任务
target_pos = TargetPosTask(api, SYMBOL)

try:
    while True:
        # 等待更新
        api.wait_update()
        
        # 如果K线有更新
        if api.is_changing(klines.iloc[-1], "datetime"):
            # 确保有足够的数据计算指标
            if len(klines) < CCI_PERIOD + 10:
                continue
            
            # 计算CCI
            cci_series = CCI(klines, CCI_PERIOD)
            cci = cci_series.iloc[-1].item()  # 使用item()方法获取标量值
            current_price = klines.close.iloc[-1].item()  # 同样使用item()方法
            
            # 确定当前CCI状态
            current_cci_state = 0
            if cci > CCI_UPPER:
                current_cci_state = 1
            elif cci < CCI_LOWER:
                current_cci_state = -1
            
            # 获取最新数据
            current_timestamp = klines.datetime.iloc[-1]
            current_datetime = time_to_str(current_timestamp)
            
            # 打印当前状态
            print(f"日期: {current_datetime}, 价格: {current_price:.2f}, CCI: {cci:.2f}")
            
            # ===== 交易逻辑 =====
            
            # 空仓状态 - 寻找开仓机会
            if current_direction == 0:
                # 多头开仓条件:CCI从下轨上穿
                if last_cci_state == -1 and current_cci_state == 0:
                    current_direction = 1
                    target_pos.set_target_volume(POSITION_SIZE)
                    entry_price = current_price
                    stop_loss_price = entry_price * (1 - STOP_LOSS_PERCENT)
                    print(f"多头开仓: 价格={entry_price:.2f}, 止损价={stop_loss_price:.2f}")
                
                # 空头开仓条件:CCI从上轨下穿
                elif last_cci_state == 1 and current_cci_state == 0:
                    current_direction = -1
                    target_pos.set_target_volume(-POSITION_SIZE)
                    entry_price = current_price
                    stop_loss_price = entry_price * (1 + STOP_LOSS_PERCENT)
                    print(f"空头开仓: 价格={entry_price:.2f}, 止损价={stop_loss_price:.2f}")
            
            # 多头持仓 - 检查平仓条件
            elif current_direction == 1:
                # 止盈条件:CCI从上轨下穿
                if last_cci_state == 1 and current_cci_state == 0:
                    profit_pct = (current_price - entry_price) / entry_price * 100
                    target_pos.set_target_volume(0)
                    current_direction = 0
                    print(f"多头止盈平仓: 价格={current_price:.2f}, 盈亏={profit_pct:.2f}%")
                
                # 止损条件:价格跌破止损价
                elif current_price < stop_loss_price:
                    loss_pct = (entry_price - current_price) / entry_price * 100
                    target_pos.set_target_volume(0)
                    current_direction = 0
                    print(f"多头止损平仓: 价格={current_price:.2f}, 亏损={loss_pct:.2f}%")
            
            # 空头持仓 - 检查平仓条件
            elif current_direction == -1:
                # 止盈条件:CCI从下轨上穿
                if last_cci_state == -1 and current_cci_state == 0:
                    profit_pct = (entry_price - current_price) / entry_price * 100
                    target_pos.set_target_volume(0)
                    current_direction = 0
                    print(f"空头止盈平仓: 价格={current_price:.2f}, 盈亏={profit_pct:.2f}%")
                
                # 止损条件:价格突破止损价
                elif current_price > stop_loss_price:
                    loss_pct = (current_price - entry_price) / entry_price * 100
                    target_pos.set_target_volume(0)
                    current_direction = 0
                    print(f"空头止损平仓: 价格={current_price:.2f}, 亏损={loss_pct:.2f}%")
            
            # 更新上一次CCI状态
            last_cci_state = current_cci_state

except BacktestFinished as e:
    print("回测结束")
    api.close()

RSI超买/超卖交易策略 (难度:初级)

策略说明 https://www.shinnytech.com/articles/trading-strategy/mean-reversion/rsi-strategy

#!/usr/bin/env python
# -*- coding: utf-8 -*-
__author__ = "Chaos"

from datetime import date
from tqsdk import TqApi, TqAuth, TqBacktest, TargetPosTask, BacktestFinished
from tqsdk.ta import RSI
from tqsdk.tafunc import time_to_str

# ===== 全局参数设置 =====
SYMBOL = "DCE.a2101"
POSITION_SIZE = 500  # 每次交易手数
START_DATE = date(2020, 4, 20)  # 回测开始日期
END_DATE = date(2020, 11, 20)  # 回测结束日期

# RSI参数
RSI_PERIOD = 6  # RSI计算周期,
OVERBOUGHT_THRESHOLD = 65  # 超买阈值
OVERSOLD_THRESHOLD = 35  # 超卖阈值

# 风控参数
STOP_LOSS_PERCENT = 0.01  # 止损百分比
TIME_STOP_DAYS = 10  # 缩短时间止损天数

# ===== 全局变量 =====
current_direction = 0  # 当前持仓方向:1=多头,-1=空头,0=空仓
entry_price = 0  # 开仓价格
stop_loss_price = 0  # 止损价格
entry_date = None  # 开仓日期
was_overbought = False  # 是否曾进入超买区域
was_oversold = False  # 是否曾进入超卖区域

# ===== 策略开始 =====
print("开始运行RSI超买/超卖反转策略...")

# 创建API实例
api = TqApi(backtest=TqBacktest(start_dt=START_DATE, end_dt=END_DATE),
            auth=TqAuth("快期账号", "快期密码"))

# 订阅合约的K线数据
klines = api.get_kline_serial(SYMBOL, 60 * 60 * 24)  # 日线数据

# 创建目标持仓任务
target_pos = TargetPosTask(api, SYMBOL)

try:
    while True:
        # 等待更新
        api.wait_update()

        # 如果K线有更新
        if api.is_changing(klines.iloc[-1], "datetime"):
            # 确保有足够的数据计算指标
            if len(klines) < RSI_PERIOD + 10:
                continue

            # 使用天勤的RSI函数计算
            rsi = RSI(klines, RSI_PERIOD)
            current_rsi = float(rsi.iloc[-1].iloc[0])
            previous_rsi = float(rsi.iloc[-2].iloc[0])

            # 更新超买/超卖状态
            if previous_rsi > OVERBOUGHT_THRESHOLD:
                was_overbought = True
            if previous_rsi < OVERSOLD_THRESHOLD:
                was_oversold = True

            # 获取最新数据
            current_price = float(klines.close.iloc[-1])
            current_timestamp = klines.datetime.iloc[-1]
            current_datetime = time_to_str(current_timestamp)  # 使用time_to_str转换时间

            # 打印当前状态
            print(f"日期: {current_datetime}, 价格: {current_price:.2f}, RSI: {current_rsi:.2f}")

            # ===== 交易逻辑 =====

            # 空仓状态 - 寻找开仓机会
            if current_direction == 0:
                # 多头开仓条件:RSI从超卖区域回升
                if was_oversold and previous_rsi < OVERSOLD_THRESHOLD and current_rsi > OVERSOLD_THRESHOLD:
                    current_direction = 1
                    target_pos.set_target_volume(POSITION_SIZE)
                    entry_price = current_price
                    stop_loss_price = entry_price * (1 - STOP_LOSS_PERCENT)
                    entry_date = current_timestamp  # 存储时间戳
                    print(f"多头开仓: 价格={entry_price:.2f}, 止损={stop_loss_price:.2f}")
                    was_oversold = False  # 重置超卖状态

                # 空头开仓条件:RSI从超买区域回落
                elif was_overbought and previous_rsi > OVERBOUGHT_THRESHOLD and current_rsi < OVERBOUGHT_THRESHOLD:
                    current_direction = -1
                    target_pos.set_target_volume(-POSITION_SIZE)
                    entry_price = current_price
                    stop_loss_price = entry_price * (1 + STOP_LOSS_PERCENT)
                    entry_date = current_timestamp  # 存储时间戳
                    print(f"空头开仓: 价格={entry_price:.2f}, 止损={stop_loss_price:.2f}")
                    was_overbought = False  # 重置超买状态

            # 多头持仓 - 检查平仓条件
            elif current_direction == 1:
                # 止损条件
                if current_price <= stop_loss_price:
                    profit_pct = (current_price - entry_price) / entry_price * 100
                    target_pos.set_target_volume(0)
                    current_direction = 0
                    print(f"多头止损平仓: 价格={current_price:.2f}, 盈亏={profit_pct:.2f}%")

                # 止盈条件:RSI进入超买区域
                elif current_rsi > OVERBOUGHT_THRESHOLD:
                    profit_pct = (current_price - entry_price) / entry_price * 100
                    target_pos.set_target_volume(0)
                    current_direction = 0
                    print(f"多头止盈平仓: 价格={current_price:.2f}, 盈亏={profit_pct:.2f}%")

                # 时间止损
                elif (current_timestamp - entry_date) / (60 * 60 * 24) >= TIME_STOP_DAYS:  # 计算天数差
                    profit_pct = (current_price - entry_price) / entry_price * 100
                    target_pos.set_target_volume(0)
                    current_direction = 0
                    print(f"多头时间止损: 价格={current_price:.2f}, 盈亏={profit_pct:.2f}%")

            # 空头持仓 - 检查平仓条件
            elif current_direction == -1:
                # 止损条件
                if current_price >= stop_loss_price:
                    profit_pct = (entry_price - current_price) / entry_price * 100
                    target_pos.set_target_volume(0)
                    current_direction = 0
                    print(f"空头止损平仓: 价格={current_price:.2f}, 盈亏={profit_pct:.2f}%")

                # 止盈条件:RSI进入超卖区域
                elif current_rsi < OVERSOLD_THRESHOLD:
                    profit_pct = (entry_price - current_price) / entry_price * 100
                    target_pos.set_target_volume(0)
                    current_direction = 0
                    print(f"空头止盈平仓: 价格={current_price:.2f}, 盈亏={profit_pct:.2f}%")

                # 时间止损
                elif (current_timestamp - entry_date) / (60 * 60 * 24) >= TIME_STOP_DAYS:  # 计算天数差
                    profit_pct = (entry_price - current_price) / entry_price * 100
                    target_pos.set_target_volume(0)
                    current_direction = 0
                    print(f"空头时间止损: 价格={current_price:.2f}, 盈亏={profit_pct:.2f}%")

except BacktestFinished as e:
    print("回测结束")
    api.close()

Z-Score均值回归策略 (难度:初级)

策略说明 https://www.shinnytech.com/articles/trading-strategy/mean-reversion/zscore-strategy

#!/usr/bin/env python
# -*- coding: utf-8 -*-
__author__ = "Chaos"

from datetime import date
import numpy as np
from tqsdk import TqApi, TqAuth, TqBacktest, TargetPosTask, BacktestFinished
from tqsdk.tafunc import time_to_str

# ===== 全局参数设置 =====
SYMBOL = "SHFE.au2106"
POSITION_SIZE = 50  # 每次交易手数
START_DATE = date(2020, 11, 1)  # 回测开始日期
END_DATE = date(2020, 12, 15)  # 回测结束日期

# Z-Score参数
WINDOW_SIZE = 14  # Z-Score计算窗口期
ENTRY_THRESHOLD = 1.8  # 开仓阈值
EXIT_THRESHOLD = 0.4  # 平仓阈值
STOP_LOSS_THRESHOLD = 2.5  # 止损阈值

# 风控参数
TIME_STOP_DAYS = 8  # 时间止损天数

# ===== 全局变量 =====
current_direction = 0  # 当前持仓方向:1=多头,-1=空头,0=空仓
entry_price = 0  # 开仓价格
entry_date = None  # 开仓日期

# ===== 策略开始 =====
print("开始运行Z-Score均值回归策略...")

# 创建API实例
api = TqApi(backtest=TqBacktest(start_dt=START_DATE, end_dt=END_DATE),
            auth=TqAuth("快期账号", "快期密码"))

# 订阅合约的K线数据
klines = api.get_kline_serial(SYMBOL, 60 * 60 * 24)  # 日线数据

# 创建目标持仓任务
target_pos = TargetPosTask(api, SYMBOL)

try:
    while True:
        # 等待更新
        api.wait_update()

        # 如果K线有更新
        if api.is_changing(klines.iloc[-1], "datetime"):
            # 确保有足够的数据计算指标
            if len(klines) < WINDOW_SIZE + 10:
                continue

            # 计算Z-Score
            prices = klines.close.iloc[-WINDOW_SIZE:]  # 获取最近20天的收盘价
            mean = np.mean(prices)  # 计算均值
            std = np.std(prices)  # 计算标准差
            current_price = float(klines.close.iloc[-1])  # 当前价格

            # 处理标准差为0的情况
            if std == 0:
                z_score = 0  # 如果标准差为0,说明所有价格都相同,Z-Score设为0
            else:
                z_score = (current_price - mean) / std  # 计算Z-Score

            # 获取最新数据
            current_timestamp = klines.datetime.iloc[-1]
            current_datetime = time_to_str(current_timestamp)

            # 打印当前状态
            print(f"日期: {current_datetime}, 价格: {current_price:.2f}, Z-Score: {z_score:.2f}")

            # ===== 交易逻辑 =====

            # 空仓状态 - 寻找开仓机会
            if current_direction == 0:
                # 多头开仓条件:Z-Score显著低于均值
                if z_score < -ENTRY_THRESHOLD:
                    current_direction = 1
                    target_pos.set_target_volume(POSITION_SIZE)
                    entry_price = current_price
                    entry_date = current_timestamp
                    print(f"多头开仓: 价格={entry_price:.2f}, Z-Score={z_score:.2f}")

                # 空头开仓条件:Z-Score显著高于均值
                elif z_score > ENTRY_THRESHOLD:
                    current_direction = -1
                    target_pos.set_target_volume(-POSITION_SIZE)
                    entry_price = current_price
                    entry_date = current_timestamp
                    print(f"空头开仓: 价格={entry_price:.2f}, Z-Score={z_score:.2f}")

            # 多头持仓 - 检查平仓条件
            elif current_direction == 1:
                # 止损条件:Z-Score继续大幅下跌
                if z_score < -STOP_LOSS_THRESHOLD:
                    profit_pct = (current_price - entry_price) / entry_price * 100
                    target_pos.set_target_volume(0)
                    current_direction = 0
                    print(f"多头止损平仓: 价格={current_price:.2f}, 盈亏={profit_pct:.2f}%")

                # 止盈条件:Z-Score回归到均值附近
                elif -EXIT_THRESHOLD <= z_score <= EXIT_THRESHOLD:
                    profit_pct = (current_price - entry_price) / entry_price * 100
                    target_pos.set_target_volume(0)
                    current_direction = 0
                    print(f"多头止盈平仓: 价格={current_price:.2f}, 盈亏={profit_pct:.2f}%")

                # 时间止损
                elif (current_timestamp - entry_date) / (60 * 60 * 24) >= TIME_STOP_DAYS:
                    profit_pct = (current_price - entry_price) / entry_price * 100
                    target_pos.set_target_volume(0)
                    current_direction = 0
                    print(f"多头时间止损: 价格={current_price:.2f}, 盈亏={profit_pct:.2f}%")

            # 空头持仓 - 检查平仓条件
            elif current_direction == -1:
                # 止损条件:Z-Score继续大幅上涨
                if z_score > STOP_LOSS_THRESHOLD:
                    profit_pct = (entry_price - current_price) / entry_price * 100
                    target_pos.set_target_volume(0)
                    current_direction = 0
                    print(f"空头止损平仓: 价格={current_price:.2f}, 盈亏={profit_pct:.2f}%")

                # 止盈条件:Z-Score回归到均值附近
                elif -EXIT_THRESHOLD <= z_score <= EXIT_THRESHOLD:
                    profit_pct = (entry_price - current_price) / entry_price * 100
                    target_pos.set_target_volume(0)
                    current_direction = 0
                    print(f"空头止盈平仓: 价格={current_price:.2f}, 盈亏={profit_pct:.2f}%")

                # 时间止损
                elif (current_timestamp - entry_date) / (60 * 60 * 24) >= TIME_STOP_DAYS:
                    profit_pct = (entry_price - current_price) / entry_price * 100
                    target_pos.set_target_volume(0)
                    current_direction = 0
                    print(f"空头时间止损: 价格={current_price:.2f}, 盈亏={profit_pct:.2f}%")

except BacktestFinished as e:
    print("回测结束")
    api.close()

算法交易策略

冰山订单算法 (难度:初级)

策略说明 https://www.shinnytech.com/articles/trading-strategy/other/iceberg-order

#!/usr/bin/env python
# coding=utf-8
__author__ = "Chaos"

from tqsdk import TqApi, TqAuth, TqKq, TargetPosTask

# === 用户参数 ===
SYMBOL = "SHFE.ag2506"      # 交易合约
TOTAL_VOLUME = 100          # 目标总手数
MIN_VOLUME = 1              # 每笔最小委托手数
MAX_VOLUME = 10             # 每笔最大委托手数
DIRECTION = "BUY"           # "BUY"为买入,"SELL"为卖出
ORDER_TYPE = "ACTIVE"       # 对价 "ACTIVE" / 挂价 "PASSIVE" / 指定价 lambda direction: 价格

# === 初始化 ===
api = TqApi(account=TqKq(), auth=TqAuth("快期账号", "快期密码"))
quote = api.get_quote(SYMBOL)

# 创建目标持仓任务
target_pos = TargetPosTask(
    api, SYMBOL,
    price=ORDER_TYPE,
    min_volume=MIN_VOLUME,
    max_volume=MAX_VOLUME
)

# 获取下单方式描述
order_type_str = (f"指定价 {ORDER_TYPE(DIRECTION)}" if callable(ORDER_TYPE) 
                 else str(ORDER_TYPE))

print(f"冰山算法启动,合约: {SYMBOL},目标: {TOTAL_VOLUME}手,"
      f"每批: {MIN_VOLUME}-{MAX_VOLUME}手,方向: {DIRECTION},下单方式: {order_type_str}")

try:
    # 获取初始持仓并设置目标
    pos = api.get_position(SYMBOL)
    start_net_pos = pos.pos_long - pos.pos_short
    target_volume = start_net_pos + (TOTAL_VOLUME if DIRECTION == "BUY" else -TOTAL_VOLUME)
    target_pos.set_target_volume(target_volume)
    
    last_progress = 0  # 记录上次进度

    while True:
        api.wait_update()
        pos = api.get_position(SYMBOL)
        net_pos = pos.pos_long - pos.pos_short
        progress = abs(net_pos - start_net_pos)
        
        # 当进度发生变化时打印
        if progress != last_progress:
            print(f"当前进度: {progress}/{TOTAL_VOLUME}")
            last_progress = progress
        
        # 检查是否完成
        if (DIRECTION == "BUY" and net_pos >= target_volume) or \
           (DIRECTION == "SELL" and net_pos <= target_volume):
            print(f"冰山算法完成")
            break

except Exception as e:
    print(f"算法执行异常: {e}")
finally:
    api.close()

盘口下单算法 (难度:初级)

策略说明 https://www.shinnytech.com/articles/trading-strategy/other/handicap

#!/usr/bin/env python
# coding=utf-8
__author__ = "Chaos"

from tqsdk import TqApi, TqAuth, TqKq
import math

# === 用户参数 ===
SYMBOL = "SHFE.ag2506"      # 交易合约
DIRECTION = "BUY"          # "BUY"为买入,"SELL"为卖出
OFFSET = "OPEN"       # "OPEN"为开仓,"CLOSE"为平仓,"CLOSETODAY"为平今仓
TOTAL_VOLUME = 20          # 目标总手数
ORDERBOOK_RATIO = 0.2       # 盘口量比比例(如0.2表示20%)
ORDER_TYPE = "对价"         # "对价"为对价报单,"挂价"为挂价报单
ORDERBOOK_TYPE = "对手盘口"  # "对手盘口"或"挂单盘口"

# === 初始化API ===
acc = TqKq()
api = TqApi(account=acc, auth=TqAuth("快期账号", "快期密码"))
quote = api.get_quote(SYMBOL)

# === 初始化变量 ===
traded_volume = 0           # 已成交手数
last_printed_volume = 0     # 上次打印的成交手数
current_order = None        # 当前订单

print(f"盘口算法启动,合约: {SYMBOL},目标: {TOTAL_VOLUME}手,方向: {DIRECTION},量比比例: {ORDERBOOK_RATIO*100}%")

def get_orderbook_volume():
    """获取盘口数量"""
    if ORDERBOOK_TYPE == "对手盘口":
        # 对手盘口:买入时看卖一量,卖出时看买一量
        return quote.ask_volume1 if DIRECTION == "BUY" else quote.bid_volume1
    else:
        # 挂单盘口:买入时看买一量,卖出时看卖一量
        return quote.bid_volume1 if DIRECTION == "BUY" else quote.ask_volume1

def get_order_price():
    """获取下单价格"""
    if ORDER_TYPE == "对价":
        # 对价报单
        return quote.ask_price1 if DIRECTION == "BUY" else quote.bid_price1
    else:
        # 挂价报单
        return quote.bid_price1 if DIRECTION == "BUY" else quote.ask_price1

try:
    while traded_volume < TOTAL_VOLUME:
        api.wait_update()
        
        # 获取当前盘口数量
        orderbook_volume = get_orderbook_volume()
        # 计算本轮应下单手数
        order_volume = int(math.floor(orderbook_volume * ORDERBOOK_RATIO))
        # 不能超过剩余目标
        order_volume = min(order_volume, TOTAL_VOLUME - traded_volume)
        
        if order_volume > 0:
            print(f"\n当前盘口数量: {orderbook_volume}手")
            print(f"计算下单手数: {orderbook_volume} * {ORDERBOOK_RATIO} = {order_volume}手")
            
            # 下新单
            price = get_order_price()
            current_order = api.insert_order(
                symbol=SYMBOL,
                direction=DIRECTION,
                offset=OFFSET,
                volume=order_volume,
                limit_price=price
            )
            print(f"下单: {order_volume}手,价格: {price},报单类型: {ORDER_TYPE}")
            
            # 记录上一次的状态和剩余量
            last_status = current_order.status
            last_volume_left = current_order.volume_left
            
            # 等待订单状态更新
            while current_order.status == "ALIVE":
                api.wait_update()
                # 只在状态或剩余量发生变化时打印
                if current_order.status != last_status or current_order.volume_left != last_volume_left:
                    print(f"订单状态更新: {current_order.status}, 剩余量: {current_order.volume_left}")
                    last_status = current_order.status
                    last_volume_left = current_order.volume_left
                
                # 检查价格是否变化
                new_price = get_order_price()
                if new_price != price:
                    print(f"价格发生变化: {price} -> {new_price}")
                    # 如果还有未成交部分,先撤单
                    if current_order.volume_left > 0:
                        print(f"撤单: {current_order.volume_left}手")
                        api.cancel_order(current_order.order_id)
                        # 等待撤单完成
                        while current_order.status == "ALIVE":
                            api.wait_update()
                        # 重新下单
                        current_order = api.insert_order(
                            symbol=SYMBOL,
                            direction=DIRECTION,
                            offset=OFFSET,
                            volume=current_order.volume_left,
                            limit_price=new_price
                        )
                        print(f"重新下单: {current_order.volume_left}手,价格: {new_price}")
                        price = new_price
                        last_status = current_order.status
                        last_volume_left = current_order.volume_left
                    else:
                        # 如果订单已经完成,跳出循环
                        break
            
            # 检查订单是否出错
            if current_order.is_error:
                print(f"下单失败: {current_order.last_msg}")
                break
            
            # 计算实际成交量
            actual_trade = current_order.volume_orign - current_order.volume_left
            if actual_trade > 0:
                print(f"本轮成交: {actual_trade}手")
                traded_volume += actual_trade
                
                # 只在成交手数变化时打印进度
                if traded_volume > last_printed_volume:
                    print(f"当前进度: {traded_volume} / {TOTAL_VOLUME}")
                    last_printed_volume = traded_volume
            else:
                print(f"订单未成交: {current_order.last_msg}")
                
    print("盘口算法执行完毕")
except Exception as e:
    print(f"算法执行异常: {e}")
finally:
    api.close()

跟量下单算法(POV) (难度:初级)

策略说明 https://www.shinnytech.com/articles/trading-strategy/other/pov

#!/usr/bin/env python
# coding=utf-8
__author__ = "Chaos"

from tqsdk import TqApi, TqAuth, TqKq
import math

# === 用户参数 ===
SYMBOL = "SHFE.ag2506"      # 交易合约
DIRECTION = "BUY"           # "BUY"为买入,"SELL"为卖出
OFFSET = "OPEN"             # "OPEN"为开仓,"CLOSE"为平仓,"CLOSETODAY"为平今仓
TOTAL_VOLUME = 30          # 目标总手数
POV_RATIO = 0.1             # 跟量比例(如0.1表示10%)
ORDER_TYPE = "对价"         # "对价"为对价报单,"报价"为挂价报单

# === 初始化API ===
acc = TqKq()
api = TqApi(account=acc, auth=TqAuth("快期账号", "快期密码"))
quote = api.get_quote(SYMBOL)

# === 初始化变量 ===
base_volume = quote.volume  # 启动时的市场累计成交量
traded_volume = 0           # 已成交手数
last_printed_volume = 0     # 上次打印的成交手数

print(f"POV算法启动,合约: {SYMBOL},目标: {TOTAL_VOLUME}手,方向: {DIRECTION},量比比例: {POV_RATIO*100}%")

try:
    while traded_volume < TOTAL_VOLUME:
        api.wait_update()
        new_volume = quote.volume
        delta = new_volume - base_volume
        # 计算本轮应下单手数
        order_volume = int(math.floor(delta * POV_RATIO))
        # 不能超过剩余目标
        order_volume = min(order_volume, TOTAL_VOLUME - traded_volume)

        if order_volume > 0:
            print(f"\n市场成交量: {base_volume} -> {new_volume}手")
            print(f"变化量: {delta}手")
            print(f"计算下单手数: {delta} * {POV_RATIO} = {order_volume}手")

            # 根据报单类型选择价格
            if ORDER_TYPE == "对价":
                # 对价报单
                price = quote.ask_price1 if DIRECTION == "BUY" else quote.bid_price1
            else:
                # 挂价报单
                price = quote.bid_price1 if DIRECTION == "BUY" else quote.ask_price1

            order = api.insert_order(
                symbol=SYMBOL,
                direction=DIRECTION,
                offset=OFFSET,
                volume=order_volume,
                limit_price=price
            )
            print(f"下单: {order_volume}手,价格: {price},报单类型: {ORDER_TYPE}")

            # 记录上一次的状态和剩余量
            last_status = order.status
            last_volume_left = order.volume_left

            # 等待订单状态更新
            while order.status == "ALIVE":
                api.wait_update()
                # 只在状态或剩余量发生变化时打印
                if order.status != last_status or order.volume_left != last_volume_left:
                    print(f"订单状态更新: {order.status}, 剩余量: {order.volume_left}")
                    last_status = order.status
                    last_volume_left = order.volume_left

            # 检查订单是否出错
            if order.is_error:
                print(f"下单失败: {order.last_msg}")
                break

            # 计算实际成交量
            actual_trade = order.volume_orign - order.volume_left
            if actual_trade > 0:
                print(f"本轮成交: {actual_trade}手")
                traded_volume += actual_trade
                base_volume = new_volume  # 更新基准成交量

                # 只在成交手数变化时打印进度
                if traded_volume > last_printed_volume:
                    print(f"当前进度: {traded_volume} / {TOTAL_VOLUME}")
                    last_printed_volume = traded_volume
            else:
                print(f"订单未成交: {order.last_msg}")

    print("POV算法执行完毕")
except Exception as e:
    print(f"捕获到异常: {e}")
finally:
    api.close()