How to interact with Spark using Python 2 from a Python program (not a notebook)

Dear @sgiri & Team,

When I am executing a script using pyspark, I am getting the error "ImportError: No module named pyspark".
Kindly check and provide a solution.
PFA for more details.

Please follow the code in this notebook to initialize the Spark context.

Let me know if you are not able to follow it.
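In short, the notebook's bootstrap amounts to putting the pyspark and py4j archives that ship with Spark on the module search path before importing pyspark, then creating the SparkContext. A minimal sketch, assuming an HDP Spark 2 client under /usr/hdp/current/spark2-client (the paths, the py4j version, and the app name here are illustrative and may differ on your cluster):

import os
import sys

# Assumed install location of the Spark 2 client; adjust for your cluster
os.environ["SPARK_HOME"] = "/usr/hdp/current/spark2-client"
# Make the bundled py4j and pyspark zips importable
sys.path.insert(0, os.environ["SPARK_HOME"] + "/python/lib/py4j-0.10.4-src.zip")
sys.path.insert(0, os.environ["SPARK_HOME"] + "/python/lib/pyspark.zip")

from pyspark import SparkConf, SparkContext

# Create a SparkContext that runs on YARN and print its version as a quick check
sc = SparkContext(conf=SparkConf().setAppName("ImportCheck").setMaster("yarn"))
print(sc.version)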

Hi,

I followed the notebook mentioned in my previous reply above.

I have created a new file retail_db/src/main/python/GetRevenuePerProductId_sg.py by copying your code. The content of the code looks like the following. I have basically added 10 lines to your code.

import os
import sys
os.environ["SPARK_HOME"] = "/usr/hdp/current/spark2-client"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
# In the two lines below, use /usr/bin/python2.7 if you want to use Python 2
os.environ["PYSPARK_PYTHON"] = "/usr/bin/python2.7"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python2.7"
# Make the bundled py4j and pyspark packages importable
sys.path.insert(0, os.environ["PYLIB"] + "/py4j-0.10.4-src.zip")
sys.path.insert(0, os.environ["PYLIB"] + "/pyspark.zip")

from pyspark import SparkConf, SparkContext

# Create a SparkContext that submits the job to YARN
conf = SparkConf().setAppName("Daily Revenue Per Product").setMaster("yarn")
sc = SparkContext(conf=conf)

# Read orders and order items from HDFS (relative paths resolve to the user's HDFS home directory)
orders = sc.textFile("Data/retail_db/orders/")
orderItems = sc.textFile("Data/retail_db/order_items/")

# Keep only COMPLETE/CLOSED orders as (order_id, order_date)
ordersFilter = orders. \
    filter(lambda o: o.split(",")[3] in ("COMPLETE", "CLOSED")). \
    map(lambda o: (int(o.split(",")[0]), o.split(",")[1]))

# Map order items to (order_id, (product_id, order_item_subtotal))
orderItemsFilter = orderItems. \
    map(lambda oi: (int(oi.split(",")[1]), (int(oi.split(",")[2]), float(oi.split(",")[4]))))

# Join on order_id, re-key by (order_date, product_id) and sum the revenue
orderJoin = ordersFilter.join(orderItemsFilter)

orderItemsSel = orderJoin.map(lambda o: ((o[1][0], o[1][1][0]), o[1][1][1]))

orderItemsSum = orderItemsSel.reduceByKey(lambda a, b: a + b)

##productsRaw = open("/user/gowthambabu82699/Data/retail_db/products/").read().splitlines()
##products = sc.parallelize(productsRaw)
# Load products from HDFS as (product_id, product_name)
products = sc.textFile("Data/retail_db/products/")
productsRdd = products.map(lambda p: (int(p.split(",")[0]), p.split(",")[2]))

# Re-key the revenue per (order_date, product_id) by product_id so it can be joined with products
orderItemsJoin = orderItemsSum. \
    map(lambda oi: (int(oi[0][1]), (oi[0][0], oi[1])))

finalJoin = orderItemsJoin.join(productsRdd)

# Sort by order_date ascending and revenue descending (the revenue is negated in the key),
# then keep only the "date,revenue,product_name" string for the output
finalResult = finalJoin. \
    map(lambda o: ((o[1][0][0], -o[1][0][1]), (o[1][0][0] + "," + str(o[1][0][1]) + "," + o[1][1]))). \
    sortByKey()

finalResultToFile = finalResult.map(lambda rec: rec[1])

# Write the result to HDFS in two part files
finalResultToFile. \
    coalesce(2). \
    saveAsTextFile("/user/gowthambabu82699/pyspark-result/daily_revenue_text_python2")

Then, I deleted the output folder, since saveAsTextFile throws an error if the output path already exists:

hadoop fs -rmr pyspark-result/daily_revenue_text_python2
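If you prefer to handle this from inside the script rather than by hand, a minimal sketch that shells out to the hadoop client before saveAsTextFile (the path is the one used above; -f makes the delete a no-op when the folder does not exist yet):

import subprocess

output_path = "/user/gowthambabu82699/pyspark-result/daily_revenue_text_python2"
# Remove any previous output; -f suppresses the error if the path is absent
subprocess.call(["hadoop", "fs", "-rm", "-r", "-f", output_path])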

Then I executed it:

[gowthambabu82699@cxln4 ~]$ python retail_db/src/main/python/GetRevenuePerProductId_sg.py
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).

Once it finished, the resulting folder looks like this:

[gowthambabu82699@cxln4 ~]$ hadoop fs -ls pyspark-result/daily_revenue_text_python2/
Found 3 items
-rw-r--r-- 3 gowthambabu82699 gowthambabu82699 0 2020-04-24 17:59 pyspark-result/daily_revenue_text_python2/_SUCCESS
-rw-r--r-- 3 gowthambabu82699 gowthambabu82699 320166 2020-04-24 17:59 pyspark-result/daily_revenue_text_python2/part-00000
-rw-r--r-- 3 gowthambabu82699 gowthambabu82699 283725 2020-04-24 17:59 pyspark-result/daily_revenue_text_python2/part-00001

This is how you can take a look at the results:

[gowthambabu82699@cxln4 ~]$ hadoop fs -tail pyspark-result/daily_revenue_text_python2/part-00000
Grip
2014-01-31 00:00:00.0,14.99,Hirzl Men's Hybrid Golf Glove
2014-02-01 00:00:00.0,11999.4,Field & Stream Sportsman 16 Gun Fire Safe
2014-02-01 00:00:00.0,8158.64,Perfect Fitness Perfect Rip Deck
2014-02-01 00:00:00.0,7799.48,Diamondback Women's Serene Classic Comfort Bi
2014-02-01 00:00:00.0,6999.3,Nike Men's Free 5.0+ Running Shoe
2014-02-01 00:00:00.0,6100.0,Nike Men's Dri-FIT Victory Golf Polo
2014-02-01 00:00:00.0,4159.68,Nike Men's CJ Elite 2 TD Football Cleat
2014-02-01 00:00:00.0,4038.99,Under Armour Girls' Toddler Spine Surge Runni
2014-02-01 00:00:00.0,3999.8,Pelican Sunstream 100 Kayak
2014-02-01 00:00:00.0,3898.44,O'Brien Men's Neoprene Life Vest
2014-02-01 00:00:00.0,499.95,Nike Kids' Grade School KD VI Basketball Shoe
2014-02-01 00:00:00.0,455.0,LIJA Women's Eyelet Sleeveless Golf Polo
2014-02-01 00:00:00.0,399.99,GoPro HERO3+ Black Edition Camera
2014-02-01 00:00:00.0,399.8,Fitbit The One Wireless Activity & Sleep Trac
2014-02-01 00:00:00.0,360.0,Nike Men's Deutschland Weltmeister Winners Bl
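If you want to sanity-check the output programmatically instead of with hadoop fs -tail, a small sketch that could be added at the end of the script, reusing the same sc and the path used above:

# Read the saved text files back and spot-check a few records
savedResult = sc.textFile("pyspark-result/daily_revenue_text_python2")
print(savedResult.count())
for line in savedResult.take(5):
    print(line)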

Hope that solves your problem.