Model Training

Machine learning model training is an advanced technique in quantitative trading. Training models to predict price movements or to identify trading opportunities can significantly improve a strategy's performance.

Model Training Overview

Model training involves the following key steps (a compact sketch tying them together follows the list):

  • Data preparation: clean and preprocess the training data
  • Feature engineering: construct and select effective features
  • Model selection: choose a suitable machine learning algorithm
  • Training optimization: tune hyperparameters and optimize the model
  • Model validation: assess model performance and generalization
  • Model deployment: integrate the model into the trading system
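
The sketch below strings these steps together using the classes implemented in the rest of this section (DataPreprocessor, TraditionalMLModels, and so on are defined further down), so read it as a roadmap rather than running it in isolation:

# Roadmap sketch: one pass through the training workflow
preprocessor = DataPreprocessor()                                     # data preparation
X, y = preprocessor.prepare_training_data(df, target_column='close')  # feature engineering
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, shuffle=False                                # time-ordered split
)
ml_models = TraditionalMLModels()                                     # model selection
ml_models.train_all_models(X_train, y_train)                          # training
print(ml_models.compare_models(X_test, y_test))                       # validation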

Data Preparation

1. Data Preprocessing

import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.model_selection import train_test_split, TimeSeriesSplit

class DataPreprocessor:
    def __init__(self):
        self.scalers = {}
        self.feature_columns = []
        self.target_column = None
    
    def prepare_training_data(self, df, target_column, feature_columns=None, 
                            lookback_window=20, prediction_horizon=1):
        """准备训练数据"""
        # 如果没有指定特征列,使用所有数值列
        if feature_columns is None:
            feature_columns = df.select_dtypes(include=[np.number]).columns.tolist()
            if target_column in feature_columns:
                feature_columns.remove(target_column)
        
        self.feature_columns = feature_columns
        self.target_column = target_column
        
        # Create lagged features
        X = self.create_lagged_features(df[feature_columns], lookback_window)
        
        # Create the target variable
        y = self.create_target_variable(df[target_column], prediction_horizon)
        
        # Align the data
        min_length = min(len(X), len(y))
        X = X.iloc[-min_length:]
        y = y.iloc[-min_length:]
        
        # Drop rows with missing values
        valid_indices = ~(X.isnull().any(axis=1) | y.isnull())
        X = X[valid_indices]
        y = y[valid_indices]
        
        return X, y
    
    def create_lagged_features(self, df, lookback_window):
        """创建滞后特征"""
        lagged_features = []
        
        for col in df.columns:
            for lag in range(1, lookback_window + 1):
                lagged_col = f"{col}_lag_{lag}"
                lagged_features.append(df[col].shift(lag).rename(lagged_col))
        
        return pd.concat(lagged_features, axis=1)
    
    def create_target_variable(self, series, prediction_horizon):
        """创建目标变量"""
        # 预测未来收益率
        future_returns = series.pct_change(prediction_horizon).shift(-prediction_horizon)
        return future_returns
    
    def scale_features(self, X_train, X_test=None, method='standard'):
        """特征缩放"""
        if method == 'standard':
            scaler = StandardScaler()
        elif method == 'minmax':
            scaler = MinMaxScaler()
        else:
            raise ValueError(f"Unsupported scaling method: {method}")
        
        # Fit the scaler on the training data
        X_train_scaled = pd.DataFrame(
            scaler.fit_transform(X_train),
            columns=X_train.columns,
            index=X_train.index
        )
        
        self.scalers['features'] = scaler
        
        if X_test is not None:
            X_test_scaled = pd.DataFrame(
                scaler.transform(X_test),
                columns=X_test.columns,
                index=X_test.index
            )
            return X_train_scaled, X_test_scaled
        
        return X_train_scaled
    
    def create_time_series_splits(self, X, y, n_splits=5, test_size=0.1):
        """Create time-series cross-validation splits."""
        # Keep n_splits * test_size below 1.0, or the earliest folds have almost no training data
        tscv = TimeSeriesSplit(n_splits=n_splits, test_size=int(len(X) * test_size))
        
        splits = []
        for train_idx, test_idx in tscv.split(X):
            splits.append({
                'X_train': X.iloc[train_idx],
                'X_test': X.iloc[test_idx],
                'y_train': y.iloc[train_idx],
                'y_test': y.iloc[test_idx]
            })
        
        return splits

# Usage example
preprocessor = DataPreprocessor()

# Assume we have some price data
df = pd.DataFrame({
    'close': np.random.randn(1000).cumsum() + 100,
    'volume': np.random.randint(1000, 10000, 1000),
    'rsi': np.random.uniform(20, 80, 1000)
})

# Prepare the training data
X, y = preprocessor.prepare_training_data(
    df, 
    target_column='close',
    lookback_window=10,
    prediction_horizon=1
)

print(f"特征矩阵形状: {X.shape}")
print(f"目标变量形状: {y.shape}")

2. Data Augmentation

class DataAugmentation:
    def __init__(self):
        pass
    
    def add_noise(self, X, noise_level=0.01):
        """添加噪声增强数据"""
        noise = np.random.normal(0, noise_level, X.shape)
        return X + noise
    
    def time_warping(self, X, warping_factor=0.1):
        """时间扭曲增强"""
        # 简化版本的时间扭曲
        warped_X = X.copy()
        
        for i in range(len(X)):
            if np.random.random() < warping_factor:
                # Randomly nudge this point to a nearby time step
                shift = np.random.randint(-2, 3)
                if 0 <= i + shift < len(X):
                    warped_X.iloc[i] = X.iloc[i + shift]
        
        return warped_X
    
    def bootstrap_sampling(self, X, y, n_samples=None):
        """自助采样增强"""
        if n_samples is None:
            n_samples = len(X)
        
        indices = np.random.choice(len(X), n_samples, replace=True)
        return X.iloc[indices], y.iloc[indices]
    
    def synthetic_minority_oversampling(self, X, y, threshold=0.1):
        """合成少数类过采样(SMOTE简化版)"""
        # 识别少数类样本
        positive_samples = y > threshold
        negative_samples = y < -threshold
        
        if positive_samples.sum() < negative_samples.sum():
            minority_class = positive_samples
        else:
            minority_class = negative_samples
        
        # Generate synthetic samples for the minority class
        minority_X = X[minority_class]
        minority_y = y[minority_class]
        
        # Interpolation needs at least two minority samples
        if len(minority_X) < 2:
            return X, y
        
        synthetic_X = []
        synthetic_y = []
        
        for i in range(len(minority_X)):
            # Find the nearest neighbor
            distances = np.sum((minority_X.values - minority_X.iloc[i].values) ** 2, axis=1)
            nearest_idx = np.argsort(distances)[1]  # exclude the sample itself
            
            # Generate a synthetic sample by interpolation
            alpha = np.random.random()
            synthetic_sample = (minority_X.iloc[i] + 
                              alpha * (minority_X.iloc[nearest_idx] - minority_X.iloc[i]))
            synthetic_target = (minority_y.iloc[i] + 
                              alpha * (minority_y.iloc[nearest_idx] - minority_y.iloc[i]))
            
            synthetic_X.append(synthetic_sample)
            synthetic_y.append(synthetic_target)
        
        # Combine the original and synthetic data
        augmented_X = pd.concat([X, pd.DataFrame(synthetic_X)], ignore_index=True)
        augmented_y = pd.concat([y, pd.Series(synthetic_y)], ignore_index=True)
        
        return augmented_X, augmented_y
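
A brief usage sketch for the augmentation helpers (the noise level and threshold values are illustrative, not tuned):

augmenter = DataAugmentation()

# Jitter the features with small Gaussian noise
X_noisy = augmenter.add_noise(X, noise_level=0.01)

# Resample the training set with replacement
X_boot, y_boot = augmenter.bootstrap_sampling(X, y)

# Oversample rare large-move samples (simplified SMOTE)
X_aug, y_aug = augmenter.synthetic_minority_oversampling(X, y, threshold=0.01)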

Model Selection

1. Traditional Machine Learning Models

from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from sklearn.linear_model import LinearRegression, Ridge, Lasso
from sklearn.svm import SVR
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
import xgboost as xgb
import lightgbm as lgb

class TraditionalMLModels:
    def __init__(self):
        self.models = {}
        self.trained_models = {}
    
    def initialize_models(self):
        """初始化模型"""
        self.models = {
            'linear_regression': LinearRegression(),
            'ridge': Ridge(alpha=1.0),
            'lasso': Lasso(alpha=1.0),
            'random_forest': RandomForestRegressor(
                n_estimators=100, 
                max_depth=10, 
                random_state=42
            ),
            'gradient_boosting': GradientBoostingRegressor(
                n_estimators=100, 
                max_depth=6, 
                random_state=42
            ),
            'xgboost': xgb.XGBRegressor(
                n_estimators=100,
                max_depth=6,
                learning_rate=0.1,
                random_state=42
            ),
            'lightgbm': lgb.LGBMRegressor(
                n_estimators=100,
                max_depth=6,
                learning_rate=0.1,
                random_state=42
            ),
            'svr': SVR(kernel='rbf', C=1.0, gamma='scale')
        }
    
    def train_model(self, model_name, X_train, y_train):
        """训练单个模型"""
        if model_name not in self.models:
            raise ValueError(f"Model {model_name} not found")
        
        model = self.models[model_name]
        model.fit(X_train, y_train)
        self.trained_models[model_name] = model
        
        return model
    
    def train_all_models(self, X_train, y_train):
        """训练所有模型"""
        self.initialize_models()
        
        for model_name in self.models:
            print(f"Training {model_name}...")
            self.train_model(model_name, X_train, y_train)
        
        print("All models trained successfully!")
    
    def evaluate_model(self, model_name, X_test, y_test):
        """评估模型性能"""
        if model_name not in self.trained_models:
            raise ValueError(f"Model {model_name} not trained")
        
        model = self.trained_models[model_name]
        y_pred = model.predict(X_test)
        
        metrics = {
            'mse': mean_squared_error(y_test, y_pred),
            'mae': mean_absolute_error(y_test, y_pred),
            'r2': r2_score(y_test, y_pred),
            'rmse': np.sqrt(mean_squared_error(y_test, y_pred))
        }
        
        return metrics, y_pred
    
    def compare_models(self, X_test, y_test):
        """比较所有模型性能"""
        results = {}
        
        for model_name in self.trained_models:
            metrics, _ = self.evaluate_model(model_name, X_test, y_test)
            results[model_name] = metrics
        
        # Build a comparison table
        comparison_df = pd.DataFrame(results).T
        comparison_df = comparison_df.sort_values('r2', ascending=False)
        
        return comparison_df
    
    def get_feature_importance(self, model_name, feature_names):
        """获取特征重要性"""
        if model_name not in self.trained_models:
            raise ValueError(f"Model {model_name} not trained")
        
        model = self.trained_models[model_name]
        
        if hasattr(model, 'feature_importances_'):
            importance = model.feature_importances_
        elif hasattr(model, 'coef_'):
            importance = np.abs(model.coef_)
        else:
            return None
        
        feature_importance = pd.DataFrame({
            'feature': feature_names,
            'importance': importance
        }).sort_values('importance', ascending=False)
        
        return feature_importance

# Usage example
ml_models = TraditionalMLModels()

# Train all models (X_train/y_train come from the time-ordered split created above)
ml_models.train_all_models(X_train, y_train)

# Compare model performance
comparison = ml_models.compare_models(X_test, y_test)
print(comparison)
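
The get_feature_importance helper can then rank the lagged features for any trained tree or linear model; a short sketch using the random forest trained above:

# Inspect which lagged features the random forest relies on most
importance = ml_models.get_feature_importance('random_forest', X_train.columns.tolist())
print(importance.head(10))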

2. Deep Learning Models

import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, LSTM, GRU, Dropout, Conv1D, MaxPooling1D, Flatten
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau

class DeepLearningModels:
    def __init__(self):
        self.models = {}
        self.history = {}
    
    def create_mlp_model(self, input_shape, hidden_layers=[128, 64, 32]):
        """创建多层感知机模型"""
        model = Sequential()
        model.add(Dense(hidden_layers[0], activation='relu', input_shape=(input_shape,)))
        model.add(Dropout(0.2))
        
        for units in hidden_layers[1:]:
            model.add(Dense(units, activation='relu'))
            model.add(Dropout(0.2))
        
        model.add(Dense(1, activation='linear'))
        
        model.compile(
            optimizer=Adam(learning_rate=0.001),
            loss='mse',
            metrics=['mae']
        )
        
        return model
    
    def create_lstm_model(self, input_shape, lstm_units=[50, 50]):
        """创建LSTM模型"""
        model = Sequential()
        
        # First LSTM layer
        model.add(LSTM(
            lstm_units[0], 
            return_sequences=len(lstm_units) > 1,
            input_shape=input_shape
        ))
        model.add(Dropout(0.2))
        
        # Additional LSTM layers
        for i, units in enumerate(lstm_units[1:], 1):
            return_sequences = i < len(lstm_units) - 1
            model.add(LSTM(units, return_sequences=return_sequences))
            model.add(Dropout(0.2))
        
        # Output layer
        model.add(Dense(1, activation='linear'))
        
        model.compile(
            optimizer=Adam(learning_rate=0.001),
            loss='mse',
            metrics=['mae']
        )
        
        return model
    
    def create_cnn_model(self, input_shape, filters=[32, 64], kernel_size=3):
        """创建CNN模型"""
        model = Sequential()
        
        # Convolutional layers
        model.add(Conv1D(
            filters[0], 
            kernel_size, 
            activation='relu', 
            input_shape=input_shape
        ))
        model.add(MaxPooling1D(pool_size=2))
        
        for f in filters[1:]:
            model.add(Conv1D(f, kernel_size, activation='relu'))
            model.add(MaxPooling1D(pool_size=2))
        
        # Flatten and fully connected layers
        model.add(Flatten())
        model.add(Dense(50, activation='relu'))
        model.add(Dropout(0.2))
        model.add(Dense(1, activation='linear'))
        
        model.compile(
            optimizer=Adam(learning_rate=0.001),
            loss='mse',
            metrics=['mae']
        )
        
        return model
    
    def prepare_lstm_data(self, X, y, sequence_length=20):
        """为LSTM准备序列数据"""
        X_sequences = []
        y_sequences = []
        
        for i in range(sequence_length, len(X)):
            X_sequences.append(X.iloc[i-sequence_length:i].values)
            y_sequences.append(y.iloc[i])
        
        return np.array(X_sequences), np.array(y_sequences)
    
    def train_model(self, model_name, model, X_train, y_train, 
                   X_val=None, y_val=None, epochs=100, batch_size=32):
        """训练深度学习模型"""
        
        # Set up the callbacks
        callbacks = [
            EarlyStopping(
                monitor='val_loss' if X_val is not None else 'loss',
                patience=10,
                restore_best_weights=True
            ),
            ReduceLROnPlateau(
                monitor='val_loss' if X_val is not None else 'loss',
                factor=0.5,
                patience=5,
                min_lr=1e-7
            )
        ]
        
        # Train the model
        if X_val is not None and y_val is not None:
            history = model.fit(
                X_train, y_train,
                validation_data=(X_val, y_val),
                epochs=epochs,
                batch_size=batch_size,
                callbacks=callbacks,
                verbose=1
            )
        else:
            history = model.fit(
                X_train, y_train,
                epochs=epochs,
                batch_size=batch_size,
                callbacks=callbacks,
                verbose=1
            )
        
        self.models[model_name] = model
        self.history[model_name] = history
        
        return model, history
    
    def evaluate_model(self, model_name, X_test, y_test):
        """评估深度学习模型"""
        if model_name not in self.models:
            raise ValueError(f"Model {model_name} not found")
        
        model = self.models[model_name]
        
        # Predict
        y_pred = model.predict(X_test)
        
        # Compute metrics
        mse = mean_squared_error(y_test, y_pred)
        mae = mean_absolute_error(y_test, y_pred)
        r2 = r2_score(y_test, y_pred)
        
        return {
            'mse': mse,
            'mae': mae,
            'r2': r2,
            'rmse': np.sqrt(mse)
        }, y_pred.flatten()
    
    def plot_training_history(self, model_name):
        """绘制训练历史"""
        if model_name not in self.history:
            raise ValueError(f"No training history for model {model_name}")
        
        history = self.history[model_name]
        
        import matplotlib.pyplot as plt
        
        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 4))
        
        # Loss curves
        ax1.plot(history.history['loss'], label='Training Loss')
        if 'val_loss' in history.history:
            ax1.plot(history.history['val_loss'], label='Validation Loss')
        ax1.set_title('Model Loss')
        ax1.set_xlabel('Epoch')
        ax1.set_ylabel('Loss')
        ax1.legend()
        
        # MAE curves
        ax2.plot(history.history['mae'], label='Training MAE')
        if 'val_mae' in history.history:
            ax2.plot(history.history['val_mae'], label='Validation MAE')
        ax2.set_title('Model MAE')
        ax2.set_xlabel('Epoch')
        ax2.set_ylabel('MAE')
        ax2.legend()
        
        plt.tight_layout()
        return fig

# Usage example
dl_models = DeepLearningModels()

# Create and train the MLP model
mlp_model = dl_models.create_mlp_model(X_train.shape[1])
dl_models.train_model('mlp', mlp_model, X_train, y_train, X_test, y_test)

# Prepare sequence data for the LSTM
X_lstm, y_lstm = dl_models.prepare_lstm_data(X, y, sequence_length=20)
X_train_lstm, X_test_lstm, y_train_lstm, y_test_lstm = train_test_split(
    X_lstm, y_lstm, test_size=0.2, shuffle=False
)

# Create and train the LSTM model (input shape: sequence_length x n_features)
lstm_model = dl_models.create_lstm_model((20, X_train.shape[1]))
dl_models.train_model('lstm', lstm_model, X_train_lstm, y_train_lstm, 
                     X_test_lstm, y_test_lstm)
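
After training, the models can be scored on held-out data and their learning curves inspected for over- or under-fitting; a short sketch:

# Evaluate the trained LSTM on the held-out sequences
metrics, y_pred = dl_models.evaluate_model('lstm', X_test_lstm, y_test_lstm)
print(f"LSTM test R²: {metrics['r2']:.4f}, RMSE: {metrics['rmse']:.4f}")

# Plot the loss/MAE curves recorded during training
fig = dl_models.plot_training_history('lstm')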

Hyperparameter Optimization

1. Grid Search

from sklearn.model_selection import GridSearchCV, RandomizedSearchCV
from sklearn.metrics import make_scorer

class HyperparameterOptimizer:
    def __init__(self):
        self.best_params = {}
        self.best_scores = {}
    
    def grid_search_optimization(self, model, param_grid, X_train, y_train, 
                               cv=5, scoring='neg_mean_squared_error'):
        """网格搜索优化"""
        
        # 创建时间序列交叉验证
        tscv = TimeSeriesSplit(n_splits=cv)
        
        # Run the grid search
        grid_search = GridSearchCV(
            estimator=model,
            param_grid=param_grid,
            cv=tscv,
            scoring=scoring,
            n_jobs=-1,
            verbose=1
        )
        
        grid_search.fit(X_train, y_train)
        
        return grid_search.best_params_, grid_search.best_score_, grid_search
    
    def random_search_optimization(self, model, param_distributions, X_train, y_train,
                                 n_iter=100, cv=5, scoring='neg_mean_squared_error'):
        """随机搜索优化"""
        
        tscv = TimeSeriesSplit(n_splits=cv)
        
        random_search = RandomizedSearchCV(
            estimator=model,
            param_distributions=param_distributions,
            n_iter=n_iter,
            cv=tscv,
            scoring=scoring,
            n_jobs=-1,
            verbose=1,
            random_state=42
        )
        
        random_search.fit(X_train, y_train)
        
        return random_search.best_params_, random_search.best_score_, random_search
    
    def bayesian_optimization(self, model_func, param_bounds, X_train, y_train, n_calls=50):
        """贝叶斯优化"""
        try:
            from skopt import gp_minimize
            from skopt.space import Real, Integer
            from skopt.utils import use_named_args
            
            # Define the objective function
            @use_named_args(param_bounds)
            def objective(**params):
                model = model_func(**params)
                
                # Evaluate with cross-validation
                tscv = TimeSeriesSplit(n_splits=3)
                scores = []
                
                for train_idx, val_idx in tscv.split(X_train):
                    X_tr, X_val = X_train.iloc[train_idx], X_train.iloc[val_idx]
                    y_tr, y_val = y_train.iloc[train_idx], y_train.iloc[val_idx]
                    
                    model.fit(X_tr, y_tr)
                    y_pred = model.predict(X_val)
                    score = mean_squared_error(y_val, y_pred)
                    scores.append(score)
                
                return np.mean(scores)
            
            # Run the Bayesian optimization
            result = gp_minimize(
                func=objective,
                dimensions=param_bounds,
                n_calls=n_calls,
                random_state=42
            )
            
            # Extract the best parameters
            best_params = {}
            for i, param in enumerate(param_bounds):
                best_params[param.name] = result.x[i]
            
            return best_params, result.fun, result
            
        except ImportError:
            print("scikit-optimize not installed. Please install it for Bayesian optimization.")
            return None, None, None

# Usage example
optimizer = HyperparameterOptimizer()

# XGBoost parameter grid
xgb_param_grid = {
    'n_estimators': [50, 100, 200],
    'max_depth': [3, 6, 9],
    'learning_rate': [0.01, 0.1, 0.2],
    'subsample': [0.8, 0.9, 1.0]
}

# Run the grid search
best_params, best_score, grid_search = optimizer.grid_search_optimization(
    xgb.XGBRegressor(random_state=42),
    xgb_param_grid,
    X_train, y_train
)

print(f"Best parameters: {best_params}")
print(f"Best score: {best_score}")

2. Automated Machine Learning (AutoML)

class AutoMLPipeline:
    def __init__(self):
        self.best_model = None
        self.best_score = float('-inf')
        self.model_results = {}
    
    def auto_feature_selection(self, X, y, max_features=50):
        """自动特征选择"""
        from sklearn.feature_selection import SelectKBest, f_regression
        
        selector = SelectKBest(score_func=f_regression, k=min(max_features, X.shape[1]))
        X_selected = selector.fit_transform(X, y)
        
        selected_features = X.columns[selector.get_support()]
        
        return pd.DataFrame(X_selected, columns=selected_features, index=X.index)
    
    def auto_model_selection(self, X_train, y_train, X_test, y_test):
        """自动模型选择"""
        
        # 定义候选模型
        models = {
            'linear_regression': LinearRegression(),
            'ridge': Ridge(),
            'random_forest': RandomForestRegressor(n_estimators=100, random_state=42),
            'xgboost': xgb.XGBRegressor(n_estimators=100, random_state=42),
            'lightgbm': lgb.LGBMRegressor(n_estimators=100, random_state=42)
        }
        
        # Train and evaluate each model
        for name, model in models.items():
            print(f"Training {name}...")
            
            # Train the model
            model.fit(X_train, y_train)
            
            # Predict and evaluate
            y_pred = model.predict(X_test)
            score = r2_score(y_test, y_pred)
            
            self.model_results[name] = {
                'model': model,
                'score': score,
                'predictions': y_pred
            }
            
            # Update the best model
            if score > self.best_score:
                self.best_score = score
                self.best_model = model
        
        return self.best_model, self.best_score
    
    def auto_ensemble(self, X_train, y_train, X_test, y_test):
        """自动集成学习"""
        from sklearn.ensemble import VotingRegressor
        
        # Pick the best-performing models
        sorted_models = sorted(self.model_results.items(), 
                             key=lambda x: x[1]['score'], reverse=True)
        
        top_models = sorted_models[:3]  # keep the top 3 models
        
        # Build a voting regressor
        estimators = [(name, result['model']) for name, result in top_models]
        ensemble = VotingRegressor(estimators=estimators)
        
        # Train the ensemble model
        ensemble.fit(X_train, y_train)
        
        # Evaluate the ensemble model
        y_pred_ensemble = ensemble.predict(X_test)
        ensemble_score = r2_score(y_test, y_pred_ensemble)
        
        print(f"Ensemble R² score: {ensemble_score:.4f}")
        
        return ensemble, ensemble_score
    
    def run_automl_pipeline(self, X, y, test_size=0.2):
        """运行完整的AutoML流水线"""
        print("Starting AutoML Pipeline...")
        
        # 1. Split the data
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=test_size, shuffle=False
        )
        
        # 2. Automatic feature selection
        print("Performing automatic feature selection...")
        X_train_selected = self.auto_feature_selection(X_train, y_train)
        X_test_selected = X_test[X_train_selected.columns]
        
        # 3. Automatic model selection
        print("Performing automatic model selection...")
        best_model, best_score = self.auto_model_selection(
            X_train_selected, y_train, X_test_selected, y_test
        )
        
        # 4. Automatic ensembling
        print("Creating ensemble model...")
        ensemble_model, ensemble_score = self.auto_ensemble(
            X_train_selected, y_train, X_test_selected, y_test
        )
        
        # 5. Pick the final model
        if ensemble_score > best_score:
            final_model = ensemble_model
            final_score = ensemble_score
            print(f"Ensemble model selected with R² score: {final_score:.4f}")
        else:
            final_model = best_model
            final_score = best_score
            print(f"Best individual model selected with R² score: {final_score:.4f}")
        
        return final_model, final_score, X_train_selected.columns.tolist()

# Usage example
automl = AutoMLPipeline()
final_model, final_score, selected_features = automl.run_automl_pipeline(X, y)

Model Validation

1. Cross-Validation

from sklearn.base import clone

class ModelValidator:
    def __init__(self):
        self.validation_results = {}
    
    def time_series_cross_validation(self, model, X, y, n_splits=5, test_size=0.1):
        """Time-series cross-validation."""
        # Keep n_splits * test_size below 1.0 so the first fold still has training data
        tscv = TimeSeriesSplit(n_splits=n_splits, test_size=int(len(X) * test_size))
        
        scores = []
        predictions = []
        
        for fold, (train_idx, test_idx) in enumerate(tscv.split(X)):
            print(f"Fold {fold + 1}/{n_splits}")
            
            # Split the data
            X_train_fold = X.iloc[train_idx]
            X_test_fold = X.iloc[test_idx]
            y_train_fold = y.iloc[train_idx]
            y_test_fold = y.iloc[test_idx]
            
            # Train the model
            model_copy = clone(model)
            model_copy.fit(X_train_fold, y_train_fold)
            
            # Predict and evaluate
            y_pred_fold = model_copy.predict(X_test_fold)
            score = r2_score(y_test_fold, y_pred_fold)
            
            scores.append(score)
            predictions.extend(list(zip(test_idx, y_test_fold, y_pred_fold)))
        
        return {
            'scores': scores,
            'mean_score': np.mean(scores),
            'std_score': np.std(scores),
            'predictions': predictions
        }
    
    def walk_forward_validation(self, model, X, y, initial_window=252, step_size=21):
        """滚动窗口验证"""
        scores = []
        predictions = []
        
        for start in range(0, len(X) - initial_window, step_size):
            end_train = start + initial_window
            end_test = min(end_train + step_size, len(X))
            
            if end_test <= end_train:
                break
            
            # Split the data
            X_train = X.iloc[start:end_train]
            X_test = X.iloc[end_train:end_test]
            y_train = y.iloc[start:end_train]
            y_test = y.iloc[end_train:end_test]
            
            # Train and predict
            model_copy = clone(model)
            model_copy.fit(X_train, y_train)
            y_pred = model_copy.predict(X_test)
            
            # Evaluate
            score = r2_score(y_test, y_pred)
            scores.append(score)
            
            # Store the prediction results
            for i, (true_val, pred_val) in enumerate(zip(y_test, y_pred)):
                predictions.append((end_train + i, true_val, pred_val))
        
        return {
            'scores': scores,
            'mean_score': np.mean(scores),
            'std_score': np.std(scores),
            'predictions': predictions
        }
    
    def bootstrap_validation(self, model, X, y, n_bootstrap=100, sample_ratio=0.8):
        """自助法验证"""
        scores = []
        
        for i in range(n_bootstrap):
            # Bootstrap sampling
            n_samples = int(len(X) * sample_ratio)
            indices = np.random.choice(len(X), n_samples, replace=True)
            
            X_bootstrap = X.iloc[indices]
            y_bootstrap = y.iloc[indices]
            
            # Build the train set and out-of-bag test set
            out_of_bag_indices = list(set(range(len(X))) - set(indices))
            if len(out_of_bag_indices) == 0:
                continue
            
            X_test = X.iloc[out_of_bag_indices]
            y_test = y.iloc[out_of_bag_indices]
            
            # Train and evaluate
            model_copy = clone(model)
            model_copy.fit(X_bootstrap, y_bootstrap)
            y_pred = model_copy.predict(X_test)
            
            score = r2_score(y_test, y_pred)
            scores.append(score)
        
        return {
            'scores': scores,
            'mean_score': np.mean(scores),
            'std_score': np.std(scores),
            'confidence_interval': np.percentile(scores, [2.5, 97.5])
        }

# Usage example
validator = ModelValidator()

# Time-series cross-validation
tscv_results = validator.time_series_cross_validation(
    RandomForestRegressor(n_estimators=100, random_state=42),
    X, y, n_splits=5
)

print(f"Time Series CV Score: {tscv_results['mean_score']:.4f} ± {tscv_results['std_score']:.4f}")

Model Deployment

1. Saving and Loading Models

import joblib
import pickle
import json
from datetime import datetime

class ModelDeployment:
    def __init__(self):
        self.model_metadata = {}
    
    def save_model(self, model, model_name, feature_columns, scaler=None, 
                  model_type='sklearn', save_path='models/'):
        """保存模型"""
        import os
        
        # Create the save directory
        os.makedirs(save_path, exist_ok=True)
        
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        model_filename = f"{model_name}_{timestamp}"
        
        # Save the model
        if model_type == 'sklearn':
            model_path = os.path.join(save_path, f"{model_filename}.joblib")
            joblib.dump(model, model_path)
        elif model_type == 'tensorflow':
            model_path = os.path.join(save_path, f"{model_filename}.h5")
            model.save(model_path)
        else:
            model_path = os.path.join(save_path, f"{model_filename}.pkl")
            with open(model_path, 'wb') as f:
                pickle.dump(model, f)
        
        # Save the preprocessor (if any)
        scaler_path = None
        if scaler is not None:
            scaler_path = os.path.join(save_path, f"{model_filename}_scaler.joblib")
            joblib.dump(scaler, scaler_path)
        
        # Save the metadata
        metadata = {
            'model_name': model_name,
            'model_type': model_type,
            'feature_columns': feature_columns,
            'timestamp': timestamp,
            'model_path': model_path,
            'scaler_path': scaler_path
        }
        
        metadata_path = os.path.join(save_path, f"{model_filename}_metadata.json")
        with open(metadata_path, 'w') as f:
            json.dump(metadata, f, indent=2)
        
        self.model_metadata[model_name] = metadata
        
        return model_path, metadata_path
    
    def load_model(self, metadata_path):
        """加载模型"""
        # 加载元数据
        with open(metadata_path, 'r') as f:
            metadata = json.load(f)
        
        # Load the model
        model_type = metadata['model_type']
        model_path = metadata['model_path']
        
        if model_type == 'sklearn':
            model = joblib.load(model_path)
        elif model_type == 'tensorflow':
            model = tf.keras.models.load_model(model_path)
        else:
            with open(model_path, 'rb') as f:
                model = pickle.load(f)
        
        # Load the preprocessor
        scaler = None
        if metadata['scaler_path']:
            scaler = joblib.load(metadata['scaler_path'])
        
        return model, scaler, metadata
    
    def create_prediction_pipeline(self, model, scaler, feature_columns):
        """创建预测流水线"""
        def predict(X):
            # Ensure the feature order is correct
            X_ordered = X[feature_columns]
            
            # Apply preprocessing
            if scaler:
                X_scaled = scaler.transform(X_ordered)
                X_processed = pd.DataFrame(X_scaled, columns=feature_columns, index=X.index)
            else:
                X_processed = X_ordered
            
            # Predict
            predictions = model.predict(X_processed)
            
            return predictions
        
        return predict
    
    def model_monitoring(self, model, X_new, y_new, baseline_metrics):
        """模型监控"""
        # 预测新数据
        y_pred_new = model.predict(X_new)
        
        # Compute fresh metrics
        new_metrics = {
            'mse': mean_squared_error(y_new, y_pred_new),
            'mae': mean_absolute_error(y_new, y_pred_new),
            'r2': r2_score(y_new, y_pred_new)
        }
        
        # Check for performance degradation
        alerts = []
        for metric, new_value in new_metrics.items():
            baseline_value = baseline_metrics.get(metric, 0)
            
            if metric in ['mse', 'mae']:  # lower is better
                if new_value > baseline_value * 1.2:  # 20% performance drop
                    alerts.append(f"{metric} increased by {((new_value/baseline_value - 1) * 100):.1f}%")
            else:  # higher is better (r2)
                if new_value < baseline_value * 0.8:  # 20% performance drop
                    alerts.append(f"{metric} decreased by {((1 - new_value/baseline_value) * 100):.1f}%")
        
        return new_metrics, alerts

# Usage example
deployment = ModelDeployment()

# Save the model
model_path, metadata_path = deployment.save_model(
    final_model, 
    'price_prediction_model',
    selected_features,
    scaler=preprocessor.scalers.get('features'),
    model_type='sklearn'
)

# Load the model
loaded_model, loaded_scaler, metadata = deployment.load_model(metadata_path)

# Create the prediction pipeline
predict_pipeline = deployment.create_prediction_pipeline(
    loaded_model, loaded_scaler, metadata['feature_columns']
)

# Predict with the pipeline
new_predictions = predict_pipeline(X_test)
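
In production, the model should be checked regularly against the performance baseline recorded at deployment time; a minimal monitoring sketch (the baseline numbers below are hypothetical placeholders):

# Hypothetical baseline metrics recorded when the model was deployed
baseline_metrics = {'mse': 0.0004, 'mae': 0.015, 'r2': 0.12}

# Compare fresh out-of-sample performance against the baseline
new_metrics, alerts = deployment.model_monitoring(loaded_model, X_test, y_test, baseline_metrics)
for alert in alerts:
    print(f"ALERT: {alert}")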

Model Training Best Practices

1. Training Workflow

  1. Data quality checks: ensure the data is accurate and complete
  2. Feature engineering: construct meaningful features
  3. Model selection: choose a suitable algorithm
  4. Hyperparameter optimization: tune the model parameters
  5. Cross-validation: assess the model's ability to generalize
  6. Model ensembling: combine multiple models to improve performance
  7. Deployment monitoring: track model performance in production

2. Common Pitfalls

Pitfall       | Description                                             | Solution
------------- | ------------------------------------------------------- | -----------------------------------
Data leakage  | The model is trained on future information              | Strictly enforce temporal ordering
Overfitting   | Good fit on the training set but poor generalization    | Cross-validation and regularization
Sample bias   | Training data does not represent the true distribution  | Use more diverse data
Feature drift | Feature distributions change over time                  | Retrain the model periodically
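
Data leakage is the most damaging of these pitfalls in trading; the sketch below contrasts a leaky shuffled split with a correct time-ordered split:

# WRONG: shuffling mixes future rows into the training set (data leakage)
X_tr, X_te, y_tr, y_te = train_test_split(X, y, test_size=0.2, shuffle=True)

# RIGHT: preserve temporal order so the test set lies strictly in the future
X_tr, X_te, y_tr, y_te = train_test_split(X, y, test_size=0.2, shuffle=False)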

3. Performance Optimization

  • Data parallelism: process data with multiple processes
  • Model parallelism: train large models in a distributed fashion
  • Incremental learning: update model parameters online (see the sketch after this list)
  • Model compression: reduce model size and inference time
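
Incremental learning can be approximated with scikit-learn's partial_fit interface; a minimal sketch (SGDRegressor is one possible choice, not part of the pipeline above):

from sklearn.linear_model import SGDRegressor

# Online model: update the weights as new bars arrive instead of retraining from scratch
online_model = SGDRegressor(learning_rate='adaptive', eta0=0.01, random_state=42)

batch_size = 50
for start in range(0, len(X_train), batch_size):
    X_batch = X_train.iloc[start:start + batch_size]
    y_batch = y_train.iloc[start:start + batch_size]
    online_model.partial_fit(X_batch, y_batch)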

Next Steps

Having mastered model training, we recommend that you:

  1. Return to the strategy framework - integrate your model into a trading strategy
  2. Learn about API access - deploy the model to live trading
  3. Study risk management - combine the model with risk controls
  4. Join the community discussion - exchange machine learning experience with other users

Important note: applying machine learning models in financial markets calls for particular caution. Market conditions change over time, so models must be retrained and revalidated periodically. Always combine them with traditional risk management methods, and never rely solely on model predictions.