Model Training
Machine learning model training is an advanced technique in quantitative trading: by training models to predict price movements or identify trading opportunities, you can significantly improve a strategy's performance.
Model Training Overview
Model training involves the following key steps:
- Data preparation: clean and preprocess the training data
- Feature engineering: construct and select effective features
- Model selection: choose a suitable machine learning algorithm
- Training and optimization: tune hyperparameters and optimize the model
- Model validation: assess model performance and generalization ability
- Model deployment: integrate the model into the trading system
Data Preparation
1. Data Preprocessing
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.model_selection import train_test_split, TimeSeriesSplit
class DataPreprocessor:
def __init__(self):
self.scalers = {}
self.feature_columns = []
self.target_column = None
def prepare_training_data(self, df, target_column, feature_columns=None,
lookback_window=20, prediction_horizon=1):
"""准备训练数据"""
# 如果没有指定特征列,使用所有数值列
if feature_columns is None:
feature_columns = df.select_dtypes(include=[np.number]).columns.tolist()
if target_column in feature_columns:
feature_columns.remove(target_column)
self.feature_columns = feature_columns
self.target_column = target_column
        # Create lagged features
        X = self.create_lagged_features(df[feature_columns], lookback_window)
        # Create the target variable
        y = self.create_target_variable(df[target_column], prediction_horizon)
        # Align features and target
        min_length = min(len(X), len(y))
        X = X.iloc[-min_length:]
        y = y.iloc[-min_length:]
        # Drop rows with missing values
        valid_indices = ~(X.isnull().any(axis=1) | y.isnull())
X = X[valid_indices]
y = y[valid_indices]
return X, y
def create_lagged_features(self, df, lookback_window):
"""创建滞后特征"""
lagged_features = []
for col in df.columns:
for lag in range(1, lookback_window + 1):
lagged_col = f"{col}_lag_{lag}"
lagged_features.append(df[col].shift(lag).rename(lagged_col))
return pd.concat(lagged_features, axis=1)
def create_target_variable(self, series, prediction_horizon):
"""创建目标变量"""
# 预测未来收益率
future_returns = series.pct_change(prediction_horizon).shift(-prediction_horizon)
return future_returns
def scale_features(self, X_train, X_test=None, method='standard'):
"""特征缩放"""
if method == 'standard':
scaler = StandardScaler()
elif method == 'minmax':
scaler = MinMaxScaler()
else:
raise ValueError(f"Unsupported scaling method: {method}")
        # Fit the scaler on the training data only
X_train_scaled = pd.DataFrame(
scaler.fit_transform(X_train),
columns=X_train.columns,
index=X_train.index
)
self.scalers['features'] = scaler
if X_test is not None:
X_test_scaled = pd.DataFrame(
scaler.transform(X_test),
columns=X_test.columns,
index=X_test.index
)
return X_train_scaled, X_test_scaled
return X_train_scaled
def create_time_series_splits(self, X, y, n_splits=5, test_size=0.2):
"""创建时间序列交叉验证分割"""
tscv = TimeSeriesSplit(n_splits=n_splits, test_size=int(len(X) * test_size))
splits = []
for train_idx, test_idx in tscv.split(X):
splits.append({
'X_train': X.iloc[train_idx],
'X_test': X.iloc[test_idx],
'y_train': y.iloc[train_idx],
'y_test': y.iloc[test_idx]
})
return splits
# Usage example
preprocessor = DataPreprocessor()
# Assume we have some price data
df = pd.DataFrame({
'close': np.random.randn(1000).cumsum() + 100,
'volume': np.random.randint(1000, 10000, 1000),
'rsi': np.random.uniform(20, 80, 1000)
})
# Prepare the training data
X, y = preprocessor.prepare_training_data(
df,
target_column='close',
lookback_window=10,
prediction_horizon=1
)
print(f"特征矩阵形状: {X.shape}")
print(f"目标变量形状: {y.shape}")
2. Data Augmentation
class DataAugmentation:
def __init__(self):
pass
def add_noise(self, X, noise_level=0.01):
"""添加噪声增强数据"""
noise = np.random.normal(0, noise_level, X.shape)
return X + noise
def time_warping(self, X, warping_factor=0.1):
"""时间扭曲增强"""
# 简化版本的时间扭曲
warped_X = X.copy()
for i in range(len(X)):
if np.random.random() < warping_factor:
                # Randomly replace this point with a nearby time step
shift = np.random.randint(-2, 3)
if 0 <= i + shift < len(X):
warped_X.iloc[i] = X.iloc[i + shift]
return warped_X
def bootstrap_sampling(self, X, y, n_samples=None):
"""自助采样增强"""
if n_samples is None:
n_samples = len(X)
indices = np.random.choice(len(X), n_samples, replace=True)
return X.iloc[indices], y.iloc[indices]
def synthetic_minority_oversampling(self, X, y, threshold=0.1):
"""合成少数类过采样(SMOTE简化版)"""
# 识别少数类样本
positive_samples = y > threshold
negative_samples = y < -threshold
if positive_samples.sum() < negative_samples.sum():
minority_class = positive_samples
else:
minority_class = negative_samples
        # Generate synthetic samples for the minority class
minority_X = X[minority_class]
minority_y = y[minority_class]
synthetic_X = []
synthetic_y = []
for i in range(len(minority_X)):
            # Find the nearest neighbour
            distances = np.sum((minority_X.values - minority_X.iloc[i].values) ** 2, axis=1)
            nearest_idx = np.argsort(distances)[1]  # exclude the sample itself
            # Interpolate a synthetic sample between the pair
alpha = np.random.random()
synthetic_sample = (minority_X.iloc[i] +
alpha * (minority_X.iloc[nearest_idx] - minority_X.iloc[i]))
synthetic_target = (minority_y.iloc[i] +
alpha * (minority_y.iloc[nearest_idx] - minority_y.iloc[i]))
synthetic_X.append(synthetic_sample)
synthetic_y.append(synthetic_target)
        # Combine the original and synthetic data
augmented_X = pd.concat([X, pd.DataFrame(synthetic_X)], ignore_index=True)
augmented_y = pd.concat([y, pd.Series(synthetic_y)], ignore_index=True)
return augmented_X, augmented_y
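A brief usage sketch for the augmenters above (the noise level and the choice to augment only the training window are illustrative assumptions):
augmenter = DataAugmentation()
# Add Gaussian noise to the scaled training features
X_train_noisy = augmenter.add_noise(X_train_scaled, noise_level=0.01)
# Bootstrap-resample the training set to the same size
X_boot, y_boot = augmenter.bootstrap_sampling(X_train, y_train)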
Model Selection
1. Traditional Machine Learning Models
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from sklearn.linear_model import LinearRegression, Ridge, Lasso
from sklearn.svm import SVR
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
import xgboost as xgb
import lightgbm as lgb
class TraditionalMLModels:
def __init__(self):
self.models = {}
self.trained_models = {}
def initialize_models(self):
"""初始化模型"""
self.models = {
'linear_regression': LinearRegression(),
'ridge': Ridge(alpha=1.0),
'lasso': Lasso(alpha=1.0),
'random_forest': RandomForestRegressor(
n_estimators=100,
max_depth=10,
random_state=42
),
'gradient_boosting': GradientBoostingRegressor(
n_estimators=100,
max_depth=6,
random_state=42
),
'xgboost': xgb.XGBRegressor(
n_estimators=100,
max_depth=6,
learning_rate=0.1,
random_state=42
),
'lightgbm': lgb.LGBMRegressor(
n_estimators=100,
max_depth=6,
learning_rate=0.1,
random_state=42
),
'svr': SVR(kernel='rbf', C=1.0, gamma='scale')
}
def train_model(self, model_name, X_train, y_train):
"""训练单个模型"""
if model_name not in self.models:
raise ValueError(f"Model {model_name} not found")
model = self.models[model_name]
model.fit(X_train, y_train)
self.trained_models[model_name] = model
return model
def train_all_models(self, X_train, y_train):
"""训练所有模型"""
self.initialize_models()
for model_name in self.models:
print(f"Training {model_name}...")
self.train_model(model_name, X_train, y_train)
print("All models trained successfully!")
def evaluate_model(self, model_name, X_test, y_test):
"""评估模型性能"""
if model_name not in self.trained_models:
raise ValueError(f"Model {model_name} not trained")
model = self.trained_models[model_name]
y_pred = model.predict(X_test)
metrics = {
'mse': mean_squared_error(y_test, y_pred),
'mae': mean_absolute_error(y_test, y_pred),
'r2': r2_score(y_test, y_pred),
'rmse': np.sqrt(mean_squared_error(y_test, y_pred))
}
return metrics, y_pred
def compare_models(self, X_test, y_test):
"""比较所有模型性能"""
results = {}
for model_name in self.trained_models:
metrics, _ = self.evaluate_model(model_name, X_test, y_test)
results[model_name] = metrics
        # Build a comparison table
comparison_df = pd.DataFrame(results).T
comparison_df = comparison_df.sort_values('r2', ascending=False)
return comparison_df
def get_feature_importance(self, model_name, feature_names):
"""获取特征重要性"""
if model_name not in self.trained_models:
raise ValueError(f"Model {model_name} not trained")
model = self.trained_models[model_name]
if hasattr(model, 'feature_importances_'):
importance = model.feature_importances_
elif hasattr(model, 'coef_'):
importance = np.abs(model.coef_)
else:
return None
feature_importance = pd.DataFrame({
'feature': feature_names,
'importance': importance
}).sort_values('importance', ascending=False)
return feature_importance
# Usage example
ml_models = TraditionalMLModels()
# Train all models
ml_models.train_all_models(X_train, y_train)
# Compare model performance
comparison = ml_models.compare_models(X_test, y_test)
print(comparison)
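The feature-importance helper can then show which lagged inputs drive the tree-based models; a short sketch (the choice of 'xgboost' is illustrative):
# Inspect the most influential features of the XGBoost model
importance = ml_models.get_feature_importance('xgboost', X_train.columns.tolist())
print(importance.head(10))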
2. Deep Learning Models
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, LSTM, GRU, Dropout, Conv1D, MaxPooling1D, Flatten
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
class DeepLearningModels:
def __init__(self):
self.models = {}
self.history = {}
def create_mlp_model(self, input_shape, hidden_layers=[128, 64, 32]):
"""创建多层感知机模型"""
model = Sequential()
model.add(Dense(hidden_layers[0], activation='relu', input_shape=(input_shape,)))
model.add(Dropout(0.2))
for units in hidden_layers[1:]:
model.add(Dense(units, activation='relu'))
model.add(Dropout(0.2))
model.add(Dense(1, activation='linear'))
model.compile(
optimizer=Adam(learning_rate=0.001),
loss='mse',
metrics=['mae']
)
return model
def create_lstm_model(self, input_shape, lstm_units=[50, 50]):
"""创建LSTM模型"""
model = Sequential()
# 第一个LSTM层
model.add(LSTM(
lstm_units[0],
return_sequences=len(lstm_units) > 1,
input_shape=input_shape
))
model.add(Dropout(0.2))
        # Additional LSTM layers
for i, units in enumerate(lstm_units[1:], 1):
return_sequences = i < len(lstm_units) - 1
model.add(LSTM(units, return_sequences=return_sequences))
model.add(Dropout(0.2))
        # Output layer
model.add(Dense(1, activation='linear'))
model.compile(
optimizer=Adam(learning_rate=0.001),
loss='mse',
metrics=['mae']
)
return model
def create_cnn_model(self, input_shape, filters=[32, 64], kernel_size=3):
"""创建CNN模型"""
model = Sequential()
# 卷积层
model.add(Conv1D(
filters[0],
kernel_size,
activation='relu',
input_shape=input_shape
))
model.add(MaxPooling1D(pool_size=2))
for f in filters[1:]:
model.add(Conv1D(f, kernel_size, activation='relu'))
model.add(MaxPooling1D(pool_size=2))
        # Flatten and fully connected layers
model.add(Flatten())
model.add(Dense(50, activation='relu'))
model.add(Dropout(0.2))
model.add(Dense(1, activation='linear'))
model.compile(
optimizer=Adam(learning_rate=0.001),
loss='mse',
metrics=['mae']
)
return model
def prepare_lstm_data(self, X, y, sequence_length=20):
"""为LSTM准备序列数据"""
X_sequences = []
y_sequences = []
for i in range(sequence_length, len(X)):
X_sequences.append(X.iloc[i-sequence_length:i].values)
y_sequences.append(y.iloc[i])
return np.array(X_sequences), np.array(y_sequences)
def train_model(self, model_name, model, X_train, y_train,
X_val=None, y_val=None, epochs=100, batch_size=32):
"""训练深度学习模型"""
# 设置回调函数
callbacks = [
EarlyStopping(
monitor='val_loss' if X_val is not None else 'loss',
patience=10,
restore_best_weights=True
),
ReduceLROnPlateau(
monitor='val_loss' if X_val is not None else 'loss',
factor=0.5,
patience=5,
min_lr=1e-7
)
]
        # Fit the model
if X_val is not None and y_val is not None:
history = model.fit(
X_train, y_train,
validation_data=(X_val, y_val),
epochs=epochs,
batch_size=batch_size,
callbacks=callbacks,
verbose=1
)
else:
history = model.fit(
X_train, y_train,
epochs=epochs,
batch_size=batch_size,
callbacks=callbacks,
verbose=1
)
self.models[model_name] = model
self.history[model_name] = history
return model, history
def evaluate_model(self, model_name, X_test, y_test):
"""评估深度学习模型"""
if model_name not in self.models:
raise ValueError(f"Model {model_name} not found")
model = self.models[model_name]
        # Predict
y_pred = model.predict(X_test)
        # Compute metrics
mse = mean_squared_error(y_test, y_pred)
mae = mean_absolute_error(y_test, y_pred)
r2 = r2_score(y_test, y_pred)
return {
'mse': mse,
'mae': mae,
'r2': r2,
'rmse': np.sqrt(mse)
}, y_pred.flatten()
def plot_training_history(self, model_name):
"""绘制训练历史"""
if model_name not in self.history:
raise ValueError(f"No training history for model {model_name}")
history = self.history[model_name]
import matplotlib.pyplot as plt
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 4))
        # Loss curves
ax1.plot(history.history['loss'], label='Training Loss')
if 'val_loss' in history.history:
ax1.plot(history.history['val_loss'], label='Validation Loss')
ax1.set_title('Model Loss')
ax1.set_xlabel('Epoch')
ax1.set_ylabel('Loss')
ax1.legend()
        # MAE curves
ax2.plot(history.history['mae'], label='Training MAE')
if 'val_mae' in history.history:
ax2.plot(history.history['val_mae'], label='Validation MAE')
ax2.set_title('Model MAE')
ax2.set_xlabel('Epoch')
ax2.set_ylabel('MAE')
ax2.legend()
plt.tight_layout()
return fig
# Usage example
dl_models = DeepLearningModels()
# Build and train the MLP model
mlp_model = dl_models.create_mlp_model(X_train.shape[1])
dl_models.train_model('mlp', mlp_model, X_train, y_train, X_test, y_test)
# Prepare sequence data for the LSTM
X_lstm, y_lstm = dl_models.prepare_lstm_data(X, y, sequence_length=20)
X_train_lstm, X_test_lstm, y_train_lstm, y_test_lstm = train_test_split(
    X_lstm, y_lstm, test_size=0.2, shuffle=False
)
# Build and train the LSTM model
lstm_model = dl_models.create_lstm_model((20, X.shape[1]))
dl_models.train_model('lstm', lstm_model, X_train_lstm, y_train_lstm,
                      X_test_lstm, y_test_lstm)
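The same (samples, timesteps, features) arrays also fit the 1-D CNN; a sketch reusing the sequence data from above (the sequence length of 20 is carried over):
# Build and train the CNN on the LSTM-shaped sequences
cnn_model = dl_models.create_cnn_model((20, X.shape[1]))
dl_models.train_model('cnn', cnn_model, X_train_lstm, y_train_lstm,
                      X_test_lstm, y_test_lstm)
cnn_metrics, cnn_pred = dl_models.evaluate_model('cnn', X_test_lstm, y_test_lstm)
print(cnn_metrics)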
Hyperparameter Optimization
1. Grid Search
from sklearn.model_selection import GridSearchCV, RandomizedSearchCV
from sklearn.metrics import make_scorer
class HyperparameterOptimizer:
def __init__(self):
self.best_params = {}
self.best_scores = {}
def grid_search_optimization(self, model, param_grid, X_train, y_train,
cv=5, scoring='neg_mean_squared_error'):
"""网格搜索优化"""
# 创建时间序列交叉验证
tscv = TimeSeriesSplit(n_splits=cv)
# 执行网格搜索
grid_search = GridSearchCV(
estimator=model,
param_grid=param_grid,
cv=tscv,
scoring=scoring,
n_jobs=-1,
verbose=1
)
grid_search.fit(X_train, y_train)
return grid_search.best_params_, grid_search.best_score_, grid_search
def random_search_optimization(self, model, param_distributions, X_train, y_train,
n_iter=100, cv=5, scoring='neg_mean_squared_error'):
"""随机搜索优化"""
tscv = TimeSeriesSplit(n_splits=cv)
random_search = RandomizedSearchCV(
estimator=model,
param_distributions=param_distributions,
n_iter=n_iter,
cv=tscv,
scoring=scoring,
n_jobs=-1,
verbose=1,
random_state=42
)
random_search.fit(X_train, y_train)
return random_search.best_params_, random_search.best_score_, random_search
def bayesian_optimization(self, model_func, param_bounds, X_train, y_train, n_calls=50):
"""贝叶斯优化"""
try:
from skopt import gp_minimize
from skopt.space import Real, Integer
from skopt.utils import use_named_args
            # Objective function
@use_named_args(param_bounds)
def objective(**params):
model = model_func(**params)
                # Evaluate with time-series cross-validation
tscv = TimeSeriesSplit(n_splits=3)
scores = []
for train_idx, val_idx in tscv.split(X_train):
X_tr, X_val = X_train.iloc[train_idx], X_train.iloc[val_idx]
y_tr, y_val = y_train.iloc[train_idx], y_train.iloc[val_idx]
model.fit(X_tr, y_tr)
y_pred = model.predict(X_val)
score = mean_squared_error(y_val, y_pred)
scores.append(score)
return np.mean(scores)
            # Run the Bayesian optimization
result = gp_minimize(
func=objective,
dimensions=param_bounds,
n_calls=n_calls,
random_state=42
)
            # Extract the best parameters
best_params = {}
for i, param in enumerate(param_bounds):
best_params[param.name] = result.x[i]
return best_params, result.fun, result
except ImportError:
print("scikit-optimize not installed. Please install it for Bayesian optimization.")
return None, None, None
# Usage example
optimizer = HyperparameterOptimizer()
# XGBoost parameter grid
xgb_param_grid = {
'n_estimators': [50, 100, 200],
'max_depth': [3, 6, 9],
'learning_rate': [0.01, 0.1, 0.2],
'subsample': [0.8, 0.9, 1.0]
}
# Run the grid search
best_params, best_score, grid_search = optimizer.grid_search_optimization(
xgb.XGBRegressor(random_state=42),
xgb_param_grid,
X_train, y_train
)
print(f"Best parameters: {best_params}")
print(f"Best score: {best_score}")
2. Automated Machine Learning (AutoML)
class AutoMLPipeline:
def __init__(self):
self.best_model = None
self.best_score = float('-inf')
self.model_results = {}
def auto_feature_selection(self, X, y, max_features=50):
"""自动特征选择"""
from sklearn.feature_selection import SelectKBest, f_regression
selector = SelectKBest(score_func=f_regression, k=min(max_features, X.shape[1]))
X_selected = selector.fit_transform(X, y)
selected_features = X.columns[selector.get_support()]
return pd.DataFrame(X_selected, columns=selected_features, index=X.index)
def auto_model_selection(self, X_train, y_train, X_test, y_test):
"""自动模型选择"""
# 定义候选模型
models = {
'linear_regression': LinearRegression(),
'ridge': Ridge(),
'random_forest': RandomForestRegressor(n_estimators=100, random_state=42),
'xgboost': xgb.XGBRegressor(n_estimators=100, random_state=42),
'lightgbm': lgb.LGBMRegressor(n_estimators=100, random_state=42)
}
        # Train and evaluate each model
for name, model in models.items():
print(f"Training {name}...")
            # Train the model
model.fit(X_train, y_train)
            # Predict and evaluate
y_pred = model.predict(X_test)
score = r2_score(y_test, y_pred)
self.model_results[name] = {
'model': model,
'score': score,
'predictions': y_pred
}
            # Track the best model
if score > self.best_score:
self.best_score = score
self.best_model = model
return self.best_model, self.best_score
def auto_ensemble(self, X_train, y_train, X_test, y_test):
"""自动集成学习"""
from sklearn.ensemble import VotingRegressor
# 选择表现最好的几个模型
sorted_models = sorted(self.model_results.items(),
key=lambda x: x[1]['score'], reverse=True)
        top_models = sorted_models[:3]  # keep the top 3 models
        # Build a voting regressor
estimators = [(name, result['model']) for name, result in top_models]
ensemble = VotingRegressor(estimators=estimators)
        # Fit the ensemble
        ensemble.fit(X_train, y_train)
        # Evaluate the ensemble
y_pred_ensemble = ensemble.predict(X_test)
ensemble_score = r2_score(y_test, y_pred_ensemble)
print(f"Ensemble R² score: {ensemble_score:.4f}")
return ensemble, ensemble_score
def run_automl_pipeline(self, X, y, test_size=0.2):
"""运行完整的AutoML流水线"""
print("Starting AutoML Pipeline...")
        # 1. Split the data
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=test_size, shuffle=False
)
        # 2. Automatic feature selection
print("Performing automatic feature selection...")
X_train_selected = self.auto_feature_selection(X_train, y_train)
X_test_selected = X_test[X_train_selected.columns]
        # 3. Automatic model selection
print("Performing automatic model selection...")
best_model, best_score = self.auto_model_selection(
X_train_selected, y_train, X_test_selected, y_test
)
        # 4. Automatic ensembling
print("Creating ensemble model...")
ensemble_model, ensemble_score = self.auto_ensemble(
X_train_selected, y_train, X_test_selected, y_test
)
        # 5. Choose the final model
if ensemble_score > best_score:
final_model = ensemble_model
final_score = ensemble_score
print(f"Ensemble model selected with R² score: {final_score:.4f}")
else:
final_model = best_model
final_score = best_score
print(f"Best individual model selected with R² score: {final_score:.4f}")
return final_model, final_score, X_train_selected.columns.tolist()
# Usage example
automl = AutoMLPipeline()
final_model, final_score, selected_features = automl.run_automl_pipeline(X, y)
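The pipeline also keeps the per-model results, which is useful for a quick post-mortem of the model-selection step:
# Inspect the R² score of every candidate model
for name, result in automl.model_results.items():
    print(f"{name}: R² = {result['score']:.4f}")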
Model Validation
1. Cross-Validation
from sklearn.base import clone

class ModelValidator:
def __init__(self):
self.validation_results = {}
def time_series_cross_validation(self, model, X, y, n_splits=5, test_size=0.2):
"""时间序列交叉验证"""
tscv = TimeSeriesSplit(n_splits=n_splits, test_size=int(len(X) * test_size))
scores = []
predictions = []
for fold, (train_idx, test_idx) in enumerate(tscv.split(X)):
print(f"Fold {fold + 1}/{n_splits}")
            # Split the data
X_train_fold = X.iloc[train_idx]
X_test_fold = X.iloc[test_idx]
y_train_fold = y.iloc[train_idx]
y_test_fold = y.iloc[test_idx]
            # Train a fresh copy of the model
model_copy = clone(model)
model_copy.fit(X_train_fold, y_train_fold)
            # Predict and score
y_pred_fold = model_copy.predict(X_test_fold)
score = r2_score(y_test_fold, y_pred_fold)
scores.append(score)
predictions.extend(list(zip(test_idx, y_test_fold, y_pred_fold)))
return {
'scores': scores,
'mean_score': np.mean(scores),
'std_score': np.std(scores),
'predictions': predictions
}
def walk_forward_validation(self, model, X, y, initial_window=252, step_size=21):
"""滚动窗口验证"""
scores = []
predictions = []
for start in range(0, len(X) - initial_window, step_size):
end_train = start + initial_window
end_test = min(end_train + step_size, len(X))
if end_test <= end_train:
break
            # Split the data
X_train = X.iloc[start:end_train]
X_test = X.iloc[end_train:end_test]
y_train = y.iloc[start:end_train]
y_test = y.iloc[end_train:end_test]
            # Train and predict
model_copy = clone(model)
model_copy.fit(X_train, y_train)
y_pred = model_copy.predict(X_test)
            # Score
score = r2_score(y_test, y_pred)
scores.append(score)
            # Store the predictions
for i, (true_val, pred_val) in enumerate(zip(y_test, y_pred)):
predictions.append((end_train + i, true_val, pred_val))
return {
'scores': scores,
'mean_score': np.mean(scores),
'std_score': np.std(scores),
'predictions': predictions
}
def bootstrap_validation(self, model, X, y, n_bootstrap=100, sample_ratio=0.8):
"""自助法验证"""
scores = []
for i in range(n_bootstrap):
            # Bootstrap sample
n_samples = int(len(X) * sample_ratio)
indices = np.random.choice(len(X), n_samples, replace=True)
X_bootstrap = X.iloc[indices]
y_bootstrap = y.iloc[indices]
            # Out-of-bag samples form the test set
out_of_bag_indices = list(set(range(len(X))) - set(indices))
if len(out_of_bag_indices) == 0:
continue
X_test = X.iloc[out_of_bag_indices]
y_test = y.iloc[out_of_bag_indices]
            # Train and evaluate
model_copy = clone(model)
model_copy.fit(X_bootstrap, y_bootstrap)
y_pred = model_copy.predict(X_test)
score = r2_score(y_test, y_pred)
scores.append(score)
return {
'scores': scores,
'mean_score': np.mean(scores),
'std_score': np.std(scores),
'confidence_interval': np.percentile(scores, [2.5, 97.5])
}
# Usage example
validator = ModelValidator()
# Time-series cross-validation
tscv_results = validator.time_series_cross_validation(
RandomForestRegressor(n_estimators=100, random_state=42),
X, y, n_splits=5
)
print(f"Time Series CV Score: {tscv_results['mean_score']:.4f} ± {tscv_results['std_score']:.4f}")
Model Deployment
1. Saving and Loading Models
import joblib
import pickle
import json
from datetime import datetime
class ModelDeployment:
def __init__(self):
self.model_metadata = {}
def save_model(self, model, model_name, feature_columns, scaler=None,
model_type='sklearn', save_path='models/'):
"""保存模型"""
import os
# 创建保存目录
os.makedirs(save_path, exist_ok=True)
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
model_filename = f"{model_name}_{timestamp}"
        # Save the model
if model_type == 'sklearn':
model_path = os.path.join(save_path, f"{model_filename}.joblib")
joblib.dump(model, model_path)
elif model_type == 'tensorflow':
model_path = os.path.join(save_path, f"{model_filename}.h5")
model.save(model_path)
else:
model_path = os.path.join(save_path, f"{model_filename}.pkl")
with open(model_path, 'wb') as f:
pickle.dump(model, f)
        # Save the preprocessor
if scaler is not None:
scaler_path = os.path.join(save_path, f"{model_filename}_scaler.joblib")
joblib.dump(scaler, scaler_path)
        # Save the metadata
metadata = {
'model_name': model_name,
'model_type': model_type,
'feature_columns': feature_columns,
'timestamp': timestamp,
'model_path': model_path,
'scaler_path': scaler_path if scaler else None
}
metadata_path = os.path.join(save_path, f"{model_filename}_metadata.json")
with open(metadata_path, 'w') as f:
json.dump(metadata, f, indent=2)
self.model_metadata[model_name] = metadata
return model_path, metadata_path
def load_model(self, metadata_path):
"""加载模型"""
# 加载元数据
with open(metadata_path, 'r') as f:
metadata = json.load(f)
        # Load the model
model_type = metadata['model_type']
model_path = metadata['model_path']
if model_type == 'sklearn':
model = joblib.load(model_path)
elif model_type == 'tensorflow':
model = tf.keras.models.load_model(model_path)
else:
with open(model_path, 'rb') as f:
model = pickle.load(f)
        # Load the preprocessor
scaler = None
if metadata['scaler_path']:
scaler = joblib.load(metadata['scaler_path'])
return model, scaler, metadata
def create_prediction_pipeline(self, model, scaler, feature_columns):
"""创建预测流水线"""
def predict(X):
# 确保特征顺序正确
X_ordered = X[feature_columns]
            # Apply preprocessing
if scaler:
X_scaled = scaler.transform(X_ordered)
X_processed = pd.DataFrame(X_scaled, columns=feature_columns, index=X.index)
else:
X_processed = X_ordered
            # Predict
predictions = model.predict(X_processed)
return predictions
return predict
def model_monitoring(self, model, X_new, y_new, baseline_metrics):
"""模型监控"""
# 预测新数据
y_pred_new = model.predict(X_new)
        # Compute fresh metrics
new_metrics = {
'mse': mean_squared_error(y_new, y_pred_new),
'mae': mean_absolute_error(y_new, y_pred_new),
'r2': r2_score(y_new, y_pred_new)
}
        # Check for performance degradation
        alerts = []
        for metric, new_value in new_metrics.items():
            baseline_value = baseline_metrics.get(metric, 0)
            if baseline_value == 0:
                continue  # no baseline to compare against
            if metric in ['mse', 'mae']:  # lower is better
                if new_value > baseline_value * 1.2:  # 20% degradation
                    alerts.append(f"{metric} increased by {((new_value/baseline_value - 1) * 100):.1f}%")
            else:  # r2: higher is better
                if new_value < baseline_value * 0.8:  # 20% degradation
                    alerts.append(f"{metric} decreased by {((1 - new_value/baseline_value) * 100):.1f}%")
return new_metrics, alerts
# Usage example
deployment = ModelDeployment()
# Save the model (pass a scaler only if it was fitted on exactly these feature
# columns; the AutoML models above were trained on unscaled data, so none is saved)
model_path, metadata_path = deployment.save_model(
    final_model,
    'price_prediction_model',
    selected_features,
    scaler=None,
    model_type='sklearn'
)
# Load the model
loaded_model, loaded_scaler, metadata = deployment.load_model(metadata_path)
# Build the prediction pipeline
predict_pipeline = deployment.create_prediction_pipeline(
    loaded_model, loaded_scaler, metadata['feature_columns']
)
# Predict with the pipeline
new_predictions = predict_pipeline(X_test)
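The monitoring helper can then be wired in; a sketch that establishes baseline metrics on the held-out test set and re-checks them (in production, X_test / y_test would be replaced by freshly arriving data):
# Baseline metrics from the held-out test set
features = metadata['feature_columns']
y_pred_test = loaded_model.predict(X_test[features])
baseline_metrics = {
    'mse': mean_squared_error(y_test, y_pred_test),
    'mae': mean_absolute_error(y_test, y_pred_test),
    'r2': r2_score(y_test, y_pred_test)
}
# Periodically score new observations and raise alerts on degradation
new_metrics, alerts = deployment.model_monitoring(
    loaded_model, X_test[features], y_test, baseline_metrics
)
for alert in alerts:
    print(f"ALERT: {alert}")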
Best Practices for Model Training
1. Training Workflow
- Data quality checks: ensure the data is accurate and complete
- Feature engineering: construct meaningful features
- Model selection: choose a suitable algorithm
- Hyperparameter optimization: tune the model parameters
- Cross-validation: assess the model's generalization ability
- Model ensembling: combine multiple models to improve performance
- Deployment monitoring: track the model's performance in production
2. Common Pitfalls
| Pitfall | Description | Solution |
|---|---|---|
| Data leakage | Training the model on future information | Enforce strict time ordering (see the sketch below) |
| Overfitting | Good in-sample fit but poor generalization | Cross-validation and regularization |
| Sample bias | Training data does not represent the true distribution | Use more diverse data |
| Feature drift | Feature distributions change over time | Retrain the model periodically |
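As a concrete illustration of the data-leakage row, a shuffled split lets future rows leak into the training set; time-series splits must preserve ordering:
# WRONG: shuffling mixes future observations into the training set
# X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, shuffle=True)
# RIGHT: keep time order so the test set lies strictly in the future
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, shuffle=False)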
3. Performance Optimization
- Data parallelism: process data with multiple processes
- Model parallelism: train large models in a distributed fashion
- Incremental learning: update model parameters online (see the sketch below)
- Model compression: reduce model size and inference time
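For the incremental-learning item, one common option is a scikit-learn estimator that supports partial_fit; a minimal sketch (SGDRegressor and its settings are illustrative assumptions, not part of the framework above):
from sklearn.linear_model import SGDRegressor
# Online model: update the weights as new bars arrive instead of retraining from scratch
online_model = SGDRegressor(learning_rate='constant', eta0=0.001, random_state=42)
online_model.partial_fit(X_train_scaled.values, y_train.values)
# Later, when a fresh batch of scaled observations (X_new, y_new) arrives:
# online_model.partial_fit(X_new, y_new)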
Next Steps
Having mastered model training, keep the following in mind:
Important: applying machine learning models to financial markets calls for particular caution. Market conditions change, so models must be retrained and revalidated regularly. Always combine them with traditional risk management methods, and never rely entirely on model predictions.