AttributeError: 'function' object has no attribute '_jrdd'


I am trying to create a temporary table from a Spark DataFrame built on Kafka streaming data, so that I can run queries against the table.

My code is as follows.

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql import SQLContext
from pyspark.sql import Row

from operator import add

def getSqlContextInstance(sparkContext):
    # Lazily create and reuse a single SQLContext instance
    if ('sqlContextSingletonInstance' not in globals()):
        globals()['sqlContextSingletonInstance'] = SQLContext(sparkContext)
    return globals()['sqlContextSingletonInstance']

def process(time, rdd):

    print("========= %s =========" % str(time))

    # Get the singleton instance of SQLContext
    sqlContext = getSqlContextInstance(rdd.context)

    # Parse each log line into a Row
    rowRDD = rdd.map(lambda rec: Row(RequestID=rec.split(' ')[0],
                                     Date=rec.split(' ')[3].replace('[', ''),
                                     Product=rec.split(' ')[6].split('/')[2]))

    rdsDataFrame = sqlContext.createDataFrame(rowRDD)
    # Register as table
    rdsDataFrame.registerTempTable("word")

    # Do word count on table using SQL and print it
    wordCountsDataFrame = sqlContext.sql("select count(*) as total from word")
    wordCountsDataFrame.show()


if __name__ == '__main__':

    conf = SparkConf().setAppName("Flume-Kafka-Spark Streaming WordCount").setMaster("yarn-client")

    sc = SparkContext(conf=conf)
    ssc = StreamingContext(sc, 10)
    topic = ["project-wordcount"]

    brokers = {"metadata.broker.list": "Kafka broker lists....."}

    directKafkaStream = KafkaUtils.createDirectStream(ssc, topic, brokers)
    messages = directKafkaStream.map(lambda rec: rec[1])
    departmentMessages = messages.filter(lambda rec: rec.split(' ')[6].split('/')[1] == 'department')
    departmentMessages.foreachRDD(lambda rdd: process)

    ssc.start()
    ssc.awaitTermination()


While executing spark-submit, I am getting following error:

Error running job streaming job 1544149840000 ms.0
org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
File "/usr/hdp/current/spark-client/python/lib/", line 67, in call
return r._jrdd
AttributeError: 'function' object has no attribute '_jrdd'

Can anyone tell me what is wrong here and how I can fix it?


Hi Shamik,

There have been breaking changes in the Kafka API, so the way we connect to Kafka has changed. The error you are facing is due to that, specifically in Spark 2.
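As a side note, the exact AttributeError in your traceback is what Python raises when a function object ends up where Spark expects an RDD result. In your code, `foreachRDD(lambda rdd: process)` returns the `process` function itself rather than calling it, and Spark's wrapper then tries to read `._jrdd` from it. A minimal sketch reproducing that mechanism in plain Python (no Spark needed):

```python
# Sketch of the failure mode: Spark's Python callback wrapper invokes the
# user-supplied function and then accesses ._jrdd on whatever it returns.

def process(time, rdd):
    print("========= %s =========" % str(time))

# Buggy pattern from the question: the lambda *returns* the `process`
# function instead of calling it.
buggy = lambda rdd: process

result = buggy("some-rdd")
print(type(result).__name__)  # function, not an RDD-like object

try:
    result._jrdd  # what Spark's wrapper effectively does with the return value
except AttributeError as e:
    print(e)  # 'function' object has no attribute '_jrdd'
```

Passing `process` directly, as in `departmentMessages.foreachRDD(process)`, avoids this particular error, since `foreachRDD` can hand a `(time, rdd)` function straight to its wrapper.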
We have an example of connecting and reading from kafka in our github repository here: