回测系统

回测是量化交易策略开发的核心环节,通过历史数据验证策略的有效性。宽图量化提供了强大的回测系统,帮助您全面评估策略表现。

回测系统概述

我们的回测系统具有以下特点:

  • 高精度模拟: 精确模拟真实交易环境
  • 多维度分析: 全面的性能指标分析
  • 可视化报告: 直观的图表和报告
  • 参数优化: 自动寻找最优参数组合
  • 风险分析: 详细的风险评估

回测流程

1. 准备阶段

在开始回测之前,需要准备以下内容:

# 回测配置
backtest_config = {
    # 基础设置
    'symbol': 'BTCUSDT',
    'start_date': '2023-01-01',
    'end_date': '2023-12-31',
    'timeframe': '1h',
    
    # 资金设置
    'initial_capital': 10000,
    'currency': 'USDT',
    
    # 交易成本
    'commission': 0.001,  # 0.1% 手续费
    'slippage': 0.0005,   # 0.05% 滑点
    
    # 其他设置
    'benchmark': 'BTCUSDT',  # 基准对比
    'risk_free_rate': 0.02   # 无风险利率
}

2. 策略实现

class BacktestStrategy(Strategy):
    def __init__(self):
        super().__init__()
        self.setup_parameters()
        self.setup_indicators()
    
    def setup_parameters(self):
        # 可优化参数
        self.short_ma = self.add_parameter('short_ma', 20, 10, 50)
        self.long_ma = self.add_parameter('long_ma', 50, 30, 100)
        self.stop_loss = self.add_parameter('stop_loss', 0.02, 0.01, 0.05)
        self.take_profit = self.add_parameter('take_profit', 0.06, 0.03, 0.10)
    
    def setup_indicators(self):
        self.sma_short = self.add_indicator('SMA', period=self.short_ma)
        self.sma_long = self.add_indicator('SMA', period=self.long_ma)
        self.rsi = self.add_indicator('RSI', period=14)
        self.atr = self.add_indicator('ATR', period=14)
    
    def on_bar(self, bar):
        # 策略逻辑
        if self.should_buy(bar):
            self.buy_signal(bar)
        elif self.should_sell(bar):
            self.sell_signal(bar)
        
        # 风险管理
        self.manage_positions(bar)
    
    def should_buy(self, bar):
        return (self.sma_short.value > self.sma_long.value and
                self.rsi.value < 70 and
                self.get_position() == 0)
    
    def should_sell(self, bar):
        return (self.sma_short.value < self.sma_long.value and
                self.rsi.value > 30 and
                self.get_position() > 0)

3. 运行回测

# 创建回测引擎
backtest_engine = BacktestEngine(config=backtest_config)

# 添加策略
strategy = BacktestStrategy()
backtest_engine.add_strategy(strategy)

# 运行回测
results = backtest_engine.run()

# 输出结果
print("回测完成!")
print(f"总收益率: {results['total_return']:.2%}")
print(f"年化收益率: {results['annual_return']:.2%}")
print(f"最大回撤: {results['max_drawdown']:.2%}")
print(f"夏普比率: {results['sharpe_ratio']:.2f}")

性能指标

1. 收益指标

总收益率

def calculate_total_return(initial_capital, final_capital):
    return (final_capital - initial_capital) / initial_capital

# 示例
initial_capital = 10000
final_capital = 12500
total_return = calculate_total_return(initial_capital, final_capital)
print(f"总收益率: {total_return:.2%}")  # 25.00%

年化收益率

def calculate_annual_return(total_return, days):
    return (1 + total_return) ** (365 / days) - 1

# 示例
total_return = 0.25
days = 365
annual_return = calculate_annual_return(total_return, days)
print(f"年化收益率: {annual_return:.2%}")  # 25.00%

累计收益曲线

def calculate_cumulative_returns(daily_returns):
    cumulative_returns = []
    cumulative = 1.0
    
    for daily_return in daily_returns:
        cumulative *= (1 + daily_return)
        cumulative_returns.append(cumulative - 1)
    
    return cumulative_returns

2. 风险指标

最大回撤

def calculate_max_drawdown(equity_curve):
    peak = equity_curve[0]
    max_drawdown = 0
    
    for value in equity_curve:
        if value > peak:
            peak = value
        
        drawdown = (peak - value) / peak
        if drawdown > max_drawdown:
            max_drawdown = drawdown
    
    return max_drawdown

# 示例
equity_curve = [10000, 11000, 10500, 12000, 11000, 13000]
max_dd = calculate_max_drawdown(equity_curve)
print(f"最大回撤: {max_dd:.2%}")

波动率

import math

def calculate_volatility(returns):
    mean_return = sum(returns) / len(returns)
    variance = sum((r - mean_return) ** 2 for r in returns) / len(returns)
    volatility = math.sqrt(variance) * math.sqrt(252)  # 年化波动率
    return volatility

VaR (风险价值)

def calculate_var(returns, confidence_level=0.05):
    sorted_returns = sorted(returns)
    index = int(len(sorted_returns) * confidence_level)
    return sorted_returns[index]

# 示例:95%置信度下的VaR
daily_returns = [-0.02, 0.01, -0.01, 0.03, -0.015, 0.02]
var_95 = calculate_var(daily_returns, 0.05)
print(f"95% VaR: {var_95:.2%}")

3. 风险调整收益指标

夏普比率

def calculate_sharpe_ratio(returns, risk_free_rate=0.02):
    excess_returns = [r - risk_free_rate/252 for r in returns]
    mean_excess_return = sum(excess_returns) / len(excess_returns)
    volatility = calculate_volatility(returns) / math.sqrt(252)
    
    if volatility == 0:
        return 0
    
    return mean_excess_return / volatility * math.sqrt(252)

索提诺比率

def calculate_sortino_ratio(returns, risk_free_rate=0.02):
    excess_returns = [r - risk_free_rate/252 for r in returns]
    mean_excess_return = sum(excess_returns) / len(excess_returns)
    
    # 只考虑负收益的波动率
    negative_returns = [r for r in excess_returns if r < 0]
    if not negative_returns:
        return float('inf')
    
    downside_variance = sum(r ** 2 for r in negative_returns) / len(returns)
    downside_volatility = math.sqrt(downside_variance) * math.sqrt(252)
    
    return mean_excess_return * math.sqrt(252) / downside_volatility

卡尔马比率

def calculate_calmar_ratio(annual_return, max_drawdown):
    if max_drawdown == 0:
        return float('inf')
    return annual_return / max_drawdown

4. 交易指标

胜率

def calculate_win_rate(trades):
    winning_trades = sum(1 for trade in trades if trade.profit > 0)
    return winning_trades / len(trades) if trades else 0

盈亏比

def calculate_profit_factor(trades):
    gross_profit = sum(trade.profit for trade in trades if trade.profit > 0)
    gross_loss = abs(sum(trade.profit for trade in trades if trade.profit < 0))
    
    if gross_loss == 0:
        return float('inf')
    
    return gross_profit / gross_loss

平均持仓时间

def calculate_average_holding_period(trades):
    if not trades:
        return 0
    
    total_holding_time = sum(trade.exit_time - trade.entry_time for trade in trades)
    return total_holding_time / len(trades)

回测报告

1. 基础统计报告

def generate_basic_report(results):
    report = f"""
    ==================== 回测报告 ====================
    
    基础信息:
    策略名称: {results['strategy_name']}
    回测期间: {results['start_date']} - {results['end_date']}
    交易对: {results['symbol']}
    初始资金: {results['initial_capital']:,.2f} USDT
    
    收益指标:
    总收益率: {results['total_return']:.2%}
    年化收益率: {results['annual_return']:.2%}
    基准收益率: {results['benchmark_return']:.2%}
    超额收益: {results['excess_return']:.2%}
    
    风险指标:
    最大回撤: {results['max_drawdown']:.2%}
    波动率: {results['volatility']:.2%}
    95% VaR: {results['var_95']:.2%}
    
    风险调整收益:
    夏普比率: {results['sharpe_ratio']:.2f}
    索提诺比率: {results['sortino_ratio']:.2f}
    卡尔马比率: {results['calmar_ratio']:.2f}
    
    交易统计:
    总交易次数: {results['total_trades']}
    胜率: {results['win_rate']:.2%}
    盈亏比: {results['profit_factor']:.2f}
    平均持仓时间: {results['avg_holding_period']:.1f} 小时
    
    ==================================================
    """
    return report

2. 详细分析报告

def generate_detailed_report(results):
    # 月度收益分析
    monthly_returns = calculate_monthly_returns(results['daily_returns'])
    
    # 回撤分析
    drawdown_periods = analyze_drawdown_periods(results['equity_curve'])
    
    # 交易分析
    trade_analysis = analyze_trades(results['trades'])
    
    report = {
        'summary': generate_basic_report(results),
        'monthly_returns': monthly_returns,
        'drawdown_analysis': drawdown_periods,
        'trade_analysis': trade_analysis,
        'risk_metrics': calculate_risk_metrics(results),
        'performance_attribution': analyze_performance_attribution(results)
    }
    
    return report

3. 可视化报告

import matplotlib.pyplot as plt
import pandas as pd

def create_performance_charts(results):
    fig, axes = plt.subplots(2, 2, figsize=(15, 10))
    
    # 1. 累计收益曲线
    axes[0, 0].plot(results['dates'], results['cumulative_returns'], label='策略')
    axes[0, 0].plot(results['dates'], results['benchmark_returns'], label='基准')
    axes[0, 0].set_title('累计收益曲线')
    axes[0, 0].legend()
    axes[0, 0].grid(True)
    
    # 2. 回撤曲线
    axes[0, 1].fill_between(results['dates'], results['drawdown'], 0, 
                           color='red', alpha=0.3)
    axes[0, 1].set_title('回撤曲线')
    axes[0, 1].grid(True)
    
    # 3. 月度收益热力图
    monthly_returns = pd.Series(results['monthly_returns'])
    monthly_returns.plot(kind='bar', ax=axes[1, 0])
    axes[1, 0].set_title('月度收益')
    axes[1, 0].tick_params(axis='x', rotation=45)
    
    # 4. 收益分布直方图
    axes[1, 1].hist(results['daily_returns'], bins=50, alpha=0.7)
    axes[1, 1].set_title('日收益分布')
    axes[1, 1].grid(True)
    
    plt.tight_layout()
    return fig

参数优化

1. 网格搜索优化

def grid_search_optimization(strategy_class, param_ranges, data):
    best_params = None
    best_score = -float('inf')
    results = []
    
    # 生成所有参数组合
    param_combinations = generate_param_combinations(param_ranges)
    
    for params in param_combinations:
        # 创建策略实例
        strategy = strategy_class(**params)
        
        # 运行回测
        backtest_result = run_backtest(strategy, data)
        
        # 评估性能(使用夏普比率作为优化目标)
        score = backtest_result['sharpe_ratio']
        
        results.append({
            'params': params,
            'score': score,
            'result': backtest_result
        })
        
        if score > best_score:
            best_score = score
            best_params = params
    
    return best_params, results

# 使用示例
param_ranges = {
    'short_ma': range(10, 31, 5),
    'long_ma': range(40, 101, 10),
    'stop_loss': [0.01, 0.02, 0.03],
    'take_profit': [0.04, 0.06, 0.08]
}

best_params, all_results = grid_search_optimization(
    MyStrategy, param_ranges, historical_data
)

2. 遗传算法优化

import random

class GeneticOptimizer:
    def __init__(self, strategy_class, param_bounds, population_size=50):
        self.strategy_class = strategy_class
        self.param_bounds = param_bounds
        self.population_size = population_size
    
    def create_individual(self):
        individual = {}
        for param_name, (min_val, max_val) in self.param_bounds.items():
            if isinstance(min_val, int):
                individual[param_name] = random.randint(min_val, max_val)
            else:
                individual[param_name] = random.uniform(min_val, max_val)
        return individual
    
    def evaluate_fitness(self, individual, data):
        strategy = self.strategy_class(**individual)
        result = run_backtest(strategy, data)
        return result['sharpe_ratio']
    
    def crossover(self, parent1, parent2):
        child = {}
        for param_name in parent1:
            if random.random() < 0.5:
                child[param_name] = parent1[param_name]
            else:
                child[param_name] = parent2[param_name]
        return child
    
    def mutate(self, individual, mutation_rate=0.1):
        for param_name, value in individual.items():
            if random.random() < mutation_rate:
                min_val, max_val = self.param_bounds[param_name]
                if isinstance(min_val, int):
                    individual[param_name] = random.randint(min_val, max_val)
                else:
                    individual[param_name] = random.uniform(min_val, max_val)
        return individual
    
    def optimize(self, data, generations=100):
        # 初始化种群
        population = [self.create_individual() for _ in range(self.population_size)]
        
        for generation in range(generations):
            # 评估适应度
            fitness_scores = [self.evaluate_fitness(ind, data) for ind in population]
            
            # 选择、交叉、变异
            new_population = []
            for _ in range(self.population_size):
                parent1 = self.tournament_selection(population, fitness_scores)
                parent2 = self.tournament_selection(population, fitness_scores)
                child = self.crossover(parent1, parent2)
                child = self.mutate(child)
                new_population.append(child)
            
            population = new_population
        
        # 返回最优个体
        final_fitness = [self.evaluate_fitness(ind, data) for ind in population]
        best_index = final_fitness.index(max(final_fitness))
        return population[best_index]

3. 贝叶斯优化

from skopt import gp_minimize
from skopt.space import Real, Integer

def bayesian_optimization(strategy_class, data):
    # 定义搜索空间
    space = [
        Integer(10, 30, name='short_ma'),
        Integer(40, 100, name='long_ma'),
        Real(0.01, 0.05, name='stop_loss'),
        Real(0.03, 0.10, name='take_profit')
    ]
    
    # 定义目标函数
    def objective(params):
        short_ma, long_ma, stop_loss, take_profit = params
        strategy = strategy_class(
            short_ma=short_ma,
            long_ma=long_ma,
            stop_loss=stop_loss,
            take_profit=take_profit
        )
        result = run_backtest(strategy, data)
        return -result['sharpe_ratio']  # 最小化负夏普比率
    
    # 执行优化
    result = gp_minimize(
        func=objective,
        dimensions=space,
        n_calls=100,
        random_state=42
    )
    
    return {
        'short_ma': result.x[0],
        'long_ma': result.x[1],
        'stop_loss': result.x[2],
        'take_profit': result.x[3]
    }

回测最佳实践

1. 数据质量控制

def validate_data_quality(data):
    issues = []
    
    # 检查缺失数据
    missing_data = data.isnull().sum()
    if missing_data.any():
        issues.append(f"发现缺失数据: {missing_data}")
    
    # 检查异常价格
    for column in ['open', 'high', 'low', 'close']:
        if (data[column] <= 0).any():
            issues.append(f"{column} 列存在非正数价格")
    
    # 检查价格逻辑
    invalid_ohlc = (data['high'] < data['low']) |                    (data['high'] < data['open']) |                    (data['high'] < data['close']) |                    (data['low'] > data['open']) |                    (data['low'] > data['close'])
    
    if invalid_ohlc.any():
        issues.append("发现不合理的OHLC价格关系")
    
    # 检查异常波动
    returns = data['close'].pct_change()
    extreme_returns = returns.abs() > 0.5  # 50%的单日波动
    if extreme_returns.any():
        issues.append(f"发现异常波动: {extreme_returns.sum()} 个数据点")
    
    return issues

2. 避免前瞻偏差

class BacktestEngine:
    def __init__(self):
        self.current_time = None
        self.available_data = {}
    
    def get_historical_data(self, symbol, end_time, periods):
        # 只返回当前时间之前的数据
        historical_data = self.data[self.data.index < end_time].tail(periods)
        return historical_data
    
    def calculate_indicator(self, data, indicator_type, **params):
        # 确保指标计算不使用未来数据
        if self.current_time is not None:
            data = data[data.index <= self.current_time]
        
        return self.indicator_calculator.calculate(data, indicator_type, **params)

3. 交易成本建模

class TradingCostModel:
    def __init__(self, commission_rate=0.001, slippage_rate=0.0005):
        self.commission_rate = commission_rate
        self.slippage_rate = slippage_rate
    
    def calculate_execution_price(self, order_price, order_side, market_impact=0):
        # 滑点影响
        if order_side == 'BUY':
            execution_price = order_price * (1 + self.slippage_rate + market_impact)
        else:
            execution_price = order_price * (1 - self.slippage_rate - market_impact)
        
        return execution_price
    
    def calculate_commission(self, trade_value):
        return trade_value * self.commission_rate
    
    def calculate_total_cost(self, trade_value, order_side, market_impact=0):
        commission = self.calculate_commission(trade_value)
        slippage_cost = trade_value * (self.slippage_rate + market_impact)
        return commission + slippage_cost

4. 样本外验证

def walk_forward_analysis(strategy_class, data, optimization_window=252, 
                         test_window=63, step_size=21):
    results = []
    
    for start_idx in range(0, len(data) - optimization_window - test_window, step_size):
        # 优化期数据
        opt_end_idx = start_idx + optimization_window
        optimization_data = data.iloc[start_idx:opt_end_idx]
        
        # 测试期数据
        test_end_idx = opt_end_idx + test_window
        test_data = data.iloc[opt_end_idx:test_end_idx]
        
        # 在优化期数据上优化参数
        best_params = optimize_parameters(strategy_class, optimization_data)
        
        # 在测试期数据上验证策略
        strategy = strategy_class(**best_params)
        test_result = run_backtest(strategy, test_data)
        
        results.append({
            'optimization_period': (start_idx, opt_end_idx),
            'test_period': (opt_end_idx, test_end_idx),
            'best_params': best_params,
            'test_result': test_result
        })
    
    return results

常见回测陷阱

1. 过度拟合

问题: 参数过度优化,在历史数据上表现完美但实盘表现差

解决方案:

  • 使用样本外数据验证
  • 限制参数优化的复杂度
  • 使用交叉验证
  • 关注参数稳定性

2. 生存偏差

问题: 只使用仍在交易的品种数据,忽略了退市的品种

解决方案:

  • 使用完整的历史数据集
  • 考虑品种的生命周期
  • 分析退市品种的影响

3. 数据窥探偏差

问题: 反复查看数据和结果,无意中调整策略

解决方案:

  • 预先定义策略规则
  • 限制回测次数
  • 使用盲测数据集

下一步

掌握了回测系统后,建议您:

  1. 学习风险管理 - 完善策略的风险控制
  2. 了解数据分析 - 提高数据处理能力
  3. 查看API接入 - 将策略部署到实盘
  4. 参与社区讨论 - 与其他用户交流回测经验

重要提示: 回测结果不代表未来表现。历史数据的优异表现不能保证实盘交易的成功。请务必进行充分的样本外验证,并从小额资金开始实盘测试。