Feature Engineering

Feature engineering is a key step in machine learning and quantitative trading: by extracting and constructing meaningful features from raw data, it can significantly improve a model's predictive power.

Feature Engineering Overview

Feature engineering consists of the following main steps:

  • Feature extraction: derive basic features from the raw data
  • Feature construction: create new features from existing ones
  • Feature selection: keep the most valuable features
  • Feature transformation: standardize and normalize features (see the sketch after this list)
  • Feature validation: verify that features are effective and stable
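
A minimal sketch of the feature-transformation step, which is listed above but not otherwise demonstrated in this section. The rolling_zscore helper and its default window are illustrative assumptions; the point is that scaling statistics come only from trailing data, avoiding the look-ahead bias discussed under Common Pitfalls below.

import pandas as pd

def rolling_zscore(s: pd.Series, window: int = 252) -> pd.Series:
    """Standardize each value against a trailing window only."""
    mean = s.rolling(window, min_periods=window // 2).mean()
    std = s.rolling(window, min_periods=window // 2).std()
    return (s - mean) / (std + 1e-12)  # epsilon guards flat windows

# e.g. scaled = features.apply(rolling_zscore)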

Price Features

1. Basic Price Features

import pandas as pd
import numpy as np

class PriceFeatures:
    def __init__(self):
        self.features = {}
    
    def extract_basic_features(self, df):
        """Extract basic price features"""
        features = pd.DataFrame(index=df.index)
        
        # Price-change features
        features['price_change'] = df['close'].pct_change()
        features['price_change_abs'] = features['price_change'].abs()
        features['log_return'] = np.log(df['close'] / df['close'].shift(1))
        
        # Price-position features
        features['hl_ratio'] = (df['high'] - df['low']) / df['close']
        features['oc_ratio'] = (df['close'] - df['open']) / df['open']
        features['price_position'] = (df['close'] - df['low']) / (df['high'] - df['low'])
        
        # Candle-shadow features
        features['upper_shadow'] = (df['high'] - np.maximum(df['open'], df['close'])) / df['close']
        features['lower_shadow'] = (np.minimum(df['open'], df['close']) - df['low']) / df['close']
        features['body_size'] = abs(df['close'] - df['open']) / df['close']
        
        return features
    
    def extract_volatility_features(self, df, windows=[5, 10, 20]):
        """Extract volatility features"""
        features = pd.DataFrame(index=df.index)
        
        returns = df['close'].pct_change()
        
        for window in windows:
            # Historical volatility (annualized assuming daily bars)
            features[f'volatility_{window}'] = returns.rolling(window).std() * np.sqrt(252)
            
            # Realized volatility
            features[f'realized_vol_{window}'] = np.sqrt(
                (returns ** 2).rolling(window).sum() * 252
            )
            
            # Volatility ratio relative to the shortest window
            if window > 5:
                features[f'vol_ratio_{window}_5'] = (
                    features[f'volatility_{window}'] / features['volatility_5']
                )
        
        return features
    
    def extract_momentum_features(self, df, windows=[5, 10, 20, 50]):
        """Extract momentum features"""
        features = pd.DataFrame(index=df.index)
        
        for window in windows:
            # Price momentum
            features[f'momentum_{window}'] = df['close'] / df['close'].shift(window) - 1
            
            # Cumulative return
            features[f'cumret_{window}'] = df['close'].pct_change().rolling(window).sum()
            
            # Momentum strength: share of up days in the window
            returns = df['close'].pct_change()
            up_days = (returns > 0).rolling(window).sum()
            features[f'momentum_strength_{window}'] = up_days / window
        
        return features

# Usage example
price_features = PriceFeatures()

# Toy OHLCV data (note: with only 5 rows, windows longer than 5 yield NaN)
df = pd.DataFrame({
    'open': [100, 101, 102, 103, 104],
    'high': [102, 103, 104, 105, 106],
    'low': [99, 100, 101, 102, 103],
    'close': [101, 102, 103, 104, 105],
    'volume': [1000, 1100, 1200, 1300, 1400]
})

basic_features = price_features.extract_basic_features(df)
volatility_features = price_features.extract_volatility_features(df)
momentum_features = price_features.extract_momentum_features(df)

2. Technical Indicator Features

class TechnicalFeatures:
    def __init__(self):
        pass
    
    def extract_ma_features(self, df, windows=[5, 10, 20, 50, 200]):
        """Extract moving-average features"""
        features = pd.DataFrame(index=df.index)
        
        for window in windows:
            # Simple moving average
            ma = df['close'].rolling(window).mean()
            features[f'sma_{window}'] = ma
            features[f'price_to_sma_{window}'] = df['close'] / ma - 1
            
            # Exponential moving average
            ema = df['close'].ewm(span=window).mean()
            features[f'ema_{window}'] = ema
            features[f'price_to_ema_{window}'] = df['close'] / ema - 1
        
        # MA crossover features (assumes 5, 10, 20, and 50 are in windows)
        features['sma_5_20_cross'] = np.where(
            features['sma_5'] > features['sma_20'], 1, 0
        )
        features['sma_10_50_cross'] = np.where(
            features['sma_10'] > features['sma_50'], 1, 0
        )
        
        return features
    
    def extract_oscillator_features(self, df):
        """Extract oscillator features"""
        features = pd.DataFrame(index=df.index)
        
        # RSI (simple-moving-average variant)
        delta = df['close'].diff()
        gain = (delta.where(delta > 0, 0)).rolling(window=14).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
        rs = gain / loss
        features['rsi'] = 100 - (100 / (1 + rs))
        features['rsi_overbought'] = (features['rsi'] > 70).astype(int)
        features['rsi_oversold'] = (features['rsi'] < 30).astype(int)
        
        # Bollinger Bands
        sma_20 = df['close'].rolling(20).mean()
        std_20 = df['close'].rolling(20).std()
        features['bb_upper'] = sma_20 + (std_20 * 2)
        features['bb_lower'] = sma_20 - (std_20 * 2)
        features['bb_position'] = (df['close'] - features['bb_lower']) / (
            features['bb_upper'] - features['bb_lower']
        )
        features['bb_squeeze'] = (features['bb_upper'] - features['bb_lower']) / sma_20
        
        # MACD
        ema_12 = df['close'].ewm(span=12).mean()
        ema_26 = df['close'].ewm(span=26).mean()
        features['macd'] = ema_12 - ema_26
        features['macd_signal'] = features['macd'].ewm(span=9).mean()
        features['macd_histogram'] = features['macd'] - features['macd_signal']
        features['macd_bullish'] = (features['macd'] > features['macd_signal']).astype(int)
        
        return features
    
    def extract_volume_features(self, df):
        """Extract volume features"""
        features = pd.DataFrame(index=df.index)
        
        # Basic volume features
        features['volume_sma_20'] = df['volume'].rolling(20).mean()
        features['volume_ratio'] = df['volume'] / features['volume_sma_20']
        features['volume_change'] = df['volume'].pct_change()
        
        # Price-volume relationship
        price_change = df['close'].pct_change()
        features['price_volume_corr'] = price_change.rolling(20).corr(df['volume'].pct_change())
        
        # OBV (On-Balance Volume); wrapped in a Series to keep the index aligned
        obv = np.where(df['close'] > df['close'].shift(1), df['volume'],
                      np.where(df['close'] < df['close'].shift(1), -df['volume'], 0))
        features['obv'] = pd.Series(obv, index=df.index).cumsum()
        features['obv_sma'] = features['obv'].rolling(20).mean()
        
        # Volume-weighted average price (rolling VWAP)
        typical_price = (df['high'] + df['low'] + df['close']) / 3
        features['vwap'] = (typical_price * df['volume']).rolling(20).sum() / df['volume'].rolling(20).sum()
        features['price_to_vwap'] = df['close'] / features['vwap'] - 1
        
        return features

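A short usage sketch mirroring the PriceFeatures example above; it reuses the toy df (keep in mind the 14- and 20-bar indicators need more rows than the 5-row toy frame to produce non-NaN values):

technical_features = TechnicalFeatures()

ma_features = technical_features.extract_ma_features(df)
oscillator_features = technical_features.extract_oscillator_features(df)
volume_features = technical_features.extract_volume_features(df)
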
Time Features

1. Cyclical Features

class TimeFeatures:
    def __init__(self):
        pass
    
    def extract_cyclical_features(self, df):
        """Extract cyclical time features"""
        features = pd.DataFrame(index=df.index)
        
        # Work with a datetime index without mutating the caller's frame
        idx = df.index
        if not isinstance(idx, pd.DatetimeIndex):
            idx = pd.DatetimeIndex(pd.to_datetime(idx))
        
        # Hour features (for hourly data); sin/cos encoding keeps the
        # cyclical distance between 23:00 and 00:00 small
        features['hour'] = idx.hour
        features['hour_sin'] = np.sin(2 * np.pi * features['hour'] / 24)
        features['hour_cos'] = np.cos(2 * np.pi * features['hour'] / 24)
        
        # Day-of-week features
        features['dayofweek'] = idx.dayofweek
        features['dow_sin'] = np.sin(2 * np.pi * features['dayofweek'] / 7)
        features['dow_cos'] = np.cos(2 * np.pi * features['dayofweek'] / 7)
        
        # Month features
        features['month'] = idx.month
        features['month_sin'] = np.sin(2 * np.pi * features['month'] / 12)
        features['month_cos'] = np.cos(2 * np.pi * features['month'] / 12)
        
        # Quarter features
        features['quarter'] = idx.quarter
        features['quarter_sin'] = np.sin(2 * np.pi * features['quarter'] / 4)
        features['quarter_cos'] = np.cos(2 * np.pi * features['quarter'] / 4)
        
        return features
    
    def extract_market_session_features(self, df):
        """Extract market-session features"""
        features = pd.DataFrame(index=df.index)
        
        # Crypto markets trade around the clock, but activity still clusters by region
        hour = df.index.hour
        
        # Asian session (UTC 0-8)
        features['asian_session'] = ((hour >= 0) & (hour < 8)).astype(int)
        
        # European session (UTC 8-16)
        features['european_session'] = ((hour >= 8) & (hour < 16)).astype(int)
        
        # American session (UTC 16-24)
        features['american_session'] = ((hour >= 16) & (hour < 24)).astype(int)
        
        # Weekend flag
        features['is_weekend'] = (df.index.dayofweek >= 5).astype(int)
        
        return features
    
    def extract_lag_features(self, df, lags=[1, 2, 3, 5, 10]):
        """Extract lagged features"""
        features = pd.DataFrame(index=df.index)
        
        # Lagged price, return, and volume
        for lag in lags:
            features[f'close_lag_{lag}'] = df['close'].shift(lag)
            features[f'return_lag_{lag}'] = df['close'].pct_change().shift(lag)
            features[f'volume_lag_{lag}'] = df['volume'].shift(lag)
        
        # Rolling statistics of returns
        returns = df['close'].pct_change()
        for window in [5, 10, 20]:
            features[f'return_mean_{window}'] = returns.rolling(window).mean()
            features[f'return_std_{window}'] = returns.rolling(window).std()
            features[f'return_skew_{window}'] = returns.rolling(window).skew()
            features[f'return_kurt_{window}'] = returns.rolling(window).kurt()
        
        return features
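
A brief usage sketch for TimeFeatures; the hourly date range and constant prices below are hypothetical, just enough to exercise the datetime-based features:

time_features = TimeFeatures()

hourly_index = pd.date_range('2024-01-01', periods=24 * 7, freq='h')
hourly_df = pd.DataFrame({'close': 100.0, 'volume': 1000.0}, index=hourly_index)

cyclical = time_features.extract_cyclical_features(hourly_df)
sessions = time_features.extract_market_session_features(hourly_df)
lags = time_features.extract_lag_features(hourly_df)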

2. Trend Features

class TrendFeatures:
    def __init__(self):
        pass
    
    def extract_trend_features(self, df, windows=[10, 20, 50]):
        """Extract trend features"""
        features = pd.DataFrame(index=df.index)
        
        for window in windows:
            # Linear trend slope
            features[f'trend_slope_{window}'] = self.calculate_trend_slope(df['close'], window)
            
            # Trend strength (R² of a linear fit)
            features[f'trend_strength_{window}'] = self.calculate_trend_strength(df['close'], window)
            
            # Trend consistency (directional agreement of returns)
            features[f'trend_consistency_{window}'] = self.calculate_trend_consistency(df['close'], window)
        
        return features
    
    def calculate_trend_slope(self, series, window):
        """Slope of a least-squares line fitted to each rolling window"""
        def slope(y):
            if len(y) < 2:
                return 0
            x = np.arange(len(y))
            return np.polyfit(x, y, 1)[0]
        
        return series.rolling(window).apply(slope, raw=False)
    
    def calculate_trend_strength(self, series, window):
        """Trend strength measured as the R² of a linear fit"""
        def r_squared(y):
            if len(y) < 2:
                return 0
            x = np.arange(len(y))
            try:
                slope, intercept = np.polyfit(x, y, 1)
                y_pred = slope * x + intercept
                ss_res = np.sum((y - y_pred) ** 2)
                ss_tot = np.sum((y - np.mean(y)) ** 2)
                return 1 - (ss_res / ss_tot) if ss_tot != 0 else 0
            except (np.linalg.LinAlgError, ValueError):
                return 0
        
        return series.rolling(window).apply(r_squared, raw=False)
    
    def calculate_trend_consistency(self, series, window):
        """Trend consistency: how far the share of up days deviates from 50%"""
        returns = series.pct_change()
        
        def consistency(ret):
            if len(ret) < 2:
                return 0
            positive_count = (ret > 0).sum()
            return abs(positive_count / len(ret) - 0.5) * 2
        
        return returns.rolling(window).apply(consistency, raw=False)

Advanced Features

1. Statistical Features

class StatisticalFeatures:
    def __init__(self):
        pass
    
    def extract_distribution_features(self, df, windows=[20, 50]):
        """Extract distributional features of returns"""
        features = pd.DataFrame(index=df.index)
        
        returns = df['close'].pct_change()
        
        for window in windows:
            # Basic moments
            features[f'return_mean_{window}'] = returns.rolling(window).mean()
            features[f'return_std_{window}'] = returns.rolling(window).std()
            features[f'return_skew_{window}'] = returns.rolling(window).skew()
            features[f'return_kurt_{window}'] = returns.rolling(window).kurt()
            
            # Quantile features
            features[f'return_q25_{window}'] = returns.rolling(window).quantile(0.25)
            features[f'return_q75_{window}'] = returns.rolling(window).quantile(0.75)
            features[f'return_iqr_{window}'] = (
                features[f'return_q75_{window}'] - features[f'return_q25_{window}']
            )
            
            # Extreme-value features
            features[f'return_min_{window}'] = returns.rolling(window).min()
            features[f'return_max_{window}'] = returns.rolling(window).max()
            features[f'return_range_{window}'] = (
                features[f'return_max_{window}'] - features[f'return_min_{window}']
            )
        
        return features
    
    def extract_entropy_features(self, df, windows=[20, 50]):
        """Extract entropy features"""
        features = pd.DataFrame(index=df.index)
        
        returns = df['close'].pct_change()
        
        for window in windows:
            # Approximate entropy
            features[f'approx_entropy_{window}'] = self.calculate_approximate_entropy(
                returns, window
            )
            
            # Sample entropy
            features[f'sample_entropy_{window}'] = self.calculate_sample_entropy(
                returns, window
            )
        
        return features
    
    def calculate_approximate_entropy(self, series, window, m=2, r=0.2):
        """Approximate entropy; r is a tolerance expressed as a fraction of
        the window's standard deviation (the usual ApEn convention)"""
        def approx_entropy(data):
            data = np.asarray(data, dtype=float)
            if len(data) < m + 1 or np.std(data) == 0:
                return 0
            
            tol = r * np.std(data)
            
            def _maxdist(xi, xj):
                return max(abs(ua - va) for ua, va in zip(xi, xj))
            
            def _phi(mm):
                patterns = np.array([data[i:i + mm] for i in range(len(data) - mm + 1)])
                C = np.zeros(len(patterns))
                
                for i in range(len(patterns)):
                    template = patterns[i]
                    matches = sum(1 for pattern in patterns
                                  if _maxdist(template, pattern) <= tol)
                    C[i] = matches / float(len(patterns))
                
                # Every template matches itself, so C > 0 and the log is safe
                return np.mean(np.log(C))
            
            return _phi(m) - _phi(m + 1)
        
        return series.rolling(window).apply(approx_entropy, raw=False)
    
    def calculate_sample_entropy(self, series, window, m=2, r=0.2):
        """Sample entropy (simplified proxy)"""
        def sample_entropy(data):
            if len(data) < m + 1:
                return 0
            
            # This dispersion ratio is a crude stand-in, not true sample
            # entropy; production code should use a proper implementation
            # (e.g. the antropy or nolds packages)
            return np.std(data) / (np.mean(np.abs(data)) + 1e-8)
        
        return series.rolling(window).apply(sample_entropy, raw=False)

2. Frequency-Domain Features

class FrequencyFeatures:
    def __init__(self):
        pass
    
    def extract_fft_features(self, df, window=50):
        """Extract FFT-based spectral features"""
        features = pd.DataFrame(index=df.index)
        
        returns = df['close'].pct_change().fillna(0)
        
        def fft_features(data):
            # Discrete Fourier transform of one window of returns
            fft_values = np.fft.fft(data)
            fft_freq = np.fft.fftfreq(len(data))
            
            # Power spectral density
            power_spectrum = np.abs(fft_values) ** 2
            
            # Dominant frequency component (skip the DC term at index 0)
            dominant_freq_idx = np.argmax(power_spectrum[1:len(power_spectrum) // 2]) + 1
            dominant_freq = fft_freq[dominant_freq_idx]
            dominant_power = power_spectrum[dominant_freq_idx]
            
            # Spectral centroid
            freqs = fft_freq[:len(fft_freq) // 2]
            powers = power_spectrum[:len(power_spectrum) // 2]
            total_power = np.sum(powers)
            spectral_centroid = np.sum(freqs * powers) / total_power if total_power > 0 else 0
            
            # Spectral bandwidth around the centroid
            spectral_bandwidth = np.sqrt(
                np.sum(((freqs - spectral_centroid) ** 2) * powers) / total_power
            ) if total_power > 0 else 0
            
            return [dominant_freq, dominant_power, spectral_centroid, spectral_bandwidth]
        
        # rolling().apply() must return a scalar (and accepts no result_type),
        # so the multi-column features are computed with an explicit loop
        columns = ['dominant_frequency', 'dominant_power',
                   'spectral_centroid', 'spectral_bandwidth']
        values = returns.values
        rows = []
        for i in range(len(values)):
            if i + 1 < window:
                rows.append([np.nan] * len(columns))
            else:
                rows.append(fft_features(values[i + 1 - window:i + 1]))
        
        features[columns] = np.array(rows)
        return features
    
    def extract_wavelet_features(self, df, window=50):
        """Extract wavelet-energy features"""
        features = pd.DataFrame(index=df.index)
        
        try:
            import pywt
        except ImportError:
            print("PyWavelets not installed. Skipping wavelet features.")
            return features
        
        returns = df['close'].pct_change().fillna(0)
        
        def wavelet_features(data):
            # Three-level discrete wavelet decomposition (Daubechies-4);
            # pywt may warn that level 3 is high for 50-sample windows
            cA3, cD3, cD2, cD1 = pywt.wavedec(data, 'db4', level=3)
            
            # Share of total energy carried by each decomposition level
            energies = [np.sum(c ** 2) for c in (cA3, cD3, cD2, cD1)]
            total_energy = sum(energies)
            
            if total_energy > 0:
                return [e / total_energy for e in energies]
            return [0, 0, 0, 0]
        
        # As with the FFT features, rolling().apply() cannot return multiple
        # values, so full windows are processed with an explicit loop
        columns = ['wavelet_cA3', 'wavelet_cD3', 'wavelet_cD2', 'wavelet_cD1']
        values = returns.values
        rows = []
        for i in range(len(values)):
            if i + 1 < window:
                rows.append([np.nan] * len(columns))
            else:
                rows.append(wavelet_features(values[i + 1 - window:i + 1]))
        
        features[columns] = np.array(rows)
        return features

Feature Selection

1. Statistical Feature Selection

from sklearn.feature_selection import mutual_info_regression
from sklearn.ensemble import RandomForestRegressor
import scipy.stats as stats

class FeatureSelector:
    def __init__(self):
        self.selected_features = []
        self.feature_scores = {}
    
    def correlation_filter(self, X, y, threshold=0.05):
        """Select features whose correlation with the target is significant
        (threshold is a p-value cutoff)"""
        correlations = {}
        
        for column in X.columns:
            # Correlation with the target on the non-missing rows
            x = X[column].dropna()
            corr, p_value = stats.pearsonr(x, y.loc[x.index])
            correlations[column] = {'correlation': abs(corr), 'p_value': p_value}
        
        # Keep features with a significant correlation
        selected = [col for col, score in correlations.items()
                    if score['p_value'] < threshold]
        
        self.feature_scores['correlation'] = correlations
        return selected
    
    def mutual_information_selection(self, X, y, k=50):
        """Select the top-k features by mutual information with the target"""
        # Impute missing values with column means
        X_clean = X.fillna(X.mean())
        y_clean = y[X_clean.index]
        
        # Estimate mutual information
        mi_scores = mutual_info_regression(X_clean, y_clean)
        
        # Keep the k highest-scoring features
        feature_mi = dict(zip(X.columns, mi_scores))
        selected = sorted(feature_mi.items(), key=lambda x: x[1], reverse=True)[:k]
        
        self.feature_scores['mutual_information'] = feature_mi
        return [feature[0] for feature in selected]
    
    def random_forest_selection(self, X, y, n_estimators=100, max_features=50):
        """Select features by random-forest importance"""
        # Impute missing values with column means
        X_clean = X.fillna(X.mean())
        y_clean = y[X_clean.index]
        
        # Fit a random forest
        rf = RandomForestRegressor(n_estimators=n_estimators, random_state=42)
        rf.fit(X_clean, y_clean)
        
        # Rank features by impurity-based importance
        feature_importance = dict(zip(X.columns, rf.feature_importances_))
        
        # Keep the most important features
        selected = sorted(feature_importance.items(), key=lambda x: x[1], reverse=True)[:max_features]
        
        self.feature_scores['random_forest'] = feature_importance
        return [feature[0] for feature in selected]
    
    def remove_highly_correlated_features(self, X, threshold=0.95):
        """Drop one feature from each highly correlated pair"""
        # Absolute correlation matrix
        corr_matrix = X.corr().abs()
        
        # Upper triangle only, so each pair is examined once
        upper_triangle = corr_matrix.where(
            np.triu(np.ones(corr_matrix.shape), k=1).astype(bool)
        )
        
        # Columns correlated above the threshold with an earlier column
        to_drop = [column for column in upper_triangle.columns 
                   if any(upper_triangle[column] > threshold)]
        
        return [col for col in X.columns if col not in to_drop]
    
    def ensemble_feature_selection(self, X, y, methods=['correlation', 'mutual_info', 'random_forest']):
        """Combine several selection methods by majority vote"""
        all_selected = {}
        
        if 'correlation' in methods:
            corr_features = self.correlation_filter(X, y)
            for feature in corr_features:
                all_selected[feature] = all_selected.get(feature, 0) + 1
        
        if 'mutual_info' in methods:
            mi_features = self.mutual_information_selection(X, y)
            for feature in mi_features:
                all_selected[feature] = all_selected.get(feature, 0) + 1
        
        if 'random_forest' in methods:
            rf_features = self.random_forest_selection(X, y)
            for feature in rf_features:
                all_selected[feature] = all_selected.get(feature, 0) + 1
        
        # Keep features chosen by a majority of the methods
        min_votes = len(methods) // 2 + 1
        final_selected = [feature for feature, votes in all_selected.items() 
                          if votes >= min_votes]
        
        return final_selected
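
A hedged usage sketch; the random X and y below are placeholders so the snippet runs standalone (in practice X would be the concatenated extractor outputs and y a forward return):

import numpy as np
import pandas as pd

rng = np.random.default_rng(0)
X = pd.DataFrame(rng.normal(size=(300, 8)),
                 columns=[f'feat_{i}' for i in range(8)])
y = pd.Series(rng.normal(size=300))

selector = FeatureSelector()

# Drop redundant columns first, then vote across the remaining methods
independent_cols = selector.remove_highly_correlated_features(X)
selected = selector.ensemble_feature_selection(X[independent_cols], y)
print(f"{len(selected)} features survived the majority vote")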

2. Feature Importance Analysis

class FeatureImportanceAnalyzer:
    def __init__(self):
        self.importance_scores = {}
    
    def analyze_feature_importance(self, X, y, method='all'):
        """Analyze feature importance with one or more methods"""
        results = {}
        
        if method in ['all', 'permutation']:
            results['permutation'] = self.permutation_importance(X, y)
        
        if method in ['all', 'shap']:
            results['shap'] = self.shap_importance(X, y)
        
        return results
    
    def permutation_importance(self, X, y):
        """Permutation-importance analysis"""
        from sklearn.inspection import permutation_importance
        from sklearn.ensemble import RandomForestRegressor
        
        # Fit a baseline model
        model = RandomForestRegressor(n_estimators=100, random_state=42)
        X_clean = X.fillna(X.mean())
        y_clean = y[X_clean.index]
        
        model.fit(X_clean, y_clean)
        
        # Importance = score drop when each feature is shuffled
        perm_importance = permutation_importance(
            model, X_clean, y_clean, n_repeats=10, random_state=42
        )
        
        importance_dict = dict(zip(X.columns, perm_importance.importances_mean))
        return sorted(importance_dict.items(), key=lambda x: x[1], reverse=True)
    
    def shap_importance(self, X, y):
        """SHAP-importance analysis"""
        try:
            import shap
            from sklearn.ensemble import RandomForestRegressor
            
            # Fit the model to explain
            model = RandomForestRegressor(n_estimators=100, random_state=42)
            X_clean = X.fillna(X.mean())
            y_clean = y[X_clean.index]
            
            model.fit(X_clean, y_clean)
            
            # SHAP values on a sample (capped at 1000 rows for speed)
            explainer = shap.TreeExplainer(model)
            shap_values = explainer.shap_values(X_clean.sample(min(1000, len(X_clean))))
            
            # Mean absolute SHAP value per feature
            importance_dict = dict(zip(X.columns, np.abs(shap_values).mean(0)))
            return sorted(importance_dict.items(), key=lambda x: x[1], reverse=True)
        
        except ImportError:
            print("SHAP not installed. Skipping SHAP analysis.")
            return []
    
    def create_importance_report(self, importance_results):
        """Create a feature-importance report"""
        report = "Feature Importance Analysis Report\n"
        report += "=" * 50 + "\n\n"
        
        for method, results in importance_results.items():
            report += f"{method.upper()} importance ranking:\n"
            report += "-" * 30 + "\n"
            
            for i, (feature, score) in enumerate(results[:20], 1):
                report += f"{i:2d}. {feature:<30} {score:.6f}\n"
            
            report += "\n"
        
        return report
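
A short usage sketch, reusing the synthetic X and y from the selector example above:

analyzer = FeatureImportanceAnalyzer()

# Permutation importance only; pass method='all' to also run SHAP if installed
results = analyzer.analyze_feature_importance(X, y, method='permutation')
print(analyzer.create_importance_report(results))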

Feature Engineering Best Practices

1. Feature Engineering Workflow

  1. Data exploration: understand the data's distribution and characteristics
  2. Feature extraction: derive basic features from the raw data
  3. Feature construction: create combined and transformed features
  4. Feature selection: keep the most valuable features
  5. Feature validation: verify that features are stable and effective

2. Common Pitfalls

Pitfall             Description                                Solution
Look-ahead bias     Features built from future information     Enforce strict time ordering (sketch below)
Overfitting         Too many features overfit the model        Feature selection and cross-validation
Data leakage        Features encode target information         Audit feature-construction logic carefully
Unstable features   Behavior varies widely across periods      Rolling validation and stability tests
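
For the look-ahead row above, a minimal sketch of the standard guard: lag every feature one bar so that row t only uses information available before t. The helper name and the one-bar lag are illustrative assumptions for bar-close features.

import pandas as pd

def make_lookahead_safe(features: pd.DataFrame, target: pd.Series):
    """Lag features by one bar and align them with the target.

    After the shift, the feature row at time t is built entirely from
    data up to t-1, so a model predicting target[t] cannot peek ahead.
    """
    X = features.shift(1)                      # features known before bar t
    aligned = X.join(target.rename('y')).dropna()
    return aligned.drop(columns='y'), aligned['y']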

3. Performance Optimization

class FeatureEngineering:
    def __init__(self):
        self.feature_cache = {}
        self.feature_pipeline = []
    
    def build_feature_pipeline(self, feature_extractors):
        """Register the feature-extraction pipeline"""
        self.feature_pipeline = feature_extractors
    
    def extract_all_features(self, df, use_cache=True):
        """Run every extractor and concatenate the results"""
        if use_cache and 'all_features' in self.feature_cache:
            return self.feature_cache['all_features']
        
        all_features = pd.DataFrame(index=df.index)
        
        # Assumes each extractor exposes a common extract_features(df) method
        for extractor in self.feature_pipeline:
            features = extractor.extract_features(df)
            all_features = pd.concat([all_features, features], axis=1)
        
        if use_cache:
            self.feature_cache['all_features'] = all_features
        
        return all_features
    
    def incremental_feature_update(self, new_data):
        """Incrementally update features"""
        # Compute features only for the new rows to save time; note that
        # new_data must carry enough trailing history for window features
        new_features = self.extract_all_features(new_data, use_cache=False)
        
        # Append to the cache
        if 'all_features' in self.feature_cache:
            self.feature_cache['all_features'] = pd.concat([
                self.feature_cache['all_features'], new_features
            ])
        
        return new_features

Next Steps

With feature engineering under your belt, we suggest that you:

  1. Learn model training - use the features to train machine learning models
  2. Learn about model deployment - push trained models to a production environment

Important: feature engineering is an iterative process that combines domain knowledge with data insight. Good features usually matter more than complex models, so invest the time to understand the data and the market mechanics deeply.