Hi,
I am trying to connect to a MySQL database from a PySpark job that builds a data pipeline involving Kafka and Spark Streaming. I am using the command below:
spark-submit --class mysql-connector-java-5.1.36-bin.jar \
  --packages org.apache.spark:spark-streaming-kafka_2.11:1.6.3,org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.1 \
  --jars spark-core_2.11-1.5.2.logging.jar,mysql-connector-java-5.1.36-bin.jar \
  ./kafka-spark-hive.py
and I am getting the following error:
20/04/14 06:01:06 WARN deploy.SparkSubmit$$anon$2: Failed to load mysql-connector-java-5.1.36-bin.jar.
java.lang.ClassNotFoundException: mysql-connector-java-5.1.36-bin.jar
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.spark.util.Utils$.classForName(Utils.scala:238)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:810)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:167)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:195)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:924)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:933)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
20/04/14 06:01:06 INFO util.ShutdownHookManager: Shutdown hook called
20/04/14 06:01:06 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-8b1dae2f-54eb-4c62-a94a-8a7c35a8a22e
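From the examples I have found for spark-submit, my understanding is that a PySpark job is submitted roughly like the sketch below (the jar paths are only placeholders for wherever the files actually sit on my machine, and I have not confirmed whether --driver-class-path is really needed), so I am not sure which part of my command is wrong:

spark-submit \
  --packages org.apache.spark:spark-streaming-kafka_2.11:1.6.3,org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.1 \
  --jars /path/to/mysql-connector-java-5.1.36-bin.jar \
  --driver-class-path /path/to/mysql-connector-java-5.1.36-bin.jar \
  ./kafka-spark-hive.py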
Below is the code I am trying to run:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql import SparkSession
#from pyspark_llap.sql.session import HiveWarehouseSessionImpl
from os.path import expanduser, join, abspath
#from pyspark.sql.types import StructType
#from pyspark.sql.types import StructField
#from pyspark.sql.types import *
#from pyspark.sql.types.DataType import *
warehouse_location = abspath("hdfs://usr/hive/warehouse")
sc = SparkContext("local[2]", "KafkaStreamingConsumer")
#spark = SparkSession.builder.appName
spark = SparkSession.builder.appName("SparkByExample").config("spark.sql.warehouse.dir", warehouse_location).enableHiveSupport().getOrCreate()
ssc = StreamingContext(sc, 5)
kafkaStream = KafkaUtils.createStream(ssc, "cxln1.c.thelab-240901.internal:2181", "consumer-group", {"csv-file": 1})
#readStream
df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "cxln4.c.thelab-240901.internal:9092") \
    .option("subscribe", "csv-file") \
    .load("/home/ubuntu/kafka-data.csv")
#writeStream
df.writeStream.format("console").option("truncate", "false").start()  # start the streaming query to the console sink
#driver='com.mysql.jdbc.Driver',dbtable='kafka_consumer_messages',user='sqoopuser',password='NHkkP876rp').mode('append').save()
print("Event received in window: ", kafkaStream.pprint())
jdbcDF = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:mysql://cxln2.c.thelab-240901.internal/sqoopex") \
    .option("dbtable", "widgets") \
    .option("user", "sqoopuser") \
    .option("password", "NHkkP876rp") \
    .load()
jdbcDF.show()
ssc.start()
ssc.awaitTermination()
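For context, the commented-out options in the code above are for the MySQL table I eventually want to write into. My rough (untested) plan for that part, once the submit issue is sorted out, is a plain JDBC write reusing the same connection details, something along these lines:

# rough sketch of the MySQL write I am aiming for (untested; batch-style write
# on a DataFrame, since I have not worked out the streaming sink yet)
jdbcDF.write \
    .format("jdbc") \
    .option("url", "jdbc:mysql://cxln2.c.thelab-240901.internal/sqoopex") \
    .option("driver", "com.mysql.jdbc.Driver") \
    .option("dbtable", "kafka_consumer_messages") \
    .option("user", "sqoopuser") \
    .option("password", "NHkkP876rp") \
    .mode("append") \
    .save()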