前言

好久都没更新联邦学习相关内容了,这也是我更新这篇我认为非常硬核的文章的原因,这也算是实现了我在学习联邦学习半年以来的一个目标,基于混合加密机制实现联邦学习任务,这次任务使用的框架是FedAvg,在github上非常热门的联邦学习模拟实现方案,FedAvg的代码还是非常好理解的,本文的结构将主要分为三个部分,第一部分是对FedAvg代码的讲解和修改,第二部分将差分隐私机制加入到FedAvg中,包括高斯机制和拉普拉斯机制,第三部分将同态加密算法Paillier加入到第二部分中,实现基于同态加密和差分隐私混合加密机制的FedAvg,话不多说,就让我们开始吧!


首先把项目的代码链接附上(请为我点亮一颗Star!)
平台链接
Githubhttps://github.com/heroding77/fedavg_encrypt
Giteehttps://gitee.com/heroding77/fedavg_encrypt

1. FedAvg

联邦学习有两种更新方式,对服务器端,一种是共享梯度方式,一种是共享模型参数。共享模型参数是做了几轮梯度下降,针对共享梯度,它的一大优势是通信代价会低;同时,对整个梯度信息的保护也会更好,因此本次联邦学习实战的内容也是基于共享参数形式实现的。本次实战使用的FedAvg是github上非常热门的联邦学习实现方案,它模拟了多个参与方进行联邦学习的场景,并且支持Non-IID和独立同分布数据集,代码逻辑十分清晰,对于联邦学习初学者入门再适合不过了。
在这里插入图片描述
FedAvg实现的是联邦学习场景下的手写数字识别模型的训练,它的结构如下图所示,包括数据集,模型,参与方和服务器端代码,以及数据集导入的代码,除了TensorFlow版本,它还包括PyTorch版本,更便于支持gpu运行代码,本文介绍的代码部分也是基于PyTorch实现的。下面就对各个部分进行简单的介绍。

1.1 getData.py

getData.py文件是对MNIST数据集的数据集提取和预处理,获得训练数据集和测试数据集,MNIST数据集的数据格式为-ubyte.gz,因此需要编写函数对图片数据进行提取,如下所示:

def extract_images(filename):
    """Extract the images into a 4D uint8 numpy array [index, y, x, depth]."""
    print('Extracting', filename)
    with gzip.open(filename) as bytestream:
        magic = _read32(bytestream)
        if magic != 2051:
            raise ValueError(
                    'Invalid magic number %d in MNIST image file: %s' %
                    (magic, filename))
        num_images = _read32(bytestream)
        rows = _read32(bytestream)
        cols = _read32(bytestream)
        buf = bytestream.read(rows * cols * num_images)
        data = np.frombuffer(buf, dtype=np.uint8)
        data = data.reshape(num_images, rows, cols, 1)
        return data

本质上是将比特流数据转换为[index, y, x, depth]维度的数组形式,index为下标,y和x分别表示每张图像行列像素点数目,depth是图像的通道,对于MNIST黑白图片,通道为1。接着编写图片标签提取函数,将标签提取出并转为one-hot形式,便于模型训练时计算损失函数并反向传播,代码如下:

# 标签one-hot编码
def dense_to_one_hot(labels_dense, num_classes=10):
    """Convert class labels from scalars to one-hot vectors."""
    num_labels = labels_dense.shape[0]
    index_offset = np.arange(num_labels) * num_classes
    labels_one_hot = np.zeros((num_labels, num_classes))
    labels_one_hot.flat[index_offset + labels_dense.ravel()] = 1
    return labels_one_hot

# 提取标签
def extract_labels(filename):
    """Extract the labels into a 1D uint8 numpy array [index]."""
    print('Extracting', filename)
    with gzip.open(filename) as bytestream:
        magic = _read32(bytestream)
        if magic != 2049:
            raise ValueError(
                    'Invalid magic number %d in MNIST label file: %s' %
                    (magic, filename))
        num_items = _read32(bytestream)
        buf = bytestream.read(num_items)
        labels = np.frombuffer(buf, dtype=np.uint8)
        return dense_to_one_hot(labels)

图片和标签信息提取完成后,多次assert以验证提取图片过程没有出现问题。接着将图片展平,即每张图片28 * 28被展平为784 * 1,这里的目的是用于全连接神经网络的使用,如果只是用卷积神经网络,大可不必将其展平(在模型部分还要复原回去)。紧接着就是对数据标准化处理,如果不进行这一步,模型训练的结果可能惨不忍睹(比如预测所有图像都是7),这是由于发生了梯度爆炸现象。标准化处理后是两个数据处理方式:IID和Non-IID,熟悉联邦学习的读者应该了解这个概念,这也是联邦学习面临的一个巨大问题——数据非独立同分布。
在传统应用场景中,数据存储在中心,ML model 可以获取所有数据的整体信息,但是在联邦学习中,由于数据仅存储在本地,导致数据之间分布的不一致性,例如,美国西海岸针叶林茂盛,东海岸阔叶林分布广泛,又因为用户之间喜好和生活习惯不同,因此产生了数据分布的不一致性; 另一方面,以家庭或亲属关系的群体之间会相互影响,产生了数据分布的不独立性,以FedAvg中MNIST为例,Non-IID的数据应该是这样表现的:参与方1只有手写数字1,参与方2只有手写数字2,以此类推。在Non-IID数据场景下,模型训练会异常缓慢,效率很低,这将在之后的实验所有介绍。
FedAvg提供了IID和Non-IID两种数据处理形式,帮助用户理解实际应用场景下联邦学习的过程,Non-IID数据就是不对数据进行操作,IID数据就是将所有数据打乱,这样各个参与方数据的分布就满足独立同分布了,代码如下:

class GetDataSet(object):
    def __init__(self, dataSetName, isIID):
        self.name = dataSetName
        self.train_data = None
        self.train_label = None
        self.train_data_size = None
        self.test_data = None
        self.test_label = None
        self.test_data_size = None

        self._index_in_train_epoch = 0

        if self.name == 'mnist':
            self.mnistDataSetConstruct(isIID)
        else:
            pass


    def mnistDataSetConstruct(self, isIID):
        data_dir = r'./data/MNIST'
        # 选定图片路径
        train_images_path = os.path.join(data_dir, 'train-images-idx3-ubyte.gz')
        train_labels_path = os.path.join(data_dir, 'train-labels-idx1-ubyte.gz')
        test_images_path = os.path.join(data_dir, 't10k-images-idx3-ubyte.gz')
        test_labels_path = os.path.join(data_dir, 't10k-labels-idx1-ubyte.gz')
        # 从.gz中提取图片
        train_images = extract_images(train_images_path)
        train_labels = extract_labels(train_labels_path)
        test_images = extract_images(test_images_path)
        test_labels = extract_labels(test_labels_path)

        assert train_images.shape[0] == train_labels.shape[0]
        assert test_images.shape[0] == test_labels.shape[0]

        self.train_data_size = train_images.shape[0]
        self.test_data_size = test_images.shape[0]

        # mnist黑白图片通道为1
        assert train_images.shape[3] == 1
        assert test_images.shape[3] == 1
        # 图片展平
        train_images = train_images.reshape(train_images.shape[0], train_images.shape[1] * train_images.shape[2])
        test_images = test_images.reshape(test_images.shape[0], test_images.shape[1] * test_images.shape[2])
        
        # 标准化处理
        train_images = train_images.astype(np.float32)
        train_images = np.multiply(train_images, 1.0 / 255.0)
        test_images = test_images.astype(np.float32)
        test_images = np.multiply(test_images, 1.0 / 255.0)

        # 是否独立同分布
        if isIID:
            # 打乱顺序
            order = np.arange(self.train_data_size)
            np.random.shuffle(order)
            self.train_data = train_images[order]
            self.train_label = train_labels[order]
        else:
            # 按照0——9顺序排列
            labels = np.argmax(train_labels, axis=1)
            order = np.argsort(labels)
            self.train_data = train_images[order]
            self.train_label = train_labels[order]

        self.test_data = test_images
        self.test_label = test_labels

1.2 Models.py

Models.py文件中,定义了执行任务所使用的模型,这里提供了两种模型,简单全连接层模型和简单卷积神经网络模型,每个模型都继承了torch的nn.Module,表明是神经网络模型,每个模型类中都定义了__init__(self)和forward()两个函数,初始化函数中定义了每个模型的组件,比如简单全连接层模型中定义了三个卷积模块,两个池化模块,两个线性模块,并在forward()函数中组装起来。代码如下:

import torch
import torch.nn as nn
import torch.nn.functional as F

'''
简单全连接模型
'''
class Mnist_2NN(nn.Module):
    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(784, 200)
        self.fc2 = nn.Linear(200, 200)
        self.fc3 = nn.Linear(200, 10)

    def forward(self, inputs):
        tensor = F.relu(self.fc1(inputs))
        tensor = F.relu(self.fc2(tensor))
        tensor = self.fc3(tensor)
        return tensor

'''
简单卷积神经网络模型
'''
class Mnist_CNN(nn.Module):
    def __init__(self):
        super().__init__()
        # 定义每一层模型
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=32, kernel_size=3, stride=1, padding=0)
        self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2, padding=0)
        self.conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, stride=1, padding=0)
        self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2, padding=0)
        self.conv3 = nn.Conv2d(in_channels=64, out_channels=64, kernel_size=3, stride=1, padding=0)
        self.fc1 = nn.Linear(3*3*64, 64)
        self.fc2 = nn.Linear(64, 10)

    def forward(self, inputs):
        # 构造模型
        tensor = inputs.view(-1, 1, 28, 28)
        tensor = F.relu(self.conv1(tensor))
        tensor = self.pool1(tensor)
        tensor = F.relu(self.conv2(tensor))
        tensor = self.pool2(tensor)
        tensor = F.relu(self.conv3(tensor))     
        tensor = tensor.view(-1, 3*3*64)
        tensor = F.relu(self.fc1(tensor))
        tensor = self.fc2(tensor)
        return tensor


至于每一个模块中具体的参数信息,包括channel的大小,卷积核的大小,步长,padding,神经元个数的设计,就不具体细说了,请各位自行阅读,并修改成自己想用的模型。

1.3 client.py

参与方的代码都存放在client.py文件中,里面包括了两个类的实现,一个是单个参与方类client,另一个是ClientsGroup,即参与方的集合类,首先分析一下client类,它定义了一个初始化函数,一个本地更新函数,一个本地评估函数(由于评估部分在server.py实现,这里就直接pass了,可以自定义修改)。
初始化函数输入参数有trainDataSet和dev,前者是输入到该参与方的数据集,dev是计算环境(cpu or gpu)。初始化函数中定义了训练数据,计算环境,本地参数等。
本地更新函数输入了本地迭代次数,本地BatchSize,神经网络模型,损失函数,优化器,全局参数。在本地更新函数中,首先向模型中导入全局参数,根据localBatchSize转换本地数据,接着执行localEpoch次本地更新。本地更新的过程就是普通的正向传播,梯度下降,反向传播的过程,这里就不赘述了。

import numpy as np
import torch
from torch.utils.data import TensorDataset
from torch.utils.data import DataLoader
from getData import GetDataSet


class client(object):
    def __init__(self, trainDataSet, dev):
        self.train_ds = trainDataSet
        self.dev = dev
        self.train_dl = None
        self.local_parameters = None

    def localUpdate(self, localEpoch, localBatchSize, Net, lossFun, opti, global_parameters):
        Net.load_state_dict(global_parameters, strict=True)
        self.train_dl = DataLoader(self.train_ds, batch_size=localBatchSize, shuffle=True)
        for epoch in range(localEpoch):
            for data, label in self.train_dl:
                data, label = data.to(self.dev), label.to(self.dev)
                preds = Net(data)
                loss = lossFun(preds, label)
                loss.backward()
                opti.step()
                opti.zero_grad()

        return Net.state_dict()

    def local_val(self):
        pass

1.4 server.py

server.py是FedAvg实现的核心部分,项目的执行也是直接运行该文件,原github运行的代码为:

python server.py -nc 100 -cf 0.1 -E 5 -B 10 -mn mnist_cnn  -ncomm 1000 -iid 0 -lr 0.01 -vf 20 -g 0

可见参数繁多,执行起来过于繁琐,因此在介绍前,我对该部分进行了简单的修改,原代码为:

parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter, description="FedAvg")
parser.add_argument('-g', '--gpu', type=str, default='0', help='gpu id to use(e.g. 0,1,2,3)')
parser.add_argument('-nc', '--num_of_clients', type=int, default=100, help='numer of the clients')
parser.add_argument('-cf', '--cfraction', type=float, default=0.1, help='C fraction, 0 means 1 client, 1 means total clients')
parser.add_argument('-E', '--local_epoch', type=int, default=5, help='local train epoch')
parser.add_argument('-B', '--batch_size', type=int, default=10, help='local train batch size')
parser.add_argument('-mn', '--model_name', type=str, default='mnist_2nn', help='the model to train')
parser.add_argument('-lr', "--learning_rate", type=float, default=0.01, help="learning rate, \
                    use value from origin paper as default")
parser.add_argument('-vf', "--val_freq", type=int, default=5, help="model validation frequency(of communications)")
parser.add_argument('-sf', '--save_freq', type=int, default=20, help='global model save frequency(of communication)')
parser.add_argument('-ncomm', '--num_comm', type=int, default=1000, help='number of communications')
parser.add_argument('-sp', '--save_path', type=str, default='./checkpoints', help='the saving path of checkpoints')
parser.add_argument('-iid', '--IID', type=int, default=0, help='the way to allocate data to clients')

我将该部分代码改为了conf.json文件,这样之后所有的超参数调整都在该文件中进行,执行起来只需要链接到该文件即可,conf.json内容如下:

{
	// 是否使用gpu
	"gpu" : "0",
	// 参与方个数
	"num_of_clients" : 10,
	// 参与方百分比
	"cfraction" : 1,
	// 本地迭代训练次数
	"local_epoch" : 3,
	// mini-batch
	"batch_size" : 100,
	// 模型名称
	"model_name" : "mnist_cnn",
	// 学习率
	"learning_rate" : 0.005,
	// 评估频率 次/epoch
	"val_freq" : 1,
	// 模型参数存储频率 次/epoch
	"save_freq" : 1000,
	// 全局迭代次数
	"num_comm" : 100,
	// 模型参数存储路径
	"save_path" : "./checkpoints",
	// 是否独立同分布
	"IID" : false,
  // 差分隐私噪声幅度
	"sigma" : 0.1,
  // 是否添加差分隐私
	"noise" : 1
	
}

执行代码为:

python server.py -c ./utils/conf.json

当然也要在main函数中添加解析json文件的代码,代码如下:

# 定义解析器
parser = argparse.ArgumentParser(description='FedAvg')
parser.add_argument('-c', '--conf', dest='conf')
arg = parser.parse_args()

# 解析器解析json文件
with open(arg.conf, 'r') as f:
    args = json.load(f)

读取到json文件数据后,首先将这些超参数取出,定义相关的模型实例,参与方实例,计算环境,损失函数和优化器,代码如下:

# 创建中间参数保存目录
test_mkdir(args['save_path'])

# 使用gpu or cpu
os.environ['CUDA_VISIBLE_DEVICES'] = args['gpu']
dev = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# 定义使用模型(全连接 or 简单卷积)
net = None
if args['model_name'] == 'mnist_2nn':
    net = Mnist_2NN()
elif args['model_name'] == 'mnist_cnn':
    net = Mnist_CNN()

# 如果gpu设备不止一个,并行计算
if torch.cuda.device_count() > 1:
    print("Let's use", torch.cuda.device_count(), "GPUs!")
    net = torch.nn.DataParallel(net)
net = net.to(dev)

# 定义损失函数和优化器
loss_func = F.cross_entropy
opti = optim.Adam(net.parameters(), lr=args['learning_rate'])

# 定义多个参与方,导入训练、测试数据集
myClients = ClientsGroup('mnist', args['IID'], args['num_of_clients'], dev)
testDataLoader = myClients.test_data_loader
trainDataLoader = myClients.train_data_loader

# 每轮迭代的参与方个数
num_in_comm = int(max(args['num_of_clients'] * args['cfraction'], 1))

# 初始化全局参数
global_parameters = {}
for key, var in net.state_dict().items():
    global_parameters[key] = var.clone()

# 定义噪声的类型和幅度
dp = args['noise']
sigma = args['sigma']

紧接着开始全局迭代,首先打乱参与方的顺序,选取超参数中确定的百分比个数的参与方,接着初始化全局参数,并分别对选中的参与方进行梯度下降,将梯度下降返回的本地参数传回,累加到全局参数中,梯度下降结束后,更新全局参数(即梯度平均),根据设定的val_freq和save_freq进行模型评估和保存参数的操作。

# 全局迭代轮次
for i in range(args['num_comm']):
    print("communicate round {}".format(i+1))
	# 打乱排序,确定num_in_comm个参与方
    order = np.random.permutation(args['num_of_clients'])
    clients_in_comm = ['client{}'.format(i) for i in order[0:num_in_comm]]

    sum_parameters = None
    
    # 可视化进度条对选中参与方local_epoch
    for client in tqdm(clients_in_comm):
    	# 本地梯度下降
        local_parameters = myClients.clients_set[client].localUpdate(args['epoch'], args['batchsize'], net,
                                                                     loss_func, opti, global_parameters)
        # 初始化sum_parameters
        if sum_parameters is None:
            sum_parameters = {}
            for key, var in local_parameters.items():
                sum_parameters[key] = var.clone()
        else:
            for var in sum_parameters:
                sum_parameters[var] = sum_parameters[var] + local_parameters[var]
	
	# 更新全局梯度参数
    for var in global_parameters:
        global_parameters[var] = (sum_parameters[var] / num_in_comm)

	# 不进行计算图构建(无需反向传播)
    with torch.no_grad():
    	# 满足评估的条件,用测试集进行数据评估
        if (i + 1) % args['val_freq'] == 0:
        	# strict表示key、val严格重合才能执行(false不对齐部分默认初始化)
            net.load_state_dict(global_parameters, strict=True)
            sum_accu = 0
            num = 0
            for data, label in testDataLoader:
                data, label = data.to(dev), label.to(dev)
                preds = net(data)
                preds = torch.argmax(preds, dim=1)
                sum_accu += (preds == label).float().mean()
                num += 1
            print('accuracy: {}'.format(sum_accu / num))

	# 根据格式和给定轮次保存参数信息
    if (i + 1) % args['save_freq'] == 0:
        torch.save(net, os.path.join(args['save_path'],
                                     '{}_num_comm{}_E{}_B{}_lr{}_num_clients{}_cf{}'.format(args['model_name'],
                                                                                            i, args['epoch'],
                                                                                            args['batchsize'],
                                                                                            args['learning_rate'],
                                                                                            args['num_of_clients'],
                                                                                            args['cfraction'])))

为了能够实时检测训练集上模型的accuracy,可以再加一个训练集模型评估,即:

   # 遍历每个测试数据
   for data, label in testDataLoader:
       # 转成gpu数据
       data, label = data.to(dev), label.to(dev)
       # 预测(返回结果是概率向量)
       preds = net(data)
       # 取最大概率label
       preds = torch.argmax(preds, dim=1)               
       sum_accu += (preds == label).float().mean()
       num += 1
   print('val_accuracy: {}'.format(sum_accu / num))

   # 遍历每个训练数据
   for data, label in trainDataLoader:
       # 转成gpu数据
       data, label = data.to(dev), label.to(dev)
       # 预测(返回结果是概率向量)
       preds = net(data)
       # 取最大概率label
       preds = torch.argmax(preds, dim=1)              
       sum_accu += (preds == label).float().mean()
       num += 1
   print('train_accuracy: {}'.format(sum_accu / num))

这也便于观察模型是否出现了过拟合的情况。
以上是对整个FedAvg代码部分的完整介绍,接下来对其性能进行简单评估,测试其在Non-IID和IID数据集上的性能表现。

1.5 性能评估

1.5.1 Non-IID和IID

首先对比一下在独立同分布和非独立同分布FedAvg下,简单卷积神经网络模型的性能。这里我提前确定了的超参数,其中学习率为0.005,local-epoch为3,global-epoch为20,不设定噪声(noise=0),是否IID在conf.json中设定IID=1 or 0,首先是是在参与方为2,并且参与率为100%的情况下,执行联邦学习,结果如下:

在这里插入图片描述
可以清晰看出,经过20轮全局迭代后,Non-IID的模型性能与IID的模型性能有很大差异,差异主要体现在前8轮,IID下的模型经过两轮全局迭代后accuracy就达到了99%,而Non-IID经过6轮全局迭代accuracy才超过95%,这样的对比可能还是不够明显,因为十轮后二者的accuracy都在99%左右徘徊,那么我们把参与率设置为50%,模拟训练中参与方退出的场景,这样得到的结果如下:

在这里插入图片描述

结果是非常惊人的,经过二十轮迭代,Non-IID模型的accuracy仍然徘徊在50%~60%之间,而IID模型的accuracy几乎不受参与方的参与率影响,这也说明在实际应用场景下,模型的训练效果要比理想情况下差很多。
此外,根据上述两图,val-acc和train-acc的结果差异不大,说明经过20轮训练后模型没有出现过拟合的情况,因此在之后的实验中,只拿val-acc作为评估的标准。
参与方的参与率会对模型产生很大的影响,但是这是在参与方不多的情况(为2),如果参与方很多,那么结果会不会有不同呢?下面我将单独对IID环境和Non-IID环境下多个参与方的场景训练结果进行评估。

1.5.2 IID场景参与方的影响

本小节设置了4组对照实验,在IID场景下,设置参与方个数为2,10,50和100,参与方百分百参与到模型训练中,可以看到随着参与方的增多,前三轮的模型性能会有所影响,但是三轮之后,模型的accuracy都能达到95%以上,说明随着参与方数量的增加,满足一定轮数的global-epoch,模型的性能并没有明显损失。在这里插入图片描述

那么在IID场景下,如果参与方中间退出,随着参与方数量的增加,会有明显影响吗,为此,我又做了四组对照实验,设置参与方参与的百分比为50%,结果如下:
在这里插入图片描述

结果和参与方全部参与的结果几乎无异,都是前三轮的模型性能会有所影响,但是三轮之后,模型的accuracy都能达到95%以上,所以可以得出一个结论,在IID场景下,如果没有严格的global-epoch的限制(比如global-epoch不超过5)模型的性能几乎不受参与方个数的影响,并且模型能够允许在训练过程中参与方退出。

1.5.2 Non-IID场景参与方的影响

本小节同样设置了4组对照实验,在Non-IID场景下,设置参与方个数为2,10,50和100,并且参与方的参与率为100%,模型表现如下图所示:
在这里插入图片描述

和预想中的一样,即使是两个参与方的情况下,也需要六轮全局迭代才能达到稳定的accuracy,随着参与方的增加,模型性能波动幅度增大,达到稳定的accuracy也需要更大的迭代次数,这就是数据分布不均匀对模型训练产生的影响,当然我们也可以想象,如果参与方的参与率为50%,结果肯定比全部参与的场景有着更大的差距。这时候又涉及另外一个问题,随着参与方的增多,模型训练达到稳定的accuracy需要更大的迭代次数的规律还存在吗,或者说,参与方多的场景训练出的模型性能要比参与方少的模型性能低的规律还存在吗,结果如下图所示:
在这里插入图片描述
想必这个结果出乎了许多人的意料,结果竟然恰恰相反!随着参与方的增大,模型的性能可以在20轮后迅速达到稳定,并且准确率也达到了99%,相反参与方低的如2的模型,直到100轮后,模型仍然未能达到最优,还在继续训练,accuracy也不到90%,这究竟是为什么呢?
让我们回归到数据本身去分析,模型训练结果差,是因为每轮接触的模型种类少,MNIST有10种结果,在Non-IID场景,如果参与方为2,那么50%的参与率下,每轮只能接触5种结果,而随着参与方的增多,每轮接触的种类也同时增多,当每轮每种结果都能学习到的时候,那么模型的性能自然更好。

1.6 FedAvg总结

经过上面的介绍和实验,可以看出,FedAvg代码还是非常精简干练的,各个模块分工明确,出色的模拟了联邦学习场景下多个参与方参与训练,有助于初学者对联邦学习的理解与实践。实验部分是对联邦学习场景下的数据分布情况的评估,结果符合实验预期,也进一步强化了读者对联邦学习的理解。接下来就是本文的重中之重,将联邦学习隐私保护算法加入到联邦学习环境中,可以说,没有隐私保护的联邦学习不是真正的联邦学习,充其量只是分布式机器学习,那么就让我们开始吧!

2. 差分隐私

本文实现的差分隐私包括拉普拉斯机制和高斯机制,对于这两种机制我将分别进行讲解。首先还是要对差分隐私的概念进行简单的介绍,虽然联邦学习允许每个参与方私有训练数据保存在本地,但是恶意的参与方或者诚实但好奇的服务器可能会对参与方的中间参数发起推理攻击,以推断出参与方的私有数据。为了保护参与方免受这些隐私攻击,差分隐私即DP技术被广泛应用。DP是一个严格的数学框架,其中当且仅当训练数据集中任意单个数据在随机化机制 A \mathcal{A} A作用下的输出发生统计上不可分辨的变化时, A \mathcal{A} A被认为是差分私有的。正式地说,DP的定义如下:
具有域D和范围R的随机机制 A : D → R \mathcal{A}:\mathcal{D}\rightarrow\mathcal{R} A:DR,被认为满足 ( ϵ , δ ) (\epsilon,\delta) (ϵ,δ)-DP的条件是,如果任何两个相邻的数据集 D , D ′ ∈ D {D,D}^\prime\in\mathcal{D} D,DD最多只在一个数据样本上不同,并且对于所有的 S ∈ R \mathcal{S}\in\mathcal{R} SR,有:
P r [ A ( D ) ∈ S ] ≤ e ϵ P r [ A ( D ′ ) ∈ S ] + δ Pr\left[\mathcal{A}\left(D\right)\in\mathcal{S}\right]\le e^\epsilon Pr\left[\mathcal{A}\left(D^\prime\right)\in\mathcal{S}\right]+\delta Pr[A(D)S]eϵPr[A(D)S]+δ
其中, ϵ \epsilon ϵ为隐私预算, δ \delta δ表示满足 ϵ − D P \epsilon-DP ϵDP被破坏的概率。

2.1 拉普拉斯机制与高斯机制

常见的差分隐私机制有拉普拉斯机制和高斯机制,拉普拉斯机制满足 ( ϵ , 0 ) (\epsilon,0) (ϵ,0)-DP,高斯机制满足 ( ϵ , δ ) (\epsilon,\delta) (ϵ,δ)-DP,后者相对前者更为松散,可以理解为满足 ( ϵ , 0 ) (\epsilon,0) (ϵ,0)-DP,但有 δ \delta δ的概率被破坏。拉普拉斯机制主要针对的是数值型的查询,在介绍拉普拉斯机制之前,先对 l 1 l_1 l1敏感度进行定义。一个函数 f : D → R f:\mathcal{D}\rightarrow\mathcal{R} f:DR l 1 l_1 l1敏感度为:

在这里插入图片描述

其中 D , D ′ ∈ D D,D^\prime\in\mathcal{D} D,DD △ f \triangle f f刻画了单个记录改变函数f的输出,最大能改变多少。拉普拉斯机制简单来说就是在函数f的实数输出上加上噪声, △ f \triangle f f可以控制噪声的幅度,如果 △ f \triangle f f较大,那么相应的噪声幅度就应该更大一些,这样才提供更高的隐私保证。加入的噪声满足拉普拉斯概率分布。拉普拉斯分布是一个连续的分布,均值 μ = 0 \mu=0 μ=0的拉普拉斯分布的概率函数为:
L a p ( x | b ) = 1 2 b exp ⁡ ( − ∣ x ∣ b ) Lap\left(x\middle| b\right)=\frac{1}{2b}\exp{\left(-\frac{\left|x\right|}{b}\right)} Lap(xb)=2b1exp(bx)
分布的方差 σ 2 = 2 b 2 \sigma^2=2b^2 σ2=2b2,拉普拉斯分布图像如下图所示:
在这里插入图片描述
现在对拉普拉斯机制进行定义,给定任意函数 f : D → R f:\mathcal{D}\rightarrow\mathcal{R} f:DR,拉普拉斯机制为:
f ′ ( x ) = f ( x ) + L a p ( △ f ϵ ) f^\prime\left(x\right)=f\left(x\right)+Lap\left(\frac{\triangle f}{\epsilon}\right) f(x)=f(x)+Lap(ϵf)
其中 ϵ \epsilon ϵ是隐私预算, L a p ( △ f ϵ ) Lap(\frac{\triangle f}{\epsilon}) Lap(ϵf)是从均值 μ = 0 \mu=0 μ=0 b = △ f ϵ b=\frac{\triangle f}{\epsilon} b=ϵf的拉普拉斯分布中采样的独立同分布随机变量。

高斯机制定义如下:
给定任意一个可以将数据集 D ∈ D D\in\mathcal{D} DD映射到向量 o ∈ R d o\in\mathbb{R}^d oRd的函数f,通过向输出向量o的d个坐标添加高斯噪声实现 ( ϵ , δ ) (\epsilon,\delta) (ϵ,δ)-DP,即:
A ( D ) = f ( D ) + N ( 0 , σ 2 I d ) \mathcal{A}\left(D\right)=f\left(D\right)+\mathcal{N}\left(0,{\sigma^2I}_d\right) A(D)=f(D)+N(0,σ2Id)
其中 N \mathcal{N} N表示高斯分布, I d I_d Id表示d维单位矩阵, σ 2 \sigma^2 σ2是噪声方差,并且噪声标准差 σ \sigma σ(也称噪声幅度)与f的 l 2 l_2 l2查询灵敏度 △ f \triangle f f成正比, △ f \triangle f f被定义为:
△ f = m a x D , D ′ ∣ ∣ f ( D ) − f ( D ′ ) ∣ ∣ 2 \triangle f={max}_{{D,D}^\prime}||f(D)-f(D')||_2 f=maxD,D∣∣f(D)f(D)2

2.2 拉普拉斯机制的实现

为了保证参与方中间隐私参数不被泄露,需要对参与方上传到中央服务器的中间参数添加噪声。在FedAvg代码中,应该在server.py中参与方返回中间参数的位置添加噪声。
在这里插入图片描述

numpy自带拉普拉斯分布,调用np.random.laplace()就可以生成符合拉普拉斯分布的向量,这里举一个例子如下:

import numpy as np

noise = np.random.laplace(0, 0.1, 10)
print(noise)

上面的代码是指生成均值为0,b为0.1,向量维度为10的拉普拉斯分布,输出的结果为:
在这里插入图片描述是不是很简单,短短一行代码就可以生成拉普拉斯分布的噪声,只要应用到中间的参数中即可,由于神经网络中每层模型参数的shape都是不同的,所以要针对特定的层次生成噪声,此外生成的噪声是np.array结构,要转换为PyTorch的tensor,这就有了如下代码:

noise = torch.tensor(np.random.laplace(0, sigma, parameters.shape)).to(dev)
local_parameters[key].add_(noise)

to(dev)是为了在gpu中计算,通过如上操作拉普拉斯噪声就成功注入到中间参数中。

为了进一步探究拉普拉斯噪声幅度对模型性能的影响,在IID,两方全部参与的场景下,分别对 ϵ = 1 , 5 , 10 , 100 \epsilon=1,5,10,100 ϵ=1,5,10,100进行实验,结果如下所示:
在这里插入图片描述
可以明显看出,隐私预算越大,产生的噪声越少,对模型的性能影响就越小,当隐私预算小于10时,模型的性能受到明显的影响,甚至不如随机猜测得到的结果,因此在保证模型性能的前提下,最大程度保护参与方隐私, ϵ \epsilon ϵ设置为10最为合适。

2.2 高斯机制的实现

高斯机制的实现就更为简单了,torch中内置了高斯分布的函数,直接调用即可生成,如下所示:

torch.cuda.FloatTensor(parameters.shape).normal_(0, sigma)

生成的noise和拉普拉斯噪声处理方式相同,也是直接加到对应层的模型参数中,为了进一步探究高斯噪声幅度对模型性能的影响,在IID,两方全部参与的场景下,分别对 ϵ = 1 , 5 , 10 , 100 \epsilon=1,5,10,100 ϵ=1,5,10,100进行实验,结果如下所示:
在这里插入图片描述
和拉普拉斯机制对模型的影响结果差不多,隐私预算越大,产生的噪声越少,对模型的性能影响就越小,当隐私预算小于10时,模型的性能受到明显的影响,甚至不如随机猜测得到的结果,因此在保证模型性能的前提下,最大程度保护参与方隐私, ϵ \epsilon ϵ设置为10最为合适。此外对比相同的隐私预算下,高斯机制对模型性能的影响要小于拉普拉斯机制的影响,这也是和它们分布满足的DP等级有关。

2.3 差分隐私整合

为了进一步完善代码,方便执行多种差分隐私操作,首先设定超参数noise,0表示没有噪声,1表示拉普拉斯噪声,2表示高斯噪声,设定超参数sigma,表示噪声的幅度,进一步,封装生成噪声函数,代码如下:

def add_noise(parameters, dp, dev): 
    noise = None
    # 不加噪声
    if dp == 0:
        return parameters
    # 拉普拉斯噪声
    elif dp == 1:
        noise = torch.tensor(np.random.laplace(0, sigma, parameters.shape)).to(dev)
    # 高斯噪声
    else:
        noise = torch.cuda.FloatTensor(parameters.shape).normal_(0, sigma)
    
    return parameters.add_(noise)
    

函数输入中间参数,噪声机制,以及使用的GPU环境,返回添加噪声的中间参数。这样在参与方返回中间参数的位置只需要调用噪声生成函数即可,如下所示:
在这里插入图片描述
差分隐私部分的实现还是特别简单的,只需要短短几行代码就可以实现,当然前提是对整个代码的逻辑都理解的透彻,否则即使知道如何实现,但是不知道应该运用在何处,也只是徒劳。
差分隐私虽然能够混淆隐私数据,但是对于数据的效果如同打上马赛克一样,还是会有隐私信息泄露,下一章将加入同态加密算法Paillier,对隐私进一步进行保护。

3. Paillier同态加密算法

Paillier加密算法是一种非对称加密算法,支持同态加密,特别是在加法同态方面表现出色。这意味着使用Paillier加密算法,可以在不解密的情况下,对加密数据进行算术加法操作,这使得Paillier算法在隐私保护计算和安全多方计算等领域有着广泛的应用。

Paillier算法实现步骤如下:

密钥生成:

  1. 生成两个大素数p,q(p,q要等长)。
  2. 计算 n = p q {\displaystyle n=pq} n=pq λ = lcm ⁡ ( p − 1 , q − 1 ) {\displaystyle \lambda =\operatorname {lcm} (p-1,q-1)} λ=lcm(p1,q1),lcm表示最小公倍数。
  3. 定义 L ( x ) = x − 1 n {\displaystyle L(x)={\frac {x-1}{n}}} L(x)=nx1
  4. 随机选取一个小于n2的正整数g,并且存在 μ = ( L ( g λ   m o d   n 2 ) ) − 1   m o d    n {\displaystyle \mu =(L(g^{\lambda }{\bmod {n}}^{2}))^{-1}{\bmod\, {n}}} μ=(L(gλmodn2))1modn
  5. 得到公钥(n,g),私钥(λ,μ)。

加密过程:
对于明文m, m ∈ Z n m\in \mathbb{Z}_{n} mZn,选择随机数 r < n r<n r<n,加密过程为 c = g m r n (   m o d    n ) c=g^mr^n(\bmod\, n) c=gmrn(modn)

解密过程:
m = L ( c λ   m o d    n 2 ) ∗ u   m o d    n = L ( c λ   m o d    n 2 ) L ( g λ   m o d    n 2 ) m=L(c^\lambda \bmod\, n^2)*u \bmod\, n=\frac{L(c^\lambda \bmod\, n^2)}{L(g^\lambda \bmod\, n^2)} m=L(cλmodn2)umodn=L(gλmodn2)L(cλmodn2)

公式的正确性验证就不继续推导了,感兴趣的朋友可以从网上查阅相关内容。接下来的内容主要针对Paillier算法在联邦学习实战中的应用。

3.1 FedAvg应用

Paillier算法被封装在phe包中,直接调用该包即可,由于加密密钥是中央服务器下发的,因此密钥也是在中央服务器端生成,代码如下:

public_key, private_key = paillier.generate_paillier_keypair(n_length=1024)

接着封装好中间参数加密函数和解密函数,如下所示:

# torch转list加密
def encrypt_vector(public_key, parameters):
    parameters = parameters.flatten(0).cpu().numpy().tolist()
    parameters = [public_key.encrypt(parameter) for parameter in parameters]
    return parameters
    
# list解密
def decrypt_vector(private_key, parameters):
    parameters = [private_key.decrypt(parameter) for parameter in parameters]
    return parameters

因为Paillier只能对list数组中的元素进行加密,因此在加密函数中,首先将parameters由tensor展平转换为list,再对其中的元素进行加密,注意,密文是存储在public_key类中,因此返回的parameter数组是public_key类数组,这样才能由解密类进行解密,返回后的list虽然是明文,但是是展平的list明文,因此在这个过程中,要提前存储模型每层的形状,然后将解密的list转为Tensor进行Reshape,从而恢复为原始形状,再更新全局参数。

# 可视化进度条对选中参与方local_epoch
for client in tqdm(clients_in_comm):
    # 本地梯度下降
    local_parameters = myClients.clients_set[client].localUpdate(args['local_epoch'], args['batch_size'], net,
                                                                 loss_func, opti, global_parameters)
    
    # 初始化sum_parameters
    if sum_parameters is None:
        sum_parameters = {}
        parameters_shape = {}
        for key, var in local_parameters.items():
            sum_parameters[key] = var.clone()
            parameters_shape[key] = var.shape
            sum_parameters[key] = add_noise(sum_parameters[key], dp, dev)
            sum_parameters[key] = encrypt_vector(public_key, sum_parameters[key])

    else:
        for key in sum_parameters:
            sum_parameters[key] = np.add(sum_parameters[key], encrypt_vector(public_key, add_noise(local_parameters[key], dp, dev)))

# 更新全局梯度参数
for var in global_parameters:
    sum_parameters[var] = decrypt_vector(private_key, sum_parameters[var])
    sum_parameters[var] = torch.reshape(torch.Tensor(sum_parameters[var]), parameters_shape[var])
    global_parameters[var] = (sum_parameters[var].to(dev) / num_in_comm)

这里有一点需要注意,项目代码结构中,解密过程也是运行在中央服务器,这样的实现过程只能防范加密信息在传输过程中隐私泄露问题,但是不能够防范诚实但好奇的中央服务器对隐私参数的攻击,正确的做法应该是将全局加密参数分发给参与方,参与方在本地接收并解密获得全局加密参数,这才是最安全的中间参数传输方式。(这一点将会在日后应用在真实场景下进行改进,因为代码仍然是本地测试,如果模拟一千个参与方,这样的解密成本将会是巨大的。)

3.2 性能测试

接下来对经过Paillier加密的FedAvg进行性能评估,选定两方参与的联邦学习环境,在MNIST数据集下,模型训练迭代情况如下图所示:
在这里插入图片描述取消加密,模型训练情况如下图所示:
在这里插入图片描述
Paillier算法对模型训练过程中的精确度没有任何影响,但是效率大大降低,在没有经过同态加密的情况下,每轮全局迭代只需要3s,但是经过Paillier加密之后,每轮全局迭代耗费235s,效率比接近80,但是也只有通过这样的方式,才能更好保护参与方的隐私。

4. 项目总结

本篇博客算是联邦学习比较硬核的项目了,虽然只能在本地单机模拟运行,但是万变不离其中,掌握其中的思想在联机情况下同样可以很好运行。通过同态加密算法Paillier和差分隐私结合,这样的混合加密机制可以很好保护参与方的隐私,并且即使解密后的参数被中央服务器获取,经过扰动后的中间参数也无法很好复原原始图像,等于说是给参与方的隐私参数层层设防。此外,除了MNIST数据集,现在的代码也已经更新了对CIFAR-10数据集的支持。最后,如果本篇博客对您有所帮助,或者代码对您有所启发,您的一个点赞将会是我巨大的动力!

参考链接

https://www.jianshu.com/p/8b15be58a5ee
https://blog.csdn.net/sinianluoye/article/details/82855059
https://github.com/heroding77/fedavg_encrypt
https://github.com/WHDY/FedAvg
https://zhuanlan.zhihu.com/p/259282416

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐