Find the missing numbers in sequence using Spark (Interview Question)

You are given a list of numbers in a text file. The numbers range from 1 to 10,000,000.

There are some numbers missing from the sequence.

How would you find the missing numbers?

Hi,
Here I have taken a small example with a few numbers missing from the range 1 to 10.

scala> val num = sc.parallelize(List(1,3,5,9,10))
num: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[25] at parallelize at <console>:21

scala> val maxvalue = num.max()
maxvalue: Int = 10

scala> val fdata = sc.parallelize(1 to maxvalue).subtract(num)
fdata: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[30] at subtract at <console>:25

scala> fdata.collect
res14: Array[Int] = Array(4, 8, 2, 6, 7)
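
The same approach extends to the text file from the original question (a sketch, assuming one number per line; the file name is just an example):

val nums = sc.textFile("numbers.txt").map(_.trim.toInt)   // assumed: one number per line
val fullRange = sc.parallelize(1 to nums.max())           // every number that should be present
val missing = fullRange.subtract(nums)                    // whatever is left over is missing
missing.collect()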

Thx,
Kiran

3 Likes

Will “(1 to maxvalue)” not overflow memory on the driver if maxvalue is really big?

3 Likes
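
One way around that could be sc.range, which builds the range as a distributed RDD of Longs rather than a local collection, and also works for ranges beyond Int.MaxValue (a sketch, reusing the num RDD from above):

val full = sc.range(1L, num.max() + 1L)          // RDD[Long], built lazily across partitions
val missing = full.subtract(num.map(_.toLong))   // same subtract idea, nothing large held on the driver
missing.collect()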

Hi Sandeep,

I have taken a sample of random numbers between 1 and 100 in a text file using the sample function. The file is saved as ‘mysample.txt’ in my home directory.

I wrote the code below to get the missing numbers in the sequence.

Step 1:

val myrdd = sc.textFile("mysample.txt").flatMap(_.split(",")).map(x => x.toInt)

Step 2:

def missnum(x: Int, y: Int): Int = {
  import scala.collection.mutable.ArrayBuffer
  import scala.io.Source
  import java.io.FileWriter
  var missing = ArrayBuffer[Int]()
  val writer = new FileWriter("output.txt")
  if ((y - x) > 1) {
    for (a <- x + 1 to y - 1) {
      missing += a
      writer.write(a)
    }
    return y
  } else {
    return y
  }
}

Step 3:

val missrdd = myrdd.reduce(missnum)

I am able to execute this code successfully and output.txt is created in my local directory, but the file is empty. Could you please help me check why output.txt is empty and why the results are not being saved?

Thanks.

Here the assumption is that reduce will always be called on adjacent values. In a distributed environment, it is never guaranteed that reduce is called on adjacent values, so the answer is going to be wrong.

Second, in a distributed environment, “output.txt” will be created in parallel on all the nodes where the tasks run.

Also, it looks like you forgot to close the file; that is why output.txt might be empty.
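
A distributed-safe sketch of the same task would compute the gaps with set operations and let Spark write the output, instead of opening files inside the reduce function (file names here are only examples):

val nums = sc.textFile("mysample.txt").flatMap(_.split(",")).map(_.trim.toInt)
val fullRange = sc.range(1L, nums.max() + 1L).map(_.toInt)   // all numbers that should be present
val missing = fullRange.subtract(nums)                        // in the range, but absent from the file
missing.sortBy(identity).saveAsTextFile("missing_numbers")    // Spark writes one part file per partition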

1 Like

def findMissing(xs: List[Int]): List[Int] = {
  xs match {
    case a :: b :: tail if b - a != 1 => ((a + 1) until b).toList ++ findMissing(b :: tail)
    case a :: b :: tail if b - a == 1 => findMissing(b :: tail)
    case _ :: Nil => Nil
    case Nil => Nil
  }
}

Can we do it like this? I am working on converting it to an RDD, but it seems not all of these operations are applicable to an RDD.
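
One possible way to adapt the gap idea to an RDD is to sort, index, and then join each value with its predecessor, so the comparison no longer depends on how partitions are combined (a sketch; the names are only illustrative):

val nums = sc.parallelize(List(1, 3, 5, 9, 10))

val indexed = nums.distinct().sortBy(identity).zipWithIndex().map(_.swap)  // (position, value), globally sorted
val shifted = indexed.map { case (i, v) => (i + 1, v) }                    // each value keyed by its successor's position

val missing = indexed.join(shifted)                                        // (position, (current, previous))
  .flatMap { case (_, (curr, prev)) => (prev + 1) until curr }             // emit the gap between neighbours

missing.collect().sorted                                                   // Array(2, 4, 6, 7, 8)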

Hi Sandeep,
Can you please check if this makes sense.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col,sum,lag}

object entry {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .appName("Weather Analysis")
      .config("spark.master", "local")
      .getOrCreate()

    spark.sparkContext.setLogLevel("ERROR")

    import spark.implicits._
    val sc = spark.sparkContext
    val rdd = sc.parallelize(List(1, 3, 5, 9, 10))
    val w = Window.partitionBy().orderBy("num")
    val df = rdd.toDF("num").withColumn("prev_value", lag("num", 1).over(w))

    // df.withColumn("prev_value", lag(df("num"), 1).over(w))
    val x = df.filter(x => x.get(1) != null).map(x => (x.getInt(1) + 1 until x.getInt(0)).toList).reduce(_ ++ _)
    println(x)
  }
}

Output:
List(2, 4, 6, 7, 8)

I doubt this solution will work in the distributed case: when we combine partitions we need the previous element, so the result may not be correct. Your comments please.
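
One note on that concern: Window.partitionBy() with no columns moves all rows into a single partition (Spark logs a warning about this), so lag does see the true previous value and the result is correct, but it will not scale to a very large range. A sketch of the same idea kept entirely in DataFrame functions, assuming Spark 2.4+ for the sequence() SQL function (column names follow the code above):

import org.apache.spark.sql.functions.{col, explode, expr, lag}
import org.apache.spark.sql.expressions.Window

val w = Window.orderBy("num")                          // still a single-partition window, same caveat
val missing = df
  .withColumn("prev_value", lag("num", 1).over(w))
  .where(col("num") - col("prev_value") > 1)           // keep only rows that follow a gap
  .select(explode(expr("sequence(prev_value + 1, num - 1)")).as("missing"))

missing.show()   // 2, 4, 6, 7, 8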