Hello. In UWB LOS/NLOS Classification Using Deep Learning Method (1) we built the UWB CIR dataset;
in this second part, I will build a slightly modified version of the CNN_LSTM network proposed in the paper.
[Wireless Communication] UWB LOS/NLOS Classification Using Deep Learning Method (1)
coding-yoon.tistory.com
I recommend reading part 1 first. This post assumes the dataset has already been prepared as in part 1.

Columns : 1016 (Sampling CIR)
Label : 42000(LOS : 21000, NLOS : 21000)
First, the paper above proposes a model that learns to separate LOS from NLOS with a CNN-LSTM structure.
(epoch : 10, learning rate : 0.001, dropout : 0.5, Train Sample : 35000, Test Sample : 7000)
The CNN extracts features from the CIR and removes redundant information, and the LSTM then performs the classification.
( CNN+stacked-LSTM Accuracy : 82.14% )




Implement ( Dataset : df_uwb_data prepared )
1. Import
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from torch.utils.tensorboard import SummaryWriter
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt 
import os  # needed for os.path.join when saving checkpoints
import time
import random
import uwb_dataset
print("Pytorch Version :", torch.__version__)  # Pytorch Version : 1.7.1+cu110
writer = SummaryWriter('runs/UWB_CIR_Classfication')
%matplotlib inline
2. Hyper-Parameters
# random seed
random_seed = 42
num_epoch = 10
batch_size = 64
in_channels = 1
num_classes = 2
num_layers = 2
fully_connected = 128
lr = 0.001
weight_decay = 0.0
# Parameters
view_train_iter = 50
view_val_iter = 5
save_point = 0.90
3. Random Seed
def torch_random_seed(on_seed=False, random_seed=1):
    if on_seed:
        torch.manual_seed(random_seed)
        torch.backends.cudnn.deterministic = True
        torch.backends.cudnn.benchmark = False
        np.random.seed(random_seed)
        random.seed(random_seed)
        
torch_random_seed(on_seed=True, random_seed=random_seed)
4. Model Evaluation Function
def get_clf_eval(y_true, y_pred, average='weighted'):
    accuracy = accuracy_score(y_true, y_pred)
    precision = precision_score(y_true, y_pred, average=average)
    recall = recall_score(y_true, y_pred, average=average)
    f1 = f1_score(y_true, y_pred, average=average)
    return accuracy, precision, recall, f1
5. Split (Train, Validation, Test) X, label Data
# Use the stratify parameter of sklearn's train_test_split to split while preserving the label ratio
x_train, x_test, y_train, y_test = train_test_split(df_uwb_data.values, df_uwb['NLOS'].values, test_size=0.1, random_state=42, stratify=df_uwb['NLOS'].values)
x_train, x_val, y_train, y_val = train_test_split(x_train, y_train, test_size=0.1, random_state=random_seed, stratify=y_train)
print("x_train shape :", x_train.shape, y_train.shape)
print("x_val shape :", x_val.shape, y_val.shape)
print("x_test shape :", x_test.shape, y_test.shape)
print("Train NLOS 0 count :", len(y_train[y_train==0]))
print("Train NLOS 1 count :", len(y_train[y_train==1]))
print("Validation NLOS 0 count :", len(y_val[y_val==0]))
print("Validation NLOS 1 count :", len(y_val[y_val==1]))
print("Test NLOS 0 count :", len(y_test[y_test==0]))
print("Test NLOS 1 count :", len(y_test[y_test==1]))

7. Dataset & DataLoader
def generating_loader(x_data, y_data, batch_size=batch_size, shuffle=True, drop_last=True):
    # preprocessing x_data
    x_data = np.expand_dims(x_data, axis=1)
    x_tensor = torch.tensor(x_data, dtype=torch.float32)
    # preprocessing y_data
    y_tensor = torch.tensor(y_data, dtype=torch.long).view(-1)
    return DataLoader(TensorDataset(x_tensor, y_tensor), batch_size=batch_size, shuffle=shuffle, drop_last=drop_last)
trainloader = generating_loader(x_train, y_train, batch_size=batch_size, shuffle=True, drop_last=True)
validationloader = generating_loader(x_val, y_val, batch_size=batch_size, shuffle=False, drop_last=True)
# The test loader must use the held-out test split, not the validation split
testloader = generating_loader(x_test, y_test, batch_size=batch_size, shuffle=False, drop_last=True)
for x, label in trainloader:
    print(x.shape, label.shape)
    break
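Each batch printed above has shape (64, 1, 1016). As a rough check of where the LSTM `input_size = 504` in the model below comes from, the sketch here traces the sequence length through the two convolutions and the max-pooling layer using the standard output-length formula (dilation 1). The helper names `conv1d_out_len` and `trace_lengths` are made up for this illustration and are not part of the original code.

```python
def conv1d_out_len(length, kernel_size, stride=1, padding=0):
    """Output length of a 1-D convolution or pooling layer (dilation 1)."""
    return (length + 2 * padding - kernel_size) // stride + 1


def trace_lengths(cir_len=1016):
    """Follow a CIR of `cir_len` samples through the conv stack used in CNN_LSTM."""
    after_conv1 = conv1d_out_len(cir_len, kernel_size=4)               # Conv1d(1 -> 10, kernel 4)
    after_conv2 = conv1d_out_len(after_conv1, kernel_size=5)           # Conv1d(10 -> 20, kernel 5)
    after_pool = conv1d_out_len(after_conv2, kernel_size=2, stride=2)  # MaxPool1d(2, 2)
    return after_conv1, after_conv2, after_pool


print(trace_lengths())  # (1013, 1009, 504)
```

So the conv stack outputs a tensor of shape (batch, 20, 504), and with `batch_first=True` the LSTM reads it as a sequence of 20 steps with 504 features each.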
8. Create Model
class CNN_LSTM(nn.Module):
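    def __init__(self, in_channels, out_channels, batch_size, num_layers, fully_connected, device):
        super(CNN_LSTM, self).__init__()
        self.batch_size = batch_size
        # Keep these on the instance so init_hidden() does not depend on global variables
        self.num_layers = num_layers
        self.device = device
        self.conv1d_layer = nn.Sequential(
            nn.Conv1d(in_channels=in_channels, out_channels=10, kernel_size=4, stride=1, padding=0),
            nn.ReLU(),
            nn.Conv1d(in_channels=10, out_channels=20, kernel_size=5, stride=1, padding=0),
            nn.ReLU(),
            nn.MaxPool1d(kernel_size=2, stride=2),
        )
        # The conv stack turns a 1016-sample CIR into (20 channels, 504 samples);
        # the LSTM treats the 20 channels as time steps with 504 features each
        self.lstm = nn.LSTM(input_size = 504, 
                            hidden_size = 32, 
                            num_layers = num_layers,
                            bias = False,
                            dropout = 0.5,
                            bidirectional = True,
                            batch_first=True)
        self.hidden_state, self.cell_state = self.init_hidden()
        
        self.bn2 = nn.BatchNorm1d(20)  # defined but not used in forward()
        self.bn0 = nn.BatchNorm1d(64)  # 64 = hidden_size 32 * 2 directions
        self.bn1 = nn.BatchNorm1d(fully_connected)
        self.fc_layer = nn.Linear(64, fully_connected)
        self.relu = nn.ReLU()
        self.fc_layer_class = nn.Linear(fully_connected, out_channels)
    def init_hidden(self):
        # Shape: (num_layers * 2 directions, batch, hidden_size).
        # The batch size is fixed here, so every DataLoader must use drop_last=True.
        hidden_state = torch.zeros(self.num_layers*2, self.batch_size, 32).to(self.device)
        cell_state = torch.zeros(self.num_layers*2, self.batch_size, 32).to(self.device)
        return hidden_state, cell_state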
        
    def forward(self, x):
        x = self.conv1d_layer(x)
        x, _ = self.lstm(x,(self.hidden_state, self.cell_state))
        x = x[:, -1 :].view(x.size(0), -1)
        x = self.bn0(x)
        x = self.fc_layer(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.fc_layer_class(x)
        x = self.relu(x)
        return x
9. Loss Function, Optimizer
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model = CNN_LSTM(
    in_channels=in_channels,
    device=device,
    out_channels=num_classes,
    batch_size=batch_size,
    fully_connected=fully_connected,
    num_layers=num_layers).to(device)
loss_function = nn.CrossEntropyLoss()  
optimizer = optim.Adam(model.parameters(), lr=lr, weight_decay=weight_decay)  # optimizer
# tensorboard
images, labels = next(iter(trainloader))
writer.add_graph(model, images.to(device))
# lr_scheduler = optim.lr_scheduler.MultiStepLR(optimizer=optimizer, milestones=[int(num_epoch * 0.5), int(num_epoch * 0.75)], gamma=0.1, last_epoch=-1)
10. Train, Validation
start = time.time()
correct = 0
total = 0
train_acc = []
tmp_acc = 0
loss_arr = []
print("*Train Start!!*")
# Show the GPU name when CUDA is available, otherwise the device itself (e.g. cpu)
device_name = torch.cuda.get_device_name(0) if torch.cuda.is_available() else device
print("epoch : {}, learning rate : {}, device : {}".format(num_epoch, lr, device_name))
print("Model : {}".format(model._get_name()))
print("Loss function : {}".format(loss_function._get_name()))
print("Optimizer : {}".format(str(optimizer).replace("\n", " ").replace("     ", ", ")))
print("*"*100)
# train
for epoch in range(num_epoch):
    epoch += 1
    for train_iter, (train_x, train_y_true) in enumerate(trainloader):
        model.train()  # Train mode
        model.zero_grad()  # model zero initialize
        optimizer.zero_grad()  # optimizer zero initialize
        
        train_x, train_y_true = train_x.to(device), train_y_true.to(device)  # device(gpu)
        train_y_pred = model.forward(train_x)  # forward
        loss = loss_function(train_y_pred, train_y_true)  # loss function
        loss.backward()  # backward
        optimizer.step()  # optimizer
        _, pred_index = torch.max(train_y_pred, 1)
        
        if train_iter % view_train_iter == 0:
            loss_arr.append(loss.item())
            total += train_y_true.size(0)  # y.size(0)
            correct += (pred_index == train_y_true).sum().float()  # correct
            tmp_acc = correct / total  # accuracy
            train_acc.append(tmp_acc)
            writer.add_scalar("Loss/train", loss, epoch)
            writer.add_scalar("Accuracy/train",tmp_acc, epoch)
            print("[Train] ({}, {}) Time={:.2f}[s], loss = {:.5f}, Accuracy = {:.4f}, lr={:.6f}".format(epoch, train_iter, time.time()-start, loss.item(), tmp_acc, optimizer.param_groups[0]['lr']))
    # lr_scheduler.step()
    # validation 
    if epoch % view_val_iter == 0: 
        val_acc_tmp, val_precision_tmp, val_recall_tmp, val_f1_tmp = [], [], [], []
        val_acc_result, val_precision_result, val_recall_result, val_f1_result = [], [], [], []
        val_time = time.time()
        model.eval()  # evaluation mode: dropout off, BatchNorm uses running statistics
        with torch.no_grad():  # gradients are not needed during validation
            for val_iter, (val_x, val_y_true) in enumerate(validationloader):
                val_x, val_y_true = val_x.to(device), val_y_true.to(device)  # device(gpu)
                val_y_pred = model.forward(val_x)  # forward
                _, val_pred_index = torch.max(val_y_pred, 1)
                val_pred_index_cpu = val_pred_index.cpu().detach().numpy()
                val_y_true_cpu = val_y_true.cpu().detach().numpy()
                
                val_acc, val_precision, val_recall, val_f1 = get_clf_eval(val_y_true_cpu, val_pred_index_cpu)
                val_acc_tmp.append(val_acc), val_acc_result.append(val_acc)
                val_precision_tmp.append(val_precision), val_precision_result.append(val_precision)
                val_recall_tmp.append(val_recall), val_recall_result.append(val_recall)
                val_f1_tmp.append(val_f1), val_f1_result.append(val_f1)
        val_acc_mean = sum(val_acc_tmp, 0.0)/len(val_acc_tmp)
        val_precision_mean = sum(val_precision_tmp, 0.0)/len(val_precision_tmp)
        val_recall_mean = sum(val_recall_tmp, 0.0)/len(val_recall_tmp)
        val_f1_mean = sum(val_f1_tmp, 0.0)/len(val_f1_tmp)
        print("-"*100)
        print("|  Validation {:.2f}[s], Accuracy : {:.4f}, Precision : {:.4f}, Recall : {:.4f}, F1 Score : {:.4f}   |".format(
            time.time()-val_time, val_acc_mean, val_precision_mean, val_recall_mean, val_f1_mean))
        print("-"*100)
        if val_acc_mean >= save_point:
            epoch_str = str(epoch)
            lr_str = str(lr)
            batch_str= str(batch_size)
            acc_str= str((int(val_acc_mean*100)))
            model_name = "["+model._get_name()+"](epoch-"+epoch_str+")-"+"(init_lr-"+lr_str+")-"+"(batch-"+batch_str+")-"+"(acc-"+acc_str+").pt"
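            # NOTE: `path` and `dir_` are not defined anywhere in this post; set them to your own output directory before training
            save_path = os.path.join(path, dir_, model_name)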
            parameters = {'epoch' : epoch, 'model_state_dict' : model.state_dict(), 'optimizer_state_dict' : optimizer.state_dict(), 'loss' : loss}
            torch.save(parameters, save_path)
            print('[INFO] Model Saved : '+ save_path)
writer.flush()
writer.close()
fig = plt.figure(figsize=[16, 8])
loss_plt = plt.subplot(2,1,1)
acc_plt = plt.subplot(2,1,2)
loss_plt.plot(loss_arr, color='red', marker="*")
loss_plt.set_title("Train - Loss", fontsize=15)
loss_plt.legend(['Train-Loss'])
loss_plt.grid(True, axis='y')
acc_plt.plot(train_acc, color='green', marker="*")
acc_plt.set_title("Train - Accuracy", fontsize=15)
acc_plt.legend(['Train-Accuracy'])
acc_plt.set_ylim((0.0, 1.05))
acc_plt.grid(True, axis='y')
plt.show()
11. Model Evaluation
test_start = time.time()
model.eval()
with torch.no_grad():
    test_acc_tmp, test_precision_tmp, test_recall_tmp, test_f1_tmp = [], [], [], []
    for test_iter, (test_x, test_y_true) in enumerate(testloader):
        test_x, test_y_true = test_x.to(device), test_y_true.to(device)
        test_y_pred = model.forward(test_x)  # forward
        _, test_pred_index = torch.max(test_y_pred, 1)
        test_pred_index_cpu = test_pred_index.cpu().detach().numpy()
        test_y_true_cpu = test_y_true.cpu().detach().numpy()
            
        test_acc, test_precision, test_recall, test_f1 = get_clf_eval(test_y_true_cpu, test_pred_index_cpu)
        test_acc_tmp.append(test_acc), test_precision_tmp.append(test_precision), test_recall_tmp.append(test_recall), test_f1_tmp.append(test_f1)
    test_acc_mean = sum(test_acc_tmp, 0.0)/len(test_acc_tmp)
    test_precision_mean = sum(test_precision_tmp, 0.0)/len(test_precision_tmp)
    test_recall_mean = sum(test_recall_tmp, 0.0)/len(test_recall_tmp)
    test_f1_mean = sum(test_f1_tmp, 0.0)/len(test_f1_tmp)
    print("[Evaluation] {:.2f}[s], Test Accuracy : {:.4f}, Precision : {:.4f}, Recall : {:.4f}, F1 Score : {:.4f}".format(
        time.time()-test_start, test_acc_mean, test_precision_mean, test_recall_mean, test_f1_mean))
    print("[Model Performance] Model Performance : {:.5f}".format(test_acc_mean))
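One thing to keep in mind about the loops above: they compute each score per batch and then average the batch scores. Because all batches have the same size (drop_last=True), the averaged accuracy matches the accuracy over all evaluated samples, but that does not hold in general for precision, recall, or F1. The sketch below is only an illustration of the alternative, pooling the confusion counts across batches first. It scores a single positive class (NLOS = 1) rather than the `weighted` average used by `get_clf_eval`, and the function names and toy labels are made up.

```python
def confusion_counts(y_true, y_pred, positive=1):
    """Return (tp, fp, fn, tn) for one batch of labels and predictions."""
    tp = sum(1 for t, p in zip(y_true, y_pred) if t == positive and p == positive)
    fp = sum(1 for t, p in zip(y_true, y_pred) if t != positive and p == positive)
    fn = sum(1 for t, p in zip(y_true, y_pred) if t == positive and p != positive)
    tn = len(y_true) - tp - fp - fn
    return tp, fp, fn, tn


def pooled_scores(batches, positive=1):
    """Accumulate counts over all batches, then compute the scores once."""
    tp = fp = fn = tn = 0
    for y_true, y_pred in batches:
        b_tp, b_fp, b_fn, b_tn = confusion_counts(y_true, y_pred, positive)
        tp, fp, fn, tn = tp + b_tp, fp + b_fp, fn + b_fn, tn + b_tn
    accuracy = (tp + tn) / (tp + fp + fn + tn)
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
    return accuracy, precision, recall, f1


# Made-up (y_true, y_pred) pairs for two batches of four samples each
toy_batches = [
    ([1, 1, 0, 0], [1, 0, 0, 0]),
    ([1, 0, 0, 0], [1, 1, 1, 0]),
]
print(pooled_scores(toy_batches))
```

On these toy batches the per-batch precisions are 1.0 and 1/3 (mean about 0.67), while the pooled precision is 0.5, so the two ways of aggregating can disagree noticeably.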

I kept the parameters given in the paper as they are and built the model with slight modifications, and it shows high accuracy.
However, although the paper proposes improving UWB performance through LOS/NLOS classification, it acknowledges as a limitation that it did not actually verify UWB performance using this classifier.
Another problem, in my opinion, is the open-source data itself.

Looking at this data, I wonder whether deep learning or machine learning is really necessary here.
Thinking about it naively, simply cutting at a threshold of 10000 would already remove NLOS quite easily.
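To make the threshold idea concrete, here is a minimal sketch of such a baseline. The post does not say which statistic is compared against 10000 or in which direction, so the sketch assumes the peak CIR magnitude of each sample and assumes that NLOS samples fall below the threshold. Both are assumptions, the function names are hypothetical, and the toy rows are made-up numbers rather than values from the dataset.

```python
def peak_threshold_classify(cir_rows, threshold=10000, nlos_below=True):
    """Label each CIR row 1 (NLOS) or 0 (LOS) by comparing its peak to a threshold.

    nlos_below=True assumes NLOS peaks are weaker than the threshold (an assumption).
    """
    labels = []
    for row in cir_rows:
        below = max(row) < threshold
        labels.append(1 if below == nlos_below else 0)
    return labels


def accuracy(y_true, y_pred):
    """Fraction of predictions that match the true labels."""
    correct = sum(1 for t, p in zip(y_true, y_pred) if t == p)
    return correct / len(y_true)


# Made-up toy rows (not taken from the dataset), with hypothetical NLOS labels
toy_cir = [[120, 15400, 3000], [90, 4200, 2100], [300, 12800, 900], [50, 7600, 7000]]
toy_nlos = [0, 1, 0, 1]

pred = peak_threshold_classify(toy_cir)
print(pred, accuracy(toy_nlos, pred))
```

Running the same function on `x_test` and comparing against `y_test` would give a quick baseline to set next to the CNN-LSTM accuracy; if the assumed direction is wrong for the real data, flip `nlos_below`.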
The current data was used for training with only the magnitude of the signal, excluding the imaginary (direction) part, so I doubt whether the model would really work properly in a real environment.
Because UWB measures time on a nanosecond scale, extracting the data is extremely difficult, which makes this hard to implement in practice. I think that is why, even though the open-source dataset is five years old, a paper was still written with it in 2020.