
A consumer group can consume more than one topic. In the past I had a single consumer consume multiple topics; this time a consumer group consumes multiple topics through the direct-connection approach. I ran a small test and the result is correct: checking with the ZooKeeper client shows that ZooKeeper recorded the offsets.

package day04

//Consumption of multiple topics
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
import scala.collection.mutable.ListBuffer
import org.I0Itec.zkclient.ZkClient
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Duration, StreamingContext}

object OrderDemoYY1 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("yy").setMaster("local[*]")
    val ssc = new StreamingContext(conf, Duration(5000))
    //Consume 3 topics
    val topic1 = "wc"
    val topic2 = "wc1"
    val topic3 = "wc2"
    //Group name
    val groupid = "GPMMVV"
    val zkQuorum = "hadoop01:2181,hadoop02:2181,hadoop03:2181"
    val brokerList = "hadoop01:9092,hadoop02:9092,hadoop03:9092"
    //Put the topics into a Set collection; it is passed in as a parameter on the first read
    val topics = Set(topic1, topic2, topic3)
    //ListBuffer keeps insertion order, so a topic can be looked up by its index
    val topicsList = ListBuffer[String](topic1, topic2, topic3)
    //Kafka parameters
    val kafkaParams = Map(
      "metadata.broker.list" -> brokerList,
      "group.id" -> groupid,
      //Read from the beginning by default
      "auto.offset.reset" -> "smallest"
    )

    //A ListBuffer of ZKGroupTopicDirs, which hold the ZooKeeper paths where offsets are stored
    //Because there are multiple topics, there are also multiple ZKGroupTopicDirs
    val zkGTList: ListBuffer[ZKGroupTopicDirs] = new ListBuffer[ZKGroupTopicDirs]()
    //Add a ZKGroupTopicDirs to zkGTList for each topic in topicsList
    for (tp <- topicsList) {
      val topicDirs = new ZKGroupTopicDirs(groupid, tp)
      zkGTList += topicDirs
    }
    //Create a ZkClient to read and update offsets
    val zkClient = new ZkClient(zkQuorum)
    //Declare the InputDStream as var, because there are two cases — consumed before or not —
    //and it is assigned according to the case
    var kafkaDStream: InputDStream[(String, String)] = null
    //A Map of (TopicAndPartition -> offset) for the offsets stored in ZooKeeper
    var fromOffset = Map[TopicAndPartition, Long]()

    //Find out whether each topic has already been consumed
    val childrens: ListBuffer[Int] = new ListBuffer[Int]()
    var flag = false //true if any topic has been consumed
    for (topicDir <- zkGTList) {
      //countChildren returns the number of partition nodes under the topic's offset directory
      val child: Int = zkClient.countChildren(topicDir.consumerOffsetDir)
      childrens += child
      if (child > 0) {
        flag = true
      }
    }

    if (flag) { //Consumed before: rebuild fromOffset from ZooKeeper
      for (z <- 0 until topicsList.size) {
        //Get the corresponding child count and ZKGroupTopicDirs by the index of topicsList
        val child = childrens(z)
        val gpDirs = zkGTList(z)
        val topicn = topicsList(z)
        for (i <- 0 until child) {
          //Read the stored offset of each partition with zkClient.readData
          val offset = zkClient.readData[String](gpDirs.consumerOffsetDir + "/" + i)
          val tp = new TopicAndPartition(topicn, i)
          fromOffset += tp -> offset.toLong
        }
      }
      //The handler yields (key, value); the key from Kafka is null by default, value is the record
      val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message())
      //Create the kafkaDStream starting from the stored offsets
      kafkaDStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
        ssc, kafkaParams, fromOffset, messageHandler)
    } else { //Not read before
      kafkaDStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, topics)
    }

    /* Earlier two-topic version, kept for reference:
    val children1 = zkClient.countChildren(zKGroupTopicDirs1.consumerOffsetDir)
    val children2 = zkClient.countChildren(zKGroupTopicDirs2.consumerOffsetDir)
    if (children1 > 0 || children2 > 0) {
      for (i <- 0 until children1) {
        val offset = zkClient.readData[String](zKGroupTopicDirs1.consumerOffsetDir + "/" + i)
        val tp = new TopicAndPartition(topic1, i)
        fromOffset += tp -> offset.toLong
      }
      for (i <- 0 until children2) {
        val offset = zkClient.readData[String](zKGroupTopicDirs2.consumerOffsetDir + "/" + i)
        val tp = new TopicAndPartition(topic2, i)
        fromOffset += tp -> offset.toLong
      }
      val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message())
      kafkaDStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc,
        kafkaParams, fromOffset, messageHandler)
    } else {
      kafkaDStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
    }
    */

    var offsetRanges = Array[OffsetRange]() //Records the updated offset of each topic partition

    kafkaDStream.foreachRDD(kafkaRDD => {
      //kafkaRDD is a KafkaRDD; cast it to HasOffsetRanges to obtain the offsetRanges
      offsetRanges = kafkaRDD.asInstanceOf[HasOffsetRanges].offsetRanges
      kafkaRDD.foreach(println) //Print the records

      for (o <- offsetRanges) {
        val topicNN: String = o.topic      //Get the topic
        val offset: Long = o.untilOffset   //Get the offset after this batch
        val partition: Int = o.partition   //Get the partition
        //Find the topic's index in topicsList, and use it to find the corresponding ZKGroupTopicDirs
        val i = topicsList.indexOf(topicNN)
        val gpDir = zkGTList(i)
        //Update the offset in ZooKeeper through ZkUtils
        ZkUtils.updatePersistentPath(zkClient, gpDir.consumerOffsetDir + "/" + partition, offset.toString)
      }
    })

    ssc.start()
    ssc.awaitTermination()
  }
}

You can view the offsets under /consumers through the ZooKeeper client.
Of my three topics, wc and wc1 have only one partition, and as the figure below showed, the offset of partition 0 of wc1 is 13.
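As a sketch of where those offsets live: assuming the standard Kafka 0.8 ZooKeeper layout that ZKGroupTopicDirs uses (`/consumers/<group>/offsets/<topic>/<partition>`), the path for partition 0 of wc1 under group GPMMVV can be rebuilt by hand, without any Kafka dependency:

```scala
// Minimal sketch: rebuild the ZooKeeper offset path by hand, assuming the
// standard Kafka 0.8 layout /consumers/<group>/offsets/<topic>/<partition>
// (the same path ZKGroupTopicDirs.consumerOffsetDir produces for a topic).
object OffsetPathDemo {
  def consumerOffsetDir(group: String, topic: String): String =
    s"/consumers/$group/offsets/$topic"

  def main(args: Array[String]): Unit = {
    // Path where the program above stores the offset of wc1, partition 0
    println(consumerOffsetDir("GPMMVV", "wc1") + "/0")
    // prints /consumers/GPMMVV/offsets/wc1/0
  }
}
```

In the ZooKeeper client shell, `get /consumers/GPMMVV/offsets/wc1/0` would then show the stored value (13 in my test).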
