Procesamiento en tiempo real de Tweets utilizando Apache Storm en español


En este post se explicará de forma sencilla cómo procesar información utilizando Apache Storm en tiempo real, el caso de ejemplo será el procesamiento de Tweets.

Antes de iniciar

Antes de iniciar con la configuración del proyecto y la explicación sobre Apache Storm es necesario crear una cuenta de desarrollador en Twitter para esto es necesario registrarse en la siguiente URL https://apps.twitter.com/ y registrar una aplicación, con esto se proveerán los siguientes datos:

oauth.consumerSecret=
oauth.consumerKey=
oauth.accessToken=
oauth.accessTokenSecret=

Configuración

La aplicación a desarrollar será una aplicación Java construida con Apache Maven, para esto se deben agregar las siguientes dependencias:

<dependencies>
	<dependency>
		<groupId>org.apache.storm</groupId>
		<artifactId>storm-core</artifactId>
		<version>1.0.1</version>
	</dependency>
	<!-- https://mvnrepository.com/artifact/org.twitter4j/twitter4j-core -->
	<dependency>
		<groupId>org.twitter4j</groupId>
		<artifactId>twitter4j-core</artifactId>
		<version>4.0.4</version>
	</dependency>
	<dependency>
		<groupId>org.twitter4j</groupId>
		<artifactId>twitter4j-stream</artifactId>
		<version>4.0.4</version>
	</dependency>
</dependencies>

El siguiente punto será crear un archivo de configuración llamado twitter4j.properties en la carpeta src/main/resources con las siguientes propiedades:

oauth.consumerSecret={Tu consumerSecret}
oauth.consumerKey={Tu consumerKey}
oauth.accessToken={Tu accessToken}
oauth.accessTokenSecret={Tu accessTokenSecret}

Apache Storm

Apache storm es una herramienta que permite realizar procesamiento en tiempo real basado en los siguientes componentes:

  • Spouts: Los spouts son componentes utilizados para leer información de una fuente de datos y emitirla a los componentes que ejecutarán la lógica de la aplicación. En el caso de ejemplo la fuente de datos será Twitter y la información a emitir serán los Tweets relacionados con Java, JavaScript,PHP y Python.
  • Bolts: Los bolts son componentes que recibirán la información emitida por los spouts y ejecutarán el procesamiento, es decir, la lógica de negocio en la aplicación.En el caso de ejemplo el bolt que se escribirá recibirá los tweets que emita el spout y ejecutará la siguiente lógica:
    • Recibirá tweets relacionados con los lenguajes de programación
    • Verificará el lenguaje del que se habla en el tweet y lo escribirá en el archivo que le corresponde
  • Shuffle Grouping: Define cómo va a fluir la información, es decir, que spout emitirá información a que bolt.
  • Topology: La lógica de una aplicación en tiempo real será definida en una topología (spouts + bolts), es equivalente a un MapReduce job en Hadoop, la diferencia es que un MapReduce job termina en algún momento y la topología se ejecuta todo el tiempo.

Escribiendo código

El primer paso a seguir para realizar este procesamiento es definir una enumeración con los lenguajes de programación que se desean monitorear en la red social así como los archivos que se utilizarán para cada uno de ellos.

/**
 *
 */
package com.raidentrance.util;

/**
 * @author alex @raidentrance
 *
 */
public enum Languages {
	JAVA("java", "java.txt"), JAVASCRIPT("javascript", "javascript.txt"), PHP("php", "php.txt"), PYTHON("python",
			"python.txt");

	private String name;
	private String fileName;

	private Languages(String name, String fileName) {
		this.name = name;
		this.fileName = fileName;
	}
	public String getName() {
		return name;
	}
	public String getFileName() {
		return fileName;
	}
}

El siguiente paso es crear un administrador de archivos que para este ejemplo será el que escribirá los tweets en el archivo correspondiente.

/**
 *
 */
package com.raidentrance.util;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.Serializable;

import twitter4j.Status;

/**
 * @author alex @raidentrance
 *
 */
public class FileManager implements Serializable {

	private static final long serialVersionUID = -3987517536486344388L;

	public void writeTweet(Status tweet, String fileName) throws IOException {
		File file = new File(fileName);
		if (!file.exists()) {
			file.createNewFile();
		}

		FileWriter fileWritter = new FileWriter(file.getName(),true);
        BufferedWriter bufferWritter = new BufferedWriter(fileWritter);
        bufferWritter.write("\n" + tweet.getText() + "\n Retweet count : " + tweet.getUser().getFollowersCount() + "\n Tweet id "
				+ tweet.getId() + "\n User id" + tweet.getUser().getName()
				+ "\n----------------------------------------");
        bufferWritter.close();
	}
}

El siguiente paso para escribir la topología será escribir el spout que leerá la información de Twitter y la emitirá a los bolts que ejecutarán la lógica.

/**
 *
 */
package com.raidentrance.spout;

import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import com.raidentrance.util.Languages;

import twitter4j.FilterQuery;
import twitter4j.StallWarning;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;

/**
 * @author alex @raidentrance
 *
 */
public class TweetStreamSpout extends BaseRichSpout {

	private SpoutOutputCollector collector;

	private LinkedBlockingQueue<Status> queue;
	private TwitterStream twitterStream;

	private static final long serialVersionUID = 4256154244602991768L;

	public void nextTuple() {
		final Status status = queue.poll();
		if (status == null) {
			Utils.sleep(50);
		} else {
			collector.emit(new Values(status));
		}
	}

	public void open(@SuppressWarnings("rawtypes") Map map, TopologyContext context, SpoutOutputCollector collector) {
		System.out.println("Opening the bolt");
		this.collector = collector;
		this.twitterStream = new TwitterStreamFactory().getInstance();
		this.queue = new LinkedBlockingQueue<>();
		StatusListener listener = new StatusListener() {
			@Override
			public void onStatus(Status status) {
				queue.offer(status);
			}

			@Override
			public void onDeletionNotice(StatusDeletionNotice sdn) {
			}

			@Override
			public void onTrackLimitationNotice(int i) {
			}

			@Override
			public void onScrubGeo(long l, long l1) {
			}

			@Override
			public void onException(Exception e) {
			}

			@Override
			public void onStallWarning(StallWarning warning) {
			}
		};
		twitterStream.addListener(listener);
		FilterQuery filterQuery = new FilterQuery();

		for (Languages language : Languages.values()) {
			filterQuery.track(language.getName());
		}

		twitterStream.filter(filterQuery);
	}

	@Override
	public void activate() {
	};

	@Override
	public void deactivate() {
		twitterStream.cleanUp();
	};

	@Override
	public void close() {
		twitterStream.shutdown();
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("status"));
	}
}

El spout TweetStreamSpout utiliza el api twitter4j-stream para recibir como un flujo en tiempo real  los tweets relacionados con las palabras java, javascript, php y python y emitirá dichas publicaciones utilizando como variable de salida la palabra “status” al bolt que se defina.

Una vez que se cuenta con la fuente de información el siguiente paso es crear el bolt que ejecutará la lógica a ejecutar:

/**
 *
 */
package com.raidentrance.bolt;

import java.io.IOException;
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

import com.raidentrance.util.FileManager;
import com.raidentrance.util.Languages;

import twitter4j.Status;

/**
 * @author alex @raidentrance
 *
 */
public class TwitterAnalyzerBolt extends BaseRichBolt {

	private FileManager manager = new FileManager();
	private OutputCollector collector;

	private static final long serialVersionUID = 8465078768241865446L;

	@Override
	public void prepare(@SuppressWarnings("rawtypes") Map stormConf, TopologyContext context,
			OutputCollector collector) {
		this.collector = collector;
	}

	@Override
	public void execute(Tuple tuple) {
		final Status tweet = (Status) tuple.getValueByField("status");
		for (Languages language : Languages.values()) {
			if (tweet.getText().toLowerCase().contains(language.getName())) {
				try {
					manager.writeTweet(tweet, language.getFileName());
				} catch (IOException e) {
					collector.fail(tuple);
				}
			}
		}

		System.out.println("\n" + tweet.getText() + "\n Retweet count : " + tweet.getUser().getFollowersCount()
				+ "\n Tweet id " + tweet.getId() + "\n User id" + tweet.getUser().getName()
				+ "\n----------------------------------------");
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
	}
}

El bolt TwitterPrinterBolt obtendrá la información emitida por el spout utilizando la variable status y ejecuta la lógica para guardar dicho tweet en el archivo correspondiente.

El último paso será crear la topología, en esta se definirán los spouts, bolts, la relación entre ellos y el modo de despliegue.

/**
 *
 */
package com.raidentrance.topologies;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;

import com.raidentrance.bolt.TwitterAnalyzerBolt;
import com.raidentrance.spout.TweetStreamSpout;

/**
 * @author alex @raidentrance
 *
 */
public class TwitterTopology {
	public static void main(String args[]) {
	        TopologyBuilder builder = new TopologyBuilder();
		builder.setSpout("twitterSpout", new TweetStreamSpout());
		builder.setBolt("twitterAnalyzerBolt", new TwitterAnalyzerBolt(), 1).shuffleGrouping("twitterSpout");

		Config conf = new Config();
		conf.setDebug(false);

		final LocalCluster cluster = new LocalCluster();
		cluster.submitTopology("twitterTopology", conf, builder.createTopology());
	}
}

Para crear la topología se utilizará la clase TopologyBuilder y con esta se definirán los spouts, shuffleGrouping y bolts así como el modo de despliegue, como se puede observar se esta utilizando la clase LocalCluster, la cual iniciará el servicio de storm así como todas sus dependencias en un entorno de desarrollo. Si se deseara desplegar en un cluster de producción se debe seguir un procedimiento distinto

Ejecutando la aplicación

Una vez que se escribió todo el código se ejecutará la clase TwitterTopology. Esta imprimirá los tweets que se están procesando y generará los archivos por lenguaje con los tweets recibidos.

Puedes encontrar el código completo en el siguiente link: https://github.com/raidentrance/storm-sample

Fuentes : http://storm.apache.org

Autor: Alejandro Agapito Bautista

Twitter: @raidentrance

Contacto:raidentrance@gmail.com

Anuncios

1 comentario »

Responder

Introduce tus datos o haz clic en un icono para iniciar sesión:

Logo de WordPress.com

Estás comentando usando tu cuenta de WordPress.com. Cerrar sesión /  Cambiar )

Google+ photo

Estás comentando usando tu cuenta de Google+. Cerrar sesión /  Cambiar )

Imagen de Twitter

Estás comentando usando tu cuenta de Twitter. Cerrar sesión /  Cambiar )

Foto de Facebook

Estás comentando usando tu cuenta de Facebook. Cerrar sesión /  Cambiar )

Conectando a %s