如何使用 Celery 和 RabbitMQ 在 Ubuntu VPS 上排队任务如何使用 Celery 和 RabbitMQ 在 Ubuntu VPS 上排队任务如何使用 Celery 和 RabbitMQ 在 Ubuntu VPS 上排队任务如何使用 Celery 和 RabbitMQ 在 Ubuntu VPS 上排队任务
  • 文章
  • 正则表达式
    • 工具
  • 登录
找到的结果: {phrase} (显示: {results_count} 共: {results_count_total})
显示: {results_count} 共: {results_count_total}

加载更多搜索结果...

搜索范围
模糊匹配
搜索标题
搜索内容
发表 admin at 2025年2月28日
类别
  • 未分类
标签

如何使用 Celery 和 RabbitMQ 在 Ubuntu VPS 上排队任务

介绍

异步或非阻塞处理是一种将某些任务的执行与程序的主要流程分开的方法。这为您提供了几个优势,包括允许面向用户的代码不间断地运行。

消息传递是程序组件可以用来通信和交换信息的一种方法。它可以同步或异步实现,并且可以允许离散进程毫无问题地进行通信。对于此类用途,消息传递通常作为传统数据库的替代方案来实现,因为消息队列通常会实现其他功能,提供更高的性能,并且可以完全驻留在内存中。

Celery 是一个建立在异步消息传递系统上的任务队列。它可以用作可以转储编程任务的桶。通过任务的程序可以继续执行并响应地运行,稍后它可以轮询 celery 以查看计算是否完成并检索数据。

虽然 celery 是用 Python 编写的,但它的协议可以用任何语言实现。它甚至可以通过 webhooks 与其他语言一起使用。

通过在程序环境中实施作业队列,您可以轻松卸载任务并继续处理来自用户的交互。这是提高应用程序响应能力并且在执行长时间运行的计算时不会被锁定的简单方法。

在本指南中,我们将使用 RabbitMQ 作为消息系统在 Ubuntu 12.04 VPS 上安装和实现 celery 作业队列。

安装组件

安装芹菜

Celery 是用 Python 编写的,因此,它很容易安装,就像我们处理常规 Python 包一样。

我们将通过创建虚拟环境来安装我们的消息系统来遵循处理 Python 包的推荐过程。这有助于我们保持环境稳定而不影响更大的系统。

从 Ubuntu 的默认存储库安装 Python 虚拟环境包:

sudo apt-get update
sudo apt-get install python-virtualenv

我们将创建一个消息传递目录,我们将在其中实现我们的系统:

mkdir ~/messaging
cd ~/messaging

我们现在可以创建一个虚拟环境,我们可以使用以下命令在其中安装 celery:

virtualenv --no-site-packages venv

配置虚拟环境后,我们可以通过键入以下内容来激活它:

source venv/bin/activate

您的提示将更改以反映您现在正在我们上面创建的虚拟环境中操作。这将确保我们的 Python 包安装在本地而不是全局。

如果在任何时候我们需要停用环境(不是现在),您可以键入:

deactivate

现在我们已经激活了环境,我们可以使用 pip 安装 celery:

pip install celery

安装 RabbitMQ

Celery 需要一个消息传递代理来处理来自外部源的请求。这个代理人被称为“经纪人”。

可供选择的代理有很多选项,包括关系数据库、NoSQL 数据库、键值存储和实际的消息传递系统。

我们将配置 celery 以使用 RabbitMQ 消息系统,因为它提供了强大、稳定的性能并且与 celery 交互良好。这是一个很好的解决方案,因为它包含的功能与我们的预期用途非常吻合。

我们可以通过 Ubuntu 的存储库安装 RabbitMQ:

sudo apt-get install rabbitmq-server

安装后,RabbitMQ 服务会在我们的服务器上自动启动。

创建芹菜实例

为了使用 celery 的任务队列功能,我们在安装后的第一步必须是创建一个 celery 实例。这是一个简单的导入包的过程,创建一个“app”,然后设置 celery 将能够在后台执行的任务。

让我们在我们的消息传递目录中创建一个名为 tasks.py 的 Python 脚本,我们可以在其中定义我们的工作人员可以执行的任务。

sudo nano ~/messaging/tasks.py

我们应该做的第一件事是从 celery 包中导入 Celery 函数:

from celery import Celery

之后,我们可以创建一个连接到默认 RabbitMQ 服务的 celery 应用程序实例:

from celery import Celery

app = Celery('tasks', backend='amqp', broker='amqp://')

Celery 函数的第一个参数是将添加到任务前面以识别它们的名称。

backend 参数是一个可选参数,如果您希望查询后台任务的状态或检索其结果,则该参数是必需的。

如果您的任务只是做一些工作然后退出的简单函数,没有返回一个有用的值供您的程序使用,您可以省略这个参数。如果只有您的某些任务需要此功能,请在此处启用它,我们可以根据具体情况进一步禁用它。

broker 参数指定连接到我们的代理所需的 URL。在我们的例子中,这是在我们的服务器上运行的 RabbitMQ 服务。 RabbitMQ 使用名为“amqp”的协议运行。如果 RabbitMQ 在其默认配置下运行,则 celery 可以使用 amqp:// 方案以外的任何其他信息进行连接。

构建芹菜任务

仍然在这个文件中,我们现在需要添加我们的任务。

每个 celery 任务都必须使用装饰器 @app.task 引入。这允许 celery 识别可以将其队列函数添加到的函数。在每个装饰器之后,我们只需创建一个我们的工人可以运行的函数。

我们的第一个任务将是一个简单的函数,它打印出一个字符串到控制台。

from celery import Celery

app = Celery('tasks', backend='amqp', broker='amqp://')

@app.task
def print_hello():
    print 'hello there'

因为这个函数不返回任何有用的信息(而是将它打印到控制台),我们可以告诉 celery 不要使用后端来存储关于这个任务的状态信息。这在引擎盖下不那么复杂,需要的资源也更少。

<前>

app=Celery('tasks', backend='amqp', broker='amqp://')

@app.task(ignore_result=True)

接下来,我们将添加另一个生成素数的函数(取自 RosettaCode)。这可能是一个长时间运行的进程,因此它是一个很好的例子,说明我们在等待结果时如何处理异步工作进程。

from celery import Celery

app = Celery('tasks', backend='amqp', broker='amqp://')

@app.task(ignore_result=True)
def print_hello():
    print 'hello there'

@app.task
def gen_prime(x):
    multiples = []
    results = []
    for i in xrange(2, x+1):
        if i not in multiples:
            results.append(i)
            for j in xrange(i*i, x+1, i):
                multiples.append(j)
    return results

因为我们关心这个函数的返回值是什么,并且因为我们想知道它何时完成(以便我们可以使用结果等),所以我们没有将 ignore_result 参数添加到这第二个任务。

保存并关闭文件。

启动 Celery 工作进程

我们现在可以启动一个工作进程,它将能够接受来自应用程序的连接。它将使用我们刚刚创建的文件来了解它可以执行的任务。

启动工作实例就像使用 celery 命令调用应用程序名称一样简单。我们将在字符串末尾包含一个 \& 字符,以将我们的工作进程置于后台:

celery worker -A tasks &

这将启动一个应用程序,然后将其与终端分离,允许您继续将其用于其他任务。

如果你想启动多个 worker,你可以通过使用 -n 参数命名每个 worker 来实现:

celery worker -A tasks -n one.%h &
celery worker -A tasks -n two.%h &

%h 将在命名 worker 时替换为主机名。

要停止工作人员,您可以使用 kill 命令。我们可以查询进程id,然后根据这些信息淘汰worker。

ps auxww | grep 'celery worker' | awk '{print $2}' | xargs kill

这将允许工作人员在退出之前完成其当前任务。

如果你希望在不等待他们完成任务的情况下关闭所有 worker,你可以执行:

ps auxww | grep 'celery worker' | awk '{print $2}' | xargs kill -9

使用队列处理工作

我们可以使用我们生成的工作进程在后台完成我们程序的工作。

我们将探索 Python 解释器中的不同选项,而不是创建一个完整的程序来演示它是如何工作的:

python

在提示符下,我们可以将我们的功能导入到环境中:

from tasks import print_hello
from tasks import gen_prime

如果您测试这些功能,它们似乎没有任何特殊功能。第一个函数按预期打印一行:

print_hello()
hello there

第二个函数返回素数列表:

primes = gen_prime(1000)
print primes

如果我们给第二个函数一个更大范围的数字来检查,执行会在计算时挂起:

primes = gen_prime(50000)

输入“CTRL-C”停止执行。这个过程显然不是在后台计算。

要访问后台工作程序,我们需要使用 .delay 方法。 Celery 用额外的功能包装我们的函数。此方法用于将函数传递给 worker 以执行。它应该立即返回:

primes = gen_prime.delay(50000)

这个任务现在由我们之前启动的工人执行。因为我们为应用程序配置了一个 backend 参数,所以我们可以检查计算状态并访问结果。

要检查任务是否完成,我们可以使用 .ready 方法:

primes.ready()
False

值为 \False 表示任务仍在运行,结果尚不可用。当我们得到值为 \True 时,我们可以对答案做一些事情。

primes.ready()
True

我们可以使用 .get 方法获取值。

如果我们已经验证该值是使用 .ready 方法计算的,那么我们可以像这样使用该方法:

print primes.get()
[2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67, 71, 73, 79, 83, 89, 97, 101, 103, 107, 109, 113, 127, 131, 137, 139, 149, 151, 157, 163, 167, 173, 179, 181, 191, 193, 197, 199, 211, 223, 227, 229, 233, 239, 241, 251, 257, 263, 269, 271, 277, 281, 283, 293, 307, 311, 313, 317, 331, 337, 347, 349, 353, 359, 367, 373, 379, 383, 389, 397, 401, 409, 419, 421, 431, 433, 439, 443, 449, 457, 461, 463, 467, 479, 487, 491, 499, 503, 509, 521, 523,
. . .

但是,如果您在调用 .get 之前没有使用过 .ready 方法,您很可能想要添加一个“超时”选项,这样您的程序就不会t 被迫等待结果,这会破坏我们实施的目的:

print primes.get(timeout=2)

如果超时,这将引发异常,您可以在程序中处理。

结论

虽然这些信息足以让您开始在您的程序中使用 celery,但它只是触及该库全部功能的皮毛。 Celery 允许您以有趣的方式将后台任务串联在一起、对任务进行分组以及组合函数。

虽然 celery 是用 Python 编写的,但它可以通过 webhooks 与其他语言一起使用。这使得将任务移至后台非常灵活,无论您选择何种语言。

贾斯汀·埃林伍德

©2015-2025 艾丽卡 support@alaica.com