celery的用法详解
Celery 是一个开源的分布式任务队列框架,用于在分布式系统中进行异步任务调度和处理。它使用消息代理(如 RabbitMQ、Redis)来实现任务的发布和消费,支持任务的并发执行、定时调度和结果收集。
什么是Celery?
Celery 是一个开源的分布式任务队列框架,用于在分布式系统中进行异步任务调度和处理。它使用消息代理(如 RabbitMQ、Redis)来实现任务的发布和消费,支持任务的并发执行、定时调度和结果收集。
为什么要用Celery?
1.异步任务处理:Celery 提供了一种方便的方式来处理异步任务。对于耗时的操作,可以将其封装为 Celery 的任务,并将任务提交到任务队列中进行处理。这样可以避免阻塞主线程,提高系统的并发性能和响应能力。
2.分布式任务调度:Celery 可以在分布式环境中进行任务调度和处理。可以将任务发布到任务队列中,由多个 Worker 进程并发地处理任务。这使得任务的处理可以水平扩展,并能够适应高负载和大规模的任务处理需求。
3.定时任务调度:Celery 提供了调度器(Beat)功能,可以按照预定的时间表触发任务的执行。这允许定期执行任务,例如生成报告、数据清理等。通过 Celery 的调度功能,可以方便地管理和执行定时任务,而无需编写独立的调度逻辑。
4.可扩展性和弹性:由于 Celery 的分布式特性,你可以根据需求增加或减少 Worker 的数量。这使得系统具有良好的可扩展性和弹性,可以根据负载变化自动调整任务处理的能力。
5.结果收集和处理:Celery 提供了结果存储(Result Backend)功能,用于存储任务的执行结果。通过结果存储,可以方便地获取和查询任务的执行结果。这对于需要对任务的执行结果进行后续处理、分析或展示的情况非常有用。
6.社区支持和生态系统:Celery 是一个广泛使用的任务队列框架,拥有庞大的开发者社区和丰富的生态系统。这意味着你可以轻松地找到大量的文档、教程、插件和解决方案,以满足不同的需求和场景。
Celery 的构成?
任务(Task):任务是执行具体操作的逻辑单元,可以是函数或类的方法。任务将被异步提交到任务队列中进行执行。
任务队列(Task Queue):任务队列是存储待执行任务的消息代理,如 RabbitMQ、Redis 等。它接收任务的发布请求,并将任务分发给可用的 Worker 进行处理。
Worker:Worker 是执行任务的工作进程。它从任务队列中获取待执行的任务,并在后台进行处理。可以通过启动多个 Worker 实例来实现任务的并发处理。
调度器(Beat):调度器用于定时调度任务的执行。它可以按照预定的时间表触发任务的执行,或者在特定的时间间隔内循环执行任务。
结果存储(Backend):结果存储是用于存储任务执行结果的地方,可以是数据库、内存缓存或其他支持的存储介质。任务的执行结果可以通过结果存储进行获取和查询。
任务调用提交任务执行请求给Broker队列
1.如果是异步任务,worker会立即从队列中取出任务并执行,执行结果保存在Backend中
2.如果是定时任务,任务由Celery Beat进程周期性地将任务发往Broker队列,Worker实时监视消息队列获取队列中的任务执行
怎么使用Celery?
1.创建Celery应用
在Python代码中,首先需要创建一个Celery应用。这可以通过实例化Celery类来完成,指定应用的名称以及消息队列的连接信息。
from celery import Celery
app = Celery('myapp', broker='amqp://guest:guest@localhost//', backend='rpc://')
2. 定义任务
在应用中,需要定义具体的任务函数。使用@app.task装饰器将函数标记为Celery任务
@app.task
def add(x, y):
return x + y
定义共享任务
@app.shared_task
def shared_task(arg1, arg2):
# 执行共享任务操作
return result
3.提交任务
在代码中,可以调用任务函数并将任务提交给Celery进行异步处理。可以使用.delay()方法来提交任务,或使用.apply_async()方法来设置任务参数和其他选项。
result = add.delay(4, 6)
4.启动Celery Worker
启动Celery Worker进程来处理任务的执行。Worker会监听消息队列中的任务,并在有任务到达时进行消费执行。
在终端中执行以下命令:
celery -A yourmodule worker --loglevel=info
5.获取任务结果
可以使用.get()方法获取任务的执行结果。
result = result.get()
6.监控任务状态和结果
Celery提供了一系列的状态和结果相关的方法,用于查询任务的状态、获取任务的结果、检查任务是否完成等。
result = add.delay(4, 6)
if result.ready():
print("任务已完成")
else:
print("任务还在执行中")
7.使用Flower进行监控
Celery还提供了Flower工具,用于实时监控任务队列的状态和统计信息。可以使用Flower来查看任务队列的监控界面。
在终端中执行以下命令启动Flower:
celery flower -A yourmodule
然后在浏览器中访问http://localhost:5555即可查看任务队列的监控界面。
8.结果存储后端
Celery允许将任务的执行结果存储在不同的后端中,如数据库、消息队列等。可以根据需要选择适合的结果存储后端。
示例:
app = Celery('myapp', broker='amqp://guest:guest@localhost//', backend='db+sqlite:///results.db')
在上面的示例中,使用SQLite数据库作为任务结果的存储后端。
9.错误处理和重试:
在任务执行过程中可能会发生错误,Celery提供了错误处理和任务重试的机制,以增加任务的稳定性和可靠性。
from celery import Task
class MyTask(Task):
autoretry_for = (Exception,)
retry_kwargs = {'max_retries': 3}
def run(self, *args, **kwargs):
# 执行任务的代码
pass
在上面的示例中,任务继承自Task类,并设置了自动重试的配置。
10.并发和扩展性
Celery支持多个工作进程并发地处理任务,可以通过启动多个Worker进程来实现任务的并行处理和提高系统的吞吐量。
celery -A yourmodule worker --concurrency=4
在上面的示例中,启动了4个并发的Worker进程。
celery的常用命令
celery -A your_project_name worker --loglevel=info:
启动Celery Worker进程,以便处理异步任务。-A选项后跟着你的项目名称,–loglevel指定日志级别。
celery -A your_project_name beat --loglevel=info:
启动Celery Beat进程,用于调度周期性任务。-A选项后跟着你的项目名称,–loglevel指定日志级别。
celery -A your_project_name flower:
启动Flower,一个用于监控和管理Celery集群的Web界面。-A选项后跟着你的项目名称。
celery -A your_project_name purge:
清除所有已完成的任务结果。
celery -A your_project_name inspect active:
查看当前活动任务的信息。
celery -A your_project_name inspect scheduled:
查看所有计划任务的信息。
celery -A your_project_name inspect reserved:
查看所有已预订的任务。
celery -A your_project_name inspect revoked:
查看已撤销的任务。
celery -A your_project_name status:
查看Celery Worker的状态。
celery -A your_project_name control shutdown:
关闭Celery Worker。
celery -A your_project_name control pool_grow n:
增加工作池中的工作进程数。
celery -A your_project_name control pool_shrink n:
减少工作池中的工作进程数。
celery -A your_project_name control rate_limit task_name rate:
限制任务的执行速率。
celery -A your_project_name control cancel_consumer worker_name:
取消指定Worker的所有任务。
celery -A your_project_name control enable_events:
启用Celery事件。
celery -A your_project_name control disable_events:
禁用Celery事件。
异步任务
1). 创建一个 Celery 任务
首先,创建一个 Celery 任务。在 Django 项目中,你可以在任何地方创建这个任务,通常将它放在一个独立的文件中,如 tasks.py:
# tasks.py
from celery import shared_task
@shared_task
def my_task(param):
# 这是一个示例任务,你可以在这里执行你的异步操作
result = param * 2
return result
在这个示例中,我们创建了一个名为 my_task 的 Celery 任务,它接受一个参数 param,并返回 param 的两倍。
2). 在视图中调用 Celery 任务
在 Django 视图中,你可以像调用普通函数一样调用 Celery 任务,并将其推送到 Celery worker 进程执行。下面是一个示例视图:
# views.py
from django.shortcuts import render
from .tasks import my_task # 导入任务
def my_view(request):
param = 42 # 参数值,你可以根据需要设置
result = my_task.delay(param) # 异步执行任务
return render(request, 'my_template.html', {'result': result.id})
在这个示例中,我们导入了之前创建的 my_task 任务,并使用 delay 方法异步执行它。任务的执行不会阻塞当前视图,这样可以加快响应速度。
3). 处理任务结果
要处理任务的结果,你可以使用任务返回的 AsyncResult 对象。通常,你可以将该对象的 id 存储在数据库中,然后在需要时获取结果:
# views.py
from celery.result import AsyncResult
from .tasks import my_task # 导入任务
def get_task_result(request, task_id):
result = AsyncResult(task_id)
if result.ready():
# 任务已完成,你可以获取结果
result_value = result.get()
else:
# 任务尚未完成,你可以采取其他操作或等待
result_value = None
return render(request, 'result_template.html', {'result_value': result_value})
在这个示例中,我们导入了 AsyncResult 类,并使用 task_id 获取任务结果。如果任务已完成,我们可以使用 result.get() 获取结果值。
周期性任务
要使用 Celery 实现周期性任务,你需要配置 Celery 定时任务调度器(Celery Beat)。Celery Beat 是 Celery 的一个组件,它允许你安排定时任务,并在指定的时间间隔内运行任务。以下是如何实现周期性任务的步骤:
1) 配置 Celery Beat
首先,确保你的 Celery 项目中已经配置了 Celery Beat。通常,你需要创建一个名为 celery.py 的文件,其中包含 Celery 和 Celery Beat 的配置。确保在配置中设置 CELERY_BROKER_URL 和 CELERY_RESULT_BACKEND,以及指定调度器:
# celery.py
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from celery.schedules import crontab
# 设置 Django 的环境变量
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'your_project_name.settings')
app = Celery('your_project_name')
# 配置 Celery
app.config_from_object('django.conf:settings', namespace='CELERY')
# 自动从所有注册的应用中发现异步任务
app.autodiscover_tasks()
# 定义定时任务
app.conf.beat_schedule = {
'my-periodic-task': {
'task': 'your_app.tasks.my_periodic_task', # 任务路径
'schedule': crontab(minute=0, hour=0), # 设置定时规则,每天零点执行
},
}
在上面的示例中,我们配置了一个名为 my-periodic-task 的周期性任务,它将每天在午夜执行(每天零点)。
2)创建周期性任务
在 your_app/tasks.py 文件中创建周期性任务的实际代码:
# your_app/tasks.py
from celery import shared_task
@shared_task
def my_periodic_task():
# 这是一个周期性任务的示例
# 在这里执行你的周期性任务代码
result = "Periodic task executed."
return result
在这个示例中,my_periodic_task 函数是一个周期性任务的示例,可以在其中执行任何周期性操作。
3)运行 Celery Beat
最后,运行 Celery Beat 以启动定时任务调度器:
celery -A your_project_name beat
现在,my-periodic-task 将在每天零点执行。
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)