El primer ejemplo que escribiremos será una topología que leerá mensajes de un topic de Kafka y contará el número de palabras en el mismo emitiendo el resultado a un topic de salida, para hacerlo se realizarán las siguientes operaciones:

  • Lee un mensaje de kafka
  • Transforma el mensaje en minúsculas
  • Separa el mensaje por espacios en un arreglo de Strings
  • Selecciona la palabra como una llave
  • Agrupa los mensajes por el nombre de la llave
  • Emite el resultado a un topic de salida

Para hacer lo anterior seguiremos los siguientes pasos:

1 Iniciar el cluster de Zookeeper y Kafka

El primer paso será iniciar nuestro cluster de Kafka con los siguientes comandos:

Zookeeper

sh zookeeper-server-start.sh ../config/zookeeper.properties

Kafka

sh kafka-server-start.sh ../config/server.properties

Con lo anterior tendremos un cluster de Kafka y Zookeeper corriendo en nuestra máquina.

2 Crear los topics de Kafka a utilizar

En este ejemplo utilizaremos 2 topics, un topic de entrada y uno de salida, así que el siguiente paso es crearlos, para hacerlo ejecutaremos los siguientes comandos:

sh kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic input-topic
sh kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic output-topic

Con lo anterior contaremos con dos topics input-topic y output-topic cada uno con 1 replica y 2 particiones. Si deseas confirmar que los topics se crearon correctamente, puedes ejecutar el siguiente comando para listar los topics:

sh kafka-topics.sh --list --zookeeper localhost:2181

Esto te mostrará los topics que existen en tu cluster.

Paso 3 Configuración del proyecto

Una vez que ya iniciamos nuestro cluster y creamos nuestros topics, el siguiente paso será crear nuestro proyecto de Java que leerá de un topic, hará modificaciones a la información y colocará el resultado en un topic de salida.

Para esto agregaremos las siguientes dependencias a nuestro proyecto:

pom.xml

Paso 4 Programando nuestra topología

Una vez que ya tenemos configurado nuestro proyecto, el siguiente paso será programar nuestra topología, veamos el siguiente código:


import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

public class StreammingApplication {

	public static Properties getKafkaConfiguration() {
		Properties props = new Properties();
		props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-app");
		props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
		props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
		props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
		props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
		return props;
	}

	public static void main(String[] args) {
		KStreamBuilder builder = new KStreamBuilder();

		KStream wordCountsInput = builder.stream("input-topic");
		KTable counts = wordCountsInput.mapValues(String::toLowerCase)
				.flatMapValues(value -> Arrays.asList(value.split(" "))).selectKey((ignoredKey, word) -> word)
				.groupByKey().count();

		counts.to(Serdes.String(), Serdes.Long(), "output-topic");

		KafkaStreams streams = new KafkaStreams(builder, getKafkaConfiguration());
		streams.start();
		Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
	}
}

En el código anterior podemos ver lo siguiente:

  • Hay un método que devuelve la configuración de Kafka a utilizar
  • Creamos un objeto de tipo KStreamBuilder que nos permitirá definir nuestro topología de Kafka streams.
  • KStreamBuilder.stream() devuelve un flujo de entrada, el cual es básicamente un consumer, a una referencia de tipo KStream<String, String> contiene 2 genéricos porque uno será para la llave del mensaje y el otro para el valor.
  • KStream nos permitirá realizar las siguientes operaciones:
    • mapValues(..) : Lo utilizaremos para transformar nuestro mensaje a minúsculas.
    • flatMapValues(..): Lo utilizaremos para separar nuestro mensaje en multiples mensajes, uno por cada palabra.

    • selectKey(..) : Nos permitirá seleccionar una llave, lo utilizaremos para utilizar la palabra como llave de nuestro mensaje.

    • groupByKey() : Nos permitirá agrupar por una llave, en este caso será la palabra.

    • count(): Nos permitirá el número de mensajes.
  • Lo anterior nos devolverá un objeto de tipo KTable<String,Long> como respuesta en el primer genérico tendremos las palabras y en el segundo el conteo.
  • Utilizaremos counts.to(Serdes.String(), Serdes.Long(), «output-topic»); para emitir la respuesta a otro topic llamado «output-topic«, utilizamos Serdes.String(), Serdes.Long() para definir el topi de dato de la llave y el valor del mensaje de salida.

Paso 5 probando nuestra aplicación

Una vez que ya entendimos el código el siguiente paso será probarlo, para esto abriremos 2 consumers y un producer del siguiente modo:

  • Consumer 1 : Leerá los mensajes que se envíen al topic «input-topic«.
  • Consumer 2: Leerá los mensajes que se envíen al topic «output-topic«
  • Producer 1: Enviará mensajes al topic «input-topic«

Para hacerlo utilizaremos los siguientes comandos en diferentes terminales:

Consumer1

 sh kafka-console-consumer.sh --zookeeper localhost:2181  --topic input-topic

Consumer2

 sh kafka-console-consumer.sh --zookeeper localhost:2181  --topic output-topic --formatter kafka.tools.DefaultMessageFormatter --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer --property print.key=true

Producer1

sh kafka-console-producer.sh --broker-list localhost:9092 --topic input-topic

Como podemos ver el consumer 2 es un poco más complejo que el uno ya que especificaremos serializers y deserializers diferentes a los que están por defecto, más adelante hablaremos sobre esto.

Una vez que ya contamos con nuestros producers y consumers listos tendremos que ejecutar nuestra aplicación como cualquier otra aplicación java, una vez hecho esto el último paso será escribir mensajes y presionar enter en la consola donde tenemos el producer, en nuestro caso escribiremos el mensaje:

«devs4j es el mejor sitio web de programación lo buscaré en Facebook y Twitter y los seguiré«

Una vez hecho esto, veremos en el Consumer1 lo siguiente:

«devs4j es el mejor sitio web de programación lo buscaré en Facebook y Twitter y los seguiré«

Este es el mensaje tal cuál que enviamos, lo interesante viene cuando vemos el Consumer2:

devs4j	1
es	3
sitio	1
web	1
y	2
seguiré	1
el	1
mejor	1
de	1
programación	1
lo	1
buscaré	1
en	1
facebook	1
twitter	1
los	1

Como vemos la frase se separó en diferentes mensajes cada uno con su propio número de ocurrencias.

Como vemos Kafka streams es muy fácil de utilizar y es posible realizar transformación a grandes flujos de información en tiempo real.

Si quieres ver el código completo puedes verlo en el siguiente enlace https://github.com/raidentrance/kafka-streams-example.

En este ejemplo vimos algo básico de Kafka streams en futuros posts veremos ejemplos cada vez más complejos, síguenos en nuestras redes sociales para enterarte sobre nuevo contenido https://www.facebook.com/devs4j/ y https://twitter.com/devs4j.