CNN은 convolution layer, pooling layer, fully connected layer로 주로 구성된다. 그 중 convolution layer와 pooling layer는 두 개의 특수 신경망 레이어로 주로 유효 특징 추출을 담당한다. 원본 데이터에서 벡터를 추출하고 원본 기능의 공간적 정보를 마이닝할 수 있다. 가속도계와 같은 1차원 데이터를 1차원 컨볼루션 신경망(Conv1D)을 사용하여 서로 다른 변수를 결합하고 변수 간의 공간적 상관 관계를 추출한다.
Conv1D는 그림 2와 같이 한 차원에 대해 커널 슬라이딩을 통해 공간적 상관 관계를 추출한다.
2. LSTM
LSTM은 시계열 데이터를 처리하기 위한 고전적인 딥 러닝 네트워크이다. 순환 신경망이 긴 시계열을 어느 정도 처리할 때 기울기 소실(Vanishing gradient) 문제를 해결하는 순환 신경망의 변형입니다. 장기 및 단기 기억 네트워크의 셀 구조는 그림 3과 같이 망각 게이트, 입력 게이트 및 출력 게이트가 있다.
3. Conv1D + LSTM
Conv1D + LSTM 모델은 그림 1과 같이 Conv1D 기반의 특징 융합 레이어, LSTM 기반 시계열 예측 레이어, output layer로 구성된다. Input layer에는 그림 0과 같은 시공간적 특성행렬이 입력된다. 각 변수는 CNN에 의해 가중치가 부여되고 변수 간의 정보가 결합된다. 과적합(Overfitting)을 피하기 위해 dropout layer가 네트워크에 추가됩니다. 모델 파라미터는 그림 4와 같이 구성한다.
모델을 구성하게 되면 아래의 코드와 같이 구현할 수 있다.
import torch.nn as nn
class Conv1d_LSTM(nn.Module):
def __init__(self, in_channel=3, out_channel=1):
super(Conv1d_LSTM, self).__init__()
self.conv1d_1 = nn.Conv1d(in_channels=in_channel,
out_channels=16,
kernel_size=3,
stride=1,
padding=1)
self.conv1d_2 = nn.Conv1d(in_channels=16,
out_channels=32,
kernel_size=3,
stride=1,
padding=1)
self.lstm = nn.LSTM(input_size=32,
hidden_size=50,
num_layers=1,
bias=True,
bidirectional=False,
batch_first=True)
self.dropout = nn.Dropout(0.5)
self.dense1 = nn.Linear(50, 32)
self.dense2 = nn.Linear(32, out_channel)
def forward(self, x):
# Raw x shape : (B, S, F) => (B, 10, 3)
# Shape : (B, F, S) => (B, 3, 10)
x = x.transpose(1, 2)
# Shape : (B, F, S) == (B, C, S) // C = channel => (B, 16, 10)
x = self.conv1d_1(x)
# Shape : (B, C, S) => (B, 32, 10)
x = self.conv1d_2(x)
# Shape : (B, S, C) == (B, S, F) => (B, 10, 32)
x = x.transpose(1, 2)
self.lstm.flatten_parameters()
# Shape : (B, S, H) // H = hidden_size => (B, 10, 50)
_, (hidden, _) = self.lstm(x)
# Shape : (B, H) // -1 means the last sequence => (B, 50)
x = hidden[-1]
# Shape : (B, H) => (B, 50)
x = self.dropout(x)
# Shape : (B, 32)
x = self.fc_layer1(x)
# Shape : (B, O) // O = output => (B, 1)
x = self.fc_layer2(x)
return x
CNN에서 CIR Featur을 추출, Redundant information을 제거하고, LSTM을 이용하여 분류합니다.
( CNN+stacked-LSTM Accuracy : 82.14% )
Implemnet ( Dataset : df_uwb_data 준비 )
1. Import
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from torch.utils.tensorboard import SummaryWriter
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import time
import random
import uwb_dataset
print("Pytorch Version :", torch.__version__) # Pytorch Version : 1.7.1+cu110
writer = SummaryWriter('runs/UWB_CIR_Classfication')
%matplotlib inline
import numpy as np
import pandas as pd
import pandas_datareader.data as pdr
import matplotlib.pyplot as plt
import datetime
import torch
import torch.nn as nn
from torch.autograd import Variable
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
no module pandas_datareaderno module named 'pandas_datareader'
pandas가 깔려 있는데, 위 문구가 뜬다면 pip install pandas_datareader로 다운로드합니다.
그리고 학습된 모델이 성능을 확인하기 위해서 위 데이터(현재 약 5296개)를 Train(학습하고자 하는 데이터)를 0부터 4499까지, Test(성능 테스트하는 데이터)는 4500부터 5295개 까지 데이터로 분류합니다.
오늘자 대략, 노란색 선 정도까지 데이터를 가지고 학습을 하고, 노란색 선 이후부터 예측을 할 것입니다.
과연 내려가고 올라가는 포인트를 잘 예측할 수 있을지 궁금합니다.
3. 데이터셋 준비하기
"""
저도 주식을 잘 모르기 때문에 참고해주시면 좋을 것 같습니다.
open 시가
high 고가
low 저가
close 종가
volume 거래량
Adj Close 주식의 분할, 배당, 배분 등을 고려해 조정한 종가
확실한건 거래량(Volume)은 데이터에서 제하는 것이 중요하고,
Y 데이터를 Adj Close로 정합니다. (종가로 해도 된다고 생각합니다.)
"""
X = df.drop(columns='Volume')
y = df.iloc[:, 5:6]
print(X)
print(y)
"""
학습이 잘되기 위해 데이터 정규화
StandardScaler 각 특징의 평균을 0, 분산을 1이 되도록 변경
MinMaxScaler 최대/최소값이 각각 1, 0이 되도록 변경
"""
from sklearn.preprocessing import StandardScaler, MinMaxScaler
mm = MinMaxScaler()
ss = StandardScaler()
X_ss = ss.fit_transform(X)
y_mm = mm.fit_transform(y)
# Train Data
X_train = X_ss[:4500, :]
X_test = X_ss[4500:, :]
# Test Data
"""
( 굳이 없어도 된다. 하지만 얼마나 예측데이터와 실제 데이터의 정확도를 확인하기 위해
from sklearn.metrics import accuracy_score 를 통해 정확한 값으로 확인할 수 있다. )
"""
y_train = y_mm[:4500, :]
y_test = y_mm[4500:, :]
print("Training Shape", X_train.shape, y_train.shape)
print("Testing Shape", X_test.shape, y_test.shape)
"""
torch Variable에는 3개의 형태가 있다.
data, grad, grad_fn 한 번 구글에 찾아서 공부해보길 바랍니다.
"""
X_train_tensors = Variable(torch.Tensor(X_train))
X_test_tensors = Variable(torch.Tensor(X_test))
y_train_tensors = Variable(torch.Tensor(y_train))
y_test_tensors = Variable(torch.Tensor(y_test))
X_train_tensors_final = torch.reshape(X_train_tensors, (X_train_tensors.shape[0], 1, X_train_tensors.shape[1]))
X_test_tensors_final = torch.reshape(X_test_tensors, (X_test_tensors.shape[0], 1, X_test_tensors.shape[1]))
print("Training Shape", X_train_tensors_final.shape, y_train_tensors.shape)
print("Testing Shape", X_test_tensors_final.shape, y_test_tensors.shape)
4. GPU 준비하기 (없으면 CPU로 돌리면 됩니다.)
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") # device
print(torch.cuda.get_device_name(0))
5. LSTM 네트워크 구성하기
class LSTM1(nn.Module):
def __init__(self, num_classes, input_size, hidden_size, num_layers, seq_length):
super(LSTM1, self).__init__()
self.num_classes = num_classes #number of classes
self.num_layers = num_layers #number of layers
self.input_size = input_size #input size
self.hidden_size = hidden_size #hidden state
self.seq_length = seq_length #sequence length
self.lstm = nn.LSTM(input_size=input_size, hidden_size=hidden_size,
num_layers=num_layers, batch_first=True) #lstm
self.fc_1 = nn.Linear(hidden_size, 128) #fully connected 1
self.fc = nn.Linear(128, num_classes) #fully connected last layer
self.relu = nn.ReLU()
def forward(self,x):
h_0 = Variable(torch.zeros(self.num_layers, x.size(0), self.hidden_size)).to(device) #hidden state
c_0 = Variable(torch.zeros(self.num_layers, x.size(0), self.hidden_size)).to(device) #internal state
# Propagate input through LSTM
output, (hn, cn) = self.lstm(x, (h_0, c_0)) #lstm with input, hidden, and internal state
hn = hn.view(-1, self.hidden_size) #reshaping the data for Dense layer next
out = self.relu(hn)
out = self.fc_1(out) #first Dense
out = self.relu(out) #relu
out = self.fc(out) #Final Output
return out
위 코드는 복잡해 보이지만, 실상 하나씩 확인해보면 굉장히 연산이 적은 네트워크입니다.
시계열 데이터이지만, 간단한 구성을 위해 Sequence Length도 1이고, LSTM Layer도 1이기 때문에 굉장히 빨리 끝납니다. 아마 본문 작성자가 CPU환경에서도 쉽게 따라 할 수 있게 간단하게 작성한 것 같습니다.
num_epochs = 30000 #1000 epochs
learning_rate = 0.00001 #0.001 lr
input_size = 5 #number of features
hidden_size = 2 #number of features in hidden state
num_layers = 1 #number of stacked lstm layers
num_classes = 1 #number of output classes
for epoch in range(num_epochs):
outputs = lstm1.forward(X_train_tensors_final.to(device)) #forward pass
optimizer.zero_grad() #caluclate the gradient, manually setting to 0
# obtain the loss function
loss = loss_function(outputs, y_train_tensors.to(device))
loss.backward() #calculates the loss of the loss function
optimizer.step() #improve from loss, i.e backprop
if epoch % 100 == 0:
print("Epoch: %d, loss: %1.5f" % (epoch, loss.item()))