Spark Streaming and Kafka integration


#1

Hi Team,

I have written a Java application using the Spark Streaming library to read data from a Kafka topic. When I tried to execute it on the cluster, I got the error below:

[vardhan17126602@cxln4 ~]$ java -cp DataFlow.jar:/home/vardhan17126602/* com.vardhan.SparkStreamingApp
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/fs/FSDataInputStream
at org.apache.spark.SparkConf.loadFromSystemProperties(SparkConf.scala:76)
at org.apache.spark.SparkConf.<init>(SparkConf.scala:71)
at org.apache.spark.SparkConf.<init>(SparkConf.scala:58)
at com.vardhan.SparkStreamingApp.main(SparkStreamingApp.java:21)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.fs.FSDataInputStream
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 4 more

I checked Stack Overflow for this error; the suggested fix involves changes to the Spark configuration files, which I don't have access to. Can you please take a look and let me know if something can be done, or if I am doing something wrong?

The code and related jars are available under /home/vardhan17126602; the application jar is DataFlow.jar.

Regards
Vardhan Bhoumik


#2

We can pass the Spark configuration from the command line.
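
For example, new SparkConf() reads any spark.* Java system properties (that is the loadFromSystemProperties call in your stack trace), and the missing org.apache.hadoop.fs classes ship with the Hadoop client jars. A rough sketch, assuming the hadoop CLI is available on the node; local[*] is just an illustration:

# spark.* system properties are picked up by new SparkConf();
# `hadoop classpath` prints the jars that provide org.apache.hadoop.fs.*
java -cp "DataFlow.jar:$(hadoop classpath)" \
  -Dspark.master=local[*] \
  com.vardhan.SparkStreamingApp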


#3

Hi Abhinav,

I tried a few things but couldn't get it to work. Can you please help me with the command that would resolve this issue?

Regards
Vardhan Bhoumik


#4

What exactly are you trying to do here? This is not the right way to run a Spark application.
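
Spark applications are normally launched through spark-submit, which puts the Spark and Hadoop jars on the classpath and wires up the cluster for you. Roughly like this (the master, dependency list, and broker address are placeholders, not your exact values):

# spark-submit sets up the Spark/Hadoop classpath itself
spark-submit \
  --master <cluster master, e.g. yarn> \
  --class com.vardhan.SparkStreamingApp \
  --jars <comma-separated dependency jars> \
  /home/vardhan17126602/DataFlow.jar <kafka-broker:port>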


#5

Yes Abhinav, I was not using the correct command.

I used the command below and now the job gets initiated, but the tasks fail with a NoSuchMethodError.

spark-submit --jars $(echo /home/vardhan17126602/*.jar | tr ' ' ',') --class com.vardhan.SparkStreamingApp /home/vardhan17126602/DataFlow.jar cxln4.c.thelab-240901.internal:6667

Issue:
20/01/01 18:48:15 ERROR Executor: Exception in task 1.0 in stage 0.0 (TID 1)
java.lang.NoSuchMethodError: kafka.message.MessageAndMetadata.<init>(Ljava/lang/String;ILkafka/message/Message;JLkafka/serializer/Decoder;Lkafka/serializer/Decoder;)V
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:222)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:389)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
at scala.collection.AbstractIterator.to(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
at org.apache.spark.streaming.kafka.KafkaRDD$$anonfun$4.apply(KafkaRDD.scala:100)
at org.apache.spark.streaming.kafka.KafkaRDD$$anonfun$4.apply(KafkaRDD.scala:100)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
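
I am wondering if this is a version mismatch between the Kafka jars sitting in my home directory and the Kafka version the spark-streaming-kafka connector was built against. Would something along these lines be the right direction? The --packages coordinates are my guess and would need to match the cluster's Spark and Scala build:

# let spark-submit resolve a matching connector + Kafka client pair
# instead of passing whatever jars happen to be in my home directory
spark-submit \
  --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.0 \
  --class com.vardhan.SparkStreamingApp \
  /home/vardhan17126602/DataFlow.jar cxln4.c.thelab-240901.internal:6667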

Have you faced this issue before?

Regards
Vardhan Bhoumik