import matplotlib.pyplot as plt
from datetime import datetime
from meteostat import Stations, Daily
plt.rcParams['font.sans-serif'] = 'SimHei'  # font able to render Chinese characters in plots
plt.rcParams['axes.unicode_minus'] = False
chongqing = (29.56301, 106.55156)
# Find weather stations near Chongqing
stations = Stations()
nearby_stations = stations.nearby(*chongqing).fetch(5)
# Take the ID of the nearest station
station_id = nearby_stations.index[0]
# Set the time range
start = datetime(2008, 1, 1)
end = datetime(2024, 5, 25)
# Fetch daily observations
data = Daily(station_id, start, end)
data = data.fetch()
data.head()
The weather data for Chongqing are accessed with the Python meteostat library; for a detailed walkthrough of the retrieval steps, see the earlier article 利用python meteostat庫(kù)對(duì)全球氣象數(shù)據(jù)訪問(wèn),獲取歷史氣象數(shù)據(jù) on accessing global weather stations and downloading historical observations.
import pandas as pd
df = pd.DataFrame()
df['tavg'] = data['tavg']
# Define the split ratios
train_ratio = 0.7
val_ratio = 0.1
test_ratio = 0.2
# Compute the split indices
train_split = int(train_ratio * len(df))
val_split = int((train_ratio + val_ratio) * len(df))
# Split the dataset chronologically
train_set = df.iloc[:train_split]
val_set = df.iloc[train_split:val_split]
test_set = df.iloc[val_split:]
plt.figure(figsize=(15, 10))
plt.subplot(3, 1, 1)
plt.plot(train_set, color='c', alpha=0.3)
plt.title('Training set time series')
plt.subplot(3, 1, 2)
plt.plot(val_set, color='b', alpha=0.3)
plt.title('Validation set time series')
plt.subplot(3, 1, 3)
plt.plot(test_set, color='r', alpha=0.3)
plt.title('Test set time series')
plt.xticks(rotation=45)
plt.show()
Here we do single-series time series modelling: an Attention+LSTM model is built on the tavg column (the daily mean temperature).
from sklearn.preprocessing import MinMaxScaler

def normalize_dataframe(train_set, val_set, test_set):
    scaler = MinMaxScaler()
    scaler.fit(train_set)  # fit the scaler on the training set only
    train = pd.DataFrame(scaler.transform(train_set), columns=train_set.columns, index=train_set.index)
    val = pd.DataFrame(scaler.transform(val_set), columns=val_set.columns, index=val_set.index)
    test = pd.DataFrame(scaler.transform(test_set), columns=test_set.columns, index=test_set.index)
    return train, val, test

train, val, test = normalize_dataframe(train_set, val_set, test_set)
Min-max normalization is applied to the training, validation, and test sets.
Normalization is used here because the raw data has no mixed-unit problem and, being a single series, there are no scale differences between features to account for. In general, though, standardization (zero mean, unit variance) is considered the better fit for attention. Why is standardization often preferred for attention mechanisms?
Handling features with different units: attention usually combines several features, and standardization removes the effect of differing units so all features live on the same scale.
Faster convergence: standardized data tends to converge faster because its numeric range stays more stable during training.
Effect on gradients: standardized data usually yields a more even gradient distribution, helping to avoid vanishing or exploding gradients.
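For reference, the min-max scaling performed by MinMaxScaler rescales every value with the minimum and maximum observed on the training set:

$$x' = \frac{x - x_{\min}}{x_{\max} - x_{\min}}$$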
Standardization formula: $z = \frac{x - \mu}{\sigma}$, where $\mu$ and $\sigma$ are the mean and standard deviation estimated on the training set.
A detailed explanation of these parameters can be found in the article 特征工程——數(shù)據(jù)轉(zhuǎn)換. When standardization is used instead, the code is as follows:
# Data standardization
# Compute the mean and standard deviation on the training set
mean = train_set.mean()
std = train_set.std()
# Standardize each split with the training-set statistics
train = (train_set - mean) / std
val = (val_set - mean) / std
test = (test_set - mean) / std
The author also built the model on standardized data and recorded the test-set evaluation metrics, so they can be compared with the metrics obtained later with min-max normalization (on this dataset, normalization turned out to beat standardization).
import numpy as np

def prepare_data(data, win_size):
    X = []
    y = []
    # Slide a window of length win_size over the series;
    # each window predicts the value that immediately follows it
    for i in range(len(data) - win_size):
        temp_x = data[i:i + win_size]
        temp_y = data[i + win_size]
        X.append(temp_x)
        y.append(temp_y)
    X = np.asarray(X)
    y = np.asarray(y)
    X = np.expand_dims(X, axis=-1)  # add a feature dimension: (samples, win_size, 1)
    return X, y
win_size = 30
# Training set
X_train, y_train = prepare_data(train['tavg'].values, win_size)
# Validation set
X_val, y_val = prepare_data(val['tavg'].values, win_size)
# Test set
X_test, y_test = prepare_data(test['tavg'].values, win_size)
print("Training set shapes:", X_train.shape, y_train.shape)
print("Validation set shapes:", X_val.shape, y_val.shape)
print("Test set shapes:", X_test.shape, y_test.shape)
Time-window construction is not discussed in depth here; see the earlier article 時(shí)序預(yù)測(cè)模型的多種形式解析 on the different forms a forecasting model can take.
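Still, a tiny toy illustration (not from the original article) of what prepare_data produces may help: with a five-value series and win_size=2, every window of two values is paired with the value that follows it.

# Toy illustration using the prepare_data function defined above
toy = np.array([1, 2, 3, 4, 5], dtype=float)
X_toy, y_toy = prepare_data(toy, win_size=2)
print(X_toy.squeeze(-1))  # [[1. 2.] [2. 3.] [3. 4.]]
print(y_toy)              # [3. 4. 5.]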
Converting the datasets to PyTorch tensors
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader, Subset
# device is where tensors are stored and computed
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")  # use a GPU if one is available
# Convert the NumPy arrays to PyTorch tensors
# torch.tensor(..., dtype=torch.float32) builds a float32 tensor; .to(device) places it on the chosen device
X_train_tensor = torch.tensor(X_train, dtype=torch.float32).to(device)
y_train_tensor = torch.tensor(y_train, dtype=torch.float32).to(device)
X_validation_tensor = torch.tensor(X_val, dtype=torch.float32).to(device)
y_validation_tensor = torch.tensor(y_val, dtype=torch.float32).to(device)
X_test_tensor = torch.tensor(X_test, dtype=torch.float32).to(device)
y_test_tensor = torch.tensor(y_test, dtype=torch.float32).to(device)
# Build the training, validation and test datasets
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
validation_dataset = TensorDataset(X_validation_tensor, y_validation_tensor)
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)
# Batch size: stronger hardware allows larger values; common choices are 32 to 256
batch_size = 64
# Create the data loaders; shuffle=False keeps the chronological order of the windows
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=False)
val_loader = DataLoader(validation_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
# Print the shape of one training batch
dataiter = iter(train_loader)
sample_x, sample_y = next(dataiter)  # manually fetch one batch with next()
print('Sample input shape: ', sample_x.shape)
print('Sample output shape: ', sample_y.shape)
The datasets are converted to PyTorch tensors and wrapped in data loaders, ready for the training, validation and testing that follow.
# Model hyper-parameter dictionary
model_params = {
    'lstm': {
        'input_size': X_train.shape[2],  # input feature dimension
        'hidden_size': 256,              # LSTM hidden dimension
        'num_layers': 1,                 # number of LSTM layers
        'output_size': 1                 # output dimension
    },
    'attention': {
        'num_heads': 8                   # number of attention heads
    }
}
# Multi-head attention layer
class MultiHeadAttention(nn.Module):
    def __init__(self, hidden_size, num_heads):
        super(MultiHeadAttention, self).__init__()
        # Wrap PyTorch's multi-head attention
        self.attention = nn.MultiheadAttention(hidden_size, num_heads)

    def forward(self, lstm_output):
        # lstm_output shape: (batch_size, seq_length, hidden_size)
        # nn.MultiheadAttention expects: (seq_length, batch_size, hidden_size)
        lstm_output = lstm_output.permute(1, 0, 2)  # move the sequence dimension first
        attn_output, attn_weights = self.attention(lstm_output, lstm_output, lstm_output)
        attn_output = attn_output.permute(1, 0, 2)  # restore (batch_size, seq_length, hidden_size)
        return attn_output, attn_weights
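As a side note, recent PyTorch versions (1.9 and later) let nn.MultiheadAttention accept batch-first inputs directly, which removes both permutes; a minimal equivalent sketch (MultiHeadAttentionBF is an illustrative name, not part of the original article):

# Equivalent variant assuming PyTorch >= 1.9: batch_first=True accepts
# (batch_size, seq_length, hidden_size) directly, so no permute is needed
class MultiHeadAttentionBF(nn.Module):
    def __init__(self, hidden_size, num_heads):
        super().__init__()
        self.attention = nn.MultiheadAttention(hidden_size, num_heads, batch_first=True)

    def forward(self, lstm_output):
        attn_output, attn_weights = self.attention(lstm_output, lstm_output, lstm_output)
        return attn_output, attn_weights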
# Attention_LSTM model
class Attention_LSTM(nn.Module):
    def __init__(self, lstm_params, attention_params):
        super(Attention_LSTM, self).__init__()
        self.hidden_size = lstm_params['hidden_size']
        self.num_layers = lstm_params['num_layers']
        # LSTM layer
        self.lstm = nn.LSTM(lstm_params['input_size'], lstm_params['hidden_size'], lstm_params['num_layers'], batch_first=True)
        # Multi-head attention layer
        self.attention = MultiHeadAttention(lstm_params['hidden_size'], attention_params['num_heads'])
        # Fully connected layers
        self.fc1 = nn.Linear(lstm_params['hidden_size'], 128)
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, 32)
        self.fc4 = nn.Linear(32, 16)
        self.fc5 = nn.Linear(16, lstm_params['output_size'])
        self.relu = nn.ReLU()  # ReLU activation

    def forward(self, x):
        # Initialize the hidden and cell states
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(device)
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(device)
        # LSTM forward pass
        lstm_out, _ = self.lstm(x, (h0, c0))
        # Apply multi-head attention
        attn_out, _ = self.attention(lstm_out)
        # Take the output of the last time step
        out = self.relu(attn_out[:, -1, :])
        # Fully connected forward pass
        out = self.relu(self.fc1(out))  # FC layer 1
        out = self.relu(self.fc2(out))  # FC layer 2
        out = self.relu(self.fc3(out))  # FC layer 3
        out = self.relu(self.fc4(out))  # FC layer 4
        out = self.fc5(out)             # output layer
        return out

# Model parameters
lstm_params = model_params['lstm']
attention_params = model_params['attention']
# Instantiate the model
model = Attention_LSTM(lstm_params, attention_params).to(device)
print(model)
The model is a long short-term memory network (LSTM) augmented with an attention mechanism: a single-layer LSTM encodes the 30-step input window, a multi-head attention layer re-weights the LSTM outputs across time, and a stack of fully connected layers (256→128→64→32→16→1) maps the last time step to the prediction.
Pay attention to the num_heads parameter of the attention layer; there are a few practical points when choosing the number of heads. The key constraint is that hidden_size must be divisible by num_heads (here 256 / 8 = 32 dimensions per head), otherwise nn.MultiheadAttention raises an error; more heads let the layer attend to different temporal patterns in parallel, but each head works in a smaller subspace.
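A one-line sanity check of that constraint, using the model_params dictionary defined above:

# Illustrative check: nn.MultiheadAttention requires embed_dim to be divisible by num_heads
assert model_params['lstm']['hidden_size'] % model_params['attention']['num_heads'] == 0  # 256 % 8 == 0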
To reduce the risk of over-fitting when using attention, a dropout layer is often added right after it; alternatively, an L2 penalty can be added through the optimizer (see the sketch after the next code block). The code below uses a Dropout() layer:
# Multi-head attention layer with dropout
class MultiHeadAttention(nn.Module):
    def __init__(self, hidden_size, num_heads):
        super(MultiHeadAttention, self).__init__()
        # Wrap PyTorch's multi-head attention
        self.attention = nn.MultiheadAttention(hidden_size, num_heads)
        self.dropout = nn.Dropout(p=0.1)  # dropout layer to curb over-fitting

    def forward(self, lstm_output):
        # lstm_output shape: (batch_size, seq_length, hidden_size)
        # nn.MultiheadAttention expects: (seq_length, batch_size, hidden_size)
        lstm_output = lstm_output.permute(1, 0, 2)  # move the sequence dimension first
        attn_output, attn_weights = self.attention(lstm_output, lstm_output, lstm_output)
        attn_output = self.dropout(attn_output)  # apply dropout
        attn_output = attn_output.permute(1, 0, 2)  # restore (batch_size, seq_length, hidden_size)
        return attn_output, attn_weights
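For the L2 alternative mentioned above, a minimal sketch: Adam's weight_decay argument adds an L2 penalty on all model parameters (the value 1e-5 is an illustrative choice, not tuned for this dataset).

# Alternative to dropout: L2 regularization via the optimizer's weight_decay argument
# (1e-5 is an illustrative value, not from the original article)
optimizer = optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-5)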
Model training
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
# Train the model
num_epochs = 150
train_losses = []
val_losses = []
for epoch in range(num_epochs):
    model.train()
    train_loss = 0
    for X_batch, y_batch in train_loader:
        optimizer.zero_grad()
        outputs = model(X_batch)
        loss = criterion(outputs.squeeze(), y_batch)
        loss.backward()
        optimizer.step()
        train_loss += loss.item()
    train_loss /= len(train_loader)
    train_losses.append(train_loss)

    model.eval()
    val_loss = 0
    with torch.no_grad():
        for X_batch, y_batch in val_loader:
            outputs = model(X_batch)
            loss = criterion(outputs.squeeze(), y_batch)
            val_loss += loss.item()
    val_loss /= len(val_loader)
    val_losses.append(val_loss)
    print(f'Epoch [{epoch+1}/{num_epochs}], Train Loss: {train_loss:.8f}, Val Loss: {val_loss:.8f}')
# Plot the loss curves
plt.figure()
plt.plot(train_losses, label='Train Loss')
plt.plot(val_losses, label='Validation Loss')
plt.legend()
plt.show()
Model prediction and evaluation
# Save the model
torch.save(model.state_dict(), 'Attention+lstm.pth')
# Reload the model
lstm_model = Attention_LSTM(lstm_params, attention_params).to(device)
lstm_model.load_state_dict(torch.load('Attention+lstm.pth'))
lstm_model.eval()
# Predict on the test set
predictions = []
with torch.no_grad():
    for inputs, _ in test_loader:
        outputs = lstm_model(inputs)
        predictions.extend(outputs.cpu().numpy())
predictions = np.array(predictions)

from sklearn import metrics
mse = metrics.mean_squared_error(y_test, predictions.flatten())
rmse = np.sqrt(mse)
mae = metrics.mean_absolute_error(y_test, predictions.flatten())
from sklearn.metrics import r2_score
r2 = r2_score(y_test, predictions.flatten())
print("Mean squared error (MSE):", mse)
print("Root mean squared error (RMSE):", rmse)
print("Mean absolute error (MAE):", mae)
print("R² (goodness of fit):", r2)
Visualization
df_max = np.max(train_set)
df_min = np.min(train_set)
plt.figure(figsize=(15, 4), dpi=300)
plt.subplot(2, 1, 1)
plt.plot(train_set, color='c', label='Training set')
plt.plot(val_set, color='r', label='Validation set')
plt.plot(test_set, color='b', label='Test set')
plt.plot(pd.date_range(start='2021-03-15', end='2024-05-25', freq='D'),
         predictions * (df_max - df_min) + df_min, color='y', label='Test-set prediction')
plt.legend()
plt.subplot(2, 1, 2)
plt.plot(test_set, color='b', label='Test set')
plt.plot(pd.date_range(start='2021-03-15', end='2024-05-25', freq='D'),
         predictions * (df_max - df_min) + df_min, color='y', label='Test-set prediction')
plt.legend()
plt.show()
This article is reposted from the WeChat public account @Python機(jī)器學(xué)習(xí)AI.