Celery worker 是一个简单、灵活、可靠的分布式系统,可以处理大量消息,同时为操作提供维护此类系统所需的工具。在本教程中,我们将学习如何使用 Flask 和 Redis 实现 Celery。
在我们深入学习本教程之前,让我们简要介绍一下 Celery 工人。
什么是芹菜工人?
Celery是一个任务管理系统,您可以使用它在不同的机器或线程之间分配任务。它允许您拥有一个任务队列,并且可以实时调度和处理任务。这个任务队列由不断寻找新工作来执行的工作人员监控。
让我们试着通过一个例子来理解芹菜工人的角色。想象一下,您在汽车服务站进行汽车维修。您在接待处提出请求,让您的汽车得到维修。
-
信息
您在接待处提出请求,让您的汽车得到维修。此请求是一条消息。 -
任务
维修您的汽车是要执行的任务。 -
任务队列
和你一样,车站也会有其他顾客。所有等待服务的客户汽车都是任务队列的一部分。 -
工人
车站的机械师是执行任务的工人。
Celery 利用代理在多个工作人员之间分配任务并管理任务队列。在上面的示例中,将您的汽车服务请求从接待处带到车间的服务员是经纪人。代理的作用是在客户端和工作人员之间传递消息。
消息代理的一些示例包括 Redis、RabbitMQ、Apache Kafka 和 Amazon SQS。
在本教程中,我们将使用 Redis 作为代理,Celery 作为工作线程,Flask 作为网络服务器。我们将通过两个从简单到复杂任务的示例向您介绍 Celery 的概念。
让我们开始安装 Celery、Flask 和 Redis。
先决条件
要学习本教程,您需要具备 Flask 的基本知识。你不需要对 Redis 有任何了解;但是,熟悉它会很好。
安装 Flask、Celery 和 Redis
首先,使用 Python 3 和升级版 pip 使用新的虚拟环境启动一个新项目:
$ python3 -m venv venv $ . venv/bin/activate $ pip install --upgrade pip
并安装 Flask、Celery 和 Redis:(以下命令包括我们在本教程中使用的版本)
$ pip install celery == 4.4.1 Flask == 1.1.1 redis == 3.4.1
在本地运行 Redis
使用以下命令在本地机器(假设 Linux 或 macOS)上运行 Redis 服务器:
$ wget http://download.redis.io/redis-stable.tar.gz $ tar xvzf redis-stable.tar.gz $ rm redis-stable.tar.gz $ cd redis-stable $ make
现在编译完成了。一些二进制文件在redis-stable/
中的src
目录中可用,例如redis-server
(这是您需要运行的 Redis 服务器)和redis-cli
(这是您可能需要与 Redis 对话的命令行客户端)。
要全局运行服务器(从您机器上的任何位置)而不是每次都移动到src
目录,您可以使用以下命令:
$ make install
redis-server
的二进制文件现在在您的/usr/local/bin
目录中可用。您可以使用以下命令运行服务器:
$ redis-server
服务器现在在端口 6379(默认端口)上运行。让它在单独的终端上运行。
我们需要另外两个新的终端选项卡,以便以后能够运行 Web 服务器和 Celery worker。但在运行 Web 服务器之前,让我们将 Flask 与 Celery 集成,让我们的 Web 服务器准备好运行。
集成烧瓶和芹菜
将 Flask 与 Celery 集成起来很容易。让我们从简单开始,编写以下导入和实例化代码:
from flask import Flask from celery import Celery app = Flask ( __name__ ) app.config[ "CELERY_BROKER_URL" ] = "redis://localhost:6379" celery = Celery ( app.name, broker = app.config[ "CELERY_BROKER_URL" ]) celery.conf.update ( app.config )
app
是您将用于运行 Web 服务器的 Flask 应用程序对象。 celery
是您将用于运行 Celery worker 的 Celery 对象。
请注意,此处的CELERY_BROKER_URL
配置设置为您在计算机上本地运行的 Redis 服务器。您可以将其更改为您想要使用的任何其他消息代理。
celery
对象将应用程序名称作为参数,并将broker
参数设置为您在配置中指定的参数。
要将 Flask 配置添加到 Celery 配置中,请使用conf.update
方法对其进行更新。
现在,是时候运行 Celery worker 了。
运行 Celery worker
在新的终端选项卡中,运行以下命令:
( venv ) $ celery -A app.celery worker --loglevel = info
其中celery
是您在本教程中使用的 Celery 版本(4.4.1),带有-A
选项来指定要使用的 celery 实例(在我们的例子中,它是app.py
文件中的celery
,所以它是 app.py 文件中的 celery app.celery
), worker
是运行 worker 的子命令, --loglevel=info
将详细日志级别设置为INFO
。
您将看到类似以下内容:
在运行 Web 服务器之前,让我们深入了解 Flask 应用程序内部的逻辑。
运行 Flask Web 服务器
让我们首先使用task()
方法添加 celery 任务装饰器,并将我们想要作为 Celery 任务运行的函数包装到该装饰器中:
@celery.task () def add ( x, y ) : return x + y
所以这个简单的add
函数现在是一个 Celery 任务。然后添加到您的 Web 服务器的路由以处理对根 URL 的 GET 请求,并将一对数字作为参数添加到 add 函数:
@app.route ( '/' ) def add_task () : for i in range ( 10000 ) : add.delay ( i, i ) return jsonify ({ 'status' : 'ok' })
注意:不要忘记从flask
模块中导入jsonify
函数,以便能够返回 JSON 数据。
现在,使用以下命令运行 Web 服务器:
$flask run
当您在浏览器中检查localhost:5000
URL 时,您应该会看到以下响应:
{ "status" : "ok" }
请注意,此响应不会立即显示在您的浏览器上。这是因为 Redis 服务器监听客户端并将任务排入该add_task
函数的任务队列。然后,Celery 工作人员必须等待每个任务开始执行。
这演示了 Celery 如何利用 Redis 将任务分配给多个工作人员并管理任务队列。
您可以在此GitHub 存储库中找到本教程的完整代码示例。
结论
在本教程中,您了解了 Celery worker 是什么以及如何在 Flask 应用程序中将它与 Redis 等消息代理一起使用。
Celery worker 用于将数据密集型进程卸载到后台,从而提高应用程序的效率。 Celery 具有高可用性,单个 Celery 工作人员每分钟可以处理数百万个任务。
随着 Celery 工人大规模执行关键任务,监控他们的表现也很重要。借助一些终端命令,Celery 可用于检查和管理工作节点。如果您需要全面了解 Celery 集群的性能,您可以使用SigNoz – 一个开源可观察性平台。
它可以监控应用程序的所有组件——从应用程序指标和数据库性能到基础设施监控。例如,它可用于监控 Python 应用程序的性能问题和错误。
SigNoz 是完全开源的,可以托管在您的基础设施中。您可以通过访问其 GitHub 存储库来试用 SigNoz ?
如果您想了解有关使用 SigNoz 监控 Python 应用程序的更多信息,请随时点击以下链接:
原文: https://dev.to/signoz/celery-worker-tutorial-on-how-to-set-up-with-flask-redis-532d