本文参考论文:

平时没怎么用py,命名什么的可能不太规范..

目录

一、算法介绍

(一)算法基本概念

(二)算法执行步骤

(三)算法优缺点

1.优点

2.缺点

二、参数配置

(一)确定eps半径值

(二)确定minPts邻域内点数

三、DBSCAN算法python实现

(一)实验步骤

(二)评估指标

1.轮廓系数

2.其他指标

(三)代码实现


一、算法介绍

        DBSCAN聚类算法是一种基于空间密度有传递性质的聚类算法,将簇定义为密度相连的点的最大的集合,可以将高密度点区域划分为簇,并有效地过滤低密度点区域,可以在含有噪声的数据集中识别任意形状和数量的簇。

(一)算法基本概念

        DBSCAN算法的关键思想是:对于聚类的每个点,给定半径的邻域必须包含至少最小数量的点,即邻域内的密度必须超过某个阈值。数据集中特定点的密度通过该点的Eps半径之内包含的点数来计算。

Eps邻域:给定对象半径Eps内的邻域,用NEps(p)表示点p的Eps半径内的点的集合。

MinPts:给定邻域NEps(p)需要包含的点的最小点数,用于决定点p是簇的核心部分、边界点或噪声。

核心点:如果点p的Eps邻域内包含至少MinPts个点,则该点为核心点。

边界点:不是核心点,但在某个核心点的Eps邻域内。

噪声点:即不是核心点,也不是边界点的点。

直接密度可达:如果p在q的Eps邻域内,且q是一个核心点,则p从q出发是直接密度可达的。

密度可达:如果存在一个对象链p1,p2,……,pn,如果pi+1是从pi出发关于Eps和MinPts直接密度可达的,那么pn是从p1关于Eps和MinPts密度可达的。

密度相连:如果p和q都是从O关于Eps和MinPts直接密度可达的,那么p和q是密度相连的。

(二)算法执行步骤

1.选择一个未访问的数据点p作为起始点;

2.计算点p的Eps邻域内的数据点数量,如果数量大于等于MinPts,则将p标记为核心点,并创建一个新的簇;

3.将点p加入到当前簇中,并将p的Eps邻域内的所有未访问数据点加入到当前簇中;

4.对当前簇中的每个数据点q进行以下操作:

(1)如果q是核心点,则将q的Eps邻域内的所有未访问数据点加入到当前簇中;

(2)如果q不是核心点,但是位于其他簇的Eps邻域内,则将q标记为边界点,并将其加入到当前簇中;

5.当没有更多的数据点可以加入到当前簇时,当前簇被认为是一个完整的簇;

6.选择下一个未访问的数据点作为起始点,重复步骤2到步骤5,直到所有的数据点都被访问过;

7.标记剩余未分配的数据点为噪声点。

图片来自开头提到的第二篇论文

(三)算法优缺点

1.优点

(1)DBSCAN算法可以发现任意形状的簇。

(2)DBSCAN算法相比其他聚类算法,无需实现指定簇的数量,而通过设置合适的邻域半径和簇最小点数来自动确定簇的数量。

(3)DBSCAN算法将低密度区域视为噪声,不将其归入任何簇中,对于噪声不敏感,具有鲁棒性。

2.缺点

(1)DBSCAN算法对邻域半径、簇最小点数两个参数敏感,不同的参数选择可能会导致完全不同的聚类结果。

(2)如果数据集的密度不均匀,聚类结果可能会受影响,簇的形状可能会不连续或断裂。

二、参数配置

这部分用python实现了文章2的方式,利用图标可视化结果确定参数。

(一)确定eps半径值

计算每个数据点的距离,生成距离矩阵Distn*n,然后将距离矩阵按行升序排序,这样矩阵的每一行就是相应数据点到其他所有点距离的排序,每一列就是距离每个数据点最近的第i个距离值的集合;然后按列绘制距离值的概率密度分布曲线。

1.dist.py

import math
import numpy as np


# 计算两点间距离
def calculate_dist(t1, t2):
    dis = math.sqrt(np.power((t1[0] - t2[0]), 2) + np.power((t1[1] - t2[1]), 2))
    return dis

2.caculate_point_dist.py

import dist


# 得到每个点之间的距离矩阵
def calculate_point_dist(data):
    length = len(data)
    point_dist = dist.np.zeros((length, length))

    for i in range(0, length):
        for j in range(i + 1, length):
            distance = dist.calculate_dist(data[i], data[j])
            point_dist[i][j] = distance
            point_dist[j][i] = distance

    dist.np.save('point_dist.npy', point_dist)
    dist.np.savetxt('point_dist.csv', point_dist)

    return point_dist

3.search_eps.py

import calculate_point_dist as cpd
import csv
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sn


if __name__ == '__main__':
    # 创建一个空列表来存储数据
    data = []
    # 打开 CSV 文件
    with open('random_data.csv', 'r') as file:
        # 创建 CSV 读取
        reader = csv.reader(file)
        next(reader)
        # 逐行读取数据并添加到列表中
        for row in reader:
            data.append([int(value) for value in row])

    data = np.array(data)
    np.save('random_data.npy', data)

    distance = cpd.calculate_point_dist(data)
    # 每一行升序排序
    distance = np.sort(distance, axis=1)

    plt.figure()
    # 遍历每一列
    for i in range(1, distance.shape[1]):
        column_data = distance[:, i]
        # 绘制概率密度分布曲线
        sn.kdeplot(column_data)
    # 显示图形
    plt.xlabel('Distance')
    plt.ylabel('Density')
    plt.title('Probability Density Distribution')
    plt.show()

生成的概率密度曲线:

由图可见,距离为70的密度已经变得非常小,这些比例很小却和其他点距离明显远很多的点,是噪声的可能性很大,因此可以选取Eps为70。

(二)确定minPts邻域内点数

        计算每个数据点的局部密度值(即Eps为70的邻域范围内的点数)ρi,然后计算每个点距离更高密度点的最近距离,若是最高密度点,则取到数据集中最远点的距离δi。然后以局部密度值为横坐标,到更高密度点的最近距离为纵坐标进行绘图。

1.count_points.py

import dist


# 计算每个点邻域范围内的点数
def count_points(data, distance, eps):
    length = len(data)
    points = dist.np.ones(length)
    for i in range(0, length):
        # marked = -1
        for j in range(i + 1, length):
            # if data[j][0] != marked:
                if distance[i][j] <= eps:
                    points[i] = points[i] + 1
                    points[j] = points[j] + 1
                # else:
                #     marked = data[j][0]
    return points


def count_points_2(data, eps):
    length = len(data)
    points = dist.np.ones(length)
    for i in range(0, length):
        marked = -1
        for j in range(i + 1, length):
            if data[j][0] != marked:
                distance = dist.calculate_dist(data[i], data[j])
                if distance <= eps:
                    points[i] = points[i] + 1
                    points[j] = points[j] + 1
                else:
                    marked = data[j][0]
    return points

2.find_minPts.py

import dist


# 计算每个点与更高密度点的最近距离
def find_minPts(sorted_points, distance):
    length = len(sorted_points)
    min_dist = dist.np.zeros(length)
    for i in range(0, length):
        index_i = sorted_points[i][0]
        temp_min_distance = max(distance[index_i, :])
        for j in range(i + 1, length):
            index_j = sorted_points[j][0]
            if distance[index_i][index_j] < temp_min_distance:
                temp_min_distance = distance[index_i][index_j]
        min_dist[index_i] = temp_min_distance
    return min_dist


def find_minPts_2(sorted_points, data):
    length = len(sorted_points)
    min_dist = dist.np.zeros(length)
    for i in range(0, length):
        index_i = sorted_points[i][0]
        temp = -1
        distance = 1000000000
        for j in range(i + 1, length):
            if sorted_points[j][1] > sorted_points[i][1]:
                index_j = sorted_points[j][0]
                current_distance = dist.calculate_dist(data[index_i], data[index_j])
                if current_distance < distance:
                    temp = index_j
                    distance = current_distance
        if temp == -1:
            min_dist[index_i] = dist.calculate_dist(data[index_i], data[0])
        else:
            min_dist[index_i] = distance
    return min_dist

3.search_minPts.py

import numpy as np
import calculate_point_dist
import count_points
import find_minPts
import csv
import matplotlib.pyplot as plt

if __name__ == '__main__':
    eps = 70
    data = np.load('random_data.npy')
    distance = np.load('point_dist.npy')

    points = count_points.count_points(data, distance, eps)
    sorted_data = sorted(enumerate(points), key=lambda x: x[1])
    min_dist = find_minPts.find_minPts(sorted_data, distance)

    # points = count_points.count_points_2(data, 20)
    # sorted_data = sorted(enumerate(points), key=lambda x: x[1])
    # min_dist = find_minPts.find_minPts_2(sorted_data, data)

    np.savetxt('pts.csv', points, delimiter=',', fmt='%d')
    np.savetxt('min_dist.csv', min_dist, delimiter=',', fmt='%d')

    plt.scatter(points, min_dist, 0.5)
    plt.xlabel('pts')
    plt.ylabel('min_dist')
    # 以eps绘制水平线
    plt.axhline(y=eps, color='r', linestyle='-')
    plt.show()

每个点的ρi与δi的函数关系

由图所示,δi<70(即图中红线以下)的数据点,皆为与更高密度点距离小于Eps的数据点。根据图选取MinPts值,ρi>=MinPts的数据点皆为核心点,ρi<MinPts且δi<=70的数据点可能为边界点或噪声点,而ρi<MinPts且δi>70的数据点皆为噪声点。

三、DBSCAN算法python实现

        本文选取Eps为70,MinPts范围为20-70,并使用预先计算得到的欧氏距离矩阵,在python环境下调用sklearn库进行实验。

(一)实验步骤

1.调用sklearn库中的DBSCAN函数进行聚类得到标签;

2.根据标签计算噪声比,绘制聚类结果图像;

3.获取非噪声点的标签,筛选距离矩阵和标签,调用sklearn库中的silhouette_samples函数计算聚类结果中每个非噪声点的轮廓系数,并求其均值得到聚类整体轮廓系数。

(二)评估指标

1.轮廓系数

        聚类结果的评价可转化为对于簇内和簇见相似度的衡量,簇的凝聚性度量簇内样本密切程度,而分离性度量簇间相异程度,较小的簇内紧密度和较大的簇间分离度可以使聚类结果更加理想化。轮廓系数结合了簇内紧密度和簇间分离度,是类的密集与分散程度的评价指标。轮廓系数的范围为[-1,1],轮廓系数越接近1表示聚类效果越好。

        聚类总体的轮廓系数为所有样本轮廓系数的平均值。

2.其他指标

        聚类结果一般都采用轮廓系数评估,但是针对实际解决的问题,也可以结合噪声比、簇数量等进行评估。

(三)代码实现

1.generate_colors.py

import colorsys
import numpy as np


def generate_colors(labels):
    unique_labels = np.unique(labels)
    num_colors = np.size(unique_labels)
    colors = []
    for i in range(num_colors):
        hue = i / num_colors  # 在0到1之间均匀分布色相值
        saturation = 1.0  # 最大饱和度
        value = 1.0  # 最大亮度
        rgb = colorsys.hsv_to_rgb(hue, saturation, value)
        colors.append(rgb)

    result = []
    size = len(labels)
    for i in range(0, size):
        index = np.where(unique_labels == labels[i])
        result.append(colors[index[0][0]])
    return result

2.dbscan.py

import numpy as np
from sklearn.cluster import DBSCAN
from sklearn import metrics
import matplotlib.pyplot as plt
import generate_colors


def dbscan(data, distance, eps, min_pts):

    # 使用 DBSCAN 进行聚类
    dbscan_object = DBSCAN(eps=eps, min_samples=min_pts, metric='precomputed')
    labels = dbscan_object.fit_predict(distance)
    # 噪声数
    noise_num = np.sum(labels == -1)
    noise_ratio = noise_num / len(labels)
    cluster_number = np.size(np.unique(labels))
    if noise_num > 0:
        cluster_number = cluster_number - 1

    # 保存标签
    labels_file_path = ('result/labels(eps=' + str(eps) + ',minPts=' + str(min_pts) +
                        ',kinds=' + str(cluster_number) + ',noiseRatio=' + str(noise_ratio) + ').csv')
    np.savetxt(labels_file_path, labels, delimiter=',', fmt='%d')

    # 创建一个新的图形,并设置尺寸
    fig, ax = plt.subplots(figsize=(10, 8))
    colors = generate_colors.generate_colors(labels)
    ax.scatter(data[:, 0], data[:, 1], c=colors, s=0.5)
    ax.set_xlabel('x')
    ax.set_ylabel('y')
    ax.set_title('DBSCAN Clustering')
    # plt.show()
    pic_file_path = 'result/result(eps=' + str(eps) + ',min_pts=' + str(min_pts) + ').png'
    fig.savefig(fname=pic_file_path, dpi=300)

    # 计算轮廓系数
    # 获取非噪声点的索引
    non_noise_indices = np.where(labels != -1)[0]
    # 筛选距离矩阵和标签,只保留非噪声点
    filtered_distance = distance[non_noise_indices][:, non_noise_indices]
    filtered_labels = labels[non_noise_indices]
    silhouette_sample = metrics.silhouette_samples(filtered_distance, filtered_labels, metric='precomputed')
    silhouette_score = np.mean(silhouette_sample)
    silhouette_score_file_path = ('result/silhouette_sample(eps=' + str(eps) + ',minPts=' + str(min_pts)
                                  + ',score=' + str(silhouette_score) + ').csv')
    np.savetxt(silhouette_score_file_path, silhouette_sample, delimiter=',')

3.main.py

import numpy as np
import dbscan


if __name__ == '__main__':

    data = np.load('random_data.npy')
    dist = np.load('point_dist.npy')

    dbscan.dbscan(data, dist, 70, 25)
    dbscan.dbscan(data, dist, 70, 20)
    dbscan.dbscan(data, dist, 70, 30)
    dbscan.dbscan(data, dist, 70, 40)
    dbscan.dbscan(data, dist, 70, 50)
    dbscan.dbscan(data, dist, 70, 60)
    dbscan.dbscan(data, dist, 70, 70)
Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐