DBSCAN聚类算法及其参数配置-python实现
DBSCAN聚类算法是一种基于空间密度有传递性质的聚类算法,将簇定义为密度相连的点的最大的集合,可以将高密度点区域划分为簇,并有效地过滤低密度点区域,可以在含有噪声的数据集中识别任意形状和数量的簇。
本文参考论文:
平时没怎么用py,命名什么的可能不太规范..
目录
一、算法介绍
DBSCAN聚类算法是一种基于空间密度有传递性质的聚类算法,将簇定义为密度相连的点的最大的集合,可以将高密度点区域划分为簇,并有效地过滤低密度点区域,可以在含有噪声的数据集中识别任意形状和数量的簇。
(一)算法基本概念
DBSCAN算法的关键思想是:对于聚类的每个点,给定半径的邻域必须包含至少最小数量的点,即邻域内的密度必须超过某个阈值。数据集中特定点的密度通过该点的Eps半径之内包含的点数来计算。
Eps邻域:给定对象半径Eps内的邻域,用NEps(p)表示点p的Eps半径内的点的集合。
MinPts:给定邻域NEps(p)需要包含的点的最小点数,用于决定点p是簇的核心部分、边界点或噪声。
核心点:如果点p的Eps邻域内包含至少MinPts个点,则该点为核心点。
边界点:不是核心点,但在某个核心点的Eps邻域内。
噪声点:即不是核心点,也不是边界点的点。
直接密度可达:如果p在q的Eps邻域内,且q是一个核心点,则p从q出发是直接密度可达的。
密度可达:如果存在一个对象链p1,p2,……,pn,如果pi+1是从pi出发关于Eps和MinPts直接密度可达的,那么pn是从p1关于Eps和MinPts密度可达的。
密度相连:如果p和q都是从O关于Eps和MinPts直接密度可达的,那么p和q是密度相连的。
(二)算法执行步骤
1.选择一个未访问的数据点p作为起始点;
2.计算点p的Eps邻域内的数据点数量,如果数量大于等于MinPts,则将p标记为核心点,并创建一个新的簇;
3.将点p加入到当前簇中,并将p的Eps邻域内的所有未访问数据点加入到当前簇中;
4.对当前簇中的每个数据点q进行以下操作:
(1)如果q是核心点,则将q的Eps邻域内的所有未访问数据点加入到当前簇中;
(2)如果q不是核心点,但是位于其他簇的Eps邻域内,则将q标记为边界点,并将其加入到当前簇中;
5.当没有更多的数据点可以加入到当前簇时,当前簇被认为是一个完整的簇;
6.选择下一个未访问的数据点作为起始点,重复步骤2到步骤5,直到所有的数据点都被访问过;
7.标记剩余未分配的数据点为噪声点。
图片来自开头提到的第二篇论文
(三)算法优缺点
1.优点
(1)DBSCAN算法可以发现任意形状的簇。
(2)DBSCAN算法相比其他聚类算法,无需实现指定簇的数量,而通过设置合适的邻域半径和簇最小点数来自动确定簇的数量。
(3)DBSCAN算法将低密度区域视为噪声,不将其归入任何簇中,对于噪声不敏感,具有鲁棒性。
2.缺点
(1)DBSCAN算法对邻域半径、簇最小点数两个参数敏感,不同的参数选择可能会导致完全不同的聚类结果。
(2)如果数据集的密度不均匀,聚类结果可能会受影响,簇的形状可能会不连续或断裂。
二、参数配置
这部分用python实现了文章2的方式,利用图标可视化结果确定参数。
(一)确定eps半径值
计算每个数据点的距离,生成距离矩阵Distn*n,然后将距离矩阵按行升序排序,这样矩阵的每一行就是相应数据点到其他所有点距离的排序,每一列就是距离每个数据点最近的第i个距离值的集合;然后按列绘制距离值的概率密度分布曲线。
1.dist.py
import math
import numpy as np
# 计算两点间距离
def calculate_dist(t1, t2):
dis = math.sqrt(np.power((t1[0] - t2[0]), 2) + np.power((t1[1] - t2[1]), 2))
return dis
2.caculate_point_dist.py
import dist
# 得到每个点之间的距离矩阵
def calculate_point_dist(data):
length = len(data)
point_dist = dist.np.zeros((length, length))
for i in range(0, length):
for j in range(i + 1, length):
distance = dist.calculate_dist(data[i], data[j])
point_dist[i][j] = distance
point_dist[j][i] = distance
dist.np.save('point_dist.npy', point_dist)
dist.np.savetxt('point_dist.csv', point_dist)
return point_dist
3.search_eps.py
import calculate_point_dist as cpd
import csv
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sn
if __name__ == '__main__':
# 创建一个空列表来存储数据
data = []
# 打开 CSV 文件
with open('random_data.csv', 'r') as file:
# 创建 CSV 读取
reader = csv.reader(file)
next(reader)
# 逐行读取数据并添加到列表中
for row in reader:
data.append([int(value) for value in row])
data = np.array(data)
np.save('random_data.npy', data)
distance = cpd.calculate_point_dist(data)
# 每一行升序排序
distance = np.sort(distance, axis=1)
plt.figure()
# 遍历每一列
for i in range(1, distance.shape[1]):
column_data = distance[:, i]
# 绘制概率密度分布曲线
sn.kdeplot(column_data)
# 显示图形
plt.xlabel('Distance')
plt.ylabel('Density')
plt.title('Probability Density Distribution')
plt.show()
生成的概率密度曲线:
由图可见,距离为70的密度已经变得非常小,这些比例很小却和其他点距离明显远很多的点,是噪声的可能性很大,因此可以选取Eps为70。
(二)确定minPts邻域内点数
计算每个数据点的局部密度值(即Eps为70的邻域范围内的点数)ρi,然后计算每个点距离更高密度点的最近距离,若是最高密度点,则取到数据集中最远点的距离δi。然后以局部密度值为横坐标,到更高密度点的最近距离为纵坐标进行绘图。
1.count_points.py
import dist
# 计算每个点邻域范围内的点数
def count_points(data, distance, eps):
length = len(data)
points = dist.np.ones(length)
for i in range(0, length):
# marked = -1
for j in range(i + 1, length):
# if data[j][0] != marked:
if distance[i][j] <= eps:
points[i] = points[i] + 1
points[j] = points[j] + 1
# else:
# marked = data[j][0]
return points
def count_points_2(data, eps):
length = len(data)
points = dist.np.ones(length)
for i in range(0, length):
marked = -1
for j in range(i + 1, length):
if data[j][0] != marked:
distance = dist.calculate_dist(data[i], data[j])
if distance <= eps:
points[i] = points[i] + 1
points[j] = points[j] + 1
else:
marked = data[j][0]
return points
2.find_minPts.py
import dist
# 计算每个点与更高密度点的最近距离
def find_minPts(sorted_points, distance):
length = len(sorted_points)
min_dist = dist.np.zeros(length)
for i in range(0, length):
index_i = sorted_points[i][0]
temp_min_distance = max(distance[index_i, :])
for j in range(i + 1, length):
index_j = sorted_points[j][0]
if distance[index_i][index_j] < temp_min_distance:
temp_min_distance = distance[index_i][index_j]
min_dist[index_i] = temp_min_distance
return min_dist
def find_minPts_2(sorted_points, data):
length = len(sorted_points)
min_dist = dist.np.zeros(length)
for i in range(0, length):
index_i = sorted_points[i][0]
temp = -1
distance = 1000000000
for j in range(i + 1, length):
if sorted_points[j][1] > sorted_points[i][1]:
index_j = sorted_points[j][0]
current_distance = dist.calculate_dist(data[index_i], data[index_j])
if current_distance < distance:
temp = index_j
distance = current_distance
if temp == -1:
min_dist[index_i] = dist.calculate_dist(data[index_i], data[0])
else:
min_dist[index_i] = distance
return min_dist
3.search_minPts.py
import numpy as np
import calculate_point_dist
import count_points
import find_minPts
import csv
import matplotlib.pyplot as plt
if __name__ == '__main__':
eps = 70
data = np.load('random_data.npy')
distance = np.load('point_dist.npy')
points = count_points.count_points(data, distance, eps)
sorted_data = sorted(enumerate(points), key=lambda x: x[1])
min_dist = find_minPts.find_minPts(sorted_data, distance)
# points = count_points.count_points_2(data, 20)
# sorted_data = sorted(enumerate(points), key=lambda x: x[1])
# min_dist = find_minPts.find_minPts_2(sorted_data, data)
np.savetxt('pts.csv', points, delimiter=',', fmt='%d')
np.savetxt('min_dist.csv', min_dist, delimiter=',', fmt='%d')
plt.scatter(points, min_dist, 0.5)
plt.xlabel('pts')
plt.ylabel('min_dist')
# 以eps绘制水平线
plt.axhline(y=eps, color='r', linestyle='-')
plt.show()
每个点的ρi与δi的函数关系
由图所示,δi<70(即图中红线以下)的数据点,皆为与更高密度点距离小于Eps的数据点。根据图选取MinPts值,ρi>=MinPts的数据点皆为核心点,ρi<MinPts且δi<=70的数据点可能为边界点或噪声点,而ρi<MinPts且δi>70的数据点皆为噪声点。
三、DBSCAN算法python实现
本文选取Eps为70,MinPts范围为20-70,并使用预先计算得到的欧氏距离矩阵,在python环境下调用sklearn库进行实验。
(一)实验步骤
1.调用sklearn库中的DBSCAN函数进行聚类得到标签;
2.根据标签计算噪声比,绘制聚类结果图像;
3.获取非噪声点的标签,筛选距离矩阵和标签,调用sklearn库中的silhouette_samples函数计算聚类结果中每个非噪声点的轮廓系数,并求其均值得到聚类整体轮廓系数。
(二)评估指标
1.轮廓系数
聚类结果的评价可转化为对于簇内和簇见相似度的衡量,簇的凝聚性度量簇内样本密切程度,而分离性度量簇间相异程度,较小的簇内紧密度和较大的簇间分离度可以使聚类结果更加理想化。轮廓系数结合了簇内紧密度和簇间分离度,是类的密集与分散程度的评价指标。轮廓系数的范围为[-1,1],轮廓系数越接近1表示聚类效果越好。
聚类总体的轮廓系数为所有样本轮廓系数的平均值。
2.其他指标
聚类结果一般都采用轮廓系数评估,但是针对实际解决的问题,也可以结合噪声比、簇数量等进行评估。
(三)代码实现
1.generate_colors.py
import colorsys
import numpy as np
def generate_colors(labels):
unique_labels = np.unique(labels)
num_colors = np.size(unique_labels)
colors = []
for i in range(num_colors):
hue = i / num_colors # 在0到1之间均匀分布色相值
saturation = 1.0 # 最大饱和度
value = 1.0 # 最大亮度
rgb = colorsys.hsv_to_rgb(hue, saturation, value)
colors.append(rgb)
result = []
size = len(labels)
for i in range(0, size):
index = np.where(unique_labels == labels[i])
result.append(colors[index[0][0]])
return result
2.dbscan.py
import numpy as np
from sklearn.cluster import DBSCAN
from sklearn import metrics
import matplotlib.pyplot as plt
import generate_colors
def dbscan(data, distance, eps, min_pts):
# 使用 DBSCAN 进行聚类
dbscan_object = DBSCAN(eps=eps, min_samples=min_pts, metric='precomputed')
labels = dbscan_object.fit_predict(distance)
# 噪声数
noise_num = np.sum(labels == -1)
noise_ratio = noise_num / len(labels)
cluster_number = np.size(np.unique(labels))
if noise_num > 0:
cluster_number = cluster_number - 1
# 保存标签
labels_file_path = ('result/labels(eps=' + str(eps) + ',minPts=' + str(min_pts) +
',kinds=' + str(cluster_number) + ',noiseRatio=' + str(noise_ratio) + ').csv')
np.savetxt(labels_file_path, labels, delimiter=',', fmt='%d')
# 创建一个新的图形,并设置尺寸
fig, ax = plt.subplots(figsize=(10, 8))
colors = generate_colors.generate_colors(labels)
ax.scatter(data[:, 0], data[:, 1], c=colors, s=0.5)
ax.set_xlabel('x')
ax.set_ylabel('y')
ax.set_title('DBSCAN Clustering')
# plt.show()
pic_file_path = 'result/result(eps=' + str(eps) + ',min_pts=' + str(min_pts) + ').png'
fig.savefig(fname=pic_file_path, dpi=300)
# 计算轮廓系数
# 获取非噪声点的索引
non_noise_indices = np.where(labels != -1)[0]
# 筛选距离矩阵和标签,只保留非噪声点
filtered_distance = distance[non_noise_indices][:, non_noise_indices]
filtered_labels = labels[non_noise_indices]
silhouette_sample = metrics.silhouette_samples(filtered_distance, filtered_labels, metric='precomputed')
silhouette_score = np.mean(silhouette_sample)
silhouette_score_file_path = ('result/silhouette_sample(eps=' + str(eps) + ',minPts=' + str(min_pts)
+ ',score=' + str(silhouette_score) + ').csv')
np.savetxt(silhouette_score_file_path, silhouette_sample, delimiter=',')
3.main.py
import numpy as np
import dbscan
if __name__ == '__main__':
data = np.load('random_data.npy')
dist = np.load('point_dist.npy')
dbscan.dbscan(data, dist, 70, 25)
dbscan.dbscan(data, dist, 70, 20)
dbscan.dbscan(data, dist, 70, 30)
dbscan.dbscan(data, dist, 70, 40)
dbscan.dbscan(data, dist, 70, 50)
dbscan.dbscan(data, dist, 70, 60)
dbscan.dbscan(data, dist, 70, 70)
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)