Spark MLlib源代码解读之KMeans(下)
Spark MLlib源代码解读之KMeans(下)之前看过Kmeans的源代码,但是对于Spark KMeans生成初始中心点的方法没有理解到位,最近又看了一下,再次补充一下。Spark生成初始中心点有一个方法叫做initKMeansParallel。整个代码包含有 Kmeans类和localKmeans类,localkmeans类主要用于实现KMeans++方法来实现得到中心点。init
Spark MLlib源代码解读之KMeans(下)
之前看过Kmeans的源代码,但是对于Spark KMeans生成初始中心点的方法没有理解到位,
最近又看了一下,再次补充一下。Spark生成初始中心点有一个方法叫做initKMeansParallel。
整个代码包含有 Kmeans类和localKmeans类,localkmeans类主要用于实现KMeans++方法来实现得到中心点。
initKMeansParallel大致思路
分布式实现KMeans中心点的方式大致如下:(有点绕)。
首先就像之前博客所讲的,Spark 的KMeans真正执行的有runs个任务,这个runs需要我们自己去指定,默认为1.表示的是会去进行runs个KMeans任务,然后选择其中的最小的cost作为最合适的聚类结果。所以这个initKMeansParallel
方法最后返回的结果也是Array[Array[VectorWithNorm]]
。是一个二维数组,表示的是有runs组中心点,每组中心点有k个向量。
在真正进行迭代的时候,每次迭代都会筛选到2*k个点,这个筛选的过程是初始化一个rand用于生成随机数,
这个sumCosts表示的是之前通过aggregate聚合好的每个runs参数下的总的cost值。
可以看到如果 (2 · c(r) · k)/sumCosts(r) > rand随机生成的数的话,则将这个point选中。
val chosen = data.zip(costs).mapPartitionsWithIndex { (index, pointsWithCosts) =>
val rand = new XORShiftRandom(seed ^ (step << 16) ^ index)
pointsWithCosts.flatMap { case (p, c) =>
val rs = (0 until runs).filter { r => //这个其实是在算概率, 2* c(r)*k/sumCosts(r)> rand.nextDouble
//表示的是满足这个条件的点呗过滤下来
rand.nextDouble() < 2.0 * c(r) * k / sumCosts(r)
}
if (rs.length > 0) Some(p, rs) else None //大于0的话,用some封装,否则返回none。
}
}.collect()
选中了之后,需要更新这个centers的值。
def mergeNewCenters(): Unit = { //合并新的中心点到中心
var r = 0
while (r < runs) {
centers(r) ++= newCenters(r)
newCenters(r).clear()
r += 1
}
}
注意这个mergeNewCenters,其中的centers是一个二维数组,而每一行其实是一个ArrayBuffer。merge的时候,每一行会不停的往里面添加向量。
val centers = Array.tabulate(runs)(r => ArrayBuffer.empty[VectorWithNorm])
mergeNewCenters() // 合并一下新的中心点。
chosen.foreach { case (p, rs) =>
rs.foreach(newCenters(_) += p.toDense)
}
通过多次迭代之后,对于每一个runs参数我们会生成一系列的可能不少于K个的待选中心点。我们需要对每一个runs下面的中心点的参数加以权重。加权重的方式是如果在(i, j)第i个runs下的第j个待选中心点如果是最近的点的话,则((i, j), 1.0)。即这个对应的待选中心点加1,这样子的点在以后被筛选到的可能性会大。
val bcCenters = data.context.broadcast(centers)
val weightMap = data.flatMap { p =>
Iterator.tabulate(runs) { r =>
((r, KMeans.findClosest(bcCenters.value(r), p)._1), 1.0)
}
}.reduceByKey(_ + _).collectAsMap() //聚合之后返回的是一个map类型。
最后调用LocalKMeans. LocalKMeans.kMeansPlusPlus方法来求得finalCenters。
val finalCenters = (0 until runs).par.map { r =>
val myCenters = centers(r).toArray //表示的是第r个并行度下的中心点数组。
val myWeights = (0 until myCenters.length).map(i => weightMap.getOrElse((r, i), 0.0)).toArray //获取weights的方式,刚才我们得到了一个Map类型,如果有的haul,返回值,没有的话返回0.
LocalKMeans.kMeansPlusPlus(r, myCenters, myWeights, k, 30)
}
LocalKMeans.kMeansPlusPlus
这个方法大致是通过距离初始点尽可能远的点找到合适的中心点。当找到k个中心点之后,又再一次使用了一个local 版本的Kmeans算法来继续对这些生成的中心点,进行迭代,知道这些点不改变了之后,返回需要的结果。每次调用一次LocalKMeans.kMeansPlusPlus,返回k个中心点。总共调用runs次,则返回有[runs][k]个我们需要的中心点。
好接下来看看完整的代码吧。
private def initKMeansParallel(data: RDD[VectorWithNorm])
: Array[Array[VectorWithNorm]] = {
//初始化中心及costs,tabluate方法返回一个数组,长度为runs。
val centers = Array.tabulate(runs)(r => ArrayBuffer.empty[VectorWithNorm])
var costs = data.map(_ => Array.fill(runs)(Double.PositiveInfinity))
// Initialize each run's first center to a random point.
val seed = new XORShiftRandom(this.seed).nextInt()
//初始化第一个中心点
val sample = data.takeSample(true, runs, seed).toSeq //随机筛选出一些中心点。
val newCenters = Array.tabulate(runs)(r => ArrayBuffer(sample(r).toDense)) //获取一个长度为为runs的中心点数组
/** Merges new centers to centers. */
def mergeNewCenters(): Unit = { //合并新的中心点到中心
var r = 0
while (r < runs) {
centers(r) ++= newCenters(r)
newCenters(r).clear()
r += 1
}
}
// On each step, sample 2 * k points on average for each run with probability proportional
// to their squared distance from that run's centers. Note that only distances between points
// and new centers are computed in each iteration.
//每次迭代的过程,抽取2*k个样本,每次迭代计算样本点与中心店的距离
var step = 0
while (step < initializationSteps) {
val bcNewCenters = data.context.broadcast(newCenters) //新的中心点
val preCosts = costs
//j将数据点和cost通过拉链操作连接在一起,返回一个(point,cost)
//这个会在下一步的时候通过调用math。min方法,找出cost(r) 和通过kmeans的pointcost方法返回的最小的cost值
//并将这个值更新到costs数组。同时将这个costs数组cache到缓存。
costs = data.zip(preCosts).map { case (point, cost) =>
Array.tabulate(runs) { r =>
math.min(KMeans.pointCost(bcNewCenters.value(r), point), cost(r))
}
}.persist(StorageLevel.MEMORY_AND_DISK)
//接下来聚合costs,聚合后的返回值,为一个Array,其内部的元素为Double类型。
//注意接下来这个aggregate算子,他接收三个参数,第一个参数接收一个初始值,这个初始值,首先会作用到每个分区,
//应用于每个分区的函数是接下来定义的第一个SeqOp函数。
//这个参数会在每个分区发挥作用。
//接下来有一个combOp函数,这个函数会在对每一个聚合后的结果发挥作用。相当于前面函数作用后的结果,会在后面继续发挥作用.
//需要注意的是,aggregate算子,的初始参数在第二个函数页发挥作用。而aggregateByKey算子不会发挥作用。
val sumCosts = costs
.aggregate(new Array[Double](runs))(
//接下来计算的方式如下所示,由于有一个并行度,每一个并行度会有一个自己的costs数组,所以计算costs数组的时候会
// 分开对每一个costs数组进行计算。第一个函数的意思是:合并第二个参数v(也是一个costs值)到第一个s里面。
//然后返回一个s数组。相当于每一个分区都会有这个s数组
seqOp = (s, v) => {
// s += v
var r = 0
while (r < runs) {
s(r) += v(r)
r += 1
}
s
},
//接下来对于不同的分区,计算s数组的值的和。和刚才一样,因为有一个并行度,所以默认的是对每一个并行度下的cost数组进行计算。
//然后返回这个s0。s0是一个数组。从0~runs的一个double类型的数组。每一个对应的元素包含的是对应的在第r次并行度下的cost值
combOp = (s0, s1) => {
// s0 += s1
var r = 0
while (r < runs) {
s0(r) += s1(r)
r += 1
}
s0
}
)
bcNewCenters.unpersist(blocking = false) // 去掉在内存中的缓存
preCosts.unpersist(blocking = false)
val chosen = data.zip(costs).mapPartitionsWithIndex { (index, pointsWithCosts) =>
val rand = new XORShiftRandom(seed ^ (step << 16) ^ index)
pointsWithCosts.flatMap { case (p, c) =>
val rs = (0 until runs).filter { r => //这个其实是在算概率, 2* c(r)*k/sumCosts(r)> rand.nextDouble
//表示的是满足这个条件的点呗过滤下来
rand.nextDouble() < 2.0 * c(r) * k / sumCosts(r)
}
if (rs.length > 0) Some(p, rs) else None //大于0的话,用some封装,否则返回none。
}
}.collect()
mergeNewCenters() // 合并一下新的中心点。
chosen.foreach { case (p, rs) =>
rs.foreach(newCenters(_) += p.toDense)
}
step += 1
}
mergeNewCenters()
costs.unpersist(blocking = false)
val bcCenters = data.context.broadcast(centers)
val weightMap = data.flatMap { p =>
Iterator.tabulate(runs) { r =>
((r, KMeans.findClosest(bcCenters.value(r), p)._1), 1.0)
}
}.reduceByKey(_ + _).collectAsMap()
bcCenters.unpersist(blocking = false)
val finalCenters = (0 until runs).par.map { r =>
val myCenters = centers(r).toArray //表示的是第r个并行度下的中心点数组。
val myWeights = (0 until myCenters.length).map(i => weightMap.getOrElse((r, i), 0.0)).toArray
LocalKMeans.kMeansPlusPlus(r, myCenters, myWeights, k, 30)
}
finalCenters.toArray
}
}
rivate[mllib] object LocalKMeans extends Logging {
def kMeansPlusPlus(
seed: Int, //种子点
points: Array[VectorWithNorm], //中心点
weights: Array[Double], //权重
k: Int, //中心点个数
maxIterations: Int //最大的迭代次数
): Array[VectorWithNorm] = { //最终这个方法返回的是一个中心点和范数数组,表示的是第r个并行度下的中心点向量和范数
val rand = new Random(seed) // 初始化随机数,最大值为seed,而seed是每个对应的runs参数,即第一个kmeans任务。
val dimensions = points(0).vector.size //中心点的维度数
val centers = new Array[VectorWithNorm](k) //存储中心点向量,共有k个。
// Initialize centers by sampling using the k-means++ procedure.
//通过kmeans++的方式来生成初始化的第一个中心点。
centers(0) = pickWeighted(rand, points, weights).toDense //第一个中心点。
for (i <- 1 until k) {
// Pick the next center with a probability proportional to cost under current centers
//接下来通过概率的方式找到剩下的中心点,距离当前中心点距离越远,可能性越大。
val curCenters = centers.view.take(i) //取出第一个,由于刚开始初始化,所以这个值为0.
val sum = points.view.zip(weights).map { case (p, w) => // (p, w)表示的是p指的是VectorWithNorm,w指的是权重。
w * KMeans.pointCost(curCenters, p) //将权重值乘以p距离当前的中心点的距离,注意当前的p也是中心点。
}.sum
val r = rand.nextDouble() * sum // 将这个值最为累计score的最大值,
var cumulativeScore = 0.0
var j = 0
while (j < points.length && cumulativeScore < r) {
//找到一个尽可能接近r的累计score的值,然后记录下j。
cumulativeScore += weights(j) * KMeans.pointCost(curCenters, points(j))
j += 1
}
if (j == 0) {
logWarning("kMeansPlusPlus initialization ran out of distinct points for centers." +
s" Using duplicate point for center k = $i.")
centers(i) = points(0).toDense
} else {
centers(i) = points(j - 1).toDense //返回points(j-1).
}
}
// 通过上述的方式找到了k的聚类中心点。这k个聚类中心点同属于第runs个并行度下。
// Run up to maxIterations iterations of Lloyd's algorithm
//接下来有迭代maxIterations的执行Lloyd算法。
//此时计算的时候数据为我们之前的中心点数据,而上一步生成的centers数组作为中心点,然后不断的迭代,知道找到了合适的中心点。
val oldClosest = Array.fill(points.length)(-1)
var iteration = 0
var moved = true
while (moved && iteration < maxIterations) { // 如果移动了的话,
moved = false
val counts = Array.fill(k)(0.0) //这个表示属于这个点的数目,
val sums = Array.fill(k)(Vectors.zeros(dimensions)) // 这个用来累加第i个向量,
var i = 0
while (i < points.length) {
val p = points(i) // 之前的数据中心点作为数据点。
val index = KMeans.findClosest(centers, p)._1 // 表示计算的中心点的index,
axpy(weights(i), p.vector, sums(index)) // 这一步用来更新这个sum,sum(index) += p.vector * weights.
counts(index) += weights(i) /// 更新count。
if (index != oldClosest(i)) {
moved = true
oldClosest(i) = index // 如果移动了的话,更新原来存在的index。
}
i += 1
}
//每一轮计算完成之后需要重新对中心点进行更新。
// Update centers
var j = 0
while (j < k) {
if (counts(j) == 0.0) { //如果第j个中心点周边的point数目是0的话,
// Assign center to a random point
centers(j) = points(rand.nextInt(points.length)).toDense //随机安排一个点。
} else {
scal(1.0 / counts(j), sums(j)) //否则的话,更新这个中心点。
centers(j) = new VectorWithNorm(sums(j))
}
j += 1
}
iteration += 1
}
if (iteration == maxIterations) {
logInfo(s"Local KMeans++ reached the max number of iterations: $maxIterations.")
} else {
logInfo(s"Local KMeans++ converged in $iteration iterations.")
}
centers //最后返回中心点。
}
private def pickWeighted[T](rand: Random, data: Array[T], weights: Array[Double]): T = {
val r = rand.nextDouble() * weights.sum //weights存储的是这个runs并行度下的所有的权重,这个对其进行求和。
var i = 0
var curWeight = 0.0
while (i < data.length && curWeight < r) { //找到一个累加权重尽可能接近r的i值,返回这个data
curWeight += weights(i)
i += 1
}
data(i - 1) //当这个i值尽可能的接近r的时候,返回这个i对应的元素,将其作为第一个初始化点。
}
}
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)