Real time processing of Tweets with apache Storm


In this post I’m going to explain in an easy way how to process information on real time by using Apache Storm.

Before to start

Before to start with the configuration of the project and the explanation about Apache Storm it is necessary to create a developer account on Twitter, for this purpose you need to sign in in the following url: https://apps.twitter.com/ and register an application, once you created the application Twitter will provide you the following information:

oauth.consumerSecret=
oauth.consumerKey=
oauth.accessToken=
oauth.accessTokenSecret=

Configuration

The application will be a Java application created with Maven, so we need to add the following dependencies:

<dependencies>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>1.0.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.twitter4j/twitter4j-core -->
    <dependency>
        <groupId>org.twitter4j</groupId>
        <artifactId>twitter4j-core</artifactId>
        <version>4.0.4</version>
    </dependency>
    <dependency>
        <groupId>org.twitter4j</groupId>
        <artifactId>twitter4j-stream</artifactId>
        <version>4.0.4</version>
    </dependency>
</dependencies>

The application will use an api to get information on real time named twitter4j-stream, in order to configure it we will create a file named twitter4j.properties in the folder src/main/resources with the following properties:

oauth.consumerSecret={Tu consumerSecret}
oauth.consumerKey={Tu consumerKey}
oauth.accessToken={Tu accessToken}
oauth.accessTokenSecret={Tu accessTokenSecret}

Apache Storm

Apache storm is a tool that allows you to process information in real time by using the following components:

  • Spout: The spouts are components used to read information from a source of data and emit this information to the components that are going to execute the logic. In this example the source of data will be Twitter and the information to emit will be the tweets related with Java, JavaScript,PHP and Phyton.
  • Bolts: Bolts are components that will receive the information from the spouts and execute the business logic of the application. In this example the bolt will receive all the tweets coming from the spout and will execute the following logic:
    • Receive tweets related with programming languages
    • Validate what king of language is talking
    • Write the tweet in a file depending of the programing language
  • Stream groupings: Part of defining a topology is specifying for each bolt which streams it should receive as input. A stream grouping defines how that stream should be partitioned among the bolt’s tasks. In the example we will specify that our spout will emit the information to our bolt.
  • Topology: All the logic of a real time application will be executed in a topology (spouts+bolts), this is equivalent to aMapReduce job in Hadoop, the difference is that a MapReduce job  will finish and the topology will be running all the time.

Writing the code

The first step is to define an enum with all the supported languages that we will monitor in the social network with the file names that are going to store the tweets.

/**
 *
 */
package com.raidentrance.util;

/**
 * @author alex @raidentrance
 *
 */
public enum Languages {
    JAVA("java", "java.txt"), JAVASCRIPT("javascript", "javascript.txt"), PHP("php", "php.txt"), PYTHON("python",
            "python.txt");

    private String name;
    private String fileName;

    private Languages(String name, String fileName) {
        this.name = name;
        this.fileName = fileName;
    }
    public String getName() {
        return name;
    }
    public String getFileName() {
        return fileName;
    }
}

The next step will be create a file manager, in this example it will write the tweets in the corresponding file.

/**
 *
 */
package com.raidentrance.util;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.Serializable;

import twitter4j.Status;

/**
 * @author alex @raidentrance
 *
 */
public class FileManager implements Serializable {

    private static final long serialVersionUID = -3987517536486344388L;

    public void writeTweet(Status tweet, String fileName) throws IOException {
        File file = new File(fileName);
        if (!file.exists()) {
            file.createNewFile();
        }

        FileWriter fileWritter = new FileWriter(file.getName(),true);
        BufferedWriter bufferWritter = new BufferedWriter(fileWritter);
        bufferWritter.write("\n" + tweet.getText() + "\n Retweet count : " + tweet.getUser().getFollowersCount() + "\n Tweet id "
                + tweet.getId() + "\n User id" + tweet.getUser().getName()
                + "\n----------------------------------------");
        bufferWritter.close();
    }
}

Once we create all our helper classes, we will start with our storm components, the first one will be the spout that will read the information from Twitter:

/**
 *
 */
package com.raidentrance.spout;

import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import com.raidentrance.util.Languages;

import twitter4j.FilterQuery;
import twitter4j.StallWarning;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;

/**
 * @author alex @raidentrance
 *
 */
public class TweetStreamSpout extends BaseRichSpout {

    private SpoutOutputCollector collector;

    private LinkedBlockingQueue<Status> queue;
    private TwitterStream twitterStream;

    private static final long serialVersionUID = 4256154244602991768L;

    public void nextTuple() {
        final Status status = queue.poll();
        if (status == null) {
            Utils.sleep(50);
        } else {
            collector.emit(new Values(status));
        }
    }

    public void open(@SuppressWarnings("rawtypes") Map map, TopologyContext context, SpoutOutputCollector collector) {
        System.out.println("Opening the bolt");
        this.collector = collector;
        this.twitterStream = new TwitterStreamFactory().getInstance();
        this.queue = new LinkedBlockingQueue<>();
        StatusListener listener = new StatusListener() {
            @Override
            public void onStatus(Status status) {
                queue.offer(status);
            }

            @Override
            public void onDeletionNotice(StatusDeletionNotice sdn) {
            }

            @Override
            public void onTrackLimitationNotice(int i) {
            }

            @Override
            public void onScrubGeo(long l, long l1) {
            }

            @Override
            public void onException(Exception e) {
            }

            @Override
            public void onStallWarning(StallWarning warning) {
            }
        };
        twitterStream.addListener(listener);
        FilterQuery filterQuery = new FilterQuery();

        for (Languages language : Languages.values()) {
            filterQuery.track(language.getName());
        }

        twitterStream.filter(filterQuery);
    }

    @Override
    public void activate() {
    };

    @Override
    public void deactivate() {
        twitterStream.cleanUp();
    };

    @Override
    public void close() {
        twitterStream.shutdown();
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("status"));
    }
}

The spout TweetStreamSpout uses the api twitter4j-stream  to receive as a real time stream all the tweets related with the words java, javascript, php and python and it will emit the data using a output variable named status to the bolt.

Once we have the source of data the next step will be create a bolt to execute the business logic of our application:

/**
 *
 */
package com.raidentrance.bolt;

import java.io.IOException;
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

import com.raidentrance.util.FileManager;
import com.raidentrance.util.Languages;

import twitter4j.Status;

/**
 * @author alex @raidentrance
 *
 */
public class TwitterAnalyzerBolt extends BaseRichBolt {

    private FileManager manager = new FileManager();
    private OutputCollector collector;

    private static final long serialVersionUID = 8465078768241865446L;

    @Override
    public void prepare(@SuppressWarnings("rawtypes") Map stormConf, TopologyContext context,
            OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        final Status tweet = (Status) tuple.getValueByField("status");
        for (Languages language : Languages.values()) {
            if (tweet.getText().toLowerCase().contains(language.getName())) {
                try {
                    manager.writeTweet(tweet, language.getFileName());
                } catch (IOException e) {
                    collector.fail(tuple);
                }
            }
        }

        System.out.println("\n" + tweet.getText() + "\n Retweet count : " + tweet.getUser().getFollowersCount()
                + "\n Tweet id " + tweet.getId() + "\n User id" + tweet.getUser().getName()
                + "\n----------------------------------------");
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}

The bolt TwitterPrinterBolt will get all the information coming from the spout by using the variable status and will execute the logic to save the tweet in the corresponding file

The last step will be create the topology, in this component we will define the spouts and bolts, the relationship between them and the deployment mechanism.

/**
 *
 */
package com.raidentrance.topologies;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;

import com.raidentrance.bolt.TwitterAnalyzerBolt;
import com.raidentrance.spout.TweetStreamSpout;

/**
 * @author alex @raidentrance
 *
 */
public class TwitterTopology {
    public static void main(String args[]) {
            TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("twitterSpout", new TweetStreamSpout());
        builder.setBolt("twitterAnalyzerBolt", new TwitterAnalyzerBolt(), 1).shuffleGrouping("twitterSpout");

        Config conf = new Config();
        conf.setDebug(false);

        final LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("twitterTopology", conf, builder.createTopology());
    }
}

To create the topology we will use the class TopologyBuilder and with this we will define the spouts, bolts and relationships.
To define the deployment mechanism we will use for this example the class LocalCluster, it will start storm instance to deploy our application, this class is just for development, for production you have to follow a different process.

Execution the application

Once we wrote all the code the next step will be execute the class TwitterTopology. It will print all the tweets that is processing and generate the files per programming language wi the received tweets.

You can download the complete code in the following link https://github.com/raidentrance/storm-sample.

Sources: http://storm.apache.org

Autor: Alejandro Agapito Bautista

Twitter: @raidentrance

Contacto:raidentrance@gmail.com

Procesamiento en tiempo real de Tweets utilizando Apache Storm en español


En este post se explicará de forma sencilla cómo procesar información utilizando Apache Storm en tiempo real, el caso de ejemplo será el procesamiento de Tweets.

Antes de iniciar

Antes de iniciar con la configuración del proyecto y la explicación sobre Apache Storm es necesario crear una cuenta de desarrollador en Twitter para esto es necesario registrarse en la siguiente URL https://apps.twitter.com/ y registrar una aplicación, con esto se proveerán los siguientes datos:

oauth.consumerSecret=
oauth.consumerKey=
oauth.accessToken=
oauth.accessTokenSecret=

Configuración

La aplicación a desarrollar será una aplicación Java construida con Apache Maven, para esto se deben agregar las siguientes dependencias:

<dependencies>
	<dependency>
		<groupId>org.apache.storm</groupId>
		<artifactId>storm-core</artifactId>
		<version>1.0.1</version>
	</dependency>
	<!-- https://mvnrepository.com/artifact/org.twitter4j/twitter4j-core -->
	<dependency>
		<groupId>org.twitter4j</groupId>
		<artifactId>twitter4j-core</artifactId>
		<version>4.0.4</version>
	</dependency>
	<dependency>
		<groupId>org.twitter4j</groupId>
		<artifactId>twitter4j-stream</artifactId>
		<version>4.0.4</version>
	</dependency>
</dependencies>

El siguiente punto será crear un archivo de configuración llamado twitter4j.properties en la carpeta src/main/resources con las siguientes propiedades:

oauth.consumerSecret={Tu consumerSecret}
oauth.consumerKey={Tu consumerKey}
oauth.accessToken={Tu accessToken}
oauth.accessTokenSecret={Tu accessTokenSecret}

Apache Storm

Apache storm es una herramienta que permite realizar procesamiento en tiempo real basado en los siguientes componentes:

  • Spouts: Los spouts son componentes utilizados para leer información de una fuente de datos y emitirla a los componentes que ejecutarán la lógica de la aplicación. En el caso de ejemplo la fuente de datos será Twitter y la información a emitir serán los Tweets relacionados con Java, JavaScript,PHP y Python.
  • Bolts: Los bolts son componentes que recibirán la información emitida por los spouts y ejecutarán el procesamiento, es decir, la lógica de negocio en la aplicación.En el caso de ejemplo el bolt que se escribirá recibirá los tweets que emita el spout y ejecutará la siguiente lógica:
    • Recibirá tweets relacionados con los lenguajes de programación
    • Verificará el lenguaje del que se habla en el tweet y lo escribirá en el archivo que le corresponde
  • Shuffle Grouping: Define cómo va a fluir la información, es decir, que spout emitirá información a que bolt.
  • Topology: La lógica de una aplicación en tiempo real será definida en una topología (spouts + bolts), es equivalente a un MapReduce job en Hadoop, la diferencia es que un MapReduce job termina en algún momento y la topología se ejecuta todo el tiempo.

Escribiendo código

El primer paso a seguir para realizar este procesamiento es definir una enumeración con los lenguajes de programación que se desean monitorear en la red social así como los archivos que se utilizarán para cada uno de ellos.

/**
 *
 */
package com.raidentrance.util;

/**
 * @author alex @raidentrance
 *
 */
public enum Languages {
	JAVA("java", "java.txt"), JAVASCRIPT("javascript", "javascript.txt"), PHP("php", "php.txt"), PYTHON("python",
			"python.txt");

	private String name;
	private String fileName;

	private Languages(String name, String fileName) {
		this.name = name;
		this.fileName = fileName;
	}
	public String getName() {
		return name;
	}
	public String getFileName() {
		return fileName;
	}
}

El siguiente paso es crear un administrador de archivos que para este ejemplo será el que escribirá los tweets en el archivo correspondiente.

/**
 *
 */
package com.raidentrance.util;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.Serializable;

import twitter4j.Status;

/**
 * @author alex @raidentrance
 *
 */
public class FileManager implements Serializable {

	private static final long serialVersionUID = -3987517536486344388L;

	public void writeTweet(Status tweet, String fileName) throws IOException {
		File file = new File(fileName);
		if (!file.exists()) {
			file.createNewFile();
		}

		FileWriter fileWritter = new FileWriter(file.getName(),true);
        BufferedWriter bufferWritter = new BufferedWriter(fileWritter);
        bufferWritter.write("\n" + tweet.getText() + "\n Retweet count : " + tweet.getUser().getFollowersCount() + "\n Tweet id "
				+ tweet.getId() + "\n User id" + tweet.getUser().getName()
				+ "\n----------------------------------------");
        bufferWritter.close();
	}
}

El siguiente paso para escribir la topología será escribir el spout que leerá la información de Twitter y la emitirá a los bolts que ejecutarán la lógica.

/**
 *
 */
package com.raidentrance.spout;

import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import com.raidentrance.util.Languages;

import twitter4j.FilterQuery;
import twitter4j.StallWarning;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;

/**
 * @author alex @raidentrance
 *
 */
public class TweetStreamSpout extends BaseRichSpout {

	private SpoutOutputCollector collector;

	private LinkedBlockingQueue<Status> queue;
	private TwitterStream twitterStream;

	private static final long serialVersionUID = 4256154244602991768L;

	public void nextTuple() {
		final Status status = queue.poll();
		if (status == null) {
			Utils.sleep(50);
		} else {
			collector.emit(new Values(status));
		}
	}

	public void open(@SuppressWarnings("rawtypes") Map map, TopologyContext context, SpoutOutputCollector collector) {
		System.out.println("Opening the bolt");
		this.collector = collector;
		this.twitterStream = new TwitterStreamFactory().getInstance();
		this.queue = new LinkedBlockingQueue<>();
		StatusListener listener = new StatusListener() {
			@Override
			public void onStatus(Status status) {
				queue.offer(status);
			}

			@Override
			public void onDeletionNotice(StatusDeletionNotice sdn) {
			}

			@Override
			public void onTrackLimitationNotice(int i) {
			}

			@Override
			public void onScrubGeo(long l, long l1) {
			}

			@Override
			public void onException(Exception e) {
			}

			@Override
			public void onStallWarning(StallWarning warning) {
			}
		};
		twitterStream.addListener(listener);
		FilterQuery filterQuery = new FilterQuery();

		for (Languages language : Languages.values()) {
			filterQuery.track(language.getName());
		}

		twitterStream.filter(filterQuery);
	}

	@Override
	public void activate() {
	};

	@Override
	public void deactivate() {
		twitterStream.cleanUp();
	};

	@Override
	public void close() {
		twitterStream.shutdown();
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("status"));
	}
}

El spout TweetStreamSpout utiliza el api twitter4j-stream para recibir como un flujo en tiempo real  los tweets relacionados con las palabras java, javascript, php y python y emitirá dichas publicaciones utilizando como variable de salida la palabra “status” al bolt que se defina.

Una vez que se cuenta con la fuente de información el siguiente paso es crear el bolt que ejecutará la lógica a ejecutar:

/**
 *
 */
package com.raidentrance.bolt;

import java.io.IOException;
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

import com.raidentrance.util.FileManager;
import com.raidentrance.util.Languages;

import twitter4j.Status;

/**
 * @author alex @raidentrance
 *
 */
public class TwitterAnalyzerBolt extends BaseRichBolt {

	private FileManager manager = new FileManager();
	private OutputCollector collector;

	private static final long serialVersionUID = 8465078768241865446L;

	@Override
	public void prepare(@SuppressWarnings("rawtypes") Map stormConf, TopologyContext context,
			OutputCollector collector) {
		this.collector = collector;
	}

	@Override
	public void execute(Tuple tuple) {
		final Status tweet = (Status) tuple.getValueByField("status");
		for (Languages language : Languages.values()) {
			if (tweet.getText().toLowerCase().contains(language.getName())) {
				try {
					manager.writeTweet(tweet, language.getFileName());
				} catch (IOException e) {
					collector.fail(tuple);
				}
			}
		}

		System.out.println("\n" + tweet.getText() + "\n Retweet count : " + tweet.getUser().getFollowersCount()
				+ "\n Tweet id " + tweet.getId() + "\n User id" + tweet.getUser().getName()
				+ "\n----------------------------------------");
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
	}
}

El bolt TwitterPrinterBolt obtendrá la información emitida por el spout utilizando la variable status y ejecuta la lógica para guardar dicho tweet en el archivo correspondiente.

El último paso será crear la topología, en esta se definirán los spouts, bolts, la relación entre ellos y el modo de despliegue.

/**
 *
 */
package com.raidentrance.topologies;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;

import com.raidentrance.bolt.TwitterAnalyzerBolt;
import com.raidentrance.spout.TweetStreamSpout;

/**
 * @author alex @raidentrance
 *
 */
public class TwitterTopology {
	public static void main(String args[]) {
	        TopologyBuilder builder = new TopologyBuilder();
		builder.setSpout("twitterSpout", new TweetStreamSpout());
		builder.setBolt("twitterAnalyzerBolt", new TwitterAnalyzerBolt(), 1).shuffleGrouping("twitterSpout");

		Config conf = new Config();
		conf.setDebug(false);

		final LocalCluster cluster = new LocalCluster();
		cluster.submitTopology("twitterTopology", conf, builder.createTopology());
	}
}

Para crear la topología se utilizará la clase TopologyBuilder y con esta se definirán los spouts, shuffleGrouping y bolts así como el modo de despliegue, como se puede observar se esta utilizando la clase LocalCluster, la cual iniciará el servicio de storm así como todas sus dependencias en un entorno de desarrollo. Si se deseara desplegar en un cluster de producción se debe seguir un procedimiento distinto

Ejecutando la aplicación

Una vez que se escribió todo el código se ejecutará la clase TwitterTopology. Esta imprimirá los tweets que se están procesando y generará los archivos por lenguaje con los tweets recibidos.

Puedes encontrar el código completo en el siguiente link: https://github.com/raidentrance/storm-sample

Fuentes : http://storm.apache.org

Autor: Alejandro Agapito Bautista

Twitter: @raidentrance

Contacto:raidentrance@gmail.com