手撕系列:原生python实现卷积神经网络
前言之前就一直想不调用框架,实现一个CNN和RNN,实现这两种网络的主要难度就在于反向传播,对与CNN来说反向传播也要涉及到卷积,对于RNN来说反向传播会涉及到沿时间序列进行传播,也就是BPTT。在此过程中遇到不少困难,踩了不少坑,所以写此博文总结一下。实现卷积神经网络我们这里要实现的卷积神经网络是Lenet-5模型,其模型结构图如下其网络结构用语言描述的话,就是:输入层->卷积...
前言
之前就一直想不调用框架,实现一个CNN和RNN,实现这两种网络的主要难度就在于反向传播,对与CNN来说反向传播也要涉及到卷积,对于RNN来说反向传播会涉及到沿时间序列进行传播,也就是BPTT。在此过程中遇到不少困难,踩了不少坑,所以写此博文总结一下。
实现卷积神经网络
我们这里要实现的卷积神经网络是Lenet-5
模型,其模型结构图如下
其网络结构用语言描述的话,就是:
输入层->卷积层->池化层->卷积层->池化层->全连接层->输出层
其中全连接层的激活函数使用relu
,输出层的激活函数使用softmax
咱们现在从第一层一步步实现到输出层,后面会有部分公式的讲解
各个层的主要函数实现
输入层
这一层没啥说的,就是对数据进行输入,我们这里是要读取MNIST数据集,因为读取的数据都是向量形式,所以在输入之前需要把向量给拉成图片的形式。在这里我们规定数据的输入形式为
[
b
a
t
c
h
,
w
i
d
t
h
,
h
e
i
g
h
t
,
c
h
a
n
n
e
l
]
[batch,width,height,channel]
[batch,width,height,channel]
b
a
t
c
h
batch
batch是指的输入图片的数量,
w
i
d
t
h
、
h
e
i
g
h
t
width、height
width、height分别指的图片高度、宽度,
c
h
a
n
n
e
l
channel
channel指的是图片的通道数
卷积层
这一层就是对输入层输入的数据进行卷积,在这里我们采用img2col
算法,我之前写过一篇关于快速卷积的文章,不清楚这个算法的可以去看看快速卷积算法,实在不行你用传统的卷积算法也是可以,不过速度相对来说慢一点。
def img2col_conv(X,filter,step):
'''
:param X: 输入 [1,28,28,3]
:param filter: 卷积核 [1,3,3,3]
:param step: 1
:param padding: 0
:return:
'''
f_b, f_h, f_w, f_c = filter.shape
filter_convert = np.zeros(shape=[f_w * f_h * f_c,f_b])
for b in range(0,f_b):
for c in range(0,f_c):
f_unit = filter[b,:,:,c].flatten()
star_p = c * len(f_unit)
end_p = star_p + len(f_unit)
filter_convert[star_p:end_p,b] = f_unit
cur = 0
height_out, width_out = int(np.ceil((X.shape[1] - filter.shape[1] + 1) / step)), int(
np.ceil((X.shape[2] - filter.shape[2] + 1) / step))
x_convert = np.zeros(shape=[width_out * height_out * X.shape[0], f_h * f_w * f_c])
for b in range(0,X.shape[0]):
for y in range(0,X.shape[1]-filter.shape[1]+1,step):
for x in range(0,X.shape[2]-filter.shape[2]+1,step):
for c in range(0,X.shape[3]):
tile = X[b,y:y + f_h, x:x + f_w, c]
star_p = c * f_h * f_w
end_p = star_p + f_h * f_w
x_convert[cur,star_p:end_p] = tile.flatten()
cur = cur + 1
state = np.dot(x_convert,filter_convert)
res = np.zeros(shape=[X.shape[0],height_out,width_out,f_b])
for b in range(0,res.shape[0]):
star_p = b * width_out * height_out
end_p =star_p + width_out * height_out
for c in range(0,f_b):
tile = state[star_p:end_p,c].reshape(height_out,width_out)
res[b,:,:,c] = tile
return x_convert,filter_convert,state,res
看了上面的代码,你可能会疑惑为啥我会返回这么多值,这个你不用管,在本文中只会使用返回值的最后一个res
另外需要注意的是,一副图片进行卷积之后,输出的大小是怎么样的?
输 出 高 度 = 向 上 取 整 ( ( 输 入 高 度 − 卷 积 核 高 度 + 1 + 补 零 数 ) 步 长 ) 输出高度 =向上取整( \frac{(输入高度-卷积核高度+1+补零数)}{步长}) 输出高度=向上取整(步长(输入高度−卷积核高度+1+补零数))
输 出 宽 度 = 向 上 取 整 ( ( 输 入 宽 度 − 卷 积 核 宽 度 + 1 + 补 零 数 ) 步 长 ) 输出宽度 = 向上取整(\frac{(输入宽度-卷积核宽度+1+补零数)}{步长}) 输出宽度=向上取整(步长(输入宽度−卷积核宽度+1+补零数))
池化层
如果我们直接采用滑块进行一个个滑动,然后求解最大值的话这是非常麻烦在这里分享一个类似于img2col
算法的进行池化的一种方法,其做法大致如下。
假设现在输入到池化层的图片如下
池化大小为
2
×
2
2\times2
2×2,步长为
2
2
2,那么我们就这张图片进行类似于img2col的处理
然后把它们堆叠在一起,就变成如下形式
然后我们就可以利用numpy的广播机制,直接对每一行求最大值
然后再对其进行reshape,既可得到池化结果
但是实际过程中,我们不会 直接求最大值,而是求最大值的那个下标,之所以这么做是因为反向传播的过程中需要使用到其原始坐标,后面会讲到
def img2col_maxpool(X,pool_size,step):
height_out,width_out = int(np.ceil((X.shape[1] - pool_size[0] + 1) / step)), int(
np.ceil((X.shape[2] - pool_size[1] + 1) / step))
pool_convert = np.zeros(shape=[height_out * width_out * X.shape[0],pool_size[0] * pool_size[1],X.shape[3]])
pool_height,pool_width = pool_size
cur = 0
for b in range(0,X.shape[0]):
for y in range(0,X.shape[1]-pool_height+1,step):
for x in range(0,X.shape[2]-pool_width+1,step):
tile = X[b,y:y + pool_height , x:x + pool_width]
for c in range(0,X.shape[3]):
pool_convert[cur,:,c] = tile[:,:,c].flatten()
cur = cur + 1
index = np.argmax(pool_convert,axis=1)
p_c = np.zeros_like(index,dtype=float)
for y in range(0,p_c.shape[0]):
for c in range(0,p_c.shape[1]):
p_c[y,c] = pool_convert[y,index[y,c],c]
res = np.zeros(shape=[X.shape[0],height_out,width_out,X.shape[3]])
for b in range(0,res.shape[0]):
start_p =b * (width_out * height_out)
end_p = start_p + (width_out * height_out)
for c in range(0,res.shape[3]):
tile = p_c[start_p:end_p,c].reshape(height_out,width_out)
res[b,:,:,c] = tile
return pool_convert,p_c,index,res
这里需要注意的就是池化之后的图片大小,其计算公式于卷积之后的图片大小的计算公式是一样的。
全连接层
因为在经过最后一次池化之后,我们的数据还是高维张量的,所以需要把张量的拉直,其代码如下
def flatten(x_pool2):
x_flatten = np.zeros(shape=[x_pool2.shape[0],x_pool2.shape[1] * x_pool2.shape[2] * x_pool2.shape[3]])
for i in range(0,x_flatten.shape[0]):
for c in range(0,x_pool2.shape[3]):
start_p = c * (x_pool2.shape[1] * x_pool2.shape[2])
end_p =start_p + (x_pool2.shape[1] * x_pool2.shape[2])
x_flatten[i,start_p:end_p] = x_pool2[i,:,:,c].flatten()
return x_flatten
然后对数据进行矩阵乘法即可,在全连接层我们使用的激活函数是relu
,这个函数实现很简单,如下:
def relu(t):
res = np.copy(t)
res[t < 0] = 0
return res
输出层
这一层完全跟DNN一样,激活函数使用的是softmax,其实现如下
def softmax(X):
for i in range(0, len(X)):
X[i,:] = X[i,:] - np.max(X[i,:])
X[i,:] = np.exp(X[i, :]) / (np.sum(np.exp(X[i, :])))
return X
因为在softmax中需要计算
e
n
e^n
en,如果
n
n
n值太大,会导致数值上溢,所以我们需要利用softmax函数的一个性质,如下
s
o
f
t
m
a
x
(
z
)
=
s
o
f
t
m
a
x
(
z
−
a
)
[
其
中
a
是
一
个
常
数
]
.
softmax(z) = softmax(z-a) [其中a是一个常数] .
softmax(z)=softmax(z−a)[其中a是一个常数].
我们可以直接对
x
x
x减去其中的一个最大值,不仅可以保持输出结果不变,还可以让指数计算的结果不至于溢出。这个性质也很容易推出,动动笔很快就可以写出的。
损失函数
损失函数使用的交叉熵损失,其实现也非常的简单,一行代码即可搞定
def entrop_loss(y_p,y_label):
return np.mean(np.sum(-y_label * np.log(y_p+1e-5),axis=1))
前向传播过程
完成了各个层主要函数的编写,前向传播的过程编写起来就特别方便了,直接上代码,如下。
def forward(X,Paramters):
filter1,filter2,w3,w4 = Paramters
# 第一层:卷积层
x_convet1,filter_convert1,state1,x_conv1=img2col_conv(X,filter1,1)
a_1 = relu(x_conv1)
cash1 = {'z_p':X,'a_p':X,'z':x_conv1,'a':a_1,'w':filter1.copy()}
# 第二次:池化层
cv_p1,p_c1,index1,x_pool1 = img2col_maxpool(cash1['a'],(2,2),2)
cash2 = {'z_p':cash1['z'],'a_p':cash1['a'],'z':x_pool1,'a':x_pool1,'w':(2,2),'os':x_pool1.shape,'index':index1}
# 第三层:卷积层
x_convet2, filter_convert2, state2, x_conv2 = img2col_conv(x_pool1,filter2,step=1)
a_2 = relu(x_conv2)
cash3 = {'c_z_p':state2,'c_a_p':x_convet2,'c_w':filter_convert2,'z_p':cash2['z'],'a_p':cash2['a'],'z':x_conv2,'a':a_2,'w':filter2.copy()}
# 第四层:池化层
cv_p2,p_c2,index2,x_pool2 = img2col_maxpool(x_conv2,(2,2),2)
cash4 = {'z_p':cash3['z'],'a_p':cash3['a'],'z':x_pool2,'a':x_pool2,'w':(2,2),'os':x_pool2.shape,'index':index2}
# 第五层: 隐藏层
x_flatten = flatten(x_pool2)
f3 = np.dot(x_flatten,w3)
a_3 = relu(f3)
cash5 = {'z_p':x_flatten,'a_p':x_flatten,'z':f3,'a':a_3,'w':w3.copy()}
# 输出层
f4 = np.dot(f3,w4)
y_p = softmax(f4)
cash6 = {'z_p':cash5['z'],'a_p':cash5['a'],'z':f4,'a':y_p,'w':w4.copy()}
return [cash1,cash2,cash3,cash4,cash5,cash6],y_p
只要完成了前述函数的编写,前向传播的过程是非常好写的,难度主要在后向传播里。
在这里写一下整个数据流动过程中,其大小变化
假设输入300张图片,图片的大小为 28 × 28 28\times28 28×28,通道为 1 1 1,第一个卷积层的卷积核为 5 × 3 × 3 × 1 5\times3\times3\times1 5×3×3×1,第二个卷积核的卷积层为 4 × 3 × 3 × 5 4\times3\times3\times5 4×3×3×5,卷积步长为1,池化大小为 2 × 2 2\times2 2×2,池化步长为 2 2 2,padding均为1;隐藏层输出50个值,输出层输出10个结果
[300,28,28,1] -------输入----->卷积层-------输出----->[300,26,26,5]
[300,26,26,5] -------输入----->池化层-------输出----->[300,13,13,5]
[300,13,13,5] -------输入----->卷积层-------输出----->[300,11,11,4]
[300,11,11,4] -------输入----->池化层-------输出----->[300,5,5,4]
[300,5,5,4] ----------输入----->Flatten-------输出----->[300,100]
[300,5,5,4] ----------输入----->隐藏层-------输出----->[300,50]
[300,50] --------------输入----->输出层-------输出----->[300,10]
反向传播过程
如果想要看以下部分,至少需要掌握前馈神经网络中的反向传播算法
求最后一层的损失 δ L \delta^L δL
根据定义可知
δ
L
=
∂
L
∂
z
L
\delta^L = \frac{\partial L}{\partial z^L}
δL=∂zL∂L
因为最后一层使用的输出函数是softmax,所以这里求出最后一层的损失非常非常的简单,即
δ
L
=
y
p
r
e
d
i
c
t
−
y
t
r
u
e
\delta^L = y_{predict} - y_{true}
δL=ypredict−ytrue
全连接层反向传播过程
在这一部分中,其反向传播过程与普通的前馈神经网络是完全一样的,即根据本层的
δ
\delta
δ,求出本层参数的梯度和下一层的损失,计算公式如下
def full_backprop(delta,cash):
dw = np.dot(cash['a_p'].T,delta)
db = np.sum(delta,axis=0)
delta_pre = np.dot(delta,cash['w'].T) * drelu(cash['z_p'])
grad_dict = {'dw':dw,'db':db,'delta_pre':delta_pre}
return grad_dict
下面根据之前的前向传播过程,来算一下 δ \delta δ在整个反向传播过程中,其形状的变化
[300,28,28,1] -------输入----->卷积层-------输出----->[300,26,26,5]
[300,26,26,5] -------输入----->池化层-------输出----->[300,13,13,5]
[300,13,13,5] -------输入----->卷积层-------输出----->[300,11,11,4]
[300,11,11,4] -------输入----->池化层-------输出----->[300,5,5,4]
[300,5,5,4] ----------输入----->Flatten-------输出----->[300,100]
[300,5,5,4] ----------输入----->隐藏层-------输出----->[300,50]
[300,50] --------------输入----->输出层-------输出----->[300,10]
因为最后一层的损失值计算如下
δ
L
=
y
p
−
y
t
\delta^L = y_p - y_t
δL=yp−yt
所以最后一层损失的形状大小如下
δ
o
u
t
p
u
t
_
l
a
y
e
r
=
[
300
,
10
]
\delta^{output \_ layer}= [300,10]
δoutput_layer=[300,10]
所以在全连接部分,其
δ
\delta
δ变化如下
[300,100]《----[300,50](全连接层)《-----[300,10](输出层)
池化层反向传播过程
从前面可知,全连接层向池化层传递进来的 δ \delta δ是一个[300,100]的矩阵,所以我们需要这个矩阵给变成之前我们池化层输出的形状
[300,28,28,1] -------输入----->卷积层-------输出----->[300,26,26,5]
[300,26,26,5] -------输入----->池化层-------输出----->[300,13,13,5]
[300,13,13,5] -------输入----->卷积层-------输出----->[300,11,11,4]
[300,11,11,4] -------输入----->池化层-------输出----->[300,5,5,4]
[300,5,5,4] ----------输入----->Flatten-------输出----->[300,100]
[300,5,5,4] ----------输入----->隐藏层-------输出----->[300,50]
[300,50] --------------输入----->输出层-------输出----->[300,10]
观察前向传播过程,即把[300,100]这个矩阵reshape成[300,5,5,4]。
经过上述处理,我们就得到了本层即池化层的损失 δ \delta δ,因为池化层是没有参数的,所以我们不关系如何计算梯度,我们只关心,如何将这个损失给传递到上一层去。
传递方式其实就是进行上采样
,这个过程其实很简单
假如前向传播的时候,池化过程是下面样子的
那么反向传播的时候,我们会得到[2,2]的
δ
\delta
δ
那么上采样就是指的是,如下过程
了解了上述过程,就可以进行编码了
def pool_backprop(delta_pool,cash,flattened = True):
if flattened:
delta_pool = conv_flatten(delta_pool,cash['os'])
return upsample(delta_pool,cash['w'],cash['z_p'].shape,cash['index'])
先是判断,是不是需要将输入的 δ \delta δ变成原来的样子,因为只有在全连接层向池化层传递误差的时候才需要进行reshape。然后再进行上采样即可。
上采样的实现代码如下
def upsample(delta,poos_size,target_shape,index):
res = np.zeros(shape=target_shape,dtype=float)
cur = 0
for b in range(0,target_shape[0]):
for y in range(0,target_shape[1] - poos_size[0] + 1,poos_size[0]):
for x in range(0,target_shape[2] - poos_size[0] + 1,poos_size[1]):
for c in range(target_shape[3]):
i = index[cur,c]
x_epoch = i % poos_size[1]
y_epoch = int(i / poos_size[0])
res[b,y+y_epoch,x+x_epoch,c] = delta[b,int(y/poos_size[0]),int(x/poos_size[0]),c]
cur = cur + 1
return res
卷积层的反向传播过程
在本层,最重要的一步就是要根据当前层的 δ \delta δ求出权重的梯度了,网上很多教程只讲述了单通道的做法,并没有细说多通道的情况,这里我们说一下多通道的做法,如果你懂单通道的做法,那么可以直接看下去,如果不懂得单通道的做法,可以看一下此文卷积神经网络(CNN)反向传播算法。
[300,11,11,4](卷积层)<----[300,5,5,4]<---reshape--[300,100](池化层)《----[300,50](全连接层)《-----[300,10](输出层)
我们从上一层池化层得到了本层的 δ \delta δ,其形状为 [ 300 , 11 , 11 , 4 ] [300,11,11,4] [300,11,11,4]
从上述前向传播过程
[300,28,28,1] -------输入----->卷积层-------输出----->[300,26,26,5]
[300,26,26,5] -------输入----->池化层-------输出----->[300,13,13,5]
[300,13,13,5] -------输入----->卷积层-------输出----->[300,11,11,4]
[300,11,11,4] -------输入----->池化层-------输出----->[300,5,5,4]
[300,5,5,4] ----------输入----->Flatten-------输出----->[300,100]
[300,5,5,4] ----------输入----->隐藏层-------输出----->[300,50]
[300,50] --------------输入----->输出层-------输出----->[300,10]
我们可以知道,上层的输出形状[300,13,13,5],该层的卷积核的形状为[4,3,3,5]。
针对对多个通道的情况,其步骤如下:
- 将上层的输出从[300,13,13,5]变为[5,13,13,300]
- 将本层的损失[300,11,11,4]变为[4,11,11,300]
- 将改变形状后的上层输出[5,13,13,300]拆分成5个[1,13,13,300],记数组为A[i]
- 将改变形状后的本层损失[4,11,11,300]拆分成4个[1,11,11,300],记数组为d[i]
5.将A[0]分别于d中所有元素做卷积,得到4个[1,3,3,1]卷积结果,这4个实际分别是4个卷积核的第一个通道的梯度。
6.循环第五步,知道将A数组全部遍历完,既可得到4个卷积核所有通道的梯度。
计算完本层的梯度,就要通过本层的 δ \delta δ来求出上一层的 δ \delta δ了。做法如下。
- 对本层的 δ \delta δ的四周进行填充,其填充大小为卷积核的大小减去1。即[300,11,11,4]填充为[300,15,15,4]
- 将本层的卷积核进行一百八十度的旋转;其形状大小任然为[4,3,3,5]
- 交换卷积核的维度,将通道数变成核数,将核数变成通道数,改变之后,形状为:[5,3,3,4]。
- 用填充后的 δ \delta δ和处理之后的卷积核进行卷积运算;[300,15,15,4] * [5,3,3,4],即可得到上一层的 δ \delta δ,大小为:[300,13,13,5]
注意,上述涉及到的卷积运算,步长均为1
实现代码如下
# 计算卷积层的反向传播
def conv_backprop(delta,cash):
delta_c = np.copy(delta)
delta =swap_first_end_axis(delta)
a_p = swap_first_end_axis(cash['a_p'])
jacoby = np.zeros_like(cash['w'])
for i in range(0,delta.shape[0]):
for c in range(0,a_p.shape[0]):
a_p_temp = a_p[np.newaxis,c,:,:,:]
delta_temp = delta[np.newaxis,i,:,:]
_,_,_,dw = img2col_conv(a_p_temp,delta_temp,step=1)
jacoby[i,:,:,c] = dw[0,:,:,0]
w = cash['w']
padding_h = w.shape[1] - 1
padding_w = w.shape[2] - 1
delta_padding = np.zeros(shape=[delta_c.shape[0],padding_h + delta_c.shape[1] + padding_h,padding_w + delta_c.shape[2] + padding_w,delta_c.shape[3]])
# 下面要计算前向传播的delta。
delta_padding[:,padding_h:-padding_h,padding_w:-padding_w] = delta_c
w = np.flip(w,axis=1)
w = np.flip(w,axis=2)
w = swap_first_end_axis(w)
_, _, _, delta_pre = img2col_conv(delta_padding,w,step=1)
gradient_dict = {'dw':jacoby,'delta_pre':delta_pre}
return gradient_dict
至此,我们就完成了整个卷积神经网络中的反向传播过程
下面来训练测试一下,结果如下:
只训练了76轮,在验证集上准确率就可以达到80%,因为训练速度较慢,没有继续训练下去了。
各种踩坑
- 参数初始化问题。一开始参数初始化的值比较大,所以导致各种数值爆炸,无法收敛。需要将初始化参数设置的尽量少一些
- 梯度爆炸的问题。需要进行梯度截断
- 训练无法收敛问题。 大可能性是因为学习率的设置的问题,这里推荐使用Adagrad或Rmsp来进行训练,下面附上两种训练方式的代码,仅供参考
def __sgd__adagrad(self, lr):
self.t = 0
self.accumulation = 0.001
self.eta = lr
self.accumulation_bias = 0.001
def update(jacoby, bais_jacoby):
# 对学习率的更新
eta_t = self.eta / np.sqrt(self.t + 1)
# 对权重的更新如下:
self.accumulation = self.accumulation + np.square(jacoby)
sigma = np.sqrt(self.accumulation / (self.t + 1))
self.ow = self.w
self.w = self.w - (eta_t / sigma) * jacoby
# 对偏置的更新如下:
self.accumulation_bias = self.accumulation_bias + np.square(bais_jacoby)
sigma_b = np.sqrt(self.accumulation_bias / (self.t + 1))
self.bias = self.bias - (eta_t / sigma_b) * bais_jacoby
# 对时间进行更新
self.t = self.t + 1
return update
def __sgd_rmsprop__(self, lr, alpha):
self.t = 0
self.accumulation = None
self.eta = lr
self.accumulation_bias = None
self.alpha = alpha
def update(jacoby, bais_jacoby):
# 对学习率的更新
eta_t = self.eta / np.sqrt(self.t + 1)
# 对权重的更新如下:
if self.accumulation is None:
self.accumulation = jacoby
sigma = self.accumulation
else:
self.accumulation = np.sqrt(
np.square(self.accumulation) * self.alpha + (1 - self.alpha) * np.square(jacoby))
sigma = np.sqrt(self.accumulation )
self.ow = self.w
self.w = self.w - (eta_t / (sigma+1e-6)) * jacoby
# 对偏置的更新如下:
if self.accumulation_bias is None:
self.accumulation_bias = bais_jacoby
sigma_b = self.accumulation_bias
else:
self.accumulation_bias = np.sqrt(
np.square(self.accumulation_bias) * self.alpha + (1 - self.alpha) * np.square(bais_jacoby))
sigma_b = np.sqrt(self.accumulation_bias)
self.bias = self.bias - (eta_t / (sigma_b+1e-6)) * bais_jacoby
# 对时间进行更新
self.t = self.t + 1
return update
代码
分别给出tensorflow
实现和原生python实现,可以对比一下两者训练区别,体会下tensorflow
的强大
tensorflow实现
import tensorflow as tf
from src.卷积神经网络.dataload import loadMinist
# 定义输入img的宽度、高度、通道数
IMG_WIDTH = 28
IMG_HEIGHT = 28
IMG_CHANNEL = 1
# 定义训练参数
BITCH_SIZE = 1000
LEARNING_RATE = 1e-4
EPOCH = 10000
PRINT_EPOCH = 10
# 定义dropout的大小
DROPOUT_RATE = 0.5
# 定义第一层卷积层的参数
CONV1_SIZE = 5
CONV1_COUNT = 1
CONV1_STRADE = 1
# 定义第二层卷积层的参数
CONV2_SIZE = 5
CONV2_COUNT = 1
CONV2_STRADE = 1
# 定义第一个池化层的参数
POOL1_SIZE = [1,2,2,1]
POOL1_STRIDE = [1,2,2,1]
# 定义第二个池化层的参数
POOL2_SIZE = [1,2,2,1]
POOL2_STRIDE = [1,2,2,1]
# 定义第一个全连接层的神经元数
FC1_SIZE = 512
# 定义输出层层神经元数量
OUTPUT_SIZE = 10
# 定义输入、输出
x_input_ph = tf.placeholder(dtype=tf.float32, shape=[1000, IMG_HEIGHT, IMG_WIDTH, IMG_CHANNEL])
y_input_ph = tf.placeholder(dtype=tf.float32, shape=[1000, 10])
def accuracy( y_pred, y_target):
equals = tf.equal(tf.argmax(y_pred, axis=1), tf.argmax(y_target, axis=1))
accuracy = tf.reduce_mean(tf.cast(equals, tf.float32))
return accuracy
def inference(input_tensor, train,regularizer=None, SoftMax=False,reuse=False):
# 第一层卷积层
with tf.variable_scope('layer1_conv1',reuse=reuse):
weight1 = tf.get_variable(name='weight', shape=[CONV1_SIZE, CONV1_SIZE, IMG_CHANNEL, CONV1_COUNT],
initializer=tf.truncated_normal_initializer(stddev=0.1))
bias1 = tf.get_variable(name='bias', shape=[CONV1_COUNT],
initializer=tf.constant_initializer(0.0))
conv1_res = tf.nn.conv2d(input_tensor, weight1, padding='SAME',strides=[1, CONV1_STRADE, CONV1_STRADE, 1])
layer1_res = tf.nn.relu(tf.nn.bias_add(conv1_res, bias1))
# 第二层池化层
with tf.variable_scope('layer2_pool1',reuse=reuse):
pool1_res = tf.nn.max_pool(layer1_res,ksize=POOL1_SIZE,strides=POOL1_STRIDE,padding='SAME')
# 第三层卷积层
with tf.variable_scope('layer3_conv2',reuse=reuse):
weight2 = tf.get_variable(name='weight', shape=[CONV2_SIZE, CONV2_SIZE, CONV1_COUNT, CONV2_COUNT],
initializer=tf.truncated_normal_initializer(stddev=0.1))
bias2 = tf.get_variable(name='bias', shape=[CONV2_COUNT],
initializer=tf.constant_initializer(0.0))
conv2_res = tf.nn.conv2d(pool1_res, weight2, padding='SAME',strides=[1, CONV2_STRADE, CONV2_STRADE, 1])
layer2_res = tf.nn.relu(tf.nn.bias_add(conv2_res, bias2))
# 第四层池化层
with tf.variable_scope('layer4_pool2',reuse=reuse):
pool2_res = tf.nn.max_pool(layer2_res,ksize=POOL2_SIZE,strides=POOL2_STRIDE,padding='SAME')
# 将输入拉直
pool2_output_shape = pool2_res.get_shape().as_list()
data_length = pool2_output_shape[1] * pool2_output_shape[2] * pool2_output_shape[3]
x_flatten = tf.reshape(pool2_res,[-1,data_length])
# 第五层 全连接层
with tf.variable_scope('layer5_fullconnected1',reuse=reuse):
weight3 = tf.get_variable(name='weight',shape=[data_length,FC1_SIZE],initializer=tf.truncated_normal_initializer(stddev=0.1))
bias3 = tf.get_variable(name='bias',shape=[FC1_SIZE,],initializer=tf.constant_initializer(0.0))
if regularizer is not None:
tf.add_to_collection('loss',regularizer(weight3))
fc1_res = tf.nn.relu(tf.matmul(x_flatten,weight3)+bias3)
if train:
fc1_res = tf.nn.dropout(fc1_res,keep_prob=DROPOUT_RATE)
# 第六层输出层
with tf.variable_scope('layer6_fullconnected2',reuse=reuse):
weight4 = tf.get_variable(name='weight',shape=[FC1_SIZE,OUTPUT_SIZE],initializer=tf.truncated_normal_initializer(stddev=0.1))
bias4 = tf.get_variable(name='bias',shape=[OUTPUT_SIZE],initializer=tf.constant_initializer(0.0))
if regularizer is not None:
tf.add_to_collection('loss',regularizer(weight4))
nosoftmax_res = tf.matmul(fc1_res,weight4)+bias4
if SoftMax == True:
return tf.nn.softmax(nosoftmax_res)
else:
return nosoftmax_res
if __name__ == '__main__':
train, test = loadMinist()
x_train, y_train = train
x_test, y_test = test
x_train = x_train.reshape(-1,28,28,1)
x_test = x_test.reshape(-1,28,28,1)
l2_loss = tf.contrib.layers.l2_regularizer(0.05)
logits = inference(x_input_ph,True,None,False,reuse=False)
loss = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(labels=y_input_ph,logits=logits))
tf.add_to_collection('loss',loss)
losses = tf.add_n(tf.get_collection('loss'))
opt = tf.train.AdamOptimizer(learning_rate=LEARNING_RATE)
train_op = opt.minimize(losses)
ac = accuracy(inference(x_input_ph,False,None,True,reuse=True),y_input_ph)
with tf.Session() as sess:
sess.run(tf.global_variables_initializer())
for i in range(0,EPOCH):
start = (i * BITCH_SIZE) % len(x_train)
end = min(start+BITCH_SIZE, len(x_train))
feed_data = {x_input_ph: x_train[start:end], y_input_ph: y_train[start:end]}
if i % PRINT_EPOCH == 0:
loss_value = sess.run(losses,feed_dict=feed_data)
acc = sess.run(ac,feed_dict=feed_data)
print("after %i steps ,the loss is %f and accuracy is %.2f"%(i,loss_value,acc))
sess.run(train_op,feed_dict=feed_data)
原生python实现
def relu(t):
res = np.copy(t)
res[t < 0] = 0
return res
def drelu(t):
res = np.copy(t)
res[t > 0] = 1
res[t <= 0] = 0
return res
def softmax(X):
for i in range(0, len(X)):
X[i,:] = X[i,:] - np.max(X[i,:])
X[i,:] = np.exp(X[i, :]) / (np.sum(np.exp(X[i, :])))
return X
def gradient_clip(dw,min,max):
res = np.copy(dw)
res[dw<min] = min
res[dw>max] = max
return res
# 该卷积网络层次结构
def img2col_conv(X,filter,step):
'''
:param X: 输入 [1,28,28,3]
:param filter: 卷积核 [1,3,3,3]
:param step: 1
:param padding: 0
:return:
'''
f_b, f_h, f_w, f_c = filter.shape
filter_convert = np.zeros(shape=[f_w * f_h * f_c,f_b])
for b in range(0,f_b):
for c in range(0,f_c):
f_unit = filter[b,:,:,c].flatten()
star_p = c * len(f_unit)
end_p = star_p + len(f_unit)
filter_convert[star_p:end_p,b] = f_unit
cur = 0
height_out, width_out = int(np.ceil((X.shape[1] - filter.shape[1] + 1) / step)), int(
np.ceil((X.shape[2] - filter.shape[2] + 1) / step))
x_convert = np.zeros(shape=[width_out * height_out * X.shape[0], f_h * f_w * f_c])
for b in range(0,X.shape[0]):
for y in range(0,X.shape[1]-filter.shape[1]+1,step):
for x in range(0,X.shape[2]-filter.shape[2]+1,step):
for c in range(0,X.shape[3]):
tile = X[b,y:y + f_h, x:x + f_w, c]
star_p = c * f_h * f_w
end_p = star_p + f_h * f_w
x_convert[cur,star_p:end_p] = tile.flatten()
cur = cur + 1
state = np.dot(x_convert,filter_convert)
res = np.zeros(shape=[X.shape[0],height_out,width_out,f_b])
for b in range(0,res.shape[0]):
star_p = b * width_out * height_out
end_p =star_p + width_out * height_out
for c in range(0,f_b):
tile = state[star_p:end_p,c].reshape(height_out,width_out)
res[b,:,:,c] = tile
return x_convert,filter_convert,state,res
def img2col_maxpool(X,pool_size,step):
height_out,width_out = int(np.ceil((X.shape[1] - pool_size[0] + 1) / step)), int(
np.ceil((X.shape[2] - pool_size[1] + 1) / step))
pool_convert = np.zeros(shape=[height_out * width_out * X.shape[0],pool_size[0] * pool_size[1],X.shape[3]])
pool_height,pool_width = pool_size
cur = 0
for b in range(0,X.shape[0]):
for y in range(0,X.shape[1]-pool_height+1,step):
for x in range(0,X.shape[2]-pool_width+1,step):
tile = X[b,y:y + pool_height , x:x + pool_width]
for c in range(0,X.shape[3]):
pool_convert[cur,:,c] = tile[:,:,c].flatten()
cur = cur + 1
index = np.argmax(pool_convert,axis=1)
p_c = np.zeros_like(index,dtype=float)
for y in range(0,p_c.shape[0]):
for c in range(0,p_c.shape[1]):
p_c[y,c] = pool_convert[y,index[y,c],c]
res = np.zeros(shape=[X.shape[0],height_out,width_out,X.shape[3]])
for b in range(0,res.shape[0]):
start_p =b * (width_out * height_out)
end_p = start_p + (width_out * height_out)
for c in range(0,res.shape[3]):
tile = p_c[start_p:end_p,c].reshape(height_out,width_out)
res[b,:,:,c] = tile
return pool_convert,p_c,index,res
def conv_flatten(x_flatten,os):
res = np.zeros(shape = os)
for i in range(0,len(x_flatten)):
for c in range(0,os[3]):
start_p = c * os[1] * os[2]
end_p = start_p + os[1] * os[1]
res[i,:,:,c] = x_flatten[i,start_p:end_p].reshape(os[1],os[2])
return res
def flatten(x_pool2):
x_flatten = np.zeros(shape=[x_pool2.shape[0],x_pool2.shape[1] * x_pool2.shape[2] * x_pool2.shape[3]])
for i in range(0,x_flatten.shape[0]):
for c in range(0,x_pool2.shape[3]):
start_p = c * (x_pool2.shape[1] * x_pool2.shape[2])
end_p =start_p + (x_pool2.shape[1] * x_pool2.shape[2])
x_flatten[i,start_p:end_p] = x_pool2[i,:,:,c].flatten()
return x_flatten
def entrop_loss(y_p,y_label):
return np.mean(np.sum(-y_label * np.log(y_p+1e-5),axis=1))
def forward(X,Paramters):
filter1,filter2,w3,w4 = Paramters
# 第一层:卷积层
x_convet1,filter_convert1,state1,x_conv1=img2col_conv(X,filter1,1)
a_1 = relu(x_conv1)
cash1 = {'z_p':X,'a_p':X,'z':x_conv1,'a':a_1,'w':filter1.copy()}
# 第二次:池化层
cv_p1,p_c1,index1,x_pool1 = img2col_maxpool(cash1['a'],(2,2),2)
cash2 = {'z_p':cash1['z'],'a_p':cash1['a'],'z':x_pool1,'a':x_pool1,'w':(2,2),'os':x_pool1.shape,'index':index1}
# 第三层:卷积层
x_convet2, filter_convert2, state2, x_conv2 = img2col_conv(x_pool1,filter2,step=1)
a_2 = relu(x_conv2)
cash3 = {'c_z_p':state2,'c_a_p':x_convet2,'c_w':filter_convert2,'z_p':cash2['z'],'a_p':cash2['a'],'z':x_conv2,'a':a_2,'w':filter2.copy()}
# 第四层:池化层
cv_p2,p_c2,index2,x_pool2 = img2col_maxpool(x_conv2,(2,2),2)
cash4 = {'z_p':cash3['z'],'a_p':cash3['a'],'z':x_pool2,'a':x_pool2,'w':(2,2),'os':x_pool2.shape,'index':index2}
# 第五层: 隐藏层
x_flatten = flatten(x_pool2)
f3 = np.dot(x_flatten,w3)
a_3 = relu(f3)
cash5 = {'z_p':x_flatten,'a_p':x_flatten,'z':f3,'a':a_3,'w':w3.copy()}
# 输出层
f4 = np.dot(f3,w4)
y_p = softmax(f4)
cash6 = {'z_p':cash5['z'],'a_p':cash5['a'],'z':f4,'a':y_p,'w':w4.copy()}
return [cash1,cash2,cash3,cash4,cash5,cash6],y_p
# 全连接层的反向传播
def full_backprop(delta,cash):
dw = np.dot(cash['a_p'].T,delta)
db = np.sum(delta,axis=0)
delta_pre = np.dot(delta,cash['w'].T) * drelu(cash['z_p'])
grad_dict = {'dw':dw,'db':db,'delta_pre':delta_pre}
return grad_dict
#计算池化层的反向传播:
def upsample(delta,poos_size,target_shape,index):
res = np.zeros(shape=target_shape,dtype=float)
cur = 0
for b in range(0,target_shape[0]):
for y in range(0,target_shape[1] - poos_size[0] + 1,poos_size[0]):
for x in range(0,target_shape[2] - poos_size[0] + 1,poos_size[1]):
for c in range(target_shape[3]):
i = index[cur,c]
x_epoch = i % poos_size[1]
y_epoch = int(i / poos_size[0])
res[b,y+y_epoch,x+x_epoch,c] = delta[b,int(y/poos_size[0]),int(x/poos_size[0]),c]
cur = cur + 1
return res
def pool_backprop(delta_pool,cash,flattened = True):
if flattened:
delta_pool = conv_flatten(delta_pool,cash['os'])
return upsample(delta_pool,cash['w'],cash['z_p'].shape,cash['index'])
def swap_first_end_axis(mat):
delta = np.copy(mat)
delta = np.rollaxis(delta,3,0)
delta = np.rollaxis(delta, 2, 1)
delta = np.rollaxis(delta, 3, 2)
return delta
# 计算卷积层的反向传播
def conv_backprop(delta,cash):
delta_c = np.copy(delta)
delta =swap_first_end_axis(delta)
a_p = swap_first_end_axis(cash['a_p'])
jacoby = np.zeros_like(cash['w'])
for i in range(0,delta.shape[0]):
for c in range(0,a_p.shape[0]):
a_p_temp = a_p[np.newaxis,c,:,:,:]
delta_temp = delta[np.newaxis,i,:,:]
_,_,_,dw = img2col_conv(a_p_temp,delta_temp,step=1)
jacoby[i,:,:,c] = dw[0,:,:,0]
w = cash['w']
padding_h = w.shape[1] - 1
padding_w = w.shape[2] - 1
delta_padding = np.zeros(shape=[delta_c.shape[0],padding_h + delta_c.shape[1] + padding_h,padding_w + delta_c.shape[2] + padding_w,delta_c.shape[3]])
# 下面要计算前向传播的delta。
delta_padding[:,padding_h:-padding_h,padding_w:-padding_w] = delta_c
w = np.flip(w,axis=1)
w = np.flip(w,axis=2)
w = swap_first_end_axis(w)
_, _, _, delta_pre = img2col_conv(delta_padding,w,step=1)
gradient_dict = {'dw':jacoby,'delta_pre':delta_pre}
return gradient_dict
def conv_backprop2(delta,cash,converted = True):
delta_c = np.zeros(shape=[delta.shape[0] * delta.shape[1] * delta.shape[2], delta.shape[3]])
for i in range(0,delta.shape[0]):
cursor_start = i * delta.shape[1] * delta.shape[2]
cursor_end = cursor_start + delta.shape[1] * delta.shape[2]
for c in range(0,delta.shape[3]):
unit = delta[i,:,:,c].flatten()
delta_c[cursor_start:cursor_end,c]=unit
dw = np.dot(cash['c_a_p'].T,delta_c)
jacoby = np.zeros_like(cash['w'])
for i in range(0,dw.shape[1]):
for c in range(0,jacoby.shape[3]):
star_p = c * 9
end_p = star_p + 9
jacoby[i,:,:,c]= dw[star_p:end_p,i].reshape([jacoby.shape[1],jacoby.shape[2]])
return {'dw':jacoby}
def tensorHandle(X,shape):
res=None
for img in X:
if res is None:
res=np.array([img.reshape([*shape])])
else:
res=np.concatenate([res,np.array([img.reshape([*shape])])])
return res
def accuracy(y_predict,y_t):
return np.mean(np.argmax(y_predict,axis=1)==np.argmax(y_t,axis=1))
if __name__ == '__main__':
filter1 = np.random.normal(size=[5, 3, 3, 1], loc=0,scale=0.1)
filter2 = np.random.normal(size=[4, 3, 3, 5], loc=0,scale=0.1)
w3 = np.random.normal(size=[100, 50], loc=0,scale=0.1)
w4 = np.random.normal(size=[50, 10], loc=0,scale=0.1)
paramters = [filter1,filter2,w3,w4]
train,test=loadMinist()
x_train,y_train=train
x_test,y_test=test
X = x_train
Y = y_train
for i in range(0,5000):
cash,y_p = forward(X=X, Paramters=paramters)
loss = entrop_loss(y_p, Y)
if i % 5 == 1:
_,y_pre = forward(x_test / 255,paramters)
print("epoch %i , loss:%f accuracy :%f"%(i,loss,accuracy(y_pre,y_test)))
delta = y_p - Y
gradient_dict = full_backprop(delta,cash[-1])
paramters[3] -= gradient_clip(gradient_dict['dw'] * 0.01,-10,10)
delta = gradient_dict['delta_pre']
gradient_dict = full_backprop(delta,cash[-2])
paramters[2] -= gradient_clip(gradient_dict['dw'] * 0.01, -10, 10)
delta = gradient_dict['delta_pre']
delta = pool_backprop(delta,cash[-3])
gradient_dict = conv_backprop(delta,cash[-4])
paramters[1] -= gradient_clip((gradient_dict['dw'] / X.shape[0]) * 0.01, -10, 10)
delta = gradient_dict['delta_pre']
delta = pool_backprop(delta,cash[-5],flattened=False)
gradient_dict = conv_backprop(delta,cash[-6])
paramters[0] -= gradient_clip((gradient_dict['dw'] / X.shape[0]) * 0.01, -10, 10)
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)