The code below worked for me; improvements or suggestions on best practices for real-time scenarios are welcome.
- Thanks
Code:
package com.first.load

import org.apache.spark.sql.SparkSession
import com.mongodb.spark._

object MongoLoad {
  def main(args: Array[String]): Unit = {
    println("Hi.. welcome to spark")

    //val warehouseLocation = new File("spark-warehouse").getAbsolutePath
    val sparkSession = SparkSession.builder()
      .master("local")
      .appName("MongoSparkConnectorIntro")
      .enableHiveSupport()
      .getOrCreate()

    // Pull the 'gold' cards from the bucketed Hive table and save them to the
    // MongoDB collection configured via spark.mongodb.output.uri.
    val mdf = sparkSession.sql("select * from akash_spark.bucketed_cards where type = 'gold'")
    MongoSpark.save(mdf.write.option("collection", "cards").mode("overwrite"))

    // Read the collection back (spark.mongodb.input.uri) to verify the write.
    val df = MongoSpark.load(sparkSession)
    df.printSchema()
    df.show()

    // Same flow for the 'classic' cards. Note that mode("overwrite") drops the
    // existing collection first, so this write replaces the 'gold' documents;
    // use mode("append") if both result sets should end up in the collection.
    val mdf2 = sparkSession.sql("select * from akash_spark.bucketed_cards where type = 'classic'")
    MongoSpark.save(mdf2.write.option("collection", "cards").mode("overwrite"))

    val testDf = MongoSpark.load(sparkSession)
    testDf.printSchema()
    testDf.show()

    sparkSession.stop()
  }
}
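One suggestion on the read side: MongoSpark.load(sparkSession) always reads the default collection from spark.mongodb.input.uri. If a job touches more than one collection, a ReadConfig can name the target collection per load instead. A minimal sketch, assuming the same 2.x connector version (the read preference here is only illustrative):

import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.ReadConfig

// Target a specific collection explicitly, falling back to the session's
// spark.mongodb.input.* settings for everything else.
val cardsReadConfig = ReadConfig(
  Map("collection" -> "cards", "readPreference.name" -> "secondaryPreferred"),
  Some(ReadConfig(sparkSession)))

val cardsDf = MongoSpark.load(sparkSession, cardsReadConfig)
cardsDf.show()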
and the command to build the jar and submit the job:
sbt clean && sbt package && spark-submit \
  --name hello \
  --master yarn \
  --deploy-mode client \
  --executor-memory 1g \
  --executor-cores 1 \
  --num-executors 1 \
  --class com.first.load.MongoLoad \
  --conf "spark.mongodb.input.uri=mongodb://127.0.0.1/akash_spark.cards?readPreference=primaryPreferred" \
  --conf "spark.mongodb.output.uri=mongodb://127.0.0.1/akash_spark.cards" \
  --packages org.mongodb.spark:mongo-spark-connector_2.11:2.3.0 \
  /home/udaychitukula6587/first.load/target/scala-2.11/helloworld_2.11-1.0.jar
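For reference, a minimal build.sbt that produces a jar named helloworld_2.11-1.0.jar might look like the sketch below. The Spark version is an assumption, so align it with your cluster; the connector can stay "provided" here because spark-submit pulls it in via --packages.

// build.sbt -- minimal sketch; versions are assumptions, match them to your cluster
name := "helloworld"
version := "1.0"
scalaVersion := "2.11.12"

// "provided" because spark-submit supplies Spark (and the connector via --packages) at runtime
libraryDependencies ++= Seq(
  "org.apache.spark"  %% "spark-sql"             % "2.3.0" % "provided",
  "org.apache.spark"  %% "spark-hive"            % "2.3.0" % "provided",
  "org.mongodb.spark" %% "mongo-spark-connector" % "2.3.0" % "provided"
)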