Real-time processing of tweets with Apache Storm


In this post I’m going to explain, step by step, how to process information in real time using Apache Storm.

Before you start

Before configuring the project and explaining Apache Storm, you need a Twitter developer account. Sign in at https://apps.twitter.com/ and register an application. Once the application is created, Twitter provides the following credentials:

oauth.consumerSecret=
oauth.consumerKey=
oauth.accessToken=
oauth.accessTokenSecret=

Configuration

The application is a Java project built with Maven, so we need to add the following dependencies to pom.xml:

<dependencies>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>1.0.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.twitter4j/twitter4j-core -->
    <dependency>
        <groupId>org.twitter4j</groupId>
        <artifactId>twitter4j-core</artifactId>
        <version>4.0.4</version>
    </dependency>
    <dependency>
        <groupId>org.twitter4j</groupId>
        <artifactId>twitter4j-stream</artifactId>
        <version>4.0.4</version>
    </dependency>
</dependencies>

To receive information in real time, the application uses the twitter4j-stream API. To configure it, create a file named twitter4j.properties in the src/main/resources folder with the following properties:

oauth.consumerSecret={Your consumerSecret}
oauth.consumerKey={Your consumerKey}
oauth.accessToken={Your accessToken}
oauth.accessTokenSecret={Your accessTokenSecret}
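A missing or misspelled key in twitter4j.properties otherwise tends to surface only when the stream tries to connect, so it can help to check the file up front. The following is a minimal sketch, not part of the original project: TwitterCredentialsCheck and its methods are hypothetical names, and it assumes the file sits at the root of the classpath, which is where Maven places the contents of src/main/resources.

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper: verifies that the four OAuth keys twitter4j reads are present and non-blank.
final class TwitterCredentialsCheck {

    // Property names as they appear in twitter4j.properties.
    static final String[] REQUIRED_KEYS = {
        "oauth.consumerKey", "oauth.consumerSecret", "oauth.accessToken", "oauth.accessTokenSecret"
    };

    private TwitterCredentialsCheck() {
    }

    /** Returns the required keys that are missing or blank, in REQUIRED_KEYS order. */
    static List<String> missingKeys(Properties props) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED_KEYS) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    /** Loads /twitter4j.properties from the classpath; returns empty Properties if the file is absent. */
    static Properties loadFromClasspath() throws IOException {
        Properties props = new Properties();
        // A null resource is allowed in try-with-resources; close() is simply skipped.
        try (InputStream in = TwitterCredentialsCheck.class.getResourceAsStream("/twitter4j.properties")) {
            if (in != null) {
                props.load(in);
            }
        }
        return props;
    }
}
```

In the topology's main method, one could call missingKeys(loadFromClasspath()) and stop with a clear message if the list is not empty.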

Apache Storm

Apache Storm is a tool that lets you process information in real time using the following components:

  • Spout: Spouts read information from a data source and emit it to the components that execute the logic. In this example the data source is Twitter, and the information emitted is the tweets related to Java, JavaScript, PHP and Python.
  • Bolts: Bolts receive the information from the spouts and execute the business logic of the application. In this example the bolt receives all the tweets coming from the spout and executes the following logic:
    • Receive tweets related to programming languages
    • Determine which language each tweet talks about
    • Write the tweet to a file depending on the programming language
  • Stream groupings: Part of defining a topology is specifying, for each bolt, which streams it should receive as input. A stream grouping defines how that stream is partitioned among the bolt’s tasks. In this example we connect our spout to our bolt with a shuffle grouping, which distributes tuples randomly across the bolt’s tasks.
  • Topology: All the logic of a real-time application runs in a topology (spouts + bolts). A topology is the equivalent of a MapReduce job in Hadoop; the difference is that a MapReduce job eventually finishes, while a topology keeps running until it is killed.
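Storm’s documentation describes a shuffle grouping as distributing tuples randomly across a bolt’s tasks so that each task receives an equal number of tuples. The toy model below illustrates that idea outside Storm. ToyShuffleGrouping is a hypothetical class written for illustration, not Storm’s implementation, and the grouping only matters once a bolt runs with more than one task (this post uses a parallelism hint of 1).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Toy model of a shuffle grouping (NOT Storm's source code): tuples are spread over
// the bolt's tasks in random order, one per task per "round", so the load stays even.
final class ToyShuffleGrouping {
    private final List<Integer> taskIds;
    private final Random random;
    private int next = 0;

    ToyShuffleGrouping(List<Integer> taskIds, long seed) {
        this.taskIds = new ArrayList<>(taskIds);
        this.random = new Random(seed);
        Collections.shuffle(this.taskIds, this.random);
    }

    /** Picks the task that should receive the next tuple. */
    int chooseTask() {
        if (next == taskIds.size()) {
            // Every task received one tuple in this round: reshuffle and start a new round.
            Collections.shuffle(taskIds, random);
            next = 0;
        }
        return taskIds.get(next++);
    }

    /** Counts how many of the given number of tuples each task (0..tasks-1) receives. */
    static int[] distribute(int tasks, int tuples, long seed) {
        List<Integer> ids = new ArrayList<>();
        for (int i = 0; i < tasks; i++) {
            ids.add(i);
        }
        ToyShuffleGrouping grouping = new ToyShuffleGrouping(ids, seed);
        int[] counts = new int[tasks];
        for (int t = 0; t < tuples; t++) {
            counts[grouping.chooseTask()]++;
        }
        return counts;
    }
}
```

For example, distributing 30 tuples over 3 tasks gives each task exactly 10, whatever the seed; only the order in which tasks are picked is random.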

Writing the code

The first step is to define an enum with all the languages we will monitor on the social network, together with the name of the file that will store each language’s tweets.

/**
 *
 */
package com.raidentrance.util;

/**
 * @author alex @raidentrance
 *
 */
public enum Languages {
    JAVA("java", "java.txt"), JAVASCRIPT("javascript", "javascript.txt"), PHP("php", "php.txt"), PYTHON("python",
            "python.txt");

    private String name;
    private String fileName;

    private Languages(String name, String fileName) {
        this.name = name;
        this.fileName = fileName;
    }
    public String getName() {
        return name;
    }
    public String getFileName() {
        return fileName;
    }
}
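Both the spout (to build its Twitter filter) and the bolt (to classify tweets) iterate over these constants. As I read the twitter4j 4.0.x sources, FilterQuery.track(String...) assigns the keyword array rather than appending to it, so all keywords should be passed in a single call. A minimal sketch, assuming it is acceptable to add a static method to the enum: keywords() is a hypothetical helper that collects every name into one array.

```java
// Standalone copy of the Languages enum with one hypothetical addition: keywords().
enum Languages {
    JAVA("java", "java.txt"), JAVASCRIPT("javascript", "javascript.txt"),
    PHP("php", "php.txt"), PYTHON("python", "python.txt");

    private final String name;
    private final String fileName;

    Languages(String name, String fileName) {
        this.name = name;
        this.fileName = fileName;
    }

    public String getName() {
        return name;
    }

    public String getFileName() {
        return fileName;
    }

    /** Hypothetical helper: every tracked keyword as one array, in declaration order. */
    public static String[] keywords() {
        Languages[] all = values();
        String[] result = new String[all.length];
        for (int i = 0; i < all.length; i++) {
            result[i] = all[i].getName();
        }
        return result;
    }
}
```

With this helper, the spout could build its filter with filterQuery.track(Languages.keywords()).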

The next step is to create a file manager; in this example it writes each tweet to the corresponding file.

/**
 *
 */
package com.raidentrance.util;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.Serializable;

import twitter4j.Status;

/**
 * @author alex @raidentrance
 *
 */
public class FileManager implements Serializable {

    private static final long serialVersionUID = -3987517536486344388L;

    public void writeTweet(Status tweet, String fileName) throws IOException {
        File file = new File(fileName);
        // Append mode: FileWriter creates the file if it does not exist yet.
        // try-with-resources closes the writer even if write() throws.
        try (BufferedWriter writer = new BufferedWriter(new FileWriter(file, true))) {
            writer.write("\n" + tweet.getText() + "\n Retweet count: " + tweet.getRetweetCount()
                    + "\n Tweet id: " + tweet.getId() + "\n User: " + tweet.getUser().getName()
                    + "\n----------------------------------------");
        }
    }
}
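The same append-only behaviour can also be written with java.nio.file.Files, which opens, writes and closes the file in one call. This is an alternative sketch, not the author’s code: TweetFileAppender and formatEntry are hypothetical names, and the plain parameters stand in for the twitter4j Status fields so the example runs without the library. The entry layout is similar to the one FileManager writes.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical append-only writer built on java.nio.
final class TweetFileAppender {
    private TweetFileAppender() {
    }

    /** Formats one entry; the parameters stand in for the twitter4j Status fields. */
    static String formatEntry(String text, int retweetCount, long tweetId, String userName) {
        return "\n" + text
                + "\n Retweet count: " + retweetCount
                + "\n Tweet id: " + tweetId
                + "\n User: " + userName
                + "\n----------------------------------------";
    }

    /** Appends the entry as UTF-8, creating the file on first use. */
    static void append(Path file, String entry) throws IOException {
        Files.write(file, entry.getBytes(StandardCharsets.UTF_8),
                StandardOpenOption.CREATE, StandardOpenOption.APPEND);
    }
}
```

With a parallelism hint above 1, several bolt tasks could append to the same file concurrently, so some form of coordination would then be needed; with the single task used in this post it is not an issue.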

Once all the helper classes are in place, we can start with the Storm components. The first one is the spout, which reads the information from Twitter:

/**
 *
 */
package com.raidentrance.spout;

import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import com.raidentrance.util.Languages;

import twitter4j.FilterQuery;
import twitter4j.StallWarning;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;

/**
 * @author alex @raidentrance
 *
 */
public class TweetStreamSpout extends BaseRichSpout {

    private SpoutOutputCollector collector;

    private LinkedBlockingQueue<Status> queue;
    private TwitterStream twitterStream;

    private static final long serialVersionUID = 4256154244602991768L;

    public void nextTuple() {
        final Status status = queue.poll();
        if (status == null) {
            Utils.sleep(50);
        } else {
            collector.emit(new Values(status));
        }
    }

    public void open(@SuppressWarnings("rawtypes") Map map, TopologyContext context, SpoutOutputCollector collector) {
        System.out.println("Opening the spout");
        this.collector = collector;
        this.twitterStream = new TwitterStreamFactory().getInstance();
        this.queue = new LinkedBlockingQueue<>();
        StatusListener listener = new StatusListener() {
            @Override
            public void onStatus(Status status) {
                queue.offer(status);
            }

            @Override
            public void onDeletionNotice(StatusDeletionNotice sdn) {
            }

            @Override
            public void onTrackLimitationNotice(int i) {
            }

            @Override
            public void onScrubGeo(long l, long l1) {
            }

            @Override
            public void onException(Exception e) {
                // Log stream errors instead of silently discarding them
                e.printStackTrace();
            }

            @Override
            public void onStallWarning(StallWarning warning) {
            }
        };
        twitterStream.addListener(listener);
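        FilterQuery filterQuery = new FilterQuery();

        // track(String...) replaces the keyword list on every call, so pass all keywords at once
        Languages[] languages = Languages.values();
        String[] keywords = new String[languages.length];
        for (int i = 0; i < languages.length; i++) {
            keywords[i] = languages[i].getName();
        }
        filterQuery.track(keywords);

        twitterStream.filter(filterQuery);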
    }

    @Override
    public void activate() {
    }

    @Override
    public void deactivate() {
        twitterStream.cleanUp();
    }

    @Override
    public void close() {
        twitterStream.shutdown();
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("status"));
    }
}

The TweetStreamSpout spout uses the twitter4j-stream API to receive, as a real-time stream, all the tweets that mention the words java, javascript, php and python, and it emits each one to the bolt in an output field named status.
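Storm calls nextTuple() on the spout’s executor thread and its documentation asks that the method not block, while twitter4j delivers statuses to the StatusListener on its own thread. The LinkedBlockingQueue in TweetStreamSpout bridges the two: onStatus offers each status, and nextTuple polls and returns immediately when nothing is waiting. The sketch below isolates that hand-off. HandOffBuffer is a hypothetical class, and the bounded capacity is my own assumption (the spout above uses an unbounded queue, which can grow without limit if the bolt falls behind).

```java
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch of the spout's hand-off between the stream thread and nextTuple().
final class HandOffBuffer<T> {
    private final LinkedBlockingQueue<T> queue;

    HandOffBuffer(int capacity) {
        // Bounded queue (an assumption): offer() fails instead of growing memory without limit.
        this.queue = new LinkedBlockingQueue<>(capacity);
    }

    /** Called from the listener thread; returns false, dropping the item, when the buffer is full. */
    boolean offer(T item) {
        return queue.offer(item);
    }

    /** Called from nextTuple(): the next item in FIFO order, or null at once if the buffer is empty. */
    T pollOrNull() {
        return queue.poll();
    }
}
```

Dropping tweets when the buffer is full trades completeness for bounded memory; whether that is acceptable depends on the application.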

Once we have the data source, the next step is to create a bolt that executes the business logic of our application:

/**
 *
 */
package com.raidentrance.bolt;

import java.io.IOException;
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

import com.raidentrance.util.FileManager;
import com.raidentrance.util.Languages;

import twitter4j.Status;

/**
 * @author alex @raidentrance
 *
 */
public class TwitterAnalyzerBolt extends BaseRichBolt {

    private FileManager manager = new FileManager();
    private OutputCollector collector;

    private static final long serialVersionUID = 8465078768241865446L;

    @Override
    public void prepare(@SuppressWarnings("rawtypes") Map stormConf, TopologyContext context,
            OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        final Status tweet = (Status) tuple.getValueByField("status");
        final String text = tweet.getText().toLowerCase();
        boolean failed = false;
        // Write the tweet to the file of every language it mentions
        for (Languages language : Languages.values()) {
            if (text.contains(language.getName())) {
                try {
                    manager.writeTweet(tweet, language.getFileName());
                } catch (IOException e) {
                    failed = true;
                }
            }
        }

        System.out.println("\n" + tweet.getText() + "\n Retweet count: " + tweet.getRetweetCount()
                + "\n Tweet id: " + tweet.getId() + "\n User: " + tweet.getUser().getName()
                + "\n----------------------------------------");

        // BaseRichBolt does not ack automatically: ack or fail every tuple exactly once
        if (failed) {
            collector.fail(tuple);
        } else {
            collector.ack(tuple);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}

The TwitterAnalyzerBolt bolt receives every tweet coming from the spout through the status field and saves it in the file of each programming language it mentions.
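The bolt classifies tweets with String.contains, so a tweet about JavaScript also contains the substring java and ends up in java.txt as well as javascript.txt. If that is not wanted, one option is to match whole words with a regular expression. A minimal sketch, not the original logic: LanguageMatcher is a hypothetical class, and the keywords are assumed to be lowercase.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical matcher: whole-word matching, so "javascript" no longer also counts as "java".
final class LanguageMatcher {
    // LinkedHashMap keeps the keywords in the order they were given.
    private final Map<String, Pattern> patterns = new LinkedHashMap<>();

    LanguageMatcher(List<String> keywords) {
        for (String keyword : keywords) {
            // \b = word boundary; Pattern.quote escapes any regex metacharacters in the keyword.
            patterns.put(keyword, Pattern.compile("\\b" + Pattern.quote(keyword) + "\\b"));
        }
    }

    /** Returns the keywords that appear as whole words in the text, ignoring case. */
    List<String> matches(String text) {
        String lower = text.toLowerCase(Locale.ROOT);
        List<String> found = new ArrayList<>();
        for (Map.Entry<String, Pattern> entry : patterns.entrySet()) {
            if (entry.getValue().matcher(lower).find()) {
                found.add(entry.getKey());
            }
        }
        return found;
    }
}
```

The trade-off is that compound forms such as python3 or javadev no longer match, while hashtags like #java still do because # is not a word character.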

The last step is to create the topology. In this component we define the spouts and bolts, the relationships between them and the deployment mechanism.

/**
 *
 */
package com.raidentrance.topologies;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;

import com.raidentrance.bolt.TwitterAnalyzerBolt;
import com.raidentrance.spout.TweetStreamSpout;

/**
 * @author alex @raidentrance
 *
 */
public class TwitterTopology {
    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("twitterSpout", new TweetStreamSpout());
        builder.setBolt("twitterAnalyzerBolt", new TwitterAnalyzerBolt(), 1).shuffleGrouping("twitterSpout");

        Config conf = new Config();
        conf.setDebug(false);

        final LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("twitterTopology", conf, builder.createTopology());
    }
}

To create the topology we use the TopologyBuilder class, which defines the spouts, the bolts and the relationships between them.
As the deployment mechanism, this example uses the LocalCluster class, which starts an in-process Storm instance to run our application. LocalCluster is meant for development only; in production the topology is packaged as a JAR and submitted to a real Storm cluster, for example with StormSubmitter.

Running the application

Once all the code is written, the next step is to run the TwitterTopology class. It prints every tweet it processes and generates one file per programming language containing the received tweets.

You can download the complete code from the following link: https://github.com/raidentrance/storm-sample.

Sources: http://storm.apache.org

Author: Alejandro Agapito Bautista

Twitter: @raidentrance

Contact: raidentrance@gmail.com
