Flume: Error while fetching Twitter data

Hi,

I am trying to fetch Twitter data using the Flume conf file below.

TwitterAgent.sources = Twitter
TwitterAgent.channels = MemChannel
TwitterAgent.sinks = HDFS

TwitterAgent.sources.Twitter.type=com.cloudera.flume.source.TwitterSource
TwitterAgent.sources.Twitter.channels = MemChannel
TwitterAgent.sources.Twitter.consumerKey= xxx
TwitterAgent.sources.Twitter.consumerSecret=xxx
TwitterAgent.sources.Twitter.accessToken =
xxx
TwitterAgent.sources.Twitter.accessTokenSecret =
xxx
TwitterAgent.sources.Twitter.keywords = flipkart, flipkart online, marketing, online marketing
TwitterAgent.sinks.HDFS.channel = MemChannel
TwitterAgent.sinks.HDFS.type = hdfs
TwitterAgent.sinks.HDFS.hdfs.path = xxx
TwitterAgent.sinks.HDFS.hdfs.fileType = DataStream
TwitterAgent.sinks.HDFS.hdfs.writeFormat = Text
TwitterAgent.sinks.HDFS.hdfs.batchSize = 1000
TwitterAgent.sinks.HDFS.hdfs.rollSize = 0
TwitterAgent.sinks.HDFS.hdfs.rollCount = 10000
TwitterAgent.channels.MemChannel.type = memory
TwitterAgent.channels.MemChannel.capacity = 10000
TwitterAgent.channels.MemChannel.transactionCapacity = 100

But I am getting the error below:

18/01/30 14:08:07 ERROR lifecycle.LifecycleSupervisor: Unable to start EventDrivenSourceRunner: { source:com.cloudera.flume.source.TwitterSource{name:Twitter,state:IDLE} } - Exception follows.
java.lang.IllegalStateException: Authentication credentials are missing. See http://twitter4j.org/configuration.html for the detail.
        at twitter4j.TwitterBaseImpl.ensureAuthorizationEnabled(TwitterBaseImpl.java:200)
        at twitter4j.TwitterStreamImpl.filter(TwitterStreamImpl.java:287)
        at com.cloudera.flume.source.TwitterSource.start(TwitterSource.java:151)
        at org.apache.flume.source.EventDrivenSourceRunner.start(EventDrivenSourceRunner.java:44)
        at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:251)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

I am wondering whether I have to place the jar file “flume-sources-1.0-SNAPSHOT” in the Flume path in CloudxLab, since everything is pre-built here. If so, kindly share the Flume path where the jar files should go.

Please specify the twitter credentials in the configuration file.
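
To illustrate what "missing" can mean here: in the first conf file above, the accessToken and accessTokenSecret values sit on the line after the key. In a Java properties-style file a value ends at the line break unless the line ends with a backslash, so those two keys would be read as empty. That may be what triggers the exception, though this is an assumption and not something confirmed in this thread. Below is a minimal Python sketch of such a check. The helper names (parse_properties, missing_credentials) are hypothetical, and the parser is a simplification of the real properties grammar.

```python
# Sketch only: a simplified reader for "key = value" lines. It does not
# implement the full Java .properties grammar (no escapes, no continuations).
REQUIRED = ("consumerKey", "consumerSecret", "accessToken", "accessTokenSecret")


def parse_properties(text):
    """Return a dict of key -> value, one entry per non-comment line."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def missing_credentials(text, prefix="TwitterAgent.sources.Twitter."):
    """List the credential keys that are absent or empty."""
    props = parse_properties(text)
    return [name for name in REQUIRED if not props.get(prefix + name)]


# Same layout as the first conf file: two values are on the following line.
sample = """\
TwitterAgent.sources.Twitter.consumerKey= xxx
TwitterAgent.sources.Twitter.consumerSecret=xxx
TwitterAgent.sources.Twitter.accessToken =
xxx
TwitterAgent.sources.Twitter.accessTokenSecret =
xxx
"""

print(missing_credentials(sample))  # ['accessToken', 'accessTokenSecret']
```

Keeping each key and its value on a single line makes the check return an empty list.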

Thanks for your time.

Can you be specific about the Twitter credentials? An example, please.

Hi @sammsundar4905,

This blog will help you stream Twitter data using Flume.

Hope this helps.

Thanks

Hi @abhinav,

As per the blog you suggested, below is my new Flume conf file:

TwitterAgent.sources = Twitter
TwitterAgent.channels = MemChannel
TwitterAgent.sinks = HDFS
 
TwitterAgent.sources.Twitter.type = com.cloudera.flume.source.TwitterSource
TwitterAgent.sources.Twitter.channels = MemChannel
TwitterAgent.sources.Twitter.consumerKey = xxxx
TwitterAgent.sources.Twitter.consumerSecret = xxxx
TwitterAgent.sources.Twitter.accessToken = xxxx
												  
TwitterAgent.sources.Twitter.accessTokenSecret = xxxx
TwitterAgent.sources.Twitter.keywords = theinterview, 17YearsOfNash, Warnock, RioCompetition, cpfc, Palace, London, Christmas, New Years
 
################## SINK #################################

TwitterAgent.sinks.HDFS.channel = MemChannel
TwitterAgent.sinks.HDFS.type = hdfs
TwitterAgent.sinks.HDFS.hdfs.path = hdfs://ip-172-31-53-48.ec2.internal:8020/user/sammsundar4905/flume/twitter_data/
TwitterAgent.sinks.HDFS.hdfs.fileType = DataStream
TwitterAgent.sinks.HDFS.hdfs.writeFormat = Text
 
TwitterAgent.sinks.HDFS.hdfs.batchSize = 10
TwitterAgent.sinks.HDFS.hdfs.rollSize = 0
TwitterAgent.sinks.HDFS.hdfs.rollInterval = 600
TwitterAgent.sinks.HDFS.hdfs.rollCount = 10000
 
#################### CHANNEL #########################
TwitterAgent.channels.MemChannel.type = memory
TwitterAgent.channels.MemChannel.capacity = 100
#default - TwitterAgent.channels.MemChannel.capacity = 100
TwitterAgent.channels.MemChannel.transactionCapacity = 100

I then ran the Flume job using the command below:

flume-ng agent -n TwitterAgent -Dtwitter4j.streamBaseURL=https://stream.twitter.com/1.1/ -c conf -f /home/sammsundar4905/datasets/flumetwitter.conf

I am getting the error below:

18/02/01 16:56:43 INFO twitter4j.TwitterStreamImpl: Waiting for 240000 milliseconds
18/02/01 17:00:43 INFO twitter4j.TwitterStreamImpl: Establishing connection.
18/02/01 17:00:43 INFO twitter4j.TwitterStreamImpl: 404:The URI requested is invalid or the resource requested, such as a user, does not exists. Also returned when the requested format is not supported by the requested method.
Unknown URL. See Twitter Streaming API documentation at http://dev.twitter.com/pages/streaming_api    

Please suggest what to do in this case.
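
One thing worth double-checking in a command like this is the space between the -Dtwitter4j.streamBaseURL value and the -c flag. Without it, the shell passes the URL and "-c" as a single argument, so the base URL ends in "/-c" and Flume never sees -c at all. Whether that relates to the 404 here is only a guess. The Python sketch below uses shlex.split, which approximates POSIX shell word splitting, to show the difference. The has_flag helper is hypothetical.

```python
import shlex


def has_flag(command, flag):
    """Return True if `flag` appears as its own argument after word splitting."""
    return flag in shlex.split(command)


base = "flume-ng agent -n TwitterAgent -Dtwitter4j.streamBaseURL=https://stream.twitter.com/1.1/"
tail = "-c conf -f /home/sammsundar4905/datasets/flumetwitter.conf"

joined = base + tail        # no space: the URL swallows "-c"
spaced = base + " " + tail  # space: "-c" is a separate argument

print(has_flag(joined, "-c"))  # False
print(has_flag(spaced, "-c"))  # True
```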

Hi @sammsundar4905,

I hope you replaced the twitter credentials before running the code.

Yes @abhinav. The Twitter credentials were replaced before running the code. The error persists.

Any solution for this?

Issue Fixed:

It turned out that while creating the Twitter application, one needs to set the Callback URL to https://api.twitter.com/oauth/request_token. Once it is set, you will see the Twitter Authorize screen.

Thanks.


That's great!

Thanks for the info