import os
import sys
# Point the script at the cluster's Spark client and make PySpark importable.
os.environ["SPARK_HOME"] = "/usr/hdp/current/spark-client"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
sys.path.insert(0, os.environ["PYLIB"] + "/py4j-0.8.2.1-src.zip")
sys.path.insert(0, os.environ["PYLIB"] + "/pyspark.zip")
from pyspark import SparkConf, SparkContext
from pyspark.sql import HiveContext, Row
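
# POC: create a Hive ACID (transactional) table from PySpark, append a few
# rows to it, and read them back.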
def main():
    conf = SparkConf().setAppName("PySpark Hive ACID table POC")
    sc = SparkContext(conf=conf)
    hiveContext = HiveContext(sc)

    # Recreate the target table; Hive ACID requires a bucketed ORC table
    # with 'transactional'='true'.
    hiveContext.sql("DROP TABLE IF EXISTS chaitya.hive_acid")
    hiveContext.sql("CREATE TABLE chaitya.hive_acid "
                    "(key int, name string, zip string, is_current boolean) "
                    "CLUSTERED BY (key) INTO 4 BUCKETS STORED AS ORC "
                    "LOCATION '/user/hive/warehouse/testdb.db/' "
                    "TBLPROPERTIES ('transactional'='true')")
    # Parallelize a few sample rows and append them to the ACID table as ORC.
    list1 = [(1, 'bob', '95136', True), (2, 'joe', '70068', True), (3, 'steve', '22150', True)]
    rdd1 = sc.parallelize(list1)
    rdd1.toDF(["key", "name", "zip", "is_current"]) \
        .write.mode("append").format("orc").saveAsTable("chaitya.hive_acid")
    # Read the rows back and print them.
    testDF = hiveContext.sql("SELECT * FROM chaitya.hive_acid")
    rowVals = testDF.rdd.map(lambda row: (row[0], row[1], row[2], row[3]))
    for row1 in rowVals.collect():
        print(row1)
if __name__ == "__main__":
    main()