Hi Team,
I have been working on Spark Streaming and Kafka integration. I am using this class to count the words in each message:
```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

object KafkaWordCount {
  def main(args: Array[String]): Unit = {
    if (args.length != 3) {
      System.err.println("Usage: KafkaWordCount <brokers> <consumerGroup> <topic>")
      System.exit(1) // stop here instead of failing on the destructuring below
    }
    val Array(brokers, consumerGroup, topic) = args

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> brokers, // Kafka broker host:port list, not the ZooKeeper quorum
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> consumerGroup,
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    // A master set here takes precedence over spark-submit's --master option
    val conf = new SparkConf().setMaster("local[*]").setAppName("KafkaWordCount")
    val ssc = new StreamingContext(conf, Seconds(2)) // 2-second batches
    ssc.sparkContext.setLogLevel("ERROR")

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](Array(topic), kafkaParams)
    )

    // Count words per batch: split each message value on spaces, then sum per word
    val lines = stream.map(record => record.value)
    val words = lines.flatMap(_.split(" "))
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```
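For each 2-second batch, the pipeline above splits every message value on spaces, pairs each word with 1, and sums the pairs per word. A minimal sketch of the same logic on an ordinary Scala collection (no Spark involved; the object name and sample messages are hypothetical) shows what `wordCounts.print()` would be expected to print for one batch:

```scala
object WordCountSketch {
  // Mirrors lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
  // for a single batch, using a plain Seq instead of a DStream.
  def countWords(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.split(" "))      // one element per word
      .map(word => (word, 1))     // pair each word with a count of 1
      .groupBy(_._1)              // group the pairs by word
      .map { case (word, pairs) => word -> pairs.map(_._2).sum } // sum the counts per word

  def main(args: Array[String]): Unit = {
    // Hypothetical messages typed into the console producer during one batch
    val batch = Seq("hello kafka", "hello spark")
    countWords(batch).toSeq.sortBy(_._1).foreach(println)
  }
}
```

Like the Spark version, splitting on a single space means an empty message produces an empty-string "word".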
After that, I:
- Created the jar.
- Created a new topic, `alokkafka`.
- Submitted the job to the cluster with this command:

```shell
spark-submit --class "KafkaWordCount" --jars spark-streaming-kafka-0-10-assembly_2.11-2.3.0.jar target/scala-2.11/wordcount_2.11-0.1.jar ip-172-31-38-146.ec2.internal:2181 spark-streaming-consumer-group alokkafka 2>/dev/null
```

- In a different session, started a console producer:

```shell
kafka-console-producer.sh --broker-list ip-172-31-35-141.ec2.internal:6667 --topic alokkafka
```
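With the positional arguments from the spark-submit command, the first value (`ip-172-31-38-146.ec2.internal:2181`) becomes `bootstrap.servers` in `kafkaParams`. The sketch below isolates that mapping in plain Scala; the object and method names are hypothetical, and only two of the Kafka parameters are reproduced. `bootstrap.servers` is meant to list Kafka brokers (such as the `:6667` address the console producer connects to), whereas 2181 is the port ZooKeeper conventionally listens on.

```scala
object ArgsToKafkaParams {
  // Same positional destructuring as KafkaWordCount.main: (servers, group, topic).
  def kafkaParams(args: Array[String]): Map[String, Object] = {
    val Array(servers, consumerGroup, _) = args
    Map[String, Object](
      "bootstrap.servers" -> servers, // expected to be Kafka broker host:port pairs
      "group.id" -> consumerGroup
    )
  }

  def main(args: Array[String]): Unit = {
    // Positional arguments copied from the spark-submit command above
    val submitted = Array(
      "ip-172-31-38-146.ec2.internal:2181",
      "spark-streaming-consumer-group",
      "alokkafka"
    )
    println(kafkaParams(submitted)("bootstrap.servers"))
  }
}
```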
I am not getting any results on the consumer side. A screenshot of the consumer output is attached below.