Kafka 生产环境运维姿势
Kafka 生产环境运维姿势
1. 日常运维
1.1 kafka生产消息
./kafka-console-producer.sh --broker-list localhost:9092 --topic safeclound_ammeter
上述生产的消息key是空的,若想生产指定key的消息,则使用以下:
./kafka-console-producer.sh --broker-list localhost:9092 --topic safeclound_ammeter --property parse.key=true --property key.separator=:
如,需使用shell脚本生产一批带key的消息:
for (( i=1; i<=10; i++ )); do echo key$i:value$i | kafka-console-producer.sh --broker-list localhost:9092 --topic safeclound_ammeter --property parse.key=true --property key.separator=:; done;
1.2 消费消息
# 简单用法,消费最新
./kafka-console-consumer.sh --zookeeper localhost:2181 --topic safeclound_ammeter
# 从头开始消费
./kafka-console-consumer.sh --zookeeper localhost:2181 --topic safeclound_ammeter --from-beginning
# 从指定分区、偏移开始消费(注:验证版本: kafka_0.10.0.x/kafka_1.1.x,必须使用 --bootstrap-server,使用 --zookeeper 无效,必须使用 --new-consumer 否则无效,但 kafka_1.1.x 已过时无需指定)
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic safeclound_ammeter --new-consumer --partition 1 --offset 184169591
1.3 创建主题
./kafka-topics.sh --zookeeper localhost:2181 --create --topic safeclound_ammeter --partitions 10 --replication-factor 3
1.4 主题列表
./kafka-topics.sh --zookeeper localhost:2181 --list
1.5 主题详情
./kafka-topics.sh --describe --zookeeper localhost:2181 --topic safeclound_ammeter
1.6 删除主题
./kafka-topics.sh --zookeeper localhost:2181 --delete --topic safeclound_ammeter
1.7 添加分区
./kafka-topics.sh --zookeeper localhost:2181 --alter --topic safeclound_ammeter --partitions 15
1.8 扩容分区(副本数)
- 1.8.1 准备扩容配置文件(expand-replcas.json):
cat <<-'EOF'> expand-replcas.json
{
"version": 1,
"partitions": [{
"topic": "safeclound_ammeter",
"partition": 0,
"replicas": [0, 1]
}, {
"topic": "safeclound_ammeter",
"partition": 1,
"replicas": [0, 1]
}, {
"topic": "safeclound_ammeter",
"partition": 2,
"replicas": [0, 1]
}, {
"topic": "safeclound_ammeter",
"partition": 3,
"replicas": [0, 1]
}, {
"topic": "safeclound_ammeter",
"partition": 4,
"replicas": [0, 1]
}, {
"topic": "safeclound_ammeter",
"partition": 5,
"replicas": [0, 1]
}, {
"topic": "safeclound_ammeter",
"partition": 6,
"replicas": [0, 1]
}, {
"topic": "safeclound_ammeter",
"partition": 7,
"replicas": [0, 1]
}, {
"topic": "safeclound_ammeter",
"partition": 8,
"replicas": [0, 1]
}, {
"topic": "safeclound_ammeter",
"partition": 9,
"replicas": [0, 1]
}]
}
EOF
- 1.8.2 执行扩容
./kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file ./expand-replcas.json --execute
- 1.8.3 检验扩容
./kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file ./expand-replcas.json --verify
- 1.8.4 查看分区新副本数
./kafka-topics.sh --describe --zookeeper localhost:2181 --topic safeclound_ammeter
1.9 groupId 列表
# 仅显示 kf 类型的 consumers
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
# 仅显示 zk 类型的 consumers
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
1.10 groupId 详情
# 仅处理 kf 类型的 consumers
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group console-consumer-9373
# 仅处理 zk 类型的 consumers
./kafka-consumer-groups.sh --zookeeper localhost:2181 --group console-consumer-9373
1.11 offset 重置
注:以下操作
kafka_1.x+
才可用,kafka_0.10.x
是没有reset-offset
命令的,老版本请参考(新老通用):生产环境 KAFKA 消费中断如何 OFFSET 重放消费
- 将
mygroup1
对应所有topics
的offset
重置到 30 分钟之前
# 仅处理 kf 类型的 consumers
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group mygroup1 --reset-offsets --all-topics --by-duration PT0H30M0S
# 仅处理 zk 类型的 consumers
./kafka-consumer-groups.sh --zookeeper localhost:2181 --group mygroup1 --reset-offsets --all-topics --by-duration PT0H30M0S
-
注:要实际执行必须增加
--execute
-
确定位移重设策略——当前支持8种设置规则:
--to-earliest
:把位移调整到分区当前最小位移--to-latest
:把位移调整到分区当前最新位移--to-current
:把位移调整到分区当前位移--to-offset <offset>
: 把位移调整到指定位移处--shift-by N
: 把位移调整到当前位移 + N处,注意N可以是负数,表示向前移动--to-datetime <datetime>
:把位移调整到大于给定时间的最早位移处,datetime 格式是yyyy-MM-ddTHH:mm:ss.xxx,比如2017-08-04T00:00:00.000
--by-duration <duration>
:把位移调整到距离当前时间指定间隔的位移处,duration格式是PnDTnHnMnS,比如PT0H5M0S
--from-file <file>
:从CSV文件中读取调整策略
-
确定执行方案——当前支持3种方案(什么参数都不加:只是打印出位移调整方案,不实际执行)
--execute
:执行真正的位移调整--export
:把位移调整方案按照CSV格式打印,方便用户成csv文件,供后续直接使用
1.12 查看主题的所有分区数据信息
./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic safeclound_co2 --time -2
注: -2表示最早 offset
./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic safeclound_co2 --time -1
注: -1表示最新 offset
1.13 查看group.id所有消费情况 (offset)
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group safeclound_spark_test --describe
或
./kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper owner-node:2181 --group safeclound_spark_test --topic safeclound_co2
2. FAQ
2.1 生产环境 auto.offset.reset 到底怎么用?
- a) 需求场景
业务方只需消费最新数据,只需将 auto.offset.reset=latest
就 ok了吧? ---- 但在亲经生产环境血的教训后... 才知道这样设置是不会生效的!
那咋办呐?还得加上 enable.auto.commit=false
, 经各种折腾后了解到,如果 enable.auto.commit=true
会自动提交offset,虽然设置了auto.offset.reset=latest
,但是它会优先使用已经存储的 offset 了,因此不会生效。
- b) 测试目标
为验证 latest 和 earliest 策略对 kafkaConsumer 中断继续的表现
- c) 测试环境
kafka_0.10.x
和 kafka_1.1.x
+ kafka-manager-1.3.3.16
- d) 测试代码
[展开] KafkaConsumerOffsetTests.scala
import org.apache.kafka.clients.consumer.KafkaConsumer import java.util.Properties import org.apache.kafka.clients.consumer.ConsumerRecords import java.util.Arrays /** * Test cases on the three options of 'auto.offset.reset' (latest|earliest|none). * * Under the same 'group.id', once 'auto.offset.reset' is set, it cannot be changed, * that is, only the initially set value will always be used. Even if it is modified * to latest or earliest and restart, it will not take effect. * * For the summary detail, see: * @see https://blogs.wl4g.com/archives/1390 * @see https://blogs.wl4g.com/archives/91 * @author* @version 2020-07-12 */ object KafkaConsumerOffsetTests { // kafka-topics.sh --zookeeper localhost:2181 --create --topic testKfOffsetTopic --partitions 2 --replication-factor 1 // kafka-console-producer.sh --broker-list localhost:9092 --topic testKfOffsetTopic --property parse.key=true --property key.separator=: // kafka-console-consumer.sh --zookeeper localhost:2181 --topic testKfOffsetTopic --from-beginning def main(args: Array[String]): Unit = { val props = new Properties props.put("bootstrap.servers", "owner-node4:9092"); props.put("group.id", getClass.getSimpleName); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("enable.auto.commit", "false"); // props.put("auto.commit.interval.ms", "1000"); props.put("auto.offset.reset", "latest"); // latest|none|earliest val kfConsumer = new KafkaConsumer[String, String](props) kfConsumer.subscribe(Arrays.asList("testKfOffsetTopic1")); while (true) { println(s"Poll..." + System.currentTimeMillis) val records: ConsumerRecords[String, String] = kfConsumer.poll(1000) import scala.collection.JavaConverters._ records.asScala.map(r => { val topic = r.topic val partition = r.partition val offset = r.offset val timestamp = r.timestamp val key = r.key val value = r.value println(s"------------------------------------------------------------------------------------------------------") println(s"topic: $topic, partition: $partition, offset: $offset, timestamp: $timestamp, key: $key, value: $value") println(s"------------------------------------------------------------------------------------------------------") kfConsumer.commitSync }) } } }
-
e) 测试案例
-
同一
group.id
下,若之前有提交各分区的 offset,如果初始配置是auto.offset.reset=latest
, 此时改为 earliest 重启消费,也不会从中断的 offset 继续,而是只消费到最新的(就算之前是有提交 offset); -
同一
group.id
下,若之前有提交各分区的 offset,如果初始配置是auto.offset.reset=earliest
,此时重启消费服务会从中断的 offset 继续,而是不是只消费最新的; -
同一
group.id
下,若之前有提交各分区的 offset,如果初始配置是auto.offset.reset=earliest
,此时改为 latest 重启消费,还是会从中断的 offset 继续;此时不改配置直接重启消费,发现还是从中断的 offset 继续;此时再改回为 earliest 重启消费,依然只消费最新的;
-
-
f) 结论1
同一 group.id
下,auto.offset.reset
设置是不可变的,即,每当使用新 group.id
时是使用的那种策略(latest | earliest),之后不管改为那种策略都不会生效,该从已提交 offset 的继续消费还是继续,只会消费最新的依然还是只消费最新的。
- g) 结论2
实验过程使用了 kafka-console-producer.sh
发送测试消息,kafka-manager 中 consumers 消费详情中的 LogSize 和 Consumer Offset 字段都是表示消息累计数量而不是 bytes。
-
注1:以上实验使用的版本为
kafka_0.10.x
和kafka_1.x
(在0.8.x~0.10.x
之间存在offset是否存储于 zk 的区别,当前 4.x 已废除外置 zk 直接内置到 kafka broker 里面了)之后更高主版本的演进就不知是否会改变此策略了) -
注2:以上 测试代码:KafkaConsumerOffsetTests.scala 是在纯 KafkaConsumer 下进行的,如果在 spark-streaming-kafka-010 环境中的
传统 receiver 模式下创建 createRDD(..) 时 spark 会强制覆盖auto.offset.reset
参数为none
,相当于强制使用 offset 提交,注 createDirectStream(..) 模式则不会强制覆盖,参见源码:https://github.com/apache/spark/blob/v2.3.1/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala#L71。
2.2 线上某个 kafka 节点主机宕机,后人工重启发现一直监听 9092
单开,查看日志发现大量警告:
Found a corrupted index file due to requirement failed: Corrupt index found,
-
分析:
由于宕机 kafka 的异步落盘的机制,导致宕机前几秒的数据丢失,然后重启时检查索引文件长度和offset发现相减不为0,标识为损坏,故日志报错. -
解决:
- 首先等待 kafka 自己恢复处理,大约 4~5分钟看数据量(生产亲经历版本是 kafka_0.10.0 自动恢复成功过);
- 如果等待一段时间发现无法自行恢复,则只能手动删掉损坏索引重启让它自己重建.
3. 参考
- 参考 1: kafka 重启出错 Corrupt index found
- 参考 2: kafka warn-found-a-corrupted-index-file-due-to-requirement-failed-corrupt-index-found
- 参考 3: KAFKA - While recording the replica LEO, the partition
- 参考 4: Kafka consumer group 偏移重设
- TimeoutException: Expiring x record(s) for xx: 30001 ms has passed since batch creation plus linger time
- 参考 5: Kafka Commit cannot be completed since the group has already rebalanced
- 参考 6: jdk8 Metaspace 调优
- 参考 7: 记一次 jdk8 的 Metaspace 区域的调优过程及误解
一条评论
wordpress
up