I have nearly 20k files in a directory, and my requirement is to work only on the files that were created yesterday. To do that, I came up with the code below, which gets the files in that directory that were created on the current day:
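import java.io.File
import java.text.SimpleDateFormat
import java.time.{Duration, Instant}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.master("yarn").appName("AutoCheck").enableHiveSupport().getOrCreate()
import spark.implicits._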
val t = (x:Long) => { new SimpleDateFormat("yyyy-MM-dd").format(x)}
def getFileTree(f: File): Stream[File] =
  f #:: (if (f.isDirectory) f.listFiles().toStream.flatMap(getFileTree)
         else Stream.empty)
val simpDate = new java.text.SimpleDateFormat("yyyy-MM-dd")
val currDate = simpDate.format(new java.util.Date())
val now = Instant.now
val today = now.toEpochMilli
val yesterday = now.minus(Duration.ofDays(1))
val yesterdayMilliSec = yesterday.toEpochMilli
val todaySimpDate = t(today)
val yesterdaySimpDate = t(yesterdayMilliSec)
val local:String = "file://"
// Steps to get the files of current date
val localFiles = getFileTree(new File("/tmp/hive_audits/")).filter(_.getName.endsWith(".log"))
val fileCrtDateDesc = localFiles.toList.map(y => (y,y.lastModified)).sortBy(-_._2)
val latestFiles = fileCrtDateDesc.toList.map(y => (y._1,t(y._2)))
val filesToday = latestFiles.filter(y => y._2==todaySimpDate)
// Getting file into spark
val localFileNames = filesToday.map(y => local+y._1)
val fileName = localFileNames(2).split("/")(6)
val hadoopConf = new Configuration()
val hdfs = FileSystem.get(hadoopConf)
val localPath = new Path(localFileNames(2))
val hdfsPath = new Path(s"hdfs://fdldev/user/fdlhdpetl/dailylogs/${fileName}")
hdfs.copyFromLocalFile(localPath,hdfsPath)
val fileDF = spark.read.text("/user/fdlhdpetl/dailylogs")
Now I have all the log files that are present in the directory /user/fdlhdpetl/dailylogs in the DataFrame fileDF.
The content of the DataFrame looks like the sample below (I am showing data from three log files here so the question doesn’t get too long).
There are three types of status in the log files: error, failure, and success.
Each file starts with a “JobID” line and ends with “Updating the job keeper…”, as can be seen below:
JobID: 454
[Wed Dec 27 05:38:47 UTC 2017] INFO: Starting Auditing for : baseTable1
[Wed Dec 27 05:38:49 UTC 2017] SEVERE: Error while compiling statement: FAILED: SemanticException [Error 10004]: Line 1:261 Invalid table alias or column
[Wed Dec 27 05:38:49 UTC 2017] INFO:
Completed Auditing for : baseTable1
[Wed Dec 27 05:38:49 UTC 2017] INFO: Updating the job keeper...
JobID: 455
[Wed Dec 27 05:38:18 UTC 2017] INFO: Starting Auditing for : baseTable2
[Wed Dec 27 05:38:19 UTC 2017] INFO: Connections established to gp and finance ...
[Wed Dec 27 05:38:20 UTC 2017] INFO: Starting the auditing for the intial fetched set of records...
[Wed Dec 27 05:38:20 UTC 2017] INFO: Number of pk columns in the src table: 16. Number of PK Columns in the dest table: 16
[Wed Dec 27 05:38:20 UTC 2017] INFO: Success
Completed Auditing for : baseTable2
[Wed Dec 27 05:38:49 UTC 2017] INFO: Updating the job keeper...
JobID: 547
[Wed Dec 27 05:38:18 UTC 2017] INFO: Starting Auditing for : baseTable3
[Wed Dec 27 05:38:19 UTC 2017] INFO: Connections established to gp and finance ...
[Wed Dec 27 05:38:20 UTC 2017] INFO: Starting the auditing for the intial fetched set of records...
[Wed Dec 27 05:38:20 UTC 2017] INFO: Number of pk columns in the src table: 16. Number of PK Columns in the dest table: 5
[Wed Dec 27 05:38:20 UTC 2017] INFO: Failed. Invalid data found.
Completed Auditing for : baseTable3
[Wed Dec 27 05:38:49 UTC 2017] INFO: Updating the job keeper...
I am trying to extract these three cases into three different DataFrames, one each for “error”, “failure”, and “success”.
Could anyone let me know whether this is possible and, if so, how I can achieve it?
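To make the goal concrete, here is a minimal sketch of the classification step in plain Scala, so it can be tried without a cluster. It is only one possible approach, not a tested Spark job. The names LogStatus, JobLog, splitJobs and classify are hypothetical, and the marker strings are assumptions read off the sample above: a SEVERE line means error, "INFO: Failed" means failure, and "INFO: Success" means success.

```scala
// Sketch only: LogStatus, JobLog, splitJobs and classify are hypothetical names.
object LogStatus {
  final case class JobLog(jobId: String, status: String, lines: List[String])

  // Group lines into one block per job; a new block starts at each "JobID:" line.
  def splitJobs(lines: Seq[String]): List[List[String]] =
    lines.foldLeft(List.empty[List[String]]) { (blocks, line) =>
      if (line.startsWith("JobID:")) List(line) :: blocks
      else blocks match {
        case current :: rest => (line :: current) :: rest
        case Nil             => blocks // ignore lines before the first JobID
      }
    }.map(_.reverse).reverse // blocks and their lines were built in reverse order

  // Assumed markers, taken from the sample log in the question.
  def classify(block: List[String]): JobLog = {
    val jobId = block.headOption.map(_.stripPrefix("JobID:").trim).getOrElse("")
    val status =
      if (block.exists(_.contains("] SEVERE:"))) "error"
      else if (block.exists(_.contains("INFO: Failed"))) "failure"
      else if (block.exists(_.contains("INFO: Success"))) "success"
      else "unknown"
    JobLog(jobId, status, block)
  }
}
```

Since spark.read.text returns one row per line, the job blocks would have to be reassembled before this can be applied. One option might be spark.sparkContext.wholeTextFiles("/user/fdlhdpetl/dailylogs"), which returns (path, content) pairs, so each file's content can be split on newlines, passed through splitJobs and classify, and the result filtered by status into three DataFrames.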