一、概述
使用Apache Kafka 的客户端应用程序通常属于两类,即生产者和消费者。生产者和消费者都要求底层Kafka 服务器启动并运行,然后才能分别开始生产和消费工作。
在本文中,我们将学习一些确定Kafka 服务器是否正在运行的策略。
2. 使用Zookeeper 命令
找出是否存在活动代理的最快方法之一是使用Zookeeper 的dump
命令。dump
命令是可用于管理Zookeeper 服务器的4LW命令之一**。**
让我们继续使用nc
命令通过正在侦听2181 端口的Zookeeper 服务器发送转储命令:
$ echo dump | nc localhost 2181 | grep -i broker | xargs
/brokers/ids/0
在执行命令时,我们会看到在Zookeeper 服务器上注册的临时代理ID 列表。如果不存在临时ID,则没有任何代理节点正在运行。
此外,重要的是要注意,需要在zookeeper.properties
或zoo.cfg
配置文件中通常可用的配置中明确允许dump
命令:
lw.commands.whitelist=dump
或者,我们也可以使用Zookeeper API 来查找活动代理列表。
3. 使用Apache Kafka 的AdminClient
如果我们的生产者或消费者是Java 应用程序,那么我们可以使用Apache Kafka 的AdminClient
类来确定Kafka 服务器是否已启动。
让我们定义KafkaAdminClient
类来包装AdminClient
类的实例,以便我们可以快速测试我们的代码:
public class KafkaAdminClient {
private final AdminClient client;
public KafkaAdminClient(String bootstrap) {
Properties props = new Properties();
props.put("bootstrap.servers", bootstrap);
props.put("request.timeout.ms", 3000);
props.put("connections.max.idle.ms", 5000);
this.client = AdminClient.create(props);
}
}
接下来,让我们在KafkaAdminClient
类中定义verifyConnection()
方法来验证客户端是否可以连接到正在运行的代理服务器:
public boolean verifyConnection() throws ExecutionException, InterruptedException {
Collection<Node> nodes = this.client.describeCluster()
.nodes()
.get();
return nodes != null && nodes.size() > 0;
}
最后,让我们通过连接到正在运行的Kafka 集群来测试我们的代码:
@Test
void givenKafkaIsRunning_whenCheckedForConnection_thenConnectionIsVerified() throws Exception {
boolean alive = kafkaAdminClient.verifyConnection();
assertThat(alive).isTrue();
}
4. 使用kcat
实用程序
我们可以使用kcat
(以前的kafkacat
)命令来查看是否有正在运行的Kafka 代理节点。为此,让我们使用-L
选项来显示现有主题的元数据:
$ kcat -b localhost:9092 -t demo-topic -L
Metadata for demo-topic (from broker -1: localhost:9092/bootstrap):
1 brokers:
broker 0 at 192.168.1.53:9092 (controller)
1 topics:
topic "demo-topic" with 1 partitions:
partition 0, leader 0, replicas: 0, isrs: 0
接下来,让我们在代理节点关闭时执行相同的命令:
$ kcat -b localhost:9092 -t demo-topic -L -m 1
%3|1660579562.937|FAIL|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 1ms in state CONNECT)
% ERROR: Failed to acquire metadata: Local: Broker transport failure (Are the brokers reachable? Also try increasing the metadata timeout with -m <timeout>?)
对于这种情况,我们会收到“连接被拒绝”错误,因为没有正在运行的代理节点。此外,我们必须注意,通过使用-m
选项将请求超时限制为1 秒,我们能够快速失败。
5. 使用UI 工具
对于不需要自动检查的实验性POC 项目,我们可以依靠诸如Offset Explorer之类的UI 工具。但是,如果我们要验证企业级Kafka 客户端的代理节点状态,不建议使用这种方法。
让我们使用Offset Explorer 使用Zookeeper 主机和端口详细信息连接到Kafka 集群:
我们可以在左侧窗格中看到正在运行的代理列表。而已。我们只需单击一下按钮即可获得它。
六,结论
在本教程中,我们探索了一些使用Zookeeper 命令、Apache 的AdminClient
和kcat
实用程序的命令行方法,然后使用基于UI 的方法来确定Kafka 服务器是否已启动。
0 评论