AttributeError: 'function' object has no attribute '_jrdd'


#1

I am trying to create a temporary table from Kafka streaming data using a Spark DataFrame, so that I can run queries on the table.

My code is as follows.

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql import SQLContext
from pyspark.sql import Row

from operator import add

def getSqlContextInstance(sparkContext):
    # Lazily create a single SQLContext and reuse it across batches
    if 'sqlContextSingletonInstance' not in globals():
        globals()['sqlContextSingletonInstance'] = SQLContext(sparkContext)
    return globals()['sqlContextSingletonInstance']

def process(time, rdd):

    print("========= %s =========" % str(time))

    try:

        # Get the singleton instance of SQLContext
        sqlContext = getSqlContextInstance(rdd.context)

        # Convert each log line into a Row
        rowRDD = rdd.map(lambda rec: Row(RequestID=rec.split(' ')[0], Date=rec.split(' ')[3].replace('[', ''), Product=rec.split(' ')[6].split('/')[2]))

        wordsDataFrame = sqlContext.createDataFrame(rowRDD)

        # Register as table
        wordsDataFrame.registerTempTable("words")

        # Do word count on table using SQL and print it
        wordCountsDataFrame = sqlContext.sql("select count(*) as total from words")

        wordCountsDataFrame.show()

    except:
        pass

if __name__ == '__main__':

    conf = SparkConf().setAppName("Flume-Kafka-Spark Streaming WordCount").setMaster("yarn-client")

    sc = SparkContext(conf=conf)
    ssc = StreamingContext(sc, 10)
    topic = ["project-wordcount"]

    brokers = {"metadata.broker.list": "Kafka broker lists....."}

    directKafkaStream = KafkaUtils.createDirectStream(ssc, topic, brokers)
    messages = directKafkaStream.map(lambda rec: rec[1])
    departmentMessages = messages.filter(lambda rec: rec.split(' ')[6].split('/')[1] == 'department')
    departmentMessages.foreachRDD(lambda rdd: process)

    ssc.start()
    ssc.awaitTermination()

While executing spark-submit, I am getting the following error:

Error running job streaming job 1544149840000 ms.0
org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
  File "/usr/hdp/current/spark-client/python/lib/pyspark.zip/pyspark/streaming/util.py", line 67, in call
    return r._jrdd
AttributeError: 'function' object has no attribute '_jrdd'

Can anyone tell me what is wrong here and how I can fix it?
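
One thing worth checking from the traceback: `return r._jrdd` fails because `r` is a function, which suggests the callback given to `foreachRDD` returned a function instead of running it. In the code above, `lambda rdd: process` returns `process` itself without calling it. Here is a minimal sketch that reproduces the message without Spark. `FakeRDD` and `run_callback` are hypothetical stand-ins written for this illustration, not PySpark APIs, and they only approximate how the streaming wrapper seems to treat a callback's return value.

```python
def process(time, rdd):
    """Stand-in for the real batch handler; it returns nothing."""
    return None


class FakeRDD(object):
    """Hypothetical stand-in for an RDD; only the _jrdd attribute matters here."""
    _jrdd = "java-rdd-handle"


def run_callback(func, time, rdd):
    """Hypothetical simplification of how the wrapper uses the callback result."""
    # One-argument callbacks receive only the RDD
    if func.__code__.co_argcount == 1:
        result = func(rdd)
    else:
        result = func(time, rdd)
    # Any truthy result is assumed to be an RDD
    if result:
        return result._jrdd
    return None


def try_callback(func):
    """Run the callback and return either its outcome or the error message."""
    try:
        return run_callback(func, 0, FakeRDD())
    except AttributeError as exc:
        return str(exc)


# The lambda returns the function object `process` without calling it
broken_outcome = try_callback(lambda rdd: process)

# Passing the function itself lets the wrapper call it with (time, rdd)
fixed_outcome = try_callback(process)

print(broken_outcome)
print(fixed_outcome)
```

If this is indeed the cause, passing the function directly, as in `departmentMessages.foreachRDD(process)`, or calling it inside the lambda, as in `lambda time, rdd: process(time, rdd)`, should let `process` actually run for each batch.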


#2

Hi Shamik,

There have been breaking changes in the Kafka API, so the way we connect to Kafka has changed. The error you are facing is due to that, specifically in Spark 2.
We have an example of connecting to and reading from Kafka in our GitHub repository here: