1.概述
在本文中,我们将探讨一些清除Apache Kafka主题中的数据的策略。
2.清理方案
在学习清理数据的策略之前,让我们先了解一个需要清除活动的简单方案。
2.1场景
经过配置的保留时间后,Apache Kafka中的消息会自动过期。但是,在某些情况下,我们可能希望消息立即删除。
假设在应用程序代码中引入了一个缺陷,该缺陷在Kafka主题中生成消息。到集成错误修复程序时, Kafka主题中已经有许多损坏的消息可供使用。
此类问题在开发环境中最常见,我们希望获得快速结果。因此,批量删除消息是一个合理的事情。
2.2。模拟
为了模拟这种情况,让我们从Kafka安装目录中purge-scenario
$ bin/kafka-topics.sh \
--create --topic purge-scenario --if-not-exists \
--partitions 2 --replication-factor 1 \
--zookeeper localhost:2181
接下来,让我们使用shuf
命令**生成随机数据并将其提供给kafka-console-producer.sh
**脚本:
$ /usr/bin/shuf -i 1-100000 -n 50000000 \
| tee -a /tmp/kafka-random-data \
| bin/kafka-console-producer.sh \
--bootstrap-server=0.0.0.0:9092 \
--topic purge-scenario
我们必须注意,我们已经使用了tee
命令来保存仿真数据,以备后用。
最后,让我们验证使用者是否可以使用该主题中的消息:
$ bin/kafka-console-consumer.sh \
--bootstrap-server=0.0.0.0:9092 \
--from-beginning --topic purge-scenario \
--max-messages 3
76696
49425
1744
Processed a total of 3 messages
3.消息过期
purge-scenario
主题中生成的消息的默认保留期为7天。要清除消息,我们可以**retention.ms
主题级别的属性临时重置**为十秒钟,然后等待消息过期:
$ bin/kafka-configs.sh --alter \
--add-config retention.ms=10000 \
--bootstrap-server=0.0.0.0:9092 \
--topic purge-scenario \
&& sleep 10
接下来,让我们验证消息是否已从该主题过期:
$ bin/kafka-console-consumer.sh \
--bootstrap-server=0.0.0.0:9092 \
--from-beginning --topic purge-scenario \
--max-messages 1 --timeout-ms 1000
[2021-02-28 11:20:15,951] ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.TimeoutException
Processed a total of 0 messages
最后,我们可以将该主题的原始保留期恢复为7天:
$ bin/kafka-configs.sh --alter \
--add-config retention.ms=604800000 \
--bootstrap-server=0.0.0.0:9092 \
--topic purge-scenario
通过这种方法,Kafka将针对purge-scenario
主题清除所有分区中的消息。
4.选择性记录删除
有时,我们可能希望选择性地删除特定主题的一个或多个分区中的记录。我们可以使用kafka-delete-records.sh
脚本满足这些要求。
delete-config.json
配置文件中指定分区级别的偏移量。
让我们使用offset=-1
partition=1
的所有消息:
{
"partitions": [
{
"topic": "purge-scenario",
"partition": 1,
"offset": -1
}
],
"version": 1
}
接下来,让我们继续删除记录:
$ bin/kafka-delete-records.sh \
--bootstrap-server localhost:9092 \
--offset-json-file delete-config.json
我们可以验证我们仍然能够从partition=0
读取:
$ bin/kafka-console-consumer.sh \
--bootstrap-server=0.0.0.0:9092 \
--from-beginning --topic purge-scenario --partition=0 \
--max-messages 1 --timeout-ms 1000
44017
Processed a total of 1 messages
但是,当我们从partition=1
读取时,将没有记录可处理:
$ bin/kafka-console-consumer.sh \
--bootstrap-server=0.0.0.0:9092 \
--from-beginning --topic purge-scenario \
--partition=1 \
--max-messages 1 --timeout-ms 1000
[2021-02-28 11:48:03,548] ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.TimeoutException
Processed a total of 0 messages
5.删除并重新创建主题
清除Kafka主题的所有消息的另一种解决方法是删除并重新创建它。但是,只有将delete.topic.enable
属性设置为 启动Kafka服务器时为**true**
:
$ bin/kafka-server-start.sh config/server.properties \
--override delete.topic.enable=true
要删除主题,我们可以使用kafka-topics.sh
脚本:
$ bin/kafka-topics.sh \
--delete --topic purge-scenario \
--zookeeper localhost:2181
Topic purge-scenario is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
让我们通过列出主题来验证它:
$ bin/kafka-topics.sh --zookeeper localhost:2181 --list
确认不再列出该主题后,我们现在可以继续并重新创建它。
六,结论
在本教程中,我们模拟了需要清除Apache Kafka主题的场景。此外,我们探索了**多种策略来完全或选择性地清除分区中的数据**。
0 评论