예를 위해 위 Github에서 0과 1로 이루이진 이진 클래스 데이터 셋이 있다. 이 글은 데이터를 라벨에 맞게 골고루 분리하는 것이므로 데이터 분석은 따로 하지 않는다.
csv 파일은 7개이며, 총 데이터의 개수는 42000개이다.
sklearn 라이브러리에서 train_test_split을 사용하면 간단히 데이터를 분할할 수 있다. 아래 코드에서 간단히 데이터셋을 불러올 수 있다. 이 데이터셋의 경우 nlos가 클래스(혹은 정답 데이터)이다. 아래의 그림을 보게 되면 0, 1, 0, 1, 1...로 무작위로 정렬돼었다. 각 클래스의 개수를 출력하면, 21000개를 확인할 수 있다.
import uwb_dataset
import pandas as pd
columns, data = uwb_dataset.import_from_files()
for item in data:
item[15:] = item[15:]/float(item[2])
print("\nColumns :", columns.shape, sep=" ")
print("Data :", data.shape, sep=" ")
print(type(data))
df_uwb = pd.DataFrame(data=data, columns=columns)
print(df_uwb.head(10))
los_count = df_uwb.query("NLOS == 0")["NLOS"].count()
nlos_count = df_uwb.query("NLOS == 1")["NLOS"].count()
print("Line of Sight Count :", los_count)
print("Non Line of Sight Count :", nlos_count)
"""
결과
./dataset/uwb_dataset_part5.csv
./dataset/uwb_dataset_part7.csv
./dataset/uwb_dataset_part2.csv
./dataset/uwb_dataset_part3.csv
./dataset/uwb_dataset_part4.csv
./dataset/uwb_dataset_part1.csv
./dataset/uwb_dataset_part6.csv
Columns : (1031,)
Data : (42000, 1031)
<class 'numpy.ndarray'>
Line of Sight Count : 21000
Non Line of Sight Count : 21000
"""
파이썬으로 개발을 하면서 느낀 것은 최대한 반복문을 지양하고 라이브러리를 사용하는 것이다. 파이썬은 빠르게 데모 프로그램 구현에 많이 사용하는데 라이브러리가 이를 가능케 한다. 그리고 가장 큰 이유는 내가 만든 코드는 절대 라이브러리를 이길 수 없다는 것이다. 이번에 사용하고자 하는 train_test_split은 sklearn 라이브러리의 함수이다. sklearn는 머신러닝 대표 라이브러리이지만 딥러닝 사용에 있어 문제가 없다.
train_test_split : 배열 또는 행렬을 무작위 학습 및 테스트 하위 집합으로 분할.
-test_size : 0~1 사이의 값으로, 기본 test size는 0.25로 자동으로 train size는 0.75이다. 아래의 그림처럼 데이터를 분할.
- random_state : shuffle 제어하는 인자이다. 모델의 성능을 향상시키기 위해 다양한 방법으로 모델을 디자인한다. 하지만 공정한 검증을 위해 똑같은 데이터셋을 필요로 한다. 그 때 사용하는 인자이다.
import numpy as np
import pandas as pd
import pandas_datareader.data as pdr
import matplotlib.pyplot as plt
import datetime
import torch
import torch.nn as nn
from torch.autograd import Variable
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
no module pandas_datareaderno module named 'pandas_datareader'
pandas가 깔려 있는데, 위 문구가 뜬다면 pip install pandas_datareader로 다운로드합니다.
그리고 학습된 모델이 성능을 확인하기 위해서 위 데이터(현재 약 5296개)를 Train(학습하고자 하는 데이터)를 0부터 4499까지, Test(성능 테스트하는 데이터)는 4500부터 5295개 까지 데이터로 분류합니다.
오늘자 대략, 노란색 선 정도까지 데이터를 가지고 학습을 하고, 노란색 선 이후부터 예측을 할 것입니다.
과연 내려가고 올라가는 포인트를 잘 예측할 수 있을지 궁금합니다.
3. 데이터셋 준비하기
"""
저도 주식을 잘 모르기 때문에 참고해주시면 좋을 것 같습니다.
open 시가
high 고가
low 저가
close 종가
volume 거래량
Adj Close 주식의 분할, 배당, 배분 등을 고려해 조정한 종가
확실한건 거래량(Volume)은 데이터에서 제하는 것이 중요하고,
Y 데이터를 Adj Close로 정합니다. (종가로 해도 된다고 생각합니다.)
"""
X = df.drop(columns='Volume')
y = df.iloc[:, 5:6]
print(X)
print(y)
"""
학습이 잘되기 위해 데이터 정규화
StandardScaler 각 특징의 평균을 0, 분산을 1이 되도록 변경
MinMaxScaler 최대/최소값이 각각 1, 0이 되도록 변경
"""
from sklearn.preprocessing import StandardScaler, MinMaxScaler
mm = MinMaxScaler()
ss = StandardScaler()
X_ss = ss.fit_transform(X)
y_mm = mm.fit_transform(y)
# Train Data
X_train = X_ss[:4500, :]
X_test = X_ss[4500:, :]
# Test Data
"""
( 굳이 없어도 된다. 하지만 얼마나 예측데이터와 실제 데이터의 정확도를 확인하기 위해
from sklearn.metrics import accuracy_score 를 통해 정확한 값으로 확인할 수 있다. )
"""
y_train = y_mm[:4500, :]
y_test = y_mm[4500:, :]
print("Training Shape", X_train.shape, y_train.shape)
print("Testing Shape", X_test.shape, y_test.shape)
"""
torch Variable에는 3개의 형태가 있다.
data, grad, grad_fn 한 번 구글에 찾아서 공부해보길 바랍니다.
"""
X_train_tensors = Variable(torch.Tensor(X_train))
X_test_tensors = Variable(torch.Tensor(X_test))
y_train_tensors = Variable(torch.Tensor(y_train))
y_test_tensors = Variable(torch.Tensor(y_test))
X_train_tensors_final = torch.reshape(X_train_tensors, (X_train_tensors.shape[0], 1, X_train_tensors.shape[1]))
X_test_tensors_final = torch.reshape(X_test_tensors, (X_test_tensors.shape[0], 1, X_test_tensors.shape[1]))
print("Training Shape", X_train_tensors_final.shape, y_train_tensors.shape)
print("Testing Shape", X_test_tensors_final.shape, y_test_tensors.shape)
4. GPU 준비하기 (없으면 CPU로 돌리면 됩니다.)
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") # device
print(torch.cuda.get_device_name(0))
5. LSTM 네트워크 구성하기
class LSTM1(nn.Module):
def __init__(self, num_classes, input_size, hidden_size, num_layers, seq_length):
super(LSTM1, self).__init__()
self.num_classes = num_classes #number of classes
self.num_layers = num_layers #number of layers
self.input_size = input_size #input size
self.hidden_size = hidden_size #hidden state
self.seq_length = seq_length #sequence length
self.lstm = nn.LSTM(input_size=input_size, hidden_size=hidden_size,
num_layers=num_layers, batch_first=True) #lstm
self.fc_1 = nn.Linear(hidden_size, 128) #fully connected 1
self.fc = nn.Linear(128, num_classes) #fully connected last layer
self.relu = nn.ReLU()
def forward(self,x):
h_0 = Variable(torch.zeros(self.num_layers, x.size(0), self.hidden_size)).to(device) #hidden state
c_0 = Variable(torch.zeros(self.num_layers, x.size(0), self.hidden_size)).to(device) #internal state
# Propagate input through LSTM
output, (hn, cn) = self.lstm(x, (h_0, c_0)) #lstm with input, hidden, and internal state
hn = hn.view(-1, self.hidden_size) #reshaping the data for Dense layer next
out = self.relu(hn)
out = self.fc_1(out) #first Dense
out = self.relu(out) #relu
out = self.fc(out) #Final Output
return out
위 코드는 복잡해 보이지만, 실상 하나씩 확인해보면 굉장히 연산이 적은 네트워크입니다.
시계열 데이터이지만, 간단한 구성을 위해 Sequence Length도 1이고, LSTM Layer도 1이기 때문에 굉장히 빨리 끝납니다. 아마 본문 작성자가 CPU환경에서도 쉽게 따라 할 수 있게 간단하게 작성한 것 같습니다.
num_epochs = 30000 #1000 epochs
learning_rate = 0.00001 #0.001 lr
input_size = 5 #number of features
hidden_size = 2 #number of features in hidden state
num_layers = 1 #number of stacked lstm layers
num_classes = 1 #number of output classes
for epoch in range(num_epochs):
outputs = lstm1.forward(X_train_tensors_final.to(device)) #forward pass
optimizer.zero_grad() #caluclate the gradient, manually setting to 0
# obtain the loss function
loss = loss_function(outputs, y_train_tensors.to(device))
loss.backward() #calculates the loss of the loss function
optimizer.step() #improve from loss, i.e backprop
if epoch % 100 == 0:
print("Epoch: %d, loss: %1.5f" % (epoch, loss.item()))