如何在 Ubuntu 20.04 上安装 Apache Kafka
作者选择了 Write for DOnations 计划。
介绍
兔子MQ。虽然它通常用作发布/订阅 消息传递系统,但许多组织也将它用于日志聚合,因为它为已发布的消息提供持久存储。
发布/订阅消息系统允许一个或多个生产者发布消息,而无需考虑消费者的数量或他们将如何处理消息。已订阅的客户端会自动收到有关更新和新消息创建的通知。该系统比客户端定期轮询以确定是否有新消息可用的系统更有效和可扩展。
在本教程中,您将在 Ubuntu 20.04 上安装和配置 Apache Kafka 2.8.2。
先决条件
要继续,您将需要:
- 初始服务器设置指南(如果您没有设置非根用户)。安装少于 4GB RAM 可能会导致 Kafka 服务失败。
- 如何在 Ubuntu 20.04 上使用 APT 安装 Java。 Kafka 是用 Java 编写的,因此它需要 JVM。
第 1 步 — 为 Kafka 创建用户
因为 Kafka 可以通过网络处理请求,所以您的第一步是为该服务创建一个专用用户。如果有人破坏了 Kafka 服务器,这可以最大限度地减少对 Ubuntu 机器的损害。您将在此步骤中创建一个专用的 kafka
用户。
以非根 sudo
用户身份登录到您的服务器,然后创建一个名为 kafka
的用户:
- sudo adduser kafka
按照提示设置密码,创建kafka
用户。
接下来,使用 adduser
命令将 kafka
用户添加到 sudo
组。您需要这些权限来安装 Kafka 的依赖项:
- sudo adduser kafka sudo
您的 kafka
用户现已准备就绪。使用su
登录kafka
账号:
- su -l kafka
现在您已经创建了一个特定于 Kafka 的用户,您可以下载并提取 Kafka 二进制文件了。
第 2 步 — 下载并解压 Kafka 二进制文件
在此步骤中,您将下载 Kafka 二进制文件并将其解压缩到 kafka
用户主目录中的专用文件夹中。
首先,在 /home/kafka
中创建一个名为 Downloads
的目录来存储您的下载:
- mkdir ~/Downloads
使用 curl
下载 Kafka 二进制文件:
- curl "https://downloads.apache.org/kafka/2.8.2/kafka_2.13-2.8.2.tgz" -o ~/Downloads/kafka.tgz
创建一个名为 kafka
的目录并移动到该目录。您将使用此目录作为 Kafka 安装的基本目录:
- mkdir ~/kafka && cd ~/kafka
使用 tar
命令提取您下载的存档:
- tar -xvzf ~/Downloads/kafka.tgz --strip 1
您指定 --strip 1
标志以确保存档的内容被提取到 ~/kafka/
本身而不是另一个目录(例如 ~/kafka /kafka_2.13-2.8.2/
) 在里面。
现在您已经成功下载并提取了二进制文件,您可以开始配置您的 Kafka 服务器。
第 3 步 — 配置 Kafka 服务器
Kafka topic 是可以向其发布消息的类别、组或提要名称。但是,Kafka 的默认行为不允许您删除主题。要对此进行修改,您必须编辑配置文件,您将在此步骤中执行此操作。
Kafka 的配置选项在 server.properties
中指定。使用 nano
或您喜欢的编辑器打开此文件:
- nano ~/kafka/config/server.properties
首先,添加一个允许您删除 Kafka 主题的设置。将以下行添加到文件底部:
delete.topic.enable = true
其次,您将通过修改 log.dirs
属性来更改存储 Kafka 日志的目录。找到 log.dirs
属性并将现有路由替换为突出显示的路由:
log.dirs=/home/kafka/logs
保存并关闭文件。
现在您已经配置了 Kafka,您可以创建 systemd
单元文件以在启动时运行和启用 Kafka 服务器。
第 4 步 — 创建 systemd 单元文件并启动 Kafka 服务器
在本节中,您将为 Kafka 服务创建 systemd
单元文件。这些文件将帮助您以与其他 Linux 服务一致的方式执行常见的服务操作,例如启动、停止和重新启动 Kafka。
Kafka 使用官方 Zookeeper 文档。您将使用 Zookeper 作为这些单元文件的服务。
为 zookeeper
创建单元文件:
- sudo nano /etc/systemd/system/zookeeper.service
在文件中输入以下单位定义:
[Unit]
Requires=network.target remote-fs.target
After=network.target remote-fs.target
[Service]
Type=simple
User=kafka
ExecStart=/home/kafka/kafka/bin/zookeeper-server-start.sh /home/kafka/kafka/config/zookeeper.properties
ExecStop=/home/kafka/kafka/bin/zookeeper-server-stop.sh
Restart=on-abnormal
[Install]
WantedBy=multi-user.target
[Unit]
部分指定 Zookeeper 需要网络和文件系统才能启动。
[Service]
部分指定 systemd
应该使用 zookeeper-server-start.sh
和 zookeeper-server-stop.sh
用于启动和停止服务的 shell 文件。它还指定如果 Zookeeper 异常退出,则应重新启动它。
添加此内容后,保存并关闭文件。
接下来,为 kafka
创建 systemd 服务文件:
- sudo nano /etc/systemd/system/kafka.service
在文件中输入以下单位定义:
[Unit]
Requires=zookeeper.service
After=zookeeper.service
[Service]
Type=simple
User=kafka
ExecStart=/bin/sh -c '/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/server.properties > /home/kafka/kafka/kafka.log 2>&1'
ExecStop=/home/kafka/kafka/bin/kafka-server-stop.sh
Restart=on-abnormal
[Install]
WantedBy=multi-user.target
[Unit]
部分指定此单元文件依赖于 zookeeper.service
,这将确保 zookeeper
在 时自动启动kafka
服务启动。
[Service]
部分指定 systemd
应该使用 kafka-server-start.sh
和 kafka-server-stop.sh
用于启动和停止服务的 shell 文件。它还指定如果 Kafka 异常退出,则应重新启动它。
保存并关闭文件。
现在您已经定义了单位,使用以下命令启动 Kafka:
- sudo systemctl start kafka
为确保服务器已成功启动,请检查 kafka
单元的日志:
- sudo systemctl status kafka
您将收到如下输出:
Output● kafka.service
Loaded: loaded (/etc/systemd/system/kafka.service; disabled; vendor preset>
Active: active (running) since Wed 2023-02-01 23:44:12 UTC; 4s ago
Main PID: 17770 (sh)
Tasks: 69 (limit: 4677)
Memory: 321.9M
CGroup: /system.slice/kafka.service
├─17770 /bin/sh -c /home/kafka/kafka/bin/kafka-server-start.sh /ho>
└─17793 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMill>
您现在有一个 Kafka 服务器侦听端口 9092
,这是 Kafka 服务器使用的默认端口。
您已经启动了 kafka
服务。但是如果你重启你的服务器,Kafka 不会自动重启。要在服务器启动时启用 kafka
服务,请运行以下命令:
- sudo systemctl enable zookeeper
您将收到一个符号链接已创建的响应:
OutputCreated symlink /etc/systemd/system/multi-user.target.wants/zookeeper.service → /etc/systemd/system/zookeeper.service.
然后运行这个命令:
- sudo systemctl enable kafka
您将收到一个符号链接已创建的响应:
OutputCreated symlink /etc/systemd/system/multi-user.target.wants/kafka.service → /etc/systemd/system/kafka.service.
在此步骤中,您启动并启用了 kafka
和 zookeeper
服务。在下一步中,您将检查 Kafka 安装。
第 5 步 — 测试 Kafka 安装
在此步骤中,您将测试 Kafka 安装。您将发布和使用 Hello World 消息以确保 Kafka 服务器按预期运行。
在 Kafka 中发布消息需要:
- 生产者,负责将记录和数据发布到主题。
- 消费者,从主题中读取消息和数据。
首先,创建一个名为 TutorialTopic
的主题:
- ~/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic TutorialTopic
您可以使用 kafka-console-producer.sh
脚本从命令行创建生产者。它需要 Kafka 服务器的主机名、端口和主题作为参数。
您将收到主题已创建的回复:
OutputCreated topic TutorialTopic.
现在将字符串 Hello, World
发布到 TutorialTopic
主题:
- echo "Hello, World" | ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TutorialTopic > /dev/null
接下来,使用 kafka-console-consumer.sh
脚本创建一个 Kafka 消费者。它期望 ZooKeeper 服务器的主机名和端口以及主题名称作为参数。以下命令使用来自 TutorialTopic
的消息。请注意 --from-beginning
标志的使用,它允许使用在消费者启动之前发布的消息:
- ~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TutorialTopic --from-beginning
如果没有配置问题,您将在终端中收到 Hello, World
响应:
OutputHello, World
该脚本将继续运行,等待更多消息发布。要对此进行测试,请打开一个新的终端窗口并登录到您的服务器。请记住以您的 kafka
用户身份登录:
- su -l kafka
在这个新终端中,启动生产者发布第二条消息:
- echo "Hello World from Sammy at DigitalOcean!" | ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TutorialTopic > /dev/null
此消息将加载到原始终端中消费者的输出中:
OutputHello, World
Hello World from Sammy at DigitalOcean!
完成测试后,按 CTRL+C
停止原始终端中的使用者脚本。
您现在已经在 Ubuntu 20.04 上安装并配置了 Kafka 服务器。在下一步中,您将执行一些快速任务来加强 Kafka 服务器的安全性。
第 6 步 — 加固 Kafka 服务器
安装完成后,您可以删除 kafka
用户的管理员权限并加固 Kafka 服务器。
在执行此操作之前,注销并以任何其他非 root sudo
用户身份重新登录。如果您仍在运行开始本教程时使用的同一个 shell 会话,请键入 exit
。
从 sudo 组中删除 kafka
用户:
- sudo deluser kafka sudo
为了进一步提高 Kafka 服务器的安全性,请使用 passwd
命令锁定 kafka
用户的密码。此操作确保没有人可以使用此帐户直接登录服务器:
- sudo passwd kafka -l
-l
标志锁定更改用户密码的命令 (passwd
)。
此时,只有 root
或 sudo
用户可以使用以下命令以 kafka
身份登录:
- sudo su - kafka
以后,如果你想解锁更改密码的能力,使用 passwd
和 -u
选项:
- sudo passwd kafka -u
您现在已经成功地限制了 kafka
用户的管理员权限。您已准备好开始使用 Kafka。您可以选择执行下一步,这会将 KafkaT 添加到您的系统中。
第 7 步 — 安装 KafkaT(可选)
KafkaT 的开发旨在提高您查看有关 Kafka 集群的详细信息以及从命令行执行某些管理任务的能力。因为它是一个 Ruby gem,所以您需要 Ruby 才能使用它。您还需要 build-essential
包来构建 KafkaT
所依赖的其他 gem。
使用 apt
安装 Ruby 和 build-essential
包:
- sudo apt install ruby ruby-dev build-essential
您现在可以使用 gem
命令安装 KafkaT:
- sudo CFLAGS=-Wno-error=format-overflow gem install kafkat
Wno-error=format-overflow
编译标志是在 kafkat
安装过程中抑制 Zookeeper 的警告和错误所必需的。
安装完成后,您将收到已完成的响应:
Output...
Done installing documentation for json, colored, retryable, highline, trollop, zookeeper, zk, kafkat after 3 seconds
8 gems installed
KafkaT 使用 .kafkatcfg
作为配置文件来确定 Kafka 服务器的安装目录和日志目录。它还应该有一个将 KafkaT 指向您的 ZooKeeper 实例的入口。
创建一个名为 .kafkatcfg
的新文件:
- nano ~/.kafkatcfg
添加以下行以指定有关 Kafka 服务器和 Zookeeper 实例的所需信息:
{
"kafka_path": "~/kafka",
"log_path": "/home/kafka/logs",
"zk_path": "localhost:2181"
}
保存并关闭文件。您现在可以使用 KafkaT 了。
要查看有关所有 Kafka 分区的详细信息,请尝试运行此命令:
- kafkat partitions
您将收到以下输出:
Output[DEPRECATION] The trollop gem has been renamed to optimist and will no longer be supported. Please switch to optimist as soon as possible.
/var/lib/gems/2.7.0/gems/json-1.8.6/lib/json/common.rb:155: warning: Using the last argument as keyword parameters is deprecated
...
Topic Partition Leader Replicas ISRs
TutorialTopic 0 0 [0] [0]
__consumer_offsets 0 0 [0] [0]
...
...
输出将包括 TutorialTopic
和 __consumer_offsets
,这是 Kafka 用于存储客户端相关信息的内部主题。您可以安全地忽略以 __consumer_offsets
开头的行。
要了解有关 KafkaT 的更多信息,请参阅其 GitHub 存储库。
结论
您现在可以在 Ubuntu 服务器上安全地运行 Apache Kafka。您可以使用 Kafka 客户端将 Kafka 集成到您最喜欢的编程语言中。
要了解有关 Kafka 的更多信息,您还可以查阅其文档。