回测系统
回测是量化交易策略开发的核心环节,通过历史数据验证策略的有效性。宽图量化提供了强大的回测系统,帮助您全面评估策略表现。
回测系统概述
我们的回测系统具有以下特点:
- 高精度模拟: 精确模拟真实交易环境
- 多维度分析: 全面的性能指标分析
- 可视化报告: 直观的图表和报告
- 参数优化: 自动寻找最优参数组合
- 风险分析: 详细的风险评估
回测流程
1. 准备阶段
在开始回测之前,需要准备以下内容:
# 回测配置
backtest_config = {
# 基础设置
'symbol': 'BTCUSDT',
'start_date': '2023-01-01',
'end_date': '2023-12-31',
'timeframe': '1h',
# 资金设置
'initial_capital': 10000,
'currency': 'USDT',
# 交易成本
'commission': 0.001, # 0.1% 手续费
'slippage': 0.0005, # 0.05% 滑点
# 其他设置
'benchmark': 'BTCUSDT', # 基准对比
'risk_free_rate': 0.02 # 无风险利率
}
2. 策略实现
class BacktestStrategy(Strategy):
def __init__(self):
super().__init__()
self.setup_parameters()
self.setup_indicators()
def setup_parameters(self):
# 可优化参数
self.short_ma = self.add_parameter('short_ma', 20, 10, 50)
self.long_ma = self.add_parameter('long_ma', 50, 30, 100)
self.stop_loss = self.add_parameter('stop_loss', 0.02, 0.01, 0.05)
self.take_profit = self.add_parameter('take_profit', 0.06, 0.03, 0.10)
def setup_indicators(self):
self.sma_short = self.add_indicator('SMA', period=self.short_ma)
self.sma_long = self.add_indicator('SMA', period=self.long_ma)
self.rsi = self.add_indicator('RSI', period=14)
self.atr = self.add_indicator('ATR', period=14)
def on_bar(self, bar):
# 策略逻辑
if self.should_buy(bar):
self.buy_signal(bar)
elif self.should_sell(bar):
self.sell_signal(bar)
# 风险管理
self.manage_positions(bar)
def should_buy(self, bar):
return (self.sma_short.value > self.sma_long.value and
self.rsi.value < 70 and
self.get_position() == 0)
def should_sell(self, bar):
return (self.sma_short.value < self.sma_long.value and
self.rsi.value > 30 and
self.get_position() > 0)
3. 运行回测
# 创建回测引擎
backtest_engine = BacktestEngine(config=backtest_config)
# 添加策略
strategy = BacktestStrategy()
backtest_engine.add_strategy(strategy)
# 运行回测
results = backtest_engine.run()
# 输出结果
print("回测完成!")
print(f"总收益率: {results['total_return']:.2%}")
print(f"年化收益率: {results['annual_return']:.2%}")
print(f"最大回撤: {results['max_drawdown']:.2%}")
print(f"夏普比率: {results['sharpe_ratio']:.2f}")
性能指标
1. 收益指标
总收益率
def calculate_total_return(initial_capital, final_capital):
return (final_capital - initial_capital) / initial_capital
# 示例
initial_capital = 10000
final_capital = 12500
total_return = calculate_total_return(initial_capital, final_capital)
print(f"总收益率: {total_return:.2%}") # 25.00%
年化收益率
def calculate_annual_return(total_return, days):
return (1 + total_return) ** (365 / days) - 1
# 示例
total_return = 0.25
days = 365
annual_return = calculate_annual_return(total_return, days)
print(f"年化收益率: {annual_return:.2%}") # 25.00%
累计收益曲线
def calculate_cumulative_returns(daily_returns):
cumulative_returns = []
cumulative = 1.0
for daily_return in daily_returns:
cumulative *= (1 + daily_return)
cumulative_returns.append(cumulative - 1)
return cumulative_returns
2. 风险指标
最大回撤
def calculate_max_drawdown(equity_curve):
peak = equity_curve[0]
max_drawdown = 0
for value in equity_curve:
if value > peak:
peak = value
drawdown = (peak - value) / peak
if drawdown > max_drawdown:
max_drawdown = drawdown
return max_drawdown
# 示例
equity_curve = [10000, 11000, 10500, 12000, 11000, 13000]
max_dd = calculate_max_drawdown(equity_curve)
print(f"最大回撤: {max_dd:.2%}")
波动率
import math
def calculate_volatility(returns):
mean_return = sum(returns) / len(returns)
variance = sum((r - mean_return) ** 2 for r in returns) / len(returns)
volatility = math.sqrt(variance) * math.sqrt(252) # 年化波动率
return volatility
VaR (风险价值)
def calculate_var(returns, confidence_level=0.05):
sorted_returns = sorted(returns)
index = int(len(sorted_returns) * confidence_level)
return sorted_returns[index]
# 示例:95%置信度下的VaR
daily_returns = [-0.02, 0.01, -0.01, 0.03, -0.015, 0.02]
var_95 = calculate_var(daily_returns, 0.05)
print(f"95% VaR: {var_95:.2%}")
3. 风险调整收益指标
夏普比率
def calculate_sharpe_ratio(returns, risk_free_rate=0.02):
excess_returns = [r - risk_free_rate/252 for r in returns]
mean_excess_return = sum(excess_returns) / len(excess_returns)
volatility = calculate_volatility(returns) / math.sqrt(252)
if volatility == 0:
return 0
return mean_excess_return / volatility * math.sqrt(252)
索提诺比率
def calculate_sortino_ratio(returns, risk_free_rate=0.02):
excess_returns = [r - risk_free_rate/252 for r in returns]
mean_excess_return = sum(excess_returns) / len(excess_returns)
# 只考虑负收益的波动率
negative_returns = [r for r in excess_returns if r < 0]
if not negative_returns:
return float('inf')
downside_variance = sum(r ** 2 for r in negative_returns) / len(returns)
downside_volatility = math.sqrt(downside_variance) * math.sqrt(252)
return mean_excess_return * math.sqrt(252) / downside_volatility
卡尔马比率
def calculate_calmar_ratio(annual_return, max_drawdown):
if max_drawdown == 0:
return float('inf')
return annual_return / max_drawdown
4. 交易指标
胜率
def calculate_win_rate(trades):
winning_trades = sum(1 for trade in trades if trade.profit > 0)
return winning_trades / len(trades) if trades else 0
盈亏比
def calculate_profit_factor(trades):
gross_profit = sum(trade.profit for trade in trades if trade.profit > 0)
gross_loss = abs(sum(trade.profit for trade in trades if trade.profit < 0))
if gross_loss == 0:
return float('inf')
return gross_profit / gross_loss
平均持仓时间
def calculate_average_holding_period(trades):
if not trades:
return 0
total_holding_time = sum(trade.exit_time - trade.entry_time for trade in trades)
return total_holding_time / len(trades)
回测报告
1. 基础统计报告
def generate_basic_report(results):
report = f"""
==================== 回测报告 ====================
基础信息:
策略名称: {results['strategy_name']}
回测期间: {results['start_date']} - {results['end_date']}
交易对: {results['symbol']}
初始资金: {results['initial_capital']:,.2f} USDT
收益指标:
总收益率: {results['total_return']:.2%}
年化收益率: {results['annual_return']:.2%}
基准收益率: {results['benchmark_return']:.2%}
超额收益: {results['excess_return']:.2%}
风险指标:
最大回撤: {results['max_drawdown']:.2%}
波动率: {results['volatility']:.2%}
95% VaR: {results['var_95']:.2%}
风险调整收益:
夏普比率: {results['sharpe_ratio']:.2f}
索提诺比率: {results['sortino_ratio']:.2f}
卡尔马比率: {results['calmar_ratio']:.2f}
交易统计:
总交易次数: {results['total_trades']}
胜率: {results['win_rate']:.2%}
盈亏比: {results['profit_factor']:.2f}
平均持仓时间: {results['avg_holding_period']:.1f} 小时
==================================================
"""
return report
2. 详细分析报告
def generate_detailed_report(results):
# 月度收益分析
monthly_returns = calculate_monthly_returns(results['daily_returns'])
# 回撤分析
drawdown_periods = analyze_drawdown_periods(results['equity_curve'])
# 交易分析
trade_analysis = analyze_trades(results['trades'])
report = {
'summary': generate_basic_report(results),
'monthly_returns': monthly_returns,
'drawdown_analysis': drawdown_periods,
'trade_analysis': trade_analysis,
'risk_metrics': calculate_risk_metrics(results),
'performance_attribution': analyze_performance_attribution(results)
}
return report
3. 可视化报告
import matplotlib.pyplot as plt
import pandas as pd
def create_performance_charts(results):
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
# 1. 累计收益曲线
axes[0, 0].plot(results['dates'], results['cumulative_returns'], label='策略')
axes[0, 0].plot(results['dates'], results['benchmark_returns'], label='基准')
axes[0, 0].set_title('累计收益曲线')
axes[0, 0].legend()
axes[0, 0].grid(True)
# 2. 回撤曲线
axes[0, 1].fill_between(results['dates'], results['drawdown'], 0,
color='red', alpha=0.3)
axes[0, 1].set_title('回撤曲线')
axes[0, 1].grid(True)
# 3. 月度收益热力图
monthly_returns = pd.Series(results['monthly_returns'])
monthly_returns.plot(kind='bar', ax=axes[1, 0])
axes[1, 0].set_title('月度收益')
axes[1, 0].tick_params(axis='x', rotation=45)
# 4. 收益分布直方图
axes[1, 1].hist(results['daily_returns'], bins=50, alpha=0.7)
axes[1, 1].set_title('日收益分布')
axes[1, 1].grid(True)
plt.tight_layout()
return fig
参数优化
1. 网格搜索优化
def grid_search_optimization(strategy_class, param_ranges, data):
best_params = None
best_score = -float('inf')
results = []
# 生成所有参数组合
param_combinations = generate_param_combinations(param_ranges)
for params in param_combinations:
# 创建策略实例
strategy = strategy_class(**params)
# 运行回测
backtest_result = run_backtest(strategy, data)
# 评估性能(使用夏普比率作为优化目标)
score = backtest_result['sharpe_ratio']
results.append({
'params': params,
'score': score,
'result': backtest_result
})
if score > best_score:
best_score = score
best_params = params
return best_params, results
# 使用示例
param_ranges = {
'short_ma': range(10, 31, 5),
'long_ma': range(40, 101, 10),
'stop_loss': [0.01, 0.02, 0.03],
'take_profit': [0.04, 0.06, 0.08]
}
best_params, all_results = grid_search_optimization(
MyStrategy, param_ranges, historical_data
)
2. 遗传算法优化
import random
class GeneticOptimizer:
def __init__(self, strategy_class, param_bounds, population_size=50):
self.strategy_class = strategy_class
self.param_bounds = param_bounds
self.population_size = population_size
def create_individual(self):
individual = {}
for param_name, (min_val, max_val) in self.param_bounds.items():
if isinstance(min_val, int):
individual[param_name] = random.randint(min_val, max_val)
else:
individual[param_name] = random.uniform(min_val, max_val)
return individual
def evaluate_fitness(self, individual, data):
strategy = self.strategy_class(**individual)
result = run_backtest(strategy, data)
return result['sharpe_ratio']
def crossover(self, parent1, parent2):
child = {}
for param_name in parent1:
if random.random() < 0.5:
child[param_name] = parent1[param_name]
else:
child[param_name] = parent2[param_name]
return child
def mutate(self, individual, mutation_rate=0.1):
for param_name, value in individual.items():
if random.random() < mutation_rate:
min_val, max_val = self.param_bounds[param_name]
if isinstance(min_val, int):
individual[param_name] = random.randint(min_val, max_val)
else:
individual[param_name] = random.uniform(min_val, max_val)
return individual
def optimize(self, data, generations=100):
# 初始化种群
population = [self.create_individual() for _ in range(self.population_size)]
for generation in range(generations):
# 评估适应度
fitness_scores = [self.evaluate_fitness(ind, data) for ind in population]
# 选择、交叉、变异
new_population = []
for _ in range(self.population_size):
parent1 = self.tournament_selection(population, fitness_scores)
parent2 = self.tournament_selection(population, fitness_scores)
child = self.crossover(parent1, parent2)
child = self.mutate(child)
new_population.append(child)
population = new_population
# 返回最优个体
final_fitness = [self.evaluate_fitness(ind, data) for ind in population]
best_index = final_fitness.index(max(final_fitness))
return population[best_index]
3. 贝叶斯优化
from skopt import gp_minimize
from skopt.space import Real, Integer
def bayesian_optimization(strategy_class, data):
# 定义搜索空间
space = [
Integer(10, 30, name='short_ma'),
Integer(40, 100, name='long_ma'),
Real(0.01, 0.05, name='stop_loss'),
Real(0.03, 0.10, name='take_profit')
]
# 定义目标函数
def objective(params):
short_ma, long_ma, stop_loss, take_profit = params
strategy = strategy_class(
short_ma=short_ma,
long_ma=long_ma,
stop_loss=stop_loss,
take_profit=take_profit
)
result = run_backtest(strategy, data)
return -result['sharpe_ratio'] # 最小化负夏普比率
# 执行优化
result = gp_minimize(
func=objective,
dimensions=space,
n_calls=100,
random_state=42
)
return {
'short_ma': result.x[0],
'long_ma': result.x[1],
'stop_loss': result.x[2],
'take_profit': result.x[3]
}
回测最佳实践
1. 数据质量控制
def validate_data_quality(data):
issues = []
# 检查缺失数据
missing_data = data.isnull().sum()
if missing_data.any():
issues.append(f"发现缺失数据: {missing_data}")
# 检查异常价格
for column in ['open', 'high', 'low', 'close']:
if (data[column] <= 0).any():
issues.append(f"{column} 列存在非正数价格")
# 检查价格逻辑
invalid_ohlc = (data['high'] < data['low']) | (data['high'] < data['open']) | (data['high'] < data['close']) | (data['low'] > data['open']) | (data['low'] > data['close'])
if invalid_ohlc.any():
issues.append("发现不合理的OHLC价格关系")
# 检查异常波动
returns = data['close'].pct_change()
extreme_returns = returns.abs() > 0.5 # 50%的单日波动
if extreme_returns.any():
issues.append(f"发现异常波动: {extreme_returns.sum()} 个数据点")
return issues
2. 避免前瞻偏差
class BacktestEngine:
def __init__(self):
self.current_time = None
self.available_data = {}
def get_historical_data(self, symbol, end_time, periods):
# 只返回当前时间之前的数据
historical_data = self.data[self.data.index < end_time].tail(periods)
return historical_data
def calculate_indicator(self, data, indicator_type, **params):
# 确保指标计算不使用未来数据
if self.current_time is not None:
data = data[data.index <= self.current_time]
return self.indicator_calculator.calculate(data, indicator_type, **params)
3. 交易成本建模
class TradingCostModel:
def __init__(self, commission_rate=0.001, slippage_rate=0.0005):
self.commission_rate = commission_rate
self.slippage_rate = slippage_rate
def calculate_execution_price(self, order_price, order_side, market_impact=0):
# 滑点影响
if order_side == 'BUY':
execution_price = order_price * (1 + self.slippage_rate + market_impact)
else:
execution_price = order_price * (1 - self.slippage_rate - market_impact)
return execution_price
def calculate_commission(self, trade_value):
return trade_value * self.commission_rate
def calculate_total_cost(self, trade_value, order_side, market_impact=0):
commission = self.calculate_commission(trade_value)
slippage_cost = trade_value * (self.slippage_rate + market_impact)
return commission + slippage_cost
4. 样本外验证
def walk_forward_analysis(strategy_class, data, optimization_window=252,
test_window=63, step_size=21):
results = []
for start_idx in range(0, len(data) - optimization_window - test_window, step_size):
# 优化期数据
opt_end_idx = start_idx + optimization_window
optimization_data = data.iloc[start_idx:opt_end_idx]
# 测试期数据
test_end_idx = opt_end_idx + test_window
test_data = data.iloc[opt_end_idx:test_end_idx]
# 在优化期数据上优化参数
best_params = optimize_parameters(strategy_class, optimization_data)
# 在测试期数据上验证策略
strategy = strategy_class(**best_params)
test_result = run_backtest(strategy, test_data)
results.append({
'optimization_period': (start_idx, opt_end_idx),
'test_period': (opt_end_idx, test_end_idx),
'best_params': best_params,
'test_result': test_result
})
return results
常见回测陷阱
1. 过度拟合
问题: 参数过度优化,在历史数据上表现完美但实盘表现差
解决方案:
- 使用样本外数据验证
- 限制参数优化的复杂度
- 使用交叉验证
- 关注参数稳定性
2. 生存偏差
问题: 只使用仍在交易的品种数据,忽略了退市的品种
解决方案:
- 使用完整的历史数据集
- 考虑品种的生命周期
- 分析退市品种的影响
3. 数据窥探偏差
问题: 反复查看数据和结果,无意中调整策略
解决方案:
- 预先定义策略规则
- 限制回测次数
- 使用盲测数据集
下一步
掌握了回测系统后,建议您:
重要提示: 回测结果不代表未来表现。历史数据的优异表现不能保证实盘交易的成功。请务必进行充分的样本外验证,并从小额资金开始实盘测试。