1.概述
当生产者向Apache Kafka发送消息时,它将消息附加到日志文件中并在配置的持续时间内保留。
在本教程中,我们将学习为Kafka主题配置基于时间的消息保留属性。
2.基于时间的保留
保留期属性到位后,消息具有TTL(生存时间) 。到期时,邮件将标记为删除,从而释放磁盘空间。
相同的保留期限属性适用于给定Kafka主题内的所有消息。此外,我们可以在创建主题之前设置这些属性,也可以在运行时为预先存在的主题更改它们。
在以下各节中,我们将学习如何通过代理配置进行调整,以设置新主题的保留期以及主题级别的配置以在运行时对其进行控制。
3.服务器级配置
Apache Kafka支持服务器级别的保留策略,我们可以通过完全配置以下三个基于时间的配置属性之一来进行调整:
-
log.retention.hours
-
log.retention.minutes
-
log.retention.ms
重要的是要了解,Kafka会用较高的值覆盖较低的精度值。因此, log.retention.ms
的优先级最高。
3.1。基本
首先,让我们通过执行Apache Kafka目录中grep
命令来检查默认值是否保留:
$ grep -i 'log.retention.[hms].*\=' config/server.properties
log.retention.hours=168
在这里我们可以注意到默认的保留时间是7天。
要仅将消息保留十分钟,我们可以在config/server.properties
log.retention.minutes
属性的值:
log.retention.minutes=10
3.2。新主题的保留期
Apache Kafka软件包包含几个Shell脚本,我们可以使用它们执行管理任务。我们将使用它们来创建一个辅助脚本functions.sh
,该脚本将在本教程的过程中使用.
让我们从在functions.sh
添加两个函数开始,分别创建一个主题并描述其配置:
function create_topic {
topic_name="$1"
bin/kafka-topics.sh --create --topic ${topic_name} --if-not-exists \
--partitions 1 --replication-factor 1 \
--zookeeper localhost:2181
}
function describe_topic_config {
topic_name="$1"
./bin/kafka-configs.sh --describe --all \
--bootstrap-server=0.0.0.0:9092 \
--topic ${topic_name}
}
接下来,让我们创建两个独立脚本, create-topic.sh
和get-topic-retention-time.sh
:
bash-5.1# cat create-topic.sh
#!/bin/bash
. ./functions.sh
topic_name="$1"
create_topic "${topic_name}"
exit $?
bash-5.1# cat get-topic-retention-time.sh
#!/bin/bash
. ./functions.sh
topic_name="$1"
describe_topic_config "${topic_name}" | awk 'BEGIN{IFS="=";IRS=" "} /^[ ]*retention.ms/{print $1}'
exit $?
我们必须注意, describe_topic_config
将提供为该主题配置的所有属性。因此,我们使用了awk
retention.ms
属性添加了一个过滤器。
最后,让我们启动Kafka环境,并验证新示例主题的保留期配置:
bash-5.1# ./create-topic.sh test-topic
Created topic test-topic.
bash-5.1# ./get-topic-retention-time.sh test-topic
retention.ms=600000
创建并描述了主题之后,我们会注意到, retention.ms
设置为600000
(十分钟)。这实际上是从我们先前在server.properties
文件中**log.retention.minutes
属性派生的。**
4.主题级配置
启动Broker服务器后, log.retention.{hours|minutes|ms}
服务器级别的属性将变为只读。另一方面,我们可以访问retention.ms
属性,可以在主题级别进行调整。
让我们在functions.sh
脚本中添加一个方法来配置主题的属性:
function alter_topic_config {
topic_name="$1"
config_name="$2"
config_value="$3"
./bin/kafka-configs.sh --alter \
--add-config ${config_name}=${config_value} \
--bootstrap-server=0.0.0.0:9092 \
--topic ${topic_name}
}
然后,我们可以在alter-topic-config.sh
脚本中使用它:
#!/bin/sh
. ./functions.sh
alter_topic_retention_config $1 $2 $3
exit $?
test-topic
保留时间设置为五分钟,然后进行验证:
bash-5.1# ./alter-topic-config.sh test-topic retention.ms 300000
Completed updating config for topic test-topic.
bash-5.1# ./get-topic-retention-time.sh test-topic
retention.ms=300000
5.验证
到目前为止,我们已经了解了如何在Kafka主题中配置邮件的保留期限。是时候验证保留超时后消息确实过期了。
5.1。生产者-消费者
让我们在functions.sh中produce_message
和consume_message
functions.sh.
在内部,它们分别使用kafka-console-producer.sh
和kafka-console-consumer.sh
来生成/使用消息:
function produce_message {
topic_name="$1"
message="$2"
echo "${message}" | ./bin/kafka-console-producer.sh \
--bootstrap-server=0.0.0.0:9092 \
--topic ${topic_name}
}
function consume_message {
topic_name="$1"
timeout="$2"
./bin/kafka-console-consumer.sh \
--bootstrap-server=0.0.0.0:9092 \
--from-beginning \
--topic ${topic_name} \
--max-messages 1 \
--timeout-ms $timeout
}
我们必须注意,使用者总是从头开始阅读消息,因为我们需要一个能够读取Kafka中任何可用消息的使用者。
接下来,让我们创建一个独立的消息生成器:
bash-5.1# cat producer.sh
#!/bin/sh
. ./functions.sh
topic_name="$1"
message="$2"
produce_message ${topic_name} ${message}
exit $?
最后,让我们有一个独立的消息使用者:
bash-5.1# cat consumer.sh
#!/bin/sh
. ./functions.sh
topic_name="$1"
timeout="$2"
consume_message ${topic_name} $timeout
exit $?
5.2。讯息到期
现在我们已经准备好基本设置,让我们生成一条消息并立即使用两次:
bash-5.1# ./producer.sh "test-topic-2" "message1"
bash-5.1# ./consumer.sh test-topic-2 10000
message1
Processed a total of 1 messages
bash-5.1# ./consumer.sh test-topic-2 10000
message1
Processed a total of 1 messages
因此,我们可以看到使用者正在重复使用任何可用消息。
现在,让我们介绍五分钟的睡眠延迟,然后尝试使用该消息:
bash-5.1# sleep 300 && ./consumer.sh test-topic 10000
[2021-02-06 21:55:00,896] ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.TimeoutException
Processed a total of 0 messages
如预期的那样,使用者没有找到任何要使用的消息,因为该消息已经超过了保留期。
6.局限性
在内部,Kafka Broker维护另一个名为log.retention.check.interval.ms.
此属性确定检查消息是否过期的频率。
所以,为了保持保留策略有效,我们必须确保的值log.retention.check.interval.ms
比的属性值低retention.ms
对于任何给定的主题。
7.结论
在本教程中,我们探索了Apache Kafka以了解基于时间的消息保留策略。在此过程中,我们创建了简单的Shell脚本来简化管理活动。后来,我们创建了一个独立的使用者和生产者,以验证保留期后邮件的有效期。
0 评论