Kafka streams: Primera aplicación con Kafka streams


El primer ejemplo que escribiremos será una topología que leerá mensajes de un topic de Kafka y contará el número de palabras en el mismo emitiendo el resultado a un topic de salida, para hacerlo se realizarán las siguientes operaciones:

  • Lee un mensaje de kafka
  • Transforma el mensaje en minúsculas
  • Separa el mensaje por espacios en un arreglo de Strings
  • Selecciona la palabra como una llave
  • Agrupa los mensajes por el nombre de la llave
  • Emite el resultado a un topic de salida

Para hacer lo anterior seguiremos los siguientes pasos:

1 Iniciar el cluster de Zookeeper y Kafka

El primer paso será iniciar nuestro cluster de Kafka con los siguientes comandos:

Zookeeper

sh zookeeper-server-start.sh ../config/zookeeper.properties

Kafka

sh kafka-server-start.sh ../config/server.properties

Con lo anterior tendremos un cluster de Kafka y Zookeeper corriendo en nuestra máquina.

2 Crear los topics de Kafka a utilizar

En este ejemplo utilizaremos 2 topics, un topic de entrada y uno de salida, así que el siguiente paso es crearlos, para hacerlo ejecutaremos los siguientes comandos:

sh kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic input-topic
sh kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic output-topic

Con lo anterior contaremos con dos topics input-topic y output-topic cada uno con 1 replica y 2 particiones. Si deseas confirmar que los topics se crearon correctamente, puedes ejecutar el siguiente comando para listar los topics:

sh kafka-topics.sh --list --zookeeper localhost:2181

Esto te mostrará los topics que existen en tu cluster.

Paso 3 Configuración del proyecto

Una vez que ya iniciamos nuestro cluster y creamos nuestros topics, el siguiente paso será crear nuestro proyecto de Java que leerá de un topic, hará modificaciones a la información y colocará el resultado en un topic de salida.

Para esto agregaremos las siguientes dependencias a nuestro proyecto:

pom.xml

Paso 4 Programando nuestra topología

Una vez que ya tenemos configurado nuestro proyecto, el siguiente paso será programar nuestra topología, veamos el siguiente código:


import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

public class StreammingApplication {

	public static Properties getKafkaConfiguration() {
		Properties props = new Properties();
		props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-app");
		props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
		props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
		props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
		props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
		return props;
	}

	public static void main(String[] args) {
		KStreamBuilder builder = new KStreamBuilder();

		KStream wordCountsInput = builder.stream("input-topic");
		KTable counts = wordCountsInput.mapValues(String::toLowerCase)
				.flatMapValues(value -> Arrays.asList(value.split(" "))).selectKey((ignoredKey, word) -> word)
				.groupByKey().count();

		counts.to(Serdes.String(), Serdes.Long(), "output-topic");

		KafkaStreams streams = new KafkaStreams(builder, getKafkaConfiguration());
		streams.start();
		Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
	}
}

En el código anterior podemos ver lo siguiente:

  • Hay un método que devuelve la configuración de Kafka a utilizar
  • Creamos un objeto de tipo KStreamBuilder que nos permitirá definir nuestro topología de Kafka streams.
  • KStreamBuilder.stream() devuelve un flujo de entrada, el cual es básicamente un consumer, a una referencia de tipo KStream<String, String> contiene 2 genéricos porque uno será para la llave del mensaje y el otro para el valor.
  • KStream nos permitirá realizar las siguientes operaciones:
    • mapValues(..) : Lo utilizaremos para transformar nuestro mensaje a minúsculas.
    • flatMapValues(..): Lo utilizaremos para separar nuestro mensaje en multiples mensajes, uno por cada palabra.

    • selectKey(..) : Nos permitirá seleccionar una llave, lo utilizaremos para utilizar la palabra como llave de nuestro mensaje.

    • groupByKey() : Nos permitirá agrupar por una llave, en este caso será la palabra.

    • count(): Nos permitirá el número de mensajes.
  • Lo anterior nos devolverá un objeto de tipo KTable<String,Long> como respuesta en el primer genérico tendremos las palabras y en el segundo el conteo.
  • Utilizaremos counts.to(Serdes.String(), Serdes.Long(), “output-topic”); para emitir la respuesta a otro topic llamado “output-topic“, utilizamos Serdes.String(), Serdes.Long() para definir el topi de dato de la llave y el valor del mensaje de salida.

Paso 5 probando nuestra aplicación

Una vez que ya entendimos el código el siguiente paso será probarlo, para esto abriremos 2 consumers y un producer del siguiente modo:

  • Consumer 1 : Leerá los mensajes que se envíen al topic “input-topic“.
  • Consumer 2: Leerá los mensajes que se envíen al topic “output-topic
  • Producer 1: Enviará mensajes al topic “input-topic

Para hacerlo utilizaremos los siguientes comandos en diferentes terminales:

Consumer1

 sh kafka-console-consumer.sh --zookeeper localhost:2181  --topic input-topic

Consumer2

 sh kafka-console-consumer.sh --zookeeper localhost:2181  --topic output-topic --formatter kafka.tools.DefaultMessageFormatter --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer --property print.key=true

Producer1

sh kafka-console-producer.sh --broker-list localhost:9092 --topic input-topic

Como podemos ver el consumer 2 es un poco más complejo que el uno ya que especificaremos serializers y deserializers diferentes a los que están por defecto, más adelante hablaremos sobre esto.

Una vez que ya contamos con nuestros producers y consumers listos tendremos que ejecutar nuestra aplicación como cualquier otra aplicación java, una vez hecho esto el último paso será escribir mensajes y presionar enter en la consola donde tenemos el producer, en nuestro caso escribiremos el mensaje:

devs4j es el mejor sitio web de programación lo buscaré en Facebook y Twitter y los seguiré

Una vez hecho esto, veremos en el Consumer1 lo siguiente:

devs4j es el mejor sitio web de programación lo buscaré en Facebook y Twitter y los seguiré

Este es el mensaje tal cuál que enviamos, lo interesante viene cuando vemos el Consumer2:

devs4j	1
es	3
sitio	1
web	1
y	2
seguiré	1
el	1
mejor	1
de	1
programación	1
lo	1
buscaré	1
en	1
facebook	1
twitter	1
los	1

Como vemos la frase se separó en diferentes mensajes cada uno con su propio número de ocurrencias.

Como vemos Kafka streams es muy fácil de utilizar y es posible realizar transformación a grandes flujos de información en tiempo real.

Si quieres ver el código completo puedes verlo en el siguiente enlace https://github.com/raidentrance/kafka-streams-example.

En este ejemplo vimos algo básico de Kafka streams en futuros posts veremos ejemplos cada vez más complejos, síguenos en nuestras redes sociales para enterarte sobre nuevo contenido https://www.facebook.com/devs4j/ y https://twitter.com/devs4j.

Apache Kafka: Producers


El punto más importante al utilizar Apache Kafka es el envío y consumo de mensajes, en este post explicaremos como funcionan los producers y consumers, si tienes dudas sobre como instalar tu cluster de kafka te recomendamos el post Primeros pasos con Apache Kafka en Español !.

Producers

Un producer es un componente que escribe mensajes en un topic, para poder hacerlo requiere de la siguiente información:

  • Uno o muchos brokers de kafka
  • El nombre de un topic al cuál va a escribir
  • El mensaje que va a depositar en el topic

Acknowledges

La forma en la que podemos asegurar que un mensaje se entrego de forma exitosa es a través del uso de acknowledges, en los producers de kafka contamos con los siguientes:

  • acks=0 : Define que no es necesario recibir un ack por cada mensaje, esto mejora el performance de forma importante pero es posible perder mensajes.
  • acks=1 : Define que es necesario un ack del broker líder que recibe el mensaje, esto asegura que el líder recibió el mensaje pero no asegura que todas las replicas lo tienen.
  • acks=all : Define que es necesario un ack del broker líder y todas las replicas, esto asegura que tanto el líder como todas las replicas recibieron el mensaje.

Podemos concluir con lo anterior que entre menor sea el número de acks tendremos un mejor performance pero más probabilidades de perder datos y entre más acks tendremos un menor performance pero sin perdida de datos. Debemos considerar esto a la hora de diseñar nuestra aplicación.

Messages keys

Cuando utilizamos un producer podemos enviar cualquier información que necesitemos, pero recordemos que al contar con muchas particiones nosotros no podemos definir a que partición se mandará el mensaje, pero esto trae la pregunta, ¿Cómo puedo asegurar el orden de mis mensajes? La respuesta es Haciendo uso de message keys.

Adicional al mensaje es posible indicar un message key, con esto, todos los mensajes que cuenten con la misma message key se entregarán en la misma partición, si recordamos kafka asegura el orden de los mensajes que se encuentren dentro de la misma partición.

Programando nuestro producer

Una vez que entendemos la teoría el siguiente paso es programarlo, para esto programaremos nuestro kafka producer, para esto sigamos los siguientes pasos:

Paso 1 Configurando el proyecto

El primer paso será configurar nuestro proyecto, para esto agregaremos las siguientes entradas a nuestro archivo pom.xml:


	
		org.apache.kafka
		kafka-clients
		1.0.1
	


	
		
			maven-compiler-plugin
			3.2
			
				1.8
				1.8
			
		
	

Con lo anterior definiremos la dependencia a utilizar para crear un producer y el plugin para definir la versión 1.8 de java.

Paso 2 Definiendo de errores

El siguiente paso será crear algunas excepciones a utilizar en nuestro producer:


/**
 * @author raidentrance
 *
 */
public enum KafkaErrors {
	PRODUCER_NOT_CONNECTED("Not alive connection available");

	private String message;

	private KafkaErrors(String message) {
		this.message = message;
	}

	public String getMessage() {
		return message;
	}
}

La enumeración anterior será utilizada para definir los mensajes de error que se generen en nuestro producer.

/**
 * @author raidentrance
 *
 */
public class KafkaProducerException extends Exception {

	private static final long serialVersionUID = -2134618105767008561L;

	public KafkaProducerException(String message) {
		super(message);
	}
}

La clase anterior define la excepción que se propagará en caso de que una excepción con Kafka suceda.

Paso 3 Creando el kafka producer

Una ve que contamos con las clases necesarias crearemos un pequeño controller para enviar los mensajes a kafka.


import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import com.raidentrance.kafka.error.KafkaErrors;
import com.raidentrance.kafka.error.KafkaProducerException;

/**
 * @author raidentrance
 *
 */
public class KafkaMessageProducer {
	private Properties properties;
	private Producer producer;

	public KafkaMessageProducer(Properties properties) {
		this.properties = properties;
	}

	public void connect() {
		producer = new KafkaProducer(properties);
	}

	public void sendMessage(String topic, String key, String value) throws KafkaProducerException {
		if (producer != null) {
			producer.send(new ProducerRecord(topic, key, value));
		} else {
			throw new KafkaProducerException(KafkaErrors.PRODUCER_NOT_CONNECTED.getMessage());
		}
	}

	public void sendMessage(String topic, String value) throws KafkaProducerException {
		if (producer != null) {
			producer.send(new ProducerRecord(topic, value));
		} else {
			throw new KafkaProducerException(KafkaErrors.PRODUCER_NOT_CONNECTED.getMessage());
		}
	}

	public void disconnect() {
		producer.close();
	}
}

El controller anterior define los siguientes métodos:

  • El método connect será utilizado para crear nuestro producer
  • El método

    void sendMessage(String topic, String key, String value) :  Se utilizará para enviar mensajes al topic especificado indicando una llave.

  • El método

    void sendMessage(String topic, String value) : Se utilizará para enviar mensajes al topic especificado sin indicar una llave.

Paso 4 Probando todo junto

Una vez que tenemos todo listo el siguiente paso será enviar mensajes, para esto crearemos el siguiente método main:

public static void main(String[] args) throws KafkaProducerException {
	Properties props = new Properties();
	props.setProperty("bootstrap.servers", "kafka_host:9092");
	props.setProperty("key.serializer", StringSerializer.class.getName());
	props.setProperty("value.serializer", StringSerializer.class.getName());
	props.setProperty("acks", "1");
	props.setProperty("retries", "3");
	KafkaMessageProducer producer = new KafkaMessageProducer(props);
	producer.connect();
	producer.sendMessage("geeks-mexico", "geek-key", "raidentrance");
	producer.disconnect();
}

El código anterior muestra las propiedades necesarias para enviar y recibir mensajes a kafka y cómo mandar mensajes al topic geeks-mexico utilizando la llave geek-key el mensaje raidentrance.

Si te gusta el contenido y quieres enterarte cuando realicemos un post nuevo síguenos en nuestras redes sociales https://twitter.com/geeks_mx y https://www.facebook.com/geeksJavaMexico/.

Autor: Alejandro Agapito Bautista

Twitter: @raidentrance

Contacto:raidentrance@gmail.com