Getting compilation error for custom producer in kafka


#1

I have return a small code for creating a custom producer and I am getting the below error.

error : " <console>:27: error: object kafka is not a member of package org.apache"

Could you please help me in that.


#2

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.streaming.StreamingContext
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._

//import scala.collection.JavaConverters._
//import org.apache.kafka.clients.Producer.KafkaProducer
import java.util.Properties
//import java.lang.Object
//import org.apache.kafka.clients.Producer.*;

object kafkaproducer_test {
def main(args: Array[String]) {
val conf = new SparkConf()
val sc = new SparkContext(conf)
val ssc = new StreamingContext(conf,Seconds(2))
val sqlc = new SQLContext(sc)
val hc = new HiveContext(sc)

    val props = new Properties()
    props.put("bootstrap.servers","ip-172-31-20-247.ec2.internal:2181");
    props.put("acks", "all");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    
    val producer = new KafkaProducer[String, String](props)
    
    for( i <- 1 to 10) {
      //producer.send(new ProducerRecord[String, String]("jin", "test", "test"));
        val record = new ProducerRecord("jin", "key", "the end ")
        producer.send(record) }

producer.close();
}
}

Attaching the code for the reference …


#3

Hi @Jinesh_Choudhary,

I think the error is self-explanatory? Did you check online for the solution?


#4

Yes @abhinavsingh, My guess was that there is some kafka lib need to be imported but that is not the case. I am unable to import any of the kafka lib. In Jupyter notebook, If we try to import any kafka lib, it gives the same error. Please suggest me.


#5

Hi @Jinesh_Choudhary,

You will have to pass the Kafka JAR. You may have to check the Spark and jupyter documentation on how to pass the JARs in the notebook.

Hope this helps.

Thanks.


#6

Hi Jinesh,
Are you able to resolve the issue? I am also trying to produce a message through java code, but unable to do so. Can you please help me with the steps to produce and consume messages using java code.

java -cp .:kafka-clients-0.10.0.0.jar:prod6.jar com.kafka.produce.ProduceMessage

I am getting the below error:
Exception in thread “main” java.lang.NoClassDefFoundError: org/slf4j/LoggerFactory
at org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:131)
at com.kafka.produce.ProduceMessage.main(ProduceMessage.java:24)
Caused by: java.lang.ClassNotFoundException: org.slf4j.LoggerFactory
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)

Regards,Pradeep


#7

Hi Pradeep,

First you need to include all the dependent libraries in your code and then we need to built the FAT JAR file with all the dependent jars.

For creating the FATjar, you can use either use Maven or SBT. Once you have fat jar, you will not get his error.


#8

Thanks Jinesh for providing the inputs. I will try and update you.

Regards,
Pradeep.