Celery 异步框架执行定时任务
Celery是什么?
-
Celery是Python开发的简单、灵活可靠的、处理大量消息的分布式任务调度模块 专注于实时处理的异步任务队列 同时也支持任务调度 可做异步任务,可做定时任务
celery是python的一个异步框架,架构如下:
Celery的架构由三部分组成,消息中间件(message broker)、任务执行单元(worker)和 任务执行结果存储(task result store)组成.
启动celery服务(配置好broker和backend),手动或自动添加任务到broker中,broker将任务分发给worker,worker后台异步执行任务,执行完成后将结果存放到backend中
一.celery环境搭建:
1.安装celery:
# 已安装Python环境 pip3 install celery # 验证安装 查看版本 celery --version
2.安装redis(也可以使用rabbit MQ)
使用redis作为backend和broker,需要本机安装redis,并启动redis-server,还需要安装Python第三方模块redis
pip install redis
二.celery使用:
2.1 celery项目目录结构:
(venv) jeanettian@KP-MBP-1179 src % tree celery_task -I __pycache__ celery_task ├── __init__.py ├── celery.py ├── celery_config.py ├── task.py └── test_celery.py
注意:此处定义的celery工程名为celery_task,celeryt_task下必须要有__init__.py文件.
2.1.1 celery.py文件内容:
from __future__ import absolute_import
from celery import Celery
app = Celery("celery_task.task",
# redis作为消息中间件,broker用来存储待执行的任务
broker="redis://127.0.0.1:6379/4",
# backend用来存储执行结果回调
backend="redis://127.0.0.1:6379/5",
# 注意必须要有include,用来指定任务
include=[celery_task.task]
)
app.config_from_object("celery_task.celery_config")
2.1.2 celery_config.py文件内容:
from datetime import timedelta
from celery.schedules import crontab
task_serializer = json
result_serializer = json
accept_content = [json]
# 时区设置
timezone = Asia/Shanghai
# celery默认开启自己的日志
# False表示不关闭
worker_hijack_root_logger = False
# 存储结果过期时间,过期后自动删除
# 单位为秒
result_expires = 60 * 60 * 24
# 需要执行任务的配置
beat_schedule = {
celery_task.tasks: {
task: celery_task.task.add,
schedule: timedelta(seconds=20), # 启动每20s执行一次
args: (2, 2) # 此处的args参数会传给定时任务指定函数的形参。
},
# celery_app.tasks: {
# task: celery_app.task.task2,
# schedule: crontab(minute=*/2),
# args: ()
# }
}
2.1.3 task.py文件内容:
from .celery import app
@app.task
def add(x, y):
print("+" * 50)
return f"result is: {x+y}"
@app.task
def mul(x, y):
print("-" * 50)
return x - y
@app.task
def sum_x(numbers):
print("*" * 50)
return sum(numbers)
2.1.4 test_celery.py文件内容:
from .task import add, mul, sum_x add.delay() mul.delay(2, 2) sum_x.delay([2, 2, 2, 2])
2.2 进入celery_task工程所在目录,执行命令启动worker执行定时任务
(此命令为即为多workers并发执行,几个wokers,就开几个bash执行此命令):
celery -A celery_task worker -l INFO
2.3 另开一个bash,进入celery_task工程所在目录,执行命令启动心跳监听分发任务:
celery -A celery_task beat
2.4 查看任务执行结果:
每隔20s执行一次任务.
上一篇:
Python 安装包管理工具 pip
