반응형

안녕하세요. WB LOS/NLOS Classification Using Deep Learning Method(1)에서 UWB CIR Dataset을 생성하였다면, 

2편으로 논문에서 제시한 CNN_LSTM 네트워크를 약간 변형하여 구성하겠습니다. 

 

coding-yoon.tistory.com/138

 

[무선 통신] UWB LOS/NLOS Classification Using Deep Learning Method (1)

안녕하세요. 오늘은 Indoor Positioning에서 [cm]단위의 오차를 내는 UWB 관련 논문에 이야기하겠습니다. coding-yoon.tistory.com/136?category=910542 [무선 통신] Bluetooth Low Energy(BLE) 1. Physical Layer..

coding-yoon.tistory.com

1편을 보고 오시는 것을 추천드립니다. 이는 1편처럼 Dataset이 준비됐다는 가정 하에 진행됩니다.

 

Dataset

Columns : 1016 (Sampling CIR)

Label : 42000(LOS : 21000, NLOS : 21000)  

 

먼저 위 논문은 CNN-LSTM 구조로 LOS/NLOS를 학습하는 모델입니다.

(epoch : 10, learning rate : 0.001, dropout : 0.5, Train Sample : 35000, Test Sample : 7000)

CNN에서 CIR Featur을 추출, Redundant information을 제거하고, LSTM을 이용하여 분류합니다.

( CNN+stacked-LSTM Accuracy : 82.14% )

 

Model Structure
CNN Structure
LSTM Structure
Result

Implemnet ( Dataset : df_uwb_data 준비 )

 

1. Import

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from torch.utils.tensorboard import SummaryWriter

from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt 

import time
import random

import uwb_dataset

print("Pytorch Version :", torch.__version__)  # Pytorch Version : 1.7.1+cu110
writer = SummaryWriter('runs/UWB_CIR_Classfication')

%matplotlib inline

 

2. Hyper-Parameters

# random seed
random_seed = 42

num_epoch = 10
batch_size = 64
in_channels = 1
num_classes = 2
num_layers = 2
fully_connected = 128

lr = 0.001
weight_decay = 0.0

# Parameters
view_train_iter = 50
view_val_iter = 5
save_point = 0.90

3. Random Seed

def torch_random_seed(on_seed=False, random_seed=1):
    if on_seed:
        torch.manual_seed(random_seed)
        torch.backends.cudnn.deterministic = True
        torch.backends.cudnn.benchmark = False

        np.random.seed(random_seed)
        random.seed(random_seed)
        
torch_random_seed(on_seed=True, random_seed=random_seed)

4. Model Evaluation Function

def get_clf_eval(y_true, y_pred, average='weighted'):
    accuracy = accuracy_score(y_true, y_pred)
    precision = precision_score(y_true, y_pred, average=average)
    recall = recall_score(y_true, y_pred, average=average)
    f1 = f1_score(y_true, y_pred, average=average)

    return accuracy, precision, recall, f1

5. Split (Train, Validation, Test) X, label Data

# sklearn의 train_test_split은 stratify 파라미터를 통해 Label의 비율을 유지하면서 Split
x_train, x_test, y_train, y_test = train_test_split(df_uwb_data.values, df_uwb['NLOS'].values, test_size=0.1, random_state=42, stratify=df_uwb['NLOS'].values)
x_train, x_val, y_train, y_val = train_test_split(x_train, y_train, test_size=0.1, random_state=random_seed, stratify=y_train)

print("x_train shape :", x_train.shape, y_train.shape)
print("x_val shape :", x_val.shape, y_val.shape)
print("x_test shape :", x_test.shape, y_test.shape)
print("Train NLOS 0 count :", len(y_train[y_train==0]))
print("Train NLOS 1 count :", len(y_train[y_train==1]))
print("Validation NLOS 0 count :", len(y_val[y_val==0]))
print("Validation NLOS 1 count :", len(y_val[y_val==1]))
print("Test NLOS 0 count :", len(y_test[y_test==0]))
print("Test NLOS 0 count :", len(y_test[y_test==1]))

7. Dataset & DataLoader

def generating_loader(x_data, y_data, batch_size=batch_size, shuffle=True, drop_last=True):
    # preprocessing x_data
    x_data = np.expand_dims(x_data, axis=1)
    x_tensor = torch.tensor(x_data, dtype=torch.float32)
    # preprocessing y_data
    y_tensor = torch.tensor(y_data, dtype=torch.long).view(-1)

    return DataLoader(TensorDataset(x_tensor, y_tensor), batch_size=batch_size, shuffle=shuffle, drop_last=drop_last)
trainloader = generating_loader(x_train, y_train, batch_size=batch_size, shuffle=True, drop_last=True)
validationloader = generating_loader(x_val, y_val, batch_size=batch_size, shuffle=False, drop_last=True)
testloader = generating_loader(x_val, y_val, batch_size=batch_size, shuffle=False, drop_last=True)
for x, label in trainloader:
    print(x.shape, label.shape)
    break

8. Create Model

class CNN_LSTM(nn.Module):
    def __init__(self, in_channels, out_channels, batch_size, num_layers, fully_connected, device):
        super(CNN_LSTM, self).__init__()
        self.batch_size = batch_size
        self.conv1d_layer = nn.Sequential(
            nn.Conv1d(in_channels=in_channels, out_channels=10, kernel_size=4, stride=1, padding=0),
            nn.ReLU(),
            nn.Conv1d(in_channels=10, out_channels=20, kernel_size=5, stride=1, padding=0),
            nn.ReLU(),
            nn.MaxPool1d(kernel_size=2, stride=2),
        ) 
        self.lstm = nn.LSTM(input_size = 504, 
                            hidden_size = 32, 
                            num_layers = num_layers,
                            bias = False,
                            dropout = 0.5,
                            bidirectional = True,
                            batch_first=True)

        self.hidden_state, self.cell_state = self.init_hidden()
        
        self.bn2 = nn.BatchNorm1d(20)
        self.bn0 = nn.BatchNorm1d(64)
        self.bn1 = nn.BatchNorm1d(128)

        self.fc_layer = nn.Linear(64, 128)
        self.relu = nn.ReLU()
        self.fc_layer_class = nn.Linear(128, out_channels)


    def init_hidden(self):
        hidden_state = torch.zeros(num_layers*2, self.batch_size, 32).to(device)
        cell_state = torch.zeros(num_layers*2, self.batch_size, 32).to(device)

        return hidden_state, cell_state
        
    def forward(self, x):
        x = self.conv1d_layer(x)
        x, _ = self.lstm(x,(self.hidden_state, self.cell_state))
        x = x[:, -1 :].view(x.size(0), -1)
        x = self.bn0(x)
        x = self.fc_layer(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.fc_layer_class(x)
        x = self.relu(x)

        return x

9. Loss Function, Optimizer

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model = CNN_LSTM(
    in_channels=in_channels,\
    device=device,\
    out_channels=num_classes,\
    batch_size=batch_size,\
    fully_connected=fully_connected,\
    num_layers=num_layers).to(device)
loss_function = nn.CrossEntropyLoss()  
optimizer = optim.Adam(model.parameters(), lr=lr, weight_decay=weight_decay)  # optimizer

# tensorboard
images, labels = next(iter(trainloader))
writer.add_graph(model, images.to(device))

# lr_scheduler = optim.lr_scheduler.MultiStepLR(optimizer=optimizer, milestones=[int(num_epoch * 0.5), int(num_epoch * 0.75)], gamma=0.1, last_epoch=-1)

10. Train, Validation 

start = time.time()

correct = 0
total = 0
train_acc = []
tmp_acc = 0
loss_arr = []

print("*Train Start!!*")
if torch.cuda.device_count() == True:
    print("epoch : {}, learing rate : {}, device : {}".format(num_epoch, lr, torch.cuda.get_device_name(0)))
    print("Model : {}".format(model._get_name()))
    print("Loss function : {}".format(loss_function._get_name()))
    print("Optimizer : {}".format(str(optimizer).replace("\n", " ").replace("     ", ", ")))
else:
    print("epoch : {}, learing rate : {}, device : {}".format(num_epoch, lr, device))
    print("Model : {}".format(model._get_name()))
    print("Loss function : {}".format(loss_function._get_name()))
    print("Optimizer : {}".format(str(optimizer).replace("\n", " ").replace("     ", ", ")))
print("*"*100)

# train
for epoch in range(num_epoch):
    epoch += 1
    for train_iter, (train_x, train_y_true) in enumerate(trainloader):
        model.train()  # Train mode
        model.zero_grad()  # model zero initialize
        optimizer.zero_grad()  # optimizer zero initialize
        

        train_x, train_y_true = train_x.to(device), train_y_true.to(device)  # device(gpu)
        train_y_pred = model.forward(train_x)  # forward
        loss = loss_function(train_y_pred, train_y_true)  # loss function
        loss.backward()  # backward
        optimizer.step()  # optimizer
        _, pred_index = torch.max(train_y_pred, 1)
        
        if train_iter % view_train_iter == 0:
            loss_arr.append(loss.item())
            total += train_y_true.size(0)  # y.size(0)
            correct += (pred_index == train_y_true).sum().float()  # correct
            tmp_acc = correct / total  # accuracy
            train_acc.append(tmp_acc)

            writer.add_scalar("Loss/train", loss, epoch)
            writer.add_scalar("Accuracy/train",tmp_acc, epoch)

            print("[Train] ({}, {}) Time={:.2f}[s], loss = {:.5f}, Accuracy = {:.4f}, lr={:.6f}".format(epoch, train_iter, time.time()-start, loss.item(), tmp_acc, optimizer.param_groups[0]['lr']))
    # lr_scheduler.step()
    # validation 
    if epoch % view_val_iter == 0: 
        val_acc_tmp, val_precision_tmp, val_recall_tmp, val_f1_tmp = [], [], [], []
        val_acc_result, val_precision_result, val_recall_result, val_f1_result = [], [], [], []
        val_time = time.time()
        for val_iter, (val_x, val_y_true) in enumerate(validationloader):
            model.eval()
            val_x, val_y_true = val_x.to(device), val_y_true.to(device)  # device(gpu)
            val_y_pred = model.forward(val_x)  # forward
            _, val_pred_index = torch.max(val_y_pred, 1)

            val_pred_index_cpu = val_pred_index.cpu().detach().numpy()
            val_y_true_cpu = val_y_true.cpu().detach().numpy()
            
            val_acc, val_precision, val_recall, val_f1 = get_clf_eval(val_y_true_cpu, val_pred_index_cpu)

            val_acc_tmp.append(val_acc), val_acc_result.append(val_acc)
            val_precision_tmp.append(val_precision), val_precision_result.append(val_precision)
            val_recall_tmp.append(val_recall), val_recall_result.append(val_recall)
            val_f1_tmp.append(val_f1), val_f1_result.append(val_f1)

        val_acc_mean = sum(val_acc_tmp, 0.0)/len(val_acc_tmp)
        val_precision_mean = sum(val_precision_tmp, 0.0)/len(val_precision_tmp)
        val_recall_mean = sum(val_recall_tmp, 0.0)/len(val_recall_tmp)
        val_f1_mean = sum(val_f1_tmp, 0.0)/len(val_f1_tmp)

        print("-"*100)
        print("|  Validation {:.2f}[s], Accuracy : {:.4f}, Precision : {:.4f}, Recall : {:.4f}, F1 Score : {:.4f}   |".format(
            time.time()-val_time, val_acc_mean, val_precision_mean, val_recall_mean, val_f1_mean))
        print("-"*100)
        if val_acc_mean >= save_point:
            epoch_str = str(epoch)
            lr_str = str(lr)
            batch_str= str(batch_size)
            acc_str= str((int(val_acc_mean*100)))
            model_name = "["+model._get_name()+"](epoch-"+epoch_str+")-"+"(init_lr-"+lr_str+")-"+"(batch-"+batch_str+")-"+"(acc-"+acc_str+").pt"
            save_path = os.path.join(path, dir_ ,model_name)
            parameters = {'epoch' : epoch, 'model_state_dict' : model.state_dict(), 'optimizer_state_dict' : optimizer.state_dict(), 'loss' : loss}
            torch.save(parameters, save_path)
            print('[INFO] Model Saved : '+ save_path)
writer.flush()
writer.close()

fig = plt.figure(figsize=[16, 8])
loss_plt = plt.subplot(2,1,1)
acc_plt = plt.subplot(2,1,2)

loss_plt.plot(loss_arr, color='red', marker="*")
loss_plt.set_title("Train - Loss", fontsize=15)
loss_plt.legend(['Train-Loss'])
loss_plt.grid(True, axis='y')

acc_plt.plot(train_acc, color='green', marker="*")
acc_plt.set_title("Train - Accuracy", fontsize=15)
acc_plt.legend(['Train-Accuracy'])
acc_plt.set_ylim((0.0, 1.05))
acc_plt.grid(True, axis='y')

plt.show()

11. Model Evaluation

test_start = time.time()

model.eval()
with torch.no_grad():
    test_acc_tmp, test_precision_tmp, test_recall_tmp, test_f1_tmp = [], [], [], []
    for test_iter, (test_x, test_y_true) in enumerate(testloader):
        test_x, test_y_true = test_x.to(device), test_y_true.to(device)
        test_y_pred = model.forward(test_x)  # forward

        _, test_pred_index = torch.max(test_y_pred, 1)

        test_pred_index_cpu = test_pred_index.cpu().detach().numpy()
        test_y_true_cpu = test_y_true.cpu().detach().numpy()
            
        test_acc, test_precision, test_recall, test_f1 = get_clf_eval(test_y_true_cpu, test_pred_index_cpu)

        test_acc_tmp.append(test_acc), test_precision_tmp.append(test_precision), test_recall_tmp.append(test_recall), test_f1_tmp.append(test_f1)

    test_acc_mean = sum(test_acc_tmp, 0.0)/len(test_acc_tmp)
    test_precision_mean = sum(test_precision_tmp, 0.0)/len(test_precision_tmp)
    test_recall_mean = sum(test_recall_tmp, 0.0)/len(test_recall_tmp)
    test_f1_mean = sum(test_f1_tmp, 0.0)/len(test_f1_tmp)
    print("[Evaluation] {:.2f}[s], Test Accuracy : {:.4f}, Precision : {:.4f}, Recall : {:.4f}, F1 Score : {:.4f}".format(
        time.time()-test_start, test_acc_mean, test_precision_mean, test_recall_mean, test_f1_mean))
    print("[Model Performance] Model Performance : {:.5f}".format(test_acc_mean))

Model Performance : 89.83%
Tensorboard

 

모델에 제시된 파라미터는 그대로 사용하고, 약간 변형하여 모델을 구축하였는데 높은 Accuracy를 보여줍니다. 

 

하지만 LOS/NLOS의 분류를 통해 UWB 성능을 올리는 방법을 제시하였지만, 논문의 Limitations으로 실제로 이 분류기를 통해 UWB 성능을 검증하지 못했습니다. 

 

그리고 제가 생각하는 또 다른 문제는 오픈소스의 데이터라고 생각합니다. 

이 데이터를 보았을 때, 굳이 딥러닝, 머신러닝을 사용할 필요가 있을까? 의문이 듭니다. 

1차원적으로 생각하였을 때 Threshold를 10000에서 잘라버리면, NLOS를 쉽게 지워버릴 수 있습니다. 

 

현 데이터는 허수(방향) 부분을 제외한 오직 크기의 성질만을 가지고 학습하였기 때문에, 과연 실제 환경 속에서 제대로 작동할지 의문이 듭니다. 

 

UWB 특성상 Nanosecond로 시간을 재는 방식이기 때문에 데이터 추출하는 것이 굉장히 굉장히 어려움이 있어 구현하는 것은 어려움이 있습니다. 그렇기 때문에 5년 전 오픈소스이지만, 2020년에도 이를 이용해 논문을 작성했을 것이라고 생각합니다.

 

728x90
반응형
반응형

안녕하세요. 오늘은 Indoor Positioning에서 [cm]단위의 오차를 내는 UWB 관련 논문에 이야기하겠습니다. 

 

coding-yoon.tistory.com/136?category=910542

 

[무선 통신] Bluetooth Low Energy(BLE) 1. Physical Layer

microchipdeveloper.com/wireless:start Wireless Communications - Developer Help Microchip offers a broad portfolio of wireless solutions which are cost effective and very easy to implement. Depending..

coding-yoon.tistory.com

BLE는 기기간 수신신호세기(RSSI)를 이용해 Indoor Positioning을 하는 방면, UWB는 nanosecond(10e-9)의 길이를 갖는 매우 짧은 펄스를 이용하여 통신하는 방식입니다. 

 

시간 분해능이 매우 높아 최단 경로와 다중 경로간의 신호구별을 용이하여 수 cm이내의 정확도로 거리 측정이 가능하고 낮은 전력 밀도로 저전력 대용량 데이터 전송이 가능합니다. 

 

 

대표적으로 TWR(Two Way Ranging) 방식으로 Distance를 추정합니다. 

 

t_round A : 노드 A가 거리측적용 메세지를 송신한 순간부터 노드 B의 응답 메세지를 수신했을 때의 시간, 즉 거리 측정                 메세지의 왕복시간(Round Trip Time : RTT)

t_reply B : 노드 B의 응답시간 또는 처리 시간(Processing Time)

t_p : 거리 측정 메세지의 단방향 도달 시간

TOA = t_p, C : 전파속도, 광속도(3e8)

 

일반 TWR은 두 기기간의 하드웨어적 차이로 인해 시간 측정이 달라 TOA에 오차가 발생할 수 있어, SDS-TWR 등 많은 방식이 존재합니다.

< [UWB] IEEE 802.15.4a UWB 기반 실내 위치측정 시스템의 설계 및 구현 > 

 

 

기본적으로 UWR을 이용하여 거리를 구하는 방법을 알아보았으며, Non-Line of Sight 에 대해 알아보겠습니다. 

 

ieeexplore.ieee.org/document/9108193

 

UWB NLOS/LOS Classification Using Deep Learning Method

Ultra-Wide-Band (UWB) was recognized as its great potential in constructing accurate indoor position system (IPS). However, indoor environments were full of complex objects, the signals might be reflected by the obstacles. Compared with the Line-Of-Sight (

ieeexplore.ieee.org

 

논문의 아이디어는 UWB LOS/NLOS 분류를 딥러닝을 통해 성능을 향상시키는 것입니다. 

(Line of Sight, Non-Line of Sight)

 

실내 환경은 다양한 장애물(책상, 캐비넷, 책장 등)으로 가득차있으며, 이 장애물을 통해 UWB 신호는 반사될 수 있으며, 신호 수신은 직접 수신 된 신호에 비해 거리 정보에 더 긴 전송 시간을 가져오고 추가적인 Time-varying bias를 유발합니다. 

cf) UWB 신호의 오차는 대게 양의 값(+)을 가집니다. UWB 신호는 Nanosecond 단위로 시간측정 방식이기 때문에, 신호가 반사되면 지연 시간이 더해져 오차로 발생하기 때문입니다.

 

반사를 통해 오차를 일으킨 NLOS를 제거하면, 정확한 거리를 구할 수 있습니다. 

 

가장 직접적인 접근 방식은 UWB 신호 전파 경로 손실 모델 또는 CIR (Channel Impulse Response)을 기반으로 NLOS / LOS 신호 특성을 분석하는 것입니다.

 

위 논문은 Doctor Klemen Bregar providing the UWB NLOS/LOS open source data 를 이용

github.com/ewine-project/UWB-LOS-NLOS-Data-Set

 

ewine-project/UWB-LOS-NLOS-Data-Set

Repository with UWB data traces representing LOS and NLOS channel conditions in 7 different indoor locations. - ewine-project/UWB-LOS-NLOS-Data-Set

github.com

 

이 포스팅은 두 편을 걸쳐 진행됩니다. ( 1편 : 오픈소스 데이터셋 전처리, 2편 : 논문 기반 네트워크를 통해 학습 )

 

Doctor Klemen Bregar providing the UWB NLOS/LOS open source data를 딥러닝을 위해 전처리 과정이 필요합니다. 

 

오픈소스 데이터 구성

오픈소스는 csv, 데이터를 불러오는 모듈로 구성되어 있습니다. 

4년 전 데이터이므로 pandas 버전 오류 해결을 위해 uwb_dataset.py를 약간 변경했습니다.

 

uwb_dataset.py

"""
Created on Feb 6, 2017
@author: Klemen Bregar 
"""

import os
import pandas as pd
from numpy import vstack


def import_from_files():
    """
        Read .csv files and store data into an array
        format: |LOS|NLOS|data...|
    """
    rootdir = '../dataset/'
    output_arr = []
    first = 1
    for dirpath, dirnames, filenames in os.walk(rootdir):
        for file in filenames:
            filename = os.path.join(dirpath, file)
            print(filename)
            output_data = [] 
            # read data from file
            df = pd.read_csv(filename, sep=',', header=0)
            # ------------------------ update Mar 3, 2021 ----------------------------- #
            columns_name = df.columns.values
            # ------------------------ update Mar 3, 2021 ----------------------------- #
            

            # ------------------------ update Mar 3, 2021 ----------------------------- #
            # input_data = df.as_matrix()
            # df.as_matrix() was depriciated after the version 0.23.0 use df.values()
            input_data = df.values
            
            # ------------------------ update Mar 3, 2021 ----------------------------- #
            
            # append to array
            if first > 0:
                first = 0
                output_arr = input_data
            else:
                output_arr = vstack((output_arr, input_data))
    
    return columns_name, output_arr

if __name__ == '__main__':

    # import raw data from folder with dataset
    print("Importing dataset to numpy array")
    print("-------------------------------")
    data = import_from_files()
    print("-------------------------------")
    # print dimensions and data
    print("Number of samples in dataset: %d" % len(data))
    print("Length of one sample: %d" % len(data[0]))
    print("-------------------------------")
    print("Dataset:")
    print(data)

 

1. Load UWB data

import numpy as np
import pandas as pd
import uwb_dataset

# import raw data
data = uwb_dataset.import_from_files()

# divide CIR by RX preable count (get CIR of single preamble pulse)
# item[2] represents number of acquired preamble symbols
for item in data:
	item[15:] = item[15:]/float(item[2])

print("\nColumns :", columns.shape, sep=" ")
print("Data :", data.shape, sep=" ")
print(type(data))

 

2. Create UWB(CIR) Pandas Dataframe

cir_n = len(columns[15:])

print("Columns :", columns, sep=" ")
print("Channel Inpulse Response Count :", cir_n, sep=" ")

df_uwb = pd.DataFrame(data=data, columns=columns)
print("Channel 2 count :", df_uwb.query("CH == 2")['CH'].count())
print("Null/NaN Data Count : ", df_uwb.isna().sum().sum())
df_uwb.head(3)

# LOS/NLOS Count
los_count = df_uwb.query("NLOS == 0")["NLOS"].count()
nlos_count = df_uwb.query("NLOS == 1")["NLOS"].count()

print("Line of Sight Count :", los_count)
print("Non Line of Sight Count :", nlos_count)

# Columns CIR Extract
df_uwb_data = df_uwb[["CIR"+str(i) for i in range(cir_n)]]
print("UWB DataFrame X for Trainndarray shape : ",df_uwb_data.values.shape)
df_uwb_data.head(5)

 

Columns : Sampling 1016 CIR

LOS : 21000, NLOS : 21000  Pandas Dataframe 

 

다음 글은 생선된 위 데이터 프레임을 이용해 논문에 제시된 CNN+LSTM 모델을 Pytorch로 구현하겠습니다.

coding-yoon.tistory.com/139

 

[무선 통신] UWB LOS/NLOS Classification Using Deep Learning Method (2)

안녕하세요. WB LOS/NLOS Classification Using Deep Learning Method(1)에서 UWB CIR Dataset을 생성하였다면, 2편으로 논문에서 제시한 CNN_LSTM 네트워크를 약간 변형하여 구성하겠습니다. coding-yoon.tistory..

coding-yoon.tistory.com

 

728x90
반응형

+ Recent posts