|
python信号处理之特征提取
python玩转信号处理与机器学习
通过手机采集的振动信号识别人体的动作。
- import numpy as np
- import os
- import matplotlib.pyplot as plt
- from collections import defaultdict, Counter
- from detecta import detect_peaks
- '''
- python信号处理之特征提取
- python玩转信号处理与机器学习
- 通过手机采集的振动信号识别人体的动作。
- 该数据集通过在30个不同年龄分布的志愿者上做实验采集得到,在志愿者的腰上固定一手机,手机以50Hz的采样频率采集内嵌加速度计和陀螺仪的数据,
- 志愿者被要求做以下六个动作:walking(行走),walking upstairs(爬楼梯),walking downstairs(下楼梯),
- sitting(坐着),standing(站着),laying(躺下)。
- 在滤除噪声之后,通过滑动窗口的方式将信号切分成2.56s的窗口,每个窗口包含128个样本。
- 该数据集包含三组数据three-axial linear body acceleration(去除重力加速度的加速度计数据)、
- three-axial linear total acceleration(包含重力加速度的加速度计数据)、
- three-axial angular velocity(陀螺仪的数据),
- 因此共有九个分量,其中训练集有7392个窗口,测试集有2947个窗口
- pip3 install siml 不需要这个
- pip3 install detecta
- 那么提取什么特征呢,一种方式是提取脉冲(peak)发生时所在的横纵坐标,我们提取频谱中的前5个脉冲的横纵坐标作为特征
- 对于每个变换,我们取前五个峰值的横纵坐标(若少于五个则填充0)
- 这270个特性中的一些特征将比其他特征提供更多的信息,若我们能主动地选择对分类很重要的特征进行组合,或者对超参数进行优化,相信准确率还能继续提高。
- '''
- # The train dataset contains 7352 signals, each one of length 128 and 9 components
- # The test dataset contains 2947 signals, each one of length 128 and 9 components
- # The train dataset contains 7352 labels, with the following distribution:
- # Counter({6: 1407, 5: 1374, 4: 1286, 1: 1226, 2: 1073, 3: 986})
- # The test dataset contains 2947 labels, with the following distribution:
- # Counter({6: 537, 5: 532, 1: 496, 4: 491, 2: 471, 3: 420})
# Mapping from integer activity label (1-6) to its human-readable name.
activities_description = dict(
    zip(range(1, 7),
        ['walking', 'walking upstairs', 'walking downstairs',
         'sitting', 'standing', 'laying'])
)
def read_signals(filename):
    """Read one whitespace-separated signal window per line from *filename*.

    Each line holds space-separated float samples.  Returns a list of
    lists of floats, one inner list per line.

    The original chained ``rstrip().lstrip()`` (equivalent to ``strip()``)
    before splitting, but ``str.split()`` with no argument already ignores
    leading/trailing whitespace, so a single comprehension suffices.
    """
    with open(filename, 'r') as fp:
        return [[float(v) for v in line.split()]
                for line in fp.read().splitlines()]
def read_labels(filename):
    """Read one integer activity label per line and return them as a list."""
    with open(filename, 'r') as fp:
        return [int(line) for line in fp.read().splitlines()]
def randomize(dataset, labels):
    """Shuffle *dataset* (3-D array) and *labels* with one shared permutation.

    Returns the shuffled (dataset, labels) pair; rows stay aligned.
    """
    order = np.random.permutation(labels.shape[0])
    return dataset[order, :, :], labels[order]
# Windows-style absolute paths to the UCI HAR "Inertial Signals" folders.
# NOTE(review): hard-coded local paths -- adjust to your own dataset location.
INPUT_FOLDER_TRAIN = 'D:\\UCI HAR Dataset\\UCI HAR Dataset\\train\\Inertial Signals\\'
INPUT_FOLDER_TEST = 'D:\\UCI HAR Dataset\\UCI HAR Dataset\\test\\Inertial Signals\\'
# The nine signal components: body acceleration, gyroscope and total
# acceleration, each with x/y/z axes (order matters: it fixes the component
# index used later when slicing the transposed array).
INPUT_FILES_TRAIN = ['body_acc_x_train.txt', 'body_acc_y_train.txt', 'body_acc_z_train.txt',
                     'body_gyro_x_train.txt', 'body_gyro_y_train.txt', 'body_gyro_z_train.txt',
                     'total_acc_x_train.txt', 'total_acc_y_train.txt', 'total_acc_z_train.txt']
INPUT_FILES_TEST = ['body_acc_x_test.txt', 'body_acc_y_test.txt', 'body_acc_z_test.txt',
                    'body_gyro_x_test.txt', 'body_gyro_y_test.txt', 'body_gyro_z_test.txt',
                    'total_acc_x_test.txt', 'total_acc_y_test.txt', 'total_acc_z_test.txt']
# One activity label (1-6) per signal window.
LABELFILE_TRAIN = 'D:\\UCI HAR Dataset\\UCI HAR Dataset\\train\\y_train.txt'
LABELFILE_TEST = 'D:\\UCI HAR Dataset\\UCI HAR Dataset\\test\\y_test.txt'
# Load the nine component files for train and test, stack them and move the
# component axis last: (9, n_windows, 128) -> (n_windows, 128, 9).
# Fix: the test loop used plain string concatenation for the path while the
# train loop used os.path.join; both now use os.path.join consistently.
train_signals, test_signals = [], []
for input_file in INPUT_FILES_TRAIN:
    signal = read_signals(os.path.join(INPUT_FOLDER_TRAIN, input_file))
    train_signals.append(signal)
# Stacked training signals: (9, 7352, 128) -- one entry per component file.
print("训练的信号:", np.array(train_signals).shape)
train_signals = np.transpose(np.array(train_signals), (1, 2, 0))
print("交换通道后的训练的信号:", train_signals.shape)
for input_file in INPUT_FILES_TEST:
    signal = read_signals(os.path.join(INPUT_FOLDER_TEST, input_file))
    test_signals.append(signal)
test_signals = np.transpose(np.array(test_signals), (1, 2, 0))
print("交换通道后的test的信号:", test_signals.shape)
# One integer label per window.
train_labels = read_labels(LABELFILE_TRAIN)
test_labels = read_labels(LABELFILE_TEST)
print("训练的标签文件长度:", len(train_labels), train_labels[:10], set(train_labels))
print("test的标签文件长度:", len(test_labels), test_labels[:10], set(test_labels))
# Dataset dimensions: (number of windows, samples per window, components).
[no_signals_train, no_steps_train, no_components_train] = np.shape(train_signals)
[no_signals_test, no_steps_test, no_components_test] = np.shape(test_signals)
# Number of distinct activity labels (6).
no_labels = len(np.unique(train_labels))
print("去重之后的标签数:", no_labels)
print("The train dataset contains {} signals, each one of length {} and {} components ".format(no_signals_train,
                                                                                               no_steps_train,
                                                                                               no_components_train))
print("The test dataset contains {} signals, each one of length {} and {} components ".format(no_signals_test,
                                                                                              no_steps_test,
                                                                                              no_components_test))
# Counter reports how often each label occurs.
print("The train dataset contains {} labels, with the following distribution:\n {}".format(np.shape(train_labels)[0],
                                                                                           Counter(train_labels)))
# NOTE(review): test_labels[:] is a no-op list copy; Counter(test_labels) is equivalent.
print("The test dataset contains {} labels, with the following distribution:\n {}".format(np.shape(test_labels)[0],
                                                                                          Counter(test_labels[:])))
print("随机打乱 训练与测试数据集")
# Shuffle each split (this also converts the label lists to numpy arrays).
train_signals, train_labels = randomize(train_signals, np.array(train_labels))
test_signals, test_labels = randomize(test_signals, np.array(test_labels))
# --- Data visualisation -------------------------------------------------
N = 128  # samples per window
f_s = 50  # sampling frequency [Hz]
denominator = 10  # divisor used later to set the minimum peak height
# Pick one signal window (length-128); the training set was shuffled above,
# so index 15 is effectively a random sample.
signal_no = 15
signals = train_signals[signal_no, :, :]
# Component 3 corresponds to the 4th input file (body_gyro_x) -- see
# INPUT_FILES_TRAIN and the transpose that put the file axis last.
signal = signals[:, 3]
label = train_labels[signal_no]
activity_name = activities_description[label]
print("选择的具体某一个信号样本:", signals.shape, signal.shape, activity_name)
print("看看具体这个样本数据:", signal)
def get_values(y_values, N, f_s):
    """Return (time_axis, y_values) for a length-N signal sampled at f_s Hz.

    Fix: the original ``np.linspace(0.0, N / f_s, N)`` includes the endpoint,
    spacing samples N/(f_s*(N-1)) apart instead of 1/f_s.  With
    ``endpoint=False`` the k-th time value is exactly k / f_s.
    """
    f_values = np.linspace(0.0, N / f_s, N, endpoint=False)
    return f_values, y_values
# Plot the selected window in the time domain.
f_values, signal_values = get_values(signal, N, f_s)
plt.plot(f_values, signal_values, linestyle='-', color='blue')
plt.xlabel('Time [sec]', fontsize=16)
plt.ylabel('Amplitude', fontsize=16)
plt.title("Time domain of the signal", fontsize=16)
# plt.savefig('01_time_domain.png', dpi=200)
plt.show()
- from scipy.fftpack import fft
def get_fft_values(y_values, N, f_s):
    """Single-sided amplitude spectrum of a length-N real signal.

    Returns (f_values, amplitudes) for the first N//2 FFT bins.

    Fix: FFT bin k sits at frequency k * f_s / N; the original
    ``np.linspace(0.0, f_s / 2.0, N // 2)`` included the f_s/2 endpoint and
    therefore mislabelled every bin slightly.  ``endpoint=False`` yields the
    exact bin frequencies.
    """
    f_values = np.linspace(0.0, f_s / 2.0, N // 2, endpoint=False)
    fft_values_ = fft(y_values)
    # 2/N scaling converts the raw complex FFT into one-sided amplitudes.
    fft_values = 2.0 / N * np.abs(fft_values_[0:N // 2])
    return f_values, fft_values
# Plot the amplitude spectrum of the selected window.
f_values, fft_values = get_fft_values(signal, N, f_s)
plt.plot(f_values, fft_values, linestyle='-', color='blue')
plt.xlabel('Frequency [Hz]', fontsize=16)
plt.ylabel('Amplitude', fontsize=16)
plt.title("Frequency domain of the signal", fontsize=16)
# plt.savefig('02_fft.png', dpi=200)
plt.show()
- from scipy.signal import welch
def get_psd_values(y_values, N, f_s):
    """Welch power-spectral-density estimate of *y_values* at f_s Hz.

    *N* is accepted only for interface symmetry with the other transform
    helpers; ``welch`` uses its own default segment length.
    """
    return welch(y_values, fs=f_s)
# Plot the power spectral density of the selected window.
f_values, psd_values = get_psd_values(signal, N, f_s)
plt.plot(f_values, psd_values, linestyle='-', color='blue')
plt.xlabel('Frequency [Hz]', fontsize=16)
plt.ylabel('PSD [V**2 / Hz]', fontsize=16)
# plt.savefig('03_psd.png', dpi=200)
plt.show()
def autocorr(x):
    """Full autocorrelation of *x*, keeping only the non-negative lags."""
    full = np.correlate(x, x, mode='full')
    mid = len(full) // 2
    return full[mid:]
def get_autocorr_values(y_values, N, f_s):
    """Return (time delays [s], autocorrelation values) for lags 0..N-1."""
    # Inlined autocorrelation: full correlation, non-negative lags only.
    full = np.correlate(y_values, y_values, mode='full')
    autocorr_values = full[len(full) // 2:]
    x_values = np.arange(N) / float(f_s)
    return x_values, autocorr_values
# Plot the autocorrelation of the selected window against the time delay.
t_values, autocorr_values = get_autocorr_values(signal, N, f_s)
plt.plot(t_values, autocorr_values, linestyle='-', color='blue')
plt.xlabel('time delay [s]', fontsize=16)
plt.ylabel('Autocorrelation amplitude', fontsize=16)
# plt.savefig('04_autocorr.png', dpi=200)
plt.show()
# --- Combined 4x3 overview figure --------------------------------------
# Rows: raw signal, FFT, PSD, autocorrelation.
# Columns: body acceleration, gyroscope, total acceleration.
# Each axes overlays the three spatial components in red/green/blue.
labels = ['x-component', 'y-component', 'z-component']
colors = ['r', 'g', 'b']
suptitle = "Different signals for the activity: {}"
xlabels = ['Time [sec]', 'Freq [Hz]', 'Freq [Hz]', 'Time lag [s]']
ylabel = 'Amplitude'
axtitles = [['Acceleration', 'Gyro', 'Total acceleration'],
            ['FFT acc', 'FFT gyro', 'FFT total acc'],
            ['PSD acc', 'PSD gyro', 'PSD total acc'],
            ['Autocorr acc', 'Autocorr gyro', 'Autocorr total acc']
            ]
# One transform per figure row.
list_functions = [get_values, get_fft_values, get_psd_values, get_autocorr_values]
f, axarr = plt.subplots(nrows=4, ncols=3, figsize=(12, 12))
f.suptitle(suptitle.format(activity_name), fontsize=16)
for row_no in range(0, 4):
    for comp_no in range(0, 9):
        # Components 0-2, 3-5, 6-8 map to columns 0, 1, 2; plot_no selects
        # the colour/label of the x/y/z trace within the column.
        col_no = comp_no // 3
        plot_no = comp_no % 3
        color = colors[plot_no]
        label = labels[plot_no]
        axtitle = axtitles[row_no][col_no]
        xlabel = xlabels[row_no]
        value_retriever = list_functions[row_no]
        ax = axarr[row_no, col_no]
        ax.set_title(axtitle, fontsize=16)
        ax.set_xlabel(xlabel, fontsize=16)
        if col_no == 0:
            ax.set_ylabel(ylabel, fontsize=16)
        signal_component = signals[:, comp_no]
        x_values, y_values = value_retriever(signal_component, N, f_s)
        ax.plot(x_values, y_values, linestyle='-', color=color, label=label)
        if row_no > 0:
            # On the transform rows, mark peaks above 10% of the maximum.
            max_peak_height = 0.1 * np.nanmax(y_values)
            indices_peaks = detect_peaks(y_values, mph=max_peak_height)
            ax.scatter(x_values[indices_peaks], y_values[indices_peaks], c=color, marker='*', s=60)
        if col_no == 2:
            # Put the legend outside the rightmost column.
            ax.legend(loc='center left', bbox_to_anchor=(1, 0.5))
plt.tight_layout()
plt.subplots_adjust(top=0.90, hspace=0.6)
# plt.savefig('05_all.png', dpi=200)
plt.show()
- # 特征提取
def get_first_n_peaks(x, y, no_peaks=5):
    """Return the first *no_peaks* (x, y) peak coordinates as two lists.

    If fewer than *no_peaks* peaks exist, both lists are padded with 0.
    """
    x_, y_ = list(x), list(y)
    pad = max(0, no_peaks - len(x_))
    return (x_ + [0] * pad)[:no_peaks], (y_ + [0] * pad)[:no_peaks]
def get_features(x_values, y_values, mph):
    """Concatenated x- then y-coordinates of the first 5 peaks above *mph*.

    Peaks are located with detecta's ``detect_peaks``; missing peaks are
    zero-padded by ``get_first_n_peaks``, so the result always has length 10.
    """
    peak_idx = detect_peaks(y_values, mph=mph)
    xs, ys = get_first_n_peaks(x_values[peak_idx], y_values[peak_idx])
    return xs + ys
def extract_features_labels(dataset, labels, N, f_s, denominator):
    """Build peak-based feature vectors for every signal window.

    For each window and each of its components, the PSD, FFT and
    autocorrelation are computed; peaks above a data-driven threshold are
    detected and the first five peak (x, y) pairs of each transform are
    kept, giving 9 components * 3 transforms * 10 numbers = 270 features
    per window.

    Returns (features, labels) as numpy arrays.
    """
    percentile = 5
    all_features = []
    all_labels = []
    for window_idx in range(len(dataset)):
        all_labels.append(labels[window_idx])
        window_features = []
        for comp_idx in range(dataset.shape[2]):
            sig = dataset[window_idx, :, comp_idx]
            assert len(sig) == 128, "某条波=128"
            # Robust amplitude range: 5th / 95th percentiles ignore outliers.
            lo = np.nanpercentile(sig, percentile)
            hi = np.nanpercentile(sig, 100 - percentile)
            # Minimum peak height: a denominator-th of the range above lo.
            mph = lo + (hi - lo) / denominator
            for transform in (get_psd_values, get_fft_values, get_autocorr_values):
                window_features += get_features(*transform(sig, N, f_s), mph)
        all_features.append(window_features)
    return np.array(all_features), np.array(all_labels)
# Extract features for both splits and persist them to .npy files.
print("开始提取特征train:", train_signals.shape, train_labels.shape)
X_train, Y_train = extract_features_labels(train_signals, train_labels, N, f_s, denominator)
print("开始提取特征test:", test_signals.shape, test_labels.shape)
X_test, Y_test = extract_features_labels(test_signals, test_labels, N, f_s, denominator)
print("提取完毕特征train:", X_train.shape, Y_train.shape)
print("提取完毕特征test:", X_test.shape, Y_test.shape)
# Save each split with the label appended as the last column.
np.save("train.npy", np.hstack([X_train, np.reshape(Y_train, newshape=[-1, 1])]))
np.save("test.npy", np.hstack([X_test, np.reshape(Y_test, newshape=[-1, 1])]))
复制代码
|
|