拨开荷叶行,寻梦已然成。仙女莲花里,翩翩白鹭情。
IMG-LOGO
主页 文章列表 在Apache Kafka中配置消息保留期限

在Apache Kafka中配置消息保留期限

白鹭 - 2021-11-24 865 0 0

1.概述

当生产者向Apache Kafka发送消息时,它将消息附加到日志文件中并在配置的持续时间内保留。

在本教程中,我们将学习为Kafka主题配置基于时间的消息保留属性

2.基于时间的保留

保留期属性到位后,消息具有TTL(生存时间) 。到期时,邮件将标记为删除,从而释放磁盘空间。

相同的保留期限属性适用于给定Kafka主题内的所有消息。此外,我们可以在创建主题之前设置这些属性,也可以在运行时为预先存在的主题更改它们

在以下各节中,我们将学习如何通过代理配置进行调整,以设置新主题的保留期以及主题级别的配置以在运行时对其进行控制

3.服务器级配置

Apache Kafka支持服务器级别的保留策略,我们可以通过完全配置以下三个基于时间的配置属性之一来进行调整

  • log.retention.hours
  • log.retention.minutes
  • log.retention.ms

重要的是要了解,Kafka会用较高的值覆盖较低的精度值。因此, log.retention.ms的优先级最高

3.1。基本

首先,让我们通过执行Apache Kafka目录中grep命令来检查默认值是否保留:

$ grep -i 'log.retention.[hms].*\=' config/server.properties

 log.retention.hours=168

在这里我们可以注意到默认的保留时间是7天

要仅将消息保留十分钟,我们可以在config/server.properties log.retention.minutes属性的值:

log.retention.minutes=10

3.2。新主题的保留期

Apache Kafka软件包包含几个Shell脚本,我们可以使用它们执行管理任务。我们将使用它们来创建一个辅助脚本functions.sh ,该脚本将在本教程的过程中使用.

让我们从在functions.sh添加两个函数开始,分别创建一个主题并描述其配置:

function create_topic {

 topic_name="$1"

 bin/kafka-topics.sh --create --topic ${topic_name} --if-not-exists \

 --partitions 1 --replication-factor 1 \

 --zookeeper localhost:2181

 }



 function describe_topic_config {

 topic_name="$1"

 ./bin/kafka-configs.sh --describe --all \

 --bootstrap-server=0.0.0.0:9092 \

 --topic ${topic_name}

 }

接下来,让我们创建两个独立脚本, create-topic.shget-topic-retention-time.sh

bash-5.1# cat create-topic.sh

 #!/bin/bash

 . ./functions.sh

 topic_name="$1"

 create_topic "${topic_name}"

 exit $?
bash-5.1# cat get-topic-retention-time.sh

 #!/bin/bash

 . ./functions.sh

 topic_name="$1"

 describe_topic_config "${topic_name}" | awk 'BEGIN{IFS="=";IRS=" "} /^[ ]*retention.ms/{print $1}'

 exit $?

我们必须注意, describe_topic_config将提供为该主题配置的所有属性。因此,我们使用了awk retention.ms属性添加了一个过滤器。

最后,让我们启动Kafka环境,并验证新示例主题的保留期配置:

bash-5.1# ./create-topic.sh test-topic

 Created topic test-topic.

 bash-5.1# ./get-topic-retention-time.sh test-topic

 retention.ms=600000

创建并描述了主题之后,我们会注意到, retention.ms设置为600000 (十分钟)。这实际上是从我们先前在server.properties文件中**log.retention.minutes属性派生的。**

4.主题级配置

启动Broker服务器后, log.retention.{hours|minutes|ms}服务器级别的属性将变为只读。另一方面,我们可以访问retention.ms属性,可以在主题级别进行调整。

让我们在functions.sh脚本中添加一个方法来配置主题的属性:

function alter_topic_config {

 topic_name="$1"

 config_name="$2"

 config_value="$3"

 ./bin/kafka-configs.sh --alter \

 --add-config ${config_name}=${config_value} \

 --bootstrap-server=0.0.0.0:9092 \

 --topic ${topic_name}

 }

然后,我们可以在alter-topic-config.sh脚本中使用它:

#!/bin/sh

 . ./functions.sh



 alter_topic_retention_config $1 $2 $3

 exit $?

test-topic保留时间设置为五分钟,然后进行验证:

bash-5.1# ./alter-topic-config.sh test-topic retention.ms 300000

 Completed updating config for topic test-topic.



 bash-5.1# ./get-topic-retention-time.sh test-topic

 retention.ms=300000

5.验证

到目前为止,我们已经了解了如何在Kafka主题中配置邮件的保留期限。是时候验证保留超时后消息确实过期了。

5.1。生产者-消费者

让我们在functions.sh中produce_messageconsume_message functions.sh.在内部,它们分别使用kafka-console-producer.shkafka-console-consumer.sh来生成/使用消息:

function produce_message {

 topic_name="$1"

 message="$2"

 echo "${message}" | ./bin/kafka-console-producer.sh \

 --bootstrap-server=0.0.0.0:9092 \

 --topic ${topic_name}

 }



 function consume_message {

 topic_name="$1"

 timeout="$2"

 ./bin/kafka-console-consumer.sh \

 --bootstrap-server=0.0.0.0:9092 \

 --from-beginning \

 --topic ${topic_name} \

 --max-messages 1 \

 --timeout-ms $timeout

 }

我们必须注意,使用者总是从头开始阅读消息,因为我们需要一个能够读取Kafka中任何可用消息的使用者。

接下来,让我们创建一个独立的消息生成器:

bash-5.1# cat producer.sh

 #!/bin/sh

 . ./functions.sh

 topic_name="$1"

 message="$2"



 produce_message ${topic_name} ${message}

 exit $?

最后,让我们有一个独立的消息使用者:

bash-5.1# cat consumer.sh

 #!/bin/sh

 . ./functions.sh

 topic_name="$1"

 timeout="$2"



 consume_message ${topic_name} $timeout

 exit $?

5.2。讯息到期

现在我们已经准备好基本设置,让我们生成一条消息并立即使用两次:

bash-5.1# ./producer.sh "test-topic-2" "message1"

 bash-5.1# ./consumer.sh test-topic-2 10000

 message1

 Processed a total of 1 messages

 bash-5.1# ./consumer.sh test-topic-2 10000

 message1

 Processed a total of 1 messages

因此,我们可以看到使用者正在重复使用任何可用消息。

现在,让我们介绍五分钟的睡眠延迟,然后尝试使用该消息:

bash-5.1# sleep 300 && ./consumer.sh test-topic 10000

 [2021-02-06 21:55:00,896] ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$)

 org.apache.kafka.common.errors.TimeoutException

 Processed a total of 0 messages

如预期的那样,使用者没有找到任何要使用的消息,因为该消息已经超过了保留期

6.局限性

在内部,Kafka Broker维护另一个名为log.retention.check.interval.ms.此属性确定检查消息是否过期的频率。

所以,为了保持保留策略有效,我们必须确保的值log.retention.check.interval.ms比的属性值低retention.ms对于任何给定的主题。

7.结论

在本教程中,我们探索了Apache Kafka以了解基于时间的消息保留策略。在此过程中,我们创建了简单的Shell脚本来简化管理活动。后来,我们创建了一个独立的使用者和生产者,以验证保留期后邮件的有效期。

标签:

0 评论

发表评论

您的电子邮件地址不会被公开。 必填的字段已做标记 *