Reading avro data to spark


#1

Hi Team,
I have been struggling with this problem since long. I am taking twitter feeds using flume to do some analysis.
Flume.properties:

Name the components on this agent

TwitterAgent.sources = Twitter
TwitterAgent.sinks = HDFS
TwitterAgent.channels = MemChannel

Describe/configure the source

TwitterAgent.sources.Twitter.type = org.apache.flume.source.twitter.TwitterSource
TwitterAgent.sources.Twitter.channels = MemChannel
TwitterAgent.sources.Twitter.consumerKey = rmx6aKl8FwwAfhvb29bq0kGvo
TwitterAgent.sources.Twitter.consumerSecret = moOrx5g9GgDA8qePUtJfs9Sl7eiD25N7onM17u1E58ikn6ztgq
TwitterAgent.sources.Twitter.accessToken = 769400387598188545-txqC4Sl9MsShfFk85IYgCo2huGUGrhv
TwitterAgent.sources.Twitter.accessTokenSecret = C8UsWE0WJfCZ1JX43zIV0HhkVHapIuDRMJUw5PDiE3Jo1
TwitterAgent.sources.Twitter.keywords = India

TwitterAgent.sources.Twitter.language = en

Describe the sink

TwitterAgent.sinks.HDFS.type = hdfs
TwitterAgent.sinks.HDFS.channel = MemChannel
TwitterAgent.sinks.HDFS.hdfs.path = hdfs://ip-172-31-35-141.ec2.internal:8020/user/alokdeosingh1995/twitter_data
TwitterAgent.sinks.HDFS.hdfs.filetype = DataStream
TwitterAgent.sinks.HDFS.hdfs.writeFormat = Text
TwitterAgent.sinks.HDFS.hdfs.batchSize = 100
TwitterAgent.sinks.HDFS.hdfs.rollSize = 0
TwitterAgent.sinks.HDFS.hdfs.rollCount = 10000
TwitterAgent.sinks.HDFS.hdfs.rollInterval = 600

Use a channel which buffers events in memory

TwitterAgent.channels.MemChannel.type = memory
TwitterAgent.channels.MemChannel.capacity = 10000
TwitterAgent.channels.MemChannel.transactionCapacity = 100

I am getting a sequence file from twitter in the attached format(sample) where the text is in avro format.


The avro schema is like this:
{
“type”:“record”,“name”:“Doc”,“doc”:“adoc”,“fields”:[
{“name”:“id”,“type”:“string”},
{“name”:“user_friends_count”,“type”:[“int”,“null”]},
{“name”:“user_location”,“type”:[“string”,“null”]},
{“name”:“user_description”,“type”:[“string”,“null”]},
{“name”:“user_statuses_count”,“type”:[“int”,“null”]},
{“name”:“user_followers_count”,“type”:[“int”,“null”]},
{“name”:“user_name”,“type”:[“string”,“null”]},
{“name”:“user_screen_name”,“type”:[“string”,“null”]},
{“name”:“created_at”,“type”:[“string”,“null”]},
{“name”:“text”,“type”:[“string”,“null”]},
{“name”:“retweet_count”,“type”:[“long”,“null”]},
{“name”:“retweeted”,“type”:[“boolean”,“null”]},
{“name”:“in_reply_to_user_id”,“type”:[“long”,“null”]},
{“name”:“source”,“type”:[“string”,“null”]},
{“name”:“in_reply_to_status_id”,“type”:[“long”,“null”]},
{“name”:“media_url_https”,“type”:[“string”,“null”]},
{“name”:“expanded_url”,“type”:[“string”,“null”]}
]
}

The spark code begins like this:

import com.databricks.spark.avro.SchemaConverters
import org.apache.spark.sql.SparkSession
import org.apache.avro.Schema
import com.twitter.bijection.avro.GenericAvroCodecs
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.DataType

object entry_1 {

val spark = SparkSession
.builder()
.appName(“Twitter Analysis”)
.config(“spark.master”, “local”)
.getOrCreate()
val TWEET_SCHEMA: String =
“{” +
““type”:“record”,“name”:“Doc”,“doc”:“adoc”,“fields”:[” +
“{“name”:“id”,“type”:“string”},” +
“{“name”:“user_friends_count”,“type”:“int”},” +
“{“name”:“user_location”,“type”:“string”},” +
“{“name”:“user_description”,“type”:“string”},” +
“{“name”:“user_statuses_count”,“type”:“int”},” +
“{“name”:“user_followers_count”,“type”:“int”},” +
“{“name”:“user_name”,“type”:“string”},” +
“{“name”:“user_screen_name”,“type”:“string”},” +
“{“name”:“created_at”,“type”:“string”},” +
“{“name”:“text”,“type”:“string”},” +
“{“name”:“retweet_count”,“type”:“long”},” +
“{“name”:“retweeted”,“type”:“boolean”},” +
“{“name”:“in_reply_to_user_id”,“type”:“long”},” +
“{“name”:“source”,“type”:“string”},” +
“{“name”:“in_reply_to_status_id”,“type”:“long”},” +
“{“name”:“media_url_https”,“type”:“string”},” +
“{“name”:“expanded_url”,“type”:“string”}” +
“]}”

case class tweet(id: String, user_friends_count: Int, user_location: String, user_description: String,
user_statuses_count: Int, user_followers_count: Int, user_name: String,
user_screen_name: String, created_at: String, text: String, retweet_count: Long,
retweeted: Boolean, in_reply_to_user_id: Long, source: String, in_reply_to_status_id: Long,
media_url_https: String, expanded_url: String)

val parser: Schema.Parser = new Schema.Parser()
val schema: Schema = parser.parse(TWEET_SCHEMA)
val recordInjection = GenericAvroCodecs.toBinary(schema)

def main(args: Array[String]): Unit = {

val messages = spark.sparkContext
.sequenceFileorg.apache.hadoop.io.LongWritable,org.apache.hadoop.io.Text
.map(x => (x._1.toString,Array(x._2.toString.toByte)))

val x = messages.map(r => recordInjection.invert(r._2).get)
val df_schema = SchemaConverters.toSqlType(schema).dataType.asInstanceOf[StructType]
x.map(obj => {
val seq = (obj.)
})

}
}

My objective is to create dataframe from avro. I have the schema but I am not able to pull out individual fields from avro data. Where am I going wrong?