Unable to write data into Hive using Structured Streaming


#1

I am using Structured Streaming to read data from a socket, do a simple word count, and then write the output of the word count back to a Hive table. The code I am using is as below:

First, I have already created a managed table in Hive as below:

create table wordcounts_table(words String, count int) stored as parquet;
Then I am running the below code in spark-shell:

val lines = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load().as[String]
val words = lines.flatMap(_.split(" "))
val wordCounts = words.groupBy("value").count()
val query = wordCounts.writeStream.outputMode("complete").format("parquet").option("metastore", "thrift://ip-172-31-13-154.ec2.internal:9083").option("db", "struct_stream").option("table", "hdfs://ip-172-31-53-48.ec2.internal:8020/apps/hive/warehouse/struct_stream.db/wordcounts_table").queryName("socket-hive-streaming").start()
When I execute the last line, I get the below exception:

java.lang.IllegalArgumentException: 'path' is not specified
I have attached a detailed screenshot of the exception. I have tried a number of things but still cannot understand how to resolve this issue.


#2

Hi @Rajnil_Guha,

The Hive metastore IP has changed. Please find the new IP in Ambari.

Thanks


#3

I changed the Hive metastore URL to the new one, but it still gives the same error:

scala> val query = wordCounts.writeStream.format("parquet").option("metastore", "thrift://ip-172-31-20-247.ec2.internal:9083").option("db", "struct_stream").option("table", "wordcounts_table").queryName("socket-hive-streaming").start()
java.lang.IllegalArgumentException: 'path' is not specified
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$12.apply(DataSource.scala:310)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$12.apply(DataSource.scala:310)
at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
at org.apache.spark.sql.catalyst.util.CaseInsensitiveMap.getOrElse(CaseInsensitiveMap.scala:28)
at org.apache.spark.sql.execution.datasources.DataSource.createSink(DataSource.scala:309)
at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:293)
… 49 elided
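The stack trace points at the root cause: DataSource.createSink looks up a "path" entry in a case-insensitive options map and throws when it is absent. format("parquet") selects Spark's file sink, which does not talk to the Hive metastore at all, so custom options like metastore, db, and table are silently ignored. A minimal sketch of a write that satisfies the file sink, reusing the wordCounts stream from this thread (the checkpoint directory is a placeholder; note also that the file sink is append-only, so the aggregated stream cannot use complete mode with it):

```scala
// The parquet file sink reads only generic file-sink options such as
// "path" and "checkpointLocation"; metastore/db/table are ignored.
val query = wordCounts.writeStream
  .format("parquet")
  .outputMode("append")                          // file sinks support append only
  .option("checkpointLocation", "/tmp/wc-ckpt")  // placeholder; required for file sinks
  .option("path", "/apps/hive/warehouse/struct_stream.db/wordcounts_table")
  .queryName("socket-hive-streaming")
  .start()                                       // .start(path) is equivalent to option("path", ...)
```

Supplying the path either via option("path", ...) or as the argument to start(...) makes the IllegalArgumentException go away.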

Thanks & Regards
Rajnil Guha


#4

Hi @Rajnil_Guha,
Please try the below lines for writing:
val query = wordCounts.writeStream.format("parquet").option("checkpointLocation", "/tmp/wordcounts-checkpoint").start("/apps/hive/warehouse/struct_stream.db/wordcounts_table")
query.awaitTermination()
(use the path of the table you want to write into; the streaming file sink also requires a checkpoint location, so adjust the /tmp path as needed)
This writes the data as Parquet files to the location you specify.
After processing some data, a select * from wordcounts_table query should give you the data.
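Writing Parquet files directly under the warehouse path works because Hive reads whatever Parquet files sit in the managed table's directory. One caveat with this thread's pipeline: wordCounts is an aggregation, and the append-only file sink rejects aggregated streams without a watermark. On Spark 2.4+, a foreachBatch sink sidesteps this by writing each micro-batch through the ordinary batch writer into the Hive table. A sketch, assuming spark-shell was started with Hive support and using a placeholder checkpoint path:

```scala
import org.apache.spark.sql.DataFrame

// Write each micro-batch of the aggregated stream into the Hive table.
// insertInto matches columns by position, so the stream's (value, count)
// columns land in the table's (words, count) columns.
val query = wordCounts.writeStream
  .outputMode("complete")
  .foreachBatch { (batch: DataFrame, batchId: Long) =>
    batch.write.mode("overwrite").insertInto("struct_stream.wordcounts_table")
  }
  .option("checkpointLocation", "/tmp/wc-ckpt")  // placeholder path
  .start()
```

With this variant the table stays registered in the metastore the whole time, so select * from wordcounts_table in Hive sees each refreshed result.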

Thanks,
Sumanth Sharma