从零搭建深度学习框架(一)用NumPy实现GAN

我在2015年用C++开发过一个CNN工具(HanyNet),功能上只是能够满足当时做的一些项目,作为一个独立的工具包还是比较简陋。所以在TensorFlow发布之后我就完全抛弃了自己写的工具。不久之后PyTorch面世,我又转投PyTorch阵营,而且一直用到现在。如今过去将近5年了,还是没有忘记开发一个完整的深度学习工具的想法。所以在博客上开始《从零搭建深度学习框架》系列文章,一边用业余时间开发,一边记录开发过程当中的想法与成果。这个系列可能会持续2-3年的时间,运气好的话,到时候可以开发出一个可以用的深度学习框架。当然也是在深度学习方法的主体框架和技术在未来3年不会有太大变化的前提下。

我会先总结一下5年前老版本的问题,然后用NumPy实现一个简单的GAN模型,最后结合目前的需求大致设想一下新版框架需要开发的关键组件。

老版HanyNet的问题

  1. 基础数据依赖OpenCV的Mat。目前大部分深度学习模型都认为中间数据保存在4维或5维张量上,所以主流的深度学习框架都会围绕Tensor提供一系列的操作,比如开辟一段内存用来保存Tensor数据,对Tensor的数值计算,在维度上和尺寸上的调整,在Tensor内部对其子集的查询和赋值等。
  2. 没有计算图。当时在写那个工具时,人工神经网络还是线型的,不需要考虑带分支的结构。虽然当时考虑过把浅层的输出和深层的输出合并起来计算最终的结果,但ResNet发表之后少了很多动力做这方面的开发。现在主流的深度学习框架都是用计算图来构建整个网络,图的节点代表数据或某种计算操作,节点之间的边代表数据传输的方向。这就需要实现一个有向无循环图(DAG)以及相应的广度/深度优先搜索算法。当然,计算节点的正向及反向运算也是需要实现的,这就包括了卷积、池化、全连通、激活函数、加减乘除等所有常见的计算函数。
  3. 计算效率低。其一在于不支持GPU加速计算,基于CUDA的GPU加速计算大致有两种方案,要么直接调用cuDNN提供的函数,要么所有的计算全部自己用CUDA C实现。其二在于CPU上的训练和预测也不是多线程的。

用NumPy实现GAN

下面,在不依赖任何深度学习框架的前提下,我用NumPy实现一个能产生正弦信号的GAN模型。这段代码是从我去年出版的书《Hands-On Generative Adversarial Networks with PyTorch 1.x》第一章里的代码改写而来。

真实信号和激活函数

1. 导入NumPy库,并定义几个全局变量。

import matplotlib.pyplot as plt
import numpy as np


Z_DIM = 1
G_HIDDEN = 10
X_DIM = 10
D_HIDDEN = 10

ITER_NUM = 50000

Z_DIM表示隐变量的维度,G_HIDDEN表示生成网络隐层的通道数,X_DIM表示生成数据的维度,D_HIDDEN表示判别网络隐层的通道数,ITER_NUM表示模型训练的迭代次数。这里,我们让生成网络把一个1维的随机变量映射为一个长度为10的向量,向量里每个元素的值按照正弦函数递增或递减。

2. 产生真实信号

def get_samples(signal_dim, random=True):
    if random:
        x0 = np.random.uniform(0, 0.5)
        freq = np.random.uniform(1.2, 1.3)
        mult = np.random.uniform(0.6, 0.8)
    else:
        x0 = 0
        freq = 0.2
        mult = 1
    signal = [mult * np.sin(x0+freq*i) for i in range(signal_dim)]
    return np.array(signal)

这里产生一组长度为signal_dim的正弦信号。正弦信号的起始点、频率和振幅允许出现扰动,以使得每次产生的信号都不同,相当于模拟真实数据的多样性。

3. 几种激活函数

def ReLU(x):
    return np.maximum(x, 0.)

def dReLU(x):
    return ReLU(x)

def LeakyReLU(x, k=0.2):
    return np.where(x >= 0, x, x * k)

def dLeakyReLU(x, k=0.2):
    return np.where(x >= 0, 1., k)

def Tanh(x):
    return np.tanh(x)

def dTanh(x):
    return 1. - Tanh(x)**2

def Sigmoid(x):
    return 1. / (1. + np.exp(-x))

def dSigmoid(x):
    return Sigmoid(x) * (1. - Sigmoid(x))

这里给出ReLU,Leaky ReLU,Tanh,Sigmoid几种激活函数以及相应的导函数,便于模型在前向和反向计算时调用。

定义全连通层

我们这里的生成网络和判别网络都用类似MLP的结构,完全由全连通层构成,也足以完成产生正弦信号这种简单的任务了。

1. 定义可训练的参数

class LinearLayer(object):
    def __init__(self, input_dim, output_dim, bias=True, act_func='relu'):
        self.input_dim = input_dim
        self.output_dim = output_dim
        self.bias = bias
        self.act_func = act_func
        assert self.act_func in ['relu', 'lrelu', 'tanh', 'sigmoid', 'none']
        self.x = None
        self.w = weight_init(self.input_dim, self.output_dim)
        if self.bias:
            self.b = weight_init(1, self.output_dim)
        self.y = None

一个全连通层只有两个参数:一个乘性的矩阵w(或直接称权重)和一个加性的向量b(或直接称偏置)。为了后面方便定义网络结构,我们把激活函数也写在全连通层当中,激活函数的梯度传播也在全连通层中处理。

由于在训练过程中,梯度需要从一个层传递到另一个层,入层和出层的梯度计算离不开该层在前向计算时的输出和输入,所以要用self.xself.y记录一下该层的原始输入和输出。

2. 前向计算

    def forward(self, x):
        self.x = x
        if self.bias:
            x = np.matmul(self.x, self.w) + self.b
        else:
            x = np.matmul(self.x, self.w)
        if self.act_func == 'relu':
            self.y = ReLU(x)
        elif self.act_func == 'lrelu':
            self.y = LeakyReLU(x)
        elif self.act_func == 'tanh':
            self.y = Tanh(x)
        elif self.act_func == 'sigmoid':
            self.y = Sigmoid(x)
        else:
            self.y = x
        return self.y

计算过程很简单,即把输入乘以矩阵w,加上向量b,然后套上一个激活函数。

3. 反向计算

    def backward(self, delta, gradient_clip=0.2, weight_clip=0.25, lr=0.01, apply_grads=True):
        if self.x is None or self.y is None:
            raise ValueError("You must run `forward()` before executing `backward()`")
        if self.act_func == 'relu':
            delta *= dReLU(self.y)
        elif self.act_func == 'lrelu':
            delta *= dLeakyReLU(self.y)
        elif self.act_func == 'tanh':
            delta *= dTanh(self.y)
        elif self.act_func == 'sigmoid':
            delta *= dSigmoid(self.y)
        else:
            pass
        d_w = np.matmul(np.transpose(self.x), delta)
        if apply_grads and self.bias:
            d_b = delta.copy()
            if np.linalg.norm(d_b) > gradient_clip:
                d_b = gradient_clip / np.linalg.norm(d_b) * d_b
            self.b -= lr * d_b
            self.b = np.maximum(-weight_clip, np.minimum(weight_clip, self.b))
        delta = np.matmul(delta, np.transpose(self.w))
        if apply_grads:
            if np.linalg.norm(d_w) > gradient_clip:
                d_w = gradient_clip / np.linalg.norm(d_w) * d_w
            self.w -= lr * d_w
            self.w = np.maximum(-weight_clip, np.minimum(weight_clip, self.w))
        return delta

反相计算是网络训练的关键步骤,稍微优点复杂。其本质上就是把函数y=g(f(x)),按照链式求导法则,计算输入的梯度(可以是误差函数求得的误差值,也可以是后一层传递过来的梯度)对函数g的偏导,乘以g对f的偏导,再乘以f对x的偏导,就得到了输入的delta对x的梯度值了。

代码中的过程就是,先计算激活函数的梯度,再计算两个待训练参数的梯度(d_wd_b),用这两个梯度(d_wd_b)按照SGD方法更新两组参数(wb),然后把backward()函数输入的原始梯度对该层原始输入的梯度求出来,并返回给前一层,这样前一层的backward()函数才能更新它自己的参数。

为了训练的稳定,我这里也加上了gradient clipping和weight clipping,避免训练过程中梯度或参数本身过激的抖动导致训练不稳定、难以收敛的问题。apply_grads的作用是(设置为False时)在不更新自身参数的前提下,把它读到的梯度传递回给前一层。这个功能对GAN的训练还是比较有用的。

构造生成网络

网络中最麻烦的地方已经写在全连通层当中了,所以网络本身的代码就清爽多了。

class Generator(object):
    def __init__(self, latent_dim, hidden_dim, signal_dim):
        self.latent_dim = latent_dim
        self.hidden_dim = hidden_dim
        self.signal_dim = signal_dim
        self.layer1 = LinearLayer(self.latent_dim, self.hidden_dim, bias=True, act_func='relu')
        self.layer2 = LinearLayer(self.hidden_dim, self.hidden_dim, bias=True, act_func='relu')
        self.layer3 = LinearLayer(self.hidden_dim, self.signal_dim, bias=True, act_func='tanh')

    def forward(self, inputs):
        x = inputs.reshape(1, self.latent_dim)
        x = self.layer1.forward(x)
        x = self.layer2.forward(x)
        x = self.layer3.forward(x)
        return x

    def backward(self, delta, apply_grads=True):
        delta = self.layer3.backward(delta, gradient_clip=0.2, weight_clip=0.25, lr=0.01, apply_grads=apply_grads)
        delta = self.layer2.backward(delta, gradient_clip=0.2, weight_clip=0.25, lr=0.01, apply_grads=apply_grads)
        delta = self.layer1.backward(delta, gradient_clip=0.2, weight_clip=0.25, lr=0.01, apply_grads=apply_grads)
        return delta

生成网络隐层的激活函数采用ReLU,输出层的激活函数采用Tanh。

构造判别网络

类似地,判别网络的代码也很简便。

class Discriminator(object):
    def __init__(self, signal_dim, hidden_dim, num_class):
        self.signal_dim = signal_dim
        self.hidden_dim = hidden_dim
        self.num_class = num_class
        self.layer1 = LinearLayer(self.signal_dim, self.hidden_dim, bias=True, act_func='lrelu')
        self.layer2 = LinearLayer(self.hidden_dim, self.hidden_dim, bias=True, act_func='lrelu')
        self.layer3 = LinearLayer(self.hidden_dim, self.num_class, bias=True, act_func='sigmoid')

    def forward(self, inputs):
        x = inputs.reshape(1, self.signal_dim)
        x = self.layer1.forward(x)
        x = self.layer2.forward(x)
        x = self.layer3.forward(x)
        return x

    def backward(self, delta, apply_grads=True):
        delta = self.layer3.backward(delta, gradient_clip=0.2, weight_clip=0.25, lr=0.01, apply_grads=apply_grads)
        delta = self.layer2.backward(delta, gradient_clip=0.2, weight_clip=0.25, lr=0.01, apply_grads=apply_grads)
        delta = self.layer1.backward(delta, gradient_clip=0.2, weight_clip=0.25, lr=0.01, apply_grads=apply_grads)
        return delta

判别网络隐层的激活函数采用Leaky ReLU,输出层的激活函数采用Sigmoid。

参数初始化和误差函数

1. 参数初始化

def weight_init(in_channels, out_channels, method='xavier'):
    if method == 'xavier':
        scale = np.sqrt(2. / (in_channels + out_channels))
    elif method == 'he':
        scale = np.sqrt(1. / in_channels)
    elif method == 'constant':
        scale = 0.05
    else:
        raise ValueError("Unsupported `method` arg: {}".format(method))
    return np.random.uniform(-scale, scale, (in_channels, out_channels))

有3种初始化方式:Xavier,He和常量。默认采用Xavier初始化。

2. 定义误差函数

class LossFunc(object):
    def __init__(self):
        self.logit = None
        self.label = None

    def forward(self, logit, label):
        if logit[0, 0] < 1e-7:
            logit[0, 0] = 1e-7
        if 1. - logit[0, 0] < 1e-7:
            logit[0, 0] = 1. - 1e-7
        self.logit = logit
        self.label = label
        return - (label * np.log(logit) + (1-label) * np.log(1-logit))

    def backward(self):
        return (1-self.label) / (1-self.logit) - self.label / self.logit

为了防止np.log出现数值计算的错误,对十分接近于0和1的输入值做一下处理。

模型训练

1. 实例化两个网络和误差函数

# Create model
G = Generator(latent_dim=Z_DIM, hidden_dim=G_HIDDEN, signal_dim=X_DIM)
D = Discriminator(signal_dim=X_DIM, hidden_dim=D_HIDDEN, num_class=1)
criterion = LossFunc()

2. 迭代训练

# Model training
for itr in range(ITER_NUM):
    # Update D with real data
    x_real = get_samples(signal_dim=10, random=True)
    y_real = D.forward(x_real)
    loss_D_r = criterion.forward(y_real, real_label)
    d_loss_D = criterion.backward()
    D.backward(d_loss_D)

    # Update D with fake data
    z_noise = np.random.randn(Z_DIM)
    x_fake = G.forward(z_noise)
    y_fake = D.forward(x_fake)
    loss_D_f = criterion.forward(y_fake, fake_label)
    d_loss_D = criterion.backward()
    D.backward(d_loss_D)

    # Update G with fake data
    y_fake_r = D.forward(x_fake)
    loss_G = criterion.forward(y_fake_r, real_label)
    d_loss_G = criterion.backward()
    d_loss_G = D.backward(d_loss_G, apply_grads=False)
    G.backward(d_loss_G)
    loss_D = loss_D_r + loss_D_f
    if itr % 500 == 0:
        print('Itr {} D_r loss {:.4f} D_f loss {:.4f} G loss {:.4f}'.format(itr, loss_D_r.item((0, 0)), loss_D_f.item((0, 0)), loss_G.item((0, 0))))

每次迭代分为3步:

(1) 用真实样本计算判别网络D的误差,并更新D的参数。作用是让D能够识别真样本。

(2) 用生成网络G产生的假样本计算判别网络D的误差,并更新D的参数。作用是让D能够识别假样本。第1步和第2步合起来即完成了对D的训练,目的是使D能够对真样本给出预测值1,并对G产生的假样本给出预测值0。

(3) 用同样的假样本再次计算判别网络D的误差,但这个误差是相对于真标签的误差,目的是让D对G产生的假样本给出预测值1。此时只训练G,不训练D。第2步和第3步合起来就是对抗训练的精髓之处。虽然同样是假样本,在第2步以标签0训练D,和在第3步以标签1训练G,两个网络的训练是分开的,可以避免同时以相反的目标训练使得网络的训练变成原地踏步。

结果可视化

直接给出代码和可视化图像

# Results visualization
x_axis = np.linspace(0, 10, 10)
plt.subplot(211)
for i in range(50):
    z_noise = np.random.randn(Z_DIM)
    x_fake = G.forward(z_noise)
    y_fake = D.forward(x_fake)
    loss_D = criterion.forward(y_fake, real_label)
    plt.plot(x_axis, x_fake.reshape(X_DIM))
plt.ylim((-1, 1))
plt.gca().set_title('fake')
plt.subplot(212)
for i in range(50):
    x_real = get_samples(signal_dim=10, random=True)
    plt.plot(x_axis, x_real)
plt.ylim((-1, 1))
plt.gca().set_title('real')
plt.show()

模型比较小,训练起来也很快。在i7-9700k上大概耗时23秒(我在跑这段代码时也在跑其他任务,在低负载的CPU上应该会更快)。

可以看出生成的假信号(上)与真实正弦信号(下)还是很相似的。但这个GAN模型还是有局限性,比如把训练数据的频率、振幅等抖动范围扩大一些,模型就很难产生丰富的样本了,比如这样(产生了mode collapse现象):


框架设计

上面的代码能够在一定程度上反映出一个深度学习框架应该具有的功能。我们一一来看:

  1. 底层数据结构。这里我们直接利用了NumPy的ndarray作为数据容器。新框架也需要实现一个底层的张量库,包括CPU端的内存分配,CUDA端的类型转换,类似NumPy的viewbroadcast,以及针对张量的常用数值计算函数。我之前尝试过在xtensor的基础之上用模板类的方式写一个header-only的张量库,但写着写着就发现有些张量的计算操作完全依靠xtensor自带的API实现起来很繁琐,去改xtensor的底层代码倒不如按照自己的喜好重新造个轮子吧。
  2. 数据预处理。这里我们只是利用数学函数直接产生训练数据,而且也不是mini-batch的形式。新框架当然需要专门处理数据I/O的接口,类似于PyTorch中的DatasetDataLoader,包含图像数据的解码/编码,文本数据的词典,数据标准化/增强的常见操作,以及产生mini-batch。这个在平时用PyTorch做一些实验的时候写过一些工具,逻辑上基本可以直接照搬过来。图像和视频的解码/编码倒是可以直接调用OpenCV这样的第三方库,可以节约大量的时间。
  3. 计算图。这里我们在网络里体现各层之间的关系时,只是把上一行代码的结果作为下一行代码的输入。新框架确实需要有计算图才能构造结构更复杂的模型。实现一个DAG难度并不大,而且之前也写过一个小型的DAG库。至于如何把数据节点、计算节点、自动微分与DAG揉在一起,需要好好斟酌一下。我计划在系列的下一篇文章用Python先写一个简单的自动微分工具,验证一下思路。
  4. 计算节点和优化器。这里自动求导的过程完全依赖层内单独地处理,而且激活函数和梯度下降也都写死在了层里。实际上计算函数和优化算法应该分开来,所有的张量在计算图内用数据节点表示,对张量进行的任何操作都以计算节点表示,这样依靠节点之间的连接就可以完成前向计算。而反向计算就需要引入优化器。梯度在DAG内逆向流动,遇到计算节点就调用该节点内部定义的导函数求导,遇到数据节点(而且该数据是可训练的)就调用优化器更新该节点的值。预计未来在底层张量库和计算图搭建好以后这一部分是工作量最大的,因为有数不清的函数需要实现,这些函数和优化方法的测试肯定要花费不少精力。
  5. CPU和GPU加速。预计计划先在CPU端实现不怎么优化的计算函数作为基准,然后直接调用cuDNN的现成接口实现GPU版本,再研究一下CPU端的多线程优化(最繁琐也是最关键的当然属卷积计算的优化了,需要针对不同的输入数据采用不同的优化策略),最后再琢磨一下计算函数的CUDA C实现,如果框架本身写得够轻的话,性能上应该是有希望的。
  6. 可视化等琐碎的功能。包括便于调试的工具,可视化工具,对计算图的优化(比如可以考虑自动生成静态图以提高运行效率)等。
  7. 接口语言。现在最流行的几个深度学习框架都是CUDA执行数值计算->C++实现具体操作->Python提供用户接口这样的逻辑。我对C++和Python都比较喜欢,但总是感觉这样的安排有点类似于在越野大吉普上安装航空发动机。虽然可以保证会开车的人都能体验原地起飞的快感,但大吉普的外壳在高速下已经成为拖累。这个时候只有重新设计气动外形才能发挥出航发的优势,但是这样可能只有会开飞机的人才能开得动这样的“车”。大公司开发的开源框架必须要考虑用户群体(很大一部分人是不屑于花时间学新语言的研究者),但我这个项目单纯属于自己玩自己的,所以只计划开发一个C++的框架,完全不管Python的接口(前期也许会用Python做一些思路验证的工作)。这样也能节约不少时间。

这个项目肯定需要花费大量的时间和精力,也很有可能会被我半途而废。把进度贴在博客里也起到一个自我鞭策的作用。

买五花肉和佐料的钱已经准备好了,就看最后出锅的是香喷喷的红烧肉,还是把钱弄丢了盯着锅发呆,还是锅糊了引来消防员灭火,就拭目以待吧。

把这篇文章分享给你的朋友:
Subscribe
订阅评论
guest
1 评论
最新
最旧 得票最多
Inline Feedbacks
View all comments
trackback
5 月 之前

[…] 我们在上一篇文章《从零搭建深度学习框架(一)用NumPy实现GAN》中用Python+NumPy实现了一个简单的GAN模型,并大致设想了一下深度学习框架需要实现的主要功能。其中,不确定性最大的要属于计算图的实现。所以在这篇文章中,我们用Python实现一个简单的计算图,并用它对一个线性模型进行自动微分,作为后续C++开发的思路验证。 […]