반응형

안녕하세요. 

 

Low Pass Filter, High Pass Filter, Band Pass Filter를 이용한 신호 처리와 FFT 변환으로 주파수 영역에서 필터가 제대로 동작했는지 확인하는 것까지 파이썬으로 구현합니다. 


from scipy import signal
import matplotlib.pyplot as plt
import numpy as np
import scipy.io
import os


mat_file = scipy.io.loadmat('signal1.mat')
(file_path, file_id) = os.path.split('signal1.mat')  # file path, file name

fs = 1024  # sample rate
order = 10  # order
cut_off_freq = 150  # cut off frequency

# raw signal
# t = np.linspace(0, 1, fs, False)  # 1 second
# sig = np.sin(2*np.pi*100*t) + np.sin(2*np.pi*50*t)  # signal
sig = mat_file[file_id[:-4]][0]

freq = np.fft.fftfreq(len(sig), 1/1024)

# filtered signal
sos = signal.butter(order, [cut_off_freq], 'low', fs=fs, output='sos')  # low pass filter
filtered = signal.sosfilt(sos, sig)

# raw signal fft
raw_fft = np.fft.fft(sig) / len(sig)
raw_fft_abs = abs(raw_fft)

# filter signal fft
filtered_fft = np.fft.fft(filtered) / len(filtered)
filtered_fft_abs = abs(filtered_fft)

# plot
fig, ((ax00, ax01), (ax10, ax11)) = plt.subplots(2, 2)

# raw signal plot : 0 row 0 column
ax00.plot(t, sig)
ax00.set_title('Raw Data Time Domain')
ax00.set_xlabel('Time [seconds]')
ax00.set_ylabel('Amplitude')

# filtered signal plot : 1 row 0 column
ax10.plot(t, filtered)
ax10.set_title('Filtered Data Time Domain')
ax10.set_xlabel('Time [seconds]')
ax10.set_ylabel('Amplitude')

# raw signal fft plot : 0 row 1 column
ax01.stem(freq, raw_fft_abs, use_line_collection=True)
ax01.set_title('Raw Data Frequency Domain')
ax01.set_xlabel('Frequency [HZ]')
ax01.set_ylabel('Amplitude')

# filtered signal fft plot : 1 row column
ax11.stem(freq,filtered_fft_abs, use_line_collection=True)
ax11.set_title('Filtered Data Frequency Domain')
ax11.set_xlabel('Frequency [HZ]')
ax11.set_ylabel('Amplitude')

# plot
plt.tight_layout()
plt.show()

 

 

1. Low Pass Filter

sos = signal.butter(order, [cut_off_freq], 'low', fs=fs, output='sos')
filtered = signal.sosfilt(sos, sig)

2. High Pass Filter

sos = signal.butter(order, [cut_off_freq], 'high', fs=fs, output='sos')
filtered = signal.sosfilt(sos, sig)

3. Band Pass Filter

sos = signal.butter(order, [150, 200], 'band', fs=fs, output='sos')
filtered = signal.sosfilt(sos, sig)

 


Low Pass Filter, Cut off frequency: 150Hz

 


High Pass Filter, Cut off frequency: 150Hz


Band Pass Filter, Cut off frequency: 150-200Hz

728x90
반응형
반응형

https://coding-yoon.tistory.com/23?category=830190

 

[파이썬 응용] 1탄 Scipy : 음성 신호를 LPF , HPF 돌려보기!

안녕하세요. 글은 계속 쓰는데 적는데 마음에 안들어서 전부 비공개로 해놨는데... 네 그렇다구요. 이번 글은 파이썬에서 Scipy를 통해서 음성신호를 필터링해볼려고 합니다. 음성신호를 필터링했으면 matplotlib..

coding-yoon.tistory.com

파이썬에는 장점 중 하나는 여러가지 라이브러리가 구현되어 있고, 손쉽게 이용할 수 있습니다.

 

위 글에 들어가보시면, Scipy 라이브러리에는 Low Pass Filter, High Pass Filter 등등 필터를 구현되어 있습니다. 

 

그렇다면 우리들 또한 라이브러리를 만들 수 있고, 배포할 수 있습니다.

 

이번 글은 Low Pass Filter, High Pass Filter, Band Pass Filter를 Scipy 라이브러리를 사용하지 않고 모듈화시키고 구현하겠습니다.

 

우선, 구현된 모습을 보겠습니다.

 

Low Pass FIlter
High Pass Filter
Band Pass Filter

위 그림은 제가 Scipy 라이브러리를 사용하지 않았고, 제가 모듈화를 시켜서 돌린 것입니다.

 

파이썬에서 모듈화를 하는 방법은 굉장히 간단합니다.

 

예를 들어 filter_module의 이름을 가진 라이브러리라면, filter_module.py로 파이썬 파일을 만들어줍니다.

 

그리고 자신이 원하는 부분을 구현하면 됩니다.

 

단, 조건이 있다면 같은 프로젝트 안에 있어야 합니다. 아무리 같은 폴더 안에 있다 하더라도 같은 프로젝트로 공유하고 있지 않다면 인식을 할 수 없습니다. 

 

# filter_module.py
import numpy as np


# Low Pass Filter
def lpf_module(raw_data, time, f_cut_lpf, order, sr):
    """
    :param raw_data: Low Pass Filter 통과할 데이터
    :param time: 시간
    :param f_cut_lpf: 차단 주파수
    :param order: 차수
    :param sr: sample rate
    :param current_list: 진행도를 보여줄 list
    :return result: Low Pass Filter 데이터
    """
    sample_time = 1 / sr
    w_cut_lpf = 2 * np.pi * f_cut_lpf
    w_cut_period = 1 / w_cut_lpf
    raw_data_temp = raw_data.copy()
    previous_low_result = 0
    low_result = []
    low_list_result = []

    step = 0
    # low pass filter
    for k in range(order, 0, -1):
        for j in range(0, len(raw_data)):
            for i in range(0, len(time)):
                y = (w_cut_period * previous_low_result + sample_time * raw_data_temp[j][i]) / (
                        w_cut_period + sample_time)
                previous_low_result = y
                low_result.append(y)
            if k > 1:
                raw_data_temp[j] = low_result
                low_result = []
            if k == 1:
                low_list_result = np.append(low_list_result, low_result)
            low_result = []

    result = low_list_result.reshape(len(raw_data_temp), len(time))
    return result


# High Pass Filter
def hpf_module(raw_data, time, f_cut_hpf, order, sr):
    """
    :param raw_data: High Pass Filter 통과할 데이터
    :param time: 시간
    :param f_cut_hpf: 차단 주파수
    :param order: 차수
    :param sr: sample rate
    :param current_list: 진행도를 보여줄 list
    :return result: High Pass Filter 데이터
    """
    w_cut_hpf = 2 * np.pi * f_cut_hpf
    w_cut_period = 1 / w_cut_hpf
    sample_time = 1/sr
    """
    raw_data_temp = self.raw_data  # 주소를 복사
    raw_data_temp = self.raw_data.copy  # 값을 복사
    """
    raw_data_temp = raw_data.copy()  # raw data
    previous_high_result = 0  # previous result
    previous_raw_data = 0  # previous raw data
    high_result = []  # result
    high_list_result = []
    step = 0

    # high pass filter
    for k in range(order, 0, -1):
        for j in range(0, len(raw_data_temp)):
            for i in range(0, len(time)):
                y = w_cut_period / (w_cut_period + sample_time) * \
                    previous_high_result + w_cut_period / \
                    (w_cut_period + sample_time) * (
                            raw_data_temp[j][i] - previous_raw_data)
                previous_high_result = y
                previous_raw_data = raw_data_temp[j][i]
                high_result = np.append(high_result, y)
            if k > 1:
                raw_data_temp[j] = high_result
                high_result = []
            if k == 1:
                high_list_result = np.append(high_list_result, high_result)
            high_result = []

    # filter data reshape ( np.append 를 사용하면 일차배열로 쌓임 )
    result = high_list_result.reshape(len(raw_data_temp), len(time))
    return result


# Band Pass Filter
def bpf_module(raw_data, time, f_cut_hpf, f_cut_lpf, order, sr, current_list=None):
    """
        :param raw_data: Band Pass Filter 통과할 데이터
        :param time: 시간
        :param f_cut_lpf: 차단 주파수
        :param f_cut_hpf: 차단 주파수
        :param order: 차수
        :param sr: sample rate
        :param current_list: 진행도를 보여줄 list
        :return result: Band Pass Filter 데이터
        """
    sample_time = sr
    raw_data_temp = raw_data.copy()  # raw data
    w_cut_lpf = 2 * np.pi * f_cut_lpf
    w_cut_lpf_period = 1 / w_cut_lpf

    previous_low_result = 0  # previous raw data
    low_result = []  # result
    low_list_result = []
    previous_raw_data = 0  # previous raw data
    step = 0

    # low pass filter
    for k in range(order, 0, -1):
        for j in range(0, len(raw_data_temp)):
            for i in range(0, len(time)):
                y = (w_cut_lpf_period * previous_low_result + sample_time * raw_data_temp[j][i]) / (
                        w_cut_lpf_period + sample_time)
                previous_low_result = y
                low_result = np.append(low_result, y)
            if k > 1:
                raw_data_temp[j] = low_result
                low_result = []
            if k == 1:
                low_list_result = np.append(low_list_result, low_result)
            low_result = []

    raw_data_temp = low_list_result.reshape(len(raw_data_temp), len(time))
    previous_high_result = 0  # previous result
    w_cut_hpf = 2 * np.pi * f_cut_hpf
    w_cut_period = 1 / w_cut_hpf
    high_result = []  # result
    high_list_result = []
    step = 0
    # high pass filter
    for k in range(order, 0, -1):
        for j in range(0, len(raw_data_temp)):
            for i in range(0, len(time)):
                y = w_cut_period / (w_cut_period + sample_time) * \
                    previous_high_result + w_cut_period / \
                    (w_cut_period + sample_time) * (
                            raw_data_temp[j][i] - previous_raw_data)
                previous_high_result = y
                previous_raw_data = raw_data_temp[j][i]
                high_result = np.append(high_result, y)
            if k > 1:
                raw_data_temp[j] = high_result
                high_result = []
            if k == 1:
                high_list_result = np.append(high_list_result, high_result)
            high_result = []
    # filter data reshape ( np.append 를 사용하면 일차배열로 쌓임 )
    result = high_list_result.reshape(len(raw_data_temp), len(time))
    return result

 

 

이렇게 Low Pass Filter, High Pass Filter, Band Pass Filter를 구현된 저만의 라이브러리를 만들었습니다. 

 

그렇다면 어떻게 사용할까요? 사용방법도 굉장히 간단합니다. 평소 우리들이 다운받아 사용하는 라이브러리처럼

import 해서 불러오면 됩니다. from에는 파일명이 들어갑니다. 

from filter_module import *
"""
from filter_module import lpf_module
from filter_module import hpf_module
from filter_module import bpf_module
"""

raw_data = "데이터가 담겨 있는 이차원 배열형식"
time = "데이터의 시간영역"
cut_off_frequency = "차단주파수"
order = "차수"
sr = "sample rate"

# 저역통과필터
lpf_result = lpf_module(raw_data, time, cut_off_frequency, order, sr)
# 고역통과필터
hpf_result = hpf_module(raw_data, time, cut_off_frequency, order, sr)

이런식으로 모듈화를 시켜 프로그래밍을 진행하게 되면, 나중에 팀프로젝트할 때도 상당히 수월하며, 몇 달이 지나 수정

 

하고 싶다고 해도 비교적 쉽게 할 수 있습니다. 그리고 모듈화의 큰 장점은 재사용성입니다. 

 

나중에 필터기능이 필요할 때, 위 코딩처럼 주석과 모듈화를 시켜놓으면 filter_module.py만 불러오기만 하면 됩니다. 

 

처음엔 복잡할지라도 점점 코딩이 이뻐지기 시작하면서 기분도 좋아집니다@@

728x90
반응형

+ Recent posts