for launching pyspark
pyspark --packages com.databricks:spark-avro_2.10:2.0.1
orders = sc.textFile(“itversity_data/retail_db/orders”)
orderItems = sc.textFile(“itversity_data/retail_db/order_items”)
ordersFiltered = orders.filter(lambda o: o.split(",")[3] in [“COMPLETE”,“CLOSED”])
ordersMap = ordersFiltered.map(lambda o: (int(o.split(",")[0]), o.split(",")[1]))
orderItemsMap = orderItems.map(lambda oi:
(int(oi.split(",")[1]), (int(oi.split(",")[2]), float(oi.split(",")[4]))))
ordersJoin = ordersMap.join(orderItemsMap)
ordersJoinMap = ordersJoin.map(lambda o: ((o[1][0], o[1][1][0]), o[1][1][1]))
from operator import add
dailyRevenuePerProductId = ordersJoinMap.reduceByKey(add)
productsRaw = open("/home/akellasreevani4799/data/retail_db/products/part-00000").read().splitlines()
products = sc.parallelize(productsRaw)
productsMap = products.map(lambda p: (int(p.split(",")[0]), p.split(",")[2]))
dailyRevenuePerProductIdMap = dailyRevenuePerProductId.map(lambda r:
(r[0][1], (r[0][0],r[1])))
dailyRevenuePerProductJoin = dailyRevenuePerProductIdMap.join(productsMap)
dailyRevenuePerProduct = dailyRevenuePerProductJoin.map(lambda t:
((t[1][0][0],-t[1][0][1]), (t[1][0][0], round(t[1][0][1], 2), t[1][1])))
dailyRevenuePerProductSorted = dailyRevenuePerProduct.sortByKey()
dailyRevenuePerProductName = dailyRevenuePerProductSorted.map(lambda r: r[1])
dailyRevenuePerProductNameDF = dailyRevenuePerProductName.toDF(schema=[“order_date”, “revenue_per_product”, “product_name”]).coalesce(2)
dailyRevenuePerProductNameDF.show()
up to here no issues when i try to save the dailyRevenuePerProductNameDF into avro by using this code
dailyRevenuePerProductNameDF.write.format(“avro”).save(“itversity_data/retail_db/daily_revenue_avro_python”)
Traceback (most recent call last):
File “”, line 1, in
File “/usr/hdp/current/spark2-client/python/pyspark/sql/readwriter.py”, line 550, in save
self._jwrite.save(path)
File “/usr/hdp/current/spark2-client/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py”,
line 1133, in call
File “/usr/hdp/current/spark2-client/python/pyspark/sql/utils.py”, line 69, in deco
raise AnalysisException(s.split(’: ‘, 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: u’Failed to find data source: avro. Please find an Avro
package at http://spark.apache.org/third-party-projects.html;’
type(dailyRevenuePerProductNameDF)
<class ‘pyspark.sql.dataframe.DataFrame’>from pyspark import since, SparkContext
from pyspark.rdd import ignore_unicode_prefix
from pyspark.sql.column import Column, _to_java_column
from pyspark.util import _print_missing_jar
Traceback (most recent call last):
File “”, line 1, in
ImportError: No module named util
even used like this
dailyRevenuePerProductNameDF.save(“daily_revenue_avro_python”, “avro”)
Traceback (most recent call last):
File “”, line 1, in
File “/usr/hdp/current/spark2-client/python/pyspark/sql/dataframe.py”, line 973, in getat
tr
“’%s’ object has no attribute ‘%s’” % (self.class.name, name))
AttributeError: ‘DataFrame’ object has no attribute ‘save’
even this also dint work
dailyRevenuePerProductNameDF.
… save(“itversity_data/retail_db/daily_revenue_avro_python”, “com.databricks.spark.avro”)
Traceback (most recent call last):
File “”, line 1, in
File “/usr/hdp/current/spark2-client/python/pyspark/sql/dataframe.py”, line 973, in getat
tr
“’%s’ object has no attribute ‘%s’” % (self.class.name, name))
AttributeError: ‘DataFrame’ object has no attribute ‘save’
Please help me