Spark MLlib 朴素贝叶斯NaiveBayes 源代码分析

基本原理介绍


首先是基本的条件概率求解的公式。

P(A|B)=P(AB)P(B)

在现实生活中,我们经常会碰到已知一个条件概率,求得两个时间交换后的概率的问题。也就是在已知P(A|B)的情况下,如何求得P(B|A). 其中P(A|B)表示在事件B发生的前提下, 事件A发生的概率。
其中P(A)指的是先验概率或者叫做边缘概率。因为他不需要考虑任何B方面的影响。
P(A|B)指的是B发生后A的条件概率。
P(B)是B的先验概率或者是边缘概率,也被称为标准化常量(normalized constant)。
这里给出贝叶斯的计算公式:

P(B|A)=P(A|B)P(B)P(A)


对于给定的分类项,求解此分类项在各个类别下出现的概率。哪个最大,哪个就被认为是分好类的类型。

假设 A={a1,a2,a3…}为待分类项,a1,a2是每个待分类项的属性。
有一个类别的集合B={y1,y2….}.我们需要计算各个类别的先验概率,并取对数,计算公式如下.

p(i)=log(p(yi))
=log((i类别的次数+平滑因子)/(总次数+类别数*平滑因子))

我们需要计算P(y1|A),P(y2|A),P(y3|A)等等。求得计算结果的最大值,我们可以认为这个特征属性的类别为y。

首先我们需要计算在每一个类别下的各个特征属性的概率,即P(a1|y1),P(a2|y1),P(a3|y1),
P(a1|y2),P(a2|y2),P(a3|y2,P(a1|y3),P(a2|y3),P(a3|y3)等。

在各个类别下的各个特征属性的条件概率估计,并取对数。
theta(i)(j)=log(p(aj|yi))
=log(sumTermFreqs(j)+平滑因子)-thetaLogDenom.

其中theta(i)(j)是i类别下j的特征的概率,sumTermFreqs(j)是特征j出现的次数,thetaLogDenom 分为如下的两个类型:

  • 1 多项式模型:thetaLogDenom=log(sumTermFreqs.values.sum+numFeatures*lamda).其中sumTermFreqs.values.sum解释为类i下的单词总数,numFeatures是特征数量,lamda是平滑因子。
  • 2 伯努利模型:thetaLogDenom=log(n+2.0*lamda).
    文本分类的时候n可以视作是类别i下的单词总数。lamda表示平滑因子。

假设各个特征属性的概率是相互独立的,有贝叶斯定理,我们有,

P(yi|A)=P(A|yi)P(yi)P(A)

由于分母为常数,所以只需要分子求值即可。又因为各个属性相互独立。可以得到:

P(A|yi)P(yi)=P(a1|yi)P(a2|yi)P(a3|yi)=P(yi)j=1m(P(aj|yi))

对上式同取log。

log(p(x|yi)p(yi))=log(p(yi))+j=1mlog(p(aj|yi))


spark 的源码实现分析:
分布式的实现方法大致如下:首先对样本进行聚合操作,统计所有标签出现的次数,和对应的特征之和。即对labledPoint(label, features)这样子的一个元祖采用一个combineByKey聚合函数:对同一标签数据进行聚合统计操作。
通过聚合操作后,可以通过聚合结果先验概率,条件概率。然后返回一个贝叶斯模型。
预测的时候,将需要预测的样本向量乘以theta矩阵,即条件概率矩阵。然后计算每个样本属于每个类别的概率,然后选取其中的最大项作为类别。


NaiveBayes 源码大致框架:

  • 1,NaiveBayes分类伴生对象:NaiveBayes
  • 1.1 包含静态train方法,根据输入参数,初始化NaiveBayes类,通过run方法进行训练。
  • 2,贝叶斯分类:NaiveBayes
  • 2.1 run方法:开始训练贝叶斯模型,这个方法通过计算各个类别下的先验概率和条件概率来计算贝叶斯模型。
  • 3 贝叶斯模型类:NaiveBayesModel:
  • 4 预测计算:predict方法: 根据之前计算的先验概率和条件概率,计算样本属于每个类别的概率。去最大的项做为样本类别。

好了接下来我们来看看NaiveBayes的源代码,首先是其伴生对象。


1,object NaiveBayes:
伴生对象主要定义了训练静态贝叶斯的分类模型的train方法。train方法通过设置训练参数进行模型训练,主要有如下的参数:

input:训练样本,其格式为RDD[LabeledPoint],其中labeledPoint格式为(label,features)。
lamda:平滑因子,防止分母出现0.

/**
 *朴素贝叶斯的伴生对象。
 */
@Since("0.9.0")
object NaiveBayes {

  private[spark] val Multinomial: String = "multinomial" //表示的是多项式类型

  private[spark] val Bernoulli: String = "bernoulli" // 表示的是伯努利类型


  private[spark] val supportedModelTypes = Set(Multinomial, Bernoulli)

  //训练贝叶斯模型,根据训练样本,类型为RDD[LabeledPoint]
  //LabeledPointoink,其格式为(label,features)

  def train(input: RDD[LabeledPoint]): NaiveBayesModel = {
    new NaiveBayes().run(input)
  }

  @Since("0.9.0")
  //这个train 方法,除了上一个的基本参数之外,传入了一个平滑因子,lamda
  //
  def train(input: RDD[LabeledPoint], lambda: Double): NaiveBayesModel = {
    new NaiveBayes(lambda, Multinomial).run(input)
  }


  @Since("1.4.0")
  //输入样本,平滑因子,还有模型的类别,分别为多项式类型,和伯努利类型。
  def train(input: RDD[LabeledPoint], lambda: Double, modelType: String): NaiveBayesModel = {
    require(supportedModelTypes.contains(modelType),
      s"NaiveBayes was created with an unknown modelType: $modelType.")
    new NaiveBayes(lambda, modelType).run(input)
  }

}

2, 接下来是NaiveBayes的主类:
class NaiveBayes:我们首先来看看它的基本的构造器和基本方法:

/**
 *朴素贝叶斯分类器的类,训练一个朴素贝叶斯模型,根据rdd样本数据,
 * 其格式为(label,features).
 *
 *训练朴素贝叶斯分类器模型,可以通过TF-IDF 生成向量。用于文档分类,
 *如果让向量为0-1模式,则可以应用于bernoulli NB,输入的特征必须是非负的。
 *
 */

 class NaiveBayes private (
    private var lambda: Double,
    private var modelType: String) extends Serializable with Logging {

  import NaiveBayes.{Bernoulli, Multinomial}

  @Since("1.4.0")
  def this(lambda: Double) = this(lambda, NaiveBayes.Multinomial)

  @Since("0.9.0")
  def this() = this(1.0, NaiveBayes.Multinomial) //在这个参数里面默认的平滑因子是1

  /** Set the smoothing parameter. Default: 1.0. */
  @Since("0.9.0")
  def setLambda(lambda: Double): NaiveBayes = { //设置平滑因子。
    this.lambda = lambda
    this
  }

接下来我们来看看整个朴素贝叶斯最为重要的run方法,所有的核心代码都在这个里面。

  • 1 NaiveBayes的run方法用来训练模型,这个方法主要用于计算先验概率和条件概率。这个方法的实现的逻辑是:首先对样本进行聚合。以label为key,调用combineByKey方法,聚合里面的同一个label的features,得到所有的label的统计(label,(count, features之和))。

  • 2 根据先验概率的计算公式p=log((n+lamda)/(numDocuments+numLabels*lamda))计算每个label的先验概率。根据条件概率log((sumTermFreqs(j)+lamda)/thetaLogDenom)计算每个label的先验概率。

  • 3 根据条件概率log((sumTermFreqs(j)+lamda)/thetaLogDenom)计算在各个label下面的各个features的条件概率,返回的是一个二维数组。

  • 4 最后通过标签列表,类别先验概率,特征的条件概率,类型生成一个朴素贝叶斯模型。

run方法的代码如下:

def run(data: RDD[LabeledPoint]): NaiveBayesModel = {


    //在这个里面定义了一个函数,来判断输入的特征向量的值。要求所有的向量值非负
    val requireNonnegativeValues: Vector => Unit = (v: Vector) => {
      val values = v match {   //判断向量的类型,是密集向量还是稀疏向量
        case sv: SparseVector => sv.values // 根据不同类型的向量,得到其value值。
        case dv: DenseVector => dv.values
      }
      //判断是不是所有的值都大于0,否则抛出一个错误。多项式型需要每一个value值得大于0.
      if (!values.forall(_ >= 0.0)) {
        throw new SparkException(s"Naive Bayes requires nonnegative feature values but found $v.")
      }
    }

    //检测所有的伯努利的值,要求所有的向量值为0或者是1.
    val requireZeroOneBernoulliValues: Vector => Unit = (v: Vector) => {
      val values = v match {
        case sv: SparseVector => sv.values
        case dv: DenseVector => dv.values
      }

      if (!values.forall(v => v == 0.0 || v == 1.0)) {
        throw new SparkException(
          s"Bernoulli naive Bayes requires 0 or 1 feature values but found $v.")
      }
    }


    //从这个地方开始对数据进行计算。
    //对于每一个特征进行聚合,求得每一个标签的对应的特征的频数,
    //aggretaded表示通过label为key,聚合同一个label的features特征。他的返回格式是 (label,(计数,features之和))

    //注意这个combineByKey 算子: 其中,createCombiner表示,当combineByKey第一次遇到值为k的Key时,调用createCombiner函数,将v转换为c
    //然后是第二个mergeValue: combineByKey不是第一次遇到值为k的Key时,调用mergeValue函数,将v累加到c中
    //mergeCombiners:将两个c,合并成一个。

    ///注意首先计算的是条件概率,返回个是为(label,(计数,features之和)),
    //表示每个样本标签的数量的和,和其对应的样本向量的特征之和。

    val aggregated = data.map(p => (p.label, p.features)).combineByKey[(Long, DenseVector)](


      //完成从V->C类型的转换,(v:Vector)=》(c:(long,vector))
      createCombiner = (v: Vector) => {  // 根据上面所说的,输入的是一个vector,通过createCombiner函数将
        if (modelType == Bernoulli) {    //将这个v转换成c的格式。格式为(1,densevector)类型。第二步需要做的就是每次遇到一个v,将其合并为c。
          requireZeroOneBernoulliValues(v)
        } else {
          requireNonnegativeValues(v)
        }
        (1L, v.copy.toDense)  //这个是第一个函数的返回值,即将(c:Vector)=》(v:(Long,DenseVector))
      },

      //mergeValue指的是。当接下来遇到vector的时候,将vector合并到c中去。前提是他们的key必须相同。
      //过程如下:(c:(Long,DenseVector),v:Vector => (c:(Long,DenseVector))) 
      mergeValue = (c: (Long, DenseVector), v: Vector) => {
        requireNonnegativeValues(v) //判断向量的是否符合条件
        BLAS.axpy(1.0, v, c._2)  //c._2=c._2+v
        (c._1 + 1L, c._2) //这个地方继续返回一个元祖,其中对于第一个值进行加1操作。这里的c._2表示的是c._2=c._2+v
      },


      //接下来对根据相同的key来合并多个c。
      //mergeCombiners。过程如下(c1:(Long, DenseVector),c2: (Long, DenseVector)) => c:(c:(Long,DenseVector))
      mergeCombiners = (c1: (Long, DenseVector), c2: (Long, DenseVector)) => {
        BLAS.axpy(1.0, c2._2, c1._2) //c1._2=c1._2+c2._2
        (c1._1 + c2._1, c1._2)  // 返回一个c。表示在一个key的情况下的每一个特征的数量之和加上其向量之和。
      }
    ).collect().sortBy(_._1)  //有一个排序操作是根据key来排序


    val numLabels = aggregated.length //aggregated的长度表示的是类别标签的个数

    var numDocuments = 0L  //这个表示文档的数量
    //注意这个aggredaged的格式。首先它是一个数组。数组的元素的元祖的第二个值的格式为(Long,DenseVector)格式。
    //下面的这个n表示的是某一个特征下的数量。这个方法主要是对其进行汇总。总文档的数量进行汇总

    aggregated.foreach { case (_, (n, _)) =>
      numDocuments += n
    }

    //这个是用于计算特征的数量
    val numFeatures = aggregated.head match { case (_, (_, v)) => v.size }

       //labels类别标签的列表。
    val labels = new Array[Double](numLabels)

    //pi类别的先验概率
    val pi = new Array[Double](numLabels)

    //这个表示theta这个特征在各个类别下的概率。是个二维数组,theta[i][j],i表示第i个类别,j表示第j个特征。
    val theta = Array.fill(numLabels)(new Array[Double](numFeatures))

    //计算总文档数量的对数值,用于计算theta值。在下面使用到。    
    val piLogDenom = math.log(numDocuments + numLabels * lambda)
    var i = 0
    aggregated.foreach { case (label, (n, sumTermFreqs)) =>

      //aggregated的每一行表示label,和计数还有其特征向量之和。
      labels(i) = label //表示取出第一个标签放在lebels(i)里面。

      pi(i) = math.log(n + lambda) - piLogDenom //计算先验概率,并取log。log((n+lamda)/(numDocuments+numlabels*lamda))

      val thetaLogDenom = modelType match {
        case Multinomial => 
        math.log(sumTermFreqs.values.sum + numFeatures * lambda) //多项式模型,比如说计算类a下的文章的总数
        case Bernoulli =>
         math.log(n + 2.0 * lambda) //贝努力模型,比如说用于计算类a下的文章的总数。
        case _ =>
          throw new UnknownError(s"Invalid modelType: $modelType.")
      }
      var j = 0 //每一个i类别下,都需要计算j类别的条件概率,每次j从0开始
      while (j < numFeatures) {
        //这个用于计算各个特征在各个类别下的条件概率
        //表示的是类别i下这个特征j的次数,除以总的出现次数
        theta(i)(j) = math.log(sumTermFreqs(j) + lambda) - thetaLogDenom
        j += 1
      }
      i += 1  //这里面有两层循环,i表示的是类别。j表示的是特征。先从类别开始循环。然后在开始对特征进行循环。
    }

     //最后生成模型。包括类别标签列表,类别先验概率,各个类别下每个特征的条件概率,多项式和伯努利
    new NaiveBayesModel(labels, pi, theta, modelType)
  }
}

3 最后是贝叶斯分类模型 class NaiveBayesModel

训练完成后,会生成贝叶斯分类模型。其包含如下参数:labels–类别标签列表. pi–每个类别的先验概率,theta–各个特征在各个类别下的先验概率,modelType–多项式或者是伯努利模型。
模型类主要包含一下的方法,即predict方法,load方法和save方法。
首先我们来看看其构造器:

class NaiveBayesModel private[spark] (
    @Since("1.0.0") val labels: Array[Double],  //这个表示的是labels数组
    @Since("0.9.0") val pi: Array[Double],     //这个表示的是先验概率数组
    @Since("0.9.0") val theta: Array[Array[Double]], //这个表示theta这个特征在各个类别下的概率。条件概率数组,二维数组。
    @Since("1.4.0") val modelType: String)  //这个表示类别
  extends ClassificationModel with Serializable with Saveable 

  private[mllib] def this(labels: Array[Double], pi: Array[Double], theta: Array[Array[Double]]) =
    this(labels, pi, theta, NaiveBayes.Multinomial)

接下来是predict方法,默认的输入参数的类型是RDD[Vector],这个方法内部调用的是predict(testData:Vector)方法。该方法的返回值是一个double类型。

//这个方法会将输入的rdd转换为向量,然后调用下一个predict方法
override def predict(testData: RDD[Vector]): RDD[Double] = {
    val bcModel = testData.context.broadcast(this)    //广播一下模型,
    testData.mapPartitions { iter => //采用mappartition进行操作,对每一个分区进行操作。
      val model = bcModel.value //在每个分区里面获取广播变量值,
      iter.map(model.predict) //对分区的每一个元素,调用predict方法。调用的是下面一个方法。
    }
  }

  override def predict(testData: Vector): Double = {
    modelType match {
      case Multinomial => //如果是多项式类型的,则调用multinomialCalculation
        labels(multinomialCalculation(testData).argmax) //注意这个地方,会调用求最大值,下面类似
      case Bernoulli =>   //如果是伯努利类型的,bernoulliCalculation
        labels(bernoulliCalculation(testData).argmax)
    }
  }

下面是两个计算方法,分别是multinomialCalculation和bernoulliCalculation。

private def multinomialCalculation(testData: Vector) = {
    val prob = thetaMatrix.multiply(testData) //用条件概率矩阵,乘以样本向量。 theta*testData
    BLAS.axpy(1.0, piVector, prob) //prob=1.0*piVector+prob (本来是相乘的,但是取log之后变成相加,结果是一样的。)
    prob  //得到结果之后,去向量的最大值。
  }

  private def bernoulliCalculation(testData: Vector) = {
    testData.foreachActive((_, value) =>
      if (value != 0.0 && value != 1.0) { 如果不满足条件的话
        throw new SparkException(
          s"Bernoulli naive Bayes requires 0 or 1 feature values but found $testData.")
      }
    )
    val prob = thetaMinusNegTheta.get.multiply(testData) //用条件概率矩阵,乘以样本向量。theta*testData
    BLAS.axpy(1.0, piVector, prob)  //prob=1.0*piVector+prob
    BLAS.axpy(1.0, negThetaSum.get, prob)
    prob
  }

接下来就是最基本的加载和保存的方法了。分为save方法和load方法。

 def load(sc: SparkContext, path: String): NaiveBayesModel = {
      val sqlContext = SQLContext.getOrCreate(sc)
      // Load Parquet data.
      val dataRDD = sqlContext.read.parquet(dataPath(path))
      // Check schema explicitly since erasure makes it hard to use match-case for checking.
      checkSchema[Data](dataRDD.schema)
      val dataArray = dataRDD.select("labels", "pi", "theta", "modelType").take(1)
      assert(dataArray.length == 1, s"Unable to load NaiveBayesModel data from: ${dataPath(path)}")
      val data = dataArray(0)
      val labels = data.getAs[Seq[Double]](0).toArray
      val pi = data.getAs[Seq[Double]](1).toArray
      val theta = data.getAs[Seq[Seq[Double]]](2).map(_.toArray).toArray
      val modelType = data.getString(3)
      new NaiveBayesModel(labels, pi, theta, modelType)
    }
def save(sc: SparkContext, path: String, data: Data): Unit = {
      val sqlContext = SQLContext.getOrCreate(sc)
      import sqlContext.implicits._

      // Create JSON metadata.
      val metadata = compact(render(
        ("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~
          ("numFeatures" -> data.theta(0).length) ~ ("numClasses" -> data.pi.length)))
      sc.parallelize(Seq(metadata), 1).saveAsTextFile(metadataPath(path))

      // Create Parquet data.
      val dataRDD: DataFrame = sc.parallelize(Seq(data), 1).toDF()
      dataRDD.write.parquet(dataPath(path))
    }

好了,谢谢大家,以上就是我对于spark mllib的朴素贝叶斯的代码的解析。

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐