How can I use the latest Spark 2.3.0 version in CloudxLab?
If you are using Python Jupyter Notebook you can use findspark:
import findspark
# Find all other installation using !ls /usr/
#findspark.init('/usr/spark2.3')
#findspark.init('/usr/spark2.3')
#findspark.init('/usr/spark2.3')
findspark.init('/usr/spark2.4.3')
import pyspark # only run this after findspark.init()
from pyspark.sql import SparkSession, SQLContext
from pyspark.context import SparkContext
from pyspark.sql.functions import *
from pyspark.sql.types import *
spark = SparkSession.builder.appName("PysparkExample").getOrCreate()
sc = spark.sparkContext
I am completely new to CloudxLab and am trying to run Spark code that loads a file into an RDD:
from pyspark import SparkContext
sc = SparkContext(“local[*]”, “wordcount”)
input1 = sc.textFile(‘D:\TrendyTech_BigData\Week9_Spark\SparkDatasets\newfile.txt’)
word = input1.flatMap(lambda x: x.split(" "))
wordcount = word.map(lambda x: (x, 1))
final_count = wordcount.reduceByKey(lambda x, y: x + y)
result = final_count.collect()
Can anyone please tell me how I can load this file into an RDD while using this lab?
Thanks, Sandeep, for your help on this. It worked for me in Jupyter Notebook; the statements executed just fine. However, I am facing issues when I run the code below.
from pyspark import SparkContext
sc = SparkContext(“local[*]”, “wordcount”)
input1 = sc.textFile(‘D:\TrendyTech_BigData\Week9_Spark\SparkDatasets\newfile.txt’)
word = input1.flatMap(lambda x: x.split(" "))
wordcount = word.map(lambda x: (x, 1))
final_count = wordcount.reduceByKey(lambda x, y: x + y)
result = final_count.collect()
Below is the error message I have been getting:
Py4JJavaError Traceback (most recent call last)
in
----> 1 finalcount = wordcount.reduceByKey(lambda x, y: (x + y))
/usr/spark2.4.3/python/pyspark/rdd.py in reduceByKey(self, func, numPartitions, partitionFunc)
1623 [(‘a’, 2), (‘b’, 1)]
1624 “”"
→ 1625 return self.combineByKey(lambda x: x, func, func, numPartitions, partitionFunc)
1626
1627 def reduceByKeyLocally(self, func):
/usr/spark2.4.3/python/pyspark/rdd.py in combineByKey(self, createCombiner, mergeValue, mergeCombiners, numPartitions, partitionFunc)
1851 “”"
1852 if numPartitions is None:
→ 1853 numPartitions = self._defaultReducePartitions()
1854
1855 serializer = self.ctx.serializer
/usr/spark2.4.3/python/pyspark/rdd.py in _defaultReducePartitions(self)
2261 return self.ctx.defaultParallelism
2262 else:
→ 2263 return self.getNumPartitions()
2264
2265 def lookup(self, key):
/usr/spark2.4.3/python/pyspark/rdd.py in getNumPartitions(self)
2515
2516 def getNumPartitions(self):
→ 2517 return self._prev_jrdd.partitions().size()
2518
2519 @property
/usr/spark2.4.3/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in call(self, *args)
1255 answer = self.gateway_client.send_command(command)
1256 return_value = get_return_value(
→ 1257 answer, self.gateway_client, self.target_id, self.name)
1258
1259 for temp_arg in temp_args:
/usr/spark2.4.3/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 “An error occurred while calling {0}{1}{2}.\n”.
→ 328 format(target_id, “.”, name), value)
329 else:
330 raise Py4JError(
Py4JJavaError: An error occurred while calling o40.partitions.
: java.io.IOException: No FileSystem for scheme: D
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2660)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2667)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:258)
at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:229)
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:315)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:204)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
at org.apache.spark.api.java.JavaRDDLike$class.partitions(JavaRDDLike.scala:61)
at org.apache.spark.api.java.AbstractJavaRDDLike.partitions(JavaRDDLike.scala:45)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:745)
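The error "No FileSystem for scheme: D" occurs because the path starts with "D:", which Spark interprets as a URI scheme; that Windows path exists only on your local machine and is not accessible from CloudxLab. You should upload the file to CloudxLab first, and then provide its path (its location in CloudxLab).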
You can refer to https://cloudxlab.com/faq/20/how-should-i-upload-files-from-my-local-machine-to-cloudxlab for it.