Databinding en Angular 4 Parte 1


Databinding en Angular 4 es utilizado para comunicar los templates (Archivos html) con typescript (La lógica de negocio de nuestra aplicación). Esto nos permitirá generar aplicaciones web dinámicas que puedan generar contenido diferente de acuerdo a los usuarios.

Existen distintos modos de comunicación:

  • Para envío de información (Typescript->HTML):Permite tomar información de nuestro código typescript, este puede obtenerlo de un servicio web o de un cálculo y desplegarlo en una pantalla HTML, realizar este envío de información es posible a través de las siguientes opciones:
    • String interpolation:{{data}}
    • Property binding:[property]=”data”
  • Para la reacción a eventos del usuario(HTML-> Typescript): Los archivos HTML son la forma en la que el usuario interactúa con una aplicación, por esto es necesario disparar eventos hacia el código que es capaz de ejecutar la lógica de negocio, para hacerlo lo haremos con lo siguiente:
    • Event binding : (event)=”expression”
  • Combinación de ambos (Two way binding): También seremos capaces de ejecutar ambos al mismo tiempo con esto seremos capaces de disparar eventos y mostrar información al mismo del siguiente modo:
    • Two way data binding : [(ngModel)]=”data”

Con esto seremos capaces de realizar la comunicación entre nuestro sitio código typescript y nuestros templates html, veamos algunos ejemplos de cada modo. Se tomará como base para este post el ejemplo Crear componentes en Angular 4.

String interpolation

String interpolation funciona muy parecido a expression language, con esto es posible tanto declarar variables en el archivo typescript e utilizarlas dentro de nuestro html como escribir cualquier expresión que pueda ser traducida a un String veamos un ejemplo editando nuestro componente posts.components:

posts.components.ts

import { Component } from '@angular/core'
@Component({
    selector: 'app-posts',
    templateUrl: './posts.component.html'
})
export class PostsComponent {
    author='@raidentrance';
    title='Angular from scratch';

    getTitle(){
        return this.title;
    }
}

Como se puede observar, dentro de nuestro componente PostsComponent declararemos 2 atributos el atributo author y el atributo title y los inicializaremos con algunos valores por defecto. Es importante mencionar que es posible declarar métodos que devuelvan el valor de alguno de nuestros atributos, muestra de esto es el método getTitle().

posts.component.html

<div align="center">
    <h3>Post List</h3>
    <div>
        <table  class="table">
            <tr>
                <th>Author Twitter</th>
                <th>Post name</th>
                <th>Post date</th>
            </tr>
            <tr>
                <td>{{author}}</td>
                <td>{{getTitle()}}</td>
                <td> {{'19/08/1988'}}</td>
            </tr>
        </table>
    </div>
</div>

Es momento de analizar el funcionamiento del archivo HTML, como se puede observar en la tabla se utilizarán los símbolos {{}} para definir que utilizaremos String interpolation y se colocarán las variables definidas dentro de nuestra clase en typescript, a demás es importante resaltar que la fecha fue declarada como una expresión String, por esto es posible colocarla dentro de nuestro String interpolation y que del mismo modo que podemos obtener el valor de un atributo podemos obtener el resultado de un método.

Property binding

Una vez que entendimos como funciona String interpolation es tiempo de entender como funciona property binding, para esto agregaremos un botón a nuestro componente posts y lo modificaremos de acuerdo a un evento dirigido por un componente typescript, veamos el ejemplo:

posts.component.ts

import { Component } from '@angular/core'
@Component({
    selector: 'app-posts',
    templateUrl: './posts.component.html'
})
export class PostsComponent {
    author = '@raidentrance';
    title = 'Angular from scratch';
    disableNewPost = true;

    constructor() {
        setTimeout(() => { this.disableNewPost = false }, 2000);
    }

    getTitle() {
        return this.title;
    }
}

Podrás notar que al código anterior se realizaron las siguientes modificaciones:

  • Se le agregó el atributo disableNewPost inicializado en true, con esto seremos capaces de determinar que un botón que se agregará se encontrará desactivado al principio
  • Se declaró un constructor el cual se ejecutará cuando se cree el objeto de nuestra clase
  • Se utilizó la función setTimeout() la cual definirá que una vez pasados 2 segundos se cambiará el valor de la variable disableNewPost a false lo cual permitirá que el botón se active después de este periodo de tiempo

posts.components.html

<div align="center">
    <h3>Post List</h3>

    <button class="btn btn-primery" [disabled]="disableNewPost">Create</button>
    <div>
        <table  class="table">
            <tr>
                <th>Author Twitter</th>
                <th>Post name</th>
                <th>Post date</th>
            </tr>
            <tr>
                <td>{{author}}</td>
                <td>{{getTitle()}}</td>
                <td> {{'19/08/1988'}}</td>
            </tr>
        </table>
    </div>
</div>

Al principio del archivo HTML se agregó un botón nuevo con el texto Create, como ya sabrán el tag button contiene una propiedad tipo boolean llamada disabled, con esta es posible determinar si el elemento será habilitado o no. Utilizando property binding determinaremos el valor de este atributos de acuerdo al valor del atributo disableNewPost que se encuentra en nuestro archivo typescript, con esto el botón se encontrará des habilitado durante 2 segundos y pasando esto se habilitará de nuevo sin necesidad de actualizar algún elemento del DOM.

En el siguiente post se explicarán los otros modos disponibles en Angular 4 para realizar el databinding en nuestras aplicaciones. Si tienes alguna duda o comentario no dudes en hacerlo en la sección de comentarios o en nuestras redes sociales.

Autor: Alejandro Agapito Bautista

Twitter: @raidentrance

Contacto:raidentrance@gmail.com

Crear componentes en Angular 4


Como se explicó en el primer post de Angular 4, la forma en la que se crea una aplicación es a través de componentes. En este post se explicará como crear componentes manualmente y haciendo uso de la CLI(Command line interface). Se tomará como base el post Configura Bootstrap en Angular 4.

Una vez que se tiene un proyecto construido veremos un componente llamado app.component que será utilizado para cargar el contenido al archivo index.html. A continuación veremos como crear componentes adicionales a nuestra aplicación.

Creación de componentes manualmente

A continuación crearemos un componente nuevo llamado posts siguiendo los siguientes pasos:

  • Crear un folder con el nombre del componente dentro del folder src/app/ , en este caso nuestro folder sería /src/app/posts.
  • Dentro del folder /src/app/posts crearemos los siguientes archivos:
    • posts.component.html : Contendrá el HTML que se desplegará donde se inserte el componente
    • posts.component.ts : Contendrá una clase typescript que definirá el componente.
  • Dentro del folder /src/app editaremos el archivo app.modules.ts para registrar el nuevo componente.

Paso 1 Editar el archivo app.component.html

El primer paso para este ejemplo será editar el archivo app.component.html con el siguiente código:

<div align="center">
<h1>Geeks México</h1>
<h3>Base component</h3>
<img class="img-fluid" src="https://geeksjavamexico.files.wordpress.com/2017/09/5.png?w=400" width="25%" height="25%" alt=""></div>

Con esto la aplicación se verá como se muestra en la siguiente imagen:

Captura de pantalla 2017-10-11 a las 21.13.14

Paso 2: Agregar un nuevo componente

Una vez que tenemos el componente base que es app.component, el siguiente paso es crear el componente posts. Para esto seguiremos los siguientes pasos:

1.- Crear un folder vacío en el folder /src/app/ llamado posts

2.- Crear el archivo posts.components.html dentro del folder /src/app/posts.

<div align="center">
<h3>Post List</h3>
<div>
<table  class="table">
<tr>
<th>Author Twitter</th>
<th>Post name</th>
<th>Post date</th>
</tr>
<tr>
<td>@raidentrance</td>
<td>Angular from scratch</td>
<td> 10/10/2017</td>
</tr>
</table>
</div>
</div>

3.- Crear el archivo posts.components.ts dentro del folder /src/app/posts.

import { Component } from '@angular/core'
@Component({
    selector: 'app-posts',
    templateUrl: './posts.component.html'
})
export class PostsComponent {

}

Paso 3: Incluir el nuevo componente en el archivo app.modules.ts

Una vez que ya se creó el componente el siguiente paso es registrarlo dentro del archivo app.modules.ts. Para hacerlo debemos editar el archivo como se muestra a continuación:

import { BrowserModule } from '@angular/platform-browser';
import { NgModule } from '@angular/core';

import { AppComponent } from './app.component';
import { PostsComponent } from './posts/posts.component';

@NgModule({
  declarations: [
    AppComponent,PostsComponent
  ],
  imports: [
    BrowserModule
  ],
  providers: [],
  bootstrap: [AppComponent]
})
export class AppModule { }

Las modificaciones que se harán al archivo app.modules.ts son las siguientes:

  1. Incluir la sentencia import para PostsComponent desde ./posts/posts.component
  2. Incluir PostsComponent en el objeto declarations

Paso 4: Incluir el nuevo componente en el componente principal

Para hacer esto lo único que debemos hacer es editar el archivo app.components.html Como se muestra a continuación:

<div align="center">
<h1>Geeks México</h1>
<h3>Base component</h3>
<img class="img-fluid" src="https://geeksjavamexico.files.wordpress.com/2017/09/5.png?w=400" width="25%" height="25%" alt=""></div>
<app-posts></app-posts>

Como se puede observar,la única modificación fue agregar al final el selector de nuestro componente llamado .

Una vez concluido lo anterior la página se verá como se muestra a continuación:
Captura de pantalla 2017-10-11 a las 21.42.39

Creando un componente utilizando la CLI

Crear un componente utilizando la línea de comandos es mucho más sencillo, solo debes ejecutar el siguiente comando:

ng generate component posts

Una vez hecho esto verás que todos los archivos necesarios fueron creados automáticamente y que lo único que resta es editar los archivos para tener todo listo. Si tienes alguna pregunta sobre como hacerlo tu mismo, agrégala a los comentarios.

Autor: Alejandro Agapito Bautista

Twitter: @raidentrance

Contacto:raidentrance@gmail.com

 

Real time processing of Tweets with apache Storm


In this post I’m going to explain in an easy way how to process information on real time by using Apache Storm.

Before to start

Before to start with the configuration of the project and the explanation about Apache Storm it is necessary to create a developer account on Twitter, for this purpose you need to sign in in the following url: https://apps.twitter.com/ and register an application, once you created the application Twitter will provide you the following information:

oauth.consumerSecret=
oauth.consumerKey=
oauth.accessToken=
oauth.accessTokenSecret=

Configuration

The application will be a Java application created with Maven, so we need to add the following dependencies:

<dependencies>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>1.0.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.twitter4j/twitter4j-core -->
    <dependency>
        <groupId>org.twitter4j</groupId>
        <artifactId>twitter4j-core</artifactId>
        <version>4.0.4</version>
    </dependency>
    <dependency>
        <groupId>org.twitter4j</groupId>
        <artifactId>twitter4j-stream</artifactId>
        <version>4.0.4</version>
    </dependency>
</dependencies>

The application will use an api to get information on real time named twitter4j-stream, in order to configure it we will create a file named twitter4j.properties in the folder src/main/resources with the following properties:

oauth.consumerSecret={Tu consumerSecret}
oauth.consumerKey={Tu consumerKey}
oauth.accessToken={Tu accessToken}
oauth.accessTokenSecret={Tu accessTokenSecret}

Apache Storm

Apache storm is a tool that allows you to process information in real time by using the following components:

  • Spout: The spouts are components used to read information from a source of data and emit this information to the components that are going to execute the logic. In this example the source of data will be Twitter and the information to emit will be the tweets related with Java, JavaScript,PHP and Phyton.
  • Bolts: Bolts are components that will receive the information from the spouts and execute the business logic of the application. In this example the bolt will receive all the tweets coming from the spout and will execute the following logic:
    • Receive tweets related with programming languages
    • Validate what king of language is talking
    • Write the tweet in a file depending of the programing language
  • Stream groupings: Part of defining a topology is specifying for each bolt which streams it should receive as input. A stream grouping defines how that stream should be partitioned among the bolt’s tasks. In the example we will specify that our spout will emit the information to our bolt.
  • Topology: All the logic of a real time application will be executed in a topology (spouts+bolts), this is equivalent to aMapReduce job in Hadoop, the difference is that a MapReduce job  will finish and the topology will be running all the time.

Writing the code

The first step is to define an enum with all the supported languages that we will monitor in the social network with the file names that are going to store the tweets.

/**
 *
 */
package com.raidentrance.util;

/**
 * @author alex @raidentrance
 *
 */
public enum Languages {
    JAVA("java", "java.txt"), JAVASCRIPT("javascript", "javascript.txt"), PHP("php", "php.txt"), PYTHON("python",
            "python.txt");

    private String name;
    private String fileName;

    private Languages(String name, String fileName) {
        this.name = name;
        this.fileName = fileName;
    }
    public String getName() {
        return name;
    }
    public String getFileName() {
        return fileName;
    }
}

The next step will be create a file manager, in this example it will write the tweets in the corresponding file.

/**
 *
 */
package com.raidentrance.util;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.Serializable;

import twitter4j.Status;

/**
 * @author alex @raidentrance
 *
 */
public class FileManager implements Serializable {

    private static final long serialVersionUID = -3987517536486344388L;

    public void writeTweet(Status tweet, String fileName) throws IOException {
        File file = new File(fileName);
        if (!file.exists()) {
            file.createNewFile();
        }

        FileWriter fileWritter = new FileWriter(file.getName(),true);
        BufferedWriter bufferWritter = new BufferedWriter(fileWritter);
        bufferWritter.write("\n" + tweet.getText() + "\n Retweet count : " + tweet.getUser().getFollowersCount() + "\n Tweet id "
                + tweet.getId() + "\n User id" + tweet.getUser().getName()
                + "\n----------------------------------------");
        bufferWritter.close();
    }
}

Once we create all our helper classes, we will start with our storm components, the first one will be the spout that will read the information from Twitter:

/**
 *
 */
package com.raidentrance.spout;

import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import com.raidentrance.util.Languages;

import twitter4j.FilterQuery;
import twitter4j.StallWarning;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;

/**
 * @author alex @raidentrance
 *
 */
public class TweetStreamSpout extends BaseRichSpout {

    private SpoutOutputCollector collector;

    private LinkedBlockingQueue<Status> queue;
    private TwitterStream twitterStream;

    private static final long serialVersionUID = 4256154244602991768L;

    public void nextTuple() {
        final Status status = queue.poll();
        if (status == null) {
            Utils.sleep(50);
        } else {
            collector.emit(new Values(status));
        }
    }

    public void open(@SuppressWarnings("rawtypes") Map map, TopologyContext context, SpoutOutputCollector collector) {
        System.out.println("Opening the bolt");
        this.collector = collector;
        this.twitterStream = new TwitterStreamFactory().getInstance();
        this.queue = new LinkedBlockingQueue<>();
        StatusListener listener = new StatusListener() {
            @Override
            public void onStatus(Status status) {
                queue.offer(status);
            }

            @Override
            public void onDeletionNotice(StatusDeletionNotice sdn) {
            }

            @Override
            public void onTrackLimitationNotice(int i) {
            }

            @Override
            public void onScrubGeo(long l, long l1) {
            }

            @Override
            public void onException(Exception e) {
            }

            @Override
            public void onStallWarning(StallWarning warning) {
            }
        };
        twitterStream.addListener(listener);
        FilterQuery filterQuery = new FilterQuery();

        for (Languages language : Languages.values()) {
            filterQuery.track(language.getName());
        }

        twitterStream.filter(filterQuery);
    }

    @Override
    public void activate() {
    };

    @Override
    public void deactivate() {
        twitterStream.cleanUp();
    };

    @Override
    public void close() {
        twitterStream.shutdown();
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("status"));
    }
}

The spout TweetStreamSpout uses the api twitter4j-stream  to receive as a real time stream all the tweets related with the words java, javascript, php and python and it will emit the data using a output variable named status to the bolt.

Once we have the source of data the next step will be create a bolt to execute the business logic of our application:

/**
 *
 */
package com.raidentrance.bolt;

import java.io.IOException;
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

import com.raidentrance.util.FileManager;
import com.raidentrance.util.Languages;

import twitter4j.Status;

/**
 * @author alex @raidentrance
 *
 */
public class TwitterAnalyzerBolt extends BaseRichBolt {

    private FileManager manager = new FileManager();
    private OutputCollector collector;

    private static final long serialVersionUID = 8465078768241865446L;

    @Override
    public void prepare(@SuppressWarnings("rawtypes") Map stormConf, TopologyContext context,
            OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        final Status tweet = (Status) tuple.getValueByField("status");
        for (Languages language : Languages.values()) {
            if (tweet.getText().toLowerCase().contains(language.getName())) {
                try {
                    manager.writeTweet(tweet, language.getFileName());
                } catch (IOException e) {
                    collector.fail(tuple);
                }
            }
        }

        System.out.println("\n" + tweet.getText() + "\n Retweet count : " + tweet.getUser().getFollowersCount()
                + "\n Tweet id " + tweet.getId() + "\n User id" + tweet.getUser().getName()
                + "\n----------------------------------------");
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}

The bolt TwitterPrinterBolt will get all the information coming from the spout by using the variable status and will execute the logic to save the tweet in the corresponding file

The last step will be create the topology, in this component we will define the spouts and bolts, the relationship between them and the deployment mechanism.

/**
 *
 */
package com.raidentrance.topologies;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;

import com.raidentrance.bolt.TwitterAnalyzerBolt;
import com.raidentrance.spout.TweetStreamSpout;

/**
 * @author alex @raidentrance
 *
 */
public class TwitterTopology {
    public static void main(String args[]) {
            TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("twitterSpout", new TweetStreamSpout());
        builder.setBolt("twitterAnalyzerBolt", new TwitterAnalyzerBolt(), 1).shuffleGrouping("twitterSpout");

        Config conf = new Config();
        conf.setDebug(false);

        final LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("twitterTopology", conf, builder.createTopology());
    }
}

To create the topology we will use the class TopologyBuilder and with this we will define the spouts, bolts and relationships.
To define the deployment mechanism we will use for this example the class LocalCluster, it will start storm instance to deploy our application, this class is just for development, for production you have to follow a different process.

Execution the application

Once we wrote all the code the next step will be execute the class TwitterTopology. It will print all the tweets that is processing and generate the files per programming language wi the received tweets.

You can download the complete code in the following link https://github.com/raidentrance/storm-sample.

Sources: http://storm.apache.org

Autor: Alejandro Agapito Bautista

Twitter: @raidentrance

Contacto:raidentrance@gmail.com

Procesamiento en tiempo real de Tweets utilizando Apache Storm en español


En este post se explicará de forma sencilla cómo procesar información utilizando Apache Storm en tiempo real, el caso de ejemplo será el procesamiento de Tweets.

Antes de iniciar

Antes de iniciar con la configuración del proyecto y la explicación sobre Apache Storm es necesario crear una cuenta de desarrollador en Twitter para esto es necesario registrarse en la siguiente URL https://apps.twitter.com/ y registrar una aplicación, con esto se proveerán los siguientes datos:

oauth.consumerSecret=
oauth.consumerKey=
oauth.accessToken=
oauth.accessTokenSecret=

Configuración

La aplicación a desarrollar será una aplicación Java construida con Apache Maven, para esto se deben agregar las siguientes dependencias:

<dependencies>
	<dependency>
		<groupId>org.apache.storm</groupId>
		<artifactId>storm-core</artifactId>
		<version>1.0.1</version>
	</dependency>
	<!-- https://mvnrepository.com/artifact/org.twitter4j/twitter4j-core -->
	<dependency>
		<groupId>org.twitter4j</groupId>
		<artifactId>twitter4j-core</artifactId>
		<version>4.0.4</version>
	</dependency>
	<dependency>
		<groupId>org.twitter4j</groupId>
		<artifactId>twitter4j-stream</artifactId>
		<version>4.0.4</version>
	</dependency>
</dependencies>

El siguiente punto será crear un archivo de configuración llamado twitter4j.properties en la carpeta src/main/resources con las siguientes propiedades:

oauth.consumerSecret={Tu consumerSecret}
oauth.consumerKey={Tu consumerKey}
oauth.accessToken={Tu accessToken}
oauth.accessTokenSecret={Tu accessTokenSecret}

Apache Storm

Apache storm es una herramienta que permite realizar procesamiento en tiempo real basado en los siguientes componentes:

  • Spouts: Los spouts son componentes utilizados para leer información de una fuente de datos y emitirla a los componentes que ejecutarán la lógica de la aplicación. En el caso de ejemplo la fuente de datos será Twitter y la información a emitir serán los Tweets relacionados con Java, JavaScript,PHP y Python.
  • Bolts: Los bolts son componentes que recibirán la información emitida por los spouts y ejecutarán el procesamiento, es decir, la lógica de negocio en la aplicación.En el caso de ejemplo el bolt que se escribirá recibirá los tweets que emita el spout y ejecutará la siguiente lógica:
    • Recibirá tweets relacionados con los lenguajes de programación
    • Verificará el lenguaje del que se habla en el tweet y lo escribirá en el archivo que le corresponde
  • Shuffle Grouping: Define cómo va a fluir la información, es decir, que spout emitirá información a que bolt.
  • Topology: La lógica de una aplicación en tiempo real será definida en una topología (spouts + bolts), es equivalente a un MapReduce job en Hadoop, la diferencia es que un MapReduce job termina en algún momento y la topología se ejecuta todo el tiempo.

Escribiendo código

El primer paso a seguir para realizar este procesamiento es definir una enumeración con los lenguajes de programación que se desean monitorear en la red social así como los archivos que se utilizarán para cada uno de ellos.

/**
 *
 */
package com.raidentrance.util;

/**
 * @author alex @raidentrance
 *
 */
public enum Languages {
	JAVA("java", "java.txt"), JAVASCRIPT("javascript", "javascript.txt"), PHP("php", "php.txt"), PYTHON("python",
			"python.txt");

	private String name;
	private String fileName;

	private Languages(String name, String fileName) {
		this.name = name;
		this.fileName = fileName;
	}
	public String getName() {
		return name;
	}
	public String getFileName() {
		return fileName;
	}
}

El siguiente paso es crear un administrador de archivos que para este ejemplo será el que escribirá los tweets en el archivo correspondiente.

/**
 *
 */
package com.raidentrance.util;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.Serializable;

import twitter4j.Status;

/**
 * @author alex @raidentrance
 *
 */
public class FileManager implements Serializable {

	private static final long serialVersionUID = -3987517536486344388L;

	public void writeTweet(Status tweet, String fileName) throws IOException {
		File file = new File(fileName);
		if (!file.exists()) {
			file.createNewFile();
		}

		FileWriter fileWritter = new FileWriter(file.getName(),true);
        BufferedWriter bufferWritter = new BufferedWriter(fileWritter);
        bufferWritter.write("\n" + tweet.getText() + "\n Retweet count : " + tweet.getUser().getFollowersCount() + "\n Tweet id "
				+ tweet.getId() + "\n User id" + tweet.getUser().getName()
				+ "\n----------------------------------------");
        bufferWritter.close();
	}
}

El siguiente paso para escribir la topología será escribir el spout que leerá la información de Twitter y la emitirá a los bolts que ejecutarán la lógica.

/**
 *
 */
package com.raidentrance.spout;

import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import com.raidentrance.util.Languages;

import twitter4j.FilterQuery;
import twitter4j.StallWarning;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;

/**
 * @author alex @raidentrance
 *
 */
public class TweetStreamSpout extends BaseRichSpout {

	private SpoutOutputCollector collector;

	private LinkedBlockingQueue<Status> queue;
	private TwitterStream twitterStream;

	private static final long serialVersionUID = 4256154244602991768L;

	public void nextTuple() {
		final Status status = queue.poll();
		if (status == null) {
			Utils.sleep(50);
		} else {
			collector.emit(new Values(status));
		}
	}

	public void open(@SuppressWarnings("rawtypes") Map map, TopologyContext context, SpoutOutputCollector collector) {
		System.out.println("Opening the bolt");
		this.collector = collector;
		this.twitterStream = new TwitterStreamFactory().getInstance();
		this.queue = new LinkedBlockingQueue<>();
		StatusListener listener = new StatusListener() {
			@Override
			public void onStatus(Status status) {
				queue.offer(status);
			}

			@Override
			public void onDeletionNotice(StatusDeletionNotice sdn) {
			}

			@Override
			public void onTrackLimitationNotice(int i) {
			}

			@Override
			public void onScrubGeo(long l, long l1) {
			}

			@Override
			public void onException(Exception e) {
			}

			@Override
			public void onStallWarning(StallWarning warning) {
			}
		};
		twitterStream.addListener(listener);
		FilterQuery filterQuery = new FilterQuery();

		for (Languages language : Languages.values()) {
			filterQuery.track(language.getName());
		}

		twitterStream.filter(filterQuery);
	}

	@Override
	public void activate() {
	};

	@Override
	public void deactivate() {
		twitterStream.cleanUp();
	};

	@Override
	public void close() {
		twitterStream.shutdown();
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("status"));
	}
}

El spout TweetStreamSpout utiliza el api twitter4j-stream para recibir como un flujo en tiempo real  los tweets relacionados con las palabras java, javascript, php y python y emitirá dichas publicaciones utilizando como variable de salida la palabra “status” al bolt que se defina.

Una vez que se cuenta con la fuente de información el siguiente paso es crear el bolt que ejecutará la lógica a ejecutar:

/**
 *
 */
package com.raidentrance.bolt;

import java.io.IOException;
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

import com.raidentrance.util.FileManager;
import com.raidentrance.util.Languages;

import twitter4j.Status;

/**
 * @author alex @raidentrance
 *
 */
public class TwitterAnalyzerBolt extends BaseRichBolt {

	private FileManager manager = new FileManager();
	private OutputCollector collector;

	private static final long serialVersionUID = 8465078768241865446L;

	@Override
	public void prepare(@SuppressWarnings("rawtypes") Map stormConf, TopologyContext context,
			OutputCollector collector) {
		this.collector = collector;
	}

	@Override
	public void execute(Tuple tuple) {
		final Status tweet = (Status) tuple.getValueByField("status");
		for (Languages language : Languages.values()) {
			if (tweet.getText().toLowerCase().contains(language.getName())) {
				try {
					manager.writeTweet(tweet, language.getFileName());
				} catch (IOException e) {
					collector.fail(tuple);
				}
			}
		}

		System.out.println("\n" + tweet.getText() + "\n Retweet count : " + tweet.getUser().getFollowersCount()
				+ "\n Tweet id " + tweet.getId() + "\n User id" + tweet.getUser().getName()
				+ "\n----------------------------------------");
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
	}
}

El bolt TwitterPrinterBolt obtendrá la información emitida por el spout utilizando la variable status y ejecuta la lógica para guardar dicho tweet en el archivo correspondiente.

El último paso será crear la topología, en esta se definirán los spouts, bolts, la relación entre ellos y el modo de despliegue.

/**
 *
 */
package com.raidentrance.topologies;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;

import com.raidentrance.bolt.TwitterAnalyzerBolt;
import com.raidentrance.spout.TweetStreamSpout;

/**
 * @author alex @raidentrance
 *
 */
public class TwitterTopology {
	public static void main(String args[]) {
	        TopologyBuilder builder = new TopologyBuilder();
		builder.setSpout("twitterSpout", new TweetStreamSpout());
		builder.setBolt("twitterAnalyzerBolt", new TwitterAnalyzerBolt(), 1).shuffleGrouping("twitterSpout");

		Config conf = new Config();
		conf.setDebug(false);

		final LocalCluster cluster = new LocalCluster();
		cluster.submitTopology("twitterTopology", conf, builder.createTopology());
	}
}

Para crear la topología se utilizará la clase TopologyBuilder y con esta se definirán los spouts, shuffleGrouping y bolts así como el modo de despliegue, como se puede observar se esta utilizando la clase LocalCluster, la cual iniciará el servicio de storm así como todas sus dependencias en un entorno de desarrollo. Si se deseara desplegar en un cluster de producción se debe seguir un procedimiento distinto

Ejecutando la aplicación

Una vez que se escribió todo el código se ejecutará la clase TwitterTopology. Esta imprimirá los tweets que se están procesando y generará los archivos por lenguaje con los tweets recibidos.

Puedes encontrar el código completo en el siguiente link: https://github.com/raidentrance/storm-sample

Fuentes : http://storm.apache.org

Autor: Alejandro Agapito Bautista

Twitter: @raidentrance

Contacto:raidentrance@gmail.com