什么是Celery?

Celery 是一个开源的分布式任务队列框架,用于在分布式系统中进行异步任务调度和处理。它使用消息代理(如 RabbitMQ、Redis)来实现任务的发布和消费,支持任务的并发执行、定时调度和结果收集。

为什么要用Celery?

1.异步任务处理:Celery 提供了一种方便的方式来处理异步任务。对于耗时的操作,可以将其封装为 Celery 的任务,并将任务提交到任务队列中进行处理。这样可以避免阻塞主线程,提高系统的并发性能和响应能力。

2.分布式任务调度:Celery 可以在分布式环境中进行任务调度和处理。可以将任务发布到任务队列中,由多个 Worker 进程并发地处理任务。这使得任务的处理可以水平扩展,并能够适应高负载和大规模的任务处理需求。

3.定时任务调度:Celery 提供了调度器(Beat)功能,可以按照预定的时间表触发任务的执行。这允许定期执行任务,例如生成报告、数据清理等。通过 Celery 的调度功能,可以方便地管理和执行定时任务,而无需编写独立的调度逻辑。

4.可扩展性和弹性:由于 Celery 的分布式特性,你可以根据需求增加或减少 Worker 的数量。这使得系统具有良好的可扩展性和弹性,可以根据负载变化自动调整任务处理的能力。

5.结果收集和处理:Celery 提供了结果存储(Result Backend)功能,用于存储任务的执行结果。通过结果存储,可以方便地获取和查询任务的执行结果。这对于需要对任务的执行结果进行后续处理、分析或展示的情况非常有用。

6.社区支持和生态系统:Celery 是一个广泛使用的任务队列框架,拥有庞大的开发者社区和丰富的生态系统。这意味着你可以轻松地找到大量的文档、教程、插件和解决方案,以满足不同的需求和场景。

Celery 的构成?

任务(Task):任务是执行具体操作的逻辑单元,可以是函数或类的方法。任务将被异步提交到任务队列中进行执行。
任务队列(Task Queue):任务队列是存储待执行任务的消息代理,如 RabbitMQ、Redis 等。它接收任务的发布请求,并将任务分发给可用的 Worker 进行处理。
Worker:Worker 是执行任务的工作进程。它从任务队列中获取待执行的任务,并在后台进行处理。可以通过启动多个 Worker 实例来实现任务的并发处理。
调度器(Beat):调度器用于定时调度任务的执行。它可以按照预定的时间表触发任务的执行,或者在特定的时间间隔内循环执行任务。
结果存储(Backend):结果存储是用于存储任务执行结果的地方,可以是数据库、内存缓存或其他支持的存储介质。任务的执行结果可以通过结果存储进行获取和查询。
在这里插入图片描述

任务调用提交任务执行请求给Broker队列
1.如果是异步任务,worker会立即从队列中取出任务并执行,执行结果保存在Backend中
2.如果是定时任务,任务由Celery Beat进程周期性地将任务发往Broker队列,Worker实时监视消息队列获取队列中的任务执行

怎么使用Celery?

1.创建Celery应用
在Python代码中,首先需要创建一个Celery应用。这可以通过实例化Celery类来完成,指定应用的名称以及消息队列的连接信息。

from celery import Celery

app = Celery('myapp', broker='amqp://guest:guest@localhost//', backend='rpc://')

2. 定义任务
在应用中,需要定义具体的任务函数。使用@app.task装饰器将函数标记为Celery任务

@app.task
def add(x, y):
    return x + y

定义共享任务

@app.shared_task
def shared_task(arg1, arg2):
    # 执行共享任务操作
    return result

3.提交任务
在代码中,可以调用任务函数并将任务提交给Celery进行异步处理。可以使用.delay()方法来提交任务,或使用.apply_async()方法来设置任务参数和其他选项。

result = add.delay(4, 6)

4.启动Celery Worker
启动Celery Worker进程来处理任务的执行。Worker会监听消息队列中的任务,并在有任务到达时进行消费执行。

在终端中执行以下命令:

celery -A yourmodule worker --loglevel=info

5.获取任务结果
可以使用.get()方法获取任务的执行结果。

result = result.get()

6.监控任务状态和结果
Celery提供了一系列的状态和结果相关的方法,用于查询任务的状态、获取任务的结果、检查任务是否完成等。

result = add.delay(4, 6)
if result.ready():
    print("任务已完成")
else:
    print("任务还在执行中")

7.使用Flower进行监控
Celery还提供了Flower工具,用于实时监控任务队列的状态和统计信息。可以使用Flower来查看任务队列的监控界面。

在终端中执行以下命令启动Flower:

celery flower -A yourmodule

然后在浏览器中访问http://localhost:5555即可查看任务队列的监控界面。

8.结果存储后端
Celery允许将任务的执行结果存储在不同的后端中,如数据库、消息队列等。可以根据需要选择适合的结果存储后端。

示例:

app = Celery('myapp', broker='amqp://guest:guest@localhost//', backend='db+sqlite:///results.db')

在上面的示例中,使用SQLite数据库作为任务结果的存储后端。

9.错误处理和重试:
在任务执行过程中可能会发生错误,Celery提供了错误处理和任务重试的机制,以增加任务的稳定性和可靠性。

from celery import Task

class MyTask(Task):
    autoretry_for = (Exception,)
    retry_kwargs = {'max_retries': 3}

    def run(self, *args, **kwargs):
        # 执行任务的代码
        pass

在上面的示例中,任务继承自Task类,并设置了自动重试的配置。

10.并发和扩展性
Celery支持多个工作进程并发地处理任务,可以通过启动多个Worker进程来实现任务的并行处理和提高系统的吞吐量。

celery -A yourmodule worker --concurrency=4

在上面的示例中,启动了4个并发的Worker进程。

celery的常用命令

celery -A your_project_name worker --loglevel=info:
启动Celery Worker进程,以便处理异步任务。-A选项后跟着你的项目名称,–loglevel指定日志级别。

celery -A your_project_name beat --loglevel=info:
启动Celery Beat进程,用于调度周期性任务。-A选项后跟着你的项目名称,–loglevel指定日志级别。

celery -A your_project_name flower:
启动Flower,一个用于监控和管理Celery集群的Web界面。-A选项后跟着你的项目名称。

celery -A your_project_name purge:
清除所有已完成的任务结果。

celery -A your_project_name inspect active:
查看当前活动任务的信息。

celery -A your_project_name inspect scheduled:
查看所有计划任务的信息。

celery -A your_project_name inspect reserved:
查看所有已预订的任务。

celery -A your_project_name inspect revoked:
查看已撤销的任务。

celery -A your_project_name status:
查看Celery Worker的状态。

celery -A your_project_name control shutdown:
关闭Celery Worker。

celery -A your_project_name control pool_grow n:
增加工作池中的工作进程数。

celery -A your_project_name control pool_shrink n:
减少工作池中的工作进程数。

celery -A your_project_name control rate_limit task_name rate:
限制任务的执行速率。

celery -A your_project_name control cancel_consumer worker_name:
取消指定Worker的所有任务。

celery -A your_project_name control enable_events:
启用Celery事件。

celery -A your_project_name control disable_events:
禁用Celery事件。

异步任务

1). 创建一个 Celery 任务

首先,创建一个 Celery 任务。在 Django 项目中,你可以在任何地方创建这个任务,通常将它放在一个独立的文件中,如 tasks.py:

# tasks.py

from celery import shared_task

@shared_task
def my_task(param):
    # 这是一个示例任务,你可以在这里执行你的异步操作
    result = param * 2
    return result

在这个示例中,我们创建了一个名为 my_task 的 Celery 任务,它接受一个参数 param,并返回 param 的两倍。

2). 在视图中调用 Celery 任务

在 Django 视图中,你可以像调用普通函数一样调用 Celery 任务,并将其推送到 Celery worker 进程执行。下面是一个示例视图:

# views.py

from django.shortcuts import render
from .tasks import my_task  # 导入任务

def my_view(request):
    param = 42  # 参数值,你可以根据需要设置
    result = my_task.delay(param)  # 异步执行任务
    return render(request, 'my_template.html', {'result': result.id})

在这个示例中,我们导入了之前创建的 my_task 任务,并使用 delay 方法异步执行它。任务的执行不会阻塞当前视图,这样可以加快响应速度。

3). 处理任务结果

要处理任务的结果,你可以使用任务返回的 AsyncResult 对象。通常,你可以将该对象的 id 存储在数据库中,然后在需要时获取结果:

# views.py

from celery.result import AsyncResult
from .tasks import my_task  # 导入任务

def get_task_result(request, task_id):
    result = AsyncResult(task_id)
    if result.ready():
        # 任务已完成,你可以获取结果
        result_value = result.get()
    else:
        # 任务尚未完成,你可以采取其他操作或等待
        result_value = None
    return render(request, 'result_template.html', {'result_value': result_value})

在这个示例中,我们导入了 AsyncResult 类,并使用 task_id 获取任务结果。如果任务已完成,我们可以使用 result.get() 获取结果值。

周期性任务

要使用 Celery 实现周期性任务,你需要配置 Celery 定时任务调度器(Celery Beat)。Celery Beat 是 Celery 的一个组件,它允许你安排定时任务,并在指定的时间间隔内运行任务。以下是如何实现周期性任务的步骤:

1) 配置 Celery Beat

首先,确保你的 Celery 项目中已经配置了 Celery Beat。通常,你需要创建一个名为 celery.py 的文件,其中包含 Celery 和 Celery Beat 的配置。确保在配置中设置 CELERY_BROKER_URL 和 CELERY_RESULT_BACKEND,以及指定调度器:

# celery.py

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from celery.schedules import crontab

# 设置 Django 的环境变量
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'your_project_name.settings')

app = Celery('your_project_name')

# 配置 Celery
app.config_from_object('django.conf:settings', namespace='CELERY')

# 自动从所有注册的应用中发现异步任务
app.autodiscover_tasks()

# 定义定时任务
app.conf.beat_schedule = {
    'my-periodic-task': {
        'task': 'your_app.tasks.my_periodic_task',  # 任务路径
        'schedule': crontab(minute=0, hour=0),  # 设置定时规则,每天零点执行
    },
}

在上面的示例中,我们配置了一个名为 my-periodic-task 的周期性任务,它将每天在午夜执行(每天零点)。

2)创建周期性任务

在 your_app/tasks.py 文件中创建周期性任务的实际代码:

# your_app/tasks.py

from celery import shared_task

@shared_task
def my_periodic_task():
    # 这是一个周期性任务的示例
    # 在这里执行你的周期性任务代码
    result = "Periodic task executed."
    return result

在这个示例中,my_periodic_task 函数是一个周期性任务的示例,可以在其中执行任何周期性操作。

3)运行 Celery Beat

最后,运行 Celery Beat 以启动定时任务调度器:

celery -A your_project_name beat

现在,my-periodic-task 将在每天零点执行。

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐