Hadoop

Kafka 生产环境运维姿势

Kafka 生产环境运维姿势

1. 日常运维

1.1 kafka生产消息

./kafka-console-producer.sh --broker-list localhost:9092 --topic safeclound_ammeter

上述生产的消息key是空的,若想生产指定key的消息,则使用以下:

./kafka-console-producer.sh --broker-list localhost:9092 --topic safeclound_ammeter --property parse.key=true --property key.separator=:

如,需使用shell脚本生产一批带key的消息:

for (( i=1; i<=10; i++ )); do echo key$i:value$i | kafka-console-producer.sh --broker-list localhost:9092 --topic safeclound_ammeter --property parse.key=true --property key.separator=:; done;

1.2 消费消息

# 简单用法,消费最新
./kafka-console-consumer.sh --zookeeper localhost:2181 --topic safeclound_ammeter

# 从头开始消费
./kafka-console-consumer.sh --zookeeper localhost:2181 --topic safeclound_ammeter --from-beginning

# 从指定分区、偏移开始消费(注:验证版本: kafka_0.10.0.x/kafka_1.1.x,必须使用 --bootstrap-server,使用 --zookeeper 无效,必须使用 --new-consumer 否则无效,但 kafka_1.1.x 已过时无需指定)
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic safeclound_ammeter --new-consumer --partition 1 --offset 184169591

1.3 创建主题

./kafka-topics.sh --zookeeper localhost:2181 --create --topic safeclound_ammeter --partitions 10 --replication-factor 3

1.4 主题列表

./kafka-topics.sh --zookeeper localhost:2181 --list

1.5 主题详情

./kafka-topics.sh --describe --zookeeper localhost:2181 --topic safeclound_ammeter

1.6 删除主题

./kafka-topics.sh --zookeeper localhost:2181 --delete --topic safeclound_ammeter

1.7 添加分区

./kafka-topics.sh --zookeeper localhost:2181 --alter --topic safeclound_ammeter --partitions 15

1.8 扩容分区(副本数)

  • 1.8.1 准备扩容配置文件(expand-replcas.json):
cat <<-'EOF'> expand-replcas.json
{
    "version": 1,
    "partitions": [{
        "topic": "safeclound_ammeter",
        "partition": 0,
        "replicas": [0, 1]
    }, {
        "topic": "safeclound_ammeter",
        "partition": 1,
        "replicas": [0, 1]
    }, {
        "topic": "safeclound_ammeter",
        "partition": 2,
        "replicas": [0, 1]
    }, {
        "topic": "safeclound_ammeter",
        "partition": 3,
        "replicas": [0, 1]
    }, {
        "topic": "safeclound_ammeter",
        "partition": 4,
        "replicas": [0, 1]
    }, {
        "topic": "safeclound_ammeter",
        "partition": 5,
        "replicas": [0, 1]
    }, {
        "topic": "safeclound_ammeter",
        "partition": 6,
        "replicas": [0, 1]
    }, {
        "topic": "safeclound_ammeter",
        "partition": 7,
        "replicas": [0, 1]
    }, {
        "topic": "safeclound_ammeter",
        "partition": 8,
        "replicas": [0, 1]
    }, {
        "topic": "safeclound_ammeter",
        "partition": 9,
        "replicas": [0, 1]
    }]
}
EOF
  • 1.8.2 执行扩容
./kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file ./expand-replcas.json --execute
  • 1.8.3 检验扩容
./kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file ./expand-replcas.json --verify
  • 1.8.4 查看分区新副本数
./kafka-topics.sh --describe --zookeeper localhost:2181 --topic safeclound_ammeter

1.9 groupId 列表

# 仅显示 kf 类型的 consumers
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

# 仅显示 zk 类型的 consumers
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

1.10 groupId 详情

# 仅处理 kf 类型的 consumers
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group console-consumer-9373

# 仅处理 zk 类型的 consumers
./kafka-consumer-groups.sh --zookeeper localhost:2181 --group console-consumer-9373

1.11 offset 重置

注:以下操作 kafka_1.x+ 才可用,kafka_0.10.x 是没有 reset-offset 命令的,老版本请参考(新老通用):生产环境 KAFKA 消费中断如何 OFFSET 重放消费

  • mygroup1 对应所有 topicsoffset 重置到 30 分钟之前
# 仅处理 kf 类型的 consumers
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group mygroup1 --reset-offsets --all-topics --by-duration PT0H30M0S

# 仅处理 zk 类型的 consumers
./kafka-consumer-groups.sh --zookeeper localhost:2181 --group mygroup1 --reset-offsets --all-topics --by-duration PT0H30M0S
  • :要实际执行必须增加 --execute

  • 确定位移重设策略——当前支持8种设置规则:

    • --to-earliest:把位移调整到分区当前最小位移
    • --to-latest:把位移调整到分区当前最新位移
    • --to-current:把位移调整到分区当前位移
    • --to-offset <offset>: 把位移调整到指定位移处
    • --shift-by N: 把位移调整到当前位移 + N处,注意N可以是负数,表示向前移动
    • --to-datetime <datetime>:把位移调整到大于给定时间的最早位移处,datetime 格式是 yyyy-MM-ddTHH:mm:ss.xxx,比如2017-08-04T00:00:00.000
    • --by-duration <duration>:把位移调整到距离当前时间指定间隔的位移处,duration格式是PnDTnHnMnS,比如PT0H5M0S
    • --from-file <file>:从CSV文件中读取调整策略
  • 确定执行方案——当前支持3种方案(什么参数都不加:只是打印出位移调整方案,不实际执行)

    • --execute:执行真正的位移调整
    • --export:把位移调整方案按照CSV格式打印,方便用户成csv文件,供后续直接使用

1.12 查看主题的所有分区数据信息

./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic safeclound_co2 --time -2

注: -2表示最早 offset

./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic safeclound_co2 --time -1

注: -1表示最新 offset

1.13 查看group.id所有消费情况 (offset)

./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group safeclound_spark_test --describe

./kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper owner-node:2181 --group safeclound_spark_test --topic safeclound_co2

2. FAQ

2.1 生产环境 auto.offset.reset 到底怎么用?

  • a) 需求场景

业务方只需消费最新数据,只需将 auto.offset.reset=latest 就 ok了吧? ---- 但在亲经生产环境血的教训后... 才知道这样设置是不会生效的!
     那咋办呐?还得加上 enable.auto.commit=false, 经各种折腾后了解到,如果 enable.auto.commit=true 会自动提交offset,虽然设置了auto.offset.reset=latest,但是它会优先使用已经存储的 offset 了,因此不会生效。

  • b) 测试目标

为验证 latest 和 earliest 策略对 kafkaConsumer 中断继续的表现

  • c) 测试环境

kafka_0.10.xkafka_1.1.x + kafka-manager-1.3.3.16

  • d) 测试代码
[展开] KafkaConsumerOffsetTests.scala

import org.apache.kafka.clients.consumer.KafkaConsumer
import java.util.Properties
import org.apache.kafka.clients.consumer.ConsumerRecords
import java.util.Arrays

/**
 * Test cases on the three options of 'auto.offset.reset' (latest|earliest|none).
 *
 * Under the same 'group.id', once 'auto.offset.reset' is set, it cannot be changed,
 * that is, only the initially set value will always be used. Even if it is modified
 * to latest or earliest and restart, it will not take effect.
 *
 * For the summary detail, see:
 * @see https://blogs.wl4g.com/archives/1390
 * @see https://blogs.wl4g.com/archives/91
 * @author 
 * @version 2020-07-12
 */
object KafkaConsumerOffsetTests {

  // kafka-topics.sh --zookeeper localhost:2181 --create --topic testKfOffsetTopic --partitions 2 --replication-factor 1
  // kafka-console-producer.sh --broker-list localhost:9092 --topic testKfOffsetTopic --property parse.key=true --property key.separator=:
  // kafka-console-consumer.sh --zookeeper localhost:2181 --topic testKfOffsetTopic --from-beginning
  def main(args: Array[String]): Unit = {
    val props = new Properties
    props.put("bootstrap.servers", "owner-node4:9092");
    props.put("group.id", getClass.getSimpleName);
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("enable.auto.commit", "false");
    // props.put("auto.commit.interval.ms", "1000");
    props.put("auto.offset.reset", "latest"); // latest|none|earliest

    val kfConsumer = new KafkaConsumer[String, String](props)
    kfConsumer.subscribe(Arrays.asList("testKfOffsetTopic1"));
    while (true) {
      println(s"Poll..." + System.currentTimeMillis)
      val records: ConsumerRecords[String, String] = kfConsumer.poll(1000)
      import scala.collection.JavaConverters._
      records.asScala.map(r => {
        val topic = r.topic
        val partition = r.partition
        val offset = r.offset
        val timestamp = r.timestamp
        val key = r.key
        val value = r.value
        println(s"------------------------------------------------------------------------------------------------------")
        println(s"topic: $topic, partition: $partition, offset: $offset, timestamp: $timestamp, key: $key, value: $value")
        println(s"------------------------------------------------------------------------------------------------------")
        kfConsumer.commitSync
      })
    }
  }
}


  • e) 测试案例

    • 同一 group.id 下,若之前有提交各分区的 offset,如果初始配置是 auto.offset.reset=latest, 此时改为 earliest 重启消费,也不会从中断的 offset 继续,而是只消费到最新的(就算之前是有提交 offset);

    • 同一 group.id 下,若之前有提交各分区的 offset,如果初始配置是 auto.offset.reset=earliest,此时重启消费服务会从中断的 offset 继续,而是不是只消费最新的;

    • 同一 group.id 下,若之前有提交各分区的 offset,如果初始配置是 auto.offset.reset=earliest,此时改为 latest 重启消费,还是会从中断的 offset 继续;此时不改配置直接重启消费,发现还是从中断的 offset 继续;此时再改回为 earliest 重启消费,依然只消费最新的;

  • f) 结论1

同一 group.id 下,auto.offset.reset 设置是不可变的,即,每当使用新 group.id 时是使用的那种策略(latest | earliest),之后不管改为那种策略都不会生效,该从已提交 offset 的继续消费还是继续,只会消费最新的依然还是只消费最新的。

  • g) 结论2

实验过程使用了 kafka-console-producer.sh 发送测试消息,kafka-manager 中 consumers 消费详情中的 LogSize 和 Consumer Offset 字段都是表示消息累计数量而不是 bytes。

2.2 线上某个 kafka 节点主机宕机,后人工重启发现一直监听 9092 单开,查看日志发现大量警告:

Found a corrupted index file due to requirement failed: Corrupt index found,

  • 分析:
    由于宕机 kafka 的异步落盘的机制,导致宕机前几秒的数据丢失,然后重启时检查索引文件长度和offset发现相减不为0,标识为损坏,故日志报错.

  • 解决:

    • 首先等待 kafka 自己恢复处理,大约 4~5分钟看数据量(生产亲经历版本是 kafka_0.10.0 自动恢复成功过);
    • 如果等待一段时间发现无法自行恢复,则只能手动删掉损坏索引重启让它自己重建.

3. 参考

一条评论

留言

您的电子邮箱地址不会被公开。