안녕하세요. UWB LOS/NLOS Classification Using Deep Learning Method(1)에서 UWB CIR Dataset을 생성하였다면,
2편으로 논문에서 제시한 CNN_LSTM 네트워크를 약간 변형하여 구성하겠습니다.
1편을 보고 오시는 것을 추천드립니다. 이는 1편처럼 Dataset이 준비됐다는 가정 하에 진행됩니다.
Columns : 1016 (Sampling CIR)
Label : 42000(LOS : 21000, NLOS : 21000)
먼저 위 논문은 CNN-LSTM 구조로 LOS/NLOS를 학습하는 모델입니다.
(epoch : 10, learning rate : 0.001, dropout : 0.5, Train Sample : 35000, Test Sample : 7000)
CNN에서 CIR Feature를 추출하고 Redundant information을 제거한 뒤, LSTM을 이용하여 분류합니다.
( CNN+stacked-LSTM Accuracy : 82.14% )
Implement ( Dataset : df_uwb_data 준비 )
1. Import
import os
import random
import time

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader, TensorDataset
from torch.utils.tensorboard import SummaryWriter

import uwb_dataset
# Report the installed PyTorch build so results can be compared across environments.
print("Pytorch Version :", torch.__version__) # Pytorch Version : 1.7.1+cu110
# TensorBoard event writer; the training loop below logs loss/accuracy and the model graph to it.
writer = SummaryWriter('runs/UWB_CIR_Classfication')
# IPython/Jupyter magic (not valid in a plain .py script): render matplotlib figures inline.
%matplotlib inline
2. Hyper-Parameters
# Seed handed to torch_random_seed() and to the train/validation split.
random_seed = 42
# Number of full passes over the training loader.
num_epoch = 10
# Samples per mini-batch; also passed to the CNN_LSTM constructor.
batch_size = 64
# Input channels of the first Conv1d layer.
in_channels = 1
# Size of the classifier output (NLOS label values 0 and 1).
num_classes = 2
# Number of stacked LSTM layers.
num_layers = 2
# Passed to CNN_LSTM as `fully_connected`.
fully_connected = 128
# Adam learning rate and L2 weight decay.
lr = 0.001
weight_decay = 0.0
# Logging / checkpoint parameters
# Log training loss/accuracy every `view_train_iter` iterations.
view_train_iter = 50
# Run validation every `view_val_iter` epochs.
view_val_iter = 5
# Minimum mean validation accuracy required to save a checkpoint.
save_point = 0.90
3. Random Seed
def torch_random_seed(on_seed=False, random_seed=1):
    """Seed the PyTorch, NumPy and stdlib RNGs and force deterministic cuDNN.

    Args:
        on_seed: When falsy the function does nothing at all.
        random_seed: Seed value applied to every generator.

    Returns:
        None. The function only has side effects on global RNG/cuDNN state.
    """
    if not on_seed:
        return

    torch.manual_seed(random_seed)

    # Deterministic kernels with auto-tuning disabled, so repeated runs
    # select the same cuDNN algorithms.
    cudnn = torch.backends.cudnn
    cudnn.deterministic = True
    cudnn.benchmark = False

    np.random.seed(random_seed)
    random.seed(random_seed)
# Seed every RNG once, before the dataset split and model construction below.
torch_random_seed(on_seed=True, random_seed=random_seed)
4. Model Evaluation Function
def get_clf_eval(y_true, y_pred, average='weighted'):
    """Compute accuracy, precision, recall and F1 for one set of predictions.

    Args:
        y_true: Ground-truth labels.
        y_pred: Predicted labels.
        average: Averaging mode forwarded to the precision, recall and F1
            scorers (accuracy takes no averaging mode).

    Returns:
        Tuple ``(accuracy, precision, recall, f1)``.
    """
    accuracy = accuracy_score(y_true, y_pred)
    averaged_scores = [
        scorer(y_true, y_pred, average=average)
        for scorer in (precision_score, recall_score, f1_score)
    ]
    return (accuracy, *averaged_scores)
5. Split (Train, Validation, Test) X, label Data
# sklearn's train_test_split keeps the label ratio in every split when `stratify` is given.
# Step 1: hold out 10% of all samples as the test split.
# Step 2: hold out 10% of the remainder as the validation split.
# Both steps use `random_seed` so one constant controls the whole partition
# (the first split previously hard-coded 42).
nlos_labels = df_uwb['NLOS'].values
x_train, x_test, y_train, y_test = train_test_split(
    df_uwb_data.values, nlos_labels,
    test_size=0.1, random_state=random_seed, stratify=nlos_labels)
x_train, x_val, y_train, y_val = train_test_split(
    x_train, y_train,
    test_size=0.1, random_state=random_seed, stratify=y_train)

print("x_train shape :", x_train.shape, y_train.shape)
print("x_val shape :", x_val.shape, y_val.shape)
print("x_test shape :", x_test.shape, y_test.shape)

# Per-class sample counts of each split. Generating the label text from the
# class value fixes the last line, which used to print "Test NLOS 0 count"
# for the class-1 count.
for split_name, split_labels in (("Train", y_train), ("Validation", y_val), ("Test", y_test)):
    for nlos_flag in (0, 1):
        print("{} NLOS {} count :".format(split_name, nlos_flag),
              len(split_labels[split_labels == nlos_flag]))
7. Dataset & DataLoader
def generating_loader(x_data, y_data, batch_size=batch_size, shuffle=True, drop_last=True):
    """Wrap feature/label arrays in a ``DataLoader`` over a ``TensorDataset``.

    Args:
        x_data: Feature array; a new axis is inserted at position 1 before
            batching and the values are stored as float32.
        y_data: Label array; stored as int64 and flattened to 1-D.
        batch_size: Mini-batch size (defaults to the module-level constant).
        shuffle: Forwarded to ``DataLoader``.
        drop_last: Forwarded to ``DataLoader``.

    Returns:
        A ``DataLoader`` yielding ``(features, labels)`` batches.
    """
    # Insert the extra axis at position 1, e.g. (N, L) -> (N, 1, L).
    features = torch.tensor(x_data, dtype=torch.float32).unsqueeze(1)
    targets = torch.tensor(y_data, dtype=torch.long).view(-1)

    dataset = TensorDataset(features, targets)
    return DataLoader(dataset, batch_size=batch_size, shuffle=shuffle, drop_last=drop_last)
# One loader per split. Only the training loader is shuffled; drop_last=True
# discards a trailing incomplete batch so every batch has exactly `batch_size` samples.
trainloader = generating_loader(x_train, y_train, batch_size=batch_size, shuffle=True, drop_last=True)
validationloader = generating_loader(x_val, y_val, batch_size=batch_size, shuffle=False, drop_last=True)
# Fix: the test loader must wrap the held-out test split. It was previously built
# from x_val / y_val, so the final evaluation re-scored the validation data.
testloader = generating_loader(x_test, y_test, batch_size=batch_size, shuffle=False, drop_last=True)

# Sanity check: print the shape of one training batch.
x, label = next(iter(trainloader))
print(x.shape, label.shape)
8. Create Model
class CNN_LSTM(nn.Module):
    """1-D CNN feature extractor followed by a stacked bidirectional LSTM classifier.

    Data flow for an input of shape ``(batch, in_channels, 1016)``:
    Conv1d(k=4) -> ReLU -> Conv1d(k=5) -> ReLU -> MaxPool1d(2) gives
    ``(batch, 20, 504)``. With ``batch_first=True`` the LSTM reads this as
    20 time steps of 504 features. The output of the last time step
    (``2 * HIDDEN_SIZE`` values) goes through BatchNorm -> Linear ->
    BatchNorm -> ReLU -> Linear -> ReLU.

    Args:
        in_channels: Channels of the input signal.
        out_channels: Number of output classes.
        batch_size: Default batch size used by ``init_hidden()`` when called
            without arguments. ``forward`` sizes the LSTM state from its input,
            so any batch size is accepted.
        num_layers: Number of stacked LSTM layers.
        fully_connected: Width of the hidden fully connected layer.
        device: Default device for tensors created by ``init_hidden()``.
    """

    HIDDEN_SIZE = 32       # LSTM hidden units per direction
    LSTM_INPUT_SIZE = 504  # ((1016 - 4 + 1) - 5 + 1) // 2 for a 1016-sample input

    def __init__(self, in_channels, out_channels, batch_size, num_layers, fully_connected, device):
        super(CNN_LSTM, self).__init__()
        # Keep the constructor arguments on the instance; init_hidden() used to
        # read module-level globals of the same names instead.
        self.batch_size = batch_size
        self.num_layers = num_layers
        self.device = device

        self.conv1d_layer = nn.Sequential(
            nn.Conv1d(in_channels=in_channels, out_channels=10, kernel_size=4, stride=1, padding=0),
            nn.ReLU(),
            nn.Conv1d(in_channels=10, out_channels=20, kernel_size=5, stride=1, padding=0),
            nn.ReLU(),
            nn.MaxPool1d(kernel_size=2, stride=2),
        )
        self.lstm = nn.LSTM(input_size=self.LSTM_INPUT_SIZE,
                            hidden_size=self.HIDDEN_SIZE,
                            num_layers=num_layers,
                            bias=False,
                            # Inter-layer dropout needs at least two layers;
                            # PyTorch warns when it is set on a single layer.
                            dropout=0.5 if num_layers > 1 else 0.0,
                            bidirectional=True,
                            batch_first=True)

        # Not used in forward(); kept so existing checkpoints (which contain
        # bn2.* entries) still load with strict state_dict matching.
        self.bn2 = nn.BatchNorm1d(20)
        self.bn0 = nn.BatchNorm1d(2 * self.HIDDEN_SIZE)
        self.bn1 = nn.BatchNorm1d(fully_connected)
        self.fc_layer = nn.Linear(2 * self.HIDDEN_SIZE, fully_connected)
        self.relu = nn.ReLU()
        self.fc_layer_class = nn.Linear(fully_connected, out_channels)

    def init_hidden(self, batch_size=None, device=None):
        """Return zero-filled ``(hidden_state, cell_state)`` tensors for the LSTM.

        Args:
            batch_size: Batch dimension of the state; defaults to the
                constructor's ``batch_size``.
            device: Target device; defaults to the constructor's ``device``.

        Returns:
            Two tensors of shape ``(num_layers * 2, batch_size, HIDDEN_SIZE)``
            (the factor 2 is for the two directions).
        """
        if batch_size is None:
            batch_size = self.batch_size
        if device is None:
            device = self.device
        shape = (self.num_layers * 2, batch_size, self.HIDDEN_SIZE)
        hidden_state = torch.zeros(shape, device=device)
        cell_state = torch.zeros(shape, device=device)
        return hidden_state, cell_state

    def forward(self, x):
        """Return class scores of shape ``(batch, out_channels)`` for ``x``."""
        x = self.conv1d_layer(x)
        # Build the initial state per call, matching the actual batch size and
        # device of the input instead of a state frozen at construction time.
        hidden_state, cell_state = self.init_hidden(batch_size=x.size(0), device=x.device)
        x, _ = self.lstm(x, (hidden_state, cell_state))
        x = x[:, -1, :]  # output of the last time step: (batch, 2 * HIDDEN_SIZE)
        x = self.bn0(x)
        x = self.fc_layer(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.fc_layer_class(x)
        # NOTE(review): this ReLU clamps the logits at zero before they reach
        # CrossEntropyLoss, which is unusual; kept unchanged so trained
        # checkpoints give the same outputs. Consider removing it when retraining.
        x = self.relu(x)
        return x
9. Loss Function, Optimizer
# Use the first CUDA device when one is available, otherwise the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# Build the network from the hyper-parameters above and move it to `device`.
model = CNN_LSTM(
    in_channels=in_channels,
    out_channels=num_classes,
    batch_size=batch_size,
    num_layers=num_layers,
    fully_connected=fully_connected,
    device=device,
)
model = model.to(device)

loss_function = nn.CrossEntropyLoss()
optimizer = optim.Adam(params=model.parameters(), lr=lr, weight_decay=weight_decay)

# tensorboard: record the model graph using one training batch as example input
images, labels = next(iter(trainloader))
writer.add_graph(model, images.to(device))
# lr_scheduler = optim.lr_scheduler.MultiStepLR(optimizer=optimizer, milestones=[int(num_epoch * 0.5), int(num_epoch * 0.75)], gamma=0.1, last_epoch=-1)
10. Train, Validation
# Training / validation driver.
#   - Trains for `num_epoch` epochs and logs every `view_train_iter` iterations.
#   - Validates every `view_val_iter` epochs.
#   - Saves a checkpoint when the mean validation accuracy reaches `save_point`.
start = time.time()
correct = 0      # correctly classified samples over all logged batches
total = 0        # samples seen over all logged batches
train_acc = []   # running accuracy at each logging step (plain floats)
tmp_acc = 0
loss_arr = []    # loss at each logging step (plain floats)

print("*Train Start!!*")
# Show the GPU name whenever training runs on CUDA. The previous test,
# `torch.cuda.device_count() == True`, only matched machines with exactly one GPU.
device_label = torch.cuda.get_device_name(device) if device.type == "cuda" else device
print("epoch : {}, learing rate : {}, device : {}".format(num_epoch, lr, device_label))
print("Model : {}".format(model._get_name()))
print("Loss function : {}".format(loss_function._get_name()))
print("Optimizer : {}".format(str(optimizer).replace("\n", " ").replace(" ", ", ")))
print("*"*100)

steps_per_epoch = len(trainloader)

# train
for epoch in range(1, num_epoch + 1):
    # Validation at the end of an epoch switches to eval mode; switch back here.
    model.train()
    for train_iter, (train_x, train_y_true) in enumerate(trainloader):
        # The optimizer holds all model parameters, so one zero_grad() is enough.
        optimizer.zero_grad()
        train_x, train_y_true = train_x.to(device), train_y_true.to(device)

        train_y_pred = model(train_x)
        loss = loss_function(train_y_pred, train_y_true)
        loss.backward()
        optimizer.step()

        if train_iter % view_train_iter == 0:
            _, pred_index = torch.max(train_y_pred, 1)
            loss_value = loss.item()
            loss_arr.append(loss_value)

            # .item() keeps the running statistics as Python numbers, so
            # `train_acc` never holds (possibly CUDA) tensors that matplotlib
            # cannot convert.
            total += train_y_true.size(0)
            correct += (pred_index == train_y_true).sum().item()
            tmp_acc = correct / total
            train_acc.append(tmp_acc)

            # A monotonically increasing step gives every logged point its own
            # x position; logging with `epoch` stacked all points of an epoch
            # on the same step.
            global_step = (epoch - 1) * steps_per_epoch + train_iter
            writer.add_scalar("Loss/train", loss_value, global_step)
            writer.add_scalar("Accuracy/train", tmp_acc, global_step)
            print("[Train] ({}, {}) Time={:.2f}[s], loss = {:.5f}, Accuracy = {:.4f}, lr={:.6f}".format(epoch, train_iter, time.time()-start, loss_value, tmp_acc, optimizer.param_groups[0]['lr']))
    # lr_scheduler.step()

    # validation
    if epoch % view_val_iter == 0:
        val_acc_tmp, val_precision_tmp, val_recall_tmp, val_f1_tmp = [], [], [], []
        val_acc_result, val_precision_result, val_recall_result, val_f1_result = [], [], [], []
        val_time = time.time()

        model.eval()
        # No gradients are needed for validation; skipping them saves memory and time.
        with torch.no_grad():
            for val_iter, (val_x, val_y_true) in enumerate(validationloader):
                val_x, val_y_true = val_x.to(device), val_y_true.to(device)
                val_y_pred = model(val_x)
                _, val_pred_index = torch.max(val_y_pred, 1)

                val_pred_index_cpu = val_pred_index.cpu().numpy()
                val_y_true_cpu = val_y_true.cpu().numpy()
                val_acc, val_precision, val_recall, val_f1 = get_clf_eval(val_y_true_cpu, val_pred_index_cpu)

                val_acc_tmp.append(val_acc)
                val_acc_result.append(val_acc)
                val_precision_tmp.append(val_precision)
                val_precision_result.append(val_precision)
                val_recall_tmp.append(val_recall)
                val_recall_result.append(val_recall)
                val_f1_tmp.append(val_f1)
                val_f1_result.append(val_f1)

        # Each metric is the unweighted mean of the per-batch values.
        val_acc_mean = sum(val_acc_tmp, 0.0)/len(val_acc_tmp)
        val_precision_mean = sum(val_precision_tmp, 0.0)/len(val_precision_tmp)
        val_recall_mean = sum(val_recall_tmp, 0.0)/len(val_recall_tmp)
        val_f1_mean = sum(val_f1_tmp, 0.0)/len(val_f1_tmp)

        print("-"*100)
        print("| Validation {:.2f}[s], Accuracy : {:.4f}, Precision : {:.4f}, Recall : {:.4f}, F1 Score : {:.4f} |".format(
            time.time()-val_time, val_acc_mean, val_precision_mean, val_recall_mean, val_f1_mean))
        print("-"*100)

        if val_acc_mean >= save_point:
            model_name = "[{}](epoch-{})-(init_lr-{})-(batch-{})-(acc-{}).pt".format(
                model._get_name(), epoch, lr, batch_size, int(val_acc_mean*100))
            # NOTE(review): `path` and `dir_` are not defined in this section —
            # presumably set earlier in the notebook; confirm before running.
            save_dir = os.path.join(path, dir_)
            # Create the target directory so torch.save() cannot fail on a missing folder.
            os.makedirs(save_dir, exist_ok=True)
            save_path = os.path.join(save_dir, model_name)
            parameters = {'epoch' : epoch, 'model_state_dict' : model.state_dict(), 'optimizer_state_dict' : optimizer.state_dict(), 'loss' : loss}
            torch.save(parameters, save_path)
            print('[INFO] Model Saved : '+ save_path)

writer.flush()
writer.close()
# Plot the logged training loss (top) and running training accuracy (bottom).
# The accuracy history may contain 0-dim torch tensors, possibly on the GPU,
# which matplotlib cannot convert to NumPy. float() works for both tensors and
# plain numbers, so coerce every value before plotting.
loss_history = [float(value) for value in loss_arr]
acc_history = [float(value) for value in train_acc]

fig = plt.figure(figsize=[16, 8])
loss_plt = plt.subplot(2,1,1)
acc_plt = plt.subplot(2,1,2)

loss_plt.plot(loss_history, color='red', marker="*")
loss_plt.set_title("Train - Loss", fontsize=15)
loss_plt.legend(['Train-Loss'])
loss_plt.grid(True, axis='y')

acc_plt.plot(acc_history, color='green', marker="*")
acc_plt.set_title("Train - Accuracy", fontsize=15)
acc_plt.legend(['Train-Accuracy'])
acc_plt.set_ylim((0.0, 1.05))  # accuracy lies in [0, 1]; leave a little headroom
acc_plt.grid(True, axis='y')
plt.show()
11. Model Evaluation
# Final evaluation: score every batch of `testloader` with the trained model
# and report the unweighted mean of the per-batch metrics.
test_start = time.time()
test_acc_tmp, test_precision_tmp, test_recall_tmp, test_f1_tmp = [], [], [], []
# Same order as the tuple returned by get_clf_eval().
metric_histories = (test_acc_tmp, test_precision_tmp, test_recall_tmp, test_f1_tmp)

model.eval()
with torch.no_grad():
    for test_x, test_y_true in testloader:
        test_x = test_x.to(device)
        test_y_true = test_y_true.to(device)

        test_y_pred = model.forward(test_x)
        # torch.max over dim 1 returns (values, indices); the indices are the predicted classes.
        test_pred_index = torch.max(test_y_pred, 1)[1]

        batch_scores = get_clf_eval(
            test_y_true.cpu().detach().numpy(),
            test_pred_index.cpu().detach().numpy(),
        )
        for history, score in zip(metric_histories, batch_scores):
            history.append(score)

    test_acc_mean, test_precision_mean, test_recall_mean, test_f1_mean = [
        sum(history, 0.0)/len(history) for history in metric_histories
    ]

    print("[Evaluation] {:.2f}[s], Test Accuracy : {:.4f}, Precision : {:.4f}, Recall : {:.4f}, F1 Score : {:.4f}".format(
        time.time()-test_start, test_acc_mean, test_precision_mean, test_recall_mean, test_f1_mean))
    print("[Model Performance] Model Performance : {:.5f}".format(test_acc_mean))
모델에 제시된 파라미터는 그대로 사용하고, 약간 변형하여 모델을 구축하였는데 높은 Accuracy를 보여줍니다.
다만 논문은 LOS/NLOS 분류를 통해 UWB 성능을 올리는 방법을 제시하였지만, 논문의 한계(Limitations)로 이 분류기를 통해 실제로 UWB 성능이 향상되는지는 검증하지 못했습니다.
그리고 제가 생각하는 또 다른 문제는 오픈소스 데이터 자체에 있습니다.
이 데이터를 보았을 때, 굳이 딥러닝, 머신러닝을 사용할 필요가 있을까? 의문이 듭니다.
1차원적으로 생각하였을 때 Threshold를 10000에서 잘라버리면, NLOS를 쉽게 지워버릴 수 있습니다.
현 데이터는 허수(방향) 부분을 제외한 오직 크기의 성질만을 가지고 학습하였기 때문에, 과연 실제 환경 속에서 제대로 작동할지 의문이 듭니다.
UWB는 특성상 나노초(Nanosecond) 단위로 시간을 재는 방식이기 때문에 데이터를 추출하는 것이 굉장히 어렵고, 이를 직접 구현하는 것 또한 쉽지 않습니다. 그렇기 때문에 5년 전 오픈소스 데이터이지만, 2020년에도 이를 이용해 논문을 작성했을 것이라고 생각합니다.
'Python > Deep Learning' 카테고리의 다른 글
[딥러닝] Pytorch. Target n is out of bounds. (0) | 2021.03.26 |
---|---|
[딥러닝] ResNet - Residual Block 쉽게이해하기! (Pytorch 구현) (2) | 2021.03.24 |
[Pytorch] RNN에서 Tanh말고 Sigmoid나 ReLU를 사용하면 안될까? (0) | 2021.02.04 |
[Pytorch] LSTM을 이용한 삼성전자 주가 예측하기 (33) | 2021.02.02 |
[딥러닝] Depth-wise Separable Convolution 원리(Pytorch 구현) (2) | 2021.01.22 |