El primer ejemplo que escribiremos será una topología que leerá mensajes de un topic de Kafka y contará el número de palabras en el mismo emitiendo el resultado a un topic de salida, para hacerlo se realizarán las siguientes operaciones:
- Lee un mensaje de kafka
- Transforma el mensaje en minúsculas
- Separa el mensaje por espacios en un arreglo de Strings
- Selecciona la palabra como una llave
- Agrupa los mensajes por el nombre de la llave
- Emite el resultado a un topic de salida
Para hacer lo anterior seguiremos los siguientes pasos:
1 Iniciar el cluster de Zookeeper y Kafka
El primer paso será iniciar nuestro cluster de Kafka con los siguientes comandos:
Zookeeper
sh zookeeper-server-start.sh ../config/zookeeper.properties
Kafka
sh kafka-server-start.sh ../config/server.properties
Con lo anterior tendremos un cluster de Kafka y Zookeeper corriendo en nuestra máquina.
2 Crear los topics de Kafka a utilizar
En este ejemplo utilizaremos 2 topics, un topic de entrada y uno de salida, así que el siguiente paso es crearlos, para hacerlo ejecutaremos los siguientes comandos:
sh kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic input-topic sh kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic output-topic
Con lo anterior contaremos con dos topics input-topic y output-topic cada uno con 1 replica y 2 particiones. Si deseas confirmar que los topics se crearon correctamente, puedes ejecutar el siguiente comando para listar los topics:
sh kafka-topics.sh --list --zookeeper localhost:2181
Esto te mostrará los topics que existen en tu cluster.
Paso 3 Configuración del proyecto
Una vez que ya iniciamos nuestro cluster y creamos nuestros topics, el siguiente paso será crear nuestro proyecto de Java que leerá de un topic, hará modificaciones a la información y colocará el resultado en un topic de salida.
Para esto agregaremos las siguientes dependencias a nuestro proyecto:
Paso 4 Programando nuestra topología
Una vez que ya tenemos configurado nuestro proyecto, el siguiente paso será programar nuestra topología, veamos el siguiente código:
import java.util.Arrays; import java.util.Properties; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.KTable; public class StreammingApplication { public static Properties getKafkaConfiguration() { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-app"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); return props; } public static void main(String[] args) { KStreamBuilder builder = new KStreamBuilder(); KStream wordCountsInput = builder.stream("input-topic"); KTable counts = wordCountsInput.mapValues(String::toLowerCase) .flatMapValues(value -> Arrays.asList(value.split(" "))).selectKey((ignoredKey, word) -> word) .groupByKey().count(); counts.to(Serdes.String(), Serdes.Long(), "output-topic"); KafkaStreams streams = new KafkaStreams(builder, getKafkaConfiguration()); streams.start(); Runtime.getRuntime().addShutdownHook(new Thread(streams::close)); } }
En el código anterior podemos ver lo siguiente:
- Hay un método que devuelve la configuración de Kafka a utilizar
- Creamos un objeto de tipo KStreamBuilder que nos permitirá definir nuestro topología de Kafka streams.
- KStreamBuilder.stream() devuelve un flujo de entrada, el cual es básicamente un consumer, a una referencia de tipo KStream<String, String> contiene 2 genéricos porque uno será para la llave del mensaje y el otro para el valor.
- KStream nos permitirá realizar las siguientes operaciones:
- mapValues(..) : Lo utilizaremos para transformar nuestro mensaje a minúsculas.
-
flatMapValues(..): Lo utilizaremos para separar nuestro mensaje en multiples mensajes, uno por cada palabra.
-
selectKey(..) : Nos permitirá seleccionar una llave, lo utilizaremos para utilizar la palabra como llave de nuestro mensaje.
-
groupByKey() : Nos permitirá agrupar por una llave, en este caso será la palabra.
- count(): Nos permitirá el número de mensajes.
- Lo anterior nos devolverá un objeto de tipo KTable<String,Long> como respuesta en el primer genérico tendremos las palabras y en el segundo el conteo.
- Utilizaremos counts.to(Serdes.String(), Serdes.Long(), «output-topic»); para emitir la respuesta a otro topic llamado «output-topic«, utilizamos Serdes.String(), Serdes.Long() para definir el topi de dato de la llave y el valor del mensaje de salida.
Paso 5 probando nuestra aplicación
Una vez que ya entendimos el código el siguiente paso será probarlo, para esto abriremos 2 consumers y un producer del siguiente modo:
- Consumer 1 : Leerá los mensajes que se envíen al topic «input-topic«.
- Consumer 2: Leerá los mensajes que se envíen al topic «output-topic«
- Producer 1: Enviará mensajes al topic «input-topic«
Para hacerlo utilizaremos los siguientes comandos en diferentes terminales:
Consumer1
sh kafka-console-consumer.sh --zookeeper localhost:2181 --topic input-topic
Consumer2
sh kafka-console-consumer.sh --zookeeper localhost:2181 --topic output-topic --formatter kafka.tools.DefaultMessageFormatter --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer --property print.key=true
Producer1
sh kafka-console-producer.sh --broker-list localhost:9092 --topic input-topic
Como podemos ver el consumer 2 es un poco más complejo que el uno ya que especificaremos serializers y deserializers diferentes a los que están por defecto, más adelante hablaremos sobre esto.
Una vez que ya contamos con nuestros producers y consumers listos tendremos que ejecutar nuestra aplicación como cualquier otra aplicación java, una vez hecho esto el último paso será escribir mensajes y presionar enter en la consola donde tenemos el producer, en nuestro caso escribiremos el mensaje:
«devs4j es el mejor sitio web de programación lo buscaré en Facebook y Twitter y los seguiré«
Una vez hecho esto, veremos en el Consumer1 lo siguiente:
«devs4j es el mejor sitio web de programación lo buscaré en Facebook y Twitter y los seguiré«
Este es el mensaje tal cuál que enviamos, lo interesante viene cuando vemos el Consumer2:
devs4j 1 es 3 sitio 1 web 1 y 2 seguiré 1 el 1 mejor 1 de 1 programación 1 lo 1 buscaré 1 en 1 facebook 1 twitter 1 los 1
Como vemos la frase se separó en diferentes mensajes cada uno con su propio número de ocurrencias.
Como vemos Kafka streams es muy fácil de utilizar y es posible realizar transformación a grandes flujos de información en tiempo real.
Si quieres ver el código completo puedes verlo en el siguiente enlace https://github.com/raidentrance/kafka-streams-example.
En este ejemplo vimos algo básico de Kafka streams en futuros posts veremos ejemplos cada vez más complejos, síguenos en nuestras redes sociales para enterarte sobre nuevo contenido https://www.facebook.com/devs4j/ y https://twitter.com/devs4j.