Spark MLlib源代码解读之KMeans(下)


之前看过Kmeans的源代码,但是对于Spark KMeans生成初始中心点的方法没有理解到位,
最近又看了一下,再次补充一下。Spark生成初始中心点有一个方法叫做initKMeansParallel。
整个代码包含有 Kmeans类和localKmeans类,localkmeans类主要用于实现KMeans++方法来实现得到中心点。


initKMeansParallel大致思路

分布式实现KMeans中心点的方式大致如下:(有点绕)。

首先就像之前博客所讲的,Spark 的KMeans真正执行的有runs个任务,这个runs需要我们自己去指定,默认为1.表示的是会去进行runs个KMeans任务,然后选择其中的最小的cost作为最合适的聚类结果。所以这个initKMeansParallel方法最后返回的结果也是Array[Array[VectorWithNorm]]。是一个二维数组,表示的是有runs组中心点,每组中心点有k个向量。


在真正进行迭代的时候,每次迭代都会筛选到2*k个点,这个筛选的过程是初始化一个rand用于生成随机数,
这个sumCosts表示的是之前通过aggregate聚合好的每个runs参数下的总的cost值。
可以看到如果 (2 · c(r) · k)/sumCosts(r) > rand随机生成的数的话,则将这个point选中。

 val chosen = data.zip(costs).mapPartitionsWithIndex { (index, pointsWithCosts) =>

        val rand = new XORShiftRandom(seed ^ (step << 16) ^ index)
        pointsWithCosts.flatMap { case (p, c) =>
          val rs = (0 until runs).filter { r =>  //这个其实是在算概率, 2* c(r)*k/sumCosts(r)> rand.nextDouble
            //表示的是满足这个条件的点呗过滤下来
            rand.nextDouble() < 2.0 * c(r) * k / sumCosts(r)
          }
          if (rs.length > 0) Some(p, rs) else None //大于0的话,用some封装,否则返回none。
        }
      }.collect()

选中了之后,需要更新这个centers的值。

def mergeNewCenters(): Unit = { //合并新的中心点到中心
      var r = 0
      while (r < runs) {
        centers(r) ++= newCenters(r)
        newCenters(r).clear()
        r += 1
      }
    }

注意这个mergeNewCenters,其中的centers是一个二维数组,而每一行其实是一个ArrayBuffer。merge的时候,每一行会不停的往里面添加向量。

 val centers = Array.tabulate(runs)(r => ArrayBuffer.empty[VectorWithNorm])
  mergeNewCenters()  // 合并一下新的中心点。
      chosen.foreach { case (p, rs) =>
        rs.foreach(newCenters(_) += p.toDense)
      }

通过多次迭代之后,对于每一个runs参数我们会生成一系列的可能不少于K个的待选中心点。我们需要对每一个runs下面的中心点的参数加以权重。加权重的方式是如果在(i, j)第i个runs下的第j个待选中心点如果是最近的点的话,则((i, j), 1.0)。即这个对应的待选中心点加1,这样子的点在以后被筛选到的可能性会大。

val bcCenters = data.context.broadcast(centers)
    val weightMap = data.flatMap { p =>
      Iterator.tabulate(runs) { r =>
        ((r, KMeans.findClosest(bcCenters.value(r), p)._1), 1.0)
      }
    }.reduceByKey(_ + _).collectAsMap() //聚合之后返回的是一个map类型。

最后调用LocalKMeans. LocalKMeans.kMeansPlusPlus方法来求得finalCenters。

 val finalCenters = (0 until runs).par.map { r =>
      val myCenters = centers(r).toArray //表示的是第r个并行度下的中心点数组。

      val myWeights = (0 until myCenters.length).map(i => weightMap.getOrElse((r, i), 0.0)).toArray //获取weights的方式,刚才我们得到了一个Map类型,如果有的haul,返回值,没有的话返回0.
      LocalKMeans.kMeansPlusPlus(r, myCenters, myWeights, k, 30)
    }

LocalKMeans.kMeansPlusPlus这个方法大致是通过距离初始点尽可能远的点找到合适的中心点。当找到k个中心点之后,又再一次使用了一个local 版本的Kmeans算法来继续对这些生成的中心点,进行迭代,知道这些点不改变了之后,返回需要的结果。每次调用一次LocalKMeans.kMeansPlusPlus,返回k个中心点。总共调用runs次,则返回有[runs][k]个我们需要的中心点。


好接下来看看完整的代码吧。

 private def initKMeansParallel(data: RDD[VectorWithNorm])
  : Array[Array[VectorWithNorm]] = {

    //初始化中心及costs,tabluate方法返回一个数组,长度为runs。
    val centers = Array.tabulate(runs)(r => ArrayBuffer.empty[VectorWithNorm])
    var costs = data.map(_ => Array.fill(runs)(Double.PositiveInfinity))

    // Initialize each run's first center to a random point.
    val seed = new XORShiftRandom(this.seed).nextInt()
    //初始化第一个中心点
    val sample = data.takeSample(true, runs, seed).toSeq //随机筛选出一些中心点。 
    val newCenters = Array.tabulate(runs)(r => ArrayBuffer(sample(r).toDense)) //获取一个长度为为runs的中心点数组 

    /** Merges new centers to centers. */
    def mergeNewCenters(): Unit = { //合并新的中心点到中心
      var r = 0
      while (r < runs) {
        centers(r) ++= newCenters(r)
        newCenters(r).clear()
        r += 1
      }
    }

    // On each step, sample 2 * k points on average for each run with probability proportional
    // to their squared distance from that run's centers. Note that only distances between points
    // and new centers are computed in each iteration.
    //每次迭代的过程,抽取2*k个样本,每次迭代计算样本点与中心店的距离
    var step = 0
    while (step < initializationSteps) {
      val bcNewCenters = data.context.broadcast(newCenters) //新的中心点
      val preCosts = costs

     //j将数据点和cost通过拉链操作连接在一起,返回一个(point,cost)
     //这个会在下一步的时候通过调用math。min方法,找出cost(r) 和通过kmeans的pointcost方法返回的最小的cost值
     //并将这个值更新到costs数组。同时将这个costs数组cache到缓存。
      costs = data.zip(preCosts).map { case (point, cost) =>
          Array.tabulate(runs) { r =>
            math.min(KMeans.pointCost(bcNewCenters.value(r), point), cost(r))
          }
        }.persist(StorageLevel.MEMORY_AND_DISK)

      //接下来聚合costs,聚合后的返回值,为一个Array,其内部的元素为Double类型。
      //注意接下来这个aggregate算子,他接收三个参数,第一个参数接收一个初始值,这个初始值,首先会作用到每个分区,
      //应用于每个分区的函数是接下来定义的第一个SeqOp函数。
      //这个参数会在每个分区发挥作用。
      //接下来有一个combOp函数,这个函数会在对每一个聚合后的结果发挥作用。相当于前面函数作用后的结果,会在后面继续发挥作用.
      //需要注意的是,aggregate算子,的初始参数在第二个函数页发挥作用。而aggregateByKey算子不会发挥作用。

      val sumCosts = costs
        .aggregate(new Array[Double](runs))(
          //接下来计算的方式如下所示,由于有一个并行度,每一个并行度会有一个自己的costs数组,所以计算costs数组的时候会
          // 分开对每一个costs数组进行计算。第一个函数的意思是:合并第二个参数v(也是一个costs值)到第一个s里面。
         //然后返回一个s数组。相当于每一个分区都会有这个s数组
          seqOp = (s, v) => {
            // s += v
            var r = 0
            while (r < runs) {
              s(r) += v(r)
              r += 1
            }
            s
          },
          //接下来对于不同的分区,计算s数组的值的和。和刚才一样,因为有一个并行度,所以默认的是对每一个并行度下的cost数组进行计算。
          //然后返回这个s0。s0是一个数组。从0~runs的一个double类型的数组。每一个对应的元素包含的是对应的在第r次并行度下的cost值

          combOp = (s0, s1) => {
            // s0 += s1
            var r = 0
            while (r < runs) {
              s0(r) += s1(r)
              r += 1
            }
            s0
          }
        )

      bcNewCenters.unpersist(blocking = false) // 去掉在内存中的缓存
      preCosts.unpersist(blocking = false)

      val chosen = data.zip(costs).mapPartitionsWithIndex { (index, pointsWithCosts) =>

        val rand = new XORShiftRandom(seed ^ (step << 16) ^ index)
        pointsWithCosts.flatMap { case (p, c) =>
          val rs = (0 until runs).filter { r =>  //这个其实是在算概率, 2* c(r)*k/sumCosts(r)> rand.nextDouble
            //表示的是满足这个条件的点呗过滤下来
            rand.nextDouble() < 2.0 * c(r) * k / sumCosts(r)
          }
          if (rs.length > 0) Some(p, rs) else None //大于0的话,用some封装,否则返回none。
        }
      }.collect()

      mergeNewCenters()  // 合并一下新的中心点。
      chosen.foreach { case (p, rs) =>
        rs.foreach(newCenters(_) += p.toDense)
      }
      step += 1
    }

    mergeNewCenters()
    costs.unpersist(blocking = false)

    val bcCenters = data.context.broadcast(centers)
    val weightMap = data.flatMap { p =>
      Iterator.tabulate(runs) { r =>
        ((r, KMeans.findClosest(bcCenters.value(r), p)._1), 1.0)
      }
    }.reduceByKey(_ + _).collectAsMap()

    bcCenters.unpersist(blocking = false)

    val finalCenters = (0 until runs).par.map { r =>
      val myCenters = centers(r).toArray //表示的是第r个并行度下的中心点数组。
      val myWeights = (0 until myCenters.length).map(i => weightMap.getOrElse((r, i), 0.0)).toArray
      LocalKMeans.kMeansPlusPlus(r, myCenters, myWeights, k, 30)
    }

    finalCenters.toArray
  }
}
rivate[mllib] object LocalKMeans extends Logging {


  def kMeansPlusPlus(
      seed: Int, //种子点
      points: Array[VectorWithNorm], //中心点
      weights: Array[Double], //权重
      k: Int, //中心点个数
      maxIterations: Int  //最大的迭代次数
  ): Array[VectorWithNorm] = { //最终这个方法返回的是一个中心点和范数数组,表示的是第r个并行度下的中心点向量和范数
    val rand = new Random(seed)  // 初始化随机数,最大值为seed,而seed是每个对应的runs参数,即第一个kmeans任务。
    val dimensions = points(0).vector.size  //中心点的维度数
    val centers = new Array[VectorWithNorm](k) //存储中心点向量,共有k个。

    // Initialize centers by sampling using the k-means++ procedure.
    //通过kmeans++的方式来生成初始化的第一个中心点。
    centers(0) = pickWeighted(rand, points, weights).toDense //第一个中心点。

    for (i <- 1 until k) {
      // Pick the next center with a probability proportional to cost under current centers
      //接下来通过概率的方式找到剩下的中心点,距离当前中心点距离越远,可能性越大。
      val curCenters = centers.view.take(i) //取出第一个,由于刚开始初始化,所以这个值为0.

      val sum = points.view.zip(weights).map { case (p, w) => // (p, w)表示的是p指的是VectorWithNorm,w指的是权重。
        w * KMeans.pointCost(curCenters, p) //将权重值乘以p距离当前的中心点的距离,注意当前的p也是中心点。
      }.sum
      val r = rand.nextDouble() * sum // 将这个值最为累计score的最大值,
      var cumulativeScore = 0.0
      var j = 0
      while (j < points.length && cumulativeScore < r) {
        //找到一个尽可能接近r的累计score的值,然后记录下j。
        cumulativeScore += weights(j) * KMeans.pointCost(curCenters, points(j))
        j += 1
      }

      if (j == 0) {
        logWarning("kMeansPlusPlus initialization ran out of distinct points for centers." +
          s" Using duplicate point for center k = $i.")
        centers(i) = points(0).toDense
      } else {
        centers(i) = points(j - 1).toDense //返回points(j-1).
      }
    }

      // 通过上述的方式找到了k的聚类中心点。这k个聚类中心点同属于第runs个并行度下。

    // Run up to maxIterations iterations of Lloyd's algorithm
    //接下来有迭代maxIterations的执行Lloyd算法。    
   //此时计算的时候数据为我们之前的中心点数据,而上一步生成的centers数组作为中心点,然后不断的迭代,知道找到了合适的中心点。
    val oldClosest = Array.fill(points.length)(-1)
    var iteration = 0
    var moved = true
    while (moved && iteration < maxIterations) { //  如果移动了的话,
      moved = false
      val counts = Array.fill(k)(0.0) //这个表示属于这个点的数目,
      val sums = Array.fill(k)(Vectors.zeros(dimensions)) // 这个用来累加第i个向量,
      var i = 0

      while (i < points.length) {
        val p = points(i) // 之前的数据中心点作为数据点。
        val index = KMeans.findClosest(centers, p)._1  // 表示计算的中心点的index,

        axpy(weights(i), p.vector, sums(index))   // 这一步用来更新这个sum,sum(index) += p.vector * weights.

        counts(index) += weights(i)   /// 更新count。

        if (index != oldClosest(i)) {
          moved = true
          oldClosest(i) = index //  如果移动了的话,更新原来存在的index。
        }
        i += 1
      }

      //每一轮计算完成之后需要重新对中心点进行更新。
      // Update centers
      var j = 0
      while (j < k) {
        if (counts(j) == 0.0) { //如果第j个中心点周边的point数目是0的话,
          // Assign center to a random point
          centers(j) = points(rand.nextInt(points.length)).toDense //随机安排一个点。
        } else {
          scal(1.0 / counts(j), sums(j))  //否则的话,更新这个中心点。
          centers(j) = new VectorWithNorm(sums(j))
        }
        j += 1
      }
      iteration += 1
    }

    if (iteration == maxIterations) {
      logInfo(s"Local KMeans++ reached the max number of iterations: $maxIterations.")
    } else {
      logInfo(s"Local KMeans++ converged in $iteration iterations.")
    }

    centers  //最后返回中心点。
  }

  private def pickWeighted[T](rand: Random, data: Array[T], weights: Array[Double]): T = {
    val r = rand.nextDouble() * weights.sum  //weights存储的是这个runs并行度下的所有的权重,这个对其进行求和。
    var i = 0
    var curWeight = 0.0
    while (i < data.length && curWeight < r) { //找到一个累加权重尽可能接近r的i值,返回这个data
      curWeight += weights(i)
      i += 1
    }
    data(i - 1) //当这个i值尽可能的接近r的时候,返回这个i对应的元素,将其作为第一个初始化点。
  }
}
Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐