文章目录

  • 一、理论基础
    • 1、小波神经网络结构
    • 2、前向传播过程
    • 3、反向传播过程
    • 4、建模步骤
  • 二、小波神经网络的实现
    • 1、训练过程(WNN.py)
    • 2、测试过程(test.py)
    • 3、测试结果
    • 4、参考源码及实验数据集

一、理论基础

小波神经网络(Wavelet Neural Network,简称WNN)是基于小波变换理论构造而成,其原理原理与反向传播神经网络(BPNN)较为接近,最主要的特征是它的隐含层神经元激活函数为小波基函数,这一特性使其充分利用了小波变换的局部化性质和神经网络的大规模数据并行处理、自学习能力,因而具有较强的逼近能力和较快的收敛速度。
反向传播神经网络(BPNN)原理参考:
反向传播神经网络(BPNN)的实现(Python,附源码及数据集)

1、小波神经网络结构

小波神经网络的结构图如下图所示:

2、前向传播过程

假设输入层、隐含层、输出层的节点数分别为n、i和m,则数据由输出层传递到隐含层时,隐含层第j个节点的输入数据的计算公式如下:

其中x_k为输入数据中第k个样本数据,ω_kj为隐含层节点的连接权值。
上述计算结果在隐含层节点处进行小波基的伸缩变化,具体的变换公式如下:

其中∅(x)为小波基函数,b_j为基函数的平滑因子,a_j为基函数的伸缩因子,h_j为隐含层第j个节点的输出数据。
最后隐含层第j个节点的输出数据进入输出层,经过计算后从输出层的t个节点输出,此节点上的计算公式如下:

其中ω_jt为输出层的连接权值,φ为激活函数。
激活函数原理参考:
神经网络基础知识之激活函数

3、反向传播过程

由前向传播过程可以了解到,数据在神经元与神经元之间的传递是单向的,每个神经元只接受上一层神经元传递过来的数据并对其处理。在这个处理过程中,小波神经网络主要有四个参数参与计算,这四个参数分别是小波基函数的平滑因子b_j与伸缩因子a_j以及隐含层与输出层的两个连接权值,这四个参数值的大小将直接影响网络的性能。因此WNN的训练过程如BPNN一样主要使用反向传播算法如随机梯度下降法(SGD)对这四个参数进行不断的修正。
以输出层的权值为例,其更新公式如下:

其中E为误差函数,μ为学习率。
损失函数原理参考:
机器学习基础知识之损失函数
反向传播原理参考:
神经网络之反向传播算法(梯度、误差反向传播算法BP)

4、建模步骤

以使用小波神经网络进行预测为例,可以将小波神经网络预测模型的建模步骤总结如下:

  1. 根据输入数据的相关特征确定小波神经网络输入层、隐含层以及输出层的节点数;
  2. 选择一种参数初始化方法对小波神经网络隐含层的连接权值、平滑因子和伸缩因子、输出层的连接权值进行随机初始化;
  3. 数据由输入层输入小波神经网络,传递至隐含层后经小波变换对数据进行非线性转换;
  4. 数据在隐含层输出后传递至输出层,在与输出层的连接权值进行线性计算后由激活函数进行非线性转换,最后得到网络的前向传播输出;
  5. 选择一种损失函数对网络的前向传播输出以及目标值进行相关计算得到损失值;
  6. 以输出层的损失值计算得到输出层连接权值以及阈值的梯度,选择一种反向传播算法对它们进行调整;
  7. 损失值传递至隐含层,同样使用相同的反向传播算法对隐含层的中心点以及宽度向量进行调整;
  8. 获得一个参数得到更新后的小波神经网络;
  9. 在达到最大迭代次数或满足停止迭代条件之前,重复步骤4到步骤8,在达到最大迭代次数后,输出所有参数确定的小波神经网络。

参数初始化方法参考:
神经网络基础知识之参数初始化

二、小波神经网络的实现

以数据预测为例,下面介绍基于Python实现小波神经网络的过程。
选用某省市的表层土壤重金属元素数据集作为实验数据,该数据集总共96组,随机选择其中的24组作为测试数据集,72组作为训练数据集。选取重金属Ti的含量作为待预测的输出特征,选取重金属Co、Cr、Mg、Pb作为模型的输入特征。

1、训练过程(WNN.py)

#库的导入import numpy as npimport pandas as pdimport math#激活函数def tanh(x):    return (np.exp(x)-np.exp(-x))/(np.exp(x)+np.exp(-x))#激活函数偏导数def de_tanh(x):    return (1-x**2)#小波基函数def wavelet(x):    return (math.cos(1.75*x)) * (np.exp((x**2)/(-2)))#小波基函数偏导数def de_wavelet(x):    y = (-1) * (1.75 * math.sin(1.75 * x)  + x * math.cos(1.75 * x)) * (np.exp(( x **2)/(-2)))    return y#参数设置samnum = 72   #输入数据数量hiddenunitnum = 8   #隐含层节点数indim = 4   #输入层节点数outdim = 1   #输出层节点数maxepochs = 500   #迭代次数errorfinal = 0.65*10**(-3)   #停止迭代训练条件learnrate = 0.001   #学习率#输入数据的导入df = pd.read_csv("train.csv")df.columns = ["Co", "Cr", "Mg", "Pb", "Ti"]Co = df["Co"]Co = np.array(Co)Cr = df["Cr"]Cr = np.array(Cr)Mg=df["Mg"]Mg=np.array(Mg)Pb = df["Pb"]Pb =np.array(Pb)Ti = df["Ti"]Ti = np.array(Ti)samplein = np.mat([Co,Cr,Mg,Pb])#数据归一化,将输入数据压缩至0到1之间,便于计算,后续通过反归一化恢复原始值sampleinminmax = np.array([samplein.min(axis=1).T.tolist()[0],samplein.max(axis=1).T.tolist()[0]]).transpose()#对应最大值最小值#待预测数据为Tisampleout = np.mat([Ti])sampleoutminmax = np.array([sampleout.min(axis=1).T.tolist()[0],sampleout.max(axis=1).T.tolist()[0]]).transpose()#对应最大值最小值sampleinnorm = ((np.array(samplein.T)-sampleinminmax.transpose()[0])/(sampleinminmax.transpose()[1]-sampleinminmax.transpose()[0])).transpose()sampleoutnorm = ((np.array(sampleout.T)-sampleoutminmax.transpose()[0])/(sampleoutminmax.transpose()[1]-sampleoutminmax.transpose()[0])).transpose()#给归一化后的数据添加噪声noise = 0.03*np.random.rand(sampleoutnorm.shape[0],sampleoutnorm.shape[1])sampleoutnorm += noise#scale = np.sqrt(3/((indim+outdim)*0.5))w1 = np.random.uniform(low=-scale,high=scale,size=[hiddenunitnum,indim])b = np.random.uniform(low=-scale, high=scale, size=[hiddenunitnum,1])a = np.random.uniform(low=-scale, high=scale, size=[hiddenunitnum,1])w2 = np.random.uniform(low=-scale,high=scale,size=[hiddenunitnum,outdim])#对隐含层的连接权值w1、平滑因子被b和伸缩因子a、输出层的连接权值w2进行随机初始化inputin=np.mat(sampleinnorm.T)w1=np.mat(w1)b=np.mat(b)a=np.mat(a)w2=np.mat(w2)#errhistory存储每次迭代训练计算的误差errhistory = np.mat(np.zeros((1,maxepochs)))#开始训练for i in range(maxepochs):    #前向计算:    #hidden_out为隐含层输出    hidden_out = np.mat(np.zeros((samnum,hiddenunitnum)))    for m in range(samnum):        for j in range(hiddenunitnum):            d=((inputin[m, :] * w1[j, :].T) - b[j,:]) * (a[j,:] ** (-1))            hidden_out[m,j] = wavelet(d)    #output为输出层输出    output = tanh(hidden_out * w2)    #计算误差    out_real = np.mat(sampleoutnorm.transpose())    err = out_real - output    loss = np.sum(np.square(err))    #判断是否停止训练    if loss < errorfinal:        break    errhistory[:,i] = loss    #反向计算    out_put=np.array(output.T)    belta=de_tanh(out_put).transpose()    #分别计算每个参数的误差项    for j in range(hiddenunitnum):        sum1 = 0.0        sum2 = 0.0        sum3 = 0.0        sum4 = 0.0        sum5 = 0.0        for m in range(samnum):            sum1+= err[m,:] * belta[m,:] * w2[j,:] * de_wavelet(hidden_out[m,j]) * (inputin[m,:] / a[j,:])            #1*1            sum2+= err[m,:] * belta[m,:] * w2[j,:] * de_wavelet(hidden_out[m,j]) * (-1) * (1 / a[j,:])            #1*1            sum3+= err[m,:] * belta[m,:] * w2[j,:] * de_wavelet(hidden_out[m,j]) * (-1) * ((inputin[m,:] * w1[j,:].T - b[j,:]) / (a[j,:] * a[j,:]))            #1*1            sum4+= err[m,:] * belta[m,:] * hidden_out[m,j]        delta_w1 = sum1        delta_b = sum2        delta_a = sum3        delta_w2 = sum4        #根据误差项对四个参数进行更新        w1[j,:] = w1[j,:] + learnrate * delta_w1        b[j,:] = b[j,:] + learnrate * delta_b        a[j,:] = a[j,:] + learnrate * delta_a        w2[j,:] = w2[j,:] + learnrate * delta_w2    print("the generation is:",i+1,",the loss is:",loss)print('更新的w1:',w1)print('更新的b:',b)print('更新的w2:',w2)print('更新的a:',a)print("The loss after iteration is :",loss)np.save("w1.npy",w1)np.save("b.npy",b)np.save("w2.npy",w2)np.save("a.npy",a)

2、测试过程(test.py)

#库的导入import numpy as npimport pandas as pdimport math#小波基函数def wavelet(x):    return (math.cos(1.75*x)) * (np.exp((x**2)/(-2)))#激活函数tanhdef tanh(x):    return (np.exp(x)-np.exp(-x))/(np.exp(x)+np.exp(-x))#输入数据的导入,用于测试数据的归一化与返归一化df = pd.read_csv("train.csv")df.columns = ["Co", "Cr", "Mg", "Pb", "Ti"]Co = df["Co"]Co = np.array(Co)Cr = df["Cr"]Cr = np.array(Cr)Mg=df["Mg"]Mg=np.array(Mg)Pb = df["Pb"]Pb =np.array(Pb)Ti = df["Ti"]Ti = np.array(Ti)samplein = np.mat([Co,Cr,Mg,Pb])sampleinminmax = np.array([samplein.min(axis=1).T.tolist()[0],samplein.max(axis=1).T.tolist()[0]]).transpose()#对应最大值最小值sampleout = np.mat([Ti])sampleoutminmax = np.array([sampleout.min(axis=1).T.tolist()[0],sampleout.max(axis=1).T.tolist()[0]]).transpose()#对应最大值最小值#导入WNN.py训练好的参数w1=np.load('w1.npy')b=np.load('b.npy')a=np.load('a.npy')w2=np.load('w2.npy')w1 = np.mat(w1)w2 = np.mat(w2)b = np.mat(b)a = np.mat(a)#隐含层节点数hiddenunitnum = 8#测试数据数量testnum = 24#测试数据的导入df = pd.read_csv("test.csv")df.columns = ["Co", "Cr", "Mg", "Pb", "Ti"]Co = df["Co"]Co = np.array(Co)Cr = df["Cr"]Cr = np.array(Cr)Mg=df["Mg"]Mg=np.array(Mg)Pb = df["Pb"]Pb =np.array(Pb)Ti = df["Ti"]Ti = np.array(Ti)input=np.mat([Co,Cr,Mg,Pb])#测试数据中输入数据的归一化inputnorm=(np.array(input.T)-sampleinminmax.transpose()[0])/(sampleinminmax.transpose()[1]-sampleinminmax.transpose()[0])#hidden_out2用于保存隐含层输出hidden_out = np.mat(np.zeros((testnum,hiddenunitnum)))#计算隐含层输出for m in range(testnum):    for j in range(hiddenunitnum):        d = ((inputnorm[m, :] * w1[j, :].T) - b[j, :]) * (a[j, :] ** (-1))        hidden_out[m, j] = wavelet(d)#计算输出层输出output = tanh(hidden_out * w2 )#对输出结果进行反归一化diff = sampleoutminmax[:,1]-sampleoutminmax[:,0]networkout2 = output*diff+sampleoutminmax[0][0]networkout2 = np.array(networkout2).transpose()output1=networkout2.flatten()#降成一维数组output1=output1.tolist()for i in range(testnum):    output1[i] = float('%.2f'%output1[i])print("the prediction is:",output1)#将输出结果与真实值进行对比,计算误差output=Tirmse = (np.sum(np.square(output-output1))/len(output)) ** 0.5mae = np.sum(np.abs(output-output1))/len(output)average_loss1=np.sum(np.abs((output-output1)/output))/len(output)mape="%.2f%%"%(average_loss1*100)f1 = 0for m in range(testnum):    f1 = f1 + np.abs(output[m]-output1[m])/((np.abs(output[m])+np.abs(output1[m]))/2)f2 = f1 / testnumsmape="%.2f%%"%(f2*100)print("the MAE is :",mae)print("the RMSE is :",rmse)print("the MAPE is :",mape)print("the SMAPE is :",smape)#计算预测值与真实值误差与真实值之比的分布A=0B=0C=0D=0E=0for m in range(testnum):    y1 = np.abs(output[m]-output1[m])/np.abs(output[m])    if y1 <= 0.1:        A = A + 1    elif y1 > 0.1 and y1 <= 0.2:        B = B + 1    elif y1 > 0.2 and y1 <= 0.3:        C = C + 1    elif y1 > 0.3 and y1 <= 0.4:        D = D + 1    else:        E = E + 1print("Ratio <= 0.1 :",A)print("0.1< Ratio <= 0.2 :",B)print("0.2< Ratio <= 0.3 :",C)print("0.3< Ratio <= 0.4 :",D)print("Ratio > 0.4 :",E)

3、测试结果


注:由于每次初始化生成的参数不同,因此对参数设置相同的神经网络进行多次训练和预测,测试结果不会完全一致,此外测试结果的好坏也会受到隐含层节点数、学习率、训练次数等参数的影响。

4、参考源码及实验数据集

参考源码及实验数据集