Spark Streaming and Kafka integration

Hi Team,

I have written a Java application using the Spark Streaming library to read data from a Kafka topic. When I tried to execute it on the cluster, I got the below error:

[vardhan17126602@cxln4 ~]$ java -cp DataFlow.jar:/home/vardhan17126602/* com.vardhan.SparkStreamingApp
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/fs/FSDataInputStream
at org.apache.spark.SparkConf.loadFromSystemProperties(SparkConf.scala:76)
at org.apache.spark.SparkConf.<init>(SparkConf.scala:71)
at org.apache.spark.SparkConf.<init>(SparkConf.scala:58)
at com.vardhan.SparkStreamingApp.main(SparkStreamingApp.java:21)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.fs.FSDataInputStream
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 4 more

I checked Stack Overflow for this error and found a related thread.

It suggests some changes in the Spark configuration files, to which I don't have access. Can you please take a look and let me know if something can be done, or if I am doing something wrong?
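For reference, the usual suggestion in those threads (assuming it is the common answer for this error) is to expose the Hadoop jars to Spark via conf/spark-env.sh, which only an admin can edit here:

export SPARK_DIST_CLASSPATH=$(hadoop classpath)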

Code and related jars are available at: /home/vardhan17126602
DataFlow.jar
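For reference, the read path follows the standard direct-stream pattern from the spark-streaming-kafka (0.8) connector. Below is a simplified sketch, not the exact DataFlow source; the topic name "my_topic" is a placeholder and the broker list comes in as the first program argument:

package com.vardhan;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

public class SparkStreamingApp {
    public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf().setAppName("SparkStreamingApp");
        JavaStreamingContext jssc =
                new JavaStreamingContext(conf, Durations.seconds(10));

        // args[0] carries the Kafka broker list,
        // e.g. cxln4.c.thelab-240901.internal:6667
        Map<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list", args[0]);
        Set<String> topics = new HashSet<>(Arrays.asList("my_topic"));

        // Direct stream: executor tasks read Kafka partitions directly,
        // which is why failures surface inside KafkaRDD on a task.
        JavaPairInputDStream<String, String> stream =
                KafkaUtils.createDirectStream(jssc,
                        String.class, String.class,
                        StringDecoder.class, StringDecoder.class,
                        kafkaParams, topics);

        // Print each record's value (runs on the executors).
        stream.foreachRDD(rdd ->
                rdd.foreach(record -> System.out.println(record._2())));

        jssc.start();
        jssc.awaitTermination();
    }
}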

Regards
Vardhan Bhoumik

We can pass the Spark configuration from the command line.
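For example, any property from the config files can also be supplied per job with --conf; the values below are illustrative, not the exact fix:

spark-submit \
  --conf spark.driver.extraClassPath="$(hadoop classpath)" \
  --conf spark.executor.extraClassPath="$(hadoop classpath)" \
  --class com.vardhan.SparkStreamingApp \
  /home/vardhan17126602/DataFlow.jar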

Hi Abhinav,

I tried a few things but couldn't get it to work. Can you please help me with the command that can resolve this issue?

Regards
Vardhan Bhoumik

What exactly are you trying to do here? This is not the right way to run a Spark application.
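A Spark application is normally launched through spark-submit, along these lines (all values are placeholders):

spark-submit --class <main-class> --master <master-url> [options] <application-jar> [app args]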

Yes Abhinav, I was not using the correct command.

I used the below command and now the job is getting initiated, but it is failing with a NoSuchMethodError.

spark-submit --jars $(echo /home/vardhan17126602/*.jar | tr ' ' ',') --class com.vardhan.SparkStreamingApp /home/vardhan17126602/DataFlow.jar cxln4.c.thelab-240901.internal:6667

Issue:
20/01/01 18:48:15 ERROR Executor: Exception in task 1.0 in stage 0.0 (TID 1)
java.lang.NoSuchMethodError: kafka.message.MessageAndMetadata.<init>(Ljava/lang/String;ILkafka/message/Message;JLkafka/serializer/Decoder;Lkafka/serializer/Decoder;)V
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:222)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:389)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
at scala.collection.AbstractIterator.to(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
at org.apache.spark.streaming.kafka.KafkaRDD$$anonfun$4.apply(KafkaRDD.scala:100)
at org.apache.spark.streaming.kafka.KafkaRDD$$anonfun$4.apply(KafkaRDD.scala:100)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Have you faced this issue before?

Regards
Vardhan Bhoumik