Celery 异步框架执行定时任务

Celery是什么?

    Celery是Python开发的简单、灵活可靠的、处理大量消息的分布式任务调度模块 专注于实时处理的异步任务队列 同时也支持任务调度 可做异步任务,可做定时任务

celery是python的一个异步框架,架构如下:

Celery的架构由三部分组成,消息中间件(message broker)、任务执行单元(worker)和 任务执行结果存储(task result store)组成.

启动celery服务(配置好broker和backend),手动或自动添加任务到broker中,broker将任务分发给worker,worker后台异步执行任务,执行完成后将结果存放到backend中

一.celery环境搭建:

1.安装celery:

# 已安装Python环境
pip3 install celery

# 验证安装 查看版本
celery --version

2.安装redis(也可以使用rabbit MQ)

使用redis作为backend和broker,需要本机安装redis,并启动redis-server,还需要安装Python第三方模块redis

pip install redis

二.celery使用:

2.1 celery项目目录结构:

(venv) jeanettian@KP-MBP-1179 src % tree celery_task -I __pycache__
celery_task
├── __init__.py
├── celery.py
├── celery_config.py
├── task.py
└── test_celery.py

注意:此处定义的celery工程名为celery_task,celeryt_task下必须要有__init__.py文件.

2.1.1 celery.py文件内容:

from __future__ import absolute_import
from celery import Celery

app = Celery("celery_task.task",
             # redis作为消息中间件,broker用来存储待执行的任务
             broker="redis://127.0.0.1:6379/4",
             # backend用来存储执行结果回调
             backend="redis://127.0.0.1:6379/5",
             # 注意必须要有include,用来指定任务
             include=[celery_task.task]
             )
app.config_from_object("celery_task.celery_config")

2.1.2 celery_config.py文件内容:

from datetime import timedelta

from celery.schedules import crontab

task_serializer = json
result_serializer = json
accept_content = [json]
# 时区设置
timezone = Asia/Shanghai
# celery默认开启自己的日志
# False表示不关闭
worker_hijack_root_logger = False
# 存储结果过期时间,过期后自动删除
# 单位为秒
result_expires = 60 * 60 * 24

# 需要执行任务的配置
beat_schedule = {
    celery_task.tasks: {
        task: celery_task.task.add,
        schedule: timedelta(seconds=20),  # 启动每20s执行一次
        args: (2, 2)  # 此处的args参数会传给定时任务指定函数的形参。
    },
    # celery_app.tasks: {
    #     task: celery_app.task.task2,
    #     schedule: crontab(minute=*/2),
    #     args: ()
    # }
}

2.1.3 task.py文件内容:

from .celery import app


@app.task
def add(x, y):
    print("+" * 50)
    return f"result is: {x+y}"


@app.task
def mul(x, y):
    print("-" * 50)
    return x - y


@app.task
def sum_x(numbers):
    print("*" * 50)
    return sum(numbers)

2.1.4 test_celery.py文件内容:

from .task import add, mul, sum_x

add.delay()
mul.delay(2, 2)
sum_x.delay([2, 2, 2, 2])

2.2 进入celery_task工程所在目录,执行命令启动worker执行定时任务

(此命令为即为多workers并发执行,几个wokers,就开几个bash执行此命令):

celery -A celery_task worker -l INFO

2.3 另开一个bash,进入celery_task工程所在目录,执行命令启动心跳监听分发任务:

celery -A celery_task beat

2.4 查看任务执行结果:

每隔20s执行一次任务.

经验分享 程序员 微信小程序 职场和发展