Spark Streaming + Kafka Issue


#1

Hi Team,
I have been working on Spark Streaming and Kafka integration. I am using this class to count the words in each message:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

object KafkaWordCount {
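def main(args: Array[String]): Unit = {
// Expect exactly the three arguments destructured below; exit early otherwise.
if (args.length < 3) {
System.err.println("Usage: KafkaWordCount <zkQuorum> <consumerGroup> <topic>")
System.exit(1)
}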

val Array(zkQuorum, consumerGroup, topic) = args
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> zkQuorum,
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> consumerGroup,
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val conf = new SparkConf().setMaster("local[*]").setAppName("KafkaWordCount")
val ssc = new StreamingContext(conf,Seconds(2))
ssc.sparkContext.setLogLevel("ERROR")
val stream = KafkaUtils.createDirectStream[String,String](
  ssc,
  PreferConsistent,
  Subscribe[String,String](Array(topic),kafkaParams)
)

val lines = stream.map(record => record.value)
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()

}
}
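
For reference, the counting steps above (flatMap, map, reduceByKey) can be sketched on a plain Scala collection, with no Spark or Kafka involved. This is only an illustration of what one batch should produce: the object name WordCountSketch and the helper countWords are hypothetical and not part of the job, and groupBy plus a sum stands in for reduceByKey.

```scala
object WordCountSketch {
  // Hypothetical helper: mirrors the per-batch logic of the streaming job
  // on an in-memory Seq instead of a DStream.
  def countWords(lines: Seq[String]): Map[String, Int] = {
    val words = lines.flatMap(_.split(" "))   // same split as the job
    val pairs = words.map(word => (word, 1))  // (word, 1) pairs
    // groupBy + sum is a stand-in for reduceByKey(_ + _)
    pairs.groupBy(_._1).map { case (word, ps) => (word, ps.map(_._2).sum) }
  }

  def main(args: Array[String]): Unit = {
    // Two sample messages, as if typed into the console producer
    val counts = countWords(Seq("a b a", "b c"))
    counts.foreach(println)
  }
}
```

With the two sample messages shown, the counts are a -> 2, b -> 2 and c -> 1; in the streaming job, wordCounts.print() should show the same kind of pairs for each 2-second batch that contains messages.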

After that, I:

  1. Created the jar
  2. Created a new topic - alokkafka
  3. Submitted the job to the cluster using this command:
    spark-submit --class "KafkaWordCount" --jars spark-streaming-kafka-0-10-assembly_2.11-2.3.0.jar target/scala-2.11/wordcount_2.11-0.1.jar ip-172-31-38-146.ec2.internal:2181 spark-streaming-consumer-group alokkafka 2>/dev/null
  4. Started a producer in a different session:
    kafka-console-producer.sh --broker-list ip-172-31-35-141.ec2.internal:6667 --topic alokkafka

I am not getting any results on the consumer side. A screenshot of the consumer is attached below.