Error while loading .csv file in Spark


#1

Hi,

I’m loading a file in Spark and I’m getting an error

Input path does not exist: hdfs://ip-172-31-53-48.ec2.internal:8020/home/dhksk09430/gh-driver-schedule.csv

I’ve uploaded gh-driver-schedule.csv in HDFS also, but still the same error.

How should I fix it?


#2

Hi,

Do not get confused between local Linux file system (which is in web console) and HDFS.

/home/dhksk09430 is your home directory on Linux console (web console)

/user/dhksk09430 is your home directory on HDFS.

Please note that on HDFS and web console, you will have permissions to create files only in your home directory.

We can load files either from HDFS or web console.

Load files from HDFS

To load files from HDFS use one the below commands

  • sc.textFile("hdfs:///user/dhksk09430/gh-driver-schedule.csv")
  • sc.textFile("hdfs://gh-driver-schedule.csv")
  • sc.textFile("hdfs://ip-172-31-53-48.ec2.internal:8020/user/dhksk09430/gh-driver-schedule.csv")

Load files from web console or Linux file system

To load files from web console use below command

sc.textFile("file:///home/dhksk09430/gh-driver-schedule.csv")

Hope this helps. Feel free to let me know if you have any queries.


#3

Hello Abhinav,

I got the same kind of error.

Kindly help.

Traceback (most recent call last):
File “”, line 1, in
File “/usr/hdp/2.3.4.0-3485/spark/python/pyspark/rdd.py”, line 773, in collect
port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
File “/usr/hdp/2.3.4.0-3485/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py”, line 538, in call
File “/usr/hdp/2.3.4.0-3485/spark/python/pyspark/sql/utils.py”, line 36, in deco
return f(*a, **kw)
File “/usr/hdp/2.3.4.0-3485/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py”, line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: hdfs://ip-172-31-53-48.ec2.internal:8020/home/cvkonduru7525/text.txt
at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:287)
at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:229)
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:315)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:207)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1921)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:909)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:310)
at org.apache.spark.rdd.RDD.collect(RDD.scala:908)
at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:405)
at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)

Cheers,
Chaitanya Varma


#4

Hi,

The above answer by me will help you in solving your problem. Do not get confused between the local file system on web console and HDFS.


#5

Hi Abhinav,

I copied the file successfully to my HDFS and I am trying to read the file from HDFS. It’s not working.

[cvkonduru7525@ip-172-31-20-58 ~]$ hadoop fs -ls /user/cvkonduru7525
Found 3 items
drwx------ - cvkonduru7525 cvkonduru7525 0 2017-07-30 16:29 /user/cvkonduru7525/.Trash
-rw-r–r-- 3 cvkonduru7525 cvkonduru7525 380 2017-07-30 16:21 /user/cvkonduru7525/Audio.txt
-rw-r–r-- 3 cvkonduru7525 cvkonduru7525 380 2017-07-30 16:07 /user/cvkonduru7525/text.txt
[cvkonduru7525@ip-172-31-20-58 ~]$

text = sc.textFile(“hdfs:///user/cvkonduru7525/Audio.txt”)
17/07/30 16:30:29 INFO MemoryStore: ensureFreeSpace(201752) called with curMem=0, maxMem=278019440
17/07/30 16:30:29 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 197.0 KB, free 264.9 MB)
17/07/30 16:30:29 INFO MemoryStore: ensureFreeSpace(18970) called with curMem=201752, maxMem=278019440
17/07/30 16:30:29 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 18.5 KB, free 264.9 MB)
17/07/30 16:30:29 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:33975 (size: 18.5 KB, free: 265.1 MB)
17/07/30 16:30:29 INFO SparkContext: Created broadcast 0 from textFile at NativeMethodAccessorImpl.java:-2

text = sc.textFile(“hdfs://ip-172-31-20-58/user/cvkonduru7525/Audio.txt”)
17/07/30 16:32:39 INFO MemoryStore: ensureFreeSpace(201792) called with curMem=220722, maxMem=2780194 40
17/07/30 16:32:39 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 197. 1 KB, free 264.7 MB)
17/07/30 16:32:39 INFO MemoryStore: ensureFreeSpace(18970) called with curMem=422514, maxMem=27801944 0
17/07/30 16:32:39 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated siz e 18.5 KB, free 264.7 MB)
17/07/30 16:32:39 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:33975 (size: 18.5 KB, free: 265.1 MB)
17/07/30 16:32:39 INFO SparkContext: Created broadcast 1 from textFile at NativeMethodAccessorImpl.ja va:-2

I am getting the same error. Kindly help me with this.

Cheers,
Chaitanya Varma


#6

Hi @Siva_Chaitanya_Varma,

NameNode host is wrong in sc.textFile. Namenode is located at ip-172-31-53-48.ec2.internal (which you can find in Ambari)

Below command should work

text = sc.textFile("hdfs://ip-172-31-53-48.ec2.internal:8020/user/cvkonduru7525/Audio.txt")

or

text = sc.textFile("hdfs:///user/cvkonduru7525/Audio.txt")

Hope this helps


#7

Hi Abhinav sir,

I tried by using the two commands that you adviced. It didn’t work. Kindly help me please.

Using Python version 2.7.5 (default, Sep 15 2016 22:37:39)
SparkContext available as sc, HiveContext available as sqlContext.

text = sc.textFile(“hdfs:///user/cvkonduru7525/Audio.txt”)
17/07/31 13:13:45 INFO MemoryStore: ensureFreeSpace(201752) called with curMem=0, maxMem=278019440
17/07/31 13:13:45 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 197.0 KB, free 264.9 MB)
17/07/31 13:13:45 INFO MemoryStore: ensureFreeSpace(18970) called with curMem=201752, maxMem=278019440
17/07/31 13:13:45 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 18.5 KB, free 264.9 MB)
17/07/31 13:13:45 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:58803 (size: 18.5 KB, free: 265.1 MB)
17/07/31 13:13:45 INFO SparkContext: Created broadcast 0 from textFile at NativeMethodAccessorImpl.java:-2

text = sc.textFile(“hdfs://ip-172-31-53-48.ec2.internal:8020/user/cvkonduru7525/Audio.txt”)
17/07/31 13:14:37 INFO MemoryStore: ensureFreeSpace(201792) called with curMem=220722, maxMem=278019440
17/07/31 13:14:37 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 197.1 KB, free 264.7 MB)
17/07/31 13:14:37 INFO MemoryStore: ensureFreeSpace(18970) called with curMem=422514, maxMem=278019440
17/07/31 13:14:37 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 18.5 KB, free 264.7 MB)
17/07/31 13:14:37 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:58803 (size: 18.5 KB, free: 265.1 MB)
17/07/31 13:14:37 INFO SparkContext: Created broadcast 1 from textFile at NativeMethodAccessorImpl.java:-2

Cheers,
Chaitanya Varma


#8

@Wright_Jim. Is this issue resolved for you?


#9

Yes @Siva_Chaitanya_Varma,

@abhinav reply helped me.

I just checked out your code. Your code looks good. Can you point out what is the error as I do not see any error :slight_smile:

Just type text.take(2) and you will see that your code is working. Below command should help

text = sc.textFile("hdfs:///user/cvkonduru7525/Audio.txt")
text.take(2)

#10

Hi,

I specified the file path as you mentioned for Linux(web console) as below -

distFile = sc.textFile(“file:///home/rjsohoni9630/data.txt”)

and then am trying to do this - distFile.take(5)

Still it is throwing error as -

Py4JJavaError: An error occurred while calling o288.partitions.
: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: hdfs:/user/rjsohoni9630/data.txt
at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:287)
at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:229)
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:315)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:200)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
at org.apache.spark.api.java.JavaRDDLike$class.partitions(JavaRDDLike.scala:61)
at org.apache.spark.api.java.AbstractJavaRDDLike.partitions(JavaRDDLike.scala:45)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:745)

Please help me with this.