You are given a list of numbers in a text file. The numbers range from 1 to 10,000,000.
Some numbers are missing from the sequence.
How would you find the missing numbers?
Hi,
Here is a small example where I have taken the numbers from 1 to 10 with a few missing.
scala> val num = sc.parallelize(List(1,3,5,9,10))
num: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[25] at parallelize at <console>:21
scala> val maxvalue = num.max()
maxvalue: Int = 10
scala> val fdata = sc.parallelize(1 to maxvalue).subtract(num)
fdata: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[30] at subtract at <console>:25
scala> fdata.collect
res14: Array[Int] = Array(4, 8, 2, 6, 7)
Thx,
Kiran
Won't "(1 to maxvalue)" overflow local (driver) memory if maxvalue is really big?
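A minimal sketch of one way around that concern, assuming Spark's sc.range, which builds the range lazily across partitions instead of materializing it on the driver:

val num = sc.parallelize(List(1L, 3L, 5L, 9L, 10L))
val maxvalue = num.max()
// sc.range(start, end) excludes end, so add 1 to include maxvalue itself
val missing = sc.range(1L, maxvalue + 1).subtract(num)
missing.collect()   // Array(2, 4, 6, 7, 8) (order may vary)

Note that sc.range returns an RDD[Long], so the input is parallelized as Longs here.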
Hi Sandeep,
I have taken a sample of random numbers between 1 and 100 in a text file using the sample function. The file is saved as "mysample.txt" in my home directory.
I wrote the code below to get the missing numbers in the sequence.
Step1:
val myrdd = sc.textFile("mysample.txt").flatMap(_.split(",")).map(x => x.toInt)
Step2:
def missnum(x: Int, y: Int): Int = {
  import scala.collection.mutable.ArrayBuffer
  import scala.io.Source
  import java.io.FileWriter
  var missing = ArrayBuffer[Int]()
  val writer = new FileWriter("output.txt")
  if ((y - x) > 1) {
    for (a <- x + 1 to y - 1) {
      missing += a
      writer.write(a)
    }
    return y
  } else {
    return y
  }
}
Step3:
val missrdd = myrdd.reduce(missnum)
I am able to execute this code successfully, and output.txt is created in my local directory, but the file is empty. Could you please help me check why output.txt is empty and why the results are not being saved?
Thanks.
Here the assumption is that reduce will always be called on adjacent values. In a distributed environment, it is never guaranteed that reduce is called on adjacent values, so the answer is going to be wrong.
Second, in a distributed environment, "output.txt" will be created in parallel on every node where the tasks run.
Also, it looks like you forgot to close the file; that's why output.txt might be empty.
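A minimal sketch of one way to avoid both pitfalls, reusing the subtract idea from above (the input file name and output path here are assumptions):

// Read the sample, compare against the full range, and let Spark write the result.
val nums = sc.textFile("mysample.txt").flatMap(_.split(",")).map(_.toLong)
val max = nums.max()
val missing = sc.range(1L, max + 1).subtract(nums)
// saveAsTextFile writes one part file per partition instead of one local file
missing.saveAsTextFile("missing_output")

This sidesteps the adjacency assumption entirely and replaces the driver-side FileWriter with Spark's own distributed output.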
def findMissing(xs: List[Int]): List[Int] = {
  xs match {
    // keep b in the recursive call so gaps after b are not skipped
    case a :: b :: tail if b - a != 1 => ((a + 1) until b).toList ++ findMissing(b :: tail)
    case a :: b :: tail if b - a == 1 => findMissing(b :: tail)
    case a :: Nil => Nil
    case Nil => Nil
  }
}
Can we do it like this? I am working on converting it to an RDD, but it seems not all of these operations are applicable to RDDs.
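A minimal sketch of one way to express the same pairwise comparison on an RDD, assuming the sliding helper from spark.mllib, which handles pairs that straddle partition boundaries (the input list is just an example):

import org.apache.spark.mllib.rdd.RDDFunctions._

// Sort first so neighbouring elements really are consecutive in the data,
// then look at each pair and emit the numbers in the gap between them.
val sorted = sc.parallelize(List(1, 3, 5, 9, 10)).sortBy(identity)
val missing = sorted.sliding(2).flatMap { case Array(a, b) => (a + 1) until b }
missing.collect()   // Array(2, 4, 6, 7, 8)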
Hi Sandeep,
Can you please check if this makes sense?
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col,sum,lag}
object entry {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .appName("Weather Analysis")
      .config("spark.master", "local")
      .getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")
    import spark.implicits._
    val sc = spark.sparkContext
    val rdd = sc.parallelize(List(1, 3, 5, 9, 10))
    val w = Window.partitionBy().orderBy("num")
    val df = rdd.toDF("num").withColumn("prev_value", lag("num", 1).over(w))
    // df.withColumn("prev_value", lag(df("num"), 1).over(w))
    val x = df.filter(x => x.get(1) != null)
      .map(x => (x.getInt(1) + 1 until x.getInt(0)).toList)
      .reduce(_ ++ _)
    println(x)
  }
}
Output:
List(2, 4, 6, 7, 8)
I doubt this solution will work in the distributed case: when partitions are combined, each row needs the previous element from the neighbouring partition, so the result may not be correct. Your comments, please?
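For reference, a minimal sketch of a DataFrame variant using sequence and explode (available from Spark 2.4), under the same caveat: an unpartitioned window pulls all rows into a single partition, and Spark logs a warning about exactly that. It assumes the same SparkSession and spark.implicits._ import as above.

import org.apache.spark.sql.functions.{col, explode, lag, sequence}

val df = Seq(1, 3, 5, 9, 10).toDF("num")
val w = Window.orderBy("num")
val missing = df
  .withColumn("prev", lag("num", 1).over(w))
  // keep only rows whose predecessor leaves a gap
  .where(col("prev").isNotNull && col("num") - col("prev") > 1)
  // expand each gap into one row per missing number
  .select(explode(sequence(col("prev") + 1, col("num") - 1)).as("missing"))
missing.show()   // rows 2, 4, 6, 7, 8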