特征工程
特征工程是机器学习和量化交易中的关键步骤,通过从原始数据中提取和构造有意义的特征,可以显著提升模型的预测能力。
特征工程概述
特征工程包含以下主要步骤:
- 特征提取: 从原始数据中提取基础特征
- 特征构造: 基于现有特征创建新特征
- 特征选择: 选择最有价值的特征
- 特征变换: 对特征进行标准化和归一化
- 特征验证: 验证特征的有效性和稳定性
价格特征
1. 基础价格特征
import pandas as pd
import numpy as np
class PriceFeatures:
    """Price-derived features computed from an OHLCV DataFrame.

    Every ``extract_*`` method reads the price columns it needs from ``df``
    and returns a new DataFrame aligned to ``df.index``. Rolling features are
    NaN until their window is full.
    """

    def __init__(self):
        # Kept for backward compatibility; the extractors below do not use it.
        self.features = {}

    def extract_basic_features(self, df):
        """Extract per-bar price features.

        Args:
            df: DataFrame with ``open``, ``high``, ``low`` and ``close`` columns.

        Returns:
            DataFrame indexed like ``df`` with simple/log returns, bar-range
            ratios and candle body/shadow sizes (normalised by close).
        """
        features = pd.DataFrame(index=df.index)
        close = df['close']
        open_ = df['open']

        # Price-change features
        features['price_change'] = close.pct_change()
        features['price_change_abs'] = features['price_change'].abs()
        features['log_return'] = np.log(close / close.shift(1))

        # Where the bar sits within its high/low range
        features['hl_ratio'] = (df['high'] - df['low']) / close
        features['oc_ratio'] = (close - open_) / open_
        features['price_position'] = (close - df['low']) / (df['high'] - df['low'])

        # Candle shadows and body
        features['upper_shadow'] = (df['high'] - np.maximum(open_, close)) / close
        features['lower_shadow'] = (np.minimum(open_, close) - df['low']) / close
        features['body_size'] = (close - open_).abs() / close
        return features

    def extract_volatility_features(self, df, windows=(5, 10, 20)):
        """Extract rolling volatility features from close-to-close returns.

        Args:
            df: DataFrame with a ``close`` column.
            windows: rolling window lengths, in bars.

        Returns:
            DataFrame with ``volatility_{w}`` (rolling std x sqrt(252)),
            ``realized_vol_{w}`` and, for every ``w > 5``, ``vol_ratio_{w}_5``.
            The 5-bar base volatility is computed independently, so a
            ``windows`` argument without 5 no longer raises ``KeyError``.
        """
        features = pd.DataFrame(index=df.index)
        returns = df['close'].pct_change()
        # sqrt(252) annualises assuming daily bars.
        annualise = np.sqrt(252)
        base_vol = returns.rolling(5).std() * annualise

        for window in windows:
            vol = returns.rolling(window).std() * annualise
            features[f'volatility_{window}'] = vol
            # NOTE(review): the sum of squared returns is not divided by the
            # window length, so this grows like sqrt(window) relative to
            # volatility_{w} on daily bars. Kept unchanged for compatibility;
            # confirm intent.
            features[f'realized_vol_{window}'] = np.sqrt(
                (returns ** 2).rolling(window).sum() * 252
            )
            if window > 5:
                features[f'vol_ratio_{window}_5'] = vol / base_vol
        return features

    def extract_momentum_features(self, df, windows=(5, 10, 20, 50)):
        """Extract momentum features.

        Args:
            df: DataFrame with a ``close`` column.
            windows: look-back lengths, in bars.

        Returns:
            DataFrame with, per window:

            - ``momentum_{w}``: compounded return over ``w`` bars.
            - ``cumret_{w}``: sum of simple returns (an additive approximation).
            - ``momentum_strength_{w}``: fraction of up bars in the window.
        """
        features = pd.DataFrame(index=df.index)
        close = df['close']
        # Computed once; the original recomputed these inside the loop.
        returns = close.pct_change()
        is_up = returns > 0

        for window in windows:
            features[f'momentum_{window}'] = close / close.shift(window) - 1
            features[f'cumret_{window}'] = returns.rolling(window).sum()
            features[f'momentum_strength_{window}'] = is_up.rolling(window).sum() / window
        return features
# Usage example
price_features = PriceFeatures()

# Synthetic OHLCV data: a seeded random walk. The sample must be longer than
# the largest rolling window used below (50 bars for momentum). With only a
# handful of rows, every rolling feature would be entirely NaN.
rng = np.random.default_rng(42)
n_bars = 120
close_prices = 100 * np.exp(np.cumsum(rng.normal(0, 0.01, n_bars)))
open_prices = np.concatenate([[100.0], close_prices[:-1]])  # open at previous close
wick = np.abs(rng.normal(0, 0.005, n_bars)) * close_prices
df = pd.DataFrame({
    'open': open_prices,
    'high': np.maximum(open_prices, close_prices) + wick,
    'low': np.minimum(open_prices, close_prices) - wick,
    'close': close_prices,
    'volume': rng.integers(1000, 2000, n_bars),
})

basic_features = price_features.extract_basic_features(df)
volatility_features = price_features.extract_volatility_features(df)
momentum_features = price_features.extract_momentum_features(df)
2. 技术指标特征
class TechnicalFeatures:
    """Technical-indicator features: moving averages, oscillators and volume.

    Every method takes an OHLCV DataFrame and returns a new DataFrame aligned
    to ``df.index``. Rolling indicators are NaN until their window is full.
    """

    def __init__(self):
        pass

    def extract_ma_features(self, df, windows=(5, 10, 20, 50, 200)):
        """Extract simple and exponential moving-average features.

        Args:
            df: DataFrame with a ``close`` column.
            windows: moving-average lengths, in bars.

        Returns:
            DataFrame with ``sma_{w}``, ``price_to_sma_{w}``, ``ema_{w}`` and
            ``price_to_ema_{w}`` for every window. It also includes
            ``sma_5_20_cross`` / ``sma_10_50_cross``, but only when both of
            their windows were requested; previously a missing window raised
            ``KeyError``.
        """
        features = pd.DataFrame(index=df.index)
        close = df['close']

        for window in windows:
            sma = close.rolling(window).mean()
            features[f'sma_{window}'] = sma
            features[f'price_to_sma_{window}'] = close / sma - 1

            # ewm() defaults to adjust=True, so the EMA has values from the
            # first bar instead of waiting for a full window.
            ema = close.ewm(span=window).mean()
            features[f'ema_{window}'] = ema
            features[f'price_to_ema_{window}'] = close / ema - 1

        # Despite the "cross" name these are state flags, not crossover events:
        # 1 while the fast SMA is above the slow SMA, 0 otherwise (including
        # while either SMA is still NaN).
        for fast, slow in ((5, 20), (10, 50)):
            fast_col, slow_col = f'sma_{fast}', f'sma_{slow}'
            if fast_col in features.columns and slow_col in features.columns:
                features[f'sma_{fast}_{slow}_cross'] = np.where(
                    features[fast_col] > features[slow_col], 1, 0
                )
        return features

    def extract_oscillator_features(self, df):
        """Extract RSI(14), Bollinger Band(20, 2) and MACD(12, 26, 9) features.

        RSI averages gains and losses with a 14-bar simple moving average
        (Cutler's variant) rather than Wilder's exponential smoothing.
        """
        features = pd.DataFrame(index=df.index)
        close = df['close']

        # RSI
        delta = close.diff()
        gain = delta.where(delta > 0, 0).rolling(window=14).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
        # loss == 0 makes rs infinite, i.e. RSI == 100 (0/0 gives NaN).
        rs = gain / loss
        features['rsi'] = 100 - (100 / (1 + rs))
        features['rsi_overbought'] = (features['rsi'] > 70).astype(int)
        features['rsi_oversold'] = (features['rsi'] < 30).astype(int)

        # Bollinger Bands
        sma_20 = close.rolling(20).mean()
        std_20 = close.rolling(20).std()
        features['bb_upper'] = sma_20 + (std_20 * 2)
        features['bb_lower'] = sma_20 - (std_20 * 2)
        features['bb_position'] = (close - features['bb_lower']) / (
            features['bb_upper'] - features['bb_lower']
        )
        features['bb_squeeze'] = (features['bb_upper'] - features['bb_lower']) / sma_20

        # MACD
        ema_12 = close.ewm(span=12).mean()
        ema_26 = close.ewm(span=26).mean()
        features['macd'] = ema_12 - ema_26
        features['macd_signal'] = features['macd'].ewm(span=9).mean()
        features['macd_histogram'] = features['macd'] - features['macd_signal']
        features['macd_bullish'] = (features['macd'] > features['macd_signal']).astype(int)
        return features

    def extract_volume_features(self, df):
        """Extract volume, price-volume, OBV and rolling VWAP features.

        Args:
            df: DataFrame with ``high``, ``low``, ``close`` and ``volume``.
        """
        features = pd.DataFrame(index=df.index)
        close = df['close']
        volume = df['volume']

        # Basic volume features
        features['volume_sma_20'] = volume.rolling(20).mean()
        features['volume_ratio'] = volume / features['volume_sma_20']
        features['volume_change'] = volume.pct_change()

        # Rolling correlation between price returns and volume changes
        price_change = close.pct_change()
        features['price_volume_corr'] = price_change.rolling(20).corr(volume.pct_change())

        # On-Balance Volume: +volume on up closes, -volume on down closes,
        # 0 when unchanged (and on the first bar, which has no previous close).
        prev_close = close.shift(1)
        signed_volume = np.where(close > prev_close, volume,
                                 np.where(close < prev_close, -volume, 0))
        features['obv'] = signed_volume.cumsum()
        features['obv_sma'] = features['obv'].rolling(20).mean()

        # 20-bar rolling VWAP of the typical price
        typical_price = (df['high'] + df['low'] + close) / 3
        features['vwap'] = (typical_price * volume).rolling(20).sum() / volume.rolling(20).sum()
        features['price_to_vwap'] = close / features['vwap'] - 1
        return features
时间特征
1. 周期性特征
class TimeFeatures:
    """Calendar, trading-session, lag and rolling-moment features."""

    def __init__(self):
        pass

    @staticmethod
    def _datetime_index(df):
        """Return ``df.index`` as a DatetimeIndex without modifying ``df``.

        The original converted by assigning ``df.index``, which silently
        mutated the caller's DataFrame.
        """
        if isinstance(df.index, pd.DatetimeIndex):
            return df.index
        return pd.to_datetime(df.index)

    def extract_cyclical_features(self, df):
        """Extract calendar features plus their sin/cos encodings.

        Each value ``v`` with period ``p`` is encoded as sin/cos of
        ``2*pi*v/p``, so, for example, hour 23 sits next to hour 0.

        Args:
            df: DataFrame whose index is, or can be parsed into, datetimes.
                ``df`` itself is left untouched.

        Returns:
            DataFrame (same index as ``df``) with hour, dayofweek, month and
            quarter, each followed by its ``*_sin``/``*_cos`` columns.
        """
        features = pd.DataFrame(index=df.index)
        idx = self._datetime_index(df)

        # (raw column, encoded-column prefix, values, period)
        calendar_parts = (
            ('hour', 'hour', idx.hour, 24),      # meaningful for intraday data
            ('dayofweek', 'dow', idx.dayofweek, 7),
            ('month', 'month', idx.month, 12),
            ('quarter', 'quarter', idx.quarter, 4),
        )
        for column, prefix, values, period in calendar_parts:
            features[column] = values
            angle = 2 * np.pi * features[column] / period
            features[f'{prefix}_sin'] = np.sin(angle)
            features[f'{prefix}_cos'] = np.cos(angle)
        return features

    def extract_market_session_features(self, df):
        """Flag the Asian/European/American trading sessions and weekends.

        Sessions are fixed 8-hour blocks of the index's hour of day. The
        labels assume UTC timestamps (NOTE(review): confirm the data's
        timezone). A non-datetime index is parsed without modifying ``df``.
        """
        features = pd.DataFrame(index=df.index)
        idx = self._datetime_index(df)
        hour = idx.hour

        # Crypto markets trade 24h; these are conventional activity windows.
        features['asian_session'] = ((hour >= 0) & (hour < 8)).astype(int)       # UTC 0-8
        features['european_session'] = ((hour >= 8) & (hour < 16)).astype(int)  # UTC 8-16
        features['american_session'] = ((hour >= 16) & (hour < 24)).astype(int)  # UTC 16-24
        features['is_weekend'] = (idx.dayofweek >= 5).astype(int)
        return features

    def extract_lag_features(self, df, lags=(1, 2, 3, 5, 10)):
        """Extract lagged close/return/volume and rolling return moments.

        Args:
            df: DataFrame with ``close`` and ``volume`` columns.
            lags: shifts, in bars, for the lagged columns.

        Returns:
            DataFrame with ``close_lag_{k}``, ``return_lag_{k}``,
            ``volume_lag_{k}`` per lag, and rolling mean/std/skew/kurt of
            returns for windows 5, 10 and 20. The window-20 names also appear
            in ``StatisticalFeatures.extract_distribution_features``, so
            de-duplicate before concatenating both.
        """
        features = pd.DataFrame(index=df.index)
        close = df['close']
        volume = df['volume']
        # Computed once instead of once per lag.
        returns = close.pct_change()

        for lag in lags:
            features[f'close_lag_{lag}'] = close.shift(lag)
            features[f'return_lag_{lag}'] = returns.shift(lag)
            features[f'volume_lag_{lag}'] = volume.shift(lag)

        for window in (5, 10, 20):
            rolling = returns.rolling(window)
            features[f'return_mean_{window}'] = rolling.mean()
            features[f'return_std_{window}'] = rolling.std()
            features[f'return_skew_{window}'] = rolling.skew()
            features[f'return_kurt_{window}'] = rolling.kurt()
        return features
2. 趋势特征
class TrendFeatures:
    """Rolling linear-trend features: OLS slope, R^2 and direction consistency."""

    def __init__(self):
        pass

    def extract_trend_features(self, df, windows=(10, 20, 50)):
        """Extract trend slope, strength (R^2) and consistency per window.

        Args:
            df: DataFrame with a ``close`` column.
            windows: rolling window lengths, in bars.
        """
        features = pd.DataFrame(index=df.index)
        close = df['close']
        for window in windows:
            features[f'trend_slope_{window}'] = self.calculate_trend_slope(close, window)
            features[f'trend_strength_{window}'] = self.calculate_trend_strength(close, window)
            features[f'trend_consistency_{window}'] = self.calculate_trend_consistency(close, window)
        return features

    def calculate_trend_slope(self, series, window):
        """Rolling OLS slope of ``series`` against bar number (price units per bar)."""
        def slope(y):
            if len(y) < 2:
                return 0
            x = np.arange(len(y))
            return np.polyfit(x, y, 1)[0]

        # raw=True hands each window over as an ndarray, avoiding a Series
        # allocation per window; polyfit gives identical results.
        return series.rolling(window).apply(slope, raw=True)

    def calculate_trend_strength(self, series, window):
        """Rolling R^2 of a linear fit: 1 is a perfect line, 0 for a flat window."""
        def r_squared(y):
            if len(y) < 2:
                return 0
            x = np.arange(len(y))
            try:
                slope, intercept = np.polyfit(x, y, 1)
            except (np.linalg.LinAlgError, ValueError):
                # Narrowed from a bare except: only the fit itself can fail.
                return 0
            y_pred = slope * x + intercept
            ss_res = np.sum((y - y_pred) ** 2)
            ss_tot = np.sum((y - np.mean(y)) ** 2)
            return 1 - (ss_res / ss_tot) if ss_tot != 0 else 0

        return series.rolling(window).apply(r_squared, raw=True)

    def calculate_trend_consistency(self, series, window):
        """Rolling |2*(share of up bars) - 1|.

        Returns 1 when every return in the window has the same sign and 0
        when up and down bars are balanced.
        """
        returns = series.pct_change()

        def consistency(ret):
            if len(ret) < 2:
                return 0
            share_up = np.count_nonzero(ret > 0) / len(ret)
            return abs(share_up - 0.5) * 2

        return returns.rolling(window).apply(consistency, raw=True)
高级特征
1. 统计特征
class StatisticalFeatures:
    """Rolling distributional and entropy features of close-to-close returns."""

    def __init__(self):
        pass

    def extract_distribution_features(self, df, windows=(20, 50)):
        """Extract rolling moments, quantiles and extremes of returns.

        Args:
            df: DataFrame with a ``close`` column.
            windows: rolling window lengths, in bars.

        Returns:
            DataFrame with, per window: the mean/std/skew/kurt, the q25/q75
            quantiles and IQR, and the min/max/range of returns.
        """
        features = pd.DataFrame(index=df.index)
        returns = df['close'].pct_change()

        for window in windows:
            rolling = returns.rolling(window)

            # Moments
            features[f'return_mean_{window}'] = rolling.mean()
            features[f'return_std_{window}'] = rolling.std()
            features[f'return_skew_{window}'] = rolling.skew()
            features[f'return_kurt_{window}'] = rolling.kurt()

            # Quantiles
            features[f'return_q25_{window}'] = rolling.quantile(0.25)
            features[f'return_q75_{window}'] = rolling.quantile(0.75)
            features[f'return_iqr_{window}'] = (
                features[f'return_q75_{window}'] - features[f'return_q25_{window}']
            )

            # Extremes
            features[f'return_min_{window}'] = rolling.min()
            features[f'return_max_{window}'] = rolling.max()
            features[f'return_range_{window}'] = (
                features[f'return_max_{window}'] - features[f'return_min_{window}']
            )
        return features

    def extract_entropy_features(self, df, windows=(20, 50)):
        """Extract rolling approximate-entropy and dispersion features of returns."""
        features = pd.DataFrame(index=df.index)
        returns = df['close'].pct_change()

        for window in windows:
            features[f'approx_entropy_{window}'] = self.calculate_approximate_entropy(
                returns, window
            )
            features[f'sample_entropy_{window}'] = self.calculate_sample_entropy(
                returns, window
            )
        return features

    def calculate_approximate_entropy(self, series, window, m=2, r=0.2):
        """Rolling approximate entropy (ApEn) of ``series``.

        Args:
            series: input Series. Windows containing NaN yield NaN.
            window: rolling window length.
            m: template (embedding) length.
            r: match tolerance as a fraction of the window's standard
                deviation (the usual ApEn convention; 0.1-0.25 is typical).
                It used to be an absolute tolerance. For returns
                (|r| << 0.2) that matched every template, so the feature was
                identically 0.

        Returns:
            Series of ApEn values (0 for windows shorter than ``m + 1``).
        """
        def _phi(data, length, tolerance):
            n_templates = len(data) - length + 1
            templates = np.array([data[i:i + length] for i in range(n_templates)])
            # Chebyshev distance between every pair of templates (vectorised
            # replacement for the original O(n^2) Python loop).
            dist = np.max(np.abs(templates[:, None, :] - templates[None, :, :]), axis=2)
            # Each template matches itself, so every fraction is > 0 and log is safe.
            match_fraction = np.mean(dist <= tolerance, axis=1)
            return np.mean(np.log(match_fraction))

        def approx_entropy(data):
            if len(data) < m + 1:
                return 0
            tolerance = r * np.std(data)
            return _phi(data, m, tolerance) - _phi(data, m + 1, tolerance)

        return series.rolling(window).apply(approx_entropy, raw=True)

    def calculate_sample_entropy(self, series, window, m=2, r=0.2):
        """Rolling dispersion ratio std(x) / (mean|x| + 1e-8), named "sample entropy".

        NOTE(review): despite the name this is not Richman & Moorman sample
        entropy; ``m`` only sets the minimum window length and ``r`` is
        unused. It is kept for compatibility with existing feature columns.
        """
        def sample_entropy(data):
            if len(data) < m + 1:
                return 0
            return np.std(data) / (np.mean(np.abs(data)) + 1e-8)

        return series.rolling(window).apply(sample_entropy, raw=True)
2. 频域特征
class FrequencyFeatures:
    """Rolling frequency-domain (FFT) and wavelet-energy features of returns."""

    def __init__(self):
        pass

    @staticmethod
    def _rolling_vector_apply(values, window, func, n_out):
        """Apply ``func`` to every trailing window of ``values``.

        ``Rolling.apply`` only supports functions that return a scalar (it has
        no ``result_type`` option), so the original calls raised
        ``TypeError``. Here each full window is passed to ``func``, which must
        return ``n_out`` numbers.

        Returns:
            ndarray of shape ``(len(values), n_out)``. Rows before the first
            full window are NaN, matching pandas rolling.
        """
        if window < 1:
            raise ValueError("window must be >= 1")
        values = np.asarray(values, dtype=float)
        out = np.full((len(values), n_out), np.nan)
        for end in range(window, len(values) + 1):
            out[end - 1] = func(values[end - window:end])
        return out

    def extract_fft_features(self, df, window=50):
        """Extract rolling FFT features of close-to-close returns.

        Args:
            df: DataFrame with a ``close`` column.
            window: FFT length, in bars.

        Returns:
            DataFrame with ``dominant_frequency`` (cycles per bar, DC and
            Nyquist excluded), ``dominant_power``, ``spectral_centroid`` and
            ``spectral_bandwidth``.
        """
        features = pd.DataFrame(index=df.index)
        returns = df['close'].pct_change().fillna(0)

        def fft_features(data):
            n = len(data)
            half = n // 2
            # At least one positive, non-Nyquist frequency bin is needed.
            if half < 2:
                return [0, 0, 0, 0]

            fft_values = np.fft.fft(data)
            fft_freq = np.fft.fftfreq(n)
            power_spectrum = np.abs(fft_values) ** 2

            # Strongest positive frequency (skip the DC bin at index 0)
            dominant_idx = np.argmax(power_spectrum[1:half]) + 1
            dominant_freq = fft_freq[dominant_idx]
            dominant_power = power_spectrum[dominant_idx]

            # Power-weighted mean and spread of the non-negative frequencies
            freqs = fft_freq[:half]
            powers = power_spectrum[:half]
            total_power = np.sum(powers)
            if total_power > 0:
                centroid = np.sum(freqs * powers) / total_power
                bandwidth = np.sqrt(np.sum(((freqs - centroid) ** 2) * powers) / total_power)
            else:
                centroid = 0
                bandwidth = 0
            return [dominant_freq, dominant_power, centroid, bandwidth]

        results = self._rolling_vector_apply(returns.to_numpy(), window, fft_features, 4)
        names = ('dominant_frequency', 'dominant_power', 'spectral_centroid', 'spectral_bandwidth')
        for i, name in enumerate(names):
            features[name] = results[:, i]
        return features

    def extract_wavelet_features(self, df, window=50):
        """Extract rolling relative energies of a 3-level 'db4' wavelet decomposition.

        Returns an empty DataFrame (same index) when PyWavelets is missing.
        NOTE(review): for short windows (e.g. 50 bars) level 3 exceeds the
        maximum useful 'db4' level, and pywt may warn about boundary effects.
        """
        features = pd.DataFrame(index=df.index)
        try:
            import pywt
        except ImportError:
            print("PyWavelets not installed. Skipping wavelet features.")
            return features

        returns = df['close'].pct_change().fillna(0)

        def wavelet_features(data):
            # Coefficients ordered coarsest approximation first: cA3, cD3, cD2, cD1
            coeffs = pywt.wavedec(data, 'db4', level=3)
            energies = [np.sum(c ** 2) for c in coeffs]
            total_energy = sum(energies)
            if total_energy > 0:
                return [energy / total_energy for energy in energies]
            return [0, 0, 0, 0]

        results = self._rolling_vector_apply(returns.to_numpy(), window, wavelet_features, 4)
        names = ('wavelet_cA3', 'wavelet_cD3', 'wavelet_cD2', 'wavelet_cD1')
        for i, name in enumerate(names):
            features[name] = results[:, i]
        return features
特征选择
1. 统计方法特征选择
from sklearn.feature_selection import SelectKBest, f_regression, mutual_info_regression
from sklearn.ensemble import RandomForestRegressor
import scipy.stats as stats
class FeatureSelector:
    """Univariate and model-based feature selection.

    Scores from each method are stored in ``self.feature_scores`` under the
    method's name.
    """

    def __init__(self):
        self.selected_features = []
        self.feature_scores = {}

    @staticmethod
    def _prepare_xy(X, y):
        """Mean-fill NaNs in ``X`` and drop rows whose target is missing.

        ``y`` is aligned to ``X`` by index label; a non-Series ``y`` is taken
        positionally. Targets built with ``shift(-h)`` are NaN at the end of
        the sample, and scikit-learn estimators reject NaN targets.

        NOTE: the fill values are full-sample column means, which leaks
        future information in a time-series setting.
        """
        if not isinstance(y, pd.Series):
            y = pd.Series(np.asarray(y), index=X.index)
        X_filled = X.fillna(X.mean())
        y_aligned = y.loc[X_filled.index]
        keep = y_aligned.notna().to_numpy()
        return X_filled[keep], y_aligned[keep]

    def correlation_filter(self, X, y, threshold=0.05):
        """Select features whose Pearson correlation with ``y`` is significant.

        Args:
            X: feature DataFrame.
            y: target, aligned with ``X`` by index.
            threshold: maximum p-value for a feature to be selected.

        Returns:
            List of selected column names. Each feature is scored on the rows
            where both it and ``y`` are non-NaN. Features with fewer than 3
            such rows, or with a constant input, get NaN scores and are
            never selected.
        """
        if not isinstance(y, pd.Series):
            y = pd.Series(np.asarray(y), index=X.index)

        correlations = {}
        for column in X.columns:
            x_col = X[column]
            y_col = y.reindex(x_col.index)
            valid = x_col.notna() & y_col.notna()
            x_vals, y_vals = x_col[valid], y_col[valid]
            if len(x_vals) < 3 or x_vals.nunique() < 2 or y_vals.nunique() < 2:
                correlations[column] = {'correlation': np.nan, 'p_value': np.nan}
                continue
            corr, p_value = stats.pearsonr(x_vals, y_vals)
            correlations[column] = {'correlation': abs(corr), 'p_value': p_value}

        # NaN p-values compare False, so unscored features drop out here.
        selected = [col for col, score in correlations.items()
                    if score['p_value'] < threshold]
        self.feature_scores['correlation'] = correlations
        return selected

    def mutual_information_selection(self, X, y, k=50):
        """Select the ``k`` features with the highest mutual information with ``y``."""
        X_clean, y_clean = self._prepare_xy(X, y)
        # Fixed random_state: the k-NN MI estimator adds random noise, so
        # results are otherwise not reproducible between runs.
        mi_scores = mutual_info_regression(X_clean, y_clean, random_state=42)

        feature_mi = dict(zip(X.columns, mi_scores))
        ranked = sorted(feature_mi.items(), key=lambda item: item[1], reverse=True)[:k]
        self.feature_scores['mutual_information'] = feature_mi
        return [name for name, _ in ranked]

    def random_forest_selection(self, X, y, n_estimators=100, max_features=50):
        """Select the ``max_features`` features with the highest random-forest impurity importance."""
        X_clean, y_clean = self._prepare_xy(X, y)
        rf = RandomForestRegressor(n_estimators=n_estimators, random_state=42)
        rf.fit(X_clean, y_clean)

        feature_importance = dict(zip(X.columns, rf.feature_importances_))
        ranked = sorted(feature_importance.items(), key=lambda item: item[1],
                        reverse=True)[:max_features]
        self.feature_scores['random_forest'] = feature_importance
        return [name for name, _ in ranked]

    def remove_highly_correlated_features(self, X, threshold=0.95):
        """Drop the later feature of each pair whose |correlation| exceeds ``threshold``.

        Returns:
            The remaining column names, in their original order.
        """
        corr_matrix = X.corr().abs()
        # Keep only the strict upper triangle, so each pair is considered once.
        upper_triangle = corr_matrix.where(
            np.triu(np.ones(corr_matrix.shape), k=1).astype(bool)
        )
        to_drop = {column for column in upper_triangle.columns
                   if any(upper_triangle[column] > threshold)}
        return [col for col in X.columns if col not in to_drop]

    def ensemble_feature_selection(self, X, y,
                                   methods=('correlation', 'mutual_info', 'random_forest')):
        """Keep features chosen by a strict majority of the requested methods.

        Args:
            methods: any of ``'correlation'``, ``'mutual_info'``,
                ``'random_forest'``.
        """
        selectors = (
            ('correlation', self.correlation_filter),
            ('mutual_info', self.mutual_information_selection),
            ('random_forest', self.random_forest_selection),
        )
        votes = {}
        for name, selector in selectors:
            if name in methods:
                for feature in selector(X, y):
                    votes[feature] = votes.get(feature, 0) + 1

        # A strict majority of the requested methods must agree.
        min_votes = len(methods) // 2 + 1
        return [feature for feature, count in votes.items() if count >= min_votes]
2. 特征重要性分析
class FeatureImportanceAnalyzer:
    """Rank features by model-based importance (permutation, SHAP, LIME)."""

    def __init__(self):
        self.importance_scores = {}

    @staticmethod
    def _prepare_xy(X, y):
        """Mean-fill NaNs in ``X`` and drop rows whose target is missing.

        ``y`` is aligned to ``X`` by index label; a non-Series ``y`` is taken
        positionally. Targets built with ``shift(-h)`` are NaN at the end of
        the sample, and scikit-learn estimators reject NaN targets.

        NOTE: the fill values are full-sample column means, which leaks
        future information in a time-series setting.
        """
        if not isinstance(y, pd.Series):
            y = pd.Series(np.asarray(y), index=X.index)
        X_filled = X.fillna(X.mean())
        y_aligned = y.loc[X_filled.index]
        keep = y_aligned.notna().to_numpy()
        return X_filled[keep], y_aligned[keep]

    def analyze_feature_importance(self, X, y, method='all'):
        """Run the requested importance analyses.

        Args:
            method: ``'all'``, ``'permutation'``, ``'shap'`` or ``'lime'``.

        Returns:
            Dict mapping method name to a list of ``(feature, score)`` pairs,
            sorted by descending score.
        """
        results = {}
        if method in ['all', 'permutation']:
            results['permutation'] = self.permutation_importance(X, y)
        if method in ['all', 'shap']:
            results['shap'] = self.shap_importance(X, y)
        if method in ['all', 'lime']:
            results['lime'] = self.lime_importance(X, y)
        return results

    def permutation_importance(self, X, y):
        """Permutation importance of a random forest fitted on ``X``/``y``.

        NOTE: importances are measured on the training data itself, so they
        also reflect what the forest memorised; prefer a held-out set when
        one is available.
        """
        from sklearn.inspection import permutation_importance
        from sklearn.ensemble import RandomForestRegressor

        X_clean, y_clean = self._prepare_xy(X, y)
        model = RandomForestRegressor(n_estimators=100, random_state=42)
        model.fit(X_clean, y_clean)

        perm_importance = permutation_importance(
            model, X_clean, y_clean, n_repeats=10, random_state=42
        )
        importance_dict = dict(zip(X.columns, perm_importance.importances_mean))
        return sorted(importance_dict.items(), key=lambda item: item[1], reverse=True)

    def shap_importance(self, X, y):
        """Mean |SHAP value| per feature for a random forest.

        Uses a seeded sample of at most 1000 rows. Returns ``[]`` when the
        ``shap`` package is not installed.
        """
        try:
            import shap
        except ImportError:
            print("SHAP not installed. Skipping SHAP analysis.")
            return []
        from sklearn.ensemble import RandomForestRegressor

        X_clean, y_clean = self._prepare_xy(X, y)
        model = RandomForestRegressor(n_estimators=100, random_state=42)
        model.fit(X_clean, y_clean)

        explainer = shap.TreeExplainer(model)
        # Seeded so the ranking is reproducible between runs.
        sample = X_clean.sample(min(1000, len(X_clean)), random_state=42)
        shap_values = explainer.shap_values(sample)

        importance_dict = dict(zip(X.columns, np.abs(shap_values).mean(0)))
        return sorted(importance_dict.items(), key=lambda item: item[1], reverse=True)

    def lime_importance(self, X, y):
        """LIME importance placeholder.

        ``analyze_feature_importance`` dispatches here for ``'all'`` and
        ``'lime'``, but the method never existed, so the default
        ``method='all'`` raised ``AttributeError``. Until a LIME-based
        aggregation is implemented, this reports that and returns an empty
        ranking, mirroring how a missing SHAP install is handled.
        """
        print("LIME importance is not implemented. Skipping LIME analysis.")
        return []

    def create_importance_report(self, importance_results):
        """Format the top 20 features of each method as a plain-text report."""
        report = "特征重要性分析报告\n"
        report += "=" * 50 + "\n\n"
        for method, results in importance_results.items():
            report += f"{method.upper()} 重要性排名:\n"
            report += "-" * 30 + "\n"
            for i, (feature, score) in enumerate(results[:20], 1):
                report += f"{i:2d}. {feature:<30} {score:.6f}\n"
            report += "\n"
        return report
特征工程最佳实践
1. 特征工程流程
- 数据探索: 理解数据分布和特征
- 特征提取: 从原始数据提取基础特征
- 特征构造: 创建组合和变换特征
- 特征选择: 选择最有价值的特征
- 特征验证: 验证特征的稳定性和有效性
2. 常见陷阱
| 陷阱 | 描述 | 解决方案 |
|---|---|---|
| 前瞻偏差 | 使用未来信息构造特征 | 严格控制时间顺序,只使用 shift/rolling 等基于历史数据的计算 |
| 过拟合 | 特征过多导致模型过拟合 | 特征选择和时间序列交叉验证(如前向/滚动验证) |
| 数据泄露 | 特征包含目标变量信息 | 仔细检查特征构造逻辑 |
| 特征不稳定 | 特征在不同时期表现差异大 | 滚动验证和稳定性测试 |
3. 性能优化
class FeatureEngineering:
    """Run a pipeline of feature extractors and cache the combined result.

    Cache keys stored in ``self.feature_cache``:

    - ``'all_features'``: the last combined feature frame.
    - ``'all_features_key'``: a content fingerprint of the input it came from.
    - ``'raw_data'``: that input, used as history by incremental updates.
    """

    def __init__(self):
        self.feature_cache = {}
        self.feature_pipeline = []

    def build_feature_pipeline(self, feature_extractors):
        """Set the extractors run by ``extract_all_features``.

        Each item is either an object with an ``extract_features(df)`` method
        or a plain callable ``f(df)``; both must return a DataFrame indexed
        like ``df``.
        """
        self.feature_pipeline = feature_extractors

    @staticmethod
    def _fingerprint(df):
        """Content key for ``df``, or ``None`` if its values cannot be hashed."""
        try:
            content = int(pd.util.hash_pandas_object(df, index=True).sum())
        except TypeError:
            return None
        return (df.shape, tuple(df.columns), content)

    @staticmethod
    def _run_extractor(extractor, df):
        """Call ``extractor.extract_features(df)``, or ``extractor(df)`` for plain callables."""
        if not hasattr(extractor, 'extract_features') and callable(extractor):
            return extractor(df)
        return extractor.extract_features(df)

    def extract_all_features(self, df, use_cache=True):
        """Run every extractor on ``df`` and column-concatenate the results.

        The cached result is reused only when ``df`` has the same content as
        the frame it was computed from. Previously any cached result was
        returned, even for a different ``df``.
        """
        key = self._fingerprint(df) if use_cache else None
        if (use_cache and key is not None
                and 'all_features' in self.feature_cache
                and self.feature_cache.get('all_features_key') == key):
            return self.feature_cache['all_features']

        pieces = [self._run_extractor(extractor, df) for extractor in self.feature_pipeline]
        # One concat instead of one per extractor (avoids quadratic copying).
        # Column names are not de-duplicated across extractors.
        all_features = pd.concat([pd.DataFrame(index=df.index), *pieces], axis=1)

        if use_cache:
            self.feature_cache['all_features'] = all_features
            self.feature_cache['all_features_key'] = key
            self.feature_cache['raw_data'] = df
        return all_features

    def incremental_feature_update(self, new_data, lookback=None):
        """Compute features for newly arrived rows and append them to the cache.

        Rolling features need history: computing them on ``new_data`` alone
        yields NaNs or truncated windows. When raw history is cached (from a
        previous ``extract_all_features(..., use_cache=True)``), the new rows
        are computed together with that history and only the new rows are
        returned. Without cached history this falls back to computing on
        ``new_data`` alone, as before.

        Args:
            new_data: DataFrame of new rows, with the same columns as the
                history and index labels that follow it.
            lookback: optional non-negative number of trailing history rows
                to use as context. It should be at least the longest window
                in the pipeline. ``None`` uses the whole cached history.

        Returns:
            DataFrame of features for the rows of ``new_data`` only.
        """
        history = self.feature_cache.get('raw_data')
        if history is None or 'all_features' not in self.feature_cache:
            new_features = self.extract_all_features(new_data, use_cache=False)
            if 'all_features' in self.feature_cache:
                self.feature_cache['all_features'] = pd.concat([
                    self.feature_cache['all_features'], new_features
                ])
            return new_features

        context = history if lookback is None else history.tail(lookback)
        window_features = self.extract_all_features(
            pd.concat([context, new_data]), use_cache=False
        )
        new_features = window_features.iloc[len(context):]

        combined = pd.concat([history, new_data])
        self.feature_cache['all_features'] = pd.concat([
            self.feature_cache['all_features'], new_features
        ])
        self.feature_cache['raw_data'] = combined
        self.feature_cache['all_features_key'] = self._fingerprint(combined)
        return new_features
下一步
掌握了特征工程后,建议您:
重要提示: 特征工程是一个迭代过程,需要结合领域知识和数据洞察。好的特征往往比复杂的模型更重要,请花时间深入理解数据和市场机制。