Hadoop

Spark Streaming Kafka 消费中断 Offset 重放消费

1. 背景

    晚上收到告警消息说没实时数据了... 架构是: kafka -> spark streaming -> hbase 消费数据存到 hbase 做实时历史查询, 由于在外面无法及时登录服务器... 最后经排查发现是 yarn 调度卡死导致 spark streaming 任务假死, 处理过程省略xx字..., 待一切恢复之后, 发现历史曲线凹下去一节, 推测肯定是处理问题这段时间数据没消费到, 讲道理这种场景不应该管理好 offset了, 即使中断了也应该从此出继续消费才是呀... 于是检查代码发现... 然后一百种想TMD... 前任留的坑呀... 只创建完val kafkaStream=KafkaUtils.createStream(ssc, zkKafka, groupId, topicsMap)就完事了, 根本没理 offset...

2. 分析

我们在设计基于 kafka 消费系统时通常会按照业务特点,对应消费策略也分几种情况:

  • a. 如果只需要最新数据那么消费策略通常是设置auto.offset.reset=latest (默认值);

  • b. 如果要求每次启动消费从头开始则需设置auto.offset.reset=earliest;

  • c. 例如业务是 kafka -> spark streaming -> hbase 做历史数据存储, 那么自动或手动(需设置enable.auto.commit=false)管理好每次处理完 fetch 来的数据后的 offset 了, 通常客户端都有自动提交和手动提交两种策略, 后者更加灵活如自行存到 redis / mysql / hbase 等。

  • a. 仅使用val kafkaStream=KafkaUtils.createStream(ssc, zkKafka, groupId, topicsMap)是不行的,上面介绍了默认情况下 KafkaConsumer 会使用auto.offset.reset=latest策略,

3. 解决

  • 3.1 首先确保 Spark Streaming 消费 Kafka 的模式支持 offset (即不是 latest 模式),关键代码部分如下,使用 kafka 直接消费模式比 zk 模式性能更高, 同时 offset 的管理也直接提交到 kafka brokers 里, 减少使用 Redis / MySQL 等外部存储依赖. 完整代码请参考
[展开] GenericKafka2HbaseStreaming.scala

   // Skip ......

    val Array(master, brokers, hbaseZk, 
         groupId, topicTables, delayMs, rowKeyFields) = args
    val topicTablesMap = topicTables.split(",")
        .map(x => x.split("->")).map(x => Tuple2(x(0), x(1))).toMap

    val sparkConf = new SparkConf()
        .setAppName(getClass.getSimpleName).setMaster(master)
   val ssc = new StreamingContext(sparkConf, 
        Milliseconds(delayMs.toLong)).checkpoint("checkpoint")

    val kafkaDStream = Kafkas.createDirectStream(ssc, 
        brokers, groupId, topicTablesMap)

    kafkaDStream.foreachRDD(rdd => {
      if (!rdd.isEmpty()) {
        rdd.foreachPartition(records => {
          if (records != null && !records.isEmpty) {
            val hbConn = HbaseUtils2.createConnDefault(hbaseZk)
            val batchStore = new BatchStore(hbConn, 500)
            records.foreach(record => {
                 // Skip ......
            })
            batchStore.commitAll()
            hbConn.close()
          }
        })

        Kafkas.commitAsync(kafkaDStream, rdd)
      }
    })
    ssc.start()
    ssc.awaitTermination()
[展开] SparkStreamKafkaHelper.scala

object SparkStreamKafkaHelper extends Logging with Serializable {

  /**
   * Create kafka direct input stream with brokers and topics.
   */
  def createDirectStream(ssc: StreamingContext, 
      brokers: String, groupId: String, 
      topicTablesMap: Map[String, String]):
      InputDStream[ConsumerRecord[String, String]] = {
    Asserts.hasText(brokers, 
        "'brokers' must not be empty, please check parameters.")
    Asserts.hasText(groupId, "groupId")
    Asserts.notEmpty(topicTablesMap, 
        "'topicTablesMap' must not be empty, please check parameters.")

    val kafkaParams = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
      ConsumerConfig.GROUP_ID_CONFIG -> groupId,
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> 
      classOf[StringDeserializer].getName,
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> 
      classOf[StringDeserializer].getName,
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG ->
      Boolean.valueOf(System.getProperty("kafka.enable.auto.commit", "false")),
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> 
      System.getProperty("kafka.auto.offset.reset", "earliest")) // latest

    // Create direct kafka stream with brokers and topics
    val topics = topicTablesMap.keys.toArray[String]
    Asserts.notNull(topics, "Topics must not be null")
    Asserts.notNullElements(topics, "Topics must elements not be null")

    KafkaUtils.createDirectStream(
      ssc, LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))
  }

  // Skip ......

  /**
   * Commit offset RDD.
*/ def commitAsync( kafkaDStream: InputDStream[ConsumerRecord[String, String]], rdd: RDD[ConsumerRecord[String, String]]) { // Commit current offsetRanages. val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges kafkaDStream.asInstanceOf[CanCommitOffsets] .commitAsync(offsetRanges, new OffsetCommitCallback() { def onComplete(m: java.util.Map[TopicPartition, OffsetAndMetadata], ex: Exception) { if (ex != null) { log.error(String.format( "Failed to async commit kafka brokers. offsetRanges - {}", offsetRanges.toList), ex) } else if (log.isInfoEnabled()) { log.info("Committed offsetRanges - {}", offsetRanges.toList) } } }) } }


  • 3.2 使用 kafka_offset_tool 工具将 对应 group 的 topics 的 offset 重置到故障之前。注 kafka-1.x+ 以后内置了 ./kafka-consumer-groups.sh reset-offsets 命令。

    • 3.2.1 下载工具,注意操作系统版本,其他操作系统需自行编译
wget https://github.com/wl4g/kafka_offset_tool/releases/download/v1.2.5/kafkaOffsetTool_1.2.5_linux_amd64
chmod +x kafkaOffsetTool_1.2.5_linux_amd64
./kafkaOffsetTool_1.2.5_linux_amd64 get-offset -o myoffset.json
  • 3.2.3 (可选) 可手动编辑 myoffset.json,对 group/topic/offset 做特殊删除或修改

  • 3.3.4 使用 offset-calc 命令对 json 文件批量重置 offset 。如下示例是将所有groupId 对所有 topics 的 offset 的值全部回放 1000,即都减 1000,具体多少可根据业务量估算,也可通过 kafka-console-consumer.sh 从头消费查查看,一般存到主题的消息都带有时间,即根据它可以知道目前还没有被 Kafka 清除的最早的数据是什么时候的。

./kafkaOffsetTool_1.2.5_linux_amd64 offset-calc -i myoffset.json -o myoffset-restore.json -I -1000
  • 3.2.5 【重点】重置 offset 到指定值

    ./kafkaOffsetTool_1.2.5_linux_amd64 set-offset -i myoffset-restore.json
  • 3.2.6 再重启 Spark Streaming 消费程序,观察消费的数据是否从重置的 offset 的时间开始的

4. 参考

留言

您的电子邮箱地址不会被公开。