Article from: https://www.cnblogs.com/qwangxiao/p/9971006.html

A consumer group can consume multiple topics. In an earlier post a single consumer consumed multiple topics; this time a consumer group consumes multiple topics through the direct-connect (direct stream) API. A small test confirmed the result is correct: inspecting ZooKeeper with its client shows that ZooKeeper recorded the offsets.

package day04

/*
Consumption of multiple topics
*/
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
import scala.collection.mutable.ListBuffer
import org.I0Itec.zkclient.ZkClient
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Duration, StreamingContext}

object OrderDemoYY1 {

  /**
   * Direct-connect Spark Streaming consumer for multiple Kafka topics.
   *
   * On startup it checks ZooKeeper for previously saved offsets for this
   * consumer group. If offsets exist, the stream resumes from them;
   * otherwise it starts fresh from the earliest available offset. After
   * each batch the consumed offset ranges are written back to ZooKeeper.
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("yy").setMaster("local[*]")
    val ssc = new StreamingContext(conf, Duration(5000))

    // The three topics to consume
    val topic1 = "wc"
    val topic2 = "wc1"
    val topic3 = "wc2"
    // Consumer group name
    val groupid = "GPMMVV"
    // ZooKeeper quorum address
    val zkQuorum = "hadoop01:2181,hadoop02:2181,hadoop03:2181"
    // Kafka broker list
    val brokerList = "hadoop01:9092,hadoop02:9092,hadoop03:9092"
    // Topic set passed to createDirectStream on a fresh (first) read
    val topics = Set(topic1, topic2, topic3)
    // Ordered list so each topic's index maps to its ZKGroupTopicDirs below
    val topicsList = ListBuffer[String](topic1, topic2, topic3)

    // Kafka parameters.
    // NOTE: the correct property key is "group.id" — the original article
    // used "groupid", which the Kafka client silently ignores.
    val kafkaParams = Map(
      "metadata.broker.list" -> brokerList,
      "group.id" -> groupid,
      // Start from the smallest (earliest) offset when no saved offset exists
      "auto.offset.reset" -> kafka.api.OffsetRequest.SmallestTimeString
    )

    // One ZKGroupTopicDirs per topic; each holds the ZooKeeper path under
    // which this group's offsets for that topic are stored.
    val zkGTList: ListBuffer[ZKGroupTopicDirs] = new ListBuffer[ZKGroupTopicDirs]()
    for (tp <- topicsList) {
      zkGTList += new ZKGroupTopicDirs(groupid, tp)
    }

    // ZkClient used both to read saved offsets and to write updated ones
    val zkClient = new ZkClient(zkQuorum)

    // Declared as var: it is assigned one of two ways depending on whether
    // saved offsets were found in ZooKeeper.
    var kafkaDStream: InputDStream[(String, String)] = null
    // (topic, partition) -> saved offset, used to resume consumption
    var fromOffset = Map[TopicAndPartition, Long]()

    // Count the offset children per topic to detect prior consumption
    val childrens: ListBuffer[Int] = new ListBuffer[Int]()
    var flag = false // true if any topic has previously saved offsets
    for (topicDir <- zkGTList) {
      // countChildren returns the number of partitions with stored offsets
      val child: Int = zkClient.countChildren(topicDir.consumerOffsetDir)
      childrens += child
      if (child > 0) {
        flag = true
      }
    }

    if (flag) { // Saved offsets exist: resume from them
      for (z <- 0 until topics.size) {
        val child = childrens(z)
        val gpDirs = zkGTList(z)
        val topicn = topicsList(z)
        for (i <- 0 until child) {
          // Read the saved offset for each partition of this topic
          val offset = zkClient.readData[String](gpDirs.consumerOffsetDir + "/" + i)
          val tp = new TopicAndPartition(topicn, i)
          fromOffset += tp -> offset.toLong
        }
      }
      // Extract (key, value) from each Kafka message; the key is null unless
      // the producer set one.
      val messageHandler = (mmd: MessageAndMetadata[String, String]) => {
        (mmd.key(), mmd.message())
      }
      kafkaDStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
        ssc, kafkaParams, fromOffset, messageHandler
      )
    } else { // Nothing consumed before: start fresh from the topic set
      kafkaDStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, topics
      )
    }

    // Offset ranges consumed by the current batch, captured in foreachRDD
    var offsetRanges = Array[OffsetRange]()

    kafkaDStream.foreachRDD(kafkaRDD => {
      // The underlying KafkaRDD implements HasOffsetRanges, which exposes
      // the (topic, partition, fromOffset, untilOffset) ranges of this batch.
      offsetRanges = kafkaRDD.asInstanceOf[HasOffsetRanges].offsetRanges
      kafkaRDD.foreach(println) // debug: print each (key, value) record

      for (o <- offsetRanges) {
        val topicNN: String = o.topic       // topic of this range
        val offset: Long = o.untilOffset    // offset to resume from next time
        val partition: Int = o.partition    // partition of this range
        // Map the topic back to its ZKGroupTopicDirs via its list index
        val i = topicsList.indexOf(topicNN)
        val gpDir = zkGTList(i)
        // Persist the new offset to ZooKeeper
        ZkUtils.updatePersistentPath(zkClient, gpDir.consumerOffsetDir + "/" + partition, offset.toString)
      }
    })

    ssc.start()
    ssc.awaitTermination()
  }
}

You can view the offsets under /consumers through the ZooKeeper client.
Of my three topics, wc and wc1 have only one partition each, and the figure below shows that the offset of partition 0 of wc1 is 13.

Leave a Reply

Your email address will not be published. Required fields are marked *