目录

一、Dask介绍

二、使用说明

安装

三、测试

1、单个文件中实现功能

2、运行多个可执行文件


最近在写并行计算相关部分,用到了python的Dask库。

Dask官网:Dask | Scale the Python tools you love

一、Dask介绍

Dask是一个灵活的并行和分布式计算库,旨在处理大规模数据集。它提供了类似于Pandas 和 NumPy 的数据结构,但能够有效处理比内存更大的数据集。通过使用Dask,可以在单台机器或分布式集群中运行,更方便处理大规模数据。

Dask是一个用于Python的并行计算模块,从单机多核扩展到拥有数千台机器的数据中心。它既由低级任务API,也有更高级面向数据的API。低级任务API支持Dask与多种Python库的集成,公共API为围绕Dask发展的各种工具的生态系统提供了基础。

Dask相较于Spark这些大数据处理框架,更轻量级。Dask更侧重与其他框架,如:Numpy、Pandas,Scikit-learning相结合,从而使其能更加方便进行分布式并行计算。

Dask存在三种最基本的数据结构,分别是:Arrays、Dataframes以及Bags

二、使用说明

安装

pip install dask
python -m pip install "dask[array]"
python -m pip install "dask[distributed]"
python -m pip install "dask[dataframe]"

先测试是否已经安装了模块,命令行进入到python3编辑器:

from dask.distributed import Client, progress

没有报缺少模块错误,则说明是可以正常执行的。

三、测试

1、单个文件中实现功能

下述的主要数据处理在定义计算任务函数calculate_value(num)中,即将计算任务函数处理32次。

from dask.distributed import Client, progress
import time

# 定义计算任务的函数
def calculate_value(num):
    num_float = float(num) * 0.33
    num_double = float(num) * 0.33  
    return num_float, num_double

# 设置Dask客户端
def setup_client():
    from dask.distributed import Client, LocalCluster
    
    cluster = LocalCluster()
    client = Client(cluster)
    
    scheduler_info = client.scheduler_info()
    ncores = sum(worker['nthreads'] for worker in scheduler_info['workers'].values())
    
    print(f"Connected to Dask cluster with {ncores} cores")
    
    return client

# 提交任务并收集结果
def submit_tasks(client, num, num_tasks=32):
    # 创建任务列表
    tasks = [client.submit(calculate_value, num) for _ in range(num_tasks)]
    
    # 等待所有任务完成,并显示进度
    progress(tasks)
    
    # 收集结果
    results = [task.result() for task in tasks]
    return results

# 主函数
def main():
    num = 558558571  # 这是您要处理的数字
    client = setup_client()  # 设置Dask客户端
    
    # 提交32个任务
    results = submit_tasks(client, num)
    
    # 打印结果
    for i, (num_float, num_double) in enumerate(results):
        print(f"Task {i+1} - num_float: {num_float}, num_double: {num_double}")
    
    # 关闭客户端连接
    client.close()

if __name__ == "__main__":
    main()

运行上述的python程序:

python3 my_dask_script.py

执行结果如下:

此时表示运行了32个task。

在运行的时候如果提示:

表明 dask-scheduler 无法启动,原因是端口 8787 已经被占用了。

解决方法:

1、查找并终止占用端口 8787 的进程

(1)先安装lsof:

apt install lsof

(2)查看占用端口进程:

lsof -i :8787

(3)通过进程的 PID 使用 kill 命令终止该进程:

kill -9 PID

2、修改 dask-scheduler 使用的端口

dask-scheduler --port 8888

再次重新启动查看 dask-scheduler 使用的端口:

dask-scheduler

2、运行多个可执行文件

我在同目录中创建了一个test.cc文件,为简单的打印数据,内容如下:

#include <iostream>
#include <iomanip>
 
int main() {
    int num = 558558571;
    float num_float = static_cast<float>(num) * 0.33;
    double num_double = static_cast<double>(num) * 0.33;
    
    std::cout << "num value: " << num << std::endl;
    std::cout << std::fixed << std::setprecision(2);
    std::cout << "num_float value: " << num_float << std::endl;
    std::cout << "num_double value: " << num_double << std::endl;
    return 0;
}

此时将上述的test.cc编译:

g++ -o main test.cc

然后新建一个my_dask_script.py文件,内容如下:

from dask.distributed import Client, LocalCluster
import os

# 定义执行外部程序的函数
def run_external_program():
    cmd = './main'  # 您的外部程序命令
    os.system(cmd)  # 使用os.system来执行命令

# 设置Dask客户端
def setup_client():
    from dask.distributed import Client, LocalCluster
    
    cluster = LocalCluster()
    client = Client(cluster)
    
    scheduler_info = client.scheduler_info()
    ncores = sum(worker['nthreads'] for worker in scheduler_info['workers'].values())
    
    print(f"Connected to Dask cluster with {ncores} cores")
    
    return client

# 提交任务到Dask集群
def submit_tasks(client, num_tasks=32):
    futures = [client.submit(run_external_program) for _ in range(num_tasks)]
    return futures

# 主函数
def main():
    client = setup_client()  # 设置Dask客户端
    futures = submit_tasks(client)  # 提交任务
    
    # 等待所有任务完成
    client.gather(futures)
    
    # 关闭客户端连接
    client.close()

if __name__ == "__main__":
    main()

运行结果:

此时表示上述的可执行文件main已运行了32份。

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐