拨开荷叶行,寻梦已然成。仙女莲花里,翩翩白鹭情。
IMG-LOGO
主页 文章列表 如何清除Apache Kafka主题数据?

如何清除Apache Kafka主题数据?

白鹭 - 2021-11-24 683 0 0

1.概述

在本文中,我们将探讨一些清除Apache Kafka主题中的数据的策略

2.清理方案

在学习清理数据的策略之前,让我们先了解一个需要清除活动的简单方案。

2.1场景

经过配置的保留时间后,Apache Kafka中的消息会自动过期。但是,在某些情况下,我们可能希望消息立即删除。

假设在应用程序代码中引入了一个缺陷,该缺陷在Kafka主题中生成消息。到集成错误修复程序时, Kafka主题中已经有许多损坏的消息可供使用。

此类问题在开发环境中最常见,我们希望获得快速结果。因此,批量删除消息是一个合理的事情。

2.2。模拟

为了模拟这种情况,让我们从Kafka安装目录中purge-scenario

$ bin/kafka-topics.sh \

 --create --topic purge-scenario --if-not-exists \

 --partitions 2 --replication-factor 1 \

 --zookeeper localhost:2181

接下来,让我们使用shuf命令**生成随机数据并将其提供给kafka-console-producer.sh**脚本:

$ /usr/bin/shuf -i 1-100000 -n 50000000 \

 | tee -a /tmp/kafka-random-data \

 | bin/kafka-console-producer.sh \

 --bootstrap-server=0.0.0.0:9092 \

 --topic purge-scenario

我们必须注意,我们已经使用了tee命令来保存仿真数据,以备后用。

最后,让我们验证使用者是否可以使用该主题中的消息:

$ bin/kafka-console-consumer.sh \

 --bootstrap-server=0.0.0.0:9092 \

 --from-beginning --topic purge-scenario \

 --max-messages 3

 76696

 49425

 1744

 Processed a total of 3 messages

3.消息过期

purge-scenario主题中生成的消息的默认保留期为7天。要清除消息,我们可以**retention.ms主题级别的属性临时重置**为十秒钟,然后等待消息过期:

$ bin/kafka-configs.sh --alter \

 --add-config retention.ms=10000 \

 --bootstrap-server=0.0.0.0:9092 \

 --topic purge-scenario \

 && sleep 10

接下来,让我们验证消息是否已从该主题过期:

$ bin/kafka-console-consumer.sh \

 --bootstrap-server=0.0.0.0:9092 \

 --from-beginning --topic purge-scenario \

 --max-messages 1 --timeout-ms 1000

 [2021-02-28 11:20:15,951] ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$)

 org.apache.kafka.common.errors.TimeoutException

 Processed a total of 0 messages

最后,我们可以将该主题的原始保留期恢复为7天:

$ bin/kafka-configs.sh --alter \

 --add-config retention.ms=604800000 \

 --bootstrap-server=0.0.0.0:9092 \

 --topic purge-scenario

通过这种方法,Kafka将针对purge-scenario主题清除所有分区中的消息。

4.选择性记录删除

有时,我们可能希望选择性地删除特定主题的一个或多个分区中的记录。我们可以使用kafka-delete-records.sh脚本满足这些要求。

delete-config.json配置文件中指定分区级别的偏移量。

让我们使用offset=-1 partition=1的所有消息:

{

 "partitions": [

 {

 "topic": "purge-scenario",

 "partition": 1,

 "offset": -1

 }

 ],

 "version": 1

 }

接下来,让我们继续删除记录:

$ bin/kafka-delete-records.sh \

 --bootstrap-server localhost:9092 \

 --offset-json-file delete-config.json

我们可以验证我们仍然能够从partition=0读取:

$ bin/kafka-console-consumer.sh \

 --bootstrap-server=0.0.0.0:9092 \

 --from-beginning --topic purge-scenario --partition=0 \

 --max-messages 1 --timeout-ms 1000

 44017

 Processed a total of 1 messages

但是,当我们从partition=1读取时,将没有记录可处理:

$ bin/kafka-console-consumer.sh \

 --bootstrap-server=0.0.0.0:9092 \

 --from-beginning --topic purge-scenario \

 --partition=1 \

 --max-messages 1 --timeout-ms 1000

 [2021-02-28 11:48:03,548] ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$)

 org.apache.kafka.common.errors.TimeoutException

 Processed a total of 0 messages

5.删除并重新创建主题

清除Kafka主题的所有消息的另一种解决方法是删除并重新创建它。但是,只有delete.topic.enable属性设置为 启动Kafka服务器时为**true**

$ bin/kafka-server-start.sh config/server.properties \

 --override delete.topic.enable=true

要删除主题,我们可以使用kafka-topics.sh脚本:

$ bin/kafka-topics.sh \

 --delete --topic purge-scenario \

 --zookeeper localhost:2181

 Topic purge-scenario is marked for deletion.

 Note: This will have no impact if delete.topic.enable is not set to true.

让我们通过列出主题来验证它:

$ bin/kafka-topics.sh --zookeeper localhost:2181 --list

确认不再列出该主题后,我们现在可以继续并重新创建它。

六,结论

在本教程中,我们模拟了需要清除Apache Kafka主题的场景。此外,我们探索了**多种策略来完全或选择性地清除分区中的数据**。

标签:

0 评论

发表评论

您的电子邮件地址不会被公开。 必填的字段已做标记 *