ZkManager.scala 3.51 KB
Newer Older
杜发飞 committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85
package com.hikcreate.data.util

import com.hikcreate.data.common.Logging
import com.hikcreate.data.constant.Const
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
import kafka.common.TopicAndPartition
import kafka.consumer.SimpleConsumer
import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
import org.I0Itec.zkclient.ZkClient
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange
import org.apache.kafka.common.protocol.SecurityProtocol

class ZkManager(buildProducer:() => ZkClient) extends Logging with Serializable {

  lazy val zkClient: ZkClient = buildProducer()
  lazy val zkUtils:ZkUtils = ZkUtils.apply(zkClient,isZkSecurityEnabled = false)

  def getBeginOffset(topics:Seq[String],groupId:String): mutable.HashMap[TopicPartition,Long]  ={
    val fromOffsets = mutable.HashMap.empty[TopicPartition,Long]
    val partitionMap = zkUtils.getPartitionsForTopics(topics)
    partitionMap.foreach{ topicPartitions =>
      val topic = topicPartitions._1
      val partitions = topicPartitions._2
      val topicDirs = new ZKGroupTopicDirs(groupId,topic)
      partitions.foreach{ partition =>
        val tp = new TopicPartition(topic,partition)
        val kafkaOffset = getOffsetForKafka(tp)
        val zkPath = s"${topicDirs.consumerOffsetDir}/$partition"
        zkUtils.makeSurePersistentPathExists(zkPath)
        Option(zkUtils.readData(zkPath)._1) match {
          case Some(zkOffset) =>
            if(zkOffset.toLong < kafkaOffset) fromOffsets += tp->kafkaOffset
            else fromOffsets += tp->zkOffset.toLong
          case None =>
            fromOffsets += tp->kafkaOffset
        }
      }
    }
    fromOffsets
  }

  def getOffsetForKafka(topicPartition:TopicPartition,time: Long = OffsetRequest.EarliestTime): Long ={
    val brokerId = zkUtils.getLeaderForPartition(topicPartition.topic,topicPartition.partition).get
    val broker = zkUtils.getBrokerInfo(brokerId).get
    val endpoint = broker.getBrokerEndPoint(SecurityProtocol.PLAINTEXT)
    val consumer = new SimpleConsumer(endpoint.host,endpoint.port,10000,100000,"getMinOffset")
    val tp = TopicAndPartition(topicPartition.topic,topicPartition.partition)
    val request= OffsetRequest(Map(tp -> PartitionOffsetRequestInfo(time, 1)))
    consumer.getOffsetsBefore(request).partitionErrorAndOffsets(tp).offsets.head
  }

  def saveEndOffset(offsetRanges:ArrayBuffer[OffsetRange],groupId:String): Unit = {
    offsetRanges.foreach{ offsetRange =>
      val topicDirs = new ZKGroupTopicDirs(groupId,offsetRange.topic)
      val zkPath = s"${topicDirs.consumerOffsetDir}/${offsetRange.partition}"
      zkUtils.updatePersistentPath(zkPath,offsetRange.untilOffset.toString)
    }
  }

}

object ZkManager extends Logging {

  def apply(zkServer:String): ZkManager = {
    val createProducerCallback= () => {
      val zkClient = ZkUtils.createZkClient(zkServer,30000,30000)
      sys.addShutdownHook{
        info("Execute hook thread: " + this)
        zkClient.close()
      }
      zkClient
    }
    new ZkManager(createProducerCallback)
  }

  def main(args: Array[String]): Unit = {
    val zkManager = ZkManager(Const.zkKafka)
    zkManager.zkUtils.updatePersistentPath("/consumers/hive/offsets/tbd-transport-data-gathering/1",0.toString)
    zkManager.zkUtils.updatePersistentPath("/consumers/hive/offsets/tbd-transport-data-gathering/2",0.toString)
    zkManager.zkUtils.updatePersistentPath("/consumers/hive/offsets/tbd-transport-data-gathering/0",0.toString)
  }
}